Commit fa778cbc authored by Alexander Lercher's avatar Alexander Lercher

Updated/Added tests

parent ce3886b2
......@@ -23,14 +23,15 @@ class Clusterer:
if features is None or len(features) == 0:
return features # trash in trash out
dbsc = OPTICS(min_samples=self.min_points)
dbsc = dbsc.fit(features)
labels = dbsc.labels_
optics = OPTICS(min_samples=self.min_points)
optics = optics.fit(features)
labels = optics.labels_
return labels.tolist()
def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray:
'''Extracts the feature values from the dataset into a np array with same order as original dataset.'''
# TODO single input
extracted_features = []
for data in dataset:
entry = [float(data[feature]) for feature in features]
......@@ -38,7 +39,7 @@ class Clusterer:
return np.asarray(extracted_features)
def label_dataset(self, dataset:List[Dict], labels:List[Any]) -> List:
def label_dataset(self, dataset:List[Dict], labels:List[Any]):
'''Adds the labels to the elements of the dataset at the same position. The new key is called cluster_label.'''
if dataset is None or labels is None:
return
......@@ -52,8 +53,6 @@ class Clusterer:
dataset[i]['cluster_label'] = labels[i]
def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> ClusterGroup:
self.label_dataset(dataset, labels)
clusters = {}
for label in labels:
clusters[label] = [ds for ds in dataset if ds['cluster_label'] == label]
......@@ -72,6 +71,8 @@ class Clusterer:
arr = self._extract_features(dataset, features)
labels = self.create_labels(arr)
self.label_dataset(dataset, labels)
return self.group_by_clusters(dataset, labels)
......@@ -3,8 +3,7 @@ import sys
sys.path.insert(1, '../')
# python -m unittest discover
from db.entities.cluster import Cluster
from db.entities import TimeCluster, LocationCluster
from db.entities import Cluster
from datetime import date, datetime
import json
......@@ -12,141 +11,11 @@ import json
class TestCluster(unittest.TestCase):
def test_init_Cluster(self):
c = Cluster(1, [1, 2, 3])
c = Cluster('layer1', 1, [1, 2, 3])
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
class TestLocationCluster(unittest.TestCase):
def setUp(self):
self.c = LocationCluster(1, [1, 2, 3])
def test_init_individualArguments(self):
c = LocationCluster(1, [1, 2, 3])
self.assertEqual('1', c.id)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': [1, 2, 3]}
c = LocationCluster(location_dict=dict_)
self.assertEqual('123', c.id)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument_fromDb(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': '[1, 2, 3]'}
c = LocationCluster(location_dict=dict_, from_db=True)
self.assertEqual('123', c.id)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_to_serializable_dict_noDb(self):
c_dict = self.c.to_serializable_dict()
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, c_dict['nodes'])
def test_from_serializable_dict_noDb(self):
new_c = LocationCluster()
new_c.from_serializable_dict(self.c.to_serializable_dict())
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
def test_to_serializable_dict_db_jsonNodes(self):
c_dict = self.c.to_serializable_dict(for_db=True)
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, json.loads(c_dict['nodes']))
def test_from_serializable_dict_fromDb(self):
new_c = LocationCluster()
new_c.from_serializable_dict(
self.c.to_serializable_dict(for_db=True), from_db=True)
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
class TestTimeCluster(unittest.TestCase):
def setUp(self):
self.date_ = date(2020, 1, 1)
self.c = TimeCluster(self.date_, 14, 1, [1, 2, 3])
def test_init_individualArguments(self):
c = TimeCluster(self.date_, 14, 1, [1, 2, 3])
self.assertEqual(f'{self.date_}-14-1', c.id)
self.assertEqual(self.date_, c.date)
self.assertEqual(14, c.hour)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': [1, 2, 3],
'date': str(self.date_), 'hour': 14}
c = TimeCluster(time_dict=dict_)
self.assertEqual('123', c.id)
self.assertEqual(self.date_, c.date)
self.assertEqual(14, c.hour)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_init_dictArgument_fromDb(self):
dict_ = {'id': '123', 'cluster_label': 1, 'nodes': '[1, 2, 3]',
'date': str(self.date_), 'hour': 14}
c = TimeCluster(time_dict=dict_, from_db=True)
self.assertEqual('123', c.id)
self.assertEqual(self.date_, c.date)
self.assertEqual(14, c.hour)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
def test_to_serializable_dict_noDb(self):
c_dict = self.c.to_serializable_dict()
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, c_dict['nodes'])
self.assertEqual(self.c.date, datetime.strptime(
c_dict['date'], '%Y-%m-%d').date())
self.assertEqual(self.c.hour, c_dict['hour'])
def test_from_serializable_dict_noDb(self):
new_c = TimeCluster()
new_c.from_serializable_dict(self.c.to_serializable_dict())
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
def test_to_serializable_dict_fromDb_jsonNodes(self):
c_dict = self.c.to_serializable_dict(for_db=True)
self.assertEqual(self.c.id, c_dict['id'])
self.assertEqual(self.c.cluster_label, c_dict['cluster_label'])
self.assertEqual(self.c.nodes, json.loads(c_dict['nodes']))
self.assertEqual(self.c.date, datetime.strptime(
c_dict['date'], '%Y-%m-%d').date())
self.assertEqual(self.c.hour, c_dict['hour'])
def test_from_serializable_dict_fromDb(self):
new_c = TimeCluster()
new_c.from_serializable_dict(
self.c.to_serializable_dict(for_db=True), from_db=True)
self.assertEqual(self.c.id, new_c.id)
self.assertEqual(str(self.c), str(new_c))
if __name__ == '__main__':
unittest.main()
......@@ -5,12 +5,39 @@ for path in ['../', './']:
# python -m unittest discover
from processing.clustering.clusterer import Clusterer
import numpy as np
class TestClusterer(unittest.TestCase):
clusterer:Clusterer = None
def setUp(self):
self.clusterer = Clusterer(epsilon=10, min_points=2)
self.clusterer = Clusterer(min_points=2)
#region _extract_features
def test_extract_features_emptyDataset_noResults(self):
features = self.clusterer._extract_features(dataset=[], features=['test'])
np.testing.assert_equal(np.asarray([]), features)
def test_extract_features_emptyFeatures_singleEmptyResult(self):
features = self.clusterer._extract_features(dataset=[{'a':1, 'b':2}], features=[])
np.testing.assert_equal(np.asarray([[]]), features)
def test_extract_features_singleFeature_Projection(self):
features = self.clusterer._extract_features(dataset=[{'a':1, 'b':2}], features=['a'])
np.testing.assert_equal(np.asarray([[1]]), features)
def test_extract_features_singleFeature_Projection_2(self):
features = self.clusterer._extract_features(dataset=[{'a':1, 'b':2}, {'a':3, 'b':4}], features=['a'])
np.testing.assert_equal(np.asarray([[1], [3]]), features)
def test_extract_features_multFeature_Projection(self):
features = self.clusterer._extract_features(dataset=[{'a':0, 'b':2, 'c':4}, {'a':1, 'b':3, 'c':5}], features=['a','c'])
np.testing.assert_equal(np.asarray([[0,4], [1,5]]), features)
#endregion _extract_features
#region create_labels
def test_create_labels_noneInput_noneOutput(self):
labels = self.clusterer.create_labels(None)
......@@ -19,51 +46,65 @@ class TestClusterer(unittest.TestCase):
def test_create_labels_emptyInput_emptyOutput(self):
labels = self.clusterer.create_labels([])
self.assertEqual([], labels)
def test_create_labels_singleInput_singleCluster(self):
features = self.clusterer.extract_location_features([self.location(1,2)])
labels = self.clusterer.create_labels(features)
self.assertEqual(1, len(labels))
def test_create_labels_singleInput_error(self):
clusterer = Clusterer(min_points=2)
features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features())
with self.assertRaises(ValueError):
# Fails because (min_pts > |input elements|)
clusterer.create_labels(features)
def test_create_labels_singleInput_error_2(self):
clusterer = Clusterer(min_points=1)
features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features())
with self.assertRaises(ValueError):
# Fails because fitting does not work internally
clusterer.create_labels(features)
def test_create_labels_nearInputs_singleCluster(self):
locations = [self.location(1,2), self.location(2,2)]
features = self.clusterer.extract_location_features(locations)
features = self.clusterer._extract_features(dataset=locations, features=self.get_location_features())
labels = self.clusterer.create_labels(features)
self.assertEqual(2, len(labels))
self.assertEqual(labels[0], labels[1])
def test_create_labels_nearInputs_twoClusters(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
locations = [self.location(1,2), self.location(2,2), self.location(20,20), self.location(20,23)]
features = self.clusterer.extract_location_features(locations)
features = self.clusterer._extract_features(dataset=locations, features=self.get_location_features())
labels = self.clusterer.create_labels(features)
self.assertEqual(3, len(labels))
self.assertEqual(4, len(labels))
self.assertEqual(labels[0], labels[1])
self.assertEqual(labels[2], labels[3])
self.assertNotEqual(labels[0], labels[2])
def test_label_locations_NoneLocations_NoException(self):
#endregion create_labels
#region label_dataset
def test_label_dataset_NoneLocations_NoException(self):
self.clusterer.label_dataset(None, [])
def test_label_locations_NoneLabels_NoException(self):
def test_label_dataset_NoneLabels_NoException(self):
self.clusterer.label_dataset([], None)
def test_label_locations_emptyInput_emptyOutput(self):
def test_label_dataset_emptyInput_emptyOutput(self):
locations = []
self.clusterer.label_dataset(locations, [])
self.assertEqual(0, len(locations))
def test_label_locations_diffInputLengths_ValueError_1(self):
def test_label_dataset_diffInputLengths_ValueError_1(self):
with self.assertRaises(ValueError):
self.clusterer.label_dataset([], [1])
def test_label_locations_diffInputLengths_ValueError_2(self):
def test_label_dataset_diffInputLengths_ValueError_2(self):
with self.assertRaises(ValueError):
self.clusterer.label_dataset([self.location(1,2)], [])
def test_label_locations_multInput_correctlyLabeled(self):
def test_label_dataset_multInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [17,2,20]
......@@ -72,53 +113,76 @@ class TestClusterer(unittest.TestCase):
self.assertEqual(3, len(locations))
self.assertHaveLabelsAsNewKey(locations, labels)
def test_cluster_locations_multInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [0,0,-1]
res = self.clusterer.cluster_locations(locations)
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertDictEqual(res, {0: [{'latitude': 1, 'longitude': 2, 'cluster_label': 0}, {'latitude': 2, 'longitude': 2, 'cluster_label': 0}], -1: [{'latitude': 20, 'longitude': 20, 'cluster_label': -1}]})
def test_cluster_times_multInput_correctlyLabeled(self):
times = [self.time(123), self.time(128), self.time(223)]
labels = [0,0,-1]
#endregion label_dataset
res = self.clusterer.cluster_times(times)
self.assertHaveLabelsAsNewKey(times, labels)
self.assertDictEqual(res, {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}], -1: [{'timestamp': 223, 'cluster_label': -1}]})
#region cluster_dataset
def test_cluster_dataset_locationsMultInput_correctlyLabeled(self):
locations = [self.location(1,2), self.location(2,2), self.location(20,20)]
labels = [0,0,-1]
locations = [self.location(1,2), self.location(2,2), self.location(20,20), self.location(20,21)]
labels = [0,0,1,1]
exp_res = {0:locations[0:2], 1:locations[2:4]}
res = self.clusterer.cluster_dataset(locations, self.get_location_features())
res = self.clusterer.cluster_dataset(locations, ['latitude', 'longitude'])
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertDictEqual(res, {0: [{'latitude': 1, 'longitude': 2, 'cluster_label': 0}, {'latitude': 2, 'longitude': 2, 'cluster_label': 0}], -1: [{'latitude': 20, 'longitude': 20, 'cluster_label': -1}]})
self.assertClusteringResult(exp_res, res)
def test_cluster_dataset_timesMultInput_correctlyLabeled(self):
times = [self.time(123), self.time(128), self.time(223)]
labels = [0,0,-1]
times = [self.time(123), self.time(128), self.time(223), self.time(225)]
labels = [0,0,1,1]
exp_res = {0:times[0:2], 1:times[2:4]}
res = self.clusterer.cluster_dataset(times, ['timestamp'])
res = self.clusterer.cluster_dataset(times, self.get_time_features())
self.assertHaveLabelsAsNewKey(times, labels)
self.assertDictEqual(res, {0: [{'timestamp': 123, 'cluster_label': 0}, {'timestamp': 128, 'cluster_label': 0}], -1: [{'timestamp': 223, 'cluster_label': -1}]})
self.assertClusteringResult(exp_res, res)
def test_cluster_dataset_locationsMultInput_correctlyLabeled_2(self):
return
# TODO why is the single location added to the last cluster?
clusterer = Clusterer(3)
locations = [self.location(1,2), self.location(2,2), self.location(2,2), self.location(20,20), self.location(20,21), self.location(20,20), self.location(400,1000), self.location(200,1), self.location(200,2), self.location(201,-1)]
labels = [0,0,1,1]
exp_res = {0:locations[0:2], 1:locations[2:4]}
res = clusterer.cluster_dataset(locations, self.get_location_features())
print(res)
self.assertHaveLabelsAsNewKey(locations, labels)
self.assertClusteringResult(exp_res, res)
#endregion cluster_dataset
#region helper methods
# helper methods:
def location(self, lat, long_) -> dict:
return {'latitude': lat, 'longitude':long_}
def get_location_features(self):
return ['latitude', 'longitude']
def time(self, ts) -> dict:
return {'timestamp': ts}
def get_time_features(self):
return ['timestamp']
def assertHaveLabelsAsNewKey(self, locations, labels):
self.assertEqual(len(labels), len(locations))
for i in range(len(locations)):
self.assertEqual(labels[i], locations[i]['cluster_label'])
def assertClusteringResult(self, expected, actual):
self.assertEqual(len(expected), len(actual))
for k in expected.keys():
if k not in actual:
self.fail(f"Cluster key ({k}, {type(k)}) not in result.")
self.assertListEqual(expected[k], actual[k])
#endregion helper methods
if __name__ == '__main__':
unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering.clustering_config import ClusteringConfig
class TestClusteringConfig(unittest.TestCase):
def setUp(self):
self.clustering_config = ClusteringConfig()
def test_get_layer_configs_noneInput_noneOutput(self):
for layer_config in self.clustering_config.get_layer_configs():
self.assertIn('layer-name', layer_config)
if __name__ == '__main__':
unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.user_graph_generator import UserGraphGenerator
import networkx as nx
class TestUserGraphGenerator(unittest.TestCase):
def setUp(self):
self.user_graph = UserGraphGenerator()
def test_count_edges_oneNode(self):
count_res = {}
self.assertEqual(count_res, self.user_graph.count_edges([1]))
def test_count_edges_threeDistinctNodes_threeEdges(self):
count_res = {(1, 2): 1, (1, 3): 1, (2, 3): 1}
self.assertEqual(count_res, self.user_graph.count_edges([1, 2, 3]))
def test_count_edges_twoNodesWithDups_notReflexive(self):
count_res = {}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1]))
def test_count_edges_threeNodesWithDups_countGtOne_notReflexive(self):
count_res = {(1, 3): 2}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1, 3]))
def test_count_edges_fourNodesWithDups_countGtOne_notReflexive(self):
count_res = {(1, 3): 2, (1, 4): 2, (3, 4): 1}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1, 3, 4]))
def test_count_edges_fourStringNodesWithDups_countGtOne_notReflexive(self):
count_res = {('test', 'test2'): 2, ('test', '4'): 2, ('test2', '4'): 1}
self.assertEqual(count_res,
self.user_graph.count_edges(['test', 'test', 'test2', '4']))
def test_count_edges_fourDistinctStringNodes_fullyConnectedEdges(self):
count_res = {
('1', '2'): 1, ('1', '3'): 1, ('1', '4'): 1,
('2', '3'): 1, ('2', '4'): 1, ('3', '4'): 1
}
self.assertEqual(count_res,
self.user_graph.count_edges(['1', '2', '3', '4']))
def test_create_edges_with_weights_SingleEdge(self):
counts = {('a', 'b'): 1}
edge_result = [('a', 'b', {'weight': 1})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_edges_with_weights_SingleEdgeWeightTwo(self):
counts = {('a', 'b'): 2}
edge_result = [('a', 'b', {'weight': 2})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_edges_with_weights_TwoEdgesWithWeights(self):
counts = {('a', 'b'): 2, ('b', 'c'): 1}
edge_result = [('a', 'b', {'weight': 2}), ('b', 'c', {'weight': 1})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_graph_from_nodes_singleNode(self):
nodes = [1]
edges = []
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_twoDistinctNodes(self):
nodes = [1, 2]
edges = [(1, 2, {'weight': 1})]
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_threeDistinctNodes(self):
nodes = [1, 2, 3]
edges = [(1, 2, {'weight': 1}), (1, 3, {'weight': 1}),
(2, 3, {'weight': 1})]
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_threeNodesWithDuplicates_TwoNodes_EdgesWithAccordingWeight(self):
nodes = [1, 1, 3]
edges = [(1, 3, {'weight': 2})]
self.assertGraph(list(set(nodes)), edges,
self.user_graph.create_graph_from_nodes(nodes))
# unittest custom assertions
def assertGraph(self, nodes, edges, g: nx.Graph):
self.assertEqual(len(nodes), g.number_of_nodes())
self.assertEqual(len(edges), g.number_of_edges())
for i in range(len(nodes)):
self.assertEqual(nodes[i], list(g.nodes)[i])
for i in range(len(edges)):
graph_edge = list(g.edges)[i]
first, second, weight = edges[i]
self.assertEqual((first, second), graph_edge)
self.assertEqual(weight, g.edges[graph_edge])
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment