Commit e0a467cf authored by Alexander Lercher

Fetching data for community prediction

Use-cases, layers, clusters, timeslices, layerpairs
parent b1a8e730
......@@ -2,13 +2,15 @@ FROM python:3
# Image metadata.
LABEL maintainer="Alexander Lercher"
# Refresh the apt package index (no apt packages are installed in the visible lines).
RUN apt-get update
# Web framework and OpenAPI tooling used by the service.
RUN pip install flask
RUN pip install connexion[swagger-ui]
# Documents the port the application listens on.
EXPOSE 5000
WORKDIR /app
# NOTE(review): requirements.txt is copied from the role-stage-discovery microservice,
# while the application copied below is proactive-community-detection - confirm this is intended.
COPY src/data-hub/role-stage-discovery-microservice/app/requirements.txt /app/
RUN pip install -r requirements.txt
# Shared modules first, then the service's own sources into the same directory.
COPY src/modules/ /app/
COPY src/data-hub/proactive-community-detection-microservice/app/ /app/
# Make the entry script executable.
RUN chmod a+x main.py
......
from db.dao.cluster import Cluster as ClusterDao
from db.dao.layer import Layer as LayerDao
from db.dao.timeslice import TimeSlice as TimeSliceDao
from db.dao.layer_pair import LayerPair as LayerPairDao
import json
from typing import List, Dict
from datetime import date, datetime
class Cluster:
    '''
    A group of nodes that belongs to one layer of a use-case table.

    :param use_case: The use-case of the layer
    :param use_case_table: The use-case table of the layer
    :param layer_name: The name of the layer in which the cluster is located
    :param cluster_label: The label of the cluster unique for the layer
    :param nodes: The individual nodes of the cluster
    :param label: A human readable label
    :param cluster_dict: Optional serialized cluster; if given, its values overwrite the other arguments
    :param from_db: Whether the nodes in cluster_dict are stored as a JSON string
    '''

    def __init__(self, use_case: str = None, use_case_table: str = None, layer_name: str = None,
                 cluster_label: int = None, nodes: List[Dict] = None, label: str = None,
                 cluster_dict: Dict = None, from_db=False):
        self.use_case, self.use_case_table = use_case, use_case_table
        self.layer_name, self.cluster_label = layer_name, cluster_label
        self.nodes, self.label = nodes, label

        # a serialized cluster takes precedence over the individual arguments
        if cluster_dict is None:
            return
        self.from_serializable_dict(cluster_dict, from_db)

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''Returns the cluster as dictionary; for the db the nodes are dumped to a JSON string.'''
        if for_db:
            serialized_nodes = json.dumps(self.nodes)
        else:
            serialized_nodes = self.nodes

        return dict(
            use_case=self.use_case,
            use_case_table=self.use_case_table,
            layer_name=self.layer_name,
            cluster_label=self.cluster_label,
            nodes=serialized_nodes,
            label=self.label,
        )

    def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
        '''Restores all attributes from a dictionary; from the db the nodes are parsed from a JSON string.'''
        self.use_case = cluster_dict["use_case"]
        self.use_case_table = cluster_dict["use_case_table"]
        self.layer_name = cluster_dict["layer_name"]
        self.cluster_label = cluster_dict["cluster_label"]

        raw_nodes = cluster_dict["nodes"]
        if from_db:
            self.nodes = json.loads(raw_nodes)
        else:
            self.nodes = raw_nodes

        self.label = cluster_dict["label"]

    def __repr__(self):
        serializable = self.to_serializable_dict()
        return json.dumps(serializable)

    def __str__(self):
        return f"Cluster({self!r})"
import json
from datetime import datetime
from typing import Dict
class Layer:
    '''
    This class represents a single layer of the Multilayer Graph.

    :param layer_info: Information as dictionary to restore the layer object.
    :param from_db: Forwarded to from_serializable_dict, which does not use it.
    '''

    def __init__(self, layer_info: Dict = None, from_db=False):
        # Define every attribute up front so that an empty layer (as created by
        # from_business_logic_dict) can already be serialized and printed.
        self.layer_name = None
        self.properties = None
        self.use_case = None
        self.use_case_table = None
        self.total_properties = None

        if layer_info is not None:
            self.from_serializable_dict(layer_info, from_db)

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''Returns the layer as dictionary. The for_db flag is not used.'''
        return {
            "layer_name": self.layer_name,
            "properties": self.properties,
            "use_case": self.use_case,
            "use_case_table": self.use_case_table,
            "total_properties": self.total_properties,
        }

    @staticmethod
    def from_business_logic_dict(layer_info: Dict):
        '''
        Creates a layer from a dictionary using the keys
        "name", "cluster_properties", "properties", "use_case" and "table".
        '''
        layer = Layer()
        layer.layer_name = layer_info["name"]
        layer.properties = layer_info["cluster_properties"]
        layer.total_properties = layer_info["properties"]
        layer.use_case = layer_info["use_case"]
        layer.use_case_table = layer_info["table"]
        return layer

    def from_serializable_dict(self, layer_info: Dict, from_db=False):
        '''
        Restores the attributes from a dictionary as created by to_serializable_dict.
        A missing "total_properties" key results in None; all other keys are required.
        '''
        self.layer_name = layer_info['layer_name']
        self.properties = layer_info['properties']
        self.use_case = layer_info["use_case"]
        self.use_case_table = layer_info["use_case_table"]
        self.total_properties = layer_info.get("total_properties")

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"Layer({self.__repr__()})"
from typing import List, Dict
class LayerPair:
    '''Links a layer with its reference layer for one use-case table.'''

    def __init__(self, use_case: str, table: str, layer: str, reference_layer: str):
        self.use_case, self.table = use_case, table
        self.layer, self.reference_layer = layer, reference_layer

    @staticmethod
    def create_from_dict(dict_) -> 'LayerPair':
        '''Creates a layer pair and copies every entry of dict_ onto it as attribute.'''
        pair = LayerPair(use_case=None, table=None, layer=None, reference_layer=None)
        vars(pair).update(dict_)
        return pair
import json
from typing import List, Dict, NewType, Any
from datetime import date, datetime
Node = NewType('Node', dict)
class TimeSlice:
    '''
    A time slice for a single layer containing all nodes for that time.

    :param time: The tag indicating the time (stored as its string representation)
    :param use_case: The use-case of the layer
    :param use_case_table: The use-case table of the layer
    :param layer_name: The name of the layer the nodes belong to
    :param time_slice_dict: Optional serialized time slice; if given, its values overwrite the other arguments
    :param from_db: Whether the clusters in time_slice_dict are stored as a JSON string
    '''

    def __init__(self, time: Any = None, use_case: str = None, use_case_table: str = None, layer_name: str = None,
                 time_slice_dict: Dict = None, from_db = False):
        self.time = str(time)
        self.use_case = use_case
        self.use_case_table = use_case_table
        self.layer_name = layer_name
        self.clusters: Dict[str, List['Node']] = {}

        if time_slice_dict is not None:
            self.from_serializable_dict(time_slice_dict, from_db)

    def init_all_clusters(self, cluster_labels: List[str]):
        '''Initializes internal clusters for all labels with an empty list.'''
        for cluster_label in cluster_labels:
            # only string keys can be stored in json
            self.clusters[str(cluster_label)] = []

    def add_node_to_cluster(self, cluster_label: str, node):
        '''
        Appends the node to an already initialized cluster.

        :raises KeyError: If the label was not registered with init_all_clusters
        '''
        # only string keys can be stored in json
        cluster_label = str(cluster_label)

        if cluster_label not in self.clusters:
            raise KeyError(f"self::init_all_clusters must be used to add all global cluster labels beforehand (got {cluster_label})")

        self.clusters[cluster_label].append(node)

    def get_nodes_for_cluster(self, cluster_label: str):
        '''Returns the nodes of the cluster, or an empty list for an unknown label.'''
        # exact match first, so clusters restored with non-string keys keep working
        if cluster_label in self.clusters:
            return self.clusters[cluster_label]
        # labels are stored as strings by init_all_clusters/add_node_to_cluster,
        # so eg. the int label 1 has to find the nodes stored for "1"
        return self.clusters.get(str(cluster_label), [])

    def _get_unique_id(self, node : Dict) -> Dict:
        '''Returns a new dict with the unique id only, or None if the node has no 'UniqueID' key.'''
        uid_key = 'UniqueID'
        if uid_key in node:
            return {uid_key: node[uid_key]}
        return None

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''Returns the time slice as dictionary; for the db the clusters are dumped to a JSON string.'''
        return {
            "time": self.time,
            "use_case": self.use_case,
            "use_case_table": self.use_case_table,
            'layer_name': self.layer_name,
            "clusters": json.dumps(self.clusters) if for_db else self.clusters
        }

    def from_serializable_dict(self, dict: Dict, from_db=False):
        '''Restores all attributes from a dictionary; from the db the clusters are parsed from a JSON string.'''
        self.time = dict["time"]
        self.use_case = dict["use_case"]
        self.use_case_table = dict["use_case_table"]
        self.layer_name = dict['layer_name']
        self.clusters = json.loads(dict['clusters']) if from_db else dict['clusters']

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"TimeSlice({self.__repr__()})"
import pymongo
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
import json
from db.dao import *
from typing import List
import logging
LOGGER = logging.getLogger(__name__)
class Repository(MongoRepositoryBase):
    '''This is a repository for MongoDb storing layers, layer pairs, clusters, and time slices.'''

    def __init__(self):
        super().__init__(netconst.PROACTIVE_COMMUNITY_DETECTION_DB_HOSTNAME,
                         netconst.PROACTIVE_COMMUNITY_DETECTION_DB_PORT,
                         'proactiveCommunityDb')

        self._layer_collection = 'layers'
        self._layer_pair_collection = 'layer_pairs'
        self._clusters_collection = 'clusters'
        self._time_slice_collection = 'time_slices'

    #region Layers
    def add_layer(self, layer: LayerDao):
        '''Stores a single layer.'''
        super().insert_entry(self._layer_collection, layer.to_serializable_dict())

    def get_layers(self) -> List[LayerDao]:
        '''Retrieves all layers from the db, independent of use-case.'''
        entries = super().get_entries(self._layer_collection, projection={'_id': 0})
        return [LayerDao(e) for e in entries]

    def get_layers_for_use_case(self, use_case: str) -> List[LayerDao]:
        '''Retrieves all layers of one use-case.'''
        entries = super().get_entries(self._layer_collection, selection={'use_case': use_case})
        return [LayerDao(e) for e in entries]

    def get_layers_for_table(self, use_case: str, use_case_table: str) -> List[LayerDao]:
        '''Retrieves all layers of one use-case table.'''
        entries = super().get_entries(self._layer_collection, selection={'use_case': use_case, 'use_case_table': use_case_table})
        return [LayerDao(e) for e in entries]

    def get_layer_by_name(self, use_case:str, use_case_table:str, layer_name:str) -> LayerDao:
        '''
        Returns a single layer for use-case, use-case table, and layer-name, or None if there is none.
        If multiple layers match, an error is logged and the first one is returned.
        '''
        entries = super().get_entries(self._layer_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
        layers = [LayerDao(e) for e in entries]

        if not layers:
            return None

        if len(layers) > 1:
            LOGGER.error(f"Layer Key {use_case}, {layer_name} is not unique.")
        return layers[0]

    def delete_all_layers(self):
        '''Drops the whole layer collection.'''
        super().drop_collection(self._layer_collection)
    #endregion Layers

    #region Clusters
    def add_cluster(self, cluster: ClusterDao):
        '''Stores a single cluster in its db representation.'''
        super().insert_entry(self._clusters_collection, cluster.to_serializable_dict(for_db=True))

    def add_clusters(self, clusters: List[ClusterDao]):
        '''Stores multiple clusters in their db representation with one call.'''
        cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
        super().insert_many(self._clusters_collection, cluster_dicts)

    def get_clusters_for_layer(self, use_case: str, use_case_table: str, layer_name: str) -> List[ClusterDao]:
        '''Retrieves all clusters of one layer.'''
        entries = super().get_entries(self._clusters_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name}, projection={'_id': 0})
        return [ClusterDao(cluster_dict=e, from_db=True) for e in entries]

    def delete_all_clusters(self):
        '''Drops the whole cluster collection.'''
        super().drop_collection(self._clusters_collection)
    #endregion

    #region TimeSlice
    def add_time_slice(self, timeslice: TimeSliceDao):
        '''Stores a single time slice in its db representation.'''
        super().insert_entry(self._time_slice_collection, timeslice.to_serializable_dict(for_db=True))

    def get_time_slices(self) -> List[TimeSliceDao]:
        '''Returns all time slices.'''
        entries = super().get_entries(self._time_slice_collection)
        return [TimeSliceDao(None, None, time_slice_dict=e, from_db=True) for e in entries]

    def get_time_slices_by_name(self, use_case: str, use_case_table: str, layer_name: str) -> List[TimeSliceDao]:
        '''Returns all time slices with the given layer_name.'''
        entries = super().get_entries(self._time_slice_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
        return [TimeSliceDao(time_slice_dict=e, from_db=True) for e in entries]

    def remove_all_time_slices(self):
        '''Drops the whole time slice collection.'''
        super().drop_collection(self._time_slice_collection)
    #endregion

    #region LayerPair
    def add_layer_pair(self, layer_pair: LayerPairDao):
        '''Stores a single layer pair using its attributes as document.'''
        # Insert a copy: pymongo adds an '_id' key to the inserted document, which would
        # otherwise show up as attribute on the DAO.
        # NOTE(review): assumes insert_entry forwards the dict to pymongo unchanged - confirm.
        super().insert_entry(self._layer_pair_collection, dict(layer_pair.__dict__))

    def get_layer_pairs(self, use_case: str, use_case_table: str) -> List[LayerPairDao]:
        '''Returns the layer pairs of the given use-case table.'''
        # LayerPair stores the table in the attribute (and therefore document key) 'table'
        entries = super().get_entries(self._layer_pair_collection, selection={'use_case': use_case, 'table': use_case_table})
        return [LayerPairDao.create_from_dict(e) for e in entries]
    #endregion
from flask import request
def echo():
    '''Prints the result of the fetching module's _fetch_use_cases() and returns the JSON body of the current request.'''
    import processing.fetching.fetching as fetching_module

    fetched = fetching_module._fetch_use_cases()
    print(fetched)

    return request.json
\ No newline at end of file
from entities.timewindow import TimeWindow
from entities.cluster import Cluster
from entities.layer import Layer
\ No newline at end of file
# from __future__ import annotations
from typing import Dict, List, Iterable, Any, Tuple
from entities.timewindow import TimeWindow
import numpy as np
import scipy
from processing import ClusterMetricsCalculatorFactory
class Cluster:
    '''
    A cluster from one time window containing all metrics used for machine learning.

    :param time_window_id: The id of the time window, eg. "(2014, 25)" (see get_time_info)
    :param cluster_id: The id of the cluster
    :param cluster_nodes: The nodes of the cluster in this time window
    :param cluster_feature_names: The feature names passed on to the metrics calculator
    :param nr_layer_nodes: The number of nodes in the whole layer for this time window
    :param layer_diversity: The number of non-empty clusters in the layer for this time window
    :param global_cluster_center: The center used as reference for global_center_distance
    :param global_center_distance: Not used; the distance is always computed from the centers
    '''

    def __init__(self, time_window_id: Any, cluster_id: Any, cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int,
                 global_cluster_center, global_center_distance=None):
        # Import the subpackage explicitly: a bare `import scipy` does not guarantee
        # that `scipy.spatial` has been loaded.
        from scipy.spatial import distance

        self.time_window_id = time_window_id
        self.cluster_id = cluster_id

        metrics_calculator = ClusterMetricsCalculatorFactory.create_metrics_calculator(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)

        self.size = metrics_calculator.get_size()
        self.std_dev = metrics_calculator.get_standard_deviation()
        self.scarcity = metrics_calculator.get_scarcity()

        self.importance1 = metrics_calculator.get_importance1()
        self.importance2 = metrics_calculator.get_importance2()

        self.range_ = metrics_calculator.get_range()
        self.center = metrics_calculator.get_center()

        # an empty cluster gets the distance 0 instead of a computed one
        self.global_center_distance = \
            distance.euclidean(self.center, global_cluster_center) \
            if self.size > 0 \
            else 0

    def get_time_info(self) -> int:
        '''Returns the week of the time tuple str, eg. 25 for "(2014, 25)".'''
        str_tuple = self.time_window_id
        # second tuple element without the closing bracket
        return int(str_tuple.split(',')[1].strip()[:-1])

    def __repr__(self):
        return str(self.__dict__)

    def __str__(self):
        return f"Cluster({self.time_window_id}, {self.cluster_id}, " \
            f"{self.size}, {self.std_dev}, {self.scarcity}, " \
            f"{self.importance1}, {self.importance2}, " \
            f"{self.range_}, {self.center})"

    @staticmethod
    def create_multiple_from_time_window(time_window: 'TimeWindow', cluster_feature_names: List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> Iterable['Cluster']:
        '''
        Lazily yields one Cluster per cluster of the time window.

        :param time_window: The time window providing the time and the nodes per cluster
        :param cluster_feature_names: The feature names passed on to each cluster
        :param global_cluster_centers: The reference center per cluster label
        '''
        total_layer_nodes = sum(len(nodes) for nodes in time_window.clusters.values())
        layer_diversity = sum(1 for nodes in time_window.clusters.values() if len(nodes) > 0)

        for cluster_nr, cluster_nodes in time_window.clusters.items():
            yield Cluster(time_window.time, cluster_nr, cluster_nodes, cluster_feature_names, total_layer_nodes, layer_diversity, global_cluster_centers[cluster_nr])

    @staticmethod
    def create_from_dict(dict_) -> 'Cluster':
        '''Creates an empty cluster and copies every entry of dict_ onto it as attribute.'''
        cl = Cluster(0, 0, [], 'None', 0, 0, None)
        cl.__dict__.update(dict_)
        return cl
from typing import Dict, List, Tuple, Any
import scipy.spatial
from entities.timewindow import TimeWindow
from processing import ClusterMetricsCalculatorFactory
class InternalCluster:
    '''Holds size, relative size, center, and distance to the global center for one cluster.'''

    def __init__(self, cluster_id, cluster_nodes: List[dict], feature_names:List[str], global_cluster_center: Tuple[float], n_layer_nodes: int):
        self.cluster_id = cluster_id

        calculator = ClusterMetricsCalculatorFactory.create_metrics_calculator(cluster_nodes, feature_names, n_layer_nodes, None)

        self.size = calculator.get_size()
        self.relative_size = calculator.get_importance1()
        self.center = calculator.get_center()

        # empty clusters get the distance 0 instead of a computed one
        self.global_center_distance = (
            scipy.spatial.distance.euclidean(self.center, global_cluster_center)
            if self.size > 0
            else 0
        )

    @staticmethod
    def create_many_from_cluster_nodes(clusters: Dict[str, List[dict]], feature_names: List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> List['InternalCluster']:
        '''Creates one InternalCluster per entry in clusters, skipping the noise label "-1".'''
        # noise nodes still count for the total number of layer nodes
        n_layer_nodes = sum(len(cluster_nodes) for cluster_nodes in clusters.values())

        # ignore noise as it contains no meaningful cluster information
        return [
            InternalCluster(label, cluster_nodes, feature_names, global_cluster_centers[label], n_layer_nodes)
            for label, cluster_nodes in clusters.items()
            if label != '-1'
        ]
class Layer:
    '''
    Represents metrics for one layer for a single time window.

    :param time_window_id: The id of the time window
    :param clusters: The clusters of the layer; only clusters with size > 0 are used for
        all metrics except n_nodes
    '''

    def __init__(self, time_window_id: Any, clusters: List['InternalCluster']):
        self.time_window_id = time_window_id

        active_clusters = [c for c in clusters if c.size > 0]

        self.n_nodes = sum([c.size for c in clusters])
        self.n_clusters = len(active_clusters)
        self.relative_cluster_sizes = self.get_relative_cluster_sizes(active_clusters)
        self.cluster_size_agg_metrics = self.get_size_min_max_avg_sum(active_clusters)
        self.cluster_relative_size_agg_metrics = self.get_relative_size_min_max_avg_sum(active_clusters)
        self.entropy = self.get_entropy(active_clusters)
        self.centers = [c.center for c in active_clusters]
        self.distances_from_global_centers = self.get_distances_from_global_center(active_clusters)
        self.cluster_center_distance_agg_metrics = self.get_center_distance_min_max_avg_sum(active_clusters)

    @staticmethod
    def _min_max_avg_sum(values: list) -> dict:
        '''Returns min, max, avg, and sum of the values, or all zeros if there are no values.'''
        if len(values) == 0:
            return {'min':0, 'max':0, 'avg':0, 'sum':0}

        sum_ = sum(values)
        return {'min': min(values), 'max': max(values), 'avg': sum_ / len(values), 'sum': sum_}

    def get_size_min_max_avg_sum(self, clusters: List['InternalCluster']) -> dict:
        '''Returns min, max, avg, and sum of the cluster's absolute sizes.'''
        return self._min_max_avg_sum([c.size for c in clusters])

    def get_relative_size_min_max_avg_sum(self, clusters: List['InternalCluster']) -> dict:
        '''Returns min, max, avg, and sum of the cluster's relative sizes.'''
        return self._min_max_avg_sum([c.relative_size for c in clusters])

    def get_center_distance_min_max_avg_sum(self, clusters: List['InternalCluster']) -> dict:
        '''Returns min, max, avg, and sum of the cluster's center distances.'''
        return self._min_max_avg_sum([c.global_center_distance for c in clusters])

    def get_relative_cluster_sizes(self, clusters: List['InternalCluster']):
        '''Returns the relative size of each cluster in the given order.'''
        return [c.relative_size for c in clusters]

    def get_entropy(self, clusters: List['InternalCluster']):
        '''
        Returns the entropy over all clusters C,
        where P(c_i) is the probability that a node belongs to cluster c_i.
        '''
        # Import explicitly: having `scipy.spatial` imported does not guarantee
        # that `scipy.stats` has been loaded.
        from scipy.stats import entropy

        return entropy(self.get_relative_cluster_sizes(clusters), base=2)

    def get_distances_from_global_center(self, clusters: List['InternalCluster']):
        '''Returns the distance to the global center of each cluster in the given order.'''
        return [cluster.global_center_distance for cluster in clusters]

    def __repr__(self):
        return str(self.__dict__)

    def __str__(self):
        return f"Layer({self.time_window_id}, " \
            f"{self.n_nodes}, {self.n_clusters}, {self.relative_cluster_sizes}, " \
            f"{self.entropy}, {self.centers}, {self.distances_from_global_centers})"

    @staticmethod
    def create_from_time_window(time_window: 'TimeWindow', feature_names:List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> 'Layer':
        '''Creates the layer metrics from the clusters of the time window.'''
        clusters = InternalCluster.create_many_from_cluster_nodes(time_window.clusters, feature_names, global_cluster_centers)
        return Layer(time_window.time, clusters)

    @staticmethod
    def create_from_dict(dict_) -> 'Layer':
        '''Creates an empty layer and copies every entry of dict_ onto it as attribute.'''
        l = Layer(0, [])
        l.__dict__.update(dict_)
        return l
\ No newline at end of file
import json
from typing import List, Dict, NewType, Any
from datetime import date, datetime
class TimeWindow:
    '''
    A time slice for a single layer containing all nodes for that time.

    :param time: The tag indicating the time (stored as its string representation)
    :param use_case: The use-case of the layer
    :param use_case_table: The use-case table of the layer
    :param layer_name: The name of the layer the nodes belong to
    :param time_slice_dict: Optional serialized time window; if given, its values overwrite the other arguments
    :param from_db: Whether the clusters in time_slice_dict are stored as a JSON string
    '''

    def __init__(self, time: Any = None, use_case: str = None, use_case_table: str = None, layer_name: str = None,
                 time_slice_dict: Dict = None, from_db = False):
        self.time = str(time)
        self.use_case = use_case
        self.use_case_table = use_case_table
        self.layer_name = layer_name
        self.clusters: Dict[str, List[dict]] = {}

        if time_slice_dict is not None:
            self.from_serializable_dict(time_slice_dict, from_db)

    def add_node_to_cluster(self, cluster_label: str, node):
        '''Appends the node to the cluster, creating the cluster if it does not exist yet.'''
        # only string keys can be stored in json
        self.clusters.setdefault(str(cluster_label), []).append(node)

    def get_nodes_for_cluster(self, cluster_label: str):
        '''Returns the nodes of the cluster, or an empty list for an unknown label.'''
        # exact match first, so clusters restored with non-string keys keep working
        if cluster_label in self.clusters:
            return self.clusters[cluster_label]
        # labels are stored as strings by add_node_to_cluster,
        # so eg. the int label 1 has to find the nodes stored for "1"
        return self.clusters.get(str(cluster_label), [])

    def _get_unique_id(self, node : Dict) -> Dict:
        '''Returns a new dict with the unique id only, or None if the node has no 'UniqueID' key.'''
        uid_key = 'UniqueID'
        if uid_key in node:
            return {uid_key: node[uid_key]}
        return None

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''Returns the time window as dictionary; for the db the clusters are dumped to a JSON string.'''
        return {
            "time": self.time,
            "use_case": self.use_case,
            "use_case_table": self.use_case_table,
            'layer_name': self.layer_name,
            "clusters": json.dumps(self.clusters) if for_db else self.clusters
        }

    def from_serializable_dict(self, dict: Dict, from_db=False):
        '''Restores all attributes from a dictionary; from the db the clusters are parsed from a JSON string.'''
        self.time = dict["time"]
        self.use_case = dict["use_case"]
        self.use_case_table = dict["use_case_table"]
        self.layer_name = dict['layer_name']
        self.clusters = json.loads(dict['clusters']) if from_db else dict['clusters']

    @staticmethod
    def create_from_serializable_dict(dict: Dict, from_db=False):
        '''Creates a new time window from a dictionary as created by to_serializable_dict.'''
        ts = TimeWindow()
        ts.from_serializable_dict(dict, from_db)
        return ts

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"TimeWindow({self.__repr__()})"
......@@ -51,4 +51,4 @@ app.add_api(swagger_util.get_bundled_specs(Path(swagger_path)),
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, ssl_context=context)
app.run(host='0.0.0.0', port=5000, ssl_context=context, debug=True)
import warnings
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Tuple
import numpy as np
from scipy.spatial import ConvexHull, qhull, distance
from math import sqrt
from statistics import mean
warnings.simplefilter(action='ignore', category=UserWarning)
# UserWarning: geopandas not available. Some functionality will be disabled.
from pointpats.centrography import std_distance
warnings.simplefilter(action='default', category=UserWarning)
class ClusterMetricsCalculator(ABC):
def __init__(self, cluster_nodes: List[dict], nr_layer_nodes: int, layer_diversity: int):
self.cluster_nodes = cluster_nodes
self.nr_layer_nodes = nr_layer_nodes
self.layer_diversity = layer_diversity