Commit 2cacde05 authored by Alexander Lercher

Added Community Prediction preparation and training

Copied from https://github.com/Alexx882/community-prediction (private repo)
parent 4b461053
from entities.timewindow import TimeWindow
from entities.cluster import Cluster
from entities.layer import Layer
\ No newline at end of file
# from __future__ import annotations
from typing import Dict, List, Iterable, Any
from entities.timewindow import TimeWindow
import numpy as np
from processing import ClusterMetricsCalculatorFactory
class Cluster:
    '''Metrics of one cluster within a single time window, as used for machine learning.'''

    def __init__(self, time_window_id: Any, cluster_id: Any, cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int):
        self.time_window_id = time_window_id
        self.cluster_id = cluster_id

        # The factory picks the calculator matching the given feature names.
        calc = ClusterMetricsCalculatorFactory.create_metrics_calculator(
            cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)

        self.size = calc.get_size()
        self.std_dev = calc.get_standard_deviation()
        self.scarcity = calc.get_scarcity()
        self.importance1 = calc.get_importance1()
        self.importance2 = calc.get_importance2()

    def get_time_info(self) -> int:
        '''Returns the week of the time tuple str, eg. 25 for "(2014, 25)".'''
        # Second comma separated part, e.g. " 25)" -> "25)" -> "25".
        week_part = self.time_window_id.split(',')[1]
        week_digits = week_part.strip()[:-1]
        return int(week_digits)

    def __repr__(self):
        return str(self.__dict__)

    def __str__(self):
        metrics = f"{self.size}, {self.std_dev}, {self.scarcity}, {self.importance1}, {self.importance2}"
        return f"Cluster({self.time_window_id}, {self.cluster_id}, {metrics})"

    @staticmethod
    def create_multiple_from_time_window(time_window: TimeWindow, cluster_feature_names: List[str]) -> Iterable['Cluster']:
        '''Lazily yields one Cluster per cluster stored in the time window.'''
        sizes = [len(nodes) for nodes in time_window.clusters.values()]
        total_layer_nodes = sum(sizes)
        # diversity = number of clusters which contain at least one node
        layer_diversity = sum(1 for size in sizes if size > 0)

        for label, nodes in time_window.clusters.items():
            yield Cluster(time_window.time, label, nodes, cluster_feature_names, total_layer_nodes, layer_diversity)

    @staticmethod
    def create_from_dict(dict_) -> 'Cluster':
        '''Creates a Cluster from placeholder values and overwrites its attributes with dict_.'''
        cluster = Cluster(0, 0, [], 'None', 0, 0)
        cluster.__dict__.update(dict_)
        return cluster
from typing import Dict, List, Tuple, Any
import scipy.spatial
from entities.timewindow import TimeWindow
class InternalCluster:
    '''Size and distance to the global cluster center for one cluster in a single time window.'''

    def __init__(self, cluster_id, cluster_nodes: List[dict], feature_names:List[str], global_cluster_center: Tuple[float]):
        '''
        :param cluster_id: the id of the cluster
        :param cluster_nodes: the nodes of the cluster in the current time window
        :param feature_names: the names of the node fields used for clustering (1 or 2 entries)
        :param global_cluster_center: the center the current center is compared with
        '''
        self.cluster_id = cluster_id
        self.size = len(cluster_nodes)

        if len(cluster_nodes) > 0:
            current_center = self.get_current_cluster_center(cluster_nodes, feature_names)
            self.global_center_distance = scipy.spatial.distance.euclidean(current_center, global_cluster_center)
        else:
            # an empty cluster has no center, so its distance is defined as 0
            self.global_center_distance = 0

    def _convert_feature_to_float(self, feature_value) -> float:
        '''Converts a feature value to float, where the empty string is treated as 0.'''
        # compare by value; identity comparison with a literal is implementation dependent
        return float(feature_value if feature_value != "" else 0)

    def get_current_cluster_center(self, nodes, features) -> ('x', 'y'):
        '''
        Returns the centroid of the nodes as a 2-tuple.
        For a single feature the second coordinate is always 0.

        :raises ValueError: if the number of features is neither 1 nor 2
        '''
        if len(features) == 1:
            values = [self._convert_feature_to_float(node[features[0]]) for node in nodes]
            return (sum(values)/len(values), 0)

        if len(features) == 2:
            x = [self._convert_feature_to_float(node[features[0]]) for node in nodes]
            y = [self._convert_feature_to_float(node[features[1]]) for node in nodes]
            return (sum(x) / len(nodes), sum(y) / len(nodes))

        # previously fell through and returned None, which only failed later in the distance calculation
        raise ValueError(f"Only 1 or 2 features are supported, got {len(features)}.")

    @staticmethod
    def create_many_from_cluster_nodes(clusters: Dict[str, List[dict]], feature_names: List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> List['InternalCluster']:
        '''Creates one InternalCluster per entry in clusters, skipping the noise cluster '-1'.'''
        return [
            InternalCluster(key, nodes, feature_names, global_cluster_centers[key])
            for key, nodes in clusters.items()
            # ignore noise as it contains no meaningful cluster information
            if key != '-1'
        ]
class Layer:
    '''Represents metrics for one layer for a single time window.'''

    def __init__(self, time_window_id: Any, clusters: List[InternalCluster]):
        '''
        :param time_window_id: the identifier of the time window
        :param clusters: the clusters of the layer in this time window
        '''
        self.time_window_id = time_window_id
        self.relative_cluster_sizes = self.get_relative_cluster_sizes(clusters)
        self.entropy = self.get_entropy(clusters)
        self.distances_from_global_centers = self.get_distances_from_global_center(clusters)

    def get_relative_cluster_sizes(self, clusters: List[InternalCluster]):
        '''Returns each cluster size divided by the total size, or all zeros if the total size is 0.'''
        total_size = sum([cluster.size for cluster in clusters])
        if total_size > 0:
            return [cluster.size / total_size for cluster in clusters]
        else:
            return [0] * len(clusters)

    def get_entropy(self, clusters: List[InternalCluster]):
        '''
        Returns the entropy over all clusters C,
        where P(c_i) is the probability that a node belongs to cluster c_i.
        '''
        # The module only imports scipy.spatial, so scipy.stats is imported
        # explicitly here instead of relying on it being loaded as a side effect.
        from scipy.stats import entropy
        return entropy(self.get_relative_cluster_sizes(clusters), base=2)

    def get_distances_from_global_center(self, clusters: List[InternalCluster]):
        '''Returns the global center distance of each cluster, in the order of clusters.'''
        return [cluster.global_center_distance for cluster in clusters]

    def __repr__(self):
        return str(self.__dict__)

    def __str__(self):
        return f"Layer({self.time_window_id}, " \
            f"{self.relative_cluster_sizes}, {self.entropy}, {self.distances_from_global_centers})"

    @staticmethod
    def create_from_time_window(time_window: TimeWindow, feature_names:List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> 'Layer':
        '''Creates the layer metrics from the clusters stored in the time window.'''
        clusters: List[InternalCluster] = InternalCluster.create_many_from_cluster_nodes(time_window.clusters, feature_names, global_cluster_centers)
        return Layer(time_window.time, clusters)

    @staticmethod
    def create_from_dict(dict_) -> 'Layer':
        '''Creates a Layer from placeholder values and overwrites its attributes with dict_.'''
        l = Layer(0, [])
        l.__dict__.update(dict_)
        return l
\ No newline at end of file
import json
from typing import List, Dict, NewType, Any
from datetime import date, datetime
class TimeWindow:
    '''
    One time slice of a single layer, holding all nodes of that time grouped by cluster label.
    :param time: The tag indicating the time
    :param layer_name: The name of the layer the nodes belong to
    '''

    def __init__(self, time: Any = None, use_case: str = None, use_case_table: str = None, layer_name: str = None,
                 time_slice_dict: Dict = None, from_db = False):
        self.time = str(time)
        self.use_case = use_case
        self.use_case_table = use_case_table
        self.layer_name = layer_name
        self.clusters: Dict[str, List[dict]] = {}

        if time_slice_dict is None:
            return
        # a given serialized dict overrides all values set above
        self.from_serializable_dict(time_slice_dict, from_db)

    def add_node_to_cluster(self, cluster_label: str, node):
        '''Appends the node to the cluster with the given label, creating the cluster if needed.'''
        # only string keys can be stored in json
        label = str(cluster_label)
        self.clusters.setdefault(label, []).append(node)

    def get_nodes_for_cluster(self, cluster_label: str):
        '''Returns the nodes stored for the label, or an empty list for an unknown label.'''
        return self.clusters.get(cluster_label, [])

    def _get_unique_id(self, node : Dict) -> Dict:
        '''Returns a new dict with the unique id only.'''
        uid_key = 'UniqueID'
        if uid_key not in node:
            return None
        return {uid_key: node[uid_key]}

    def to_serializable_dict(self, for_db=False) -> Dict:
        '''Returns the time window as dict; the clusters are dumped to a json string if for_db is set.'''
        clusters = json.dumps(self.clusters) if for_db else self.clusters
        return {
            "time": self.time,
            "use_case": self.use_case,
            "use_case_table": self.use_case_table,
            'layer_name': self.layer_name,
            "clusters": clusters,
        }

    def from_serializable_dict(self, dict: Dict, from_db=False):
        '''Overwrites all attributes with the values from dict; clusters are parsed from json if from_db is set.'''
        for key in ("time", "use_case", "use_case_table", "layer_name"):
            setattr(self, key, dict[key])

        raw_clusters = dict['clusters']
        self.clusters = json.loads(raw_clusters) if from_db else raw_clusters

    @staticmethod
    def create_from_serializable_dict(dict: Dict, from_db=False):
        '''Creates a new TimeWindow from a dict as returned by to_serializable_dict().'''
        return TimeWindow(time_slice_dict=dict, from_db=from_db)

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"TimeWindow({self!r})"
# originally downloaded datasets from: (both contain the same csv)
## https://www.kaggle.com/c/pkdd-15-predict-taxi-service-trajectory-i/data
## https://www.kaggle.com/c/pkdd-15-taxi-trip-time-prediction-ii
*.zip
train.csv
# clusters as received from the SMART pipeline
clusters/
# time slices as created by the SMART pipeline
timeslices/
## This folder contains the old time slices, where empty clusters were not added to the slices.
timeslices_old/
# calculated metrics for the clusters from the notebook
metrics/
metrics_old/
# calculated metrics for the layers from the notebook
layer_metrics/
layer_metrics_old/
\ No newline at end of file
# models trained by the `train.sh` and `train.py` scripts
/cluster_metrics/**/*.model
# models trained by the `train_layer.sh` and `train_layer.py` scripts
/layer_metrics/**/*.model
import warnings
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Tuple
import numpy as np
from scipy.spatial import ConvexHull, qhull, distance
from math import sqrt
from statistics import mean
# Importing pointpats emits
#   "UserWarning: geopandas not available. Some functionality will be disabled."
# The warning is silenced only for the duration of the import. catch_warnings()
# restores the previously active filters afterwards, instead of permanently
# overriding them with a 'default' filter for all UserWarnings.
with warnings.catch_warnings():
    warnings.simplefilter(action='ignore', category=UserWarning)
    from pointpats.centrography import std_distance
class ClusterMetricsCalculator(ABC):
    '''Base class for calculating the metrics of a single cluster within its layer.'''

    def __init__(self, cluster_nodes: List[dict], nr_layer_nodes: int, layer_diversity: int):
        '''
        :param cluster_nodes: all nodes from the cluster
        :param nr_layer_nodes: the number of total layer nodes
        :param layer_diversity: the number of clusters in the layer with #nodes > 0
        '''
        self.cluster_nodes = cluster_nodes
        self.nr_layer_nodes = nr_layer_nodes
        self.layer_diversity = layer_diversity

    def get_size(self) -> int:
        '''Returns the size of the cluster.'''
        return len(self.cluster_nodes)

    @abstractmethod
    def get_standard_deviation(self) -> float:
        '''Returns the std dev from the center of the distribution.'''
        pass

    @abstractmethod
    def get_scarcity(self) -> float:
        '''
        Returns the scarcity of the data points regarding the complete range for possible points.
        High scarcity indicates low density.
        '''
        pass

    def get_importance1(self) -> float:
        '''Returns the ratio of cluster_nodes to layer_nodes, or 0 for an empty cluster.'''
        return float(len(self.cluster_nodes)) / self.nr_layer_nodes if len(self.cluster_nodes) > 0 else 0

    def get_importance2(self) -> float:
        '''Returns the inverse of the layer_diversity, where layer_diversity = number of clusters with #nodes > 0.'''
        return 1.0 / self.layer_diversity if len(self.cluster_nodes) > 0 else 0

    def _convert_feature_to_float(self, feature_value) -> float:
        '''Converts a feature value to float, where the empty string is treated as 0.'''
        # compare by value; identity comparison with a literal is implementation dependent
        return float(feature_value if feature_value != "" else 0)
class ClusterMetricsCalculator1D(ClusterMetricsCalculator):
    '''Calculates metrics for clusters created by clustering on a single feature (1d clustering).'''

    def __init__(self, cluster_nodes: List[dict], cluster_feature_name: str, nr_layer_nodes: int, layer_diversity: int):
        super().__init__(cluster_nodes, nr_layer_nodes, layer_diversity)
        # the numeric value of the clustering feature for every node
        convert = self._convert_feature_to_float
        self.feature_values: List[Any] = [convert(node[cluster_feature_name]) for node in cluster_nodes]

    def get_standard_deviation(self):
        '''Returns the standard deviation of the feature values, or 0 for an empty cluster.'''
        if len(self.feature_values) > 0:
            return np.std(self.feature_values)
        return 0

    def get_scarcity(self):
        '''Returns the scarcity as cluster_range / cluster_size, or 0 if len(nodes)=0.'''
        values = self.feature_values
        if len(values) == 0:
            return 0

        lowest, highest = min(values), max(values)
        return float(highest - lowest) / self.get_size()
class ClusterMetricsCalculator2D(ClusterMetricsCalculator):
    '''Metrics calculator for clusters which were clustered based on 2 features (2d clustering).'''

    def __init__(self, cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int):
        '''
        :param cluster_nodes: all nodes from the cluster
        :param cluster_feature_names: exactly two field names which were used during clustering
        :param nr_layer_nodes: the number of total layer nodes
        :param layer_diversity: the number of clusters in the layer with #nodes > 0
        '''
        assert len(cluster_feature_names) == 2, "This class is for 2d cluster results only!"
        super().__init__(cluster_nodes, nr_layer_nodes, layer_diversity)

        # (x, y) float tuple for every node
        self.feature_values: List[Tuple[Any]] = [
            (self._convert_feature_to_float(node[cluster_feature_names[0]]), self._convert_feature_to_float(node[cluster_feature_names[1]]))
            for node in cluster_nodes
        ]

    def get_standard_deviation(self):
        '''Returns the standard distance of the points, or 0 for an empty cluster or a nan result.'''
        if len(self.feature_values) == 0:
            return 0

        # Silence RuntimeWarnings only for this calculation; catch_warnings()
        # restores the caller's warning filters instead of overriding them.
        with warnings.catch_warnings():
            warnings.simplefilter(action='ignore', category=RuntimeWarning)
            std_dist = std_distance(self.feature_values)

        if np.isnan(std_dist):
            return 0 # somehow std_dist=nan if all feature values are same with many decimals

        return std_dist

    def get_scarcity(self):
        '''
        Returns the scarcity of the cluster:
        - 0 for clusters with 0 or 1 nodes,
        - distance / cluster_size for exactly 2 nodes,
        - sqrt(convex_hull_area / cluster_size) otherwise,
        - 1d range / cluster_size if no hull exists because all points share the same x or y value.
        '''
        if len(self.feature_values) == 0:
            return 0

        if len(self.feature_values) == 1:
            # exactly 1 element gives inf density
            return 0

        if len(self.feature_values) == 2:
            # cannot calculate area with 2 points - just use 2d distance as range instead
            range_ = distance.euclidean(self.feature_values[0], self.feature_values[1])
            return float(range_) / self.get_size()

        try:
            # calculate range as 2d area
            points = self._get_polygon_border_points(self.feature_values)
            range_ = self._calc_polygon_area(points)

            # use sqrt to compare with 1d scarcity
            return sqrt(float(range_) / self.get_size())

        except qhull.QhullError:
            # possible reasons that there is no hull with real area:
            # 1. all points are at the same location
            # 2. all points have the same x or y coordinates (lie on one hori/vert line)
            points = np.asarray(self.feature_values)
            xs, ys = points[:,0], points[:,1]

            if len(set(xs)) == 1:
                # use only y feature
                return self._calc_1d_scarcity(ys)

            if len(set(ys)) == 1:
                # use only x feature
                return self._calc_1d_scarcity(xs)

            print("Scarcity calc did not work with 1d feature")
            return 0

    def _calc_1d_scarcity(self, features) -> float:
        '''Returns the value range of a single feature divided by the cluster size.'''
        range_ = max(features) - min(features)
        return float(range_) / self.get_size()

    def _get_polygon_border_points(self, points: List[List[float]]) -> 'np.array':
        '''Returns the points which are the vertices of the convex hull.'''
        points = np.asarray(points)
        hull = ConvexHull(points)
        return points[hull.vertices]

    def _calc_polygon_area(self, border_points: 'np.array') -> float:
        '''Returns the area of the polygon described by the border points.'''
        x: 'np.array' = border_points[:,0]
        y: 'np.array' = border_points[:,1]
        # https://en.wikipedia.org/wiki/Shoelace_formula
        area = 0.5 * np.abs(np.dot(x, np.roll(y,1)) - np.dot(y, np.roll(x,1)))
        return float(area)
class ClusterMetricsCalculatorFactory:
    '''Creates the metrics calculator matching the number of clustering features.'''

    @staticmethod
    def create_metrics_calculator(cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int) -> ClusterMetricsCalculator:
        """
        This factory creates a class which contains metrics about a single cluster based on
        its nodes, feature values, its layer total node number and its layer diversity.

        :param cluster_nodes: all nodes from the cluster
        :param cluster_feature_names: all field names which were used during clustering,
            either a single name as str or a list with 1 or 2 names
        :param nr_layer_nodes: the number of total layer nodes
        :param layer_diversity: the diversity of the layer calculated as: number of clusters with nodes > 0
        :raises ValueError: if the number of feature names is neither 1 nor 2
        """
        # a plain string is a single feature name, not a sequence of names
        if isinstance(cluster_feature_names, str):
            return ClusterMetricsCalculator1D(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)

        if len(cluster_feature_names) == 1:
            return ClusterMetricsCalculator1D(cluster_nodes, cluster_feature_names[0], nr_layer_nodes, layer_diversity)

        if len(cluster_feature_names) == 2:
            return ClusterMetricsCalculator2D(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)

        # previously fell through and returned None, which only failed later on the first metric call
        raise ValueError(f"Only 1 or 2 clustering features are supported, got {len(cluster_feature_names)}.")
from processing.ClusterMetricsCalculator import ClusterMetricsCalculator, ClusterMetricsCalculator1D, ClusterMetricsCalculator2D, ClusterMetricsCalculatorFactory
\ No newline at end of file
backcall==0.2.0
beautifulsoup4==4.9.3
branca==0.4.2
certifi==2020.12.5
chardet==4.0.0
colorama==0.4.4
cycler==0.10.0
cython==0.28.5
decorator==4.4.2
folium==0.11.0
icecream
idna==2.10
# ipykernel==5.4.2
# ipython==7.19.0
# ipython-genutils==0.2.0
jedi==0.18.0
Jinja2==2.11.2
joblib==1.0.0
jupyter-client==6.1.7
jupyter-core==4.7.0
kiwisolver==1.3.1
libpysal==4.3.0
MarkupSafe==1.1.1
matplotlib==3.2.0
numpy==1.19.3
opencv-contrib-python==4.5.1.48
pandas
parso==0.8.1
pickleshare==0.7.5
Pillow==8.1.0
pointpats==2.2.0
prompt-toolkit==3.0.8
Pygments==2.7.3
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.5
# pywin32==300
pyzmq==20.0.0
requests==2.25.1
scikit-build
scikit-learn==0.24.0
scipy
six==1.15.0
sklearn==0.0
soupsieve==2.1
threadpoolctl==2.1.0
tornado==6.1
# traitlets==5.0.5
urllib3==1.26.2
wcwidth==0.2.5
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing import ClusterMetricsCalculator2D
class TestClusterMetricsCalculator(unittest.TestCase):
    '''Tests for the 2d cluster metrics calculator.'''

    def test__get_standard_deviation__same_points_many_decimals__zero_and_not_nan(self):
        # nine nodes at exactly the same location with many decimals
        nodes = [{'f1': -8.58564, 'f2': 41.148567} for _ in range(9)]

        calculator = ClusterMetricsCalculator2D(nodes, ['f1','f2'], len(nodes), 1)
        std_dev = calculator.get_standard_deviation()

        self.assertAlmostEqual(0, std_dev)
# Runs the tests of this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from entities import Cluster, TimeWindow
from typing import Any, Tuple
from datetime import date, datetime
import json
from math import sqrt
import statistics as stat
class TestCluster(unittest.TestCase):
    '''Tests for the metrics stored in Cluster objects created directly or from a TimeWindow.'''

    def test__init__single_cluster__all_values_set(self):
        tw = self._get_timewindow_single_cluster_same_feature()

        c = Cluster("time_abc", "clusterId 1", list(tw.clusters.values())[0], "feature", nr_layer_nodes=3, layer_diversity=1)

        self.assertEqual("time_abc", c.time_window_id)
        self.assertEqual("clusterId 1", c.cluster_id)
        self.assert_cluster((3, 0, 0, 1, 1), c)

    def test__create_multiple_from_time_window__single_cluster__all_values_set(self):
        tw = self._get_timewindow_single_cluster_same_feature()

        clusters = list(Cluster.create_multiple_from_time_window(tw, "feature"))
        self.assertEqual(1, len(clusters))
        c = clusters[0]

        self.assertEqual("KW1", c.time_window_id)
        self.assertEqual("1", c.cluster_id)
        self.assert_cluster((3, 0, 0, 1, 1), c)

    def test__create_multiple_from_time_window__two_clusters__correct_time_id_cluster_id(self):
        tw = self._get_timewindow_two_clusters_same_feature()

        clusters = list(Cluster.create_multiple_from_time_window(tw, "feature"))
        expected = [("KW1", "1"), ("KW1", "2")]

        # zip() stops at the shorter input, so missing clusters must be detected explicitly
        self.assertEqual(len(expected), len(clusters))
        for c, exp in zip(clusters, expected):
            self.assertEqual(exp[0], c.time_window_id)
            self.assertEqual(exp[1], c.cluster_id)

    def test__create_multiple_from_time_window__two_clusters_same_features__correct_calculation(self):
        tw = self._get_timewindow_two_clusters_same_feature()

        clusters = Cluster.create_multiple_from_time_window(tw, "feature")
        expected = [(3, 0, 0, 3/5, 1/2), (2, 0, 0, 2/5, 1/2)]

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__two_clusters_same_features_and_feature_names_list__correct_calculation(self):
        tw = self._get_timewindow_two_clusters_same_feature()

        clusters = Cluster.create_multiple_from_time_window(tw, ["feature"])
        expected = [(3, 0, 0, 3/5, 1/2), (2, 0, 0, 2/5, 1/2)]

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__two_clusters_different_features__correct_calculation(self):
        tw = TimeWindow("CW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("1", {"feature":2})
        tw.add_node_to_cluster("1", {"feature":3})
        tw.add_node_to_cluster("2", {"feature":70})
        tw.add_node_to_cluster("2", {"feature":75})

        clusters = Cluster.create_multiple_from_time_window(tw, "feature")
        # variance for stddev calculated with: http://www.alcula.com/calculators/statistics/variance/
        expected = [(3, sqrt(2.0/3), 2.0/3, 3/5, 1/2), (2, sqrt(6.25), 5.0/2, 2/5, 1/2)]

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__empty_cluster__all_zero_for_empty_cluster(self):
        tw = TimeWindow("CW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("1", {"feature":2})
        tw.add_node_to_cluster("1", {"feature":3})
        tw.add_node_to_cluster("2", {"feature":70})
        tw.add_node_to_cluster("2", {"feature":75})
        tw.clusters["3"] = []

        clusters = Cluster.create_multiple_from_time_window(tw, "feature")
        expected = [(3, sqrt(2.0/3), 2.0/3, 3/5, 1/2), # diversity is still 2 as len=0 is ignored
                    (2, sqrt(6.25), 5.0/2, 2/5, 1/2),
                    (0, 0, 0, 0, 0)] # len 0 -> everything 0

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__2d_clustering_single_feature_value__no_stddev_no_scarcity(self):
        tw = TimeWindow("CW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"f1":1, "f2":1})
        tw.add_node_to_cluster("1", {"f1":1, "f2":1})
        tw.add_node_to_cluster("1", {"f1":1, "f2":1})
        tw.add_node_to_cluster("2", {"f1":70, "f2":70})
        tw.add_node_to_cluster("2", {"f1":70, "f2":70})

        clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
        expected = [(3, 0, 0, 3/5, 1/2), (2, 0, 0, 2/5, 1/2)]

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__2d_clustering__correct_stddev_and_scarcity(self):
        tw = TimeWindow("CW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"f1":1, "f2":1})
        tw.add_node_to_cluster("1", {"f1":2, "f2":1})
        tw.add_node_to_cluster("1", {"f1":1, "f2":3})
        tw.add_node_to_cluster("2", {"f1":70, "f2":70})
        tw.add_node_to_cluster("2", {"f1":72, "f2":75})

        clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
        # stddev calculated manually as in: https://glenbambrick.com/tag/standard-distance/
        # area of the polygon calculated with: https://www.mathopenref.com/coordpolygonareacalc.html
        expected = [(3, sqrt(2/9+8/9), sqrt(1/3), 3/5, 1/2), (2, sqrt(7.25), sqrt(2*2+5*5)/2, 2/5, 1/2)]

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__2d_clustering_complex__correct_stddev_and_scarcity(self):
        tw = TimeWindow("CW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"f1":0, "f2":0})
        tw.add_node_to_cluster("1", {"f1":1, "f2":3})
        tw.add_node_to_cluster("1", {"f1":3, "f2":2})
        tw.add_node_to_cluster("1", {"f1":0, "f2":2})
        tw.add_node_to_cluster("1", {"f1":1, "f2":2}) # inside the convex hull
        tw.add_node_to_cluster("1", {"f1":2, "f2":2}) # inside the convex hull
        tw.add_node_to_cluster("1", {"f1":2, "f2":1})

        clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])

        # stddev calculated manually as in: https://glenbambrick.com/tag/standard-distance/
        X = [0,1,3,0,1,2,2]
        Y = [0,3,2,2,2,2,1]
        x_mean = stat.mean(X)
        y_mean = stat.mean(Y)
        sum_x = 0
        for x in X:
            sum_x += (x - x_mean)**2
        sum_y = 0
        for y in Y:
            sum_y += (y - y_mean)**2
        sd = sqrt(sum_x/7 + sum_y/7)

        # area of the polygon calculated with: https://www.mathopenref.com/coordpolygonareacalc.html
        area = 5
        scarcity = sqrt(area / 7)
        expected = [[7, sd, scarcity, 1, 1]]

        self.assert_clusters(expected, clusters)

    def test__create_multiple_from_time_window__2d_clustering_1d_single_feature_value__correct_calculation(self):
        tw = TimeWindow("CW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"f1":1, "f2":1})
        tw.add_node_to_cluster("1", {"f1":1, "f2":2})
        tw.add_node_to_cluster("1", {"f1":1, "f2":3})
        tw.add_node_to_cluster("2", {"f1":70, "f2":70})
        tw.add_node_to_cluster("2", {"f1":75, "f2":70})
        tw.add_node_to_cluster("2", {"f1":72, "f2":70})
        tw.add_node_to_cluster("2", {"f1":71, "f2":70})

        clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
        # variance/stddev calculated as for 1d cluster (as f1/f2 is always the same)
        # scarcity calculated as for 1d cluster
        expected = [(3, sqrt(2/3), 2/3, 3/7, 1/2),
                    (4, sqrt(3.5), 5/4, 4/7, 1/2)]

        self.assert_clusters(expected, clusters)

    #region setup methods
    def _get_timewindow_single_cluster_same_feature(self) -> TimeWindow:
        '''Returns a TimeWindow with time=KW1 and three nodes in cluster 1, all feature values = 1.'''
        tw = TimeWindow("KW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("1", {"feature":1})
        return tw

    def _get_timewindow_two_clusters_same_feature(self) -> TimeWindow:
        '''
        Returns a TimeWindow with time=KW1 and:
        Three nodes in cluster 1, all feature values = 1.
        Two nodes in cluster 2, all feature values = 2.
        '''
        tw = TimeWindow("KW1", "uc", "uct", "ln")
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("1", {"feature":1})
        tw.add_node_to_cluster("2", {"feature":2})
        tw.add_node_to_cluster("2", {"feature":2})
        return tw
    #endregion setup methods

    #region custom asserts
    def assert_cluster(self, expected_values: Tuple[Any], cluster: Cluster):
        """
        Checks if the cluster values equal the expected_values.

        :param expected_values: A tuple (exp_size, exp_stddev, exp_scarcity, exp_import1, exp_import2)
        """
        self.assertEqual(expected_values[0], cluster.size)
        self.assertAlmostEqual(expected_values[1], cluster.std_dev)
        self.assertAlmostEqual(expected_values[2], cluster.scarcity)
        self.assertAlmostEqual(expected_values[3], cluster.importance1)
        self.assertAlmostEqual(expected_values[4], cluster.importance2)

    def assert_clusters(self, expected_values, clusters):
        """
        Checks that exactly the expected number of clusters was created
        and that each cluster equals its expected values.

        :param expected_values: A list of tuples as accepted by assert_cluster()
        :param clusters: The iterable of created clusters in the same order
        """
        clusters = list(clusters)
        # zip() stops at the shorter input, so missing clusters would go unnoticed without this check
        self.assertEqual(len(expected_values), len(clusters))
        for exp, cluster in zip(expected_values, clusters):
            self.assert_cluster(exp, cluster)
    #endregion custom asserts
# Runs the tests of this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from entities import Layer, TimeWindow
from entities.layer import InternalCluster
from typing import Any, Tuple, List
from datetime import date, datetime
import json
from math import sqrt
import statistics as stat
class TestInternalCluster(unittest.TestCase):
    '''Tests for the size and the global center distance of InternalCluster.'''

    def test__init__1d_features__all_values_set(self):
        nodes = [{"feature":1} for _ in range(3)]

        cluster = InternalCluster("123", nodes, feature_names=["feature"], global_cluster_center=(1.5,0))

        self.assert_internal_cluster(cluster, '123', 3, .5)

    def test__init__2d_features__all_values_set(self):
        nodes = [{"feature1":1,'feature2':1} for _ in range(3)]

        cluster = InternalCluster("123", nodes, feature_names=["feature1", 'feature2'], global_cluster_center=(1.5,1.5))

        # distance: https://www.calculatorsoup.com/calculators/geometry-plane/distance-two-points.php
        self.assert_internal_cluster(cluster, '123', 3, sqrt(.5))

    def test__get_current_cluster_center__1d(self):
        nodes = [{"feature":value} for value in (1, 2, 3)]

        cluster = InternalCluster("123", nodes, feature_names=["feature"], global_cluster_center=(2, 0))

        self.assert_internal_cluster(cluster, '123', 3, 0)

    def test__get_current_cluster_center__1d_weighted_result(self):
        nodes = [{"feature":value} for value in (1, 1, 3)]

        cluster = InternalCluster("123", nodes, feature_names=["feature"], global_cluster_center=(5/3, 0))

        self.assert_internal_cluster(cluster, '123', 3, 0)

    def test__get_current_cluster_center__2d_weighted_result(self):
        coordinates = [(1, 1), (1, 1), (2, 2), (3, 1)]
        nodes = [{"feature1":f1,"feature2":f2} for f1, f2 in coordinates]

        cluster = InternalCluster("123", nodes, feature_names=["feature1", 'feature2'], global_cluster_center=(1.75, 1.25))

        self.assert_internal_cluster(cluster, '123', 4, 0)

    def assert_internal_cluster(self, actual_cluster: InternalCluster, expected_id, expected_size, expected_distance):
        '''Checks the id, the size and the (almost equal) global center distance of the cluster.'''
        self.assertEqual(expected_id, actual_cluster.cluster_id)
        self.assertEqual(expected_size, actual_cluster.size)
        self.assertAlmostEqual(expected_distance, actual_cluster.global_center_distance)
class TestLayer(unittest.TestCase):
    '''Tests for the layer metrics: relative cluster sizes, entropy and global center distances.'''

    def test__init__1d_single_cluster(self):
        time_window = self._get_timewindow_single_cluster_1d_same_feature()
        nodes = list(time_window.clusters.values())[0]
        cluster = InternalCluster("123", nodes, feature_names=["feature"], global_cluster_center=(1,0))

        layer = Layer('123', [cluster])

        self.assert_layer(layer, [1], 0, [0])

    def test__create_from_time_window__1d_single_cluster(self):
        time_window = self._get_timewindow_single_cluster_1d_same_feature()

        layer = Layer.create_from_time_window(time_window, feature_names=['feature'], global_cluster_centers={'1': (1,0)})

        self.assert_layer(layer, [1], 0, [0])

    def test__create_from_time_window__2d_single_cluster(self):
        time_window = self._get_timewindow_single_cluster_2d_same_feature()

        layer = Layer.create_from_time_window(time_window, feature_names=['feature1', 'feature2'], global_cluster_centers={'1': (1,1)})

        self.assert_layer(layer, [1], 0, [0])

    def test__create_from_time_window__1d_two_clusters(self):
        time_window = TimeWindow("KW1", "uc", "uct", "ln")
        for label, value in [("1", 1), ("1", 1), ("2", 5), ("2", 5), ("2", 7), ("2", 6)]:
            time_window.add_node_to_cluster(label, {"feature1":value})

        layer = Layer.create_from_time_window(time_window, feature_names=['feature1'], global_cluster_centers={'1': (1.5,0), '2': (5,0)})

        # entropy: https://planetcalc.com/2476/
        # distance: https://www.calculatorsoup.com/calculators/geometry-plane/distance-two-points.php
        self.assert_layer(layer, [2/6, 4/6], 0.91829583, [.5, .75])

    def test__create_from_time_window__2d_two_clusters(self):
        time_window = TimeWindow("KW1", "uc", "uct", "ln")
        for label, f1, f2 in [("1", 1, 1), ("1", 1, 2), ("1", 1, 2), ("2", 5, 5), ("2", 7, 4)]:
            time_window.add_node_to_cluster(label, {"feature1":f1,"feature2":f2})

        layer = Layer.create_from_time_window(time_window, feature_names=['feature1', 'feature2'], global_cluster_centers={'1': (1,1), '2': (6.5,5)})

        # entropy: https://planetcalc.com/2476/
        # distance: https://www.calculatorsoup.com/calculators/geometry-plane/distance-two-points.php
        self.assert_layer(layer, [3/5, 2/5], 0.97095059, [2/3, sqrt(.5)])

    #region setup methods
    def _get_timewindow_single_cluster_1d_same_feature(self) -> TimeWindow:
        '''Returns a TimeWindow with time=KW1 and three nodes in cluster 1, all feature values = 1.'''
        time_window = TimeWindow("KW1", "uc", "uct", "ln")
        for _ in range(3):
            time_window.add_node_to_cluster("1", {"feature":1})
        return time_window

    def _get_timewindow_single_cluster_2d_same_feature(self) -> TimeWindow:
        '''Returns a TimeWindow with time=KW1 and three nodes in cluster 1, all feature1 & feature2 values = 1.'''
        time_window = TimeWindow("KW1", "uc", "uct", "ln")
        for _ in range(3):
            time_window.add_node_to_cluster("1", {"feature1":1, "feature2":1})
        return time_window
    #endregion setup methods

    def assert_layer(self, actual_layer: Layer, relative_sizes: List[float], entropy: float, center_dist: List[float]):
        '''Checks the relative sizes, the entropy and the center distances of the layer (almost equal).'''
        actual_sizes = actual_layer.relative_cluster_sizes
        self.assertEqual(len(actual_sizes), len(relative_sizes))
        for expected, actual in zip(relative_sizes, actual_sizes):
            self.assertAlmostEqual(expected, actual)

        self.assertAlmostEqual(entropy, actual_layer.entropy)

        actual_distances = actual_layer.distances_from_global_centers
        self.assertEqual(len(actual_distances), len(center_dist))
        for expected, actual in zip(center_dist, actual_distances):
            self.assertAlmostEqual(expected, actual)
# Runs the tests of this module when the file is executed directly.
if __name__ == '__main__':
    unittest.main()
# Name of the layer to process; this default is replaced by the first
# command line argument if one is given.
LAYER_NAME = 'CallTypeLayer'
import sys
if len(sys.argv) > 1:
    LAYER_NAME = sys.argv[1]
# Report which layer this run works on.
print(f"Working on {LAYER_NAME}")
##########
import json
from entities import Cluster
import collections
import numpy as np
from typing import Iterable
def get_evolution_label(old_size: int, new_size: int) -> int:
    '''
    Maps the size change of a cluster to its evolution label:
    0 = continuing, 1 = shrinking, 2 = growing, 3 = dissolving, 4 = forming.
    '''
    continuing, shrinking, growing, dissolving, forming = range(5)

    if old_size == new_size:
        label = continuing
    elif old_size == 0 and new_size != 0:
        label = forming
    elif old_size != 0 and new_size == 0:
        label = dissolving
    elif old_size > new_size:
        label = shrinking
    elif old_size < new_size:
        label = growing
    else:
        # no comparison matched (e.g. sizes which cannot be ordered)
        label = None
    return label
def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> (float, float):
    '''Encodes the time as a point (sin, cos) on the unit circle, where max_time_value is one full turn.'''
    angle = 2*np.pi*time/max_time_value
    sin_part = np.sin(angle)
    cos_part = np.cos(angle)
    return (sin_part, cos_part)
def create_metrics_training_data(N: int = 3, layer_name: str = 'CallTypeLayer') -> Iterable:
    r"""
    Lazily yields the metrics training data points for one layer, read from input/metrics/<layer_name>.json.

    A single metrics training data point should look like this:

    (cluster_size, cluster_std_dev, cluster_scarcity, cluster_import1, cluster_import2, time_info) ^ N, evolution_label
    time_info ... the time as 2d cyclic feature, i.e. time_info := (time_f1, time_f2)

    The first tuple represents metrics from the cluster in t_i-(N-1).
    The Nth tuple represents metrics from the cluster in t_i.
    The label is one of {continuing, shrinking, growing, dissolving, forming} \ {splitting, merging} and identifies the change for t_i+1.

    :param N: number of cluster metric tuples
    :param layer_name: name of the layer, used to locate the input file
    :returns: a generator of lists with N metric tuples followed by the evolution label
    :raises ValueError: if N < 1 (raised when the first element is requested, as this is a generator)
    """
    if N < 1:
        raise ValueError(f"N must be at least 1, got {N}")

    path_in = f"input/metrics/{layer_name}.json"
    with open(path_in, 'r') as file:
        data = [Cluster.create_from_dict(cl_d) for cl_d in json.load(file)]

    # group the slices of one cluster together, ordered by their time window
    # NOTE(review): if time_window_id is a string like "(2014, 25)", this ordering is lexicographic
    # ("(2014, 10)" sorts before "(2014, 2)") - confirm that the resulting order is chronological.
    data.sort(key=lambda cl: (cl.cluster_id, cl.time_window_id))

    # sliding window over the last N metric tuples; maxlen drops the oldest entry automatically
    window = collections.deque(maxlen=N)

    # pair every cluster slice with its successor in the sorted list
    for cur_cluster, next_cluster in zip(data, data[1:]):
        if cur_cluster.cluster_id != next_cluster.cluster_id:
            # next cluster slice in list will be another cluster id -> restart window and skip the current (last) cluster slice
            window.clear()
            continue

        window.append((cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2, get_cyclic_time_feature(cur_cluster.get_time_info())))

        # only emit once N consecutive slices of the same cluster are available
        if len(window) == N:
            label = get_evolution_label(cur_cluster.size, next_cluster.size)
            yield list(window) + [label]
###########
def flatten_metrics_datapoint(datapoint: list) -> ('X', 'Y'):
    '''
    Turns one nested metrics data point into a flat feature vector and its label.

    Input form:
    [(cluster_size, cluster_variance, cluster_density, cluster_import1, cluster_import2, (time_f1, time_f2))^N, evolution_label]

    Output form:
    (X: np.array, evolution_label)
    '''
    metric_tuples = datapoint[:-1]
    evolution_label = datapoint[-1]

    # per tuple: first the plain number features, then the unpacked time tuple
    flat_features = [
        value
        for entry in metric_tuples
        for part in (entry[:-1], entry[-1])
        for value in part
    ]

    return np.asarray(flat_features), evolution_label
##########
def convert_metrics_data_for_training(data: Iterable) -> ('nparray with Xs', 'nparray with Ys'):
    '''Flattens every metrics data point and returns the features and the labels as two separate arrays.'''
    flattened = [flatten_metrics_datapoint(datapoint) for datapoint in data]

    features = [x for x, _ in flattened]
    labels = [y for _, y in flattened]

    return (np.asarray(features), np.asarray(labels))
##########
import numpy as np
import pandas as pd
import collections
import statistics as stat
def balance_dataset(X: np.array, Y: np.array, imbalance_threshold=.3) -> ('X: np.array', 'Y: np.array'):
    '''
    Balances an unbalanced dataset by ignoring elements from the majority label, so that majority-label data size = median of other cluster sizes.

    The dataset counts as unbalanced if every other label occurs less than
    (1 - imbalance_threshold) times as often as the most frequent label.
    Balanced datasets, and datasets with fewer than two distinct labels, are returned unchanged.
    A rebalanced dataset is returned in shuffled order.

    :param X: 2d array with one row of features per data point (any number of columns)
    :param Y: 1d array with one label per data point
    :param imbalance_threshold: relative frequency gap to the majority label that marks the dataset as unbalanced
    :returns: the tuple (X, Y) of the (possibly reduced and shuffled) dataset
    '''
    counter = collections.Counter(Y.tolist())
    print(f"Label Occurrences: Total = {counter}")

    if len(counter) < 2:
        # no or only one label: there is no other label to balance against
        return X, Y

    # find key with max values
    max_key = max(counter, key=counter.get)
    max_val = counter[max_key]

    other_counts = [v for k, v in counter.items() if k != max_key]
    unbalanced_labels = all(v < max_val * (1-imbalance_threshold) for v in other_counts)
    if not unbalanced_labels:
        return X, Y

    median_rest = int(stat.median(other_counts))
    print(f"Labels are unbalanced, keeping {median_rest} for label {max_key}")

    # merge X and Y, the column count follows the actual number of features
    data = np.append(X, Y.reshape(Y.shape[0], 1), 1)
    df = pd.DataFrame(data, columns=['_']*X.shape[1]+['label'])

    # take only median_rest for the max_key label
    max_labeled_data = df.loc[df['label'] == max_key].sample(n=median_rest)
    other_labeled_data = df.loc[df['label'] != max_key]
    balanced_data = pd.concat([max_labeled_data, other_labeled_data])
    balanced_data = balanced_data.sample(frac=1) # shuffle

    # split into features and labels again
    X = balanced_data.loc[:, balanced_data.columns != 'label'].to_numpy()
    Y = balanced_data.loc[:, balanced_data.columns == 'label'].to_numpy()
    Y = Y.reshape(Y.shape[0],).astype(int)

    return X, Y
def get_training_data(layer_name='CallTypeLayer', test_dataset_frac=.2) -> '(X_train, Y_train, X_test, Y_test)':
    '''
    Prepares the metrics data of a layer for training and prints a short summary of the label distribution.

    The data is loaded, converted to feature/label arrays and passed through balance_dataset.
    The first int(size * test_dataset_frac) points form the test set, the remaining points the training set.

    :param layer_name: name of the layer to load the metrics for
    :param test_dataset_frac: fraction of the data used as test set
    '''
    # load from disk and convert to X and Y
    raw_data: Iterable = create_metrics_training_data(layer_name=layer_name)
    X, Y = convert_metrics_data_for_training(raw_data)
    X, Y = balance_dataset(X, Y)

    # split in test set (front part) and training set (rest)
    n_test = int(X.shape[0] * test_dataset_frac)
    X_test, X_train = X[:n_test], X[n_test:]
    Y_test, Y_train = Y[:n_test], Y[n_test:]

    train_count = X_train.shape[0]
    test_count = X_test.shape[0]
    print(f"\nWorking with: {train_count} training points + {test_count} test points ({test_count/(train_count+test_count)}).")

    total_occurrences = collections.Counter(Y_train.tolist() + Y_test.tolist())
    print(f"Label Occurrences: Total = {total_occurrences}, "
          f"Training = {collections.Counter(Y_train)}, Test = {collections.Counter(Y_test)}")

    try:
        train_mode = stat.mode(Y_train)
        test_mode = stat.mode(Y_test)
        print(f"Label Majority Class: Training = {train_mode}, Test = {test_mode}\n")
    except stat.StatisticsError:
        print(f"Label Majority Class: no unique mode; found 2 equally common values")

    return X_train, Y_train, X_test, Y_test
# load the prepared training and test data for the selected layer
X_train, Y_train, X_test, Y_test = get_training_data(LAYER_NAME)

###########

# train
from sklearn import svm
svc = svm.SVC(kernel='linear')
svc.fit(X_train, Y_train)

# verify
# import the submodule explicitly: a plain `import sklearn` does not guarantee that `sklearn.metrics` is loaded
import sklearn.metrics
pred_Y = svc.predict(X_test)
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))

# export
import pickle
import os
# exist_ok avoids the check-then-create race and keeps re-runs working
os.makedirs('output', exist_ok=True)
with open(f'output/{LAYER_NAME}.model', 'wb') as file:
    pickle.dump(svc, file)
#! /bin/bash
# Runs train.py once per layer, passing the layer name as the only argument.
source venv/bin/activate

layers=(
    CallTypeLayer
    DayTypeLayer
    EndLocationLayer
    OriginCallLayer
    OriginStandLayer
    StartLocationLayer
    TaxiIdLayer
)

for layer in "${layers[@]}"; do
    python3 train.py "$layer"
done
\ No newline at end of file
#! /bin/bash
# Creates the result folders and runs train_layer.py for every listed pair of layers.
source venv/bin/activate

# create result folders
# -p also creates missing parent folders and does not fail if a folder already exists (re-runs)
mkdir -p output/layer_metrics/5
mkdir -p output/layer_metrics/10
mkdir -p output/layer_metrics/15

# train
# each call passes two layer names to train_layer.py
python3 train_layer.py CallTypeLayer DayTypeLayer
python3 train_layer.py OriginCallLayer CallTypeLayer
python3 train_layer.py OriginStandLayer CallTypeLayer
python3 train_layer.py TaxiIdLayer OriginCallLayer
python3 train_layer.py StartLocationLayer OriginCallLayer
python3 train_layer.py EndLocationLayer OriginCallLayer
python3 train_layer.py TaxiIdLayer OriginStandLayer
python3 train_layer.py StartLocationLayer OriginStandLayer
python3 train_layer.py EndLocationLayer OriginStandLayer
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment