Commit e9d9abe1 authored by Alexander Lercher

[RoleStage] Adding human readable label to clusters #20

parent 76a26396
......@@ -12,16 +12,18 @@ class Cluster:
:param layer_name: The name of the layer in which the cluster is located
:param cluster_label: The label of the cluster unique for the layer
:param nodes: The individual nodes of the cluster
:param label: A human readable label
'''
def __init__(self, use_case: str = None, use_case_table: str = None, layer_name: str = None,
cluster_label: int = None, nodes: List[Dict] = None,
cluster_label: int = None, nodes: List[Dict] = None, label: str = None,
cluster_dict: Dict = None, from_db=False):
self.use_case = use_case
self.use_case_table = use_case_table
self.layer_name = layer_name
self.cluster_label = cluster_label
self.nodes = nodes
self.label = label
if cluster_dict is not None:
self.from_serializable_dict(cluster_dict, from_db)
......@@ -32,7 +34,8 @@ class Cluster:
"use_case_table": self.use_case_table,
"layer_name": self.layer_name,
"cluster_label": self.cluster_label,
"nodes": json.dumps(self.nodes) if for_db else self.nodes
"nodes": json.dumps(self.nodes) if for_db else self.nodes,
"label": self.label,
}
def from_serializable_dict(self, cluster_dict: Dict, from_db=False):
......@@ -42,6 +45,7 @@ class Cluster:
self.cluster_label = cluster_dict["cluster_label"]
self.nodes = json.loads(cluster_dict["nodes"]) \
if from_db else cluster_dict["nodes"]
self.label = cluster_dict["label"]
def __repr__(self):
return json.dumps(self.to_serializable_dict())
......
from processing.clustering.cluster_result import ClusterResultConverter, ClusterResult, ClusterResult1D, ClusterResult2D
from processing.clustering.clusterer import Clusterer
from typing import List, Dict, Any
class ClusterResult:
    '''
    Holds the outcome for one cluster produced by a clustering run.

    :param nodes: The nodes that were assigned to the cluster
    :param label: A human readable description of the cluster, "n.a." if none is given
    '''

    def __init__(self, nodes: List[Dict], label: str = "n.a."):
        self.label = label
        self.nodes = nodes
class ClusterResult1D(ClusterResult):
    '''
    A cluster result whose label is built from two bounds.

    :param nodes: The nodes contained in the cluster
    :param lower_bound: The value shown on the left side of the label
    :param upper_bound: The value shown on the right side of the label
    '''

    def __init__(self, nodes, lower_bound, upper_bound):
        range_label = f"{lower_bound} -- {upper_bound}"
        super().__init__(nodes, range_label)
class ClusterResult2D(ClusterResult):
    '''
    A cluster result labelled with its center.

    :param nodes: The nodes contained in the cluster
    :param center: The center of the cluster, stored as the label exactly as passed in
    '''

    def __init__(self, nodes, center):
        super().__init__(nodes, label=center)
class ClusterResultConverter:
    '''
    Converts raw cluster groups (cluster key -> nodes of the cluster) into
    ClusterResults. The human readable label of each result depends on the
    number of features that were used for clustering.
    '''

    def _convert_feature_to_float(self, feature_value) -> float:
        '''
        Returns the feature value as float, where an empty string counts as 0.

        :param feature_value: Anything accepted by float(), or an empty string
        '''
        # Compare by value: an identity check against a literal ("is not") relies
        # on string interning and raises a SyntaxWarning since Python 3.8.
        return float(0 if feature_value == "" else feature_value)

    def convert_to_cluster_results(self, cluster_groups: Dict[Any, List[Dict]], features: List[str]) -> Dict[Any, "ClusterResult"]:
        '''
        Converts the cluster groups to ClusterResults with a label matching the feature count.

        :param cluster_groups: The nodes per cluster key
        :param features: The feature names used for clustering
        :returns: One ClusterResult per cluster key: labelled with the value range
            for one feature, with the centroid for two features, and with the
            default label otherwise
        '''
        if len(features) == 1:
            return self._convert_to_cluster_results_1d(cluster_groups, features[0])
        if len(features) == 2:
            return self._convert_to_cluster_results_2d(cluster_groups, features)
        return self._convert_to_cluster_results(cluster_groups, features)

    def _convert_to_cluster_results(self, cluster_groups: Dict[Any, List[Dict]], features: List[str]) -> Dict[Any, "ClusterResult"]:
        '''Returns the clustering results as they are, converted to a dict of ClusterResults.'''
        return {key: ClusterResult(nodes_in_cluster)
                for key, nodes_in_cluster in cluster_groups.items()}

    def _convert_to_cluster_results_1d(self, cluster_groups: Dict[Any, List[Dict]], feature: str) -> Dict[Any, "ClusterResult"]:
        '''
        Returns the clustering results with an added label for the 1d lower and upper bound.

        A cluster without nodes has no bounds and is returned as plain
        ClusterResult with the default label.
        '''
        new_results = {}
        for key, nodes_in_cluster in cluster_groups.items():
            if not nodes_in_cluster:
                # nothing to derive bounds from, indexing the first node would fail
                new_results[key] = ClusterResult(nodes_in_cluster)
                continue

            values = [self._convert_feature_to_float(node[feature]) for node in nodes_in_cluster]
            new_results[key] = ClusterResult1D(nodes_in_cluster, min(values), max(values))
        return new_results

    def _convert_to_cluster_results_2d(self, cluster_groups: Dict[Any, List[Dict]], features: List[str]) -> Dict[Any, "ClusterResult"]:
        '''
        Returns the clustering results with an added label for the 2d center.

        A cluster without nodes has no center and is returned as plain
        ClusterResult with the default label.
        '''
        new_results = {}
        for key, nodes_in_cluster in cluster_groups.items():
            if not nodes_in_cluster:
                # the centroid would divide by zero
                new_results[key] = ClusterResult(nodes_in_cluster)
                continue

            x = [self._convert_feature_to_float(node[features[0]]) for node in nodes_in_cluster]
            y = [self._convert_feature_to_float(node[features[1]]) for node in nodes_in_cluster]
            node_count = len(nodes_in_cluster)
            centroid = (sum(x) / node_count, sum(y) / node_count)
            new_results[key] = ClusterResult2D(nodes_in_cluster, str(centroid))
        return new_results
......@@ -2,10 +2,10 @@ import json
import numpy as np
import matplotlib.pyplot as plt
from sklearn.cluster import OPTICS
from typing import List, Dict, Any, TypeVar
from typing import List, Dict, Any
from processing.clustering.cluster_result import ClusterResultConverter, ClusterResult
T = TypeVar('T')
ClusterGroup = Dict[Any, List[Dict]]
class Clusterer:
'''
......@@ -17,6 +17,7 @@ class Clusterer:
'''
def __init__(self, min_points=5):
self.min_points = min_points
self.cluster_result_converter = ClusterResultConverter()
def create_labels(self, features:np.ndarray) -> List[int]:
'''Creates labels for the items based on OPTICS.'''
......@@ -31,11 +32,14 @@ class Clusterer:
return labels.tolist()
def _convert_feature_to_float(self, feature_value) -> float:
return float(feature_value if feature_value is not "" else 0)
def _extract_features(self, dataset: List[Dict], features:List[str]) -> np.ndarray:
'''Extracts the feature values from the dataset into a np array with same order as original dataset.'''
extracted_features = []
for data in dataset:
entry = [float(data[feature] if data[feature] is not "" else 0) for feature in features]
entry = [self._convert_feature_to_float(data[feature]) for feature in features]
extracted_features.append(entry)
return np.asarray(extracted_features)
......@@ -53,21 +57,22 @@ class Clusterer:
continue
dataset[i]['cluster_label'] = labels[i]
def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> ClusterGroup:
def group_by_clusters(self, dataset:List[Dict], labels:List[Any]) -> Dict[Any, List[Dict]]:
clusters = {}
# TODO optimize by iterating through dataset only once
for label in labels:
clusters[label] = [ds for ds in dataset if ds['cluster_label'] == label]
return clusters
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> ClusterGroup:
def cluster_dataset(self, dataset:List[Dict], features:List[str]) -> Dict[Any, ClusterResult]:
'''
Returns the identified clusters containing a subset of nodes from the dataset.
:param dataset: The nodes to assign to clusters
:param features: The feature names of the nodes to use for clustering
:returns: A dictionary of clusters, where each value is a non-empty subset of dataset if dataset was not empty
:returns: A dictionary of cluster results, where each value is a non-empty subset of dataset if dataset was not empty
'''
arr = self._extract_features(dataset, features)
......@@ -75,5 +80,8 @@ class Clusterer:
self.label_dataset(dataset, labels)
return self.group_by_clusters(dataset, labels)
cluster_groups: Dict[Any, List[Dict]] = self.group_by_clusters(dataset, labels)
res: Dict[Any, ClusterResult] = self.cluster_result_converter.convert_to_cluster_results(cluster_groups, features)
return res
......@@ -6,9 +6,9 @@ if os.path.exists(modules_path):
import json
from db.entities import Layer, Cluster
from typing import List, Dict, Tuple
from typing import List, Dict, Tuple, Any
from db.repository import Repository
from processing.clustering.clusterer import Clusterer
from processing.clustering import Clusterer, ClusterResult
repo = Repository()
......@@ -19,7 +19,7 @@ def run_generic_clustering():
all_layers:List[Layer] = repo.get_layers()
for layer in all_layers:
print(f"Clustering {layer.use_case}, {layer.layer_name}.")
print(f"Clustering {layer.use_case}//{layer.use_case_table}//{layer.layer_name}.")
if layer.properties is None or len(layer.properties) == 0:
print("skipping")
......@@ -33,15 +33,17 @@ def run_generic_clustering():
def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
nodes = repo.get_layer_nodes(layer.use_case, layer.layer_name)
nodes = repo.get_layer_nodes(layer.use_case, layer.use_case_table, layer.layer_name)
clusterer = Clusterer()
res = clusterer.cluster_dataset(
res: Dict[Any, ClusterResult] = clusterer.cluster_dataset(
nodes,
layer.properties
)
return [Cluster(layer.use_case, layer.layer.layer_name, key, value) for key, value in res.items()]
return [Cluster(layer.use_case, layer.use_case_table, layer.layer_name,
cluster_label=key, nodes=cluster_result.nodes, label=cluster_result.label)
for key, cluster_result in res.items()]
def store_generic_clusters(clusters: List[Cluster]):
......
......@@ -12,8 +12,10 @@ import json
class TestCluster(unittest.TestCase):
def test_init_Cluster(self):
c = Cluster('layer1', 1, [1, 2, 3])
c = Cluster('debug', 'debug-table1', 'layer1', 1, [1, 2, 3])
self.assertEqual('debug', c.use_case)
self.assertEqual('debug-table1', c.use_case_table)
self.assertEqual(1, c.cluster_label)
self.assertEqual([1, 2, 3], c.nodes)
......
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering import ClusterResultConverter, ClusterResult
from typing import List, Dict, Any
class TestClusterResult(unittest.TestCase):
    '''Tests the labels and nodes created by the ClusterResultConverter.'''

    converter: ClusterResultConverter = None

    def setUp(self):
        self.converter = ClusterResultConverter()

    def test_result_undefined_feature(self):
        # without features every cluster keeps the default label
        cluster_groups = self._get_some_cluster_groups_1d()
        cluster_res = self.converter.convert_to_cluster_results(
            cluster_groups=cluster_groups,
            features=[]
        )

        self.assert_correct_cluster_result_len(cluster_groups, cluster_res)
        self.assert_correct_cluster_result_labels(['n.a.', 'n.a.', 'n.a.'], cluster_res)

    def test_result_1d_feature(self):
        # one feature is labelled with "<min> -- <max>" of the feature values
        cluster_groups = self._get_some_cluster_groups_1d()
        cluster_res = self.converter.convert_to_cluster_results(
            cluster_groups=cluster_groups,
            features=['v']
        )

        self.assert_correct_cluster_result_len(cluster_groups, cluster_res)
        self.assert_correct_cluster_result_labels(['-1.0 -- 1.0', '10.0 -- 11.0', '2.0 -- 2.0'], cluster_res)

    def test_result_2d_features(self):
        # two features are labelled with the stringified centroid
        cluster_groups = self._get_some_cluster_groups_2d()
        cluster_res = self.converter.convert_to_cluster_results(
            cluster_groups=cluster_groups,
            features=['v', 'u']
        )

        self.assert_correct_cluster_result_len(cluster_groups, cluster_res)
        self.assert_correct_cluster_result_labels([str((0.0, 0.0)), str((10.5, 10.5)), str((2.0, 2.0)), str((3.0, 6.0))], cluster_res)

    #region Custom Assertions

    def assert_correct_cluster_result_len(self, expected: Dict[Any, List[Dict]], actual: Dict[Any, ClusterResult]):
        '''Asserts that actual contains exactly the clusters of expected with identical nodes.'''
        self.assertEqual(len(expected), len(actual))
        # iterate the real keys instead of assuming they are 0..n-1
        for key, expected_nodes in expected.items():
            self.assertIn(key, actual)
            self.assertEqual(len(expected_nodes), len(actual[key].nodes))
            self.assertEqual(expected_nodes, actual[key].nodes)

    def assert_correct_cluster_result_labels(self, expected: List[str], actual: Dict[Any, ClusterResult]):
        '''Asserts that the cluster with key i carries the label expected[i].'''
        self.assertEqual(len(expected), len(actual))
        # expected labels are positional, so the cluster keys have to be 0..n-1
        for key, expected_label in enumerate(expected):
            self.assertIn(key, actual)
            self.assertEqual(expected_label, actual[key].label)

    #endregion Custom Assertions

    #region helper methods

    def _get_some_cluster_groups_1d(self):
        '''Returns three cluster groups whose nodes have the single feature v.'''
        return {
            0: [{'v': '0'}, {'v': '1'}, {'v': '-1'}],
            1: [{'v': '10'}, {'v': '11'}],
            2: [{'v': '2'}],
        }

    def _get_some_cluster_groups_2d(self):
        '''Returns four cluster groups whose nodes have the features v and u.'''
        return {
            0: [{'v': '0', 'u': '0'}, {'v': '1', 'u': '1'}, {'v': '-1', 'u': '-1'}],
            1: [{'v': '10', 'u': '10'}, {'v': '11', 'u': '11'}],
            2: [{'v': '2', 'u': '2'}],
            3: [{'v': '7', 'u': '7'}, {'v': '5', 'u': '3'}, {'v': '-3', 'u': '8'}],
        }

    #endregion helper methods
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
......@@ -4,8 +4,9 @@ for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.clustering.clusterer import Clusterer
from processing.clustering import Clusterer, ClusterResult
import numpy as np
from typing import List, Dict, Any
class TestClusterer(unittest.TestCase):
clusterer:Clusterer = None
......@@ -178,14 +179,14 @@ class TestClusterer(unittest.TestCase):
for i in range(len(locations)):
self.assertEqual(labels[i], locations[i]['cluster_label'])
def assertClusteringResult(self, expected, actual):
def assertClusteringResult(self, expected: Dict[Any, List], actual: Dict[Any, ClusterResult]):
self.assertEqual(len(expected), len(actual))
for k in expected.keys():
if k not in actual:
self.fail(f"Cluster key ({k}, {type(k)}) not in result.")
self.assertListEqual(expected[k], actual[k])
self.assertListEqual(expected[k], actual[k].nodes)
#endregion helper methods
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment