Commit ad6f1889 authored by Alex

Implemented generic clustering

parent 512fadca
@@ -29,6 +29,24 @@ class AgiRepository:
return locations
def getLocationsBasedOnNewDataSchema(self):
'''Creates the new generic data schema to be used beginning 24.03.2020'''
data = {
'LayerName': 'Destination',
'Nodes': self.getLocations(),
'Properties': ['latitude', 'longitude']
}
return data
def getTimesBasedOnNewDataSchema(self):
'''Creates the new generic data schema to be used beginning 24.03.2020'''
data = {
'LayerName': 'Starting_Time',
'Nodes': self.getLocations(),
'Properties': ['timestamp']
}
return data
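# Illustrative note (not part of the diff): each entry in 'Nodes' is a dict read from
# travels.json, e.g. {'id': ..., 'latitude': -20.2695062, 'longitude': 57.6297389,
# 'timestamp': 1570900602, 'user': ...}, and 'Properties' names the keys a consumer
# is expected to cluster on.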
def readDataFromFile(self) -> List[Dict]:
with open('./db/agi/travels.json', 'r') as f_travels:
travels = json.loads(f_travels.read())
......
@@ -2,9 +2,19 @@ import json
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import DBSCAN
from typing import List, Dict
from typing import List, Dict, Any, TypeVar
from deprecated import deprecated
T = TypeVar('T')
class Clusterer:
'''
Clusterer for applying density-based clustering on datasets.
The clustering is done with DBSCAN.
:param epsilon: Epsilon used in DBSCAN
:param min_points: Min_points used in DBSCAN
'''
def __init__(self, epsilon=11, min_points=2):
self.epsilon = epsilon
self.min_points = min_points
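# Minimal usage sketch (illustrative only, not part of the diff): epsilon is the DBSCAN
# neighbourhood radius, min_points the minimum number of neighbours a point needs to be
# treated as a core point.
#   clusterer = Clusterer(epsilon=11, min_points=2)
#   labels = clusterer.create_labels(np.asarray([[1.0, 2.0], [2.0, 2.0], [20.0, 20.0]]))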
@@ -43,7 +53,8 @@ class Clusterer:
return fig
def create_labels(self, features:np.ndarray) -> List:
def create_labels(self, features:np.ndarray) -> List[int]:
'''Creates labels for the items based on DBSCAN.'''
if features is None or len(features) == 0:
return features # garbage in, garbage out
@@ -53,13 +64,25 @@ class Clusterer:
return labels.tolist()
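# Note: sklearn's DBSCAN returns one integer label per input row; clusters are numbered
# 0, 1, 2, ... and noise points get the label -1 (hence the -1 keys in the tests below).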
@deprecated(reason="Use generic version instead")
def extract_location_features(self, locations: List[dict]) -> np.ndarray:
return np.asarray([(float(l['latitude']), float(l['longitude'])) for l in locations])
@deprecated(reason="Use generic version instead")
def extract_time_features(self, times: List[Dict]) -> np.ndarray:
return np.asarray([((t['timestamp']), 0) for t in times])
return np.asarray([[float(t['timestamp'])] for t in times])
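# The replaced line above padded every timestamp with a dummy 0 coordinate; the new
# version returns a single-column array of shape (n, 1), which DBSCAN accepts directly,
# so the clustering depends on the timestamp alone.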
def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray:
'''Extracts the feature values from the dataset into a numpy array, preserving the order of the original dataset.'''
extracted_features = []
for data in dataset:
entry = [float(data[feature]) for feature in features]
extracted_features.append(entry)
return np.asarray(extracted_features)
def label_dataset(self, dataset:List[Dict], labels:List) -> List:
def label_dataset(self, dataset:List[Dict], labels:List[Any]) -> List:
'''Adds the labels to the elements of the dataset at the same position. The new key is called cluster_label.'''
if dataset is None or labels is None:
return
@@ -67,15 +90,20 @@ class Clusterer:
raise ValueError("dataset and labels must have the same length")
for i in range(len(dataset)):
if 'cluster_label' in dataset[i]:
continue
dataset[i]['cluster_label'] = labels[i]
def group_by_clusters(self, dataset:List[Dict], labels:List) -> Dict[int, List[Dict]]:
def group_by_clusters(self, dataset:List[Dict], labels:List[T]) -> Dict[T, List[Dict]]:
self.label_dataset(dataset, labels)
clusters = {}
for label in labels:
clusters[label] = [ds for ds in dataset if ds['cluster_label'] == label]
return clusters
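# Illustrative result shape (taken from the tests below): for labels [0, 0, -1] the
# method returns something like
#   {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}],
#    -1: [{'timestamp': 223, 'cluster_label': -1}]}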
@deprecated(reason="Use generic version instead")
def cluster_locations(self, locations:List[Dict]) -> Dict[int, List[Dict]]:
'''Returns a dictionary with identified clusters and their locations copied from the input'''
if locations is None or len(locations) == 0:
@@ -88,7 +116,8 @@ class Clusterer:
self.label_dataset(locations, labels)
return self.group_by_clusters(locations, labels)
@deprecated(reason="Use generic version instead")
def cluster_times(self, times:List[Dict]) -> Dict[int, List[Dict]]:
'''Returns a dictionary with identified clusters and their times copied from the input'''
features = self.extract_time_features(times)
@@ -96,4 +125,51 @@ class Clusterer:
labels = self.create_labels(features)
self.label_dataset(times, labels)
return self.group_by_clusters(times, labels)
\ No newline at end of file
return self.group_by_clusters(times, labels)
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> Dict[int, List[Dict]]:
'''
Returns the identified clusters containing a subset of nodes from the dataset.
:param dataset: The nodes to assign to clusters
:param features: The feature names of the nodes to use for clustering
:returns: A dictionary of clusters, keyed by cluster label
'''
arr = self._extract_features(dataset, features)
labels = self.create_labels(arr)
return self.group_by_clusters(dataset, labels)
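# Usage sketch with the new generic schema (mirrors the debug block below):
#   data = AgiRepository().getLocationsBasedOnNewDataSchema()
#   clusters = Clusterer().cluster_dataset(data['Nodes'], data['Properties'])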
# TODO remove
if __name__ == '__main__':
import sys
sys.path.insert(1, './')
from db.agi.agi_repository import AgiRepository
clusterer = Clusterer()
agi_repo = AgiRepository()
if True:
res_old = clusterer.cluster_locations(agi_repo.getLocationsBasedOnNewDataSchema()['Nodes'])
# print(res_old[11])
# [{'id': 'adad64cb-bd71-4b2b-9a70-e08eb8b19901-1570900602', 'latitude': -20.2695062, 'longitude': 57.6297389, 'timestamp': 1570900602, 'user': 'b57ad1fb396cfc18b8867fb2e08be723c2cdc2a6', 'cluster_label': 11}, {'id': '127af17b-e823-4d30-8227-00f5421bd48b-1549291309', 'latitude': -20.5362627, 'longitude': 47.2459749, 'timestamp': 1549291309, 'user': 'ca34bd51c4dc65cbc021cb27bcaa014ca082b8c4', 'cluster_label': 11}]
data = agi_repo.getLocationsBasedOnNewDataSchema()
res = clusterer.cluster_dataset(data['Nodes'], data['Properties'])
# if res is not None:
# print(res[11])
assert (res_old == res)
# time
res_old = clusterer.cluster_times(agi_repo.getTimesBasedOnNewDataSchema()['Nodes'])
data = agi_repo.getTimesBasedOnNewDataSchema()
res = clusterer.cluster_dataset(data['Nodes'], data['Properties'])
print(res_old[20])
print(res[20])
assert (res_old == res)
\ No newline at end of file
import yaml
from typing import Generator
### init logging ###
import logging
@@ -9,13 +10,16 @@ LOGGER = logging.getLogger(__name__)
class ClusteringConfig:
'''Contains the configuration for the clustering algorithm defined in configs/clustering.yaml.'''
config_path = 'configs/clustering.yaml'
config: dict = None
def __init__(self):
self.config = self.load_config()
self.config = self._load_config()
def load_config(self) -> dict:
def _load_config(self) -> dict:
'''Loads the whole configuration from file.'''
config = None
with open(self.config_path, 'r') as stream:
@@ -30,8 +34,12 @@ class ClusteringConfig:
def get_config(self):
return self.config
def get_layer_configs(self):
def get_layer_configs(self) -> Generator[dict, None, None]:
"""
Returns a generator for the individual layer configs.
Layer configs are dicts including a layer-name.
"""
for key, layer in self.config['layers'].items():
layer['layer-name'] = key
yield layer
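# Hypothetical shape of configs/clustering.yaml (the file itself is not part of this
# diff); only the top-level 'layers' mapping and the injected 'layer-name' key are
# implied by the code above:
#   layers:
#     destination:
#       ...
#     starting_time:
#       ...
# get_layer_configs() would then yield dicts such as {'layer-name': 'destination', ...}.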
import unittest
import sys
sys.path.insert(1, '../')
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering.clusterer import Clusterer
@@ -72,11 +72,50 @@ class TestClusterer(unittest.TestCase):
self.assertEqual(3, len(locations))
self.assertHaveLabelsAsNewKey(locations, labels)
def test_cluster_locations_multInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [0,0,-1]
res = self.clusterer.cluster_locations(locations)
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertDictEqual(res, {0: [{'latitude': 1, 'longitude': 2, 'cluster_label': 0}, {'latitude': 2, 'longitude': 2, 'cluster_label': 0}], -1: [{'latitude': 20, 'longitude': 20, 'cluster_label': -1}]})
def test_cluster_times_multInput_correctlyLabeled(self):
times = [self.time(123), self.time(128), self.time(223)]
labels = [0,0,-1]
res = self.clusterer.cluster_times(times)
self.assertHaveLabelsAsNewKey(times, labels)
self.assertDictEqual(res, {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}], -1: [{'timestamp': 223, 'cluster_label': -1}]})
def test_cluster_dataset_locationsMultInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [0,0,-1]
res = self.clusterer.cluster_dataset(locations, ['latitude', 'longitude'])
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertDictEqual(res, {0: [{'latitude': 1, 'longitude': 2, 'cluster_label': 0}, {'latitude': 2, 'longitude': 2, 'cluster_label': 0}], -1: [{'latitude': 20, 'longitude': 20, 'cluster_label': -1}]})
def test_cluster_dataset_timesMultInput_correctlyLabeled(self):
times = [self.time(123), self.time(128), self.time(223)]
labels = [0,0,-1]
res = self.clusterer.cluster_dataset(times, ['timestamp'])
self.assertHaveLabelsAsNewKey(times, labels)
self.assertDictEqual(res, {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}], -1: [{'timestamp': 223, 'cluster_label': -1}]})
# helper methods:
def location(self, lat, long_) -> dict:
return {'latitude': lat, 'longitude':long_}
def time(self, ts) -> dict:
return {'timestamp': ts}
def assertHaveLabelsAsNewKey(self, locations, labels):
for i in range(len(locations)):
self.assertEqual(labels[i], locations[i]['cluster_label'])
......
import unittest
import sys
sys.path.insert(1, './')
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering.clustering_config import ClusteringConfig
......