Commit ad4c67ae authored by Alexander Lercher

Merge branch 'feature/role-stage-integration' into develop

parents e7437efb 5df70777
@@ -17,7 +17,7 @@ def print_help():
def get_microservice_name_from_path(path) -> str:
'''
Extracts the microservice name from the path.
:param path: The path, eg. src\data-hub\stage-discovery-microservice\deployment
:param path: The path, eg. src\\data-hub\\stage-discovery-microservice\\deployment
'''
name = path.split(os.path.normpath('/'))[-2]
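    # e.g. 'src\\data-hub\\stage-discovery-microservice\\deployment' yields
    # 'stage-discovery-microservice', the second-to-last path component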
@@ -75,7 +75,7 @@ if __name__ == '__main__':
img_name = None
if len(sys.argv) > 2:
img_name = sys.argv[2]
deployment_file_paths = [p for p in deployment_file_paths if (img_name in p)]
deployment_file_paths = [p for p in deployment_file_paths if (img_name == get_microservice_name_from_path(p))]
error_val = 0
if command == 'delete' or command == 'redeploy':
......
# Agent Discovery Microservice
The agent discovery microservice labels anonymous logins with individual agents, e.g. a human or an automated device.
## Technologies
- Python 3.x
- Python module Flask
- Python module Connexion with Swagger
- Docker
- Kubernetes
\ No newline at end of file
swagger: "2.0"
info:
title: Agent Discovery microservice
description: This is the documentation for the agent discovery microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
import connexion
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of agent-discovery-microservice!'
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
apiVersion: v1
kind: Service
metadata:
name: agent-discovery
spec:
type: LoadBalancer
selector:
app: agent-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30104
protocol: TCP
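# note: port 80 is the in-cluster service port, targetPort 5000 the container
# port, and nodePort 30104 the externally reachable port on each cluster node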
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: agent-discovery
spec:
replicas: 1
selector:
matchLabels:
app: agent-discovery
template:
metadata:
labels:
app: agent-discovery
spec:
containers:
- name: agent-discovery
image: alexx882/agent-discovery-microservice
ports:
- containerPort: 5000
\ No newline at end of file
layers:
user:
properties:
starting-point:
properties:
- Latitude_StartingPoint
- Longitude_StartingPoint
\ No newline at end of file
import sys
import os
modules_paths = ['../../../modules/']
for modules_path in modules_paths:
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from typing import List, Tuple, Any
from networkx import Graph
from db.entities import LocationCluster, UserClusterGraph, Cluster
from db.repository import Repository
from processing.user_graph_generator import UserGraphGenerator
repo = Repository()
def get_edges_with_weights(g: Graph) -> List[Tuple[Any, Any, int]]:
res = []
for e in g.edges:
res.append((*e, g.edges[e]['weight']))
return res
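# e.g. an edge (1, 3) carrying the attribute {'weight': 2} becomes the tuple (1, 3, 2)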
def create_graphs_for_location_clusters():
graphs_for_clusters = []
ug = UserGraphGenerator()
    clusters: List[LocationCluster] = repo.get_location_clusters()
for cluster in clusters:
user_ids = [n['user'] for n in cluster.nodes]
graph: Graph = ug.create_graph_from_nodes(user_ids)
vertices = list(graph.nodes)
edges = get_edges_with_weights(graph)
cluster_graph = UserClusterGraph(vertices, edges)
graphs_for_clusters.append(cluster_graph)
store_graphs(graphs_for_clusters)
def store_graphs(graphs: List):
for g in graphs:
repo.add_user_cluster_graph(g)
if __name__ == "__main__":
create_graphs_for_location_clusters()
import json
from typing import List, Dict
import hashlib
class AgiRepository:
def getLocations(self) -> List[Dict]:
locations = []
travels = self.readDataFromFile()
# only take started travels
travels = [t for t in travels if t['status'] >= 2]
for travel in travels:
num_complete_travels = min(len(travel['startedBy']), len(travel['users']))
for i in range(num_complete_travels):
cur_location = travel['startedBy'][i]
cur_user = travel['users'][i]
locations.append(
self.location(f'{travel["id"]}-{cur_location["moment"]}',
cur_location['coordinate']['latitude'],
cur_location['coordinate']['longitude'],
cur_location['moment'],
# todo user in travel startedBy not available from dataset - currently using user list
hashlib.sha1(cur_user['userId'].encode()).hexdigest() # not showing generated username
))
return locations
def getLocationsBasedOnNewDataSchema(self):
        '''Creates the new generic data schema to be used beginning on 24.03.2020'''
data = {
'layer_name': 'Destination',
'nodes': self.getLocations(),
'properties': ['latitude', 'longitude']
}
return data
def getTimesBasedOnNewDataSchema(self):
        '''Creates the new generic data schema to be used beginning on 24.03.2020'''
data = {
'layer_name': 'Starting_Time',
'nodes': self.getLocations(),
'properties': ['timestamp']
}
return data
def readDataFromFile(self) -> List[Dict]:
with open('./db/agi/travels.json', 'r') as f_travels:
travels = json.loads(f_travels.read())
return travels
def location(self, id_, lat, long_, timestamp, username) -> dict:
return {
"id": id_,
'latitude': lat,
'longitude': long_,
"timestamp": timestamp,
"user": username
}
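# Usage sketch (assumes ./db/agi/travels.json is present):
#   repo = AgiRepository()
#   destination_layer = repo.getLocationsBasedOnNewDataSchema()
#   # -> {'layer_name': 'Destination', 'nodes': [...], 'properties': ['latitude', 'longitude']}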
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from db.repository import Repository
def insert_locations():
repo = Repository()
locs = repo.get_agi_locations()
for l in locs:
repo.add_location(l)
if __name__ == "__main__":
insert_locations()
import yaml
from typing import Generator
from pathlib import Path
### init logging ###
import logging
LOG_FORMAT = (
'%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
PROJECT_ROOT = Path(__file__).parent.parent.parent
class ClusteringConfig:
'''Contains the configuration for the clustering algorithm defined in configs/clustering.yaml.'''
config_path = f'{PROJECT_ROOT}/configs/clustering.yaml'
config: dict = None
def __init__(self):
self.config = self._load_config()
def _load_config(self) -> dict:
'''Loads the whole configuration from file.'''
config = None
with open(self.config_path, 'r') as stream:
try:
config = yaml.safe_load(stream)
except yaml.YAMLError as exc:
LOGGER.error(exc)
config = {}
return config
def get_config(self):
return self.config
def get_layer_configs(self) -> Generator[dict, None, None]:
"""
Returns a generator for the individual layer configs.
Layer configs are dicts including a layer-name.
"""
for key, layer in self.config['layers'].items():
layer['layer-name'] = key
yield layer
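# Usage sketch (assumes configs/clustering.yaml defines a top-level 'layers'
# mapping like the one shipped in this repo):
#   for layer_config in ClusteringConfig().get_layer_configs():
#       print(layer_config['layer-name'], layer_config.get('properties'))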
import itertools
from typing import List, Dict, Tuple, Any
from networkx import Graph
class UserGraphGenerator:
def __init__(self):
pass
def count_edges(self, nodes: List) -> Dict[Tuple, int]:
edge_counts = {}
coms = itertools.combinations(nodes, 2)
for first, second in coms:
            if first == second: # don't process reflexive connections
continue
if (first, second) in edge_counts:
edge_counts[first, second] += 1
else:
edge_counts[first, second] = 1
return edge_counts
def create_edges_with_weights(self, edge_counts: Dict[Tuple[Any, Any], int]) -> List[Tuple[Any, Any, Dict]]:
edges = []
for (key1, key2), value in edge_counts.items():
edge = (key1, key2, {'weight': value})
edges.append(edge)
return edges
def create_fully_connected_edges_for_nodes(self, nodes: List) -> List[Tuple[Any, Any, Dict]]:
return self.create_edges_with_weights(self.count_edges(nodes))
def create_graph_from_nodes(self, nodes: List) -> Graph:
'''Creates a networkx.Graph with distinct nodes and weighted edges between these nodes'''
g = Graph()
g.add_nodes_from(nodes)
g.add_edges_from(self.create_fully_connected_edges_for_nodes(nodes))
return g
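# Example (sketch): duplicate node ids turn into edge weights.
#   g = UserGraphGenerator().create_graph_from_nodes([1, 1, 3])
#   list(g.edges(data=True))  # -> [(1, 3, {'weight': 2})]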
import io
from flask import request, Response
from db.repository import Repository
from processing.clustering.clusterer import Clusterer
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
repo = Repository()
clusterer = Clusterer()
def get_locations():
clusters = repo.get_location_clusters()
return [c.to_serializable_dict() for c in clusters]
def get_times():
clusters = repo.get_time_clusters()
return [c.to_serializable_dict() for c in clusters]
def get_image_1():
return Response(status=501)
    # TODO implement; the code below is unreachable until the 501 stub above is removed
locations = repo.getLocations()
fig = clusterer.draw_locations(locations)
output = io.BytesIO()
FigureCanvas(fig).print_png(output)
return Response(output.getvalue(), mimetype="image/png")
def get_image_2():
return Response(status=501)
    # TODO implement; the code below is unreachable until the 501 stub above is removed
locations = repo.getLocations()
fig = clusterer.draw_locations(locations)
output = io.BytesIO()
FigureCanvas(fig).print_png(output)
return Response(output.getvalue(), mimetype="image/png")
\ No newline at end of file
import insert_agi_locations
import run_clustering
import create_user_graphs
def run_agi_clustering_and_graph_creation():
insert_agi_locations.insert_locations()
run_clustering.run_location_clustering()
run_clustering.run_time_clustering()
create_user_graphs.create_graphs_for_location_clusters()
from flask import request, Response
from db.repository import Repository
from db.entities import Location
repo = Repository()
def post():
body = request.json
_insert_location(body)
return Response(status=201)
def post_many():
body = request.json
for location in body:
_insert_location(location)
return Response(status=201)
def get():
return [l.to_serializable_dict() for l in repo.get_locations()]
def _insert_location(location_data: dict):
repo.add_location(Location(location_data))
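# Example payload for the POST endpoints (sketch; keys follow the location dicts
# used elsewhere in this repo):
#   {'id': 'abc', 'latitude': 1.0, 'longitude': 2.0, 'timestamp': 123, 'user': 'u1'}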
from flask import request, Response
from db.repository import Repository
repo = Repository()
def get():
data = repo.get_user_cluster_graphs()
return [d.to_serializable_dict() for d in data]
import unittest
import sys
sys.path.insert(1, '../')
# python -m unittest discover
from db.entities.cluster import Cluster
from db.entities import TimeCluster, LocationCluster
from datetime import date, datetime
import json
class TestCluster(unittest.TestCase):
def test_init_Cluster(self):
c = Cluster(1, [1, 2, 3])
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
class TestLocationCluster(unittest.TestCase):
def setUp(self):
self.c = LocationCluster(1, [1, 2, 3])
def test_init_individualArguments(self):
c = LocationCluster(1, [1, 2, 3])
self.assertEqual('1', c.id)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': [1, 2, 3]}
c = LocationCluster(location_dict=dict_)
self.assertEqual('123', c.id)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument_fromDb(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': '[1, 2, 3]'}
c = LocationCluster(location_dict=dict_, from_db=True)
self.assertEqual('123', c.id)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_to_serializable_dict_noDb(self):
c_dict = self.c.to_serializable_dict()
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, c_dict['nodes'])
def test_from_serializable_dict_noDb(self):
new_c = LocationCluster()
new_c.from_serializable_dict(self.c.to_serializable_dict())
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
def test_to_serializable_dict_db_jsonNodes(self):
c_dict = self.c.to_serializable_dict(for_db=True)
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, json.loads(c_dict['nodes']))
def test_from_serializable_dict_fromDb(self):
new_c = LocationCluster()
new_c.from_serializable_dict(
self.c.to_serializable_dict(for_db=True), from_db=True)
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
class TestTimeCluster(unittest.TestCase):
def setUp(self):
self.date_ = date(2020, 1, 1)
self.c = TimeCluster(self.date_, 14, 1, [1, 2, 3])
def test_init_individualArguments(self):
c = TimeCluster(self.date_, 14, 1, [1, 2, 3])
self.assertEqual(f'{self.date_}-14-1', c.id)
self.assertEqual(self.date_, c.date)
self.assertEqual(14, c.hour)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': [1, 2, 3],
'date': str(self.date_), 'hour': 14}
c = TimeCluster(time_dict=dict_)
self.assertEqual('123', c.id)
self.assertEqual(self.date_, c.date)
self.assertEqual(14, c.hour)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument_fromDb(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': '[1, 2, 3]',
'date': str(self.date_), 'hour': 14}
c = TimeCluster(time_dict=dict_, from_db=True)
self.assertEqual('123', c.id)
self.assertEqual(self.date_, c.date)
self.assertEqual(14, c.hour)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_to_serializable_dict_noDb(self):
c_dict = self.c.to_serializable_dict()
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, c_dict['nodes'])
self.assertEqual(self.c.date, datetime.strptime(
c_dict['date'], '%Y-%m-%d').date())
self.assertEqual(self.c.hour, c_dict['hour'])
def test_from_serializable_dict_noDb(self):
new_c = TimeCluster()
new_c.from_serializable_dict(self.c.to_serializable_dict())
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
def test_to_serializable_dict_fromDb_jsonNodes(self):
c_dict = self.c.to_serializable_dict(for_db=True)
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, json.loads(c_dict['nodes']))
self.assertEqual(self.c.date, datetime.strptime(
c_dict['date'], '%Y-%m-%d').date())
self.assertEqual(self.c.hour, c_dict['hour'])
def test_from_serializable_dict_fromDb(self):
new_c = TimeCluster()
new_c.from_serializable_dict(
self.c.to_serializable_dict(for_db=True), from_db=True)
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
if __name__ == '__main__':
unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering.clustering_config import ClusteringConfig
class TestClusteringConfig(unittest.TestCase):
def setUp(self):
self.clustering_config = ClusteringConfig()
    def test_get_layer_configs_allConfigsContainLayerName(self):
for layer_config in self.clustering_config.get_layer_configs():
self.assertIn('layer-name', layer_config)
if __name__ == '__main__':
unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.user_graph_generator import UserGraphGenerator
import networkx as nx
class TestUserGraphGenerator(unittest.TestCase):
def setUp(self):
self.user_graph = UserGraphGenerator()
def test_count_edges_oneNode(self):
count_res = {}
self.assertEqual(count_res, self.user_graph.count_edges([1]))
def test_count_edges_threeDistinctNodes_threeEdges(self):
count_res = {(1, 2): 1, (1, 3): 1, (2, 3): 1}
self.assertEqual(count_res, self.user_graph.count_edges([1, 2, 3]))
def test_count_edges_twoNodesWithDups_notReflexive(self):
count_res = {}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1]))
def test_count_edges_threeNodesWithDups_countGtOne_notReflexive(self):
count_res = {(1, 3): 2}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1, 3]))
def test_count_edges_fourNodesWithDups_countGtOne_notReflexive(self):
count_res = {(1, 3): 2, (1, 4): 2, (3, 4): 1}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1, 3, 4]))
def test_count_edges_fourStringNodesWithDups_countGtOne_notReflexive(self):
count_res = {('test', 'test2'): 2, ('test', '4'): 2, ('test2', '4'): 1}
self.assertEqual(count_res,
self.user_graph.count_edges(['test', 'test', 'test2', '4']))
def test_count_edges_fourDistinctStringNodes_fullyConnectedEdges(self):
count_res = {
('1', '2'): 1, ('1', '3'): 1, ('1', '4'): 1,
('2', '3'): 1, ('2', '4'): 1, ('3', '4'): 1
}
self.assertEqual(count_res,
self.user_graph.count_edges(['1', '2', '3', '4']))
def test_create_edges_with_weights_SingleEdge(self):
counts = {('a', 'b'): 1}
edge_result = [('a', 'b', {'weight': 1})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_edges_with_weights_SingleEdgeWeightTwo(self):
counts = {('a', 'b'): 2}
edge_result = [('a', 'b', {'weight': 2})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_edges_with_weights_TwoEdgesWithWeights(self):
counts = {('a', 'b'): 2, ('b', 'c'): 1}
edge_result = [('a', 'b', {'weight': 2}), ('b', 'c', {'weight': 1})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_graph_from_nodes_singleNode(self):
nodes = [1]
edges = []
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_twoDistinctNodes(self):
nodes = [1, 2]
edges = [(1, 2, {'weight': 1})]
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_threeDistinctNodes(self):
nodes = [1, 2, 3]
edges = [(1, 2, {'weight': 1}), (1, 3, {'weight': 1}),
(2, 3, {'weight': 1})]
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_threeNodesWithDuplicates_TwoNodes_EdgesWithAccordingWeight(self):
nodes = [1, 1, 3]
edges = [(1, 3, {'weight': 2})]
self.assertGraph(list(set(nodes)), edges,
self.user_graph.create_graph_from_nodes(nodes))
# unittest custom assertions
def assertGraph(self, nodes, edges, g: nx.Graph):
self.assertEqual(len(nodes), g.number_of_nodes())
self.assertEqual(len(edges), g.number_of_edges())
for i in range(len(nodes)):
self.assertEqual(nodes[i], list(g.nodes)[i])
for i in range(len(edges)):
graph_edge = list(g.edges)[i]
first, second, weight = edges[i]
self.assertEqual((first, second), graph_edge)
self.assertEqual(weight, g.edges[graph_edge])
if __name__ == '__main__':
unittest.main()
import sys
import os
for path in ['../', './', '../../../modules/']:
if os.path.exists(path):
sys.path.insert(1, path)
import matplotlib.pyplot as plt
from db.repository import Repository
from db.entities import TimeSlice
from typing import List
def plt_show_circles(time_slices: List[TimeSlice], cluster_no):
cluster_no = str(cluster_no)
for slice_ in time_slices:
nodes = slice_.get_nodes_for_cluster(cluster_no)
# print(f"{slice_.time} number elements for cluster {cluster_no}: {len(nodes)}")
plt.title(str(slice_.time))
plt.scatter([n['Longitude_Destination'] if 'Longitude_Destination' in n else 0
for n in nodes],
[n['Latitude_Destination'] if 'Latitude_Destination' in n else 0
for n in nodes],
s=[len(nodes)*100]*len(nodes))
plt.pause(0.5)
def plt_show_bars(time_slices: List[TimeSlice], cluster_no):
cluster_no = str(cluster_no)
labels = [ts.time for ts in time_slices]
x_axis_label_stepsize = 10
nodes_per_slice_for_single_cluster = \
[len(time_slice.get_nodes_for_cluster(cluster_no))
for time_slice
in time_slices]
fig, ax = plt.subplots()
ax.bar(x=range(len(labels)),
height=nodes_per_slice_for_single_cluster)
ax.set_ylabel('Size')
ax.set_title(f'Cluster-{cluster_no} size over time')
ax.set_xticks(range(len(labels))[::x_axis_label_stepsize])
ax.set_xticklabels(labels[::x_axis_label_stepsize])
plt.show()
if __name__ == "__main__":
repo = Repository()
time_slices = repo.get_time_slices_by_name("Destination_Layer")
# chronological order
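    # ts.time is assumed to be a stringified tuple such as '(2020, 12)';
    # eval restores it so the slices sort chronologically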
time_slices.sort(key=lambda ts: eval(ts.time))
print(len(time_slices))
plt_show_bars(time_slices, cluster_no = 0)
\ No newline at end of file
FROM python:3
LABEL maintainer="Alexander Lercher"
ENV http_proxy http://proxy.uni-klu.ac.at:3128/
ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/democratic-reasoning-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# Democratic Reasoning Microservice
The democratic reasoning microservice serves as a central knowledge-based component providing all facts and rules for other microservices.
## Technologies
- Python 3.x
- Python module Flask
- Python module Connexion with Swagger
- Docker
- Kubernetes
\ No newline at end of file
from flask import request
def echo():
return request.json
\ No newline at end of file
import connexion
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of democratic-reasoning-microservice!'
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
apiVersion: v1
kind: Service
metadata:
name: democratic-reasoning
spec:
type: LoadBalancer
selector:
app: democratic-reasoning
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30106
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: democratic-reasoning
spec:
replicas: 1
selector:
matchLabels:
app: democratic-reasoning
template:
metadata:
labels:
app: democratic-reasoning
spec:
containers:
- name: democratic-reasoning
image: alexx882/democratic-reasoning-microservice
ports:
- containerPort: 5000
\ No newline at end of file
FROM python:3
LABEL maintainer="Alexander Lercher"
ENV http_proxy http://proxy.uni-klu.ac.at:3128/
ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/geo-profiling-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# Geo Profiling Microservice
The geo profiling microservice helps to cluster users who are physically close to each other.
## Technologies
- Python 3.x
- Python module Flask
- Python module Connexion with Swagger
- Docker
- Kubernetes
\ No newline at end of file
swagger: "2.0"
info:
title: Geo Profiling microservice
description: This is the documentation for the geo profiling microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
from flask import request
def echo():
return request.json
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: geo-profiling
spec:
type: LoadBalancer
selector:
app: geo-profiling
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30105
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: geo-profiling
spec:
replicas: 1
selector:
matchLabels:
app: geo-profiling
template:
metadata:
labels:
app: geo-profiling
spec:
containers:
- name: geo-profiling
image: alexx882/geo-profiling-microservice
ports:
- containerPort: 5000
\ No newline at end of file
@@ -11,7 +11,8 @@ RUN pip install connexion[swagger-ui]
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/agent-discovery-microservice/app/ /app/
COPY src/data-hub/proactive-community-detection-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# Community Detection Microservice
# Proactive Community Detection Microservice
The community detection microservice helps in deeper network understanding and reveals interesting properties shared by the members. We propose a novel approach that combines event clustering and link analysis to detect communities and to cluster users into overlapping communities via the agent, role, and stage discovery microservices.
......
swagger: "2.0"
info:
title: Democratic Reasoning microservice
description: This is the documentation for the democratic reasoning microservice.
title: Proactive Community Detection microservice
description: This is the documentation for the proactive community detection microservice.
version: "1.0.0"
consumes:
......
@@ -6,7 +6,7 @@ app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of geo-profiling-microservice!'
return 'Endpoint of proactive-community-detection-microservice!'
# start app
if __name__ == '__main__':
......
apiVersion: v1
kind: Service
metadata:
name: proactive-community-detection
spec:
type: LoadBalancer
selector:
app: proactive-community-detection
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30105
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: proactive-community-detection
spec:
replicas: 1
selector:
matchLabels:
app: proactive-community-detection
template:
metadata:
labels:
app: proactive-community-detection
spec:
containers:
- name: proactive-community-detection
image: alexx882/proactive-community-detection-microservice
ports:
- containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
name: proactive-community-detection-db
spec:
type: LoadBalancer
selector:
app: proactive-community-detection-db
ports:
- name: http
port: 27017
targetPort: 27017
nodePort: 30106
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: proactive-community-detection-db
spec:
replicas: 1
selector:
matchLabels:
app: proactive-community-detection-db
template:
metadata:
labels:
app: proactive-community-detection-db
spec:
containers:
- name: proactive-community-detection-db
image: mongo
env:
- name: MONGO_INITDB_ROOT_USERNAME
value: root
- name: MONGO_INITDB_ROOT_PASSWORD
value: root
ports:
- containerPort: 27017
\ No newline at end of file
FROM python:3
LABEL maintainer="Alexander Lercher"
ENV http_proxy http://proxy.uni-klu.ac.at:3128/
ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/role-discovery-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# Role Discovery Microservice
The role discovery microservice labels anonymous users with individual roles which are important for classification, e.g. the role 'father' in the concept 'family'.
## Technologies
- Python 3.x
- Python module Flask
- Python module Connexion with Swagger
- Docker
- Kubernetes
\ No newline at end of file
swagger: "2.0"
info:
title: Role Discovery microservice
description: This is the documentation for the role discovery microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
from flask import request
def echo():
return request.json
\ No newline at end of file
import connexion
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of role-discovery-microservice!'
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
apiVersion: v1
kind: Service
metadata:
name: role-discovery
spec:
type: LoadBalancer
selector:
app: role-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30102
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: role-discovery
spec:
replicas: 1
selector:
matchLabels:
app: role-discovery
template:
metadata:
labels:
app: role-discovery
spec:
containers:
- name: role-discovery
image: alexx882/role-discovery-microservice
ports:
- containerPort: 5000
\ No newline at end of file
@@ -11,11 +11,11 @@ EXPOSE 5000
WORKDIR /app
# https://www.aptible.com/documentation/faq/deploy/dockerfile-caching/pip-dockerfile-caching.html
COPY src/data-hub/community-detection-microservice/app/requirements.txt /app/
COPY src/data-hub/role-stage-discovery-microservice/app/requirements.txt /app/
RUN pip install -r requirements.txt
COPY src/modules/ /app/
COPY src/data-hub/community-detection-microservice/app/ /app/
COPY src/data-hub/role-stage-discovery-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# Role Stage Discovery Microservice
The role stage discovery microservice detects communities, their roles based on contextualized features, and the stages of these roles over time.
## Technologies
- Python 3.x
- Docker
- Kubernetes
\ No newline at end of file
from db.entities.location import Location
from db.entities.popular_location import PopularLocation
from db.entities.cluster import Cluster, LocationCluster, TimeCluster
from db.entities.cluster import Cluster
from db.entities.clusterset import ClusterSet
from db.entities.user_cluster_graph import UserClusterGraph
from db.entities.layer import Layer
......
@@ -39,67 +39,3 @@ class Cluster:
def __str__(self):
return f"Cluster({self.__repr__()})"
class LocationCluster(Cluster):
def __init__(self, cluster_label: int = None, nodes: List = None,
location_dict: Dict = None, from_db=False):
super().__init__(cluster_label, nodes)
self.id = f'{self.cluster_label}'
if location_dict is not None:
self.from_serializable_dict(location_dict, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"id": self.id,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
}
def from_serializable_dict(self, location_dict: Dict, from_db=False):
self.id = location_dict["id"]
self.cluster_label = location_dict["cluster_label"]
self.nodes = json.loads(location_dict["nodes"]) \
if from_db else location_dict["nodes"]
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"LocationCluster({self.__repr__()})"
class TimeCluster(Cluster):
def __init__(self, date: date = None, hour: int = None, cluster_label: int = None, nodes: List = None,
time_dict: Dict = None, from_db=False):
super().__init__(cluster_label, nodes)
self.date = date
self.hour = hour
self.id = f'{self.date}-{self.hour}-{self.cluster_label}'
if time_dict is not None:
self.from_serializable_dict(time_dict, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"id": self.id,
"date": str(self.date),
"hour": self.hour,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
}
def from_serializable_dict(self, time_dict: Dict, from_db=False):
self.id = time_dict["id"]
self.date = datetime.strptime(time_dict["date"], '%Y-%m-%d').date()
self.hour = time_dict["hour"]
self.cluster_label = time_dict["cluster_label"]
self.nodes = json.loads(time_dict["nodes"]) \
if from_db else time_dict["nodes"]
def __repr__(self):
return json.dumps(self.to_serializable_dict(True))
def __str__(self):
return f"TimeCluster({self.__repr__()})"
@@ -3,8 +3,6 @@ import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
import json
from db.agi.agi_repository import AgiRepository
from db.entities import *
from typing import List
@@ -13,61 +11,14 @@ class Repository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
def __init__(self):
super().__init__(netconst.COMMUNITY_DETECTION_DB_HOSTNAME,
netconst.COMMUNITY_DETECTION_DB_PORT,
'communityDetectionDb')
self._location_collection = 'location'
self._location_cluster_collection = 'location_cluster'
self._time_cluster_collection = 'time_cluster'
self._user_cluster_graph_collection = 'user_cluster_graph'
self._layer_collection = 'layer-new'
self._layer_nodes_collection = 'layer_nodes-new'
self._clusterset_collection = 'cluster_set-new'
self._time_slice_collection = 'time_slice-new'
self.agi_repo = AgiRepository()
#region Location
def add_location(self, location: Location):
super().insert_entry(self._location_collection, location.to_serializable_dict())
def get_locations(self) -> List[Location]:
locations = super().get_entries(self._location_collection)
return [Location(l) for l in locations]
def get_agi_locations(self) -> List[Location]:
agi_locations = self.agi_repo.getLocations()
return [Location(agi_loc) for agi_loc in agi_locations]
#endregion
#region Specific Clusters
def add_location_cluster(self, cluster: LocationCluster):
super().insert_entry(self._location_cluster_collection,
cluster.to_serializable_dict(for_db=True))
super().__init__(netconst.ROLESTAGE_DISCOVERY_DB_HOSTNAME,
netconst.ROLESTAGE_DISCOVERY_DB_PORT,
'roleStageDb')
def get_location_clusters(self) -> List[LocationCluster]:
clusters = super().get_entries(self._location_cluster_collection)
return [LocationCluster(location_dict=c, from_db=True) for c in clusters]
def add_time_cluster(self, cluster: TimeCluster):
super().insert_entry(self._time_cluster_collection,
cluster.to_serializable_dict(for_db=True))
def get_time_clusters(self) -> List[TimeCluster]:
clusters = super().get_entries(self._time_cluster_collection)
return [TimeCluster(time_dict=c, from_db=True) for c in clusters]
#endregion
#region Cluster Graph
def add_user_cluster_graph(self, user_graph: UserClusterGraph):
super().insert_entry(self._user_cluster_graph_collection,
user_graph.to_serializable_dict(for_db=True))
def get_user_cluster_graphs(self) -> List[UserClusterGraph]:
user_graphs = super().get_entries(self._user_cluster_graph_collection)
return [UserClusterGraph(dict_=u, from_db=True) for u in user_graphs]
#endregion
self._layer_collection = 'layers'
self._layer_nodes_collection = 'layer_nodes'
self._clusters_collection = 'clusters'
self._time_slice_collection = 'time_slices'
#region Layers
def add_layer(self, layer: Layer):
@@ -77,10 +28,6 @@ class Repository(MongoRepositoryBase):
entries = super().get_entries(self._layer_collection)
return [Layer(e) for e in entries]
def get_layer_names(self) -> List[str]:
entries = super().get_entries(self._layer_collection, projection={'layer_name': 1})
return [e['layer_name'] for e in entries]
def get_layer(self, layer_name) -> Layer:
entries = super().get_entries(self._layer_collection, selection={'layer_name': layer_name})
entries = [Layer(e) for e in entries]
@@ -103,38 +50,13 @@ class Repository(MongoRepositoryBase):
#endregion
#region ClusterSet
# TODO cleanup
def add_clusterset(self, cluster_set: ClusterSet):
super().insert_entry(self._clusterset_collection, cluster_set.to_serializable_dict())
def get_clustersets(self) -> List[ClusterSet]:
'''Returns all clustersets.'''
entries = super().get_entries(self._clusterset_collection)
return [ClusterSet(cluster_set_dict=e) for e in entries]
def get_clusterset_names(self) -> List[str]:
'''Returns the names of all clustersets.'''
entries = super().get_entries(self._clusterset_collection, projection={'layer_name': 1})
return [e['layer_name'] for e in entries]
def get_clusterset(self, layer_name) -> ClusterSet:
'''Returns a single clusterset with the given name or None otherwise.'''
entries = super().get_entries(self._clusterset_collection, selection={'layer_name': layer_name})
entries = [ClusterSet(cluster_set_dict=e) for e in entries]
if entries is not None and len(entries) > 0:
return entries[0]
else:
return None
#region Clusters
def add_clusters(self, clusters: List[Cluster]):
cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
super().insert_many(self._clusterset_collection, cluster_dicts)
super().insert_many(self._clusters_collection, cluster_dicts)
def get_clusters_for_layer(self, layer_name: str) -> List[Cluster]:
entries = super().get_entries(self._clusterset_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
entries = super().get_entries(self._clusters_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
return [Cluster(cluster_dict=e, from_db=True) for e in entries]
#endregion
@@ -155,4 +77,5 @@ class Repository(MongoRepositoryBase):
def remove_all_time_slices(self):
super().drop_collection(self._time_slice_collection)
#endregion
\ No newline at end of file
@@ -20,7 +20,7 @@ app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of community-detection-microservice!'
return 'Endpoint of role-stage-discovery-microservice!'
# start app
if __name__ == '__main__':
......
@@ -3,7 +3,6 @@ import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import OPTICS
from typing import List, Dict, Any, TypeVar
from deprecated import deprecated
T = TypeVar('T')
ClusterGroup = Dict[Any, List[Dict]]
@@ -16,65 +15,23 @@ class Clusterer:
:param min_points: MinPts used in OPTICS
'''
def __init__(self, epsilon=11, min_points=5):
self.epsilon = epsilon
def __init__(self, min_points=5):
self.min_points = min_points
def draw_locations(self, locations:List, labels:List=None) -> plt.Figure:
if locations is None or len(locations) == 0:
return self._draw_locations()
if labels is None or len(locations) != len(labels):
labels = self.create_labels(locations)
return self._draw_locations(
locations = self.extract_location_data(locations),
partition_info = labels
)
def _draw_locations(self, locations:np.ndarray=None, centroids:np.ndarray=None, partition_info:List=None) -> plt.Figure:
fig = plt.Figure()
axis = fig.add_subplot(1, 1, 1)
if locations is not None:
colors = plt.cm.rainbow(np.linspace(0, 1, len(locations)))
if partition_info is not None:
distinct_colors = plt.cm.rainbow(np.linspace(0, 1, len(set(partition_info))))
colors = [distinct_colors[pi] for pi in partition_info]
# draw locations with random colors
axis.scatter(locations[:,0],
locations[:,1],
c=colors)
if centroids is not None:
# draw black centroids
axis.scatter(centroids[:,0], centroids[:,1], c='k', marker='x', s=80)
return fig
def create_labels(self, features:np.ndarray) -> List[int]:
'''Creates labels for the items based on DBSCAN.'''
'''Creates labels for the items based on OPTICS.'''
if features is None or len(features) == 0:
            return features # garbage in, garbage out
dbsc = OPTICS(min_samples=self.min_points) # DBSCAN(eps = self.epsilon, min_samples = self.min_points)
dbsc = dbsc.fit(features)
labels = dbsc.labels_
optics = OPTICS(min_samples=self.min_points)
optics = optics.fit(features)
labels = optics.labels_
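        # note: like DBSCAN, OPTICS labels noise points with -1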
return labels.tolist()
@deprecated(reason="Use generic version instead")
def extract_location_features(self, locations: List[dict]) -> np.ndarray:
return np.asarray([(float(l['latitude']), float(l['longitude'])) for l in locations])
@deprecated(reason="Use generic version instead")
def extract_time_features(self, times: List[Dict]) -> np.ndarray:
return np.asarray([[float(t['timestamp'])] for t in times])
def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray:
'''Extracts the feature values from the dataset into a np array with same order as original dataset.'''
# TODO single input
extracted_features = []
for data in dataset:
entry = [float(data[feature]) for feature in features]
@@ -82,7 +39,7 @@ class Clusterer:
return np.asarray(extracted_features)
def label_dataset(self, dataset:List[Dict], labels:List[Any]) -> List:
def label_dataset(self, dataset:List[Dict], labels:List[Any]):
'''Adds the labels to the elements of the dataset at the same position. The new key is called cluster_label.'''
if dataset is None or labels is None:
return
@@ -96,38 +53,12 @@ class Clusterer:
dataset[i]['cluster_label'] = labels[i]
def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> ClusterGroup:
self.label_dataset(dataset, labels)
clusters = {}
for label in labels:
clusters[label] = [ds for ds in dataset if ds['cluster_label'] == label]
return clusters
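    # e.g. labels [0, 0, -1] group a three-element dataset into
    # {0: [row0, row1], -1: [row2]}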
@deprecated(reason="Use generic version instead")
def cluster_locations(self, locations:List[Dict]) -> ClusterGroup:
'''Returns a dictionary with identified clusters and their locations copied from the input'''
if locations is None or len(locations) == 0:
# raise Exception("locations has to contain something")
return {}
features = self.extract_location_features(locations)
labels = self.create_labels(features)
self.label_dataset(locations, labels)
return self.group_by_clusters(locations, labels)
@deprecated(reason="Use generic version instead")
def cluster_times(self, times:List[Dict]) -> ClusterGroup:
'''Returns a dictionary with identified clusters and their times copied from the input'''
features = self.extract_time_features(times)
labels = self.create_labels(features)
self.label_dataset(times, labels)
return self.group_by_clusters(times, labels)
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> ClusterGroup:
'''
Returns the identified clusters containing a subset of nodes from the dataset.
@@ -140,6 +71,8 @@ class Clusterer:
arr = self._extract_features(dataset, features)
labels = self.create_labels(arr)
self.label_dataset(dataset, labels)
return self.group_by_clusters(dataset, labels)
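    # Usage sketch (hypothetical data; OPTICS needs at least min_points samples):
    #   clusterer = Clusterer(min_points=2)
    #   groups = clusterer.cluster_dataset(
    #       [{'latitude': 1, 'longitude': 2}, {'latitude': 2, 'longitude': 2}],
    #       features=['latitude', 'longitude'])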
@@ -4,23 +4,9 @@ from db.entities import ClusterSet
repo = Repository()
def get():
return [c.to_serializable_dict() for c in repo.get_clustersets()]
def get_names():
return repo.get_clusterset_names()
def get_by_name2(name):
def get_by_name(name):
res = repo.get_clusters_for_layer(name)
if res is None or len(res) == 0:
return Response(status=404)
else:
return [c.to_serializable_dict() for c in res]
def get_by_name(name):
res = repo.get_clusterset(name)
if res is not None:
return res.to_serializable_dict()
else:
return Response(status=404)
\ No newline at end of file
def run_agi_clustering_and_graph_creation():
pass
@@ -15,7 +15,6 @@ def post():
def _insert_layer(layer_data: dict):
'''Converts object keys from external source and inserts into database.'''
layer_data['layer_name'] = layer_data.pop('LayerName')
# layer_data['nodes'] = layer_data.pop('Nodes')
layer_data['properties'] = layer_data.pop('Properties')
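    # e.g. {'LayerName': 'Destination', 'Properties': [...]} becomes
    # {'layer_name': 'Destination', 'properties': [...]}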
repo.add_layer(Layer(layer_data))
......
@@ -4,23 +4,8 @@ from db.entities import TimeSlice
repo = Repository()
def get():
return [e.to_serializable_dict() for e in repo.get_time_slices()]
def get_by_name(layername):
res = repo.get_time_slices_by_name(layername)
# print(len(res))
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
else:
return Response(status=404)
def get_by_name2(name):
def get_by_name(name):
res = repo.get_time_slices_by_name(name)
# print(len(res))
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
......
@@ -5,16 +5,13 @@ if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import json
from db.entities import *
from db.entities import Layer, Cluster
from typing import List, Dict, Tuple
from db.repository import Repository, AgiRepository
from db.repository import Repository
from processing.clustering.clusterer import Clusterer
DEBUG = False
repo = Repository()
test_repo = AgiRepository()
def run_generic_clustering():
@@ -29,7 +26,6 @@ def run_generic_clustering():
continue
clusters = run_clustering_for_layer(layer)
# cluster_set = ClusterSet(layer.layer_name, clusters)
store_generic_clusters(clusters)
@@ -44,74 +40,10 @@ def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
return [Cluster(layer.layer_name, key, value) for key, value in res.items()]
def store_generic_clusters(clusters: List[Cluster]):
repo.add_clusters(clusters)
# with open(f'clusterset_{cluster_set.layer_name}.txt', 'w') as file:
# file.write(json.dumps(cluster_set.to_serializable_dict()))
def run_location_clustering():
user_clusterer = Clusterer()
all_location_traces = repo.get_locations()
cluster_result = user_clusterer.cluster_locations(
[l.to_serializable_dict() for l in all_location_traces])
clusters = [LocationCluster(key, value)
for key, value in cluster_result.items()]
store_clusters('locations', clusters)
def run_time_clustering():
clusters: List[TimeCluster] = []
user_clusterer = Clusterer(epsilon=600) # clustered within 10 minutes
all_location_traces = repo.get_locations()
# for each date in timestamp list
dates = {trace.timestamp.date() for trace in all_location_traces}
for cur_date in dates:
traces_for_cur_date = [
trace for trace in all_location_traces if trace.timestamp.date() == cur_date]
# for each hour of that day
for cur_hour in list(range(24)):
traces_for_time_slice = [
trace for trace in traces_for_cur_date if trace.timestamp.hour == cur_hour]
if len(traces_for_time_slice) == 0:
continue
# clustering per hour
cluster_result = user_clusterer.cluster_times(
[t.to_serializable_dict() for t in traces_for_time_slice])
cur_clusters = [TimeCluster(cur_date, cur_hour, key, value)
for key, value in cluster_result.items()]
clusters.extend(cur_clusters)
store_clusters('times', clusters)
def store_clusters(type: str, clusters: List):
if DEBUG:
print(clusters)
return
if type == 'locations':
for c in clusters:
repo.add_location_cluster(c)
if type == 'times':
for c in clusters:
repo.add_time_cluster(c)
if __name__ == "__main__":
run_generic_clustering()
# TODO cleanup
# run_location_clustering()
# run_time_clustering()
import unittest
import sys
sys.path.insert(1, '../')
# python -m unittest discover
from db.entities import Cluster
from datetime import date, datetime
import json
class TestCluster(unittest.TestCase):
def test_init_Cluster(self):
c = Cluster('layer1', 1, [1, 2, 3])
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
if __name__ == '__main__':
unittest.main()
@@ -5,12 +5,39 @@ for path in ['../', './']:
# python -m unittest discover
from processing.clustering.clusterer import Clusterer
import numpy as np
class TestClusterer(unittest.TestCase):
clusterer:Clusterer = None
def setUp(self):
self.clusterer = Clusterer(epsilon=10, min_points=2)
self.clusterer = Clusterer(min_points=2)
#region _extract_features
def test_extract_features_emptyDataset_noResults(self):
features = self.clusterer._extract_features(dataset=[], features=['test'])
np.testing.assert_equal(np.asarray([]), features)
def test_extract_features_emptyFeatures_singleEmptyResult(self):
features = self.clusterer._extract_features(dataset=[{'a':1, 'b':2}], features=[])
np.testing.assert_equal(np.asarray([[]]), features)
def test_extract_features_singleFeature_Projection(self):
features = self.clusterer._extract_features(dataset=[{'a':1, 'b':2}], features=['a'])
np.testing.assert_equal(np.asarray([[1]]), features)
def test_extract_features_singleFeature_Projection_2(self):
features = self.clusterer._extract_features(dataset=[{'a':1, 'b':2}, {'a':3, 'b':4}], features=['a'])
np.testing.assert_equal(np.asarray([[1], [3]]), features)
def test_extract_features_multFeature_Projection(self):
features = self.clusterer._extract_features(dataset=[{'a':0, 'b':2, 'c':4}, {'a':1, 'b':3, 'c':5}], features=['a','c'])
np.testing.assert_equal(np.asarray([[0,4], [1,5]]), features)
#endregion _extract_features
#region create_labels
def test_create_labels_noneInput_noneOutput(self):
labels = self.clusterer.create_labels(None)
@@ -19,51 +46,65 @@ class TestClusterer(unittest.TestCase):
def test_create_labels_emptyInput_emptyOutput(self):
labels = self.clusterer.create_labels([])
self.assertEqual([], labels)
def test_create_labels_singleInput_singleCluster(self):
features = self.clusterer.extract_location_features([self.location(1,2)])
labels = self.clusterer.create_labels(features)
self.assertEqual(1, len(labels))
def test_create_labels_singleInput_error(self):
clusterer = Clusterer(min_points=2)
features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features())
with self.assertRaises(ValueError):
# Fails because (min_pts > |input elements|)
clusterer.create_labels(features)
def test_create_labels_singleInput_error_2(self):
clusterer = Clusterer(min_points=1)
features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features())
with self.assertRaises(ValueError):
# Fails because fitting does not work internally
clusterer.create_labels(features)
def test_create_labels_nearInputs_singleCluster(self):
locations = [self.location(1,2), self.location(2,2)]
features = self.clusterer.extract_location_features(locations)
features = self.clusterer._extract_features(dataset=locations, features=self.get_location_features())
labels = self.clusterer.create_labels(features)
self.assertEqual(2, len(labels))
self.assertEqual(labels[0], labels[1])
def test_create_labels_nearInputs_twoClusters(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
locations = [self.location(1,2), self.location(2,2), self.location(20,20), self.location(20,23)]
features = self.clusterer.extract_location_features(locations)
features = self.clusterer._extract_features(dataset=locations, features=self.get_location_features())
labels = self.clusterer.create_labels(features)
self.assertEqual(3, len(labels))
self.assertEqual(4, len(labels))
self.assertEqual(labels[0], labels[1])
self.assertEqual(labels[2], labels[3])
self.assertNotEqual(labels[0], labels[2])
def test_label_locations_NoneLocations_NoException(self):
#endregion create_labels
#region label_dataset
def test_label_dataset_NoneLocations_NoException(self):
self.clusterer.label_dataset(None, [])
def test_label_locations_NoneLabels_NoException(self):
def test_label_dataset_NoneLabels_NoException(self):
self.clusterer.label_dataset([], None)
def test_label_locations_emptyInput_emptyOutput(self):
def test_label_dataset_emptyInput_emptyOutput(self):
locations = []
self.clusterer.label_dataset(locations, [])
self.assertEqual(0, len(locations))
def test_label_locations_diffInputLengths_ValueError_1(self):
def test_label_dataset_diffInputLengths_ValueError_1(self):
with self.assertRaises(ValueError):
self.clusterer.label_dataset([], [1])
def test_label_locations_diffInputLengths_ValueError_2(self):
def test_label_dataset_diffInputLengths_ValueError_2(self):
with self.assertRaises(ValueError):
self.clusterer.label_dataset([self.location(1,2)], [])
def test_label_locations_multInput_correctlyLabeled(self):
def test_label_dataset_multInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [17,2,20]
@@ -72,53 +113,76 @@ class TestClusterer(unittest.TestCase):
self.assertEqual(3, len(locations))
self.assertHaveLabelsAsNewKey(locations, labels)
def test_cluster_locations_multInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [0,0,-1]
res = self.clusterer.cluster_locations(locations)
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertDictEqual(res, {0: [{'latitude': 1, 'longitude': 2, 'cluster_label': 0}, {'latitude': 2, 'longitude': 2, 'cluster_label': 0}], -1: [{'latitude': 20, 'longitude': 20, 'cluster_label': -1}]})
def test_cluster_times_multInput_correctlyLabeled(self):
times = [self.time(123), self.time(128), self.time(223)]
labels = [0,0,-1]
#endregion label_dataset
res = self.clusterer.cluster_times(times)
self.assertHaveLabelsAsNewKey(times, labels)
self.assertDictEqual(res, {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}], -1: [{'timestamp': 223, 'cluster_label': -1}]})
#region cluster_dataset
def test_cluster_dataset_locationsMultInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [0,0,-1]
locations = [self.location(1,2), self.location(2,2), self.location(20,20), self.location(20,21)]
labels = [0,0,1,1]
exp_res = {0:locations[0:2], 1:locations[2:4]}
res = self.clusterer.cluster_dataset(locations, self.get_location_features())
res = self.clusterer.cluster_dataset(locations, ['latitude', 'longitude'])
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertDictEqual(res, {0: [{'latitude': 1, 'longitude': 2, 'cluster_label': 0}, {'latitude': 2, 'longitude': 2, 'cluster_label': 0}], -1: [{'latitude': 20, 'longitude': 20, 'cluster_label': -1}]})
self.assertClusteringResult(exp_res, res)
def test_cluster_dataset_timesMultInput_correctlyLabeled(self):
times = [self.time(123), self.time(128), self.time(223)]
labels = [0,0,-1]
times = [self.time(123), self.time(128), self.time(223), self.time(225)]
labels = [0,0,1,1]
exp_res = {0:times[0:2], 1:times[2:4]}
res = self.clusterer.cluster_dataset(times, ['timestamp'])
res = self.clusterer.cluster_dataset(times, self.get_time_features())
self.assertHaveLabelsAsNewKey(times, labels)
self.assertDictEqual(res, {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}], -1: [{'timestamp': 223, 'cluster_label': -1}]})
self.assertClusteringResult(exp_res, res)
def test_cluster_dataset_locationsMultInput_correctlyLabeled_2(self):
return
# TODO why is the single location added to the last cluster?
clusterer = Clusterer(3)
locations = [self.location(1,2), self.location(2,2), self.location(2,2), self.location(20,20), self.location(20,21), self.location(20,20), self.location(400,1000), self.location(200,1), self.location(200,2), self.location(201,-1)]
labels = [0,0,1,1]
exp_res = {0:locations[0:2], 1:locations[2:4]}
res = clusterer.cluster_dataset(locations, self.get_location_features())
print(res)
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertClusteringResult(exp_res, res)
#endregion cluster_dataset
#region helper methods
# helper methods:
def location(self, lat, long_) -> dict:
return {'latitude': lat, 'longitude':long_}
def get_location_features(self):
return ['latitude', 'longitude']
def time(self, ts) -> dict:
return {'timestamp': ts}
def get_time_features(self):
return ['timestamp']
def assertHaveLabelsAsNewKey(self, locations, labels):
self.assertEqual(len(labels), len(locations))
for i in range(len(locations)):
self.assertEqual(labels[i], locations[i]['cluster_label'])
def assertClusteringResult(self, expected, actual):
self.assertEqual(len(expected), len(actual))
for k in expected.keys():
if k not in actual:
self.fail(f"Cluster key ({k}, {type(k)}) not in result.")
self.assertListEqual(expected[k], actual[k])
#endregion helper methods
if __name__ == '__main__':
unittest.main()
apiVersion: v1
kind: Service
metadata:
name: community-detection
name: role-stage-discovery
spec:
type: LoadBalancer
selector:
app: community-detection
app: role-stage-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30109
nodePort: 30103
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: community-detection
name: role-stage-discovery
spec:
replicas: 1
selector:
matchLabels:
app: community-detection
app: role-stage-discovery
template:
metadata:
labels:
app: community-detection
app: role-stage-discovery
spec:
containers:
- name: community-detection
image: alexx882/community-detection-microservice
      - name: role-stage-discovery
image: alexx882/role-stage-discovery-microservice
ports:
- containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
name: community-detection-db
name: role-stage-discovery-db
spec:
type: LoadBalancer
selector:
app: community-detection-db
app: role-stage-discovery-db
ports:
- name: http
port: 27017
targetPort: 27017
nodePort: 30110
nodePort: 30104
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: community-detection-db
name: role-stage-discovery-db
spec:
replicas: 1
selector:
matchLabels:
app: community-detection-db
app: role-stage-discovery-db
template:
metadata:
labels:
app: community-detection-db
app: role-stage-discovery-db
spec:
containers:
- name: community-detection-db
- name: role-stage-discovery-db
image: mongo
env:
- name: MONGO_INITDB_ROOT_USERNAME
......
import pymongo
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
import json
from typing import List
class Repository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
def __init__(self):
super().__init__(netconst.SEMANTIC_LINKING_DB_HOSTNAME,
netconst.SEMANTIC_LINKING_DB_PORT,
'semanticLinkingDb')
self._layer_collection = 'layer'
flask
connexion[swagger-ui]
pika
astroid==2.4.2
attrs==19.3.0
certifi==2020.4.5.2
chardet==3.0.4
click==7.1.2
clickclick==1.2.2
colorama==0.4.3
connexion==2.7.0
Flask==1.1.2
idna==2.9
importlib-metadata==1.6.1
inflection==0.5.0
isort==4.3.21
itsdangerous==1.1.0
Jinja2==2.11.2
jsonschema==3.2.0
lazy-object-proxy==1.4.3
MarkupSafe==1.1.1
mccabe==0.6.1
openapi-spec-validator==0.2.8
pika==1.1.0
pylint==2.5.3
pymongo==3.10.1
pyrsistent==0.16.0
PyYAML==5.3.1
requests==2.23.0
six==1.15.0
swagger-ui-bundle==0.0.6
toml==0.10.1
typed-ast==1.4.1
urllib3==1.25.9
Werkzeug==1.0.1
wrapt==1.12.1
zipp==3.1.0
from flask import request
from db.repository import Repository
repo = Repository()
def echo():
return request.json
......
@@ -31,4 +31,44 @@ spec:
- name: semantic-linking
image: alexx882/semantic-linking-microservice
ports:
- containerPort: 5000
\ No newline at end of file
- containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
name: semantic-linking-db
spec:
type: LoadBalancer
selector:
app: semantic-linking-db
ports:
- name: http
port: 27017
targetPort: 27017
nodePort: 30102
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: semantic-linking-db
spec:
replicas: 1
selector:
matchLabels:
app: semantic-linking-db
template:
metadata:
labels:
app: semantic-linking-db
spec:
containers:
- name: semantic-linking-db
image: mongo
env:
- name: MONGO_INITDB_ROOT_USERNAME
value: root
- name: MONGO_INITDB_ROOT_PASSWORD
value: root
ports:
- containerPort: 27017
\ No newline at end of file
FROM python:3
LABEL maintainer="Alexander Lercher"
ENV http_proxy http://proxy.uni-klu.ac.at:3128/
ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
EXPOSE 5000
WORKDIR /app
COPY src/data-hub/stage-discovery-microservice/app/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# Stage Discovery Microservice
The stage discovery microservice detects individual activities between roles inside a concept, e.g. a teacher giving an assignment to students.
## Technologies
- Python 3.x
- Python module Flask
- Python module Connexion with Swagger
- Docker
- Kubernetes
\ No newline at end of file
swagger: "2.0"
info:
title: Stage Discovery microservice
description: This is the documentation for the stage discovery microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
from flask import request
def echo():
return request.json
\ No newline at end of file
import connexion
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of stage-discovery-microservice!'
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True)
apiVersion: v1
kind: Service
metadata:
name: stage-discovery
spec:
type: LoadBalancer
selector:
app: stage-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30103
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: stage-discovery
spec:
replicas: 1
selector:
matchLabels:
app: stage-discovery
template:
metadata:
labels:
app: stage-discovery
spec:
containers:
- name: stage-discovery
image: alexx882/stage-discovery-microservice
ports:
- containerPort: 5000
\ No newline at end of file
@@ -7,15 +7,17 @@ RABBIT_MQ_PORT = 5672
## Trace Retrieval
TRACE_RETRIEVAL_HOSTNAME = 'trace-retrieval'
TRACE_RETRIEVAL_REST_PORT = 80
TRACE_RETRIEVAL_DB_HOSTNAME = 'trace-retrieval-db'
TRACE_RETRIEVAL_DB_HOSTNAME = f'{TRACE_RETRIEVAL_HOSTNAME}-db'
TRACE_RETRIEVAL_DB_PORT = 27017
# TRACE_RETRIEVAL_DB_HOSTNAME = 'articonf1.itec.aau.at'
# TRACE_RETRIEVAL_DB_PORT = 30003
## Community Detection
COMMUNITY_DETECTION_HOSTNAME = 'community-detection'
COMMUNITY_DETECTION_REST_PORT = 80
COMMUNITY_DETECTION_DB_HOSTNAME = 'community-detection-db'
COMMUNITY_DETECTION_DB_PORT = 27017
# COMMUNITY_DETECTION_DB_HOSTNAME = 'localhost'
# COMMUNITY_DETECTION_DB_PORT = 30110
\ No newline at end of file
## Semantic Linking
SEMANTIC_LINKING_HOSTNAME = 'semantic-linking'
SEMANTIC_LINKING_REST_PORT = 80
SEMANTIC_LINKING_DB_HOSTNAME = f'{SEMANTIC_LINKING_HOSTNAME}-db'
SEMANTIC_LINKING_DB_PORT = 27017
## Role Stage Discovery
ROLESTAGE_DISCOVERY_HOSTNAME = 'role-stage-discovery'
ROLESTAGE_DISCOVERY_REST_PORT = 80
ROLESTAGE_DISCOVERY_DB_HOSTNAME = f'{ROLESTAGE_DISCOVERY_HOSTNAME}-db'
ROLESTAGE_DISCOVERY_DB_PORT = 27017
\ No newline at end of file