Commit b2381df0 authored by Alexander Lercher

Merge branch 'feature/multilayer-integration' into feature/community-detection-pipeline

parents 36d89199 ba3fd6f6
@@ -4,6 +4,7 @@
*.log
**/env
**/venv
**/venv3
src/modules/certificate/articonf1.key
...
@@ -4,4 +4,4 @@ from db.entities.cluster import Cluster
from db.entities.clusterset import ClusterSet
from db.entities.user_cluster_graph import UserClusterGraph
from db.entities.layer import Layer
from db.entities.timeslice import TimeSlice
\ No newline at end of file
@@ -2,7 +2,6 @@ import json
from datetime import datetime
from typing import Dict
class Layer:
'''
This class represents a single layer of the Multilayer Graph.
@@ -18,13 +17,24 @@ class Layer:
return {
"layer_name": self.layer_name,
"properties": self.properties,
"use_case": self.use_case,
"total_properties": self.total_properties,
}
@staticmethod
def from_business_logic_dict(layer_info: Dict):
layer = Layer()
layer.layer_name = layer_info["name"]
layer.properties = layer_info["cluster_properties"]
layer.total_properties = layer_info["properties"]
layer.use_case = layer_info["use_case"]
return layer
def from_serializable_dict(self, layer_info: Dict, from_db=False):
self.layer_name = layer_info['layer_name']
self.properties = layer_info['properties']
self.use_case = layer_info["use_case"] if "use_case" in layer_info.keys() else None
self.total_properties = layer_info["total_properties"] if "total_properties" in layer_info.keys() else None
def __repr__(self):
return json.dumps(self.to_serializable_dict())
...
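Note: the two dictionary formats handled above can be related with a minimal sketch. The payload below is hypothetical, but it uses exactly the keys that from_business_logic_dict reads (name, cluster_properties, properties, use_case):

# Hypothetical business-logic payload; all values are illustrative only.
layer_info = {
    "name": "destination",
    "cluster_properties": ["Latitude_Destination", "Longitude_Destination"],
    "properties": ["UniqueID", "Latitude_Destination", "Longitude_Destination"],
    "use_case": "debug",
}

layer = Layer.from_business_logic_dict(layer_info)
print(layer.to_serializable_dict())
# -> {"layer_name": "destination", "properties": [...], "use_case": "debug", "total_properties": [...]}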
@@ -24,6 +24,7 @@ class Repository(MongoRepositoryBase):
self._similarity_collection = 'similarity'
self._connected_run = 'connected_run'
#region Layers
def add_layer(self, layer: Layer):
super().insert_entry(self._layer_collection, layer.to_serializable_dict())
...
from security.token_manager import TokenManager
import network_constants
from db.entities.layer import Layer
from typing import List, Dict
import requests
import json
def _fetch_layers(use_case: str) -> List[Layer]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/layers'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch data from the server, status code: {response.status_code}!")
data = json.loads(response.text)
layers = []
for row in data:
layers.append(Layer.from_business_logic_dict(row))
return layers
def _fetch_nodes(use_case: str, layer: Layer) -> List[Dict]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.SEMANTIC_LINKING_HOSTNAME}:{network_constants.SEMANTIC_LINKING_REST_PORT}/api/use-cases/{use_case}/layers/{layer.layer_name}/nodes'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch data from the server, status code: {response.status_code}!")
data = json.loads(response.text)
nodes = []
for row in data:
nodes.append(row)
return nodes
def fetch_nodes_from_semantic_linking(use_case: str):
layers = _fetch_layers(use_case)
print(str(layers))
for layer in layers:
nodes = _fetch_nodes(use_case, layer)
print("nodes for layer: "+str(nodes))
\ No newline at end of file
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import json
import processing.fetching.fetching as f
if __name__ == "__main__":
# TODO read use_case from somewhere
f.fetch_nodes_from_semantic_linking("debug")
\ No newline at end of file
@@ -4,6 +4,10 @@ info:
description: This is the documentation for the semantic linking microservice.
version: "1.0.0"
# Import security definitions from global security definition
securityDefinitions:
$ref: '../security/security.yml#securityDefinitions'
consumes:
- "application/json"
produces:
@@ -14,7 +18,9 @@ basePath: "/api"
paths:
/debug:
post:
security:
- JwtRegular: []
operationId: "routes.debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
@@ -26,35 +32,102 @@ paths:
schema:
type: object
responses:
'200':
description: "Successful echo of request data"
# nodes region
/use-cases/{use_case}/layers/{layer_name}/nodes:
get:
security:
- JwtRegular: []
operationId: "routes.nodes.nodes_for_use_case_and_layer"
tags:
- "Nodes"
summary: "Get all nodes for a Layer"
parameters:
- name: "use_case"
in: "path"
description: "Name of the requested Use-Case"
required: true
type: "string"
- name: "layer_name"
in: "path"
description: "Name of the requested Layer"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/Node"
'404':
description: "No nodes found"
# endregion nodes
# Layers region
/layers:
get:
security:
- JwtRegular: []
operationId: "routes.layers.get"
tags:
- "Layers"
summary: "Get all layer data"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/LayerCollection"
/use-cases/{use_case}/layers/{name}:
get:
security:
- JwtRegular: []
operationId: "routes.layers.get_by_name_and_use_case"
tags:
- "Layers"
summary: "Get single layer data"
parameters:
- name: "use_case"
in: "path"
description: "Name of the requested Use-Case"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the requested layer"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/Layer"
'404':
description: "Layer not found"
/use-cases/{use_case}/layers:
get:
security:
- JwtRegular: []
operationId: "routes.layers.get_by_use_case"
tags:
- "Layers"
summary: "Get all layer data for single use case"
parameters:
- name: "use_case"
in: "path"
description: "Name of the requested Use-Case"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/LayerCollection"
'404':
description: "No layers found"
# endregion layers
definitions:
NodeInfo:
@@ -77,4 +150,51 @@ definitions:
type: number
betweenness_centrality:
type: number
Layer:
type: object
properties:
use_case:
type: string
layer_name:
type: string
properties:
type: array
items:
type: string
total_properties:
type: array
items:
type: string
LayerCollection:
type: array
items:
$ref: "#/definitions/Layer"
Dataset:
type: object
properties:
usecase_name:
type: string
properties:
type: array
items:
type: string
DatasetCollection:
type: array
items:
$ref: "#/definitions/Dataset"
Node:
type: object
example:
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UniqueID": "a95075f5042b1b27060080156d87fe34ec7e712c5e57ec9159bc0668543f156a"
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
NodeCollection:
type: array
items:
$ref: "#/definitions/Node"
swagger: "2.0"
info:
title: Semantic Linking microservice
description: This is the documentation for the semantic linking microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
paths:
/debug:
post:
operationId: "routes.debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
'200':
description: "Successful echo of request data"
# nodes region
/use-cases/{use_case}/layers/{layer_name}/nodes:
get:
operationId: "routes.nodes.nodes_for_use_case_and_layer"
tags:
- "Nodes"
summary: "Get all nodes for a Layer"
parameters:
- name: "use_case"
in: "path"
description: "Name of the requested Use-Case"
required: true
type: "string"
- name: "layer_name"
in: "path"
description: "Name of the requested Layer"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/Node"
'404':
description: "No nodes found"
# endregion nodes
# Layers region
/layers:
get:
operationId: "routes.layers.get"
tags:
- "Layers"
summary: "Get all layer data"
parameters: []
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/LayerCollection"
/use-cases/{use_case}/layers/{name}:
get:
operationId: "routes.layers.get_by_name_and_use_case"
tags:
- "Layers"
summary: "Get single layer data"
parameters:
- name: "use_case"
in: "path"
description: "Name of the requested Use-Case"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the requested layer"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/Layer"
'404':
description: "Layer not found"
/use-cases/{use_case}/layers:
get:
operationId: "routes.layers.get_by_use_case"
tags:
- "Layers"
summary: "Get all layer data for single use case"
parameters:
- name: "use_case"
in: "path"
description: "Name of the requested Use-Case"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/LayerCollection"
'404':
description: "No layers found"
# endregion layers
definitions:
NodeInfo:
type: "object"
properties:
label:
type: string
centrality:
type: number
adjacencies:
type: integer
degree:
type: number
betweenness:
type: object
properties:
to_node:
type: integer
value:
type: number
betweenness_centrality:
type: number
Layer:
type: object
properties:
use_case:
type: string
layer_name:
type: string
properties:
type: array
items:
type: string
total_properties:
type: array
items:
type: string
LayerCollection:
type: array
items:
$ref: "#/definitions/Layer"
Dataset:
type: object
properties:
usecase_name:
type: string
properties:
type: array
items:
type: string
DatasetCollection:
type: array
items:
$ref: "#/definitions/Dataset"
Node:
type: object
example:
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UniqueID": "a95075f5042b1b27060080156d87fe34ec7e712c5e57ec9159bc0668543f156a"
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
NodeCollection:
type: array
items:
$ref: "#/definitions/Node"
from db.entities.layer import Layer
\ No newline at end of file
import json
from datetime import datetime
from typing import Dict
class Layer:
'''
This class represents a single layer of the Multilayer Graph.
:param layer_info: Information as dictionary to restore the layer object.
'''
def __init__(self, layer_info: Dict = None, from_db=False):
if layer_info is not None:
self.from_serializable_dict(layer_info, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"layer_name": self.layer_name,
"properties": self.properties,
"use_case": self.use_case,
"total_properties": self.total_properties,
}
@staticmethod
def from_business_logic_dict(layer_info: Dict):
layer = Layer()
layer.layer_name = layer_info["name"]
layer.properties = layer_info["cluster_properties"]
layer.total_properties = layer_info["properties"]
layer.use_case = layer_info["use_case"]
return layer
def from_serializable_dict(self, layer_info: Dict, from_db=False):
self.layer_name = layer_info['layer_name']
self.properties = layer_info['properties']
self.use_case = layer_info["use_case"] if "use_case" in layer_info.keys() else None
self.total_properties = layer_info["total_properties"] if "total_properties" in layer_info.keys() else None
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"Layer({self.__repr__()})"
import pymongo
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from db.entities import Layer
import pymongo
import json
from typing import List, Dict
# init logging to file
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
class Repository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
@@ -12,6 +19,68 @@ class Repository(MongoRepositoryBase):
super().__init__(netconst.SEMANTIC_LINKING_DB_HOSTNAME,
netconst.SEMANTIC_LINKING_DB_PORT,
'semanticLinkingDb')
self._layer_collection = 'layers'
self._layer_nodes_collection = 'layer_nodes'
self._clusters_collection = 'clusters'
self._time_slice_collection = 'time_slices'
# region Layers
def add_layer(self, layer: Layer):
super().insert_entry(self._layer_collection, layer.to_serializable_dict(for_db=True))
def get_layers(self) -> List[Layer]:
entries = super().get_entries(self._layer_collection)
return [Layer(e) for e in entries]
def get_layers_for_use_case(self, use_case: str) -> List[Layer]:
result = super().get_entries(self._layer_collection, projection={'_id': False}, selection={"use_case": use_case})
return [Layer(e) for e in result]
def get_layer(self, layer_name) -> Layer:
entries = super().get_entries(self._layer_collection, selection={'layer_name': layer_name})
entries = [Layer(e) for e in entries]
if entries is not None and len(entries) > 0:
return entries[0]
else:
return None
def get_layers_by_use_case(self, use_case: str) -> List[Layer]:
entries = super().get_entries(self._layer_collection, selection={'use_case': use_case})
return [Layer(e) for e in entries]
def get_layers_by_name_and_use_case(self, layer_name: str, use_case: str) -> Layer:
entries = super().get_entries(self._layer_collection, selection={'layer_name': layer_name, 'use_case': use_case})
entries = list(entries) # materialize
if len(entries) > 1:
LOGGER.error(f"Use case + layer name was not unique for {use_case} and {layer_name}.")
return None
return Layer(entries[0]) if len(entries) == 1 else None
def delete_layer(self, layer_name:str):
collection = self._database[self._layer_collection]
collection.delete_one({"layer_name": layer_name})
def delete_layers_for_use_case(self, use_case: str):
collection = self._database[self._layer_collection]
collection.delete_many({"use_case": use_case})
def add_layer_node(self, node: dict):
super().insert_entry(self._layer_nodes_collection, node)
def add_layer_nodes(self, nodes: List[dict]):
super().insert_many(self._layer_nodes_collection, nodes)
def get_layer_nodes(self, layer_name: str) -> List[Dict]:
'''Returns all nodes for the layer.'''
entries = super().get_entries(self._layer_nodes_collection, selection={'layer_name': layer_name},
projection={'_id': 0})
return [e for e in entries]
def get_layer_nodes_with_use_case(self, layer_name: str, use_case: str) -> List[Dict]:
'''Returns all nodes for the layer.'''
entries = super().get_entries(self._layer_nodes_collection, selection={'layer_name': layer_name, 'use_case': use_case},
projection={'_id': 0})
return list(entries)
# endregion
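A hedged usage sketch of the repository methods added above; it assumes a reachable MongoDB as configured in network_constants, and the layer and node values are purely illustrative:

repo = Repository()

layer = Layer({"layer_name": "destination", "use_case": "debug",
               "properties": ["Latitude_Destination", "Longitude_Destination"],
               "total_properties": ["UniqueID", "Latitude_Destination", "Longitude_Destination"]})
repo.delete_layer(layer.layer_name)   # keep the (use_case, layer_name) pair unique
repo.add_layer(layer)

repo.add_layer_nodes([{"UniqueID": "n1", "Latitude_Destination": -5.97,
                       "Longitude_Destination": 37.41,
                       "layer_name": "destination", "use_case": "debug"}])
print(repo.get_layer_nodes_with_use_case("destination", "debug"))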
import json
class HyperGraph:
cluster_labels = []
dest_cluster_labels = []
label_values = []
def __init__(self):
pass
def classify(self):
df_nodes = self.load_values()
ret_val = self.init(df_nodes)
nodeIds = ret_val['nodeIds']
clusterlabels = ret_val['clusterlabels']
destIds = ret_val['destIds']
clusterlabels = self.classify_input(nodeIds, clusterlabels)
labelvals = self.calc_cluster_num(clusterlabels)
cluster = self.cluster_with_labels(nodeIds, clusterlabels, labelvals)
cluster = self.remove_duplicates(cluster)
destclusterlabel = self.cluster_dest_ids(labelvals, cluster, destIds)
self.cluster_labels = clusterlabels
self.dest_cluster_labels = destclusterlabel
self.labelvals = labelvals
def load_values(self):
with open("mult_in_out_large.json", "r") as json_file:
df_nodes = json.load(json_file)
return df_nodes
def init(self, df_nodes):
nodeIds = []
clusterlabels = []
destIds = []
for row in df_nodes:
for j in range(len(row['TransactionFrom'])):
print(" Input Ids: ", row['TransactionFrom'][j])
nodeIds.append(row['TransactionFrom'])
print("This is nodes: ", nodeIds)
for row in df_nodes:
destIds.append(row['TransactionTo'])
for row in range(len(nodeIds)):
print(nodeIds[row])
print("Finish InputIDs")
i = 0
for row in range(len(nodeIds)):
clusterlabels.append(row)
i += 1
print(i)
return {'nodeIds': nodeIds,
'clusterlabels': clusterlabels,
'destIds': destIds}
def classify_input(self, nodeIds, clusterlabels):
"""Classify inputs by assigning cluster labels: records that share an input Id receive the same label."""
for row in range(len(nodeIds)):
for rown in range(len(nodeIds[row])):
for row1 in range(len(nodeIds)):
for rown1 in range(len(nodeIds[row1])):
if(nodeIds[row][rown]==nodeIds[row1][rown1]):
# print("row: ",row,"row1: ",row1)
if(row < row1):
for row2 in clusterlabels:
if( clusterlabels[row1]== clusterlabels[row2]):
clusterlabels[row2]=clusterlabels[row]
clusterlabels[row1] = clusterlabels[row]
else:
for row2 in clusterlabels:
if (clusterlabels[row] == clusterlabels[row2]):
clusterlabels[row2] = clusterlabels[row1]
clusterlabels[row] = clusterlabels[row1]
print(clusterlabels)
print("cluster labels:", len(clusterlabels))
print("NodeIDs: ", len(nodeIds))
return clusterlabels
def calc_cluster_num(self, clusterlabels):
"""" Calculating the number of clusters"""
labelvals = []
labelvals.append(clusterlabels[0])
for row in range(len(clusterlabels)):
flag = True
for row1 in range(len(labelvals)):
if(clusterlabels[row]== labelvals[row1]):
flag = False
if (flag):
labelvals.append(clusterlabels[row])
print("label values (source Ids in the network): ", labelvals, " and the number of clusters is: ", len(labelvals))
return labelvals
def cluster_with_labels(self, nodeIds, clusterlabels, labelvals):
"""" clustering Ids according to their labels"""
cluster = []
for row in range(len(labelvals)):
cluster.append([])
for row3 in range(len(nodeIds)):
if (labelvals[row] == clusterlabels[row3]):
cluster[row].extend(nodeIds[row3])
print("clusters: ", cluster)
return cluster
def remove_duplicates(self, cluster):
""" Removing duplicating items in cluster"""
flag = True
while(flag):
for row in range(len(cluster)):
flag= False
for row1 in range(len(cluster[row])):
flag= False
for row2 in range (len(cluster[row])):
if(row1 != row2):
if(cluster[row][row1] == cluster[row][row2]):
del cluster[row][row2]
flag=True
break
if(flag):
break
if(flag):
break
print("cluster:", cluster)
return cluster
def cluster_dest_ids(self, labelvals, cluster, destIds):
"""" Clustering Destination Ids """
destclusterlabel = []
for row in range(len(destIds)):
destclusterlabel.append([])
for row2 in range(len(destIds[row])):
flag = True
for rownum in range(len(labelvals)):
for row1 in range(len(cluster[rownum])):
if(destIds[row][row2]== cluster[rownum][row1]):
destclusterlabel[row].append(labelvals[rownum])
flag = False
if(flag):
destclusterlabel.append(destIds[row][row2])
print("destination labels (destination Ids): ", destclusterlabel)
return destclusterlabel
\ No newline at end of file
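A toy walk-through of the labeling idea in classify_input, assuming records shaped like those read from mult_in_out_large.json (TransactionFrom/TransactionTo lists); the concrete values are made up:

records = [
    {"TransactionFrom": ["a", "b"], "TransactionTo": ["x"]},
    {"TransactionFrom": ["b", "c"], "TransactionTo": ["y"]},
    {"TransactionFrom": ["d"], "TransactionTo": ["x", "z"]},
]
nodeIds = [row["TransactionFrom"] for row in records]
clusterlabels = list(range(len(nodeIds)))

hg = HyperGraph()
print(hg.classify_input(nodeIds, clusterlabels))
# -> [0, 0, 2]: records 0 and 1 share input "b" and end up in the same cluster.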
class NodeInfo:
'''Contains information about the individual nodes in the generated graph'''
label = None
centrality = None
adjacencies = None
degree = None
betweenness = None
betweenness_centrality = None
def __init__(self):
self.label = 'Node123'
self.centrality = 0
self.adjacencies = 0
self.degree = 0
self.betweenness = None
self.betweenness_centrality = 0
import networkx as nx
import matplotlib.pyplot as plt
from collections import Counter
from HyperGraph import HyperGraph
import warnings
# pip install networkx
# pip install matplotlib
## pip install pandas
## pip install community
## pip install mplleaflet
## pip install values
class SemanticLinking:
hg: HyperGraph = None
df_nodes = []
destf_nodes = []
G: nx.MultiDiGraph = None
color_map = {1: '#f09494', 2: '#eebcbc', 3: '#72bbd0', 4: '#91f0a1', 5: '#629fff', 6: '#bcc2f2',
7: '#eebcbc', 8: '#f1f0c0', 9: '#d2ffe7', 10: '#caf3a6', 11: '#ffdf55', 12: '#ef77aa',
13: '#d6dcff', 14: '#d2f5f0'}
def __init__(self):
warnings.filterwarnings('ignore')
# init HyperGraph
self.hg = HyperGraph()
self.hg.classify()
self.df_nodes = self.hg.cluster_labels
self.destf_nodes = self.hg.dest_cluster_labels
# init visual graph
self.G = nx.MultiDiGraph(day="Stackoverflow")
def _color_network(self, G):
"""Colors the network so that neighboring nodes all have distinct colors.
Returns a dict keyed by color to a set of nodes with that color.
"""
coloring = dict() # color => set(node)
colors = nx.coloring.greedy_color(G)
for node, color in colors.items():
if color in coloring:
coloring[color].add(node)
else:
coloring[color] = set([node])
return coloring
def _labeling_complete(self, labeling, G):
"""Determines whether or not LPA is done.
Label propagation is complete when all nodes have a label that is
in the set of highest frequency labels amongst its neighbors.
Nodes with no neighbors are considered complete.
"""
return all(labeling[v] in self._most_frequent_labels(v, labeling, G)
for v in G if len(G[v]) > 0)
def _most_frequent_labels(self, node, labeling, G):
"""Returns a set of all labels with maximum frequency in `labeling`.
Input `labeling` should be a dict keyed by node to labels.
"""
if not G[node]:
# Nodes with no neighbors are themselves a community and are labeled
# accordingly, hence the immediate if statement.
return {labeling[node]}
# Compute the frequencies of all neighbours of node
freqs = Counter(labeling[q] for q in G[node])
max_freq = max(freqs.values())
return {label for label, freq in freqs.items() if freq == max_freq}
def _update_label(self, node, labeling, G):
"""Updates the label of a node using the Prec-Max tie breaking algorithm
The algorithm is explained in: 'Community Detection via Semi-Synchronous
Label Propagation Algorithms' Cordasco and Gargano, 2011
"""
high_labels = self._most_frequent_labels(node, labeling, G)
if len(high_labels) == 1:
labeling[node] = high_labels.pop()
elif len(high_labels) > 1:
# Prec-Max
if labeling[node] not in high_labels:
labeling[node] = max(high_labels)
def drawedges(self):
"""drawing edges in graph"""
labelvalues = self.hg.label_values
weight1 = []
for drow in range(len(self.df_nodes)):
for row in range(len(self.destf_nodes[drow])):
self.G.add_edge(self.df_nodes[drow], self.destf_nodes[drow][row])
for row in range(len(labelvalues)):
for row1 in range(len(labelvalues)):
weight1.append(self.G.number_of_edges(labelvalues[row], labelvalues[row1]))
print("The number of coccurance from node ", labelvalues[row],"to node ", labelvalues[row1], ": ", weight1[row1])
self.G.weight = weight1
return weight1
def dolabeling(self):
"""label_propagation_communities(G) """
coloring = self._color_network(self.G)
# Create a unique label for each node in the graph
labeling = {v: k for k, v in enumerate(self.G)}
print("lable value: ", labeling.values())
while not self._labeling_complete(labeling, self.G):
# Update the labels of every node with the same color.
print("lable value: ", labeling.values())
for color, nodes in coloring.items():
for n in nodes:
self._update_label(n, labeling, self.G)
for label in set(labeling.values()):
print("lable value: ", labeling.values())
return labeling
def findigneighbors(self):
""" findig nodes' adjecencies"""
node_text = []
node_adjacencies = []
for node, adjacencies in enumerate(self.G.adjacency()):
node_adjacencies.append(len(adjacencies[1]))
node_text.append('# of connections: '+str(len(adjacencies[1])))
self.G.color = node_adjacencies
return node_adjacencies
def print_metrics(self, weight1, labeling, node_adjacencies):
weights = []
edge_width = []
plt.figure(figsize=(25, 25))
# colors = [color_map[G.node[node][1]] for node in G]
# sizes = [G.node[node]['Timestamp'] * 10 for node in G]
d = nx.degree_centrality(self.G)
d_list = list(d.values())
print("node centrality: ", d_list)
print("node adjacencies: ", node_adjacencies)
for row in range(len(weights)):
edge_width.append([])
for drow in range(len(weights[row])):
edge_width[row].append(weights[row][drow])
edge_width = [row * 0.5 for row in weight1]
print("Nodes' Degree: ", nx.degree(self.G))
print("Nodes' Betweeness ", nx.edge_betweenness_centrality(self.G))
print("Nodes' Betweeness-centrality: ", nx.betweenness_centrality(self.G))
def draw_edges(self, weight1, labeling, node_adjacencies):
"""
Using the spring layout :
- k controls the distance between the nodes and varies between 0 and 1
- iterations is the number of times simulated annealing is run
default k=0.1 and iterations=50
"""
labels2 = {}
options = {
'with_labels': True,
'font_weight': 'regular',
}
d = nx.degree_centrality(self.G)
node_size = [v * 80 for v in d.values()] # setting node size based on node centrality
for idx, edge in enumerate(self.G.edges):
labels2[edge] = "s"
pos_nodes = nx.spring_layout(self.G, k=0.25, iterations=50)
nx.draw(self.G, pos_nodes, node_color=node_adjacencies, node_size=node_size, width=2, arrowstyle='->',
arrowsize=10, weight=weight1, edge_color='gray', **options)
edge_labels = nx.get_edge_attributes(self.G, 'weight')
pos_attrs = {}
for node, coords in pos_nodes.items():
pos_attrs[node] = (coords[0], coords[1] + 0.02)
nx.draw_networkx_edge_labels(self.G, pos_nodes, edge_labels=edge_labels, font_size=10, font_color='red')
nx.draw_networkx_labels(self.G, pos_attrs, labels=labeling, font_size=10, font_color='red')
ax = plt.gca()
ax.collections[0].set_edgecolor("#555555")
plt.show()
def main(self):
weight1 = self.drawedges()
labeling = self.dolabeling()
node_adjacencies = self.findigneighbors()
self.print_metrics(weight1, labeling, node_adjacencies)
self.draw_edges(weight1, labeling, node_adjacencies)
if __name__ == '__main__':
linking = SemanticLinking()
linking.main()
\ No newline at end of file
# add modules folder to interpreter path
import sys
import os
import prance
from pathlib import Path
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
@@ -13,6 +15,8 @@ LOGGER = logging.getLogger(__name__)
#################################
import connexion
from security import swagger_util
from env_info import is_running_locally
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
from messaging.MessageHandler import MessageHandler
@@ -23,15 +27,31 @@ def message_received_callback(channel, method, properties, body):
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of semantic-linking-microservice!'
# SSL configuration
try:
certificate_path = os.environ['ARTICONF_CERTIFICATE_PATH']
except KeyError:
certificate_path = '/srv/articonf/'
context = (os.path.normpath(f'{certificate_path}/articonf1.crt'), os.path.normpath(f'{certificate_path}/articonf1.key')) # certificate and key files
if is_running_locally():
# Local Mode
print("Running with local settings...")
app.add_api(swagger_util.get_bundled_specs(Path("configs/swagger_local.yml")),
resolver = connexion.RestyResolver("cms_rest_api"))
context = 'adhoc'
else:
app.add_api(swagger_util.get_bundled_specs(Path("configs/swagger.yml")),
resolver = connexion.RestyResolver("cms_rest_api"))
# start app
if __name__ == '__main__':
message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('datahub', 'direct', 'semantic-linking', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False, ssl_context=context)
import network_constants as netconst
from security.token_manager import TokenManager
from db.entities import Layer
from db.repository import Repository
import json
import requests
from typing import Dict, List
from threading import Thread
import network_constants as netconst
import logging
LOGGER = logging.getLogger(__name__)
class MessageHandler:
def __init__(self):
self._repository = Repository()
def handle_generic(self, body):
LOGGER.info(f"Received message: {body}")
@@ -24,11 +29,68 @@ class MessageHandler:
LOGGER.warning("Message has no type field and is ignored")
return
if message['type'] == 'new-trace':
self.handle_new_trace(message['content'])
elif message['type'] == 'new-traces-available':
self.handle_new_traces_available()
else:
LOGGER.info("Message Type could not be processed")
def _fetch_layer_information(self, use_case: str) -> List[Layer]:
# fetch token for authentication
jwt_token = TokenManager.getInstance().getToken()
# query schema information
url = f'https://{netconst.BUSINESS_LOGIC_HOSTNAME}:{netconst.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/layers'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt_token}"}
)
if response.status_code != 200:
raise ValueError("no schema information available")
layers = [Layer.from_business_logic_dict(row) for row in json.loads(response.text)]
# update local DB, insert each layer that does not already exists
for layer in layers:
print(f"Add layer to DB: {layer.to_serializable_dict(for_db=True)}")
self._repository.delete_layer(layer.layer_name)
self._repository.add_layer(layer)
return layers
def handle_new_trace(self, content: Dict):
if "use_case" not in content.keys() or "id" not in content.keys() or "properties" not in content.keys():
LOGGER.error(f"Missing fields in trace, required fields: (use_case, id, properties), given fields: ({content.keys()})")
return
use_case = content["use_case"]
# fetch layer information
layers = self._repository.get_layers_for_use_case(use_case)
# if no local layers are found, fetch information from server
if len(layers) == 0:
layers = self._fetch_layer_information(use_case)
nodes = []
for layer in layers:
node = {}
for prop in layer.total_properties:
node[prop] = content["properties"][prop]
node["layer_name"] = layer.layer_name
node["use_case"] = use_case
nodes.append(node)
self._repository.add_layer_nodes(nodes)
def handle_new_traces_available(self):
# get all traces and call the Processor
url = f'http://{netconst.TRACE_RETRIEVAL_HOSTNAME}:{netconst.TRACE_RETRIEVAL_REST_PORT}/api/trace'
...
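For reference, a sketch of the 'new-trace' message that handle_new_trace expects; the field names follow the checks above, while all concrete values are illustrative:

message = {
    "type": "new-trace",
    "content": {
        "use_case": "debug",
        "id": "5e57ec9159bc0668543f156a",
        "properties": {
            "UniqueID": "a95075f...",
            "Latitude_Destination": -5.973257,
            "Longitude_Destination": 37.416316,
        },
    },
}
# For a layer whose total_properties are ["Latitude_Destination", "Longitude_Destination"],
# handle_new_trace would store a node roughly like:
# {"Latitude_Destination": -5.973257, "Longitude_Destination": 37.416316,
#  "layer_name": "destination", "use_case": "debug"}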
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This package implements community detection.
Package name is community but refer to python-louvain on pypi
"""
# from .community_louvain import (
# partition_at_level,
# modularity,
# best_partition,
# generate_dendrogram,
# induced_graph,
# load_binary,
# )
__author__ = """Thomas Aynaud (thomas.aynaud@lip6.fr)"""
# Copyright (C) 2009 by
# Thomas Aynaud <thomas.aynaud@lip6.fr>
# All rights reserved.
# BSD license.
# coding=utf-8
class Status(object):
"""
To handle several data in one struct.
Could be replaced by named tuple, but don't want to depend on python 2.6
"""
node2com = {}
total_weight = 0
internals = {}
degrees = {}
gdegrees = {}
def __init__(self):
self.node2com = dict([])
self.total_weight = 0
self.degrees = dict([])
self.gdegrees = dict([])
self.internals = dict([])
self.loops = dict([])
def __str__(self):
return ("node2com : " + str(self.node2com) + " degrees : "
+ str(self.degrees) + " internals : " + str(self.internals)
+ " total_weight : " + str(self.total_weight))
def copy(self):
"""Perform a deep copy of status"""
new_status = Status()
new_status.node2com = self.node2com.copy()
new_status.internals = self.internals.copy()
new_status.degrees = self.degrees.copy()
new_status.gdegrees = self.gdegrees.copy()
new_status.total_weight = self.total_weight
return new_status
def init(self, graph, weight, part=None):
"""Initialize the status of a graph with every node in one community"""
count = 0
self.node2com = dict([])
self.total_weight = 0
self.degrees = dict([])
self.gdegrees = dict([])
self.internals = dict([])
self.total_weight = graph.size(weight=weight)
if part is None:
for node in graph.nodes():
self.node2com[node] = count
deg = float(graph.degree(node, weight=weight))
if deg < 0:
error = "Bad node degree ({})".format(deg)
raise ValueError(error)
self.degrees[count] = deg
self.gdegrees[node] = deg
edge_data = graph.get_edge_data(node, node, default={weight: 0})
self.loops[node] = float(edge_data.get(weight, 1))
self.internals[count] = self.loops[node]
count += 1
else:
for node in graph.nodes():
com = part[node]
self.node2com[node] = com
deg = float(graph.degree(node, weight=weight))
self.degrees[com] = self.degrees.get(com, 0) + deg
self.gdegrees[node] = deg
inc = 0.
for neighbor, datas in graph[node].items():
edge_weight = datas.get(weight, 1)
if edge_weight <= 0:
error = "Bad graph type ({})".format(type(graph))
raise ValueError(error)
if part[neighbor] == com:
if neighbor == node:
inc += float(edge_weight)
else:
inc += float(edge_weight) / 2.
self.internals[com] = self.internals.get(com, 0) + inc
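A small sketch of how Status.init seeds one community per node; the weighted graph below is illustrative:

import networkx as nx

G = nx.Graph()
G.add_edge(0, 1, weight=2.0)
G.add_edge(1, 2, weight=1.0)

status = Status()
status.init(G, weight='weight')
print(status.node2com)       # {0: 0, 1: 1, 2: 2} - every node starts in its own community
print(status.total_weight)   # 3.0 == G.size(weight='weight')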
## a framework for community-based node ranking
import networkx as nx
import numpy as np
from sklearn.cluster import AffinityPropagation
import multiprocessing as mp
from ..network_ranking.node_ranking import sparse_page_rank,modularity,stochastic_normalization
from scipy.cluster.hierarchy import dendrogram, linkage
from scipy.cluster.hierarchy import fcluster
import scipy.sparse as sp
def page_rank_kernel(index_row):
## call as results = p.map(pr_kernel, batch)
pr = sparse_page_rank(_RANK_GRAPH, [index_row],
epsilon=1e-6,
max_steps=100000,
damping=0.90,
spread_step=10,
spread_percent=0.1,
try_shrink=True)
norm = np.linalg.norm(pr, 2)
if norm > 0:
pr = pr / np.linalg.norm(pr, 2)
return (index_row,pr)
else:
return (index_row, np.zeros(_RANK_GRAPH.shape[1]))
def create_tree(centers):
clusters = {}
to_merge = linkage(centers, method='single')
for i, merge in enumerate(to_merge):
if merge[0] <= len(to_merge):
# if it is an original point read it from the centers array
a = centers[int(merge[0]) - 1]
else:
# otherwise read the cluster that has been created
a = clusters[int(merge[0])]
if merge[1] <= len(to_merge):
b = centers[int(merge[1]) - 1]
else:
b = clusters[int(merge[1])]
# the clusters are 1-indexed by scipy
clusters[1 + i + len(to_merge)] = {
'children' : [a, b]
}
# ^ you could optionally store other info here (e.g distances)
return clusters
def return_infomap_communities(network):
infomapWrapper = infomap.Infomap("--two-level --silent")
# Add link weight as an optional third argument
for e in network.edges():
infomapWrapper.addLink(e[0], e[1])
infomapWrapper.run()
tree = infomapWrapper.tree
print("Found %d modules with codelength: %f" % (tree.numTopModules(), tree.codelength()))
print("\n#node module")
part = defaultdict(list)
for node in tree.leafIter():
part[node.moduleIndex()].append(node.physIndex)
return list(part.values())
if __name__ == "__main__":
from infomap import infomap
from collections import defaultdict
from itertools import product
import community
from networkx.algorithms.community import LFR_benchmark_graph
from sklearn.cluster import AffinityPropagation,DBSCAN,MiniBatchKMeans
from scipy import cluster
from scipy.cluster.hierarchy import fcluster
from scipy.spatial.distance import pdist
global _RANK_GRAPH
print("Generating communities..")
n = 500
tau1 = 4
tau2 = 1.5
mu = 0.1
# _RANK_GRAPH = nx.windmill_graph(20, 5)
_RANK_GRAPH = LFR_benchmark_graph(n,
tau1,
tau2,
mu,
average_degree=5,
min_community=30,
seed=10)
print(nx.info(_RANK_GRAPH))
A = _RANK_GRAPH.copy()
_RANK_GRAPH = nx.to_scipy_sparse_matrix(_RANK_GRAPH)
_RANK_GRAPH = stochastic_normalization(_RANK_GRAPH) ## normalize
n = _RANK_GRAPH.shape[1]
with mp.Pool(processes=mp.cpu_count()) as p:
results = p.map(page_rank_kernel,range(n))
vectors = np.zeros((n, n))
for pr_vector in results:
if pr_vector is not None:
vectors[pr_vector[0],:] = pr_vector[1]
vectors = np.nan_to_num(vectors)
option = "cpu"
dx_rc = defaultdict(list)
dx_lx = defaultdict(list)
dx_hc = defaultdict(list)
if option == "cpu":
mx_opt = 0
for nclust in range(2,_RANK_GRAPH.shape[0]):
clustering_algorithm = MiniBatchKMeans(n_clusters=nclust)
clusters = clustering_algorithm.fit_predict(vectors)
for a, b in zip(clusters,A.nodes()):
dx_rc[a].append(b)
partitions = dx_rc.values()
mx = modularity(A, partitions, weight='weight')
if mx > mx_opt:
mx_opt = mx
dx_rc = defaultdict(list)
print("KM: {}".format(mx_opt))
Z = linkage(vectors, 'ward')
mod_hc_opt = 0
for nclust in range(3,_RANK_GRAPH.shape[0]):
try:
k = nclust
cls = fcluster(Z, k, criterion='maxclust')
for a,b in zip(cls,A.nodes()):
dx_hc[a].append(b)
partition_hi = dx_hc.values()
mod = modularity(A, partition_hi, weight='weight')
if mod > mod_hc_opt:
mod_hc_opt = mod
except:
pass
print("Hierarchical: {}".format(mod))
## the louvain partition
partition = community.best_partition(A)
for a,b in partition.items():
dx_lx[b].append(a)
partition_louvain = dx_lx.values()
print("Louvain: {}".format(modularity(A, partition_louvain, weight='weight')))
parts_im = return_infomap_communities(A)
print("Infomap: {}".format(modularity(A, parts_im, weight='weight')))
## high level interface for community detection algorithms
from .community_louvain import *
def louvain_communities(network,input_type="mat",verbose=True):
# if input_type == "mat":
# network = nx.from_scipy_sparse_matrix(network)
if verbose:
print ("Detecting communities..")
partition = best_partition(network)
return partition
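A hedged usage sketch; it assumes best_partition comes from the wildcard-imported community_louvain module (python-louvain style) and that the caller already has a networkx graph. Note that input_type is effectively ignored while the matrix conversion above stays commented out:

import networkx as nx

G = nx.karate_club_graph()
partition = louvain_communities(G, input_type="graph")
print(partition)   # {node: community_id, ...}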
## Compute many possible network statistics
import scipy.io
import networkx as nx
import pandas as pd
import numpy as np
from collections import Counter
from operator import itemgetter
def identify_n_hubs(G,top_n=100,node_type=None):
if node_type is not None:
target_nodes = []
for n in G.nodes(data=True):
try:
if n[1]['type'] == node_type:
target_nodes.append(n[0])
except:
pass
else:
target_nodes = G.nodes()
degree_dict = {x : G.degree(x) for x in target_nodes}
top_n_id = {x[0]:x[1] for e,x in enumerate(sorted(degree_dict.items(), key = itemgetter(1), reverse = True)) if e < top_n}
return top_n_id
def core_network_statistics(G,labels=None,name="example"):
rframe = pd.DataFrame(columns=["Name",
"classes",
"nodes",
"edges",
"degree",
"diameter",
"connected components",
"clustering coefficient",
"density",
"flow_hierarchy"])
nodes = len(G.nodes())
edges = len(G.edges())
n_components = len(list(nx.connected_components(G.to_undirected())))
try:
clustering = nx.average_clustering(G.to_undirected())
except:
clustering = None
try:
dx = nx.density(G)
except:
dx = None
if labels is not None:
number_of_classes = labels.shape[1]
else:
number_of_classes = None
node_degree_vector = list(dict(nx.degree(G)).values())
mean_degree = np.mean(node_degree_vector)
try:
diameter = nx.diameter(G)
except:
diameter = "intractable"
try:
flow_hierarchy = nx.flow_hierarchy(G)
except:
flow_hierarchy = "intractable"
point = {"Name": name,
"classes":number_of_classes,
"nodes":nodes,
"edges":edges,
"diameter":diameter,
"degree":mean_degree,
"flow hierarchy":flow_hierarchy,
"connected components":cc,
"clustering coefficient":clustering,
"density":dx}
rframe = rframe.append(point,ignore_index=True)
return rframe
## compute enrichment via FET
## first for only simple terms,
## continue with communities
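A brief usage sketch for the helpers above; the random graph is illustrative:

import networkx as nx

G = nx.gnp_random_graph(50, 0.1, seed=1, directed=True)
print(identify_n_hubs(G, top_n=5))   # top-5 nodes by degree, as {node: degree}
# core_network_statistics relies on DataFrame.append, so it assumes an older pandas (< 2.0):
# print(core_network_statistics(G, name="toy"))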
#!/usr/bin/python
# -*- coding: utf-8 -*-
"""
This package implements community detection.
Package name is community but refer to python-louvain on pypi
"""
# from .community_louvain import (
# partition_at_level,
# modularity,
# best_partition,
# generate_dendrogram,
# induced_graph,
# load_binary,
# )
__author__ = """Thomas Aynaud (thomas.aynaud@lip6.fr)"""
# Copyright (C) 2009 by
# Thomas Aynaud <thomas.aynaud@lip6.fr>
# All rights reserved.
# BSD license.