Commit d728e14e authored by Alexander Lercher's avatar Alexander Lercher

Improved tests for clustering

parent ad4c67ae
...@@ -22,6 +22,8 @@ class Clusterer: ...@@ -22,6 +22,8 @@ class Clusterer:
'''Creates labels for the items based on OPTICS.''' '''Creates labels for the items based on OPTICS.'''
if features is None or len(features) == 0: if features is None or len(features) == 0:
return features # trash in trash out return features # trash in trash out
if len(features) == 1:
return [-1]
optics = OPTICS(min_samples=self.min_points) optics = OPTICS(min_samples=self.min_points)
optics = optics.fit(features) optics = optics.fit(features)
...@@ -31,7 +33,6 @@ class Clusterer: ...@@ -31,7 +33,6 @@ class Clusterer:
def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray: def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray:
'''Extracts the feature values from the dataset into a np array with same order as original dataset.''' '''Extracts the feature values from the dataset into a np array with same order as original dataset.'''
# TODO single input
extracted_features = [] extracted_features = []
for data in dataset: for data in dataset:
entry = [float(data[feature]) for feature in features] entry = [float(data[feature]) for feature in features]
......
...@@ -46,19 +46,22 @@ class TestClusterer(unittest.TestCase): ...@@ -46,19 +46,22 @@ class TestClusterer(unittest.TestCase):
def test_create_labels_emptyInput_emptyOutput(self): def test_create_labels_emptyInput_emptyOutput(self):
labels = self.clusterer.create_labels([]) labels = self.clusterer.create_labels([])
self.assertEqual([], labels) self.assertEqual([], labels)
def test_create_labels_singleInput_error(self): def test_create_labels_singleInput_noise(self):
clusterer = Clusterer(min_points=2)
features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features())
with self.assertRaises(ValueError):
# Fails because (min_pts > |input elements|)
clusterer.create_labels(features)
def test_create_labels_singleInput_error_2(self):
clusterer = Clusterer(min_points=1) clusterer = Clusterer(min_points=1)
features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features()) features = clusterer._extract_features(dataset=[self.location(1,2)], features=self.get_location_features())
labels = clusterer.create_labels(features)
self.assertEqual(1, len(labels))
self.assertEqual(-1, labels[0])
def test_create_labels_tooSmallInputForMinPtsHyperparameter_error(self):
clusterer = Clusterer(min_points=3)
features = clusterer._extract_features(dataset=[self.location(1,2), self.location(1,2)], features=self.get_location_features())
with self.assertRaises(ValueError): with self.assertRaises(ValueError):
# Fails because fitting does not work internally # Fails because (min_pts > |input elements|)
clusterer.create_labels(features) clusterer.create_labels(features)
def test_create_labels_nearInputs_singleCluster(self): def test_create_labels_nearInputs_singleCluster(self):
...@@ -138,15 +141,17 @@ class TestClusterer(unittest.TestCase): ...@@ -138,15 +141,17 @@ class TestClusterer(unittest.TestCase):
self.assertClusteringResult(exp_res, res) self.assertClusteringResult(exp_res, res)
def test_cluster_dataset_locationsMultInput_correctlyLabeled_2(self): def test_cluster_dataset_locationsMultInput_correctlyLabeled_2(self):
return
# TODO why is the single location added to the last cluster?
clusterer = Clusterer(3) clusterer = Clusterer(3)
locations = [self.location(1,2), self.location(2,2), self.location(2,2), self.location(20,20), self.location(20,21), self.location(20,20), self.location(400,1000), self.location(200,1), self.location(200,2), self.location(201,-1)] locations = [
labels = [0,0,1,1] self.location(1,2), self.location(2,2), self.location(2,2),
exp_res = {0:locations[0:2], 1:locations[2:4]} self.location(20,20), self.location(20,21), self.location(20,20),
self.location(50,50),
self.location(50,1), self.location(50,2), self.location(50,-1)
]
labels = [0,0,0,1,1,1,-1,2,2,2]
exp_res = {0:locations[0:3], 1:locations[3:6], -1:locations[6:7], 2:locations[7:10]}
res = clusterer.cluster_dataset(locations, self.get_location_features()) res = clusterer.cluster_dataset(locations, self.get_location_features())
print(res)
self.assertHaveLabelsAsNewKey(locations, labels) self.assertHaveLabelsAsNewKey(locations, labels)
self.assertClusteringResult(exp_res, res) self.assertClusteringResult(exp_res, res)
......
# clustering of generated nodes
import sys
import os
modules_path = './'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import matplotlib.pyplot as plt
import sklearn.datasets
import numpy as np
from processing.clustering.clusterer import Clusterer
# parameters for data generation
N_SAMPLES = 20
N_FEATURES = 2
N_CENTERS = 3
STD_DEVIATION = 1.0
def show_generated_data(ax, nodes, labels):
distinct_colors = plt.cm.rainbow(np.linspace(0, 1, len(set(labels))))
colors = [distinct_colors[label] for label in labels]
ax.set_title('Generated Dataset')
ax.set_xlabel('Feature 1')
ax.set_ylabel('Feature 2')
ax.scatter(nodes[:,0], nodes[:,1], c=colors)
def show_clustering_result(ax, min_pts, clusters: dict):
labels = clusters.keys()
# flatten values in dict
nodes = [node for subset in clusters.values() for node in subset]
if -1 in labels:
# clustering contains noise, add them in black
distinct_colors = plt.cm.rainbow(np.linspace(0, 1, len(set(labels))-1))
distinct_colors = np.append(distinct_colors, [[0,0,0,1]], axis=0)
else:
distinct_colors = plt.cm.rainbow(np.linspace(0, 1, len(set(labels))))
colors = [distinct_colors[node['cluster_label']] for node in nodes]
ax.set_title(f'Clustering Result with MinPts={min_pts}')
ax.set_xlabel('Feature 1')
ax.set_ylabel('Feature 2')
ax.scatter( [n['1'] for n in nodes],
[n['2'] for n in nodes],
c=colors)
def run_clustering(min_points, dataset):
clusterer = Clusterer(min_points=min_points)
return clusterer.cluster_dataset(
dataset=dataset,
features=['1','2']
)
if __name__ == '__main__':
fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2)
fig.tight_layout(pad=3.0)
nodes, labels = sklearn.datasets.make_blobs(n_samples=20, n_features=N_FEATURES, centers=[[5,5]], cluster_std=1)
nodes2, labels2 = sklearn.datasets.make_blobs(n_samples=20, n_features=N_FEATURES, centers=[[30,5]], cluster_std=5)
nodes = np.append(nodes, nodes2, axis=0)
labels = np.append(labels, labels2+1)
show_generated_data(ax1, nodes, labels)
for min_pts, ax in zip([5, 10, 15], [ax2, ax3, ax4]):
dataset = [{'1':n[0], '2':n[1]} for n in nodes]
clusters = run_clustering(min_pts, dataset)
show_clustering_result(ax, min_pts, clusters)
plt.show()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment