Commit 32ebd07f authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'feature/community-prediction' into 'develop'

CA-CEP

See merge request !44
parents 877c5e65 a31e702c
@@ -2,13 +2,15 @@ FROM python:3
LABEL maintainer="Alexander Lercher"
RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/proactive-community-detection-microservice/app/requirements.txt /app/
RUN pip install -r requirements.txt
COPY src/modules/ /app/
COPY src/data-hub/proactive-community-detection-microservice/app/ /app/
RUN chmod a+x main.py
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
'200':
description: "Successful echo of request data"
/use-cases/{use_case}/tables/{table}/layers/{layer_name}/predictions:
get:
operationId: "routes.predictions.get"
security:
- JwtRegular: []
tags:
- "Predictions"
summary: "Get predictions"
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
type: "string"
- name: "table"
in: "path"
description: "Name of the table"
required: true
type: "string"
- name: "layer_name"
in: "path"
description: "Name of the layer"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/Prediction"
'404':
description: "Predictions not found"
definitions:
Prediction:
type: object
properties:
use_case:
type: string
table:
type: string
method:
type: string
layer:
type: string
reference_layer:
type: string
cluster_label:
type: string
time_window:
type: string
prediction:
type: integer
@@ -11,20 +11,9 @@ produces:
basePath: "/api"
# Import security definitions from global security definition
securityDefinitions:
$ref: '../security/security.yml#securityDefinitions'
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
$ref: 'routes.yml#paths'
swagger: "2.0"
info:
title: Proactive Community Detection microservice
description: This is the documentation for the proactive community detection microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
# Import security definitions from global security definition
securityDefinitions:
$ref: '../../../../modules/security/security_local.yml#securityDefinitions'
paths:
$ref: 'routes.yml#paths'
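As a hedged sketch of how this specification is typically served: the Dockerfile installs connexion, which loads the YAML and resolves each operationId (e.g. routes.predictions.get) to a Python callable. The file name below is an assumption:

import connexion

# Minimal connexion bootstrap; "swagger_local.yml" is a hypothetical file name.
app = connexion.App(__name__, specification_dir=".")
app.add_api("swagger_local.yml")
app.run(port=5000)  # matches EXPOSE 5000 in the Dockerfile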
from db.dao.cluster import Cluster as ClusterDao
from db.dao.layer import Layer as LayerDao
from db.dao.timeslice import TimeSlice as TimeSliceDao
from db.dao.layer_pair import LayerPair as LayerPairDao
from db.dao.prediction_result import PredictionResult
import json
from typing import List, Dict
class Cluster:
'''
A cluster for an arbitrary layer containing some nodes.
:param use_case: The use-case of the layer
:param use_case_table: The use-case table of the layer
:param layer_name: The name of the layer in which the cluster is located
:param cluster_label: The label of the cluster unique for the layer
:param nodes: The individual nodes of the cluster
:param label: A human readable label
'''
def __init__(self, use_case: str = None, use_case_table: str = None, layer_name: str = None,
cluster_label: int = None, nodes: List[Dict] = None, label: str = None,
cluster_dict: Dict = None, from_db=False):
self.use_case = use_case
self.use_case_table = use_case_table
self.layer_name = layer_name
self.cluster_label = cluster_label
self.nodes = nodes
self.label = label
if cluster_dict is not None:
self.from_serializable_dict(cluster_dict, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"use_case": self.use_case,
"use_case_table": self.use_case_table,
"layer_name": self.layer_name,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes,
"label": self.label,
}
def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
self.use_case = cluster_dict["use_case"]
self.use_case_table = cluster_dict["use_case_table"]
self.layer_name = cluster_dict["layer_name"]
self.cluster_label = cluster_dict["cluster_label"]
self.nodes = json.loads(cluster_dict["nodes"]) \
if from_db else cluster_dict["nodes"]
self.label = cluster_dict["label"]
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"Cluster({self.__repr__()})"
import json
from typing import Dict
class Layer:
'''
This class represents a single layer of the Multilayer Graph.
:param layer_info: Information as dictionary to restore the layer object.
'''
def __init__(self, layer_info: Dict = None, from_db=False):
if layer_info is not None:
self.from_serializable_dict(layer_info, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"layer_name": self.layer_name,
"properties": self.properties,
"use_case": self.use_case,
"use_case_table": self.use_case_table,
"total_properties": self.total_properties,
}
@staticmethod
def from_business_logic_dict(layer_info: Dict):
layer = Layer()
layer.layer_name = layer_info["name"]
layer.properties = layer_info["cluster_properties"]
layer.total_properties = layer_info["properties"]
layer.use_case = layer_info["use_case"]
layer.use_case_table = layer_info["table"]
return layer
def from_serializable_dict(self, layer_info: Dict, from_db=False):
self.layer_name = layer_info['layer_name']
self.properties = layer_info['properties']
self.use_case = layer_info["use_case"]
self.use_case_table = layer_info["use_case_table"]
self.total_properties = layer_info["total_properties"] if "total_properties" in layer_info else None
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"Layer({self.__repr__()})"
class LayerPair:
def __init__(self, use_case: str, table: str, layer: str, reference_layer: str):
self.use_case = use_case
self.table = table
self.layer = layer
self.reference_layer = reference_layer
@staticmethod
def create_from_dict(dict_) -> 'LayerPair':
lp = LayerPair(None, None, None, None)
lp.__dict__.update(dict_)
return lp
class PredictionResult:
def __init__(self, use_case: str, table: str, method: str,
layer: str, reference_layer: str, cluster_id: str,
time_window: str, prediction: int):
self.use_case = use_case
self.table = table
self.method = method
self.layer = layer
self.reference_layer = reference_layer
self.cluster_id = cluster_id
self.time_window = time_window
self.prediction = prediction
@staticmethod
def create_from_dict(dict_) -> 'PredictionResult':
obj = PredictionResult(None, None, None, None, None, None, None, None)
obj.__dict__.update(dict_)
return obj
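Both LayerPair and PredictionResult restore themselves by updating __dict__ with a stored document; a hedged sketch with invented values:

# Illustrative only; the _id field is assumed to be projected away already.
doc = {"use_case": "uc", "table": "tbl", "method": "m", "layer": "layer1",
       "reference_layer": "layer2", "cluster_id": "3",
       "time_window": "(2014, 25)", "prediction": 17}
result = PredictionResult.create_from_dict(doc)
print(result.cluster_id, result.prediction)  # 3 17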
import json
from typing import List, Dict, NewType, Any
from datetime import date, datetime
Node = NewType('Node', dict)
class TimeSlice:
'''
A time slice for a single layer containing all nodes for that time.
:param time: The tag indicating the time
:param use_case: The use-case the nodes belong to
:param use_case_table: The use-case table the nodes belong to
:param layer_name: The name of the layer the nodes belong to
'''
def __init__(self, time: Any = None, use_case: str = None, use_case_table: str = None, layer_name: str = None,
time_slice_dict: Dict = None, from_db=False):
self.time = str(time) if time is not None else None
self.use_case = use_case
self.use_case_table = use_case_table
self.layer_name = layer_name
self.clusters: Dict[str, List[Node]] = {}
if time_slice_dict is not None:
self.from_serializable_dict(time_slice_dict, from_db)
def init_all_clusters(self, cluster_labels: List[str]):
'''Initializes internal clusters for all labels with an empty list.'''
for cluster_label in cluster_labels:
# only string keys can be stored in json
cluster_label = str(cluster_label)
self.clusters[cluster_label] = []
def add_node_to_cluster(self, cluster_label: str, node):
# only string keys can be stored in json
cluster_label = str(cluster_label)
if cluster_label not in self.clusters:
raise KeyError(f"self::init_all_clusters must be used to add all global cluster labels beforehand (got {cluster_label})")
# node = self._get_unique_id(node)
self.clusters[cluster_label].append(node)
def get_nodes_for_cluster(self, cluster_label: str):
if cluster_label in self.clusters:
return self.clusters[cluster_label]
else:
return []
def _get_unique_id(self, node: Dict) -> Dict:
'''Returns a new dict containing only the unique id, or None if the node has none.'''
uid_key = 'UniqueID'
if uid_key in node:
return {uid_key: node[uid_key]}
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"time": self.time,
"use_case": self.use_case,
"use_case_table": self.use_case_table,
'layer_name': self.layer_name,
"clusters": json.dumps(self.clusters) if for_db else self.clusters
}
def from_serializable_dict(self, dict_: Dict, from_db=False):
self.time = dict_["time"]
self.use_case = dict_["use_case"]
self.use_case_table = dict_["use_case_table"]
self.layer_name = dict_['layer_name']
self.clusters = json.loads(dict_['clusters']) if from_db else dict_['clusters']
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"TimeSlice({self.__repr__()})"
import pymongo
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
import json
from db.dao import *
from typing import List
import logging
LOGGER = logging.getLogger(__name__)
class Repository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
def __init__(self):
super().__init__(netconst.PROACTIVE_COMMUNITY_DETECTION_DB_HOSTNAME,
netconst.PROACTIVE_COMMUNITY_DETECTION_DB_PORT,
'proactiveCommunityDb')
self._use_case_collection = 'use_cases'
self._layer_collection = 'layers'
self._layer_pair_collection = 'layer_pairs'
self._clusters_collection = 'clusters'
self._time_slice_collection = 'time_slices'
self._prediction_result_collection = 'prediction_results'
def DROP(self, confirm: bool = False):
assert confirm, 'WONT DELETE WHOLE DB WITHOUT CONFIRMATION'
for collection_ in [self._use_case_collection, self._layer_collection, self._layer_pair_collection,
self._clusters_collection, self._time_slice_collection, self._prediction_result_collection]:
super().drop_collection(collection_)
#region UseCase
def add_use_case(self, use_case: str):
super().insert_entry(self._use_case_collection, {'name':use_case})
def get_use_cases(self) -> List[str]:
entries = super().get_entries(self._use_case_collection)
return [e['name'] for e in entries]
#endregion
#region Layers
def add_layer(self, layer: LayerDao):
super().insert_entry(self._layer_collection, layer.to_serializable_dict())
def get_layers(self) -> List[LayerDao]:
'''Retrieves all layers from the db, independent of use-case.'''
entries = super().get_entries(self._layer_collection, projection={'_id': 0})
return [LayerDao(e) for e in entries]
def get_layers_for_use_case(self, use_case: str) -> List[LayerDao]:
entries = super().get_entries(self._layer_collection, selection={'use_case': use_case})
return [LayerDao(e) for e in entries]
def get_layers_for_table(self, use_case: str, use_case_table: str) -> List[LayerDao]:
entries = super().get_entries(self._layer_collection, selection={'use_case': use_case, 'use_case_table': use_case_table})
return [LayerDao(e) for e in entries]
def get_layer_by_name(self, use_case: str, use_case_table: str, layer_name: str) -> LayerDao:
'''Returns a single layer for the given use-case, table, and layer name.'''
entries = super().get_entries(self._layer_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
entries = [LayerDao(e) for e in entries]
if entries is not None and len(entries) > 0:
if len(entries) > 1:
LOGGER.error(f"Layer key {use_case}, {use_case_table}, {layer_name} is not unique.")
return entries[0]
else:
return None
def delete_all_layers(self):
super().drop_collection(self._layer_collection)
#endregion Layers
#region Clusters
def add_cluster(self, cluster: ClusterDao):
super().insert_entry(self._clusters_collection, cluster.to_serializable_dict(for_db=True))
def add_clusters(self, clusters: List[ClusterDao]):
cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
super().insert_many(self._clusters_collection, cluster_dicts)
def get_clusters_for_layer(self, use_case: str, use_case_table: str, layer_name: str) -> List[ClusterDao]:
entries = super().get_entries(self._clusters_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name}, projection={'_id': 0})
return [ClusterDao(cluster_dict=e, from_db=True) for e in entries]
def delete_all_clusters(self):
super().drop_collection(self._clusters_collection)
#endregion
#region TimeSlice
def add_time_slice(self, timeslice: TimeSliceDao):
super().insert_entry(self._time_slice_collection, timeslice.to_serializable_dict(for_db=True))
def get_time_slices(self) -> List[TimeSliceDao]:
'''Returns all time slices.'''
entries = super().get_entries(self._time_slice_collection)
return [TimeSliceDao(time_slice_dict=e, from_db=True) for e in entries]
def get_time_slices_for_layer(self, use_case: str, use_case_table: str, layer_name: str) -> List[TimeSliceDao]:
'''Returns all time slices with the given layer_name.'''
entries = super().get_entries(self._time_slice_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
return [TimeSliceDao(time_slice_dict=e, from_db=True) for e in entries]
def remove_all_time_slices(self):
super().drop_collection(self._time_slice_collection)
#endregion
#region LayerPair
def add_layer_pair(self, layer_pair: LayerPairDao):
super().insert_entry(self._layer_pair_collection, layer_pair.__dict__)
def get_layer_pairs(self, use_case: str) -> List[LayerPairDao]:
entries = super().get_entries(self._layer_pair_collection, selection={'use_case': use_case})
return [LayerPairDao.create_from_dict(e) for e in entries]
#endregion
#region PredictionResult
def add_prediction_result(self, prediction_result: PredictionResult):
super().insert_entry(self._prediction_result_collection, prediction_result.__dict__)
def get_prediction_results(self, use_case: str) -> List[PredictionResult]:
entries = super().get_entries(self._prediction_result_collection, selection={'use_case': use_case}, projection={'_id': 0})
return [PredictionResult.create_from_dict(e) for e in entries]
def delete_all_prediction_results(self):
super().drop_collection(self._prediction_result_collection)
#endregion
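A hedged end-to-end sketch of the repository; it assumes a reachable MongoDB instance at the host and port configured in network_constants and uses invented values:

# Illustrative only; requires a running MongoDB instance.
repo = Repository()
repo.add_use_case("uc")
repo.add_prediction_result(PredictionResult(
    "uc", "tbl", "m", "layer1", "layer2", "3", "(2014, 25)", 17))
for res in repo.get_prediction_results("uc"):
    print(res.cluster_id, res.prediction)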
from flask import request
def echo():
import processing.fetching.fetching as f
print(f._fetch_use_cases())
return request.json
\ No newline at end of file
from entities.timewindow import TimeWindow
from entities.cluster import Cluster
from entities.layer import Layer
\ No newline at end of file
# from __future__ import annotations
from typing import Dict, List, Iterable, Any, Tuple
from entities.timewindow import TimeWindow
import numpy as np
import scipy.spatial
from ast import literal_eval
from processing import ClusterMetricsCalculatorFactory
class Cluster:
'''A cluster from one time window containing all metrics used for machine learning.'''
def __init__(self, time_window_id: Any, cluster_id: Any, cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int,
global_cluster_center, global_center_distance=None):
self.time_window_id = time_window_id
self.cluster_id = cluster_id
metrics_calculator = ClusterMetricsCalculatorFactory.create_metrics_calculator(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)
self.size = metrics_calculator.get_size()
self.std_dev = metrics_calculator.get_standard_deviation()
self.scarcity = metrics_calculator.get_scarcity()
self.importance1 = metrics_calculator.get_importance1()
self.importance2 = metrics_calculator.get_importance2()
self.range_ = metrics_calculator.get_range()
self.center = metrics_calculator.get_center()
self.global_center_distance = \
scipy.spatial.distance.euclidean(self.center, global_cluster_center) \
if self.size > 0 \
else 0
def get_time_info(self) -> int:
'''Returns the week of the time tuple str, e.g. 25 for "(2014, 25)".'''
return literal_eval(self.time_window_id)[1]
def __repr__(self):
return str(self.__dict__)
def __str__(self):
return f"Cluster({self.time_window_id}, {self.cluster_id}, " \
f"{self.size}, {self.std_dev}, {self.scarcity}, " \
f"{self.importance1}, {self.importance2}, " \
f"{self.range_}, {self.center})"
@staticmethod
def create_multiple_from_time_window(time_window: TimeWindow, cluster_feature_names: List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> Iterable['Cluster']:
total_layer_nodes = sum([len(nodes) for nodes in time_window.clusters.values()])
layer_diversity = len([nodes for nodes in time_window.clusters.values() if len(nodes) > 0])
for cluster_nr, cluster_nodes in time_window.clusters.items():
yield Cluster(time_window.time, cluster_nr, cluster_nodes, cluster_feature_names, total_layer_nodes, layer_diversity, global_cluster_centers[cluster_nr])
@staticmethod
def create_from_dict(dict_) -> 'Cluster':
cl = Cluster(0, 0, [], 'None', 0, 0, None)
cl.__dict__.update(dict_)
return cl
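A hedged sketch of restoring a metrics cluster from a stored dict and reading its week back out; the field values are invented, and it relies on the metrics calculator tolerating an empty node list, exactly as create_from_dict itself does:

# Illustrative only.
stored = {"time_window_id": "(2014, 25)", "cluster_id": 3, "size": 10,
          "std_dev": 0.5, "scarcity": 0.1, "importance1": 0.2, "importance2": 0.3,
          "range_": 1.5, "center": (2.0, 4.0), "global_center_distance": 0.7}
cl = Cluster.create_from_dict(stored)
print(cl.get_time_info())  # 25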
from typing import Dict, List, Tuple, Any
import scipy.spatial
from entities.timewindow import TimeWindow
from processing import ClusterMetricsCalculatorFactory
class InternalCluster:
def __init__(self, cluster_id, cluster_nodes: List[dict], feature_names:List[str], global_cluster_center: Tuple[float], n_layer_nodes: int):
self.cluster_id = cluster_id
metrics_calculator = ClusterMetricsCalculatorFactory.create_metrics_calculator(cluster_nodes, feature_names, n_layer_nodes, None)
self.size = metrics_calculator.get_size()
self.relative_size = metrics_calculator.get_importance1()
self.center = metrics_calculator.get_center()
if self.size > 0:
self.global_center_distance = scipy.spatial.distance.euclidean(self.center, global_cluster_center)
else:
self.global_center_distance = 0
@staticmethod
def create_many_from_cluster_nodes(clusters: Dict[str, List[dict]], feature_names: List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> List['InternalCluster']:
res_clusters = []
total_layer_nodes = sum([len(nodes) for nodes in clusters.values()])
for key, value in clusters.items():
# ignore noise as it contains no meaningful cluster information
if key == '-1':
continue
res_clusters.append(InternalCluster(key, value, feature_names, global_cluster_centers[key], total_layer_nodes))
return res_clusters
class Layer:
'''Represents metrics for one layer for a single time window.'''
def __init__(self, time_window_id: Any, clusters: List[InternalCluster]):
self.time_window_id = time_window_id
active_clusters = [c for c in clusters if c.size > 0]
self.n_nodes = sum([c.size for c in clusters])
self.n_clusters = len(active_clusters)
self.relative_cluster_sizes = self.get_relative_cluster_sizes(active_clusters)
self.cluster_size_agg_metrics = self.get_size_min_max_avg_sum(active_clusters)
self.cluster_relative_size_agg_metrics = self.get_relative_size_min_max_avg_sum(active_clusters)