Commit 37cf0a63 authored by Alex

Applied generic clustering to layers and stored the result in the db

parent 9fa5ed39
from db.entities.location import Location from db.entities.location import Location
from db.entities.popular_location import PopularLocation from db.entities.popular_location import PopularLocation
from db.entities.cluster import Cluster, LocationCluster, TimeCluster from db.entities.cluster import Cluster, LocationCluster, TimeCluster
from db.entities.clusterset import ClusterSet
from db.entities.user_cluster_graph import UserClusterGraph from db.entities.user_cluster_graph import UserClusterGraph
from db.entities.layer import Layer from db.entities.layer import Layer
...@@ -4,10 +4,38 @@ from datetime import date, datetime ...@@ -4,10 +4,38 @@ from datetime import date, datetime
class Cluster:
    '''
    Generic cluster of a layer: a label together with the nodes assigned to it.

    :param cluster_label: Label identifying the cluster within its layer
    :param nodes: The nodes that belong to the cluster
    :param cluster_dict: Optional serialized cluster; when given, its values
        overwrite cluster_label and nodes
    :param from_db: Whether cluster_dict holds the nodes as a JSON string
    '''

    def __init__(self, cluster_label: int = None, nodes: List = None,
                 cluster_dict: Dict = None, from_db=False):
        self.cluster_label = cluster_label
        self.nodes = nodes

        # Without a serialized form the plain arguments are final.
        if cluster_dict is None:
            return
        self.from_serializable_dict(cluster_dict, from_db)

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''Returns label and nodes as a dict; nodes are JSON-encoded if for_db is set.'''
        nodes = self.nodes
        if for_db:
            nodes = json.dumps(nodes)
        return {"cluster_label": self.cluster_label, "nodes": nodes}

    def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
        '''Fills label and nodes from a dict; nodes are JSON-decoded if from_db is set.'''
        self.cluster_label = cluster_dict["cluster_label"]
        raw_nodes = cluster_dict["nodes"]
        self.nodes = json.loads(raw_nodes) if from_db else raw_nodes

    def __repr__(self):
        serializable = self.to_serializable_dict()
        return json.dumps(serializable)

    def __str__(self):
        return "Cluster(" + self.__repr__() + ")"
class LocationCluster(Cluster): class LocationCluster(Cluster):
def __init__(self, cluster_label: int = None, nodes: List = None, def __init__(self, cluster_label: int = None, nodes: List = None,
...@@ -67,7 +95,7 @@ class TimeCluster(Cluster): ...@@ -67,7 +95,7 @@ class TimeCluster(Cluster):
if from_db else time_dict["nodes"] if from_db else time_dict["nodes"]
def __repr__(self): def __repr__(self):
return json.dumps(self.to_serializable_dict()) return json.dumps(self.to_serializable_dict(True))
def __str__(self): def __str__(self):
return f"TimeCluster({self.__repr__()})" return f"TimeCluster({self.__repr__()})"
import json
from db.entities.cluster import Cluster
from typing import List, Dict
from datetime import date, datetime
class ClusterSet:
    '''
    A clusterset for an arbitrary layer containing all clusters.

    :param layer_name: The name of the layer
    :param clusters: The individual clusters
    :param cluster_set_dict: Optional serialized clusterset; when given, its
        values overwrite layer_name and clusters
    :param from_db: Whether cluster_set_dict holds the clusters as a JSON string
    '''

    def __init__(self, layer_name: str = None, clusters: List['Cluster'] = None,
                 cluster_set_dict: Dict = None, from_db=False):
        self.layer_name = layer_name
        self.clusters = clusters

        if cluster_set_dict is not None:
            self.from_serializable_dict(cluster_set_dict, from_db)

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''
        Returns the clusterset as a dict of layer_name and serialized clusters.

        :param for_db: If set, the list of clusters is JSON-encoded to a string
            and the flag is forwarded to every cluster
        '''
        # clusters defaults to None in __init__; serialize that as an empty
        # list so that repr()/str() never raise on a freshly created object.
        clusters = [] if self.clusters is None else self.clusters
        serialized_dict_clusters = [cluster.to_serializable_dict(for_db)
                                    for cluster in clusters]
        return {
            "layer_name": self.layer_name,
            "clusters": json.dumps(serialized_dict_clusters)
            if for_db else serialized_dict_clusters
        }

    def from_serializable_dict(self, cluster_set_dict: Dict, from_db=False):
        '''
        Fills layer_name and clusters from a serialized clusterset.

        :param cluster_set_dict: Dict with the keys "layer_name" and "clusters"
        :param from_db: If set, "clusters" is JSON-decoded first and the flag
            is forwarded to every Cluster that is created
        '''
        self.layer_name = cluster_set_dict["layer_name"]

        serialized_dict_clusters = json.loads(cluster_set_dict["clusters"]) \
            if from_db else cluster_set_dict["clusters"]
        self.clusters = [Cluster(cluster_dict=cluster_dict, from_db=from_db)
                         for cluster_dict in serialized_dict_clusters]

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"ClusterSet({self.__repr__()})"
...@@ -32,20 +32,3 @@ class Layer: ...@@ -32,20 +32,3 @@ class Layer:
def __str__(self): def __str__(self):
return f"Layer({self.__repr__()})" return f"Layer({self.__repr__()})"
# Ad-hoc sample: builds a Layer from a hand-written dict with one node and
# prints its serialized form with for_db=True.
# NOTE(review): appears to sit at module level without a __main__ guard, so it
# would run on import — confirm against the original file's indentation.
layer_d = {
"layer_name": "Destination",
"nodes": [ {
"TravelID": 1,
"UserID": "Micah",
"Latitude_Destination": -5.95081,
"Longitude_Destination": 37.415281,
"Finished_time": 1579143634812589,
"TravelPrice": 19
}],
# NOTE(review): these property names are not keys of the sample node above
# (it only has *_Destination coordinates) — confirm whether that is intended.
"properties": ['Latitude_StartingPoint', 'Longitude_StartingPoint']
}
layer = Layer(layer_d)
print(layer.to_serializable_dict(for_db=True))
\ No newline at end of file
...@@ -22,6 +22,7 @@ class Repository(MongoRepositoryBase): ...@@ -22,6 +22,7 @@ class Repository(MongoRepositoryBase):
self._time_cluster_collection = 'time_cluster' self._time_cluster_collection = 'time_cluster'
self._user_cluster_graph_collection = 'user_cluster_graph' self._user_cluster_graph_collection = 'user_cluster_graph'
self._layer_collection = 'layer' self._layer_collection = 'layer'
self._clusterset_collection = 'cluster_set'
self.agi_repo = AgiRepository() self.agi_repo = AgiRepository()
...@@ -66,3 +67,10 @@ class Repository(MongoRepositoryBase): ...@@ -66,3 +67,10 @@ class Repository(MongoRepositoryBase):
def get_layers(self) -> List[Layer]: def get_layers(self) -> List[Layer]:
entries = super().get_entries(self._layer_collection) entries = super().get_entries(self._layer_collection)
return [Layer(e) for e in entries] return [Layer(e) for e in entries]
def add_clusterset(self, cluster_set: ClusterSet):
    '''Inserts the serialized clusterset into the clusterset collection.'''
    # Serialized with the default flags, i.e. without for_db.
    serialized = cluster_set.to_serializable_dict()
    super().insert_entry(self._clusterset_collection, serialized)
def get_clustersets(self) -> List[ClusterSet]:
    '''Returns one ClusterSet per entry stored in the clusterset collection.'''
    cluster_sets = []
    for entry in super().get_entries(self._clusterset_collection):
        cluster_sets.append(ClusterSet(cluster_set_dict=entry))
    return cluster_sets
\ No newline at end of file
...@@ -6,6 +6,7 @@ from typing import List, Dict, Any, TypeVar ...@@ -6,6 +6,7 @@ from typing import List, Dict, Any, TypeVar
from deprecated import deprecated from deprecated import deprecated
T = TypeVar('T') T = TypeVar('T')
ClusterGroup = Dict[Any, List[Dict]]
class Clusterer: class Clusterer:
''' '''
...@@ -94,7 +95,7 @@ class Clusterer: ...@@ -94,7 +95,7 @@ class Clusterer:
continue continue
dataset[i]['cluster_label'] = labels[i] dataset[i]['cluster_label'] = labels[i]
def group_by_clusters(self, dataset:List[Dict], labels:List[T]) -> Dict[T, List[Dict]]: def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> ClusterGroup:
self.label_dataset(dataset, labels) self.label_dataset(dataset, labels)
clusters = {} clusters = {}
...@@ -104,7 +105,7 @@ class Clusterer: ...@@ -104,7 +105,7 @@ class Clusterer:
return clusters return clusters
@deprecated(reason="Use generic version instead") @deprecated(reason="Use generic version instead")
def cluster_locations(self, locations:List[Dict]) -> Dict[int, List[Dict]]: def cluster_locations(self, locations:List[Dict]) -> ClusterGroup:
'''Returns a dictionary with identified clusters and their locations copied from the input''' '''Returns a dictionary with identified clusters and their locations copied from the input'''
if locations is None or len(locations) == 0: if locations is None or len(locations) == 0:
# raise Exception("locations has to contain something") # raise Exception("locations has to contain something")
...@@ -118,7 +119,7 @@ class Clusterer: ...@@ -118,7 +119,7 @@ class Clusterer:
return self.group_by_clusters(locations, labels) return self.group_by_clusters(locations, labels)
@deprecated(reason="Use generic version instead") @deprecated(reason="Use generic version instead")
def cluster_times(self, times:List[Dict]) -> Dict[int, List[Dict]]: def cluster_times(self, times:List[Dict]) -> ClusterGroup:
'''Returns a dictionary with identified clusters and their times copied from the input''' '''Returns a dictionary with identified clusters and their times copied from the input'''
features = self.extract_time_features(times) features = self.extract_time_features(times)
...@@ -127,14 +128,14 @@ class Clusterer: ...@@ -127,14 +128,14 @@ class Clusterer:
return self.group_by_clusters(times, labels) return self.group_by_clusters(times, labels)
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> List: def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> ClusterGroup:
''' '''
Returns the identified clusters containing a subset of nodes from the dataset. Returns the identified clusters containing a subset of nodes from the dataset.
:param dataset: The nodes to assign to clusters :param dataset: The nodes to assign to clusters
:param features: The feature names of the nodes to use for clustering :param features: The feature names of the nodes to use for clustering
:returns: A list of clusters :returns: A dictionary of clusters, where each value is a non-empty subset of dataset if dataset was not empty
''' '''
arr = self._extract_features(dataset, features) arr = self._extract_features(dataset, features)
...@@ -142,34 +143,3 @@ class Clusterer: ...@@ -142,34 +143,3 @@ class Clusterer:
return self.group_by_clusters(dataset, labels) return self.group_by_clusters(dataset, labels)
# TODO remove
# Ad-hoc comparison script: runs the deprecated cluster_locations/cluster_times
# and the generic cluster_dataset on data from AgiRepository and asserts that
# both produce equal results.
if __name__ == '__main__':
import sys
# Makes the working directory importable for the db package import below.
sys.path.insert(1, './')
from db.agi.agi_repository import AgiRepository
clusterer = Clusterer()
agi_repo = AgiRepository()
# location
if True:
res_old = clusterer.cluster_locations(agi_repo.getLocationsBasedOnNewDataSchema()['nodes'])
# print(res_old[11])
# [{'id': 'adad64cb-bd71-4b2b-9a70-e08eb8b19901-1570900602', 'latitude': -20.2695062, 'longitude': 57.6297389, 'timestamp': 1570900602, 'user': 'b57ad1fb396cfc18b8867fb2e08be723c2cdc2a6', 'cluster_label': 11}, {'id': '127af17b-e823-4d30-8227-00f5421bd48b-1549291309', 'latitude': -20.5362627, 'longitude': 47.2459749, 'timestamp': 1549291309, 'user': 'ca34bd51c4dc65cbc021cb27bcaa014ca082b8c4', 'cluster_label': 11}]
# NOTE(review): the data is fetched a second time — presumably because
# clustering writes 'cluster_label' into the node dicts in place; confirm.
data = agi_repo.getLocationsBasedOnNewDataSchema()
res = clusterer.cluster_dataset(data['nodes'], data['properties'])
# if res is not None:
# print(res[11])
assert (res_old == res)
# time
res_old = clusterer.cluster_times(agi_repo.getTimesBasedOnNewDataSchema()['nodes'])
data = agi_repo.getTimesBasedOnNewDataSchema()
res = clusterer.cluster_dataset(data['nodes'], data['properties'])
# Prints the entry stored under key 20 of both results before comparing.
print(res_old[20])
print(res[20])
assert (res_old == res)
\ No newline at end of file
...@@ -4,15 +4,40 @@ modules_path = '../../../modules/' ...@@ -4,15 +4,40 @@ modules_path = '../../../modules/'
if os.path.exists(modules_path): if os.path.exists(modules_path):
sys.path.insert(1, modules_path) sys.path.insert(1, modules_path)
from db.entities import Location, PopularLocation, LocationCluster, TimeCluster from db.entities import *
from typing import List, Dict, Tuple from typing import List, Dict, Tuple
from db.repository import Repository from db.repository import Repository, AgiRepository
from processing.clustering.clusterer import Clusterer from processing.clustering.clusterer import Clusterer
DEBUG = False DEBUG = False
repo = Repository() repo = Repository()
test_repo = AgiRepository()
def run_generic_clustering():
    '''Clusters every layer from the repository and stores one ClusterSet per layer.'''
    layers: List[Layer] = repo.get_layers()

    for current_layer in layers:
        print(f"Clustering {current_layer.layer_name}")

        layer_clusters = run_clustering_for_layer(current_layer)
        repo.add_clusterset(ClusterSet(current_layer.layer_name, layer_clusters))
def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
    '''
    Clusters the nodes of a single layer using the layer's properties as features.

    :param layer: The layer whose nodes are clustered
    :returns: One Cluster per entry of the clusterer's result
    '''
    grouped_nodes = Clusterer().cluster_dataset(layer.nodes, layer.properties)

    clusters = []
    for label, nodes in grouped_nodes.items():
        clusters.append(Cluster(label, nodes))
    return clusters
def run_location_clustering(): def run_location_clustering():
user_clusterer = Clusterer() user_clusterer = Clusterer()
...@@ -74,5 +99,7 @@ def store_clusters(type: str, clusters: List): ...@@ -74,5 +99,7 @@ def store_clusters(type: str, clusters: List):
if __name__ == "__main__": if __name__ == "__main__":
run_location_clustering() run_generic_clustering()
run_time_clustering()
# run_location_clustering()
# run_time_clustering()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment