Commit 74e9c2e7 authored by Alexander Lercher's avatar Alexander Lercher

Clustering and slicing with individual node and cluster objects

Not using one large layer or clusterset object anymore as size is too large for storing and loading
parent 0edf8deb
......@@ -87,7 +87,7 @@ paths:
operationId: "routes.layers.post"
tags:
- "Layers"
summary: "Add a new layer or overwrite an existing one"
summary: "Add a new layer [TODO: or overwrite an existing one]"
parameters:
- in: body
name: "layer"
......@@ -117,11 +117,11 @@ paths:
operationId: "routes.layers.get_by_name"
tags:
- "Layers"
summary: "Get layer data for layer-name"
summary: "Get single layer data"
parameters:
- name: "name"
in: "path"
description: "Name of the layer to return"
description: "Name of the requested layer"
required: true
type: "string"
responses:
......@@ -137,7 +137,7 @@ paths:
operationId: "routes.layers.get_nodes"
tags:
- "Layers"
summary: "Get all nodes for the layer"
summary: "Get all individual nodes for the layer"
parameters:
- name: "name"
in: "path"
......@@ -148,7 +148,7 @@ paths:
200:
description: "Successful operation"
schema:
type: object
$ref: "#/definitions/NodeCollection"
404:
description: "Layer not found"
post:
......@@ -167,9 +167,7 @@ paths:
description: "The node(s) to be added"
required: true
schema:
type: array
items:
type: object
$ref: "#/definitions/NodeCollection"
responses:
201:
description: "Successful operation"
......@@ -196,6 +194,25 @@ paths:
404:
description: "Layer not found"
/layers/{name}/timeslices:
get:
operationId: "routes.timeslices.get_by_name2"
tags:
- "Layers"
summary: "Get all timeslices for the layer"
parameters:
- name: "name"
in: "path"
description: "Name of the layer"
required: true
type: "string"
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/TimeSliceCollection"
404:
description: "Layer not found"
#endregion
# Clusters
......@@ -389,20 +406,15 @@ definitions:
Cluster:
type: object
properties:
layer_name:
type: string
cluster_label:
type: number
nodes:
type: array
items:
type: object
example:
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UniqueID": "a95075f5042b1b27060080156d87fe34ec7e712c5e57ec9159bc0668543f156a"
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
$ref: "#/definitions/Node"
ClusterCollection:
type: array
......@@ -503,6 +515,22 @@ definitions:
items:
$ref: "#/definitions/Layer"
Node:
type: object
example:
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UniqueID": "a95075f5042b1b27060080156d87fe34ec7e712c5e57ec9159bc0668543f156a"
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
NodeCollection:
type: array
items:
$ref: "#/definitions/Node"
ClusterSet:
type: object
properties:
......
......@@ -7,12 +7,14 @@ class Cluster:
'''
A cluster for an arbitrary layer containing some nodes.
:param layer_name: The name of the layer in which the cluster is located
:param cluster_label: The label of the cluster unique for the layer
:param nodes: The individual nodes of the cluster
'''
def __init__(self, cluster_label: int = None, nodes: List = None,
def __init__(self, layer_name: str = None, cluster_label: int = None, nodes: List[Dict] = None,
cluster_dict: Dict = None, from_db=False):
self.layer_name = layer_name
self.cluster_label = cluster_label
self.nodes = nodes
......@@ -21,11 +23,13 @@ class Cluster:
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"layer_name": self.layer_name,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
}
def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
self.layer_name = cluster_dict["layer_name"]
self.cluster_label = cluster_dict["cluster_label"]
self.nodes = json.loads(cluster_dict["nodes"]) \
if from_db else cluster_dict["nodes"]
......
......@@ -96,14 +96,15 @@ class Repository(MongoRepositoryBase):
def add_layer_nodes(self, nodes:List[dict]):
super().insert_many(self._layer_nodes_collection, nodes)
def get_layer_nodes(self, layer_name: str):
'''Returns a json'''
def get_layer_nodes(self, layer_name: str) -> dict:
'''Returns from json'''
entries = super().get_entries(self._layer_nodes_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
return [e for e in entries]
#endregion
#region ClusterSet
# TODO cleanup
def add_clusterset(self, cluster_set: ClusterSet):
super().insert_entry(self._clusterset_collection, cluster_set.to_serializable_dict())
......@@ -126,6 +127,16 @@ class Repository(MongoRepositoryBase):
return entries[0]
else:
return None
def add_clusters(self, clusters: List[Cluster]):
cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
super().insert_many(self._clusterset_collection, cluster_dicts)
def get_clusters_for_layer(self, layer_name: str) -> List[Cluster]:
entries = super().get_entries(self._clusterset_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
return [Cluster(cluster_dict=e, from_db=True) for e in entries]
#endregion
#region TimeSlice
......
......@@ -12,11 +12,11 @@ def get_names():
def get_by_name2(name):
res = repo.get_clusterset(name)
if res is not None:
return res.to_serializable_dict()
else:
res = repo.get_clusters_for_layer(name)
if res is None or len(res) == 0:
return Response(status=404)
else:
return [c.to_serializable_dict() for c in res]
def get_by_name(name):
res = repo.get_clusterset(name)
......
......@@ -11,9 +11,18 @@ def get():
def get_by_name(layername):
res = repo.get_time_slices_by_name(layername)
print(len(res))
# print(len(res))
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
else:
return Response(status=404)
def get_by_name2(name):
res = repo.get_time_slices_by_name(name)
# print(len(res))
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
else:
return Response(status=404)
\ No newline at end of file
......@@ -22,27 +22,30 @@ def run_generic_clustering():
all_layers:List[Layer] = repo.get_layers()
for layer in all_layers:
print(f"Clustering {layer.layer_name}")
if layer.properties is None or len(layer.properties) == 0:
print("skipping")
continue
print(f"Clustering {layer.layer_name}")
clusters = run_clustering_for_layer(layer)
cluster_set = ClusterSet(layer.layer_name, clusters)
store_clusterset(cluster_set)
# cluster_set = ClusterSet(layer.layer_name, clusters)
store_generic_clusters(clusters)
def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
clusterer = Clusterer()
nodes = repo.get_layer_nodes(layer.layer_name)
clusterer = Clusterer()
res = clusterer.cluster_dataset(
layer.nodes,
nodes,
layer.properties
)
return [Cluster(key, value) for key, value in res.items()]
return [Cluster(layer.layer_name, key, value) for key, value in res.items()]
def store_clusterset(cluster_set: ClusterSet):
repo.add_clusterset(cluster_set)
def store_generic_clusters(clusters: List[Cluster]):
repo.add_clusters(clusters)
# with open(f'clusterset_{cluster_set.layer_name}.txt', 'w') as file:
# file.write(json.dumps(cluster_set.to_serializable_dict()))
......@@ -109,5 +112,6 @@ def store_clusters(type: str, clusters: List):
if __name__ == "__main__":
run_generic_clustering()
# TODO cleanup
# run_location_clustering()
# run_time_clustering()
......@@ -8,8 +8,8 @@ import json
from datetime import datetime, date
from db.repository import Repository
from db.entities.timeslice import TimeSlice
from db.entities import ClusterSet
from typing import Tuple, Dict, Any
from db.entities import ClusterSet, Cluster
from typing import Tuple, Dict, Any, List
TimeSliceKey = Tuple[int, int]
......@@ -20,28 +20,30 @@ def convert_to_time_slice_key(timestamp: str) -> TimeSliceKey:
return (y, w)
def split_clusterset_by_time(clustersets) -> Dict[TimeSliceKey, TimeSlice]:
def split_clusterset_by_time(layer_name: str, clusters: List[Cluster]) -> Dict[TimeSliceKey, TimeSlice]:
'''
Distributes all nodes of a single clusterset into individual time slices based on their timestamps.
Distributes all nodes in clusters of a single layer into individual time slices based on their timestamps.
If a node spans over multiple slices it will be added to all of them.
Information about clusters and the nodes in the clusters will not be changed.
:params clustersets: The clusterset whichs nodes are split
:params clusters: The clusters whichs nodes are split
:returns: A dict of time slices where the key is the time info and value is the information about the time slice
'''
time_property_names = ['Finished_time', 'Starting_time']
time_slices: Dict[Any, TimeSlice] = {}
for cluster_no in clusterset.clusters:
for cluster_no in clusters:
for node in cluster_no.nodes:
time_keys = {
convert_to_time_slice_key(str(node['Finished_time'])),
convert_to_time_slice_key(str(node['Starting_time']))
}
# retrieve times the node is located in based on the defined time properties in the schema
time_keys = set()
for time_property in time_property_names:
if time_property in node:
time_keys.add(convert_to_time_slice_key(str(node[time_property])))
for time_key in time_keys:
if time_key not in time_slices:
time_slices[time_key] = TimeSlice(time_key, clusterset.layer_name)
time_slices[time_key] = TimeSlice(time_key, layer_name)
time_slices[time_key].add_node_to_cluster(cluster_no.cluster_label, node)
......@@ -53,9 +55,12 @@ if __name__ == "__main__":
repo.remove_all_time_slices()
clustersets = repo.get_clustersets()
for clusterset in clustersets:
time_slices = split_clusterset_by_time(clusterset)
layers = repo.get_layers()
for layer in layers:
layer_name = layer.layer_name
print(f"Working on {layer_name}")
clusters_for_layer = repo.get_clusters_for_layer(layer_name)
time_slices = split_clusterset_by_time(layer_name, clusters_for_layer)
for k,v in time_slices.items():
repo.add_time_slice(v)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment