Commit b4d3f717 authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'feature/role-staging-use-cases' into feature/community-detection-pipeline

parents 553c2da8 174165d4
......@@ -17,41 +17,56 @@ paths:
description: "Successful echo of request data"
#region Layers
/layers:
post:
operationId: "routes.layers.post"
/{use_case}/layers:
get:
operationId: "routes.layers.get_by_use_case"
security:
- JwtRegular: []
tags:
- "Layers"
summary: "Add a new layer [TODO: or overwrite an existing one]"
parameters:
- in: body
name: "layer"
description: "The layer data to be added"
- "Layers"
summary: "Get all layer data for one use case"
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
schema:
$ref: "#/definitions/Layer"
type: "string"
responses:
'201':
'200':
description: "Successful operation"
'400':
description: "Invalid input"
schema:
$ref: "#/definitions/LayerCollection"
'404':
description: "No content for use-case"
/{use_case}/{use_case_table}/layers:
get:
operationId: "routes.layers.get"
operationId: "routes.layers.get_by_table"
security:
- JwtRegular: []
tags:
- "Layers"
summary: "Get all layer data"
parameters: []
summary: "Get all layer data for one use case"
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
type: "string"
- name: "use_case_table"
in: "path"
description: "Name of the use-case table"
required: true
type: "string"
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/LayerCollection"
'404':
description: "No content for use-case"
/layers/{name}:
/{use_case}/{use_case_table}/layers/{name}:
get:
operationId: "routes.layers.get_by_name"
security:
......@@ -60,6 +75,16 @@ paths:
- "Layers"
summary: "Get single layer data"
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
type: "string"
- name: "use_case_table"
in: "path"
description: "Name of the use-case table"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the requested layer"
......@@ -73,7 +98,7 @@ paths:
'404':
description: "Layer not found"
/layers/{name}/nodes:
/{use_case}/{use_case_table}/layers/{name}/nodes:
get:
operationId: "routes.layers.get_nodes"
security:
......@@ -81,7 +106,17 @@ paths:
tags:
- "Layers"
summary: "Get all individual nodes for the layer"
parameters:
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
type: "string"
- name: "use_case_table"
in: "path"
description: "Name of the use-case table"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the layer"
......@@ -94,32 +129,8 @@ paths:
$ref: "#/definitions/NodeCollection"
'404':
description: "Layer not found"
post:
operationId: "routes.layers.post_nodes"
security:
- JwtRegular: []
tags:
- "Layers"
summary: "Adds a single or multiple nodes to the layer"
parameters:
- name: "name"
in: "path"
description: "Name of the layer"
required: true
type: "string"
- name: "node"
in: body
description: "The node(s) to be added"
required: true
schema:
$ref: "#/definitions/NodeCollection"
responses:
'201':
description: "Successful operation"
'400':
description: "Invalid input"
/layers/{name}/clusters:
/{use_case}/{use_case_table}/layers/{name}/clusters:
get:
operationId: "routes.clustersets.get_by_name"
security:
......@@ -128,6 +139,16 @@ paths:
- "Layers"
summary: "Get all clusters for the layer"
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
type: "string"
- name: "use_case_table"
in: "path"
description: "Name of the use-case table"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the layer"
......@@ -141,7 +162,7 @@ paths:
'404':
description: "Layer not found"
/layers/{name}/timeslices:
/{use_case}/{use_case_table}/layers/{name}/timeslices:
get:
operationId: "routes.timeslices.get_by_name"
security:
......@@ -150,6 +171,16 @@ paths:
- "Layers"
summary: "Get all timeslices for the layer"
parameters:
- name: "use_case"
in: "path"
description: "Name of the use-case"
required: true
type: "string"
- name: "use_case_table"
in: "path"
description: "Name of the use-case table"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the layer"
......@@ -165,23 +196,7 @@ paths:
#endregion
#region Function Calls
/rfc/run:
post:
operationId: "routes.functions.run_agi_clustering_and_graph_creation"
security:
- JwtRegular: []
tags:
- "Remote function calls"
summary: "Insert locations from AGI, create clusters for starting time and location layers, create graphs for the location clusters"
parameters: []
responses:
'204':
description: "Successful operation"
#endregion
################################################################################
#region Similarity
/connectedClusters:
get:
operationId: "routes.connClusters.get_conn_clusters"
......@@ -239,15 +254,22 @@ paths:
schema:
$ref: "#/definitions/ClusterRunArray"
#endregion Similarity
definitions:
Cluster:
type: object
properties:
use_case:
type: string
use_case_table:
type: string
layer_name:
type: string
cluster_label:
type: number
label:
type: string
nodes:
type: array
items:
......@@ -267,8 +289,14 @@ definitions:
type: array
items:
type: string
total_properties:
type: array
items:
type: string
use_case:
type: string
use_case_table:
type: string
LayerCollection:
type: array
......@@ -279,14 +307,16 @@ definitions:
type: object
example:
"UniqueID": "4437d98b4516e899fb7d93cef0bea6111574473703f0aab9d8c2f02aaa673f5c"
"use_case": "string"
"layer_name": "some_layer_name"
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
"use_case": "debug"
"use_case_table": "debug-table1"
"layer_name": "some_layer"
"some_app_key": "some_app_value"
# "Finished_time": 1576631193265951
# "Latitude_Destination": -5.973257
# "Longitude_Destination": 37.416316
# "TravelID": "5e57ec9159bc0668543f156a"
# "TravelPrice": 15
# "UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
NodeCollection:
type: array
......@@ -299,6 +329,10 @@ definitions:
time:
type: object
example: "(2020, 52)"
use_case:
type: string
use_case_table:
type: string
layer_name:
type: string
clusters:
......@@ -320,10 +354,6 @@ definitions:
items:
$ref: "#/definitions/TimeSlice"
##################################################################
ConnectedDict:
type: array
items:
......
......@@ -7,32 +7,45 @@ class Cluster:
'''
A cluster for an arbitrary layer containing some nodes.
:param use_case: The use-case of the layer
:param use_case_table: The use-case table of the layer
:param layer_name: The name of the layer in which the cluster is located
:param cluster_label: The label of the cluster unique for the layer
:param nodes: The individual nodes of the cluster
:param label: A human readable label
'''
def __init__(self, layer_name: str = None, cluster_label: int = None, nodes: List[Dict] = None,
def __init__(self, use_case: str = None, use_case_table: str = None, layer_name: str = None,
cluster_label: int = None, nodes: List[Dict] = None, label: str = None,
cluster_dict: Dict = None, from_db=False):
self.use_case = use_case
self.use_case_table = use_case_table
self.layer_name = layer_name
self.cluster_label = cluster_label
self.nodes = nodes
self.label = label
if cluster_dict is not None:
self.from_serializable_dict(cluster_dict, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"use_case": self.use_case,
"use_case_table": self.use_case_table,
"layer_name": self.layer_name,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
"nodes": json.dumps(self.nodes) if for_db else self.nodes,
"label": self.label,
}
def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
self.use_case = cluster_dict["use_case"]
self.use_case_table = cluster_dict["use_case_table"]
self.layer_name = cluster_dict["layer_name"]
self.cluster_label = cluster_dict["cluster_label"]
self.nodes = json.loads(cluster_dict["nodes"]) \
if from_db else cluster_dict["nodes"]
self.label = cluster_dict["label"]
def __repr__(self):
return json.dumps(self.to_serializable_dict())
......
......@@ -18,6 +18,7 @@ class Layer:
"layer_name": self.layer_name,
"properties": self.properties,
"use_case": self.use_case,
"use_case_table": self.use_case_table,
"total_properties": self.total_properties,
}
......@@ -28,12 +29,14 @@ class Layer:
layer.properties = layer_info["cluster_properties"]
layer.total_properties = layer_info["properties"]
layer.use_case = layer_info["use_case"]
layer.use_case_table = layer_info["table"]
return layer
def from_serializable_dict(self, layer_info: Dict, from_db=False):
self.layer_name = layer_info['layer_name']
self.properties = layer_info['properties']
self.use_case = layer_info["use_case"] if "use_case" in layer_info.keys() else None
self.use_case = layer_info["use_case"]
self.use_case_table = layer_info["use_case_table"]
self.total_properties = layer_info["total_properties"] if "total_properties"in layer_info.keys() else None
def __repr__(self):
......
......@@ -12,9 +12,11 @@ class TimeSlice:
:param layer_name: The name of the layer the nodes belong to
'''
def __init__(self, time: Any, layer_name: str,
def __init__(self, time: Any = None, use_case: str = None, use_case_table: str = None, layer_name: str = None,
time_slice_dict: Dict = None, from_db = False):
self.time = str(time)
self.use_case = use_case
self.use_case_table = use_case_table
self.layer_name = layer_name
self.clusters: Dict[str, List[Node]] = {}
......@@ -47,12 +49,16 @@ class TimeSlice:
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"time": self.time,
"use_case": self.use_case,
"use_case_table": self.use_case_table,
'layer_name': self.layer_name,
"clusters": json.dumps(self.clusters) if for_db else self.clusters
}
def from_serializable_dict(self, dict: Dict, from_db=False):
self.time = dict["time"]
self.use_case = dict["use_case"]
self.use_case_table = dict["use_case_table"]
self.layer_name = dict['layer_name']
self.clusters = json.loads(dict['clusters']) if from_db else dict['clusters']
......
......@@ -7,6 +7,9 @@ from db.entities import *
# from processing.similarityFiles.miscFunctions import *
from typing import List
import logging
LOGGER = logging.getLogger(__name__)
class Repository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
......@@ -30,14 +33,26 @@ class Repository(MongoRepositoryBase):
super().insert_entry(self._layer_collection, layer.to_serializable_dict())
def get_layers(self) -> List[Layer]:
'''Retrieves all layers from the db, independent of use-case.'''
entries = super().get_entries(self._layer_collection)
return [Layer(e) for e in entries]
def get_layer(self, layer_name) -> Layer:
entries = super().get_entries(self._layer_collection, selection={'layer_name': layer_name})
def get_layers_for_use_case(self, use_case: str) -> List[Layer]:
    # Returns every layer stored for the given use-case (across all of its
    # tables). NOTE(review): original annotation said `-> Layer`, but a list
    # of Layer objects is returned — corrected here.
    entries = super().get_entries(self._layer_collection, selection={'use_case': use_case})
    return [Layer(e) for e in entries]
def get_layers_for_table(self, use_case: str, use_case_table: str) -> List[Layer]:
    # Returns every layer stored for one table of a use-case.
    # NOTE(review): original annotation said `-> Layer`, but a list of Layer
    # objects is returned — corrected here.
    entries = super().get_entries(self._layer_collection, selection={'use_case': use_case, 'use_case_table': use_case_table})
    return [Layer(e) for e in entries]
def get_layer_by_name(self, use_case: str, use_case_table: str, layer_name: str) -> Layer:
    '''
    Returns the single layer identified by (use_case, use_case_table, layer_name),
    or None if no such layer exists.

    The triple is expected to be a unique key; if more than one layer matches,
    an error is logged and the first match is returned.
    '''
    entries = super().get_entries(self._layer_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
    layers = [Layer(e) for e in entries]

    # A list comprehension never yields None, so only emptiness must be checked.
    if not layers:
        return None

    if len(layers) > 1:
        # Fix: the original message omitted use_case_table, making the logged
        # "key" ambiguous — all three key parts are reported now.
        LOGGER.error(f"Layer Key {use_case}, {use_case_table}, {layer_name} is not unique.")

    return layers[0]
......@@ -45,27 +60,22 @@ class Repository(MongoRepositoryBase):
def delete_all_layers(self):
super().drop_collection(self._layer_collection)
#endregion Layers
#region Layer Nodes
def delete_all_nodes(self):
super().drop_collection(self._layer_nodes_collection)
def get_layer_for_use_case(self, layer_name: str, use_case: str) -> Layer:
entries = super().get_entries(self._layer_collection, selection={'layer_name': layer_name, 'use_case': use_case})
entries = [Layer(e) for e in entries]
if entries is not None and len(entries) > 0:
return entries[0]
else:
return None
def add_layer_node(self, node: dict):
super().insert_entry(self._layer_nodes_collection, node)
def add_layer_nodes(self, nodes:List[dict]):
super().insert_many(self._layer_nodes_collection, nodes)
def get_layer_nodes(self, layer_name: str) -> dict:
'''Returns all nodes for the layer.'''
entries = super().get_entries(self._layer_nodes_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
def get_layer_nodes(self, use_case: str, use_case_table: str, layer_name: str) -> List[dict]:
    '''
    Returns all nodes for the given use-case, table and layer.

    Fix: the original return annotation said `-> dict`, but a list of node
    dicts (without Mongo's `_id` field) is returned.
    '''
    entries = super().get_entries(self._layer_nodes_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name}, projection={'_id': 0})
    return list(entries)
#endregion
......@@ -75,8 +85,8 @@ class Repository(MongoRepositoryBase):
cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
super().insert_many(self._clusters_collection, cluster_dicts)
def get_clusters_for_layer(self, layer_name: str) -> List[Cluster]:
entries = super().get_entries(self._clusters_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
def get_clusters_for_layer(self, use_case: str, use_case_table: str, layer_name: str) -> List[Cluster]:
    '''Fetches every stored cluster of one layer, identified by use-case, table and layer name.'''
    query = {'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name}
    entries = super().get_entries(self._clusters_collection, selection=query, projection={'_id': 0})
    return [Cluster(cluster_dict=entry, from_db=True) for entry in entries]
def delete_all_clusters(self):
......@@ -93,10 +103,10 @@ class Repository(MongoRepositoryBase):
entries = super().get_entries(self._time_slice_collection)
return [TimeSlice(None, None, time_slice_dict=e, from_db=True) for e in entries]
def get_time_slices_by_name(self, layer_name) -> List[TimeSlice]:
def get_time_slices_by_name(self, use_case: str, use_case_table: str, layer_name: str) -> List[TimeSlice]:
'''Returns all time slices with the given layer_name.'''
entries = super().get_entries(self._time_slice_collection, selection={'layer_name': layer_name})
return [TimeSlice(None, None, time_slice_dict=e, from_db=True) for e in entries]
entries = super().get_entries(self._time_slice_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
return [TimeSlice(time_slice_dict=e, from_db=True) for e in entries]
def remove_all_time_slices(self):
super().drop_collection(self._time_slice_collection)
......
from processing.clustering.cluster_result import ClusterResultConverter, ClusterResult, ClusterResult1D, ClusterResult2D
from processing.clustering.clusterer import Clusterer
from typing import List, Dict, Any
class ClusterResult:
    '''
    Represents a single cluster from clustering.

    :param nodes: The nodes contained in the cluster
    :param label: A human readable label describing the cluster
    '''
    def __init__(self, nodes: List[Dict], label: str = "n.a."):
        self.nodes = nodes
        self.label = label


class ClusterResult1D(ClusterResult):
    '''A cluster over a single feature, labeled with its value range "lower -- upper".'''
    def __init__(self, nodes, lower_bound, upper_bound):
        super().__init__(nodes, f"{lower_bound} -- {upper_bound}")


class ClusterResult2D(ClusterResult):
    '''A cluster over two features, labeled with its center point.'''
    def __init__(self, nodes, center):
        super().__init__(nodes, center)


class ClusterResultConverter:
    '''Converts raw cluster groups into ClusterResult objects with human readable labels.'''

    def _convert_feature_to_float(self, feature_value) -> float:
        # Empty strings denote missing values and map to 0.
        # Fix: the original used `feature_value is not ""` — an identity
        # comparison against a string literal, which raises a SyntaxWarning
        # and relies on interning; `!=` compares by value.
        return float(feature_value) if feature_value != "" else 0.0

    def convert_to_cluster_results(self, cluster_groups: Dict[Any, List[Dict]], features: List[str]) -> Dict[Any, ClusterResult]:
        '''Dispatches to the 1d/2d/generic conversion based on the number of features.'''
        if len(features) == 1:
            return self._convert_to_cluster_results_1d(cluster_groups, features[0])
        if len(features) == 2:
            return self._convert_to_cluster_results_2d(cluster_groups, features)
        return self._convert_to_cluster_results(cluster_groups, features)

    def _convert_to_cluster_results(self, cluster_groups: Dict[Any, List[Dict]], features: List[str]) -> Dict[Any, ClusterResult]:
        '''Returns the clustering results as they are, converted to ClusterResults with the default label.'''
        return {key: ClusterResult(nodes) for key, nodes in cluster_groups.items()}

    def _convert_to_cluster_results_1d(self, cluster_groups: Dict[Any, List[Dict]], feature: str) -> Dict[Any, ClusterResult1D]:
        '''Returns the clustering results labeled with the 1d lower and upper bound of the feature.'''
        new_results = {}
        for key, nodes_in_cluster in cluster_groups.items():
            # min/max over all feature values replaces the original manual scan.
            values = [self._convert_feature_to_float(node[feature]) for node in nodes_in_cluster]
            new_results[key] = ClusterResult1D(nodes_in_cluster, min(values), max(values))
        return new_results

    def _convert_to_cluster_results_2d(self, cluster_groups: Dict[Any, List[Dict]], features: List[str]) -> Dict[Any, ClusterResult2D]:
        '''Returns the clustering results labeled with the 2d centroid of the two features.'''
        new_results = {}
        for key, nodes_in_cluster in cluster_groups.items():
            x = [self._convert_feature_to_float(node[features[0]]) for node in nodes_in_cluster]
            y = [self._convert_feature_to_float(node[features[1]]) for node in nodes_in_cluster]
            centroid = (sum(x) / len(nodes_in_cluster), sum(y) / len(nodes_in_cluster))
            new_results[key] = ClusterResult2D(nodes_in_cluster, str(centroid))
        return new_results
......@@ -2,10 +2,10 @@ import json
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import OPTICS
from typing import List, Dict, Any, TypeVar
from typing import List, Dict, Any
from processing.clustering.cluster_result import ClusterResultConverter, ClusterResult
T = TypeVar('T')
ClusterGroup = Dict[Any, List[Dict]]
class Clusterer:
'''
......@@ -17,6 +17,7 @@ class Clusterer:
'''
def __init__(self, min_points=5):
self.min_points = min_points
self.cluster_result_converter = ClusterResultConverter()
def create_labels(self, features:np.ndarray) -> List[int]:
'''Creates labels for the items based on OPTICS.'''
......@@ -31,11 +32,14 @@ class Clusterer:
return labels.tolist()
def _convert_feature_to_float(self, feature_value) -> float:
return float(feature_value if feature_value is not "" else 0)
def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray:
'''Extracts the feature values from the dataset into a np array with same order as original dataset.'''
extracted_features = []
for data in dataset:
entry = [float(data[feature] if data[feature] is not "" else 0) for feature in features]
entry = [self._convert_feature_to_float(data[feature]) for feature in features]
extracted_features.append(entry)
return np.asarray(extracted_features)
......@@ -53,21 +57,27 @@ class Clusterer:
continue
dataset[i]['cluster_label'] = labels[i]
def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> ClusterGroup:
def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> Dict[Any, List[Dict]]:
clusters = {}
for label in labels:
clusters[label] = [ds for ds in dataset if ds['cluster_label'] == label]
for ds in dataset:
label = ds['cluster_label']
if label not in clusters:
clusters[label] = []
clusters[label].append(ds)
return clusters
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> ClusterGroup:
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> Dict[Any, ClusterResult]:
'''
Returns the identified clusters containing a subset of nodes from the dataset.
:param dataset: The nodes to assign to clusters
:param features: The feature names of the nodes to use for clustering
:returns: A dictionary of clusters, where each value is a non-empty subset of dataset if dataset was not empty
:returns: A dictionary of cluster results, where each value is a non-empty subset of dataset if dataset was not empty
'''
arr = self._extract_features(dataset, features)
......@@ -75,5 +85,8 @@ class Clusterer:
self.label_dataset(dataset, labels)
return self.group_by_clusters(dataset, labels)
cluster_groups: Dict[Any, List[Dict]] = self.group_by_clusters(dataset, labels)
res: Dict[Any, ClusterResult] = self.cluster_result_converter.convert_to_cluster_results(cluster_groups, features)
return res
......@@ -7,10 +7,11 @@ from typing import List, Dict
import requests
import json
def _fetch_layers(use_case: str) -> List[Layer]:
def _fetch_use_cases() -> List[str]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/layers'
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases'
response = requests.get(
url,
......@@ -20,21 +21,17 @@ def _fetch_layers(use_case: str) -> List[Layer]:
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch layers for {use_case} from business-logic microservice, statuscode: {response.status_code}!")
raise ConnectionError(f"Could not fetch use-cases from business-logic microservice, statuscode: {response.status_code}!")
data = json.loads(response.text)
layers = []
return [row["name"] for row in data]
for row in data:
layers.append(Layer.from_business_logic_dict(row))
return layers
def _fetch_nodes(use_case: str, layer: Layer) -> List[Dict]:
def _fetch_tables(use_case: str) -> List[str]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.SEMANTIC_LINKING_HOSTNAME}:{network_constants.SEMANTIC_LINKING_REST_PORT}/api/use-cases/{use_case}/layers/{layer.layer_name}/nodes'
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables'
response = requests.get(
url,
......@@ -44,21 +41,17 @@ def _fetch_nodes(use_case: str, layer: Layer) -> List[Dict]:
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch nodes for {use_case}/{layer.layer_name} from semantic-linking microservice, statuscode: {response.status_code}!")
raise ConnectionError(f"Could not fetch tables for {use_case} from business-logic microservice, statuscode: {response.status_code}!")
data = json.loads(response.text)
nodes = []
return [row["name"] for row in data]
for row in data:
nodes.append(row)
return nodes
def _fetch_use_cases() -> List[str]:
def _fetch_layers(use_case: str, table: str) -> List[Layer]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases'
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables/{table}/layers'
response = requests.get(
url,
......@@ -68,14 +61,33 @@ def _fetch_use_cases() -> List[str]:
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch use-cases from business-logic microservice, statuscode: {response.status_code}!")
raise ConnectionError(f"Could not fetch layers for {use_case}//{table} from business-logic microservice, statuscode: {response.status_code}!")
data = json.loads(response.text)
return [Layer.from_business_logic_dict(row) for row in data]
def _fetch_nodes(use_case: str, table: str, layer_name: str) -> List[Dict]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.SEMANTIC_LINKING_HOSTNAME}:{network_constants.SEMANTIC_LINKING_REST_PORT}/api/use-cases/{use_case}/tables/{table}/layers/{layer_name}/nodes'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch nodes for {use_case}//{table}//{layer_name} from semantic-linking microservice, statuscode: {response.status_code}!")
return response.json()
return [row["name"] for row in data]
def fetch_nodes_from_semantic_linking():
# prune DB
'''Empties the db and inserts layers and nodes from BusinessLogic and SemanticLinking'''
repository = Repository()
repository.delete_all_layers()
repository.delete_all_nodes()
......@@ -83,21 +95,31 @@ def fetch_nodes_from_semantic_linking():
use_cases = _fetch_use_cases()
for use_case in use_cases:
print(f"Fetching for use-case {use_case}")
layers = _fetch_layers(use_case)
for layer in layers:
try:
print(f"Fetching nodes for layer {layer.layer_name}")
# check if layer already exists in DB, add it if not
reference_layer = repository.get_layer_for_use_case(layer.layer_name, use_case)
if reference_layer == None:
repository.add_layer(layer)
nodes = _fetch_nodes(use_case, layer)
for node in nodes:
repository.add_layer_node(node)
except ConnectionError as e:
print(str(e))
continue
tables = _fetch_tables(use_case)
for table in tables:
layers = _fetch_layers(use_case, table)
for layer in layers:
try:
print(f"Fetching nodes for layer {use_case}//{table}//{layer.layer_name}.")
# check if layer already exists in DB, add it if not
reference_layer = repository.get_layer_by_name(use_case, table, layer.layer_name)
if reference_layer == None:
repository.add_layer(layer)
else:
raise Exception(f"Layer should be unique, but was not: {reference_layer}")
nodes = _fetch_nodes(use_case, table, layer.layer_name)
for node in nodes:
node['use_case_table'] = node['table']
del node['table']
for node in nodes:
repository.add_layer_node(node)
except ConnectionError as e:
print(str(e))
continue
\ No newline at end of file
......@@ -4,8 +4,8 @@ from db.entities import ClusterSet
repo = Repository()
def get_by_name(name):
res = repo.get_clusters_for_layer(name)
def get_by_name(use_case, use_case_table, name):
res = repo.get_clusters_for_layer(use_case, use_case_table, name)
if res is None or len(res) == 0:
return Response(status=404)
else:
......
......@@ -19,11 +19,22 @@ def _insert_layer(layer_data: dict):
repo.add_layer(Layer(layer_data))
def get():
    '''Returns every stored layer, serialized as a dict.'''
    all_layers = repo.get_layers()
    return [layer.to_serializable_dict() for layer in all_layers]
def get_by_use_case(use_case):
    '''Returns all layers of one use-case as dicts, or 404 when none exist.'''
    layers = repo.get_layers_for_use_case(use_case)
    if not layers:
        return Response(status=404)
    return [layer.to_serializable_dict() for layer in layers]
def get_by_table(use_case, use_case_table):
    '''Returns all layers of one use-case table as dicts, or 404 when none exist.'''
    layers = repo.get_layers_for_table(use_case, use_case_table)
    if not layers:
        return Response(status=404)
    return [layer.to_serializable_dict() for layer in layers]
def get_by_name(name):
res = repo.get_layer(name)
def get_by_name(use_case, use_case_table, name):
res = repo.get_layer_by_name(use_case, use_case_table, name)
if res is not None:
return res.to_serializable_dict()
else:
......@@ -32,8 +43,8 @@ def get_by_name(name):
#endregion
#region nodes
def get_nodes(name):
res = repo.get_layer_nodes(name)
def get_nodes(use_case, use_case_table, name):
res = repo.get_layer_nodes(use_case, use_case_table, name)
# print(res)
return res
......
......@@ -4,8 +4,8 @@ from db.entities import TimeSlice
repo = Repository()
def get_by_name(name):
res = repo.get_time_slices_by_name(name)
def get_by_name(use_case, use_case_table, name):
res = repo.get_time_slices_by_name(use_case, use_case_table, name)
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
......
......@@ -6,9 +6,9 @@ if os.path.exists(modules_path):
import json
from db.entities import Layer, Cluster
from typing import List, Dict, Tuple
from typing import List, Dict, Tuple, Any
from db.repository import Repository
from processing.clustering.clusterer import Clusterer
from processing.clustering import Clusterer, ClusterResult
repo = Repository()
......@@ -19,7 +19,7 @@ def run_generic_clustering():
all_layers:List[Layer] = repo.get_layers()
for layer in all_layers:
print(f"Clustering {layer.layer_name}")
print(f"Clustering {layer.use_case}//{layer.use_case_table}//{layer.layer_name}.")
if layer.properties is None or len(layer.properties) == 0:
print("skipping")
......@@ -33,15 +33,17 @@ def run_generic_clustering():
def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
nodes = repo.get_layer_nodes(layer.layer_name)
nodes = repo.get_layer_nodes(layer.use_case, layer.use_case_table, layer.layer_name)
clusterer = Clusterer()
res = clusterer.cluster_dataset(
res: Dict[Any, ClusterResult] = clusterer.cluster_dataset(
nodes,
layer.properties
)
return [Cluster(layer.layer_name, key, value) for key, value in res.items()]
return [Cluster(layer.use_case, layer.use_case_table, layer.layer_name,
cluster_label=key, nodes=cluster_result.nodes, label=cluster_result.label if key != -1 else 'noise')
for key, cluster_result in res.items()]
def store_generic_clusters(clusters: List[Cluster]):
......
......@@ -7,20 +7,23 @@ if os.path.exists(modules_path):
import json
from datetime import datetime, date
from db.repository import Repository
from db.entities.timeslice import TimeSlice
from db.entities import ClusterSet, Cluster
from db.entities import ClusterSet, Cluster, Layer, TimeSlice
from typing import Tuple, Dict, Any, List
TimeSliceKey = Tuple[int, int]
# TODO extract information about time features (maybe from table mapping)
TIME_PROPERTY_NAMES = ['Timestamp']
def convert_to_time_slice_key(timestamp: str) -> Tuple[int, int]:
    '''
    Returns the tuple (year, week_of_year) from a timestamp. This is used as the key for the slicing.

    :param timestamp: A timestamp string formatted as '%Y-%m-%d %H:%M:%S'
    :returns: The ISO-calendar (year, week) pair for the timestamp

    Fix: removed the stale pre-refactor line
    `time = datetime.utcfromtimestamp(float(timestamp[0:10]))`, which ran before
    being overwritten and raised ValueError for the new string-formatted
    timestamps (float('2020-01-07') is not parseable).
    '''
    time = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
    (year, week, _) = time.isocalendar()
    return (year, week)
def split_clusterset_by_time(layer_name: str, clusters: List[Cluster]) -> Dict[TimeSliceKey, TimeSlice]:
def split_clusterset_by_time(layer: Layer, clusters: List[Cluster]) -> Dict[TimeSliceKey, TimeSlice]:
'''
Distributes all nodes in clusters of a single layer into individual time slices based on their timestamps.
If a node spans over multiple slices it will be added to all of them.
......@@ -29,7 +32,6 @@ def split_clusterset_by_time(layer_name: str, clusters: List[Cluster]) -> Dict[T
:params clusters: The clusters whichs nodes are split
:returns: A dict of time slices where the key is the time info and value is the information about the time slice
'''
time_property_names = ['Finished_time', 'Starting_time']
time_slices: Dict[Any, TimeSlice] = {}
for cluster_no in clusters:
......@@ -37,13 +39,13 @@ def split_clusterset_by_time(layer_name: str, clusters: List[Cluster]) -> Dict[T
# retrieve times the node is located in based on the defined time properties in the schema
time_keys = set()
for time_property in time_property_names:
for time_property in TIME_PROPERTY_NAMES:
if time_property in node:
time_keys.add(convert_to_time_slice_key(str(node[time_property])))
for time_key in time_keys:
if time_key not in time_slices:
time_slices[time_key] = TimeSlice(time_key, layer_name)
time_slices[time_key] = TimeSlice(time_key, layer.use_case, layer.use_case_table, layer.layer_name)
time_slices[time_key].add_node_to_cluster(cluster_no.cluster_label, node)
......@@ -58,14 +60,20 @@ if __name__ == "__main__":
layers = repo.get_layers()
for layer in layers:
layer_name = layer.layer_name
print(f"Working on {layer_name}")
use_case = layer.use_case
use_case_table = layer.use_case_table
print(f"Working on {use_case}//{use_case_table}//{layer_name}.")
clusters_for_layer = repo.get_clusters_for_layer(layer_name)
clusters_for_layer = repo.get_clusters_for_layer(use_case, use_case_table, layer_name)
# if no clusters were generated use one large cluster instead of skipping the layer
if clusters_for_layer is None or len(clusters_for_layer) == 0:
clusters_for_layer = [Cluster(layer_name, -1, repo.get_layer_nodes(layer_name))]
nodes = repo.get_layer_nodes(use_case, use_case_table, layer_name)
if nodes is None or len(nodes) == 0:
print("Skipping, because there are no clusters and no nodes for the layer.")
continue
clusters_for_layer = [Cluster(use_case, use_case_table, layer_name, -1, nodes, 'noise')]
time_slices = split_clusterset_by_time(layer_name, clusters_for_layer)
time_slices = split_clusterset_by_time(layer, clusters_for_layer)
for k,v in time_slices.items():
repo.add_time_slice(v)
......@@ -12,8 +12,10 @@ import json
class TestCluster(unittest.TestCase):
    """Unit test for the Cluster data class constructor."""

    def test_init_Cluster(self):
        """Cluster stores use case, table, layer name, label and nodes as given."""
        # NOTE(review): the old positional form Cluster('layer1', 1, [1, 2, 3])
        # was left behind by the merge; the constructor now takes
        # (use_case, use_case_table, layer_name, cluster_label, nodes), so the
        # stale call is removed here.
        c = Cluster('debug', 'debug-table1', 'layer1', 1, [1, 2, 3])
        self.assertEqual('debug', c.use_case)
        self.assertEqual('debug-table1', c.use_case_table)
        self.assertEqual(1, c.cluster_label)
        self.assertEqual([1, 2, 3], c.nodes)
......
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering import ClusterResultConverter, ClusterResult
from typing import List, Dict, Any
class TestClusterResult(unittest.TestCase):
    """Tests that ClusterResultConverter derives the expected cluster labels
    for no features, a single feature, and two features."""

    converter: ClusterResultConverter = None

    def setUp(self):
        self.converter = ClusterResultConverter()

    def test_result_undefined_feature(self):
        # With no features the converter cannot derive a label.
        groups = self._get_some_cluster_groups_1d()
        results = self.converter.convert_to_cluster_results(
            cluster_groups=groups,
            features=[]
        )
        self.assert_correct_cluster_result_len(groups, results)
        self.assert_correct_cluster_result_labels(['n.a.'] * 3, results)

    def test_result_1d_feature(self):
        # A single feature yields a "min -- max" range label per cluster.
        groups = self._get_some_cluster_groups_1d()
        results = self.converter.convert_to_cluster_results(
            cluster_groups=groups,
            features=['v']
        )
        self.assert_correct_cluster_result_len(groups, results)
        expected_labels = ['-1.0 -- 1.0', '10.0 -- 11.0', '2.0 -- 2.0']
        self.assert_correct_cluster_result_labels(expected_labels, results)

    def test_result_2d_features(self):
        # Two features yield a centroid tuple label per cluster.
        groups = self._get_some_cluster_groups_2d()
        results = self.converter.convert_to_cluster_results(
            cluster_groups=groups,
            features=['v', 'u']
        )
        self.assert_correct_cluster_result_len(groups, results)
        centroids = [(0.0, 0.0), (10.5, 10.5), (2.0, 2.0), (3.0, 6.0)]
        self.assert_correct_cluster_result_labels(
            [str(c) for c in centroids], results)

    #region Custom Assertions
    def assert_correct_cluster_result_len(self, expected, actual: Dict[Any, ClusterResult]):
        # Same number of clusters, and each result holds the original nodes.
        self.assertEqual(len(expected), len(actual))
        for key in range(len(expected)):
            result = actual[key]
            self.assertEqual(len(expected[key]), len(result.nodes))
            self.assertEqual(expected[key], result.nodes)

    def assert_correct_cluster_result_labels(self, expected: List[str], actual: Dict[Any, ClusterResult]):
        self.assertEqual(len(expected), len(actual))
        for idx, label in enumerate(expected):
            self.assertEqual(label, actual[idx].label)
    #endregion Custom Assertions

    #region helper methods
    def _get_some_cluster_groups_1d(self):
        # Three clusters keyed 0..2, each node carrying one feature 'v'.
        return {
            0: [{'v': '0'}, {'v': '1'}, {'v': '-1'}],
            1: [{'v': '10'}, {'v': '11'}],
            2: [{'v': '2'}],
        }

    def _get_some_cluster_groups_2d(self):
        # Four clusters keyed 0..3, each node carrying two features 'v', 'u'.
        return {
            0: [{'v': '0', 'u': '0'}, {'v': '1', 'u': '1'}, {'v': '-1', 'u': '-1'}],
            1: [{'v': '10', 'u': '10'}, {'v': '11', 'u': '11'}],
            2: [{'v': '2', 'u': '2'}],
            3: [{'v': '7', 'u': '7'}, {'v': '5', 'u': '3'}, {'v': '-3', 'u': '8'}],
        }
    #endregion helper methods
# Allow running this test module directly (also discoverable via
# `python -m unittest discover`).
if __name__ == '__main__':
    unittest.main()
\ No newline at end of file
......@@ -4,8 +4,9 @@ for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering.clusterer import Clusterer
from processing.clustering import Clusterer, ClusterResult
import numpy as np
from typing import List, Dict, Any
class TestClusterer(unittest.TestCase):
clusterer:Clusterer = None
......@@ -178,14 +179,14 @@ class TestClusterer(unittest.TestCase):
for i in range(len(locations)):
self.assertEqual(labels[i], locations[i]['cluster_label'])
def assertClusteringResult(self, expected, actual):
def assertClusteringResult(self, expected: Dict[Any, List], actual: Dict[Any, ClusterResult]):
self.assertEqual(len(expected), len(actual))
for k in expected.keys():
if k not in actual:
self.fail(f"Cluster key ({k}, {type(k)}) not in result.")
self.assertListEqual(expected[k], actual[k])
self.assertListEqual(expected[k], actual[k].nodes)
#endregion helper methods
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment