Commit 31f80acb authored by Alexander Lercher

Merge branch 'feature/handle-large-datasets' into develop

parents bd4aa55b cf555c9f
......@@ -80,16 +80,17 @@ paths:
400:
description: "Invalid input"
# Layers
#region Layers
/layers:
post:
operationId: "routes.layers.post"
tags:
- "Layers"
summary: "Add a new layer or overwrite an existing one"
summary: "Add a new layer [TODO: or overwrite an existing one]"
parameters:
- in: body
name: "Layer"
name: "layer"
description: "The layer data to be added"
required: true
schema:
......@@ -111,40 +112,108 @@ paths:
schema:
$ref: "#/definitions/LayerCollection"
/layers/names:
/layers/{name}:
get:
operationId: "routes.layers.get_names"
operationId: "routes.layers.get_by_name"
tags:
- "Layers"
summary: "Get all layer names"
parameters: []
summary: "Get single layer data"
parameters:
- name: "name"
in: "path"
description: "Name of the requested layer"
required: true
type: "string"
responses:
200:
description: "Successful operation"
schema:
type: array
items:
type: string
$ref: "#/definitions/Layer"
404:
description: "Layer not found"
/layers/{name}:
/layers/{name}/nodes:
get:
operationId: "routes.layers.get_by_name"
operationId: "routes.layers.get_nodes"
tags:
- "Layers"
summary: "Get layer data for layer-name"
summary: "Get all individual nodes for the layer"
parameters:
- name: "name"
in: "path"
description: "Name of the layer to return"
description: "Name of the layer"
required: true
type: "string"
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/Layer"
$ref: "#/definitions/NodeCollection"
404:
description: "Layer not found"
post:
operationId: "routes.layers.post_nodes"
tags:
- "Layers"
summary: "Adds a single or multiple nodes to the layer"
parameters:
- name: "name"
in: "path"
description: "Name of the layer"
required: true
type: "string"
- name: "node"
in: body
description: "The node(s) to be added"
required: true
schema:
$ref: "#/definitions/NodeCollection"
responses:
201:
description: "Successful operation"
400:
description: "Invalid input"
/layers/{name}/clusters:
get:
operationId: "routes.clustersets.get_by_name2"
tags:
- "Layers"
summary: "Get all clusters for the layer"
parameters:
- name: "name"
in: "path"
description: "Name of the layer"
required: true
type: "string"
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/ClusterCollection"
404:
description: "Layer not found"
/layers/{name}/timeslices:
get:
operationId: "routes.timeslices.get_by_name2"
tags:
- "Layers"
summary: "Get all timeslices for the layer"
parameters:
- name: "name"
in: "path"
description: "Name of the layer"
required: true
type: "string"
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/TimeSliceCollection"
404:
description: "Layer not found"
#endregion
# Clusters
# TODO remove partially
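
The new per-layer endpoints above (layer data, nodes, clusters, time slices) can be exercised with any HTTP client. A minimal read-only sketch using Python's requests library; the base URL, port and layer name are assumptions, not part of the spec shown here:

import requests  # third-party HTTP client, used only for illustration

BASE = "http://localhost:5000/api"   # assumption: deployment-specific base URL
layer_name = "destination_layer"     # assumption: example layer name

layer = requests.get(f"{BASE}/layers/{layer_name}").json()              # single Layer
nodes = requests.get(f"{BASE}/layers/{layer_name}/nodes").json()        # NodeCollection
clusters = requests.get(f"{BASE}/layers/{layer_name}/clusters").json()  # ClusterCollection
slices = requests.get(f"{BASE}/layers/{layer_name}/timeslices").json()  # TimeSliceCollection

print(len(nodes), len(clusters), len(slices))
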
......@@ -200,6 +269,7 @@ paths:
# 200:
# description: "Successful operation"
# TODO remove
/clustersets:
get:
operationId: "routes.clustersets.get"
......@@ -248,6 +318,7 @@ paths:
404:
description: "Clusterset not found"
# TODO remove
/user-cluster-graphs:
get:
......@@ -335,20 +406,20 @@ definitions:
Cluster:
type: object
properties:
layer_name:
type: string
cluster_label:
type: number
nodes:
type: array
items:
type: object
example:
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UniqueID": "a95075f5042b1b27060080156d87fe34ec7e712c5e57ec9159bc0668543f156a"
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
$ref: "#/definitions/Node"
ClusterCollection:
type: array
items:
$ref: "#/definitions/Cluster"
LocationCluster:
type: object
......@@ -416,10 +487,10 @@ definitions:
properties:
LayerName:
type: string
Nodes:
type: array
items:
type: object
# Nodes:
# type: array
# items:
# type: object
Properties:
type: array
items:
......@@ -430,10 +501,10 @@ definitions:
properties:
layer_name:
type: string
nodes:
type: array
items:
type: object
# nodes:
# type: array
# items:
# type: object
properties:
type: array
items:
......@@ -444,6 +515,22 @@ definitions:
items:
$ref: "#/definitions/Layer"
Node:
type: object
example:
"Finished_time": 1576631193265951
"Latitude_Destination": -5.973257
"Longitude_Destination": 37.416316
"TravelID": "5e57ec9159bc0668543f156a"
"TravelPrice": 15
"UniqueID": "a95075f5042b1b27060080156d87fe34ec7e712c5e57ec9159bc0668543f156a"
"UserID": "a95075f5042b1b27060080156d87fe34ec7e712c"
NodeCollection:
type: array
items:
$ref: "#/definitions/Node"
ClusterSet:
type: object
properties:
......
......@@ -7,12 +7,14 @@ class Cluster:
'''
A cluster for an arbitrary layer containing some nodes.
:param layer_name: The name of the layer in which the cluster is located
:param cluster_label: The label of the cluster unique for the layer
:param nodes: The individual nodes of the cluster
'''
def __init__(self, cluster_label: int = None, nodes: List = None,
def __init__(self, layer_name: str = None, cluster_label: int = None, nodes: List[Dict] = None,
cluster_dict: Dict = None, from_db=False):
self.layer_name = layer_name
self.cluster_label = cluster_label
self.nodes = nodes
......@@ -21,11 +23,13 @@ class Cluster:
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"layer_name": self.layer_name,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
}
def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
self.layer_name = cluster_dict["layer_name"]
self.cluster_label = cluster_dict["cluster_label"]
self.nodes = json.loads(cluster_dict["nodes"]) \
if from_db else cluster_dict["nodes"]
......
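
The Cluster entity now carries layer_name and serializes its nodes to a JSON string only when writing to the database (for_db=True), parsing them back when reading (from_db=True). A minimal round-trip sketch, assuming the project's db.entities package is importable and using a hypothetical example node:

from db.entities import Cluster  # assumption: run from the project root so the package resolves

node = {"UserID": "example-user", "Finished_time": 1576631193265951}
c = Cluster(layer_name="destination_layer", cluster_label=3, nodes=[node])

db_doc = c.to_serializable_dict(for_db=True)        # nodes become a JSON string here
restored = Cluster(cluster_dict=db_doc, from_db=True)

assert restored.layer_name == "destination_layer"
assert restored.nodes == [node]                     # parsed back into a list of dicts
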
......@@ -17,15 +17,12 @@ class Layer:
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"layer_name": self.layer_name,
"properties": self.properties,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
"properties": self.properties
}
def from_serializable_dict(self, layer_info: Dict, from_db=False):
self.layer_name = layer_info['layer_name']
self.properties = layer_info['properties']
self.nodes = json.loads(layer_info["nodes"]) \
if from_db else layer_info["nodes"]
def __repr__(self):
return json.dumps(self.to_serializable_dict())
......
......@@ -21,9 +21,10 @@ class Repository(MongoRepositoryBase):
self._location_cluster_collection = 'location_cluster'
self._time_cluster_collection = 'time_cluster'
self._user_cluster_graph_collection = 'user_cluster_graph'
self._layer_collection = 'layer'
self._clusterset_collection = 'cluster_set'
self._time_slice_collection = 'time_slice'
self._layer_collection = 'layer-new'
self._layer_nodes_collection = 'layer_nodes-new'
self._clusterset_collection = 'cluster_set-new'
self._time_slice_collection = 'time_slice-new'
self.agi_repo = AgiRepository()
......@@ -88,9 +89,22 @@ class Repository(MongoRepositoryBase):
return entries[0]
else:
return None
def add_layer_node(self, node: dict):
super().insert_entry(self._layer_nodes_collection, node)
def add_layer_nodes(self, nodes: List[dict]):
super().insert_many(self._layer_nodes_collection, nodes)
def get_layer_nodes(self, layer_name: str) -> List[dict]:
'''Returns all nodes for the layer.'''
entries = super().get_entries(self._layer_nodes_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
return [e for e in entries]
#endregion
#region ClusterSet
# TODO cleanup
def add_clusterset(self, cluster_set: ClusterSet):
super().insert_entry(self._clusterset_collection, cluster_set.to_serializable_dict())
......@@ -113,6 +127,16 @@ class Repository(MongoRepositoryBase):
return entries[0]
else:
return None
def add_clusters(self, clusters: List[Cluster]):
cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
super().insert_many(self._clusterset_collection, cluster_dicts)
def get_clusters_for_layer(self, layer_name: str) -> List[Cluster]:
entries = super().get_entries(self._clusterset_collection, selection={'layer_name': layer_name}, projection={'_id': 0})
return [Cluster(cluster_dict=e, from_db=True) for e in entries]
#endregion
#region TimeSlice
......
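
Together, the new repository methods cover bulk node storage per layer and per-layer cluster access on the renamed collections. A usage sketch, assuming a reachable MongoDB instance, importable project packages, and hypothetical example data:

from db.repository import Repository
from db.entities import Cluster

repo = Repository()

# bulk-insert raw node documents into the layer nodes collection
repo.add_layer_nodes([
    {"layer_name": "destination_layer", "UserID": "u1", "Finished_time": 1576631193265951},
    {"layer_name": "destination_layer", "UserID": "u2", "Finished_time": 1576631200000000},
])

nodes = repo.get_layer_nodes("destination_layer")    # read back without the Mongo _id field

# store clusters per layer and fetch them again as Cluster entities
repo.add_clusters([Cluster("destination_layer", 0, nodes)])
clusters = repo.get_clusters_for_layer("destination_layer")
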
......@@ -10,8 +10,16 @@ def get():
def get_names():
return repo.get_clusterset_names()
def get_by_name(layername):
res = repo.get_clusterset(layername)
def get_by_name2(name):
res = repo.get_clusters_for_layer(name)
if res is None or len(res) == 0:
return Response(status=404)
else:
return [c.to_serializable_dict() for c in res]
def get_by_name(name):
res = repo.get_clusterset(name)
if res is not None:
return res.to_serializable_dict()
else:
......
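
The operationIds in the spec (e.g. routes.clustersets.get_by_name2) are what tie these handler functions to the paths above: connexion resolves the dotted name to the module-level function and passes the {name} path parameter as the argument. A hypothetical bootstrap, not part of this diff; the spec file name and port are assumptions:

import connexion

app = connexion.App(__name__, specification_dir=".")
app.add_api("swagger.yml")   # assumption: the spec file edited in the first hunks
app.run(port=5000)           # assumption: port
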
......@@ -4,15 +4,18 @@ from db.entities import Layer
repo = Repository()
#region layers
def post():
'''Insert a new layer or overwrite an existing one.'''
# TODO overwrite
body = request.json
_insert_layer(body)
return Response(status=201)
def _insert_layer(layer_data: dict):
# convert object keys from ext source
'''Converts object keys from the external source format and inserts the layer into the database.'''
layer_data['layer_name'] = layer_data.pop('LayerName')
layer_data['nodes'] = layer_data.pop('Nodes')
# layer_data['nodes'] = layer_data.pop('Nodes')
layer_data['properties'] = layer_data.pop('Properties')
repo.add_layer(Layer(layer_data))
......@@ -20,12 +23,27 @@ def _insert_layer(layer_data: dict):
def get():
return [l.to_serializable_dict() for l in repo.get_layers()]
def get_names():
return repo.get_layer_names()
def get_by_name(name):
res = repo.get_layer(name)
if res is not None:
return res.to_serializable_dict()
else:
return Response(status=404)
#endregion
#region nodes
def get_nodes(name):
res = repo.get_layer_nodes(name)
# print(res)
return res
def post_nodes(name):
body = request.json
for node in body:
node['layer_name'] = name
repo.add_layer_nodes(body)
return Response(status=201)
#endregion nodes
\ No newline at end of file
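
The post_nodes handler above expects the request body to be a list of node dicts and stamps layer_name onto each one before the bulk insert, matching the NodeCollection schema. A client-side sketch; base URL and node contents are assumptions:

import requests

BASE = "http://localhost:5000/api"   # assumption: deployment-specific base URL

new_nodes = [
    {"UserID": "u1", "Starting_time": 1576631000000000, "Finished_time": 1576631193265951},
    {"UserID": "u2", "Starting_time": 1576631100000000, "Finished_time": 1576631300000000},
]

resp = requests.post(f"{BASE}/layers/destination_layer/nodes", json=new_nodes)
assert resp.status_code == 201       # "Successful operation" per the spec
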
......@@ -11,7 +11,16 @@ def get():
def get_by_name(layername):
res = repo.get_time_slices_by_name(layername)
print(len(res))
# print(len(res))
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
else:
return Response(status=404)
def get_by_name2(name):
res = repo.get_time_slices_by_name(name)
# print(len(res))
if res is not None and len(res) != 0:
return [e.to_serializable_dict() for e in res]
......
......@@ -22,27 +22,30 @@ def run_generic_clustering():
all_layers:List[Layer] = repo.get_layers()
for layer in all_layers:
print(f"Clustering {layer.layer_name}")
if layer.properties is None or len(layer.properties) == 0:
print("skipping")
continue
print(f"Clustering {layer.layer_name}")
clusters = run_clustering_for_layer(layer)
cluster_set = ClusterSet(layer.layer_name, clusters)
store_clusterset(cluster_set)
# cluster_set = ClusterSet(layer.layer_name, clusters)
store_generic_clusters(clusters)
def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
clusterer = Clusterer()
nodes = repo.get_layer_nodes(layer.layer_name)
clusterer = Clusterer()
res = clusterer.cluster_dataset(
layer.nodes,
nodes,
layer.properties
)
return [Cluster(key, value) for key, value in res.items()]
return [Cluster(layer.layer_name, key, value) for key, value in res.items()]
def store_clusterset(cluster_set: ClusterSet):
repo.add_clusterset(cluster_set)
def store_generic_clusters(clusters: List[Cluster]):
repo.add_clusters(clusters)
# with open(f'clusterset_{cluster_set.layer_name}.txt', 'w') as file:
# file.write(json.dumps(cluster_set.to_serializable_dict()))
......@@ -109,5 +112,6 @@ def store_clusters(type: str, clusters: List):
if __name__ == "__main__":
run_generic_clustering()
# TODO cleanup
# run_location_clustering()
# run_time_clustering()
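
run_clustering_for_layer now reads the nodes from the repository and hands them to Clusterer.cluster_dataset together with the layer's clustering properties; the result is assumed to be a mapping of cluster label to member nodes, which store_generic_clusters persists as one Cluster per label. A stand-in sketch of that shape (the real Clusterer is outside this diff):

from collections import defaultdict
from typing import Dict, List

def group_by_property(nodes: List[dict], prop: str) -> Dict[int, List[dict]]:
    '''Stand-in for cluster_dataset: one label per distinct property value.'''
    buckets: Dict[object, List[dict]] = defaultdict(list)
    for n in nodes:
        buckets[n.get(prop)].append(n)
    return {label: members for label, members in enumerate(buckets.values())}

nodes = [{"TravelPrice": 15}, {"TravelPrice": 15}, {"TravelPrice": 30}]
print(group_by_property(nodes, "TravelPrice"))
# {0: [{'TravelPrice': 15}, {'TravelPrice': 15}], 1: [{'TravelPrice': 30}]}
# run_clustering_for_layer() then wraps each entry as Cluster(layer_name, label, members)
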
......@@ -8,8 +8,8 @@ import json
from datetime import datetime, date
from db.repository import Repository
from db.entities.timeslice import TimeSlice
from db.entities import ClusterSet
from typing import Tuple, Dict, Any
from db.entities import ClusterSet, Cluster
from typing import Tuple, Dict, Any, List
TimeSliceKey = Tuple[int, int]
......@@ -20,28 +20,30 @@ def convert_to_time_slice_key(timestamp: str) -> TimeSliceKey:
return (y, w)
def split_clusterset_by_time(clustersets) -> Dict[TimeSliceKey, TimeSlice]:
def split_clusterset_by_time(layer_name: str, clusters: List[Cluster]) -> Dict[TimeSliceKey, TimeSlice]:
'''
Distributes all nodes of a single clusterset into individual time slices based on their timestamps.
Distributes all nodes in clusters of a single layer into individual time slices based on their timestamps.
If a node spans over multiple slices it will be added to all of them.
Information about clusters and the nodes in the clusters will not be changed.
:params clustersets: The clusterset whichs nodes are split
:param clusters: The clusters whose nodes are split
:returns: A dict of time slices where the key is the (year, calendar week) tuple and the value is the corresponding time slice
'''
time_property_names = ['Finished_time', 'Starting_time']
time_slices: Dict[Any, TimeSlice] = {}
for cluster_no in clusterset.clusters:
for cluster_no in clusters:
for node in cluster_no.nodes:
time_keys = {
convert_to_time_slice_key(str(node['Finished_time'])),
convert_to_time_slice_key(str(node['Starting_time']))
}
# retrieve times the node is located in based on the defined time properties in the schema
time_keys = set()
for time_property in time_property_names:
if time_property in node:
time_keys.add(convert_to_time_slice_key(str(node[time_property])))
for time_key in time_keys:
if time_key not in time_slices:
time_slices[time_key] = TimeSlice(time_key, clusterset.layer_name)
time_slices[time_key] = TimeSlice(time_key, layer_name)
time_slices[time_key].add_node_to_cluster(cluster_no.cluster_label, node)
......@@ -53,9 +55,17 @@ if __name__ == "__main__":
repo.remove_all_time_slices()
clustersets = repo.get_clustersets()
for clusterset in clustersets:
time_slices = split_clusterset_by_time(clusterset)
layers = repo.get_layers()
for layer in layers:
layer_name = layer.layer_name
print(f"Working on {layer_name}")
clusters_for_layer = repo.get_clusters_for_layer(layer_name)
# if no clusters were generated use one large cluster instead of skipping the layer
if clusters_for_layer is None or len(clusters_for_layer) == 0:
clusters_for_layer = [Cluster(layer_name, -1, repo.get_layer_nodes(layer_name))]
time_slices = split_clusterset_by_time(layer_name, clusters_for_layer)
for k,v in time_slices.items():
repo.add_time_slice(v)
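
Each time slice is keyed by the tuple built from a node's time properties; TimeSliceKey = Tuple[int, int] and the (y, w) return value suggest a (year, calendar week) key. A hedged stand-in for convert_to_time_slice_key (the real conversion is outside this hunk), assuming timestamps are microseconds since the Unix epoch as in the Node example:

from datetime import datetime

def to_time_slice_key(timestamp: str) -> tuple:
    '''Sketch: map a microsecond epoch string to an ISO (year, week) tuple.'''
    dt = datetime.utcfromtimestamp(int(timestamp) / 1_000_000)
    year, week, _ = dt.isocalendar()
    return (year, week)

print(to_time_slice_key("1576631193265951"))   # (2019, 51) for the example node above
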
......@@ -19,6 +19,10 @@ class MongoRepositoryBase:
collection = self._database[collection_name]
collection.insert_one(content)
def insert_many(self, collection_name, content: list):
collection = self._database[collection_name]
collection.insert_many(content)
def get_entries(self, collection_name, selection: dict = {}, projection: dict = {'_': 0}) -> cursor.Cursor:
collection = self._database[collection_name]
return collection.find(selection, projection)
......