This is a review of fraud detection methods based on graph algorithms, graph databases, machine learning, and graph neural networks on NebulaGraph. Besides an introduction to the basic methodological ideas, I've also got a Playground you can run. It's worth mentioning that this is also the first time I've introduced the Nebula-DGL project.
1 Fraud detection methods based on graph database
1.1 Graph Modeling
We start the modeling from the existing historical data and risk annotations, orienting them toward the relationships of a property graph. The data source could be transaction event records, user data, and risk control annotations in the banking, e-commerce, or insurance industries, spread across multiple table structures.
The modeling process is to abstract the entities we care about, the relationships between them, and the meaningful properties attached to both entities and relationships.
In general, persons, corporate entities, phone numbers, addresses, devices (e.g., terminal devices, network addresses, WiFi SSIDs to which terminal devices are connected), and orders are the entities we start with; other information, such as the is_risky label and attributes of persons and corporate entities (occupation, income, education, etc.), is modeled as properties on those entities.
The model looks like this, and the corresponding dataset could be generated with fraud-detection-datagen, with which you can generate a dataset of any expected scale and community structure.
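To make the model concrete, here is a minimal sketch of what the schema DDL could look like, executed through the nebula3 Python client. The tag and edge names (applicant, phone_num, device, used_device, with_phone_num) follow the queries used later in this post, while the space name and the exact property lists are assumptions of this sketch; the schema shipped with fraud-detection-datagen may differ, and not all entity types are shown.

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)
session = connection_pool.get_session('root', 'nebula')

# the space name "fraud_detection" is an assumption for this sketch
session.execute(
    "CREATE SPACE IF NOT EXISTS fraud_detection(vid_type=FIXED_STRING(32));")
# ... wait for the space and schema to be ready, then:
session.execute("USE fraud_detection;")
session.execute(
    "CREATE TAG IF NOT EXISTS applicant(is_risky bool, occupation string, income double, education string);")
session.execute("CREATE TAG IF NOT EXISTS phone_num();")
session.execute("CREATE TAG IF NOT EXISTS device();")
session.execute("CREATE EDGE IF NOT EXISTS used_device();")
session.execute("CREATE EDGE IF NOT EXISTS with_phone_num();")
session.release()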
1.2 Fraud detection with Graph Query
With a graph that encompasses persons, companies, historical loan application records, phone calls, and online applications for web-based devices, we can uncover some interesting information with certain graph queries directly.
In fact, many frauds are clustered in nature. For example, a fraud ring may be a small group of people (e.g., 3 to 5) who collect ID information on a larger scale (e.g., 30 identities) in an organized manner, initiate a large number of loans from multiple financial institutions at the same time, discard the batch of IDs once they have left a record of default after the loans are disbursed, and then move on to the next batch of ID information in the same way.
Because such a group keeps utilizing new identity information, it is hard to detect with a blacklist mechanism based on historical records. However, with the help of patterns queried on the graph, such cases can be resolved in real time.
These patterns can be categorised into two types:
One type can be directly described by the risk control expert as a pattern, e.g., a direct or indirect association with an entity that has been marked as high risk (a new order applicant uses the same network device as a past high-risk record). Such a pattern maps directly onto the graph and gives results in real time through a graph query.
Another type of association is hidden implicitly in the correlations of the data and needs to be mined by graph algorithms for risk hints. For example, even though a given entity has no matching association with the limited number of labeled high-risk entities, the cluster it forms in the graph may suggest that it is one application within an ongoing, not-yet-successful gang loan fraud. This situation can be derived by periodically running community detection algorithms in bulk over historical data, and then using centrality algorithms inside highly aggregated communities to surface core entities that are handed to risk experts for subsequent evaluation and risk labeling; a sketch of this offline mining step follows below.
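The following is an illustrative sketch only, not the post's actual pipeline: given an edge list exported from the graph (the file name and the community size threshold below are placeholders), it finds dense communities and ranks their members by degree centrality, so that risk experts can review the core entities first. In production, the equivalent Louvain and centrality jobs would typically run with nebula-algorithm rather than NetworkX.

import networkx as nx
from networkx.algorithms.community import louvain_communities

# assumed export of applicant/device/phone relationships as an edge list CSV
g = nx.read_edgelist("exported_edges.csv", delimiter=",")

# community detection (louvain_communities requires networkx >= 2.8)
communities = louvain_communities(g, seed=42)

# the size threshold for a "suspicious" community is arbitrary in this sketch
suspicious = [c for c in communities if len(c) >= 10]
for community in suspicious:
    sub = g.subgraph(community)
    centrality = nx.degree_centrality(sub)
    core_entities = sorted(centrality, key=centrality.get, reverse=True)[:5]
    # hand these candidates to risk experts for evaluation and labeling
    print(core_entities)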
1.2.1 Fraud detection based on expert graph pattern matching
Before we get started, let's prepare a NebulaGraph playground (for instance with nebula-up, which is also used later in this post) with the above graph dataset loaded.
With such a model, risk control experts can explore the relationships between entities on demand in a visual exploration tool and map out the corresponding risk patterns:
In this screenshot of the rendered query, we can clearly see a risk pattern for a group-controlled device. It can be handed to a graph database developer and abstracted into NebulaGraph statements that anyone, or any application, can query in real time:
## Query started from a person for given transaction
MATCH (n) WHERE id(n) == "200000010265"
OPTIONAL MATCH p_shared_d=(n)-[:`used_device`]->(d)<-[:`used_device`]-(:`applicant`)-[:`with_phone_num`]->(pn:`phone_num`)<-[e:`with_phone_num`]-(:`applicant`)
RETURN p_shared_d
Then we could create an API based on queries like the following, which returns count(e) as a metric.
## group controlled device metric
MATCH (n) WHERE id(n) == "200000010265"
OPTIONAL MATCH p_shared_d=(n)-[:`used_device`]->(d)<-[:`used_device`]-(:`applicant`)-[:`with_phone_num`]->(pn:`phone_num`)<-[e:`with_phone_num`]-(:`applicant`)
RETURN count(e)
In this way, we can build an online risk control system that uses limited labeled data and expert resources to more efficiently control the risk of gang fraud.
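As a minimal sketch of such an API (not the post's actual service code), the metric query can be wrapped in a Python function with the nebula3 client and exposed through any web framework; the space name fraud_detection is an assumption here.

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], Config())

def group_controlled_device_metric(vid: str) -> int:
    # returns count(e) of the group-controlled-device pattern for a given applicant vertex id
    session = connection_pool.get_session('root', 'nebula')
    try:
        result = session.execute(
            "USE fraud_detection;"  # assumed space name
            f'MATCH (n) WHERE id(n) == "{vid}" '
            "OPTIONAL MATCH p_shared_d=(n)-[:`used_device`]->(d)<-[:`used_device`]-(:`applicant`)"
            "-[:`with_phone_num`]->(pn:`phone_num`)<-[e:`with_phone_num`]-(:`applicant`) "
            "RETURN count(e) AS group_device_metric")
        return result.row_values(0)[0].as_int()
    finally:
        session.release()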
Another example of leveraging labeled high-risk vertices is to query the count of related applicants whose is_risky flag is True, as sketched below.
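A hedged sketch only: the tag and property names (applicant, is_risky, used_device) follow the modeling described earlier and should be adjusted to the real schema. It counts already-labeled risky applicants that share a device with the applicant being evaluated, assuming the session has already switched to the fraud detection space.

def risky_shared_device_count(session, vid: str) -> int:
    # count labeled high-risk applicants reachable through a shared device
    result = session.execute(
        "MATCH (n)-[:`used_device`]->(d)<-[:`used_device`]-(m:`applicant`) "
        f'WHERE id(n) == "{vid}" AND properties(m).is_risky == true '
        "RETURN count(DISTINCT m) AS risky_neighbor_count")
    return result.row_values(0)[0].as_int()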
However, in the real world, labeled data is expensive to obtain. Is there any way to use the limited risk labels together with the graph structure more effectively to predict risk?
The answer is yes.
1.3 Expand labels in Graph
In the paper: Learning from Labeled and Unlabeled Data with Label Propagation (CMU-CALD-02-107) by Xiaojin Z. and Zoubin G., the Label Propagation algorithm is used to propagate limited labeled information on the graph to more entities through the edges.
In this way, we can easily “propagate” more labeled information from a limited number of high-risk annotations in the graphs we build. These extended labels can, on the one hand, give more results in real-time graph queries and, on the other hand, serve as important input for risk control experts to help advance anti-fraud investigation actions.
In general, we can scan the graph data offline periodically, expand and update the labels by the graph algorithm, and then write the valid updated labels back to the graph.
Note that there is a similar method, SIGNDiffusion, for those who are interested.
1.3.1 Try expanding labels in graph
Here is an example that works.
In this example, I use the public Yelp dataset. This data will be used not only in this example but also later in the GNN method, so it's worth taking the time to import it into NebulaGraph.
git clone https://github.com/wey-gu/nebulagraph-yelp-frauddetection
cd nebulagraph-yelp-frauddetection
python3 -m pip install -r requirements.txt
# download and process dataset
python3 data_download.py

# load it into NebulaGraph
docker run --rm -ti \
--network=nebula-net \
-v ${PWD}/yelp_nebulagraph_importer.yaml:/root/importer.yaml \
-v ${PWD}/data:/root \
vesoft/nebula-importer:v3.1.0 \
--config /root/importer.yaml
After it’s done, we could see the data stats:
~/.nebula-up/console.sh -e "USE yelp; SHOW STATS"
It should look like:
(root@nebula) [(none)]> USE yelp; SHOW STATS
+---------+---------------------------------------+---------+
| Type    | Name                                  | Count   |
+---------+---------------------------------------+---------+
| "Tag"   | "review"                              | 45954   |
| "Edge"  | "shares_restaurant_in_one_month_with" | 1147232 |
| "Edge"  | "shares_restaurant_rating_with"       | 6805486 |
| "Edge"  | "shares_user_with"                    | 98630   |
| "Space" | "vertices"                            | 45954   |
| "Space" | "edges"                               | 8051348 |
+---------+---------------------------------------+---------+
Got 6 rows (time spent 1911/4488 us)
Currently, the general LPA label propagation algorithm is mostly used for community detection, and few implementations use it for label expansion (scikit-learn is one that does). Here, we refer to the implementation given by [Thibaud M](https://datascience.stackexchange.com/users/77683/thibaud-m). First, let's fetch a 2-hop subgraph from NebulaGraph and construct a DGL graph object from it:
import torch
import json
from torch import tensor
from dgl import DGLHeteroGraph, heterograph

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 2
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)

vertex_id = 2048
client = connection_pool.get_session('root', 'nebula')
r = client.execute_json(
    "USE yelp;"
    f"GET SUBGRAPH WITH PROP 2 STEPS FROM {vertex_id} YIELD VERTICES AS nodes, EDGES AS relationships;")

r = json.loads(r)
data = r.get('results', [{}])[0].get('data')
columns = r.get('results', [{}])[0].get('columns')

# create node and nodedata
node_id_map = {}  # key: vertex id in NebulaGraph, value: node id in dgl_graph
node_idx = 0
features = [[] for _ in range(32)] + [[]]
for i in range(len(data)):
    for index, node in enumerate(data[i]['meta'][0]):
        nodeid = data[i]['meta'][0][index]['id']
        if nodeid not in node_id_map:
            node_id_map[nodeid] = node_idx
            node_idx += 1
            for f in range(32):
                features[f].append(data[i]['row'][0][index][f"review.f{f}"])
            features[32].append(data[i]['row'][0][index]['review.is_fraud'])

rur_start, rur_end, rsr_start, rsr_end, rtr_start, rtr_end = [], [], [], [], [], []
for i in range(len(data)):
    for edge in data[i]['meta'][1]:
        edge = edge['id']
        if edge['name'] == 'shares_user_with':
            rur_start.append(node_id_map[edge['src']])
            rur_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_rating_with':
            rsr_start.append(node_id_map[edge['src']])
            rsr_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_in_one_month_with':
            rtr_start.append(node_id_map[edge['src']])
            rtr_end.append(node_id_map[edge['dst']])

data_dict = {}
if rur_start:
    data_dict[('review', 'shares_user_with', 'review')] = tensor(rur_start), tensor(rur_end)
if rsr_start:
    data_dict[('review', 'shares_restaurant_rating_with', 'review')] = tensor(rsr_start), tensor(rsr_end)
if rtr_start:
    data_dict[('review', 'shares_restaurant_in_one_month_with', 'review')] = tensor(rtr_start), tensor(rtr_end)

# construct a dgl_graph, ref: https://docs.dgl.ai/en/0.9.x/generated/dgl.heterograph.html
dgl_graph: DGLHeteroGraph = heterograph(data_dict)

# load node labels to dgl_graph
dgl_graph.ndata['label'] = tensor(features[32])

# heterogeneous graph to homogeneous graph, keep ndata and edata
import dgl
hg = dgl.to_homogeneous(dgl_graph, ndata=['label'])
Then, let’s apply the Label Spreading algorithm mentioned above to this subgraph:
from abc import abstractmethod
import torch

class BaseLabelPropagation:
    """Base class for label propagation models.

    Parameters
    ----------
    adj_matrix: torch.FloatTensor
        Adjacency matrix of the graph.
    """
    def __init__(self, adj_matrix):
        self.norm_adj_matrix = self._normalize(adj_matrix)
        self.n_nodes = adj_matrix.size(0)
        self.one_hot_labels = None
        self.n_classes = None
        self.labeled_mask = None
        self.predictions = None

    @staticmethod
    @abstractmethod
    def _normalize(adj_matrix):
        raise NotImplementedError("_normalize must be implemented")

    @abstractmethod
    def _propagate(self):
        raise NotImplementedError("_propagate must be implemented")

    def _one_hot_encode(self, labels):
        # Get the number of classes
        classes = torch.unique(labels)
        classes = classes[classes != -1]
        self.n_classes = classes.size(0)

        # One-hot encode labeled data instances and zero rows corresponding to unlabeled instances
        unlabeled_mask = (labels == -1)
        labels = labels.clone()  # defensive copying
        labels[unlabeled_mask] = 0
        self.one_hot_labels = torch.zeros((self.n_nodes, self.n_classes), dtype=torch.float)
        self.one_hot_labels = self.one_hot_labels.scatter(1, labels.unsqueeze(1), 1)
        self.one_hot_labels[unlabeled_mask, 0] = 0

        self.labeled_mask = ~unlabeled_mask

    def fit(self, labels, max_iter, tol):
        """Fits a semi-supervised learning label propagation model.

        labels: torch.LongTensor
            Tensor of size n_nodes indicating the class number of each node.
            Unlabeled nodes are denoted with -1.
        max_iter: int
            Maximum number of iterations allowed.
        tol: float
            Convergence tolerance: threshold to consider the system at steady state.
        """
        self._one_hot_encode(labels)

        self.predictions = self.one_hot_labels.clone()
        prev_predictions = torch.zeros((self.n_nodes, self.n_classes), dtype=torch.float)

        for i in range(max_iter):
            # Stop iterations if the system is considered at a steady state
            variation = torch.abs(self.predictions - prev_predictions).sum().item()

            if variation < tol:
                print(f"The method stopped after {i} iterations, variation={variation:.4f}.")
                break

            prev_predictions = self.predictions
            self._propagate()

    def predict(self):
        return self.predictions

    def predict_classes(self):
        return self.predictions.max(dim=1).indices


class LabelPropagation(BaseLabelPropagation):
    def __init__(self, adj_matrix):
        super().__init__(adj_matrix)

    @staticmethod
    def _normalize(adj_matrix):
        """Computes D^-1 * W"""
        degs = adj_matrix.sum(dim=1)
        degs[degs == 0] = 1  # avoid division by 0 error
        return adj_matrix / degs[:, None]

    def _propagate(self):
        self.predictions = torch.matmul(self.norm_adj_matrix, self.predictions)

        # Put back already known labels
        self.predictions[self.labeled_mask] = self.one_hot_labels[self.labeled_mask]

    def fit(self, labels, max_iter=1000, tol=1e-3):
        super().fit(labels, max_iter, tol)


class LabelSpreading(BaseLabelPropagation):
    def __init__(self, adj_matrix):
        super().__init__(adj_matrix)
        self.alpha = None

    @staticmethod
    def _normalize(adj_matrix):
        """Computes D^-1/2 * W * D^-1/2"""
        degs = adj_matrix.sum(dim=1)
        norm = torch.pow(degs, -0.5)
        norm[torch.isinf(norm)] = 1
        return adj_matrix * norm[:, None] * norm[None, :]

    def _propagate(self):
        self.predictions = (
            self.alpha * torch.matmul(self.norm_adj_matrix, self.predictions)
            + (1 - self.alpha) * self.one_hot_labels
        )

    def fit(self, labels, max_iter=1000, tol=1e-3, alpha=0.5):
        """
        Parameters
        ----------
        alpha: float
            Clamping factor.
        """
        self.alpha = alpha
        super().fit(labels, max_iter, tol)


import pandas as pd
import numpy as np
import networkx as nx
import matplotlib.pyplot as plt

nx_hg = hg.to_networkx()
adj_matrix = nx.adjacency_matrix(nx_hg).toarray()
labels = hg.ndata['label']

# Create input tensors
adj_matrix_t = torch.FloatTensor(adj_matrix)
labels_t = torch.LongTensor(labels)

# Learn with Label Propagation
label_propagation = LabelPropagation(adj_matrix_t)
print("Label Propagation: ", end="")
label_propagation.fit(labels_t)
label_propagation_output_labels = label_propagation.predict_classes()

# Learn with Label Spreading
label_spreading = LabelSpreading(adj_matrix_t)
print("Label Spreading: ", end="")
label_spreading.fit(labels_t, alpha=0.8)
label_spreading_output_labels = label_spreading.predict_classes()
We can see that some of the labels were spread (shown as color changes in the visualization).
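To close the loop described earlier, the expanded labels can be written back to NebulaGraph so that real-time queries can use them. This is a minimal sketch, assuming the yelp schema above (tag review with an int-typed is_fraud property) and writing back only positive labels; a real pipeline would restrict the write-back to previously unlabeled vertices and validate the results first.

# reverse the NebulaGraph-vertex-id -> dgl-node-id mapping built when loading the subgraph
reverse_id_map = {dgl_id: vid for vid, dgl_id in node_id_map.items()}

session = connection_pool.get_session('root', 'nebula')
session.execute("USE yelp;")
for dgl_id, label in enumerate(label_spreading_output_labels.tolist()):
    if label == 1:
        vid = reverse_id_map[dgl_id]
        # vertex ids in the yelp space are int-typed, so no quotes around vid
        session.execute(f"UPDATE VERTEX ON review {vid} SET is_fraud = 1;")
session.release()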
1.4 Machine Learning with Graph Features
Before the risk control field started to leverage graph methods, there were already many approaches that predicted high-risk behavior from historical data with machine learning classification algorithms, using the record fields that domain experts consider relevant (e.g., age, education, income) as features and historical labels to train risk prediction models.
Reading this, a natural question arises: on top of these methods, would the trained models be more effective if features based on the graph structure were also taken into account?
The answer is also yes: many papers and engineering practices have shown that such models are more effective than those that ignore graph features. The graph structure features that have proven useful include PageRank values of entities, degree values, or community IDs derived from one of the community detection algorithms.
In production, we can periodically fetch the full, up-to-date graph from the graph database, analyze it on a graph computing platform to obtain the required features, run it through a predefined data pipeline and the machine learning model to obtain new risk cues, and write some of the results back to the graph for easy consumption by other systems and experts.
1.4.1 Example of ML with Graph Features
Here, I will not demonstrate an end-to-end machine learning example; that part is a standard classification approach, on top of which graph algorithms contribute new properties that are processed as additional features. I will only demonstrate the community detection part: we run Louvain on the full graph, derive the community ID of each node, and then process the community values as categorical features.
The result will be stored in the sparkmaster container, under the path /output.
# docker exec -it sparkmaster bash
ls -l /output
After that, we can do some pre-processing on this Louvain community feature and start the traditional model training.
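A hedged sketch of that pre-processing, assuming the Louvain job wrote a CSV under /output with columns like _id (vertex id) and louvain (community id); the real file and column names depend on the nebula-algorithm configuration.

import pandas as pd

df = pd.read_csv("/output/louvain.csv")  # assumed path and schema

# treat the community id as a categorical value rather than a raw number
df["louvain"] = df["louvain"].astype("category")

# one simple numeric encoding: the size of the community a vertex belongs to
community_size = df["louvain"].value_counts()
df["community_size"] = df["louvain"].map(community_size).astype(int)

# these columns can now be joined with the other tabular features by vertex id
# before training a traditional classifier (e.g. gradient boosting)
print(df.head())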
1.5 The Graph Neural Network approach
However, the problems with these previous graph-feature-based approaches are that:
graph features cannot fully bring the correlations and the locality of the data into our models and methods;
graph feature engineering could be expensive and cumbersome.
In recent years, GNN-based approaches have made it possible to get better results than traditional graph-feature-based machine learning, by embedding graph structure and attribute information into the representation without expert- and engineering-driven feature extraction, feature engineering, and data annotation. Interestingly, these methods are being discovered and evolving rapidly, and graph-based deep learning has been one of the hottest machine learning research directions in recent years.
At the same time, some graph deep learning methods support Inductive Learning: the model can run inference on new vertices and edges, so that, together with the ability to query subgraphs from the graph database online, real-time risk prediction becomes simple and feasible.
1.5.1 Example of GNN fraud detection system
The storage of data can be in several other common media, but the graph database can maximize the benefit of model training, model updating, and online results updating. When we use the graph database as the single source of truth for the data, all online, offline, and graph-based approaches can be easily integrated to combine the advantages and results of all approaches to make a more effective composite system for fraud detection.
This example is divided into three parts: data processing, model training, and building an online detection system.
Note that I will use the Deep Graph Library (DGL), NebulaGraph, and the bridge between them: Nebula-DGL.
In this case, the dataset we use is Yelp-Fraud, which comes from the paper [Enhancing Graph Neural Network-based Fraud Detectors against Camouflaged Fraudsters](https://paperswithcode.com/paper/enhancing-graph-neural-network-based-fraud).
There is one type of vertex in this graph and three types of edges.
Vertices: reviews of restaurants and hotels in Yelp, with two kinds of attributes:
Each review has a label indicating whether it is a fake/fraudulent review
32 numeric attributes that have been processed
Edges: three types of associations between reviews:
R-U-R: two reviews issued by the same user shares_user_with
R-S-R: two reviews by the same restaurant with the same rating (rating can be 1 to 5) shares_restaurant_rating_with
R-T-R: two ratings are from the same restaurant in the same month of submission shares_restaurant_in_one_month_with
Before we start, it is assumed this graph is already loaded into NebulaGraph.
To load the Yelp dataset into NebulaGraph, in short, just follow the import steps shown in section 1.3.1 above.
The task of this part is to engineer the topological representation of the risk-related subgraphs of the graph and the related features (attributes) in them, and serialize them into graph objects of the DGL.
DGL itself supports constructing its graph objects from CSV files of vertex and edge lists, or from serialized sparse adjacency matrices from NetworkX and SciPy, and we could export the raw data or the full graph from the database into these forms. However, in a real case the data in the database changes in real time, and it is generally better to do GNN training directly on subgraphs from NebulaGraph. Thanks to the Nebula-DGL library, this is a natural thing to do.
Now let’s start this data import, before that I’ll introduce Nebula-DGL.
Nebula-DGL can construct DGL graph objects by processing vertices, edges, and their properties in NebulaGraph into vertices, edges, and their labels and features, according to given mapping and transformation rules (in YAML format). Among these, the property-to-feature conversion is worth mentioning.
As we know, a feature can be:
The value of a property
The value of one or more properties with certain mathematical transformations
A string property mapped to a number according to enumeration rules
Thus, they can be expressed in the Nebula-DGL API using filter for each of these cases, and let’s see how this could be done in examples:
Feature extracted directly from the value of a vertex/edge property
In this example, the edge type follow is extracted, and the value of its degree property is treated as a feature directly; in Nebula-DGL this is expressed with a value-type filter, in the same YAML form as the Yelp mapping shown later in this section.
Feature comes from mathematical transformations of properties
In this example, we take the two properties in the serve edge and process (end_year - start_year) / 30 to become a feature called service_time.
edge_types:
  - name: serve
    start_vertex_tag: player
    end_vertex_tag: team
    features:
      - name: service_time
        properties:
          - name: start_year
            type: int
            nullable: False
          - name: end_year
            type: int
            nullable: False
        # The variable was mapped by order of properties
        filter:
          type: function
          function: "lambda start_year, end_year: (end_year - start_year) / 30"
Enumerated property values into numeric features
In this example, we enumerate the name property of the team vertex, based on whether the team is on the east coast or the west coast.
vertex_tags:
  - name: team
    features:
      - name: coast
        properties:
          - name: name
            type: str
            nullable: False
        filter:
          # 0 stand for the east coast, 1 stand for the west coast
          type: enumeration
          enumeration:
            Celtics: 0
            Nets: 0
            Nuggets: 1
            Timberwolves: 1
            Thunder: 1
            # ... not showing all teams here
With this transformation API understood, let's start processing the Yelp dataset.
First, let's define the transformation rules: we transform the TAG review and all three edge types, with their property values used directly as features:
---
# If vertex id is string-typed, remap_vertex_id must be true.
remap_vertex_id: True
space: yelp
# str or int
vertex_id_type: int
vertex_tags:
  - name: review
    label:
      name: is_fraud
      properties:
        - name: is_fraud
          type: int
          nullable: False
      filter:
        type: value
    features:
      - name: f0
        properties:
          - name: f0
            type: float
            nullable: False
        filter:
          type: value
      - name: f1
        properties:
          - name: f1
            type: float
            nullable: False
        filter:
          type: value
      # ...
      - name: f31
        properties:
          - name: f31
            type: float
            nullable: False
        filter:
          type: value
edge_types:
  - name: shares_user_with
    start_vertex_tag: review
    end_vertex_tag: review
  - name: shares_restaurant_rating_with
    start_vertex_tag: review
    end_vertex_tag: review
  - name: shares_restaurant_in_one_month_with
    start_vertex_tag: review
    end_vertex_tag: review
Then, with nebula-dgl installed, we can construct a DGL DGLHeteroGraph with the following lines:
import torch
import yaml

from nebula_dgl import NebulaLoader

nebula_config = {
    "graph_hosts": [
        ('graphd', 9669),
        ('graphd1', 9669),
        ('graphd2', 9669)
    ],
    "nebula_user": "root",
    "nebula_password": "nebula",
}

# load feature_mapper from yaml file
with open('nebulagraph_yelp_dgl_mapper.yaml', 'r') as f:
    feature_mapper = yaml.safe_load(f)

nebula_loader = NebulaLoader(nebula_config, feature_mapper)
g = nebula_loader.load()
g = g.to('cpu')
device = torch.device('cpu')
1.5.1.3 Model Training
Here, I use node classification with the GraphSAGE algorithm as an example; the original version of GraphSAGE is an Inductive Learning algorithm.
Inductive Learning, as opposed to Transductive Learning, means the model can be applied to completely new vertices and edges on top of the graph it was trained on, so the trained model can be used for fraud detection on online, incremental data (instead of having to be retrained on the full graph).
Model training system (left).
Input: A historical transaction graph with fraud annotation
Output: A GraphSAGE DGL model
Online inference system (right).
Model: GraphSAGE-based training based on historical transaction graphs with fraud annotations
Input: A new transaction
Output: Whether the transaction is suspected of being fraudulent
Dataset split
The machine learning training process requires partitioning the available data into training, validation, and test subsets; they can be either disjoint subsets of the overall data or they can overlap.
In practice, labels are often scarce, so it may make more sense to partition the data according to the proportion of labels. In the following example, I split the dataset stratified by whether a vertex is labeled as fraud or not.
There are two points worth noting here.
stratify=g.ndata['is_fraud'] in train_test_split means the split keeps the value distribution of is_fraud, in line with the idea mentioned above.
We split the idx index so that we end up with three sets of indices, for training, validation, and testing. At the same time, we also store the corresponding masks in the graph object g.
# Split the graph into train, validation, and test sets
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# features are g.ndata['f0'], g.ndata['f1'], g.ndata['f2'], ... g.ndata['f31']
# label is in g.ndata['is_fraud']

# concatenate all features
features = []
for i in range(32):
    features.append(g.ndata['f' + str(i)])
g.ndata['feat'] = torch.stack(features, dim=1)
g.ndata['label'] = g.ndata['is_fraud']

# numpy array as an index of range n
idx = torch.tensor(np.arange(g.number_of_nodes()), device=device, dtype=torch.int64)

# split based on value distribution of label: the property "is_fraud", which is a binary variable.
X_train_and_val_idx, X_test_idx, y_train_and_val, y_test = train_test_split(
    idx, g.ndata['is_fraud'], test_size=0.2, random_state=42,
    stratify=g.ndata['is_fraud'])

# split train and val
X_train_idx, X_val_idx, y_train, y_val = train_test_split(
    X_train_and_val_idx, y_train_and_val, test_size=0.2, random_state=42,
    stratify=y_train_and_val)

# list of index to mask
train_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
train_mask[X_train_idx] = True
val_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
val_mask[X_val_idx] = True
test_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
test_mask[X_test_idx] = True

g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask
Heterogeneous graph to Homogeneous graph
GraphSAGE is an algorithm for Homogeneous graphs with no feature on edges, while our current Yelp graph is Heterogeneous: one type of vertex(TAG) and three types of edges. So, how can we use GraphSAGE to model Yelp graphs?
Instead of finding another Inductive Learning algorithm that supports heterogeneous graphs, I chose to convert the heterogeneous graph into a homogeneous one. In order not to lose the important edge-type information in the conversion, we can encode the edge type as a numeric edge feature.
Here I use a one-dimensional edge feature, though a two-dimensional encoding of the three edge types would also work.
Note: we could directly map the three edge types to [0, 1, 2] via hg.edata['_TYPE'] (see https://docs.dgl.ai/en/0.9.x/generated/dgl.to_homogeneous.html), but I chose not to, because 0 cannot be used as an edge weight (it would zero out that edge type's messages during message passing).
# three types of edges
In [1]: g.etypes
Out[1]:
['shares_restaurant_in_one_month_with',
 'shares_restaurant_rating_with',
 'shares_user_with']

In [2]:
g.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(
    g.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.int64)
g.edges['shares_restaurant_rating_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.int64)
g.edges['shares_user_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_user_with'),), 4, dtype=torch.int64)

In [3]: g.edata['he']
Out[3]:
{('review', 'shares_restaurant_in_one_month_with', 'review'): tensor([1, 1, 1,  ..., 1, 1, 1]),
 ('review', 'shares_restaurant_rating_with', 'review'): tensor([2, 2, 2,  ..., 2, 2, 2]),
 ('review', 'shares_user_with', 'review'): tensor([4, 4, 4,  ..., 4, 4, 4])}
The vanilla GraphSAGE implementation in DGL doesn't consider edge features, so I overrode the message-passing part of its code, as shown below.
import torch
import torch.nn.functional as F
import dgl.nn as dglnn
from dgl import function as fn
from dgl.utils import check_eq_shape, expand_as_pair

class SAGEConv(dglnn.SAGEConv):
    def forward(self, graph, feat, edge_weight=None):
        r"""
        Description
        -----------
        Compute GraphSAGE layer.

        Parameters
        ----------
        graph : DGLGraph
            The graph.
        feat : torch.Tensor or pair of torch.Tensor
            If a torch.Tensor is given, it represents the input feature of shape
            :math:`(N, D_{in})`
            where :math:`D_{in}` is size of input feature, :math:`N` is the number of nodes.
            If a pair of torch.Tensor is given, the pair must contain two tensors of shape
            :math:`(N_{in}, D_{in_{src}})` and :math:`(N_{out}, D_{in_{dst}})`.
        edge_weight : torch.Tensor, optional
            Optional tensor on the edge. If given, the convolution will weight
            with regard to the message.

        Returns
        -------
        torch.Tensor
            The output feature of shape :math:`(N_{dst}, D_{out})`
            where :math:`N_{dst}` is the number of destination nodes in the input graph,
            :math:`D_{out}` is the size of the output feature.
        """
        self._compatibility_check()
        with graph.local_scope():
            if isinstance(feat, tuple):
                feat_src = self.feat_drop(feat[0])
                feat_dst = self.feat_drop(feat[1])
            else:
                feat_src = feat_dst = self.feat_drop(feat)
                if graph.is_block:
                    feat_dst = feat_src[:graph.number_of_dst_nodes()]
            msg_fn = fn.copy_src('h', 'm')
            if edge_weight is not None:
                assert edge_weight.shape[0] == graph.number_of_edges()
                graph.edata['_edge_weight'] = edge_weight
                msg_fn = fn.u_mul_e('h', '_edge_weight', 'm')

            h_self = feat_dst

            # Handle the case of graphs without edges
            if graph.number_of_edges() == 0:
                graph.dstdata['neigh'] = torch.zeros(
                    feat_dst.shape[0], self._in_src_feats).to(feat_dst)

            # Determine whether to apply linear transformation before message passing A(XW)
            lin_before_mp = self._in_src_feats > self._out_feats

            # Message Passing
            if self._aggre_type == 'mean':
                graph.srcdata['h'] = self.fc_neigh(feat_src) if lin_before_mp else feat_src
                # graph.update_all(msg_fn, fn.mean('m', 'neigh'))
                ########################################################################
                # consider the edge data as weight: edata['he'] is used as the weight here
                graph.update_all(fn.u_mul_e('h', 'he', 'm'), fn.mean('m', 'neigh'))
                ########################################################################
                h_neigh = graph.dstdata['neigh']
                if not lin_before_mp:
                    h_neigh = self.fc_neigh(h_neigh)
            elif self._aggre_type == 'gcn':
                check_eq_shape(feat)
                graph.srcdata['h'] = self.fc_neigh(feat_src) if lin_before_mp else feat_src
                if isinstance(feat, tuple):  # heterogeneous
                    graph.dstdata['h'] = self.fc_neigh(feat_dst) if lin_before_mp else feat_dst
                else:
                    if graph.is_block:
                        graph.dstdata['h'] = graph.srcdata['h'][:graph.num_dst_nodes()]
                    else:
                        graph.dstdata['h'] = graph.srcdata['h']
                graph.update_all(msg_fn, fn.sum('m', 'neigh'))
                graph.update_all(fn.copy_e('he', 'm'), fn.sum('m', 'neigh'))
                # divide in_degrees
                degs = graph.in_degrees().to(feat_dst)
                h_neigh = (graph.dstdata['neigh'] + graph.dstdata['h']) / (degs.unsqueeze(-1) + 1)
                if not lin_before_mp:
                    h_neigh = self.fc_neigh(h_neigh)
            elif self._aggre_type == 'pool':
                graph.srcdata['h'] = F.relu(self.fc_pool(feat_src))
                graph.update_all(msg_fn, fn.max('m', 'neigh'))
                graph.update_all(fn.copy_e('he', 'm'), fn.max('m', 'neigh'))
                h_neigh = self.fc_neigh(graph.dstdata['neigh'])
            elif self._aggre_type == 'lstm':
                graph.srcdata['h'] = feat_src
                graph.update_all(msg_fn, self._lstm_reducer)
                h_neigh = self.fc_neigh(graph.dstdata['neigh'])
            else:
                raise KeyError('Aggregator type {} not recognized.'.format(self._aggre_type))

            # GraphSAGE GCN does not require fc_self.
            if self._aggre_type == 'gcn':
                rst = h_neigh
            else:
                rst = self.fc_self(h_self) + h_neigh

            # bias term
            if self.bias is not None:
                rst = rst + self.bias

            # activation
            if self.activation is not None:
                rst = self.activation(rst)
            # normalization
            if self.norm is not None:
                rst = self.norm(rst)
            return rst
import tqdm
import torch.nn as nn
from dgl.dataloading import DataLoader, MultiLayerFullNeighborSampler

class SAGE(nn.Module):
    def __init__(self, in_size, hid_size, out_size):
        super().__init__()
        self.layers = nn.ModuleList()
        # three-layer GraphSAGE-mean
        self.layers.append(dglnn.SAGEConv(in_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, hid_size, 'mean'))
        self.layers.append(dglnn.SAGEConv(hid_size, out_size, 'mean'))
        self.dropout = nn.Dropout(0.5)
        self.hid_size = hid_size
        self.out_size = out_size

    def forward(self, blocks, x):
        h = x
        for l, (layer, block) in enumerate(zip(self.layers, blocks)):
            h = layer(block, h)
            if l != len(self.layers) - 1:
                h = F.relu(h)
                h = self.dropout(h)
        return h

    def inference(self, g, device, batch_size):
        """Conduct layer-wise inference to get all the node embeddings."""
        feat = g.ndata['feat']
        sampler = MultiLayerFullNeighborSampler(1, prefetch_node_feats=['feat'])
        dataloader = DataLoader(
            g, torch.arange(g.num_nodes()).to(g.device), sampler, device=device,
            batch_size=batch_size, shuffle=False, drop_last=False,
            num_workers=0)
        buffer_device = torch.device('cpu')
        pin_memory = (buffer_device != device)

        for l, layer in enumerate(self.layers):
            y = torch.empty(
                g.num_nodes(), self.hid_size if l != len(self.layers) - 1 else self.out_size,
                device=buffer_device, pin_memory=pin_memory)
            feat = feat.to(device)
            for input_nodes, output_nodes, blocks in tqdm.tqdm(dataloader):
                x = feat[input_nodes]
                h = layer(blocks[0], x)  # len(blocks) = 1
                if l != len(self.layers) - 1:
                    h = F.relu(h)
                    h = self.dropout(h)
                # by design, our output nodes are contiguous
                y[output_nodes[0]:output_nodes[-1] + 1] = h.to(buffer_device)
            feat = y
        return y
from nebula_dgl import NebulaLoader

nebula_config = {
    "graph_hosts": [
        ('graphd', 9669),
        ('graphd1', 9669),
        ('graphd2', 9669)
    ],
    "nebula_user": "root",
    "nebula_password": "nebula",
}

with open('nebulagraph_yelp_dgl_mapper.yaml', 'r') as f:
    feature_mapper = yaml.safe_load(f)

nebula_loader = NebulaLoader(nebula_config, feature_mapper)
g = nebula_loader.load()  # This will take you some time

# We use CPU as a poor guy
g = g.to('cpu')
device = torch.device('cpu')
Split dataset into training, validation and test sets, and convert it into homo graph.
# Split the graph into train, validation and test sets
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split

# features are g.ndata['f0'], g.ndata['f1'], g.ndata['f2'], ... g.ndata['f31']
# label is in g.ndata['is_fraud']

# concatenate all features
features = []
for i in range(32):
    features.append(g.ndata['f' + str(i)])
g.ndata['feat'] = torch.stack(features, dim=1)
g.ndata['label'] = g.ndata['is_fraud']

# numpy array as index of range n
idx = torch.tensor(np.arange(g.number_of_nodes()), device=device, dtype=torch.int64)
# features.append(idx)
# concatenate one dim with index of node
# feature_and_idx = torch.stack(features, dim=1)

# split based on value distribution of label: the property "is_fraud", which is a binary variable.
X_train_and_val_idx, X_test_idx, y_train_and_val, y_test = train_test_split(
    idx, g.ndata['is_fraud'], test_size=0.2, random_state=42,
    stratify=g.ndata['is_fraud'])

# split train and val
X_train_idx, X_val_idx, y_train, y_val = train_test_split(
    X_train_and_val_idx, y_train_and_val, test_size=0.2, random_state=42,
    stratify=y_train_and_val)

# list of index to mask
train_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
train_mask[X_train_idx] = True
val_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
val_mask[X_val_idx] = True
test_mask = torch.zeros(g.number_of_nodes(), dtype=torch.bool)
test_mask[X_test_idx] = True

g.ndata['train_mask'] = train_mask
g.ndata['val_mask'] = val_mask
g.ndata['test_mask'] = test_mask

# shares_restaurant_in_one_month_with: 1, b"001"
# shares_restaurant_rating_with: 2, b"010"
# shares_user_with: 4, b"100"
# set edata of shares_restaurant_in_one_month_with to n of 1 tensor array
g.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(
    g.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.float32)
g.edges['shares_restaurant_rating_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.float32)
g.edges['shares_user_with'].data['he'] = torch.full(
    (g.number_of_edges('shares_user_with'),), 4, dtype=torch.float32)

# heterogeneous graph to homogeneous graph, keep ndata and edata
hg = dgl.to_homogeneous(g, edata=['he'], ndata=['feat', 'label', 'train_mask', 'val_mask', 'test_mask'])
Train and test the model
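The helper functions train and layerwise_infer called below are not defined in this post; the following is a minimal sketch adapted from DGL's GraphSAGE node classification example, where the sampling fan-outs, batch size, learning rate, and epoch count are placeholder choices rather than the post's actual hyperparameters.

import torch
import torch.nn.functional as F
import torchmetrics.functional as MF
from dgl.dataloading import DataLoader, NeighborSampler

def evaluate(model, graph, dataloader):
    # validation accuracy over mini-batches
    model.eval()
    ys, y_hats = [], []
    with torch.no_grad():
        for input_nodes, output_nodes, blocks in dataloader:
            x = blocks[0].srcdata['feat']
            ys.append(blocks[-1].dstdata['label'])
            y_hats.append(model(blocks, x))
    return MF.accuracy(torch.cat(y_hats), torch.cat(ys))

def layerwise_infer(device, graph, nid, model, batch_size):
    # full-graph, layer-wise inference, then accuracy on the given node ids
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size)  # pred lives on buffer_device
        pred = pred[nid]
        label = graph.ndata['label'][nid].to(pred.device)
        return MF.accuracy(pred, label)

def train(device, g, model, train_idx, val_idx):
    # mini-batch training with neighbor sampling
    sampler = NeighborSampler([10, 10, 10],
                              prefetch_node_feats=['feat'],
                              prefetch_labels=['label'])
    train_dataloader = DataLoader(g, train_idx, sampler, device=device,
                                  batch_size=1024, shuffle=True,
                                  drop_last=False, num_workers=0)
    val_dataloader = DataLoader(g, val_idx, sampler, device=device,
                                batch_size=1024, shuffle=True,
                                drop_last=False, num_workers=0)
    opt = torch.optim.Adam(model.parameters(), lr=1e-3, weight_decay=5e-4)

    for epoch in range(10):
        model.train()
        total_loss = 0
        for it, (input_nodes, output_nodes, blocks) in enumerate(train_dataloader):
            x = blocks[0].srcdata['feat']
            y = blocks[-1].dstdata['label']
            y_hat = model(blocks, x)
            loss = F.cross_entropy(y_hat, y)
            opt.zero_grad()
            loss.backward()
            opt.step()
            total_loss += loss.item()
        acc = evaluate(model, g, val_dataloader)
        print("Epoch {:05d} | Loss {:.4f} | Accuracy {:.4f}".format(
            epoch, total_loss / (it + 1), acc.item()))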
# create GraphSAGE model
in_size = hg.ndata['feat'].shape[1]
out_size = 2
model = SAGE(in_size, 256, out_size).to(device)

# model training
print('Training...')
train(device, hg, model, X_train_idx, X_val_idx)

# test the model
print('Testing...')
acc = layerwise_infer(device, hg, X_test_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

# Result:
# Test Accuracy 0.9996
After we have a trained model, it could be serialized as a file, and when needed, it could be loaded and used as a PyTorch model in your inference service.
# save model
torch.save(model.state_dict(), "fraud_d.model")

# load model
device = torch.device('cpu')
model = SAGE(32, 256, 2).to(device)
model.load_state_dict(torch.load("fraud_d.model"))
Then, let's see how to make this model work in an online fraud detection system.
1.5.1.4 Inference API
As mentioned earlier, GraphSAGE is one of the simplest models that supports Inductive Learning. However, our training and inference process above is not truly inductive yet: although only the training indices were labeled, the whole graph was used as input, so the training and test graphs were effectively the same.
To do Inductive Learning, we just need to split the data into two non-intersecting subgraphs, one for training and one for final testing.
# Inductive Learning, our test dataset are new nodes and new edges
hg_train = hg.subgraph(torch.cat([X_train_idx, X_val_idx]))

# model training
print('Training...')
train(device, hg_train, model, torch.arange(X_train_idx.shape[0]),
      torch.arange(X_train_idx.shape[0], hg_train.num_nodes()))

# test the model
print('Testing...')
hg_test = hg.subgraph(torch.cat([X_test_idx]))
sg_X_test_idx = torch.arange(hg_test.num_nodes())
acc = layerwise_infer(device, hg_test, sg_X_test_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))

# Result
# Test Accuracy 0.9990
As you can see, in the code above the graph used for testing and the graph used for training are two completely different sets of data, which means our online system can handle data it has never encountered before.
Thus, we can simply write the data of a new incoming transaction request into NebulaGraph, have the online system fetch a small subgraph starting from that vertex, and use it as the model input for inference to get the labels of the subgraph's vertices!
A new transaction request:
Remember the flowchart of an online inference system with GNN?
Now, assume a new transaction request comes into the system and its corresponding relationships have already been written into the graph database; we can randomly pick one review to simulate that:
# get SUBGRAPH of one node
import json
from torch import tensor
from dgl import DGLHeteroGraph, heterograph

from nebula3.gclient.net import ConnectionPool
from nebula3.Config import Config

config = Config()
config.max_connection_pool_size = 2
connection_pool = ConnectionPool()
connection_pool.init([('graphd', 9669)], config)

vertex_id = 2048
client = connection_pool.get_session('root', 'nebula')
r = client.execute_json(
    "USE yelp;"
    f"GET SUBGRAPH WITH PROP 2 STEPS FROM {vertex_id} YIELD VERTICES AS nodes, EDGES AS relationships;")

r = json.loads(r)
data = r.get('results', [{}])[0].get('data')
Nebula-Python is used here; we leverage execute_json to run the GET SUBGRAPH query and get the subgraph back as JSON.
# create node and nodedata
node_id_map = {}  # key: vertex id in NebulaGraph, value: node id in dgl_graph
node_idx = 0
features = [[] for _ in range(32)] + [[]]
for i in range(len(data)):
    for index, node in enumerate(data[i]['meta'][0]):
        nodeid = data[i]['meta'][0][index]['id']
        if nodeid not in node_id_map:
            node_id_map[nodeid] = node_idx
            node_idx += 1
            for f in range(32):
                features[f].append(data[i]['row'][0][index][f"review.f{f}"])
            features[32].append(data[i]['row'][0][index]['review.is_fraud'])

"""
- R-U-R: shares_user_with
- R-S-R: shares_restaurant_rating_with
- R-T-R: shares_restaurant_in_one_month_with
"""
rur_start, rur_end, rsr_start, rsr_end, rtr_start, rtr_end = [], [], [], [], [], []
for i in range(len(data)):
    for edge in data[i]['meta'][1]:
        edge = edge['id']
        if edge['name'] == 'shares_user_with':
            rur_start.append(node_id_map[edge['src']])
            rur_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_rating_with':
            rsr_start.append(node_id_map[edge['src']])
            rsr_end.append(node_id_map[edge['dst']])
        elif edge['name'] == 'shares_restaurant_in_one_month_with':
            rtr_start.append(node_id_map[edge['src']])
            rtr_end.append(node_id_map[edge['dst']])

data_dict = {}
if rur_start:
    data_dict[('review', 'shares_user_with', 'review')] = tensor(rur_start), tensor(rur_end)
if rsr_start:
    data_dict[('review', 'shares_restaurant_rating_with', 'review')] = tensor(rsr_start), tensor(rsr_end)
if rtr_start:
    data_dict[('review', 'shares_restaurant_in_one_month_with', 'review')] = tensor(rtr_start), tensor(rtr_end)

# construct a dgl_graph
dgl_graph: DGLHeteroGraph = heterograph(data_dict)
In fact, we just construct it following https://docs.dgl.ai/en/0.9.x/generated/dgl.heterograph.html, which leverages heterograph() to create a DGL graph object from a data_dict; node_id_map is the mapping between vertex IDs in NebulaGraph and node IDs in this object.
Finally, we load node features into the graph object, too.
# load node features to dgl_graph
for i in range(32):
    dgl_graph.ndata[f"f{i}"] = tensor(features[i])
dgl_graph.ndata['label'] = tensor(features[32])
Before the inference call, we still need to convert it into a homogeneous graph, as we did during training.
import torch

# to homogeneous graph
features = []
for i in range(32):
    features.append(dgl_graph.ndata[f"f{i}"])
dgl_graph.ndata['feat'] = torch.stack(features, dim=1)

dgl_graph.edges['shares_restaurant_in_one_month_with'].data['he'] = torch.ones(
    dgl_graph.number_of_edges('shares_restaurant_in_one_month_with'), dtype=torch.float32)
dgl_graph.edges['shares_restaurant_rating_with'].data['he'] = torch.full(
    (dgl_graph.number_of_edges('shares_restaurant_rating_with'),), 2, dtype=torch.float32)
dgl_graph.edges['shares_user_with'].data['he'] = torch.full(
    (dgl_graph.number_of_edges('shares_user_with'),), 4, dtype=torch.float32)

# heterogeneous graph to homogeneous graph, keep ndata and edata
import dgl
hg = dgl.to_homogeneous(dgl_graph, edata=['he'], ndata=['feat', 'label'])
And here is the inference API:
def do_inference(device, graph, node_idx, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size)  # pred in buffer_device
        return pred[node_idx]
Let’s try calling it with this new vertex’s subgraph:
def test_inference(device, graph, nid, model, batch_size):
    model.eval()
    with torch.no_grad():
        pred = model.inference(graph, device, batch_size)  # pred in buffer_device
        pred = pred[nid]
        label = graph.ndata['label'][nid].to(pred.device)
        return MF.accuracy(pred, label)

node_idx = torch.tensor(list(node_id_map.values()))
acc = test_inference(device, hg, node_idx, model, batch_size=4096)
print("Test Accuracy {:.4f}".format(acc.item()))
Result:
In [307]: def test_inference(device, graph, nid, model, batch_size):
     ...:     model.eval()
     ...:     with torch.no_grad():
     ...:         pred = model.inference(graph, device, batch_size)  # pred in buffer_device
     ...:         pred = pred[nid]
     ...:         label = graph.ndata['label'][nid].to(pred.device)
     ...:         return MF.accuracy(pred, label)
     ...:
     ...: node_idx = torch.tensor(list(node_id_map.values()))
     ...: acc = test_inference(device, hg, node_idx, model, batch_size=4096)
     ...: print("Test Accuracy {:.4f}".format(acc.item()))
     ...:
100%|████████████████████████████████| 1/1 [00:00<00:00, 130.31it/s]
100%|████████████████████████████████| 1/1 [00:00<00:00, 152.29it/s]
100%|████████████████████████████████| 1/1 [00:00<00:00, 173.55it/s]
Test Accuracy 0.9688
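In the online service itself, the decision for just the incoming vertex would come from do_inference rather than from the accuracy test above. Here is a minimal sketch of that last step; treating the argmax of the two-class output as the fraud flag is an assumption of this sketch.

# node_id_map maps the NebulaGraph vertex id of the incoming request
# to its node id inside the freshly built subgraph
new_node_idx = torch.tensor([node_id_map[vertex_id]])
pred = do_inference(device, hg, new_node_idx, model, batch_size=4096)
is_fraud_suspected = bool(pred.argmax(dim=1).item())
print(f"vertex {vertex_id} suspected fraud: {is_fraud_suspected}")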
The whole example code, including a frontend, is available on GitHub, along with a video demo.
1.6 Conclusion
To summarize, fraud detection with the NebulaGraph graph database can be done:
With graph queries to get risk metrics from graph database
With risky label being expanded by graph algorithms and written back to graph database
With ML methods including graph features being fetched from graph database
With GNN methods that turn the properties in the graph into node and edge features to predict risk offline; some of these, via Inductive Learning, can be combined with the graph database to achieve online risk prediction