Commit b1a8e730 authored by Alexander Lercher's avatar Alexander Lercher

Created basic microservice with auth+cors

parent 68ed0cea
from entities.timewindow import TimeWindow
from entities.cluster import Cluster
from entities.layer import Layer
\ No newline at end of file
# from __future__ import annotations
from typing import Dict, List, Iterable, Any
from entities.timewindow import TimeWindow
import numpy as np
from processing import ClusterMetricsCalculatorFactory
class Cluster:
'''A cluster from one time window containing all metrics used for machine learning.'''
def __init__(self, time_window_id: Any, cluster_id: Any, cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int):
self.time_window_id = time_window_id
self.cluster_id = cluster_id
metrics_calculator = ClusterMetricsCalculatorFactory.create_metrics_calculator(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)
self.size = metrics_calculator.get_size()
self.std_dev = metrics_calculator.get_standard_deviation()
self.scarcity = metrics_calculator.get_scarcity()
self.importance1 = metrics_calculator.get_importance1()
self.importance2 = metrics_calculator.get_importance2()
def get_time_info(self) -> int:
'''Returns the week of the time tuple str, eg. 25 for "(2014, 25)".'''
str_tuple = self.time_window_id
return int(str_tuple.split(',')[1].strip()[:-1])
def __repr__(self):
return str(self.__dict__)
def __str__(self):
return f"Cluster({self.time_window_id}, {self.cluster_id}, " \
f"{self.size}, {self.std_dev}, {self.scarcity}, " \
f"{self.importance1}, {self.importance2})"
@staticmethod
def create_multiple_from_time_window(time_window: TimeWindow, cluster_feature_names: List[str]) -> Iterable['Cluster']:
total_layer_nodes = sum([len(nodes) for nodes in time_window.clusters.values()])
layer_diversity = len([nodes for nodes in time_window.clusters.values() if len(nodes) > 0])
for cluster_nr, cluster_nodes in time_window.clusters.items():
yield Cluster(time_window.time, cluster_nr, cluster_nodes, cluster_feature_names, total_layer_nodes, layer_diversity)
@staticmethod
def create_from_dict(dict_) -> 'Cluster':
cl = Cluster(0, 0, [], 'None', 0, 0)
cl.__dict__.update(dict_)
return cl
from typing import Dict, List, Tuple, Any
import scipy.spatial
from entities.timewindow import TimeWindow
class InternalCluster:
def __init__(self, cluster_id, cluster_nodes: List[dict], feature_names:List[str], global_cluster_center: Tuple[float]):
self.cluster_id = cluster_id
self.size = len(cluster_nodes)
if len(cluster_nodes) > 0:
self.global_center_distance = scipy.spatial.distance.euclidean(self.get_current_cluster_center(cluster_nodes, feature_names), global_cluster_center)
else:
self.global_center_distance = 0
def _convert_feature_to_float(self, feature_value) -> float:
return float(feature_value if feature_value is not "" else 0)
def get_current_cluster_center(self, nodes, features) -> ('x', 'y'):
if len(features) == 1:
values = [self._convert_feature_to_float(node[features[0]]) for node in nodes]
return (sum(values)/len(values), 0)
if len(features) == 2:
x = [self._convert_feature_to_float(node[features[0]]) for node in nodes]
y = [self._convert_feature_to_float(node[features[1]]) for node in nodes]
centroid = (sum(x) / len(nodes), sum(y) / len(nodes))
return centroid
@staticmethod
def create_many_from_cluster_nodes(clusters: Dict[str, List[dict]], feature_names: List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> List['InternalCluster']:
res_clusters = []
for key, value in clusters.items():
# ignore noise as it contains no meaningful cluster information
if key == '-1':
continue
res_clusters.append(InternalCluster(key, value, feature_names, global_cluster_centers[key]))
return res_clusters
class Layer:
'''Represents metrics for one layer for a single time window.'''
def __init__(self, time_window_id: Any, clusters: List[InternalCluster]):
self.time_window_id = time_window_id
self.relative_cluster_sizes = self.get_relative_cluster_sizes(clusters)
self.entropy = self.get_entropy(clusters)
self.distances_from_global_centers = self.get_distances_from_global_center(clusters)
def get_relative_cluster_sizes(self, clusters: List[InternalCluster]):
total_size = sum([cluster.size for cluster in clusters])
if total_size > 0:
return [cluster.size / total_size for cluster in clusters]
else:
return [0] * len(clusters)
def get_entropy(self, clusters: List[InternalCluster]):
'''
Returns the entropy over all clusters C,
where P(c_i) is the probability that a node belongs to cluster c_i.
'''
return scipy.stats.entropy(self.get_relative_cluster_sizes(clusters), base=2)
def __repr__(self):
return str(self.__dict__)
def __str__(self):
return f"Layer({self.time_window_id}, " \
f"{self.relative_cluster_sizes}, {self.entropy}, {self.distances_from_global_centers})"
def get_distances_from_global_center(self, clusters: List[InternalCluster]):
return [cluster.global_center_distance for cluster in clusters]
@staticmethod
def create_from_time_window(time_window: TimeWindow, feature_names:List[str], global_cluster_centers: Dict[str, Tuple[float]]) -> 'Layer':
clusters: List[InternalCluster] = InternalCluster.create_many_from_cluster_nodes(time_window.clusters, feature_names, global_cluster_centers)
return Layer(time_window.time, clusters)
@staticmethod
def create_from_dict(dict_) -> 'Layer':
l = Layer(0, [])
l.__dict__.update(dict_)
return l
\ No newline at end of file
import json
from typing import List, Dict, NewType, Any
from datetime import date, datetime
class TimeWindow:
'''
A time slice for a single layer containing all nodes for that time.
:param time: The tag indicating the time
:param layer_name: The name of the layer the nodes belong to
'''
def __init__(self, time: Any = None, use_case: str = None, use_case_table: str = None, layer_name: str = None,
time_slice_dict: Dict = None, from_db = False):
self.time = str(time)
self.use_case = use_case
self.use_case_table = use_case_table
self.layer_name = layer_name
self.clusters: Dict[str, List[dict]] = {}
if time_slice_dict is not None:
self.from_serializable_dict(time_slice_dict, from_db)
def add_node_to_cluster(self, cluster_label: str, node):
# only string keys can be stored in json
cluster_label = str(cluster_label)
if cluster_label not in self.clusters:
self.clusters[cluster_label] = []
# node = self._get_unique_id(node)
self.clusters[cluster_label].append(node)
def get_nodes_for_cluster(self, cluster_label: str):
if cluster_label in self.clusters:
return self.clusters[cluster_label]
else:
return []
def _get_unique_id(self, node : Dict) -> Dict:
'''Returns a new dict with the unique id only.'''
uid_key = 'UniqueID'
if uid_key in node:
return {uid_key: node[uid_key]}
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"time": self.time,
"use_case": self.use_case,
"use_case_table": self.use_case_table,
'layer_name': self.layer_name,
"clusters": json.dumps(self.clusters) if for_db else self.clusters
}
def from_serializable_dict(self, dict: Dict, from_db=False):
self.time = dict["time"]
self.use_case = dict["use_case"]
self.use_case_table = dict["use_case_table"]
self.layer_name = dict['layer_name']
self.clusters = json.loads(dict['clusters']) if from_db else dict['clusters']
@staticmethod
def create_from_serializable_dict(dict: Dict, from_db=False):
ts = TimeWindow()
ts.from_serializable_dict(dict, from_db)
return ts
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"TimeWindow({self.__repr__()})"
# originally downloaded datasets from: (both contain the same csv)
## https://www.kaggle.com/c/pkdd-15-predict-taxi-service-trajectory-i/data
## https://www.kaggle.com/c/pkdd-15-taxi-trip-time-prediction-ii
*.zip
train.csv
# clusters as received from the SMART pipeline
clusters/
# time slices as created by the SMART pipeline
timeslices/
## This folder contains the old time slices, where empty clusters were not added to the slices.
timeslices_old/
# calculated metrics for the clusters from the notebook
metrics/
metrics_old/
# calculated metrics for the layers from the notebook
layer_metrics/
layer_metrics_old/
\ No newline at end of file
{
"metadata": {
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.6-final"
},
"orig_nbformat": 2,
"kernelspec": {
"name": "python3",
"display_name": "Python 3",
"language": "python"
}
},
"nbformat": 4,
"nbformat_minor": 2,
"cells": [
{
"source": [
"# Community Prediction based on Taxi Dataset"
],
"cell_type": "markdown",
"metadata": {}
},
{
"source": [
"## Load dataset\n",
"1. TRIP_ID: (String) It contains an unique identifier for each trip;\n",
"1. CALL_TYPE: (char) It identifies the way used to demand this service. It may contain one of three possible values:\n",
" - ‘A’ if this trip was dispatched from the central;\n",
" - ‘B’ if this trip was demanded directly to a taxi driver on a specific stand;\n",
" - ‘C’ otherwise (i.e. a trip demanded on a random street).\n",
"1. ORIGIN_CALL: (integer) It contains an unique identifier for each phone number which was used to demand, at least, one service. It identifies the trip’s customer if CALL_TYPE=’A’. Otherwise, it assumes a NULL value;\n",
"1. ORIGIN_STAND: (integer): It contains an unique identifier for the taxi stand. It identifies the starting point of the trip if CALL_TYPE=’B’. Otherwise, it assumes a NULL value;\n",
"1. TAXI_ID: (integer): It contains an unique identifier for the taxi driver that performed each trip;\n",
"1. TIMESTAMP: (integer) Unix Timestamp (in seconds). It identifies the trip’s start; \n",
"1. DAYTYPE: (char) It identifies the daytype of the trip’s start. It assumes one of three possible values:\n",
" - ‘B’ if this trip started on a holiday or any other special day (i.e. extending holidays, floating holidays, etc.);\n",
" - ‘C’ if the trip started on a day before a type-B day;\n",
" - ‘A’ otherwise (i.e. a normal day, workday or weekend).\n",
"1. MISSING_DATA: (Boolean) It is FALSE when the GPS data stream is complete and TRUE whenever one (or more) locations are missing\n",
"1. POLYLINE: (String): It contains a list of GPS coordinates (i.e. WGS84 format) mapped as a string. The beginning and the end of the string are identified with brackets (i.e. \\[ and \\], respectively). Each pair of coordinates is also identified by the same brackets as \\[LONGITUDE, LATITUDE\\]. This list contains one pair of coordinates for each 15 seconds of trip. The last list item corresponds to the trip’s destination while the first one represents its start;\n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import csv\n",
"import json\n",
"from datetime import datetime\n",
"from typing import Iterator\n",
"\n",
"enum_mapping = {'A': 1, 'B': 2, 'C': 3}\n",
"\n",
"def load_csv_content() -> Iterator:\n",
" '''Returns a generator for all lines in the csv file with correct field types.'''\n",
" \n",
" with open('input/train.csv') as csv_file:\n",
" reader = csv.reader(csv_file) \n",
"\n",
" headers = [h.lower() for h in next(reader)]\n",
"\n",
" for line in reader:\n",
" # convert line fields to correct type\n",
" for i in range(len(headers)):\n",
" # trip_id AS string\n",
" if i == 0:\n",
" continue\n",
" # call_type, day_type \n",
" if i in [1, 6]:\n",
" line[i] = enum_mapping[line[i]]\n",
" # origin_call, origin_stand, taxi_id AS int\n",
" elif i in [2, 3, 4]:\n",
" line[i] = int(line[i]) if line[i] != \"\" else \"\"\n",
" # timestamp AS timestamp\n",
" elif i == 5:\n",
" # datetime is not serializable\n",
" # line[i] = datetime.fromtimestamp(int(line[i]))\n",
" line[i] = int(line[i])\n",
" # missing_data AS bool\n",
" elif i == 7: \n",
" line[i] = line[i].lower() == 'true'\n",
" # polyline AS List[List[float]]\n",
" elif i == 8:\n",
" line[i] = json.loads(line[i])\n",
"\n",
" entry = dict(zip(headers, line))\n",
" yield entry\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(next(load_csv_content()))"
]
},
{
"source": [
"## Display some dataset routes"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import List\n",
"import folium\n",
"\n",
"def displayNodes(nodes: List[List[float]]): \n",
" '''\n",
" Displays the nodes on a map of the city.\n",
"\n",
" :param nodes: A list of coordinates, eg. [[1,2],[1,3]]\n",
" '''\n",
" m = folium.Map(location=[41.15,-8.6],tiles='stamenterrain',zoom_start=12, control_scale=True) \n",
"\n",
" for idx, node in enumerate(nodes): \n",
" popupLabel = idx\n",
"\n",
" folium.Marker(\n",
" location=[node[1], node[0]],\n",
" #popup='Cluster Nr: '+ str(node.cluster_no),\n",
" popup=popupLabel,\n",
" icon=folium.Icon(color='red', icon='circle'),\n",
" ).add_to(m)\n",
" \n",
" display(m)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"content = load_csv_content()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"displayNodes(next(content)['polyline'])"
]
},
{
"source": [
"# Model Training"
],
"cell_type": "markdown",
"metadata": {}
},
{
"source": [
"## Split dataset into multiple layers\n",
"The SMART pipeline is used to split up the data in multiple layers. Therefore, the csv file is uploaded to the Semantic Linking microservice for layer creation. <br />\n",
"Next, the Role Stage Discovery microservice will cluster the individual layers and splits them into multiple time windows based on the timestamp.\n",
"\n",
"## Define features per cluster\n",
"### \"Local\" features based on single clusters\n",
"- cluster size ($\\#\\ cluster\\ nodes$)\n",
"- cluster standard deviation (variance from cluster mean)\n",
"- cluster scarcity (ratio $\\frac{cluster\\ range}{cluster\\ size}$ ) <br />\n",
"Scarcity is perferred over density to avoid divide-by-zero error\n",
"- (cluster trustworthiness)\n",
"### \"Global\" features based on clusters in the context of a layer\n",
"- cluster importance I (ratio $\\frac{cluster\\ size}{\\#\\ layer\\ nodes}$)\n",
"- cluster importance II (ratio $\\frac{1}{diversity}$, where *diversity* = number of clusters with nodes > 0)"
],
"cell_type": "markdown",
"metadata": {}
},
{
"source": [
"## Calculate the Metrics for the Clusters"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from typing import List\n",
"import json\n",
"import os\n",
"from entities import TimeWindow, Cluster\n",
"\n",
"def calculate_metrics_for_clusters(layer_name: str = 'CallTypeLayer', feature_names: List[str] = ['call_type']):\n",
" print(f\"Working on {layer_name}\")\n",
"\n",
" path_in = f'input/timeslices/{layer_name}'\n",
" path_out = f'input/metrics/{layer_name}.json'\n",
"\n",
" complete_clusters: List[Cluster] = []\n",
"\n",
" for root, _, files in os.walk(path_in):\n",
" for f in files:\n",
" with open(os.path.join(root, f), 'r') as file:\n",
" json_slice = json.loads(file.read())\n",
" time_window = TimeWindow.create_from_serializable_dict(json_slice)\n",
"\n",
" # create all clusters + metrics for one time window\n",
" clusters = Cluster.create_multiple_from_time_window(time_window, feature_names)\n",
" complete_clusters.extend(clusters)\n",
" \n",
" # store the cluster metrics\n",
" with open(path_out, 'w') as file:\n",
" file.write(json.dumps([cl.__dict__ for cl in complete_clusters]))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"layers = [\n",
" ['CallTypeLayer', 'call_type'],\n",
" ['DayTypeLayer', 'day_type'],\n",
" ['TaxiIdLayer', 'taxi_id'],\n",
"\n",
" ['OriginCallLayer', ('call_type', 'origin_call')],\n",
" ['OriginStandLayer', ('call_type', 'origin_stand')],\n",
" ['StartLocationLayer', ('start_location_lat', 'start_location_long')],\n",
" ['EndLocationLayer', ('end_location_lat', 'end_location_long')],\n",
"]\n",
"\n",
"for layer in layers:\n",
" calculate_metrics_for_clusters(layer[0], layer[1])"
]
},
{
"source": [
"## Prepare cluster metrics and evolution labels for ML"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example how to convert time to a cyclic 2d feature\n",
"\n",
"MAX_TIME_VAL = 52 # for weeks\n",
"\n",
"import numpy as np\n",
"import matplotlib.pyplot as plt\n",
"\n",
"times = np.asarray([i+1 for i in range(52)][::])\n",
"\n",
"df = {}\n",
"df['sin_time'] = np.sin(2*np.pi*times/MAX_TIME_VAL)\n",
"df['cos_time'] = np.cos(2*np.pi*times/MAX_TIME_VAL)\n",
"\n",
"plt.plot(df['sin_time'])\n",
"plt.plot(df['cos_time'])\n",
"plt.show()\n",
"\n",
"plt.scatter(df['sin_time'], df['cos_time'])\n",
"plt.show()\n",
"\n",
"# feature_new = {i+1:(s,c) for i,(s,c) in enumerate(zip(df['sin_time'], df['cos_time']))}"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Example how to calculate convex hull from points\n",
"# Used to calculate 2d area for Scarcity metric\n",
"\n",
"from scipy.spatial import ConvexHull, convex_hull_plot_2d\n",
"import numpy as np\n",
"\n",
"points = np.asarray([[0.0,0.0], [1.0,3.0], [3.0,2.0], [0.0,2.0], [1.0,2.0], [2.0,2.0], [2.0,1.0]])\n",
"\n",
"def _get_polygon_border_points(points) -> 'np.array':\n",
" hull = ConvexHull(points)\n",
" return points[hull.vertices]\n",
"\n",
"res = _get_polygon_border_points(points)\n",
"\n",
"import matplotlib.pyplot as plt\n",
"\n",
"plt.plot(points[:,0], points[:,1], 'o')\n",
"plt.plot(res[:,0], res[:,1], 'o')\n",
"\n",
"plt.show()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"from entities import Cluster\n",
"import collections\n",
"import numpy as np\n",
"from typing import Iterable\n",
"\n",
"def get_evolution_label(old_size: int, new_size: int) -> int:\n",
" '''Returns the evolution label as int by mapping 0..4 to {continuing, shrinking, growing, dissolving, forming}.'''\n",
" if old_size == new_size:\n",
" return 0 # continuing\n",
" if old_size == 0 and new_size != 0:\n",
" return 4 # forming\n",
" if old_size != 0 and new_size == 0:\n",
" return 3 # dissolving\n",
" if old_size > new_size:\n",
" return 1 # shrinking\n",
" if old_size < new_size:\n",
" return 2 # growing\n",
"\n",
"def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> (float, float):\n",
" return (np.sin(2*np.pi*time/max_time_value),\n",
" np.cos(2*np.pi*time/max_time_value))\n",
"\n",
"def create_metrics_training_data(N: int = 3, layer_name: str = 'CallTypeLayer') -> Iterable:\n",
" \"\"\"\n",
" Loads the metrics training data for an individual layer from disk.\n",
" A single metrics training data point should look like this:\n",
"\n",
" (cluster_size, cluster_std_dev, cluster_scarcity, cluster_import1, cluster_import2, time_info) ^ N, evolution_label\n",
" time_info ... the time as 2d cyclic feature, i.e. time_info := (time_f1, time_f2)\n",
"\n",
" The first tuple represents metrics from the cluster in t_i-(N-1).\n",
" The Nth tuple represents metrics from the cluster in t_i.\n",
" The label is one of {continuing, shrinking, growing, dissolving, forming} \\ {splitting, merging} and identifies the change for t_i+1.\n",
" \n",
" :param N: number of cluster metric tuples\n",
" :param layer_name: the name of the layer metrics json file\n",
" \"\"\"\n",
" \n",
" path_in = f\"input/metrics/{layer_name}.json\"\n",
" with open(path_in, 'r') as file:\n",
" data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]\n",
"\n",
" data.sort(key=lambda cl: (cl.cluster_id, cl.time_window_id))\n",
"\n",
" # manually prepare deque with N metric_tuples + evolution label\n",
" tuples = []\n",
" prev_cluster_id = -1\n",
"\n",
" for i, cur_cluster in enumerate(data[:-1]):\n",
"\n",
" if cur_cluster.cluster_id != data[i+1].cluster_id:\n",
" # next cluster slice in list will be another cluster id -> restart deque and skip adding the current (last) cluster slice\n",
" tuples = []\n",
" continue\n",
"\n",
" cur_metrics = (cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2, get_cyclic_time_feature(cur_cluster.get_time_info()))\n",
"\n",
" # deque function: adding N+1st element will remove oldest one\n",
" if len(tuples) == N:\n",
" tuples.pop(0)\n",
" tuples.append(cur_metrics)\n",
"\n",
" label = get_evolution_label(cur_cluster.size, data[i+1].size)\n",
"\n",
" if len(tuples) == N:\n",
" yield list(tuples) + [label]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def flatten_metrics_datapoint(datapoint: list) -> ('X', 'Y'):\n",
" '''\n",
" Flattens a single metrics data point in the form:\n",
" [(cluster_size, cluster_variance, cluster_density, cluster_import1, cluster_import2, (time_f1, time_f2))^N, evolution_label]\n",
" to:\n",
" (X: np.array, evolution_label)\n",
" '''\n",
" flat_list = []\n",
" for entry in datapoint[:-1]: # for all x\n",
" flat_list.extend(entry[:-1]) # add all number features except the time tuple\n",
" flat_list.extend(entry[-1]) # add time tuple\n",
"\n",
" # flat_list.append(datapoint[-1]) # add y\n",
"\n",
" return np.asarray(flat_list), datapoint[-1]\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"def convert_metrics_data_for_training(data: Iterable) -> ('nparray with Xs', 'nparray with Ys'):\n",
" '''Flattens and splits metrics data to match ML conventions.'''\n",
" X = []\n",
" Y = []\n",
"\n",
" for element in data:\n",
" x, y = flatten_metrics_datapoint(element)\n",
" \n",
" X.append(x)\n",
" Y.append(y)\n",
"\n",
" return (np.asarray(X), np.asarray(Y))"
]
},
{
"source": [
"## Evolution Prediction Approach\n",
"\n",
"### 1. Prediction of cluster evolution based on metrics from clusters in one layer\n",
"Use cluster metrics from last N time windows to predict the change in $t_{i+1}$.\n",
"Either use normal classification with $(cluster\\_metrics)^{N} \\cup (label)$ or choose a RNN.\n"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import pandas as pd\n",
"import collections\n",
"import statistics as stat\n",
"\n",
"def balance_dataset(X: np.array, Y: np.array, imbalance_threshold=.3) -> ('X: np.array', 'Y: np.array'):\n",
" '''Balances an unbalanced dataset by ignoring elements from the majority label, so that majority-label data size = median of other cluster sizes.'''\n",
" y = Y.tolist()\n",
" counter = collections.Counter(y)\n",
" print(f\"Label Occurrences: Total = {counter}\")\n",
"\n",
" # find key with max values\n",
" max_key = max(counter, key=lambda k: counter[k])\n",
" max_val = counter[max_key]\n",
"\n",
" unbalanced_labels = all([v < max_val * (1-imbalance_threshold) for k, v in counter.items() if k != max_key]) \n",
" if unbalanced_labels: # if all other labels are >=30% less frequent than max_key\n",
" median_rest = int(stat.median([v for k, v in counter.items() if k != max_key]))\n",
" print(f\"Labels are unbalanced, keeping {median_rest} for label {max_key}\")\n",
" \n",
" # merge X and Y\n",
" data = np.append(X, Y.reshape(Y.shape[0], 1), 1)\n",
" df = pd.DataFrame(data, columns=['_']*X.shape[1]+['label'])\n",
"\n",
" # take only median_rest for the max_key label\n",
" max_labeled_data = df.loc[df['label'] == max_key].sample(n=median_rest)\n",
" other_labeled_data = df.loc[df['label'] != max_key]\n",
" balanced_data = pd.concat([max_labeled_data, other_labeled_data])\n",
" balanced_data = balanced_data.sample(frac=1) # shuffle\n",
"\n",
" X = balanced_data.loc[:, balanced_data.columns != 'label'].to_numpy()\n",
" Y = balanced_data.loc[:, balanced_data.columns == 'label'].to_numpy()\n",
" Y = Y.reshape(Y.shape[0],).astype(int)\n",
" \n",
" return X, Y\n",
"\n",
"def get_training_data(layer_name='CallTypeLayer', test_dataset_frac=.2) -> '(X_train, Y_train, X_test, Y_test)':\n",
" # load metrics data from disk\n",
" data: Iterable = create_metrics_training_data(layer_name=layer_name)\n",
" \n",
" # convert to X and Y\n",
" X, Y = convert_metrics_data_for_training(data)\n",
" X, Y = balance_dataset(X, Y)\n",
" \n",
" # split in training and test set\n",
" test_size = int(X.shape[0] * test_dataset_frac) \n",
" X_train = X[test_size:]\n",
" Y_train = Y[test_size:]\n",
" X_test = X[:test_size]\n",
" Y_test = Y[:test_size]\n",
"\n",
" print(f\"\\nWorking with: {X_train.shape[0]} training points + {X_test.shape[0]} test points ({X_test.shape[0]/(X_train.shape[0]+X_test.shape[0])}).\")\n",
" print(f\"Label Occurrences: Total = {collections.Counter(Y_train.tolist() + Y_test.tolist())}, \"\\\n",
" f\"Training = {collections.Counter(Y_train)}, Test = {collections.Counter(Y_test)}\")\n",
" try:\n",
" print(f\"Label Majority Class: Training = {stat.mode(Y_train)}, Test = {stat.mode(Y_test)}\\n\")\n",
" except stat.StatisticsError:\n",
" print(f\"Label Majority Class: no unique mode; found 2 equally common values\")\n",
"\n",
" return X_train, Y_train, X_test, Y_test\n",
"\n",
"X_train, Y_train, X_test, Y_test = get_training_data('OriginCallLayer')"
]
},
{
"source": [
"## SVM classifier"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# train\n",
"from sklearn import svm\n",
"\n",
"svc = svm.SVC(kernel='linear')\n",
"svc.fit(X_train, Y_train)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# export\n",
"import pickle \n",
"\n",
"with open('output/svc.model', 'wb') as file:\n",
" b = pickle.dump(svc, file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# import for verification\n",
"FILE_NAME = 'StartLocationLayer'\n",
"\n",
"X_train, Y_train, X_test, Y_test = get_training_data(FILE_NAME)\n",
"\n",
"import pickle \n",
"with open(f'output/cluster_metrics/{FILE_NAME}.model', 'rb') as file:\n",
" svc = pickle.load(file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# verify\n",
"import sklearn\n",
"\n",
"pred_Y = svc.predict(X_test)\n",
"\n",
"print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))"
]
},
{
"source": [
"## Naive classifiers"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sklearn\n",
"import statistics as stat\n",
"import random\n",
"\n",
"def show_majority_class_prediction():\n",
" print(\"### Majority Class Prediction: ###\")\n",
"\n",
" majority_class = stat.mode(Y_train)\n",
" try:\n",
" print(f\"Training majority class = {stat.mode(Y_train)}, Test majority class = {stat.mode(Y_test)}\") \n",
" except stat.StatisticsError:\n",
" print(f\"Label Majority Class: no unique mode; found 2 equally common values\")\n",
"\n",
" pred_Y = len(Y_test) * [majority_class]\n",
" print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))\n",
"\n",
" \n",
"def show_random_prediction():\n",
" print(\"### Random Class Prediction: ###\")\n",
"\n",
" classes = list(set(Y_train))\n",
" print(f\"Classes: {classes}\")\n",
"\n",
" pred_Y = random.choices(classes, k=len(Y_test))\n",
" print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))\n",
"\n",
"\n",
"show_majority_class_prediction()\n",
"show_random_prediction()"
]
},
{
"source": [
"# Model Training 2"
],
"cell_type": "markdown",
"metadata": {}
},
{
"source": [
"## Define features per layer\n",
"- relative cluster sizes (list of all cluster sizes as $\\frac{cluster\\ size}{layer\\ size}$)\n",
"- entropy of the layer calculated over all clusters $C$, where $P(c_i)$ is the probability that a node belongs to cluster $c_i$ (e.g. using the relative sizes as $P(c_i)$).\n",
"- euclidean distance from the global cluster center to the cluster center in $t_i$\n",
"\n",
"## Calculate the Metrics for the Layers"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from typing import List, Tuple\n",
"import statistics as stat\n",
"import json\n",
"import os\n",
"from entities import TimeWindow, Layer\n",
"\n",
"def calculate_center(label: str) -> Tuple[float]:\n",
" if '--' in label:\n",
" return (stat.mean([float(e) for e in label.split('--')]), 0)\n",
" else:\n",
" return [float(e) for e in label.replace('(', '').replace(')', '').split(',')]\n",
"\n",
"def calculate_metrics_for_layers(layer_name: str = 'CallTypeLayer', feature_names: List[str] = ['call_type']):\n",
" print(f\"Working on {layer_name}\")\n",
"\n",
" # load global cluster centers\n",
" path_in = f'input/clusters/{layer_name}.json'\n",
" with open(path_in, 'r') as file:\n",
" clusters = json.loads(file.read())\n",
" cluster_centers: Dict[str, Tuple[float]] = {str(cluster['cluster_label']): calculate_center(cluster['label']) for cluster in clusters if cluster['label'] != 'noise'}\n",
"\n",
" # load time windows \n",
" all_layers: List[Layer] = []\n",
" path_in = f'input/timeslices/{layer_name}'\n",
" for root, _, files in os.walk(path_in):\n",
" for f in files:\n",
" with open(os.path.join(root, f), 'r') as file:\n",
" json_time_slice = json.loads(file.read())\n",
" time_window = TimeWindow.create_from_serializable_dict(json_time_slice)\n",
"\n",
" layer = Layer.create_from_time_window(time_window, feature_names, cluster_centers)\n",
" all_layers.append(layer)\n",
" \n",
" # store the layer metrics\n",
" path_out = f'input/layer_metrics/{layer_name}.json'\n",
" with open(path_out, 'w') as file:\n",
" file.write(json.dumps([l.__dict__ for l in all_layers]))"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"layers = [\n",
" ['CallTypeLayer', ['call_type']],\n",
" ['DayTypeLayer', ['day_type']],\n",
" ['TaxiIdLayer', ['taxi_id']],\n",
"\n",
" ['OriginCallLayer', ('call_type', 'origin_call')],\n",
" ['OriginStandLayer', ('call_type', 'origin_stand')],\n",
" ['StartLocationLayer', ('start_location_lat', 'start_location_long')],\n",
" ['EndLocationLayer', ('end_location_lat', 'end_location_long')],\n",
"]\n",
"\n",
"for layer in layers:\n",
" calculate_metrics_for_layers(layer[0], layer[1])"
]
},
{
"source": [
"## Prepare layer metrics and evolution labels for ML"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TODO remove dup code\n",
"\n",
"import numpy as np\n",
"\n",
"def get_evolution_label(old_size: int, new_size: int) -> int:\n",
" '''Returns the evolution label as int by mapping 0..4 to {continuing, shrinking, growing, dissolving, forming}.'''\n",
" if old_size == new_size:\n",
" return 0 # continuing\n",
" if old_size == 0 and new_size != 0:\n",
" return 4 # forming\n",
" if old_size != 0 and new_size == 0:\n",
" return 3 # dissolving\n",
" if old_size > new_size:\n",
" return 1 # shrinking\n",
" if old_size < new_size:\n",
" return 2 # growing\n",
"\n",
"def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> (float, float):\n",
" return (np.sin(2*np.pi*time/max_time_value),\n",
" np.cos(2*np.pi*time/max_time_value))\n",
"\n",
"def get_cyclic_time_feature_from_time_window(time: str) -> (float, float):\n",
" return get_cyclic_time_feature(int(time.replace('(', '').replace(')', '').split(',')[1]))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"from typing import Iterable, List, Dict, Any\n",
"import json\n",
"from entities import Layer, Cluster\n",
"\n",
"def create_metrics_training_data(N: int = 2, layer_name: str = 'CallTypeLayer', reference_layer: str = 'CallTypeLayer') -> Iterable:\n",
" \"\"\"\n",
" Loads the metrics training data for an individual layer from disk.\n",
" A single metrics training data point should look like this:\n",
"\n",
" [((relative_cluster_size) ^ M, entropy, (distance_from_global_center) ^ M, (time1, time2)) ^ N, cluster_number, evolution_label]\n",
"\n",
" The first tuple represents metrics from the reference layer in t_i-(N-1).\n",
" The Nth tuple represents metrics from the reference layer in t_i.\n",
" The reference_layer has M clusters in total, this might differ from the number of clusters in layer_name.\n",
" The cluster number identifies the cluster for which the evolution_label holds. \n",
" The label is one of {continuing, shrinking, growing, dissolving, forming} \\ {splitting, merging} and identifies the change for a cluster in the layer layer_name for t_i.\n",
" \n",
" # TODO N is not implemented and fixed to 2\n",
" \"\"\"\n",
" \n",
" with open(f'input/metrics/{layer_name}.json') as file:\n",
" cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]\n",
" cluster_ids = {c.cluster_id for c in cluster_metrics}\n",
" cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}\n",
" \n",
" with open(f'input/layer_metrics/{reference_layer}.json') as file:\n",
" layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]\n",
" layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}\n",
"\n",
" # load the time keys chronologically\n",
" ordered_time_keys = list(layer_metrics.keys())\n",
" ordered_time_keys.sort(key=lambda x: [int(v) for v in x.replace('(', '').replace(')', '').split(',')])\n",
" \n",
" # go through all time windows once...\n",
" prev_time_key = ordered_time_keys[0]\n",
" for current_time_key in ordered_time_keys[1:]:\n",
" # ...and load the current and previous layer metrics in the reference_layer\n",
" current_layer_metric = layer_metrics[current_time_key]\n",
" prev_layer_metric = layer_metrics[prev_time_key]\n",
" current_layer_metric_tuple = (current_layer_metric.relative_cluster_sizes, current_layer_metric.entropy, current_layer_metric.distances_from_global_centers, get_cyclic_time_feature_from_time_window(current_layer_metric.time_window_id))\n",
" prev_layer_metric_tuple = (prev_layer_metric.relative_cluster_sizes, prev_layer_metric.entropy, prev_layer_metric.distances_from_global_centers, get_cyclic_time_feature_from_time_window(prev_layer_metric.time_window_id))\n",
"\n",
" # ...then load the current and previous cluster metrics for all clusters in the layer_name\n",
" for cluster_id in cluster_ids:\n",
" current_cluster_metric = cluster_metrics[(current_time_key, cluster_id)]\n",
" prev_cluster_metric = cluster_metrics[(prev_time_key, cluster_id)]\n",
" evolution_label = get_evolution_label(prev_cluster_metric.size, current_cluster_metric.size)\n",
"\n",
" # yield each combination of reference layer metrics to clusters\n",
" yield [prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id), evolution_label]\n",
"\n",
" prev_time_key = current_time_key"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def flatten_metrics_datapoint(datapoint: list) -> ('X', 'Y'):\n",
" '''\n",
" Flattens a single layer metrics data point in the form:\n",
" [((relative_cluster_size) ^ M, entropy, (distance_from_global_center) ^ M, (time1, time2)) ^ N, cluster_number, evolution_label]\n",
" to:\n",
" (X: np.array, evolution_label)\n",
" '''\n",
" flat_list = []\n",
" for layer_metric_tuple in datapoint[:-2]:\n",
" flat_list.extend(layer_metric_tuple[0]) # sizes\n",
" flat_list.append(layer_metric_tuple[1]) # entropy\n",
" flat_list.extend(layer_metric_tuple[2]) # distances\n",
" flat_list.extend(layer_metric_tuple[3]) # time1/2\n",
"\n",
" flat_list.append(datapoint[-2]) # cluster num\n",
"\n",
" return np.asarray(flat_list), datapoint[-1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TODO remove dup code\n",
"def convert_metrics_data_for_training(data: Iterable) -> ('nparray with Xs', 'nparray with Ys'):\n",
" '''Flattens and splits metrics data to match ML conventions.'''\n",
" X = []\n",
" Y = []\n",
"\n",
" for element in data:\n",
" x, y = flatten_metrics_datapoint(element)\n",
" \n",
" X.append(x)\n",
" Y.append(y)\n",
"\n",
" return (np.asarray(X), np.asarray(Y))"
]
},
{
"source": [
"\n",
"### 2. Prediction of cluster evolution based on metrics from other layers, called reference layers\n",
"Use reference layer metrics from last N (=2) time windows to predict the change in $t_i$ for each main-layer cluster.\n",
"Either use normal classification with $(layer\\_metrics)^{N} \\cup (cluster\\_num) \\cup (label)$ or choose a RNN."
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# TODO remove dup code\n",
"\n",
"import numpy as np\n",
"import pandas as pd\n",
"import collections\n",
"import statistics as stat\n",
"\n",
"def balance_dataset(X: np.array, Y: np.array, imbalance_threshold=.3) -> ('X: np.array', 'Y: np.array'):\n",
" '''Balances an unbalanced dataset by ignoring elements from the majority label, so that majority-label data size = median of other cluster sizes.'''\n",
" y = Y.tolist()\n",
" counter = collections.Counter(y)\n",
" print(f\"Label Occurrences: Total = {counter}\")\n",
"\n",
" # find key with max values\n",
" max_key = max(counter, key=lambda k: counter[k])\n",
" max_val = counter[max_key]\n",
"\n",
" unbalanced_labels = all([v < max_val * (1-imbalance_threshold) for k, v in counter.items() if k != max_key]) \n",
" if unbalanced_labels: # if all other labels are >=30% less frequent than max_key\n",
" median_rest = int(stat.median([v for k, v in counter.items() if k != max_key]))\n",
" print(f\"Labels are unbalanced, keeping {median_rest} for label {max_key}\")\n",
" \n",
" # merge X and Y\n",
" data = np.append(X, Y.reshape(Y.shape[0], 1), 1)\n",
" df = pd.DataFrame(data, columns=['_']*X.shape[1]+['label'])\n",
"\n",
" # take only median_rest for the max_key label\n",
" max_labeled_data = df.loc[df['label'] == max_key].sample(n=median_rest)\n",
" other_labeled_data = df.loc[df['label'] != max_key]\n",
" balanced_data = pd.concat([max_labeled_data, other_labeled_data])\n",
" balanced_data = balanced_data.sample(frac=1) # shuffle\n",
"\n",
" X = balanced_data.loc[:, balanced_data.columns != 'label'].to_numpy()\n",
" Y = balanced_data.loc[:, balanced_data.columns == 'label'].to_numpy()\n",
" Y = Y.reshape(Y.shape[0],).astype(int)\n",
" \n",
" return X, Y\n",
"\n",
"def get_training_data(layer_name='CallTypeLayer', reference_layer_name='CallTypeLayer', test_dataset_frac=.2) -> '(X_train, Y_train, X_test, Y_test)':\n",
" # load metrics data from disk\n",
" data: Iterable = create_metrics_training_data(layer_name=layer_name, reference_layer=reference_layer_name)\n",
" \n",
" # convert to X and Y\n",
" X, Y = convert_metrics_data_for_training(data)\n",
" X, Y = balance_dataset(X, Y)\n",
" \n",
" # split in training and test set\n",
" test_size = int(X.shape[0] * test_dataset_frac) \n",
" X_train = X[test_size:]\n",
" Y_train = Y[test_size:]\n",
" X_test = X[:test_size]\n",
" Y_test = Y[:test_size]\n",
"\n",
" print(f\"\\nWorking with: {X_train.shape[0]} training points + {X_test.shape[0]} test points ({X_test.shape[0]/(X_train.shape[0]+X_test.shape[0])}).\")\n",
" print(f\"Label Occurrences: Total = {collections.Counter(Y_train.tolist() + Y_test.tolist())}, \"\\\n",
" f\"Training = {collections.Counter(Y_train)}, Test = {collections.Counter(Y_test)}\")\n",
" try:\n",
" print(f\"Label Majority Class: Training = {stat.mode(Y_train)}, Test = {stat.mode(Y_test)}\\n\")\n",
" except stat.StatisticsError:\n",
" print(f\"Label Majority Class: no unique mode; found 2 equally common values\")\n",
"\n",
" return X_train, Y_train, X_test, Y_test\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"tags": []
},
"outputs": [],
"source": [
"X_train, Y_train, X_test, Y_test = get_training_data(layer_name='OriginCallLayer', reference_layer_name='CallTypeLayer')"
]
},
{
"source": [
"## Random Forest Classifier"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"MAX_DEPTH = 10\n",
"\n",
"from sklearn.ensemble import RandomForestClassifier\n",
"classifier = RandomForestClassifier(max_depth=MAX_DEPTH)\n",
"classifier.fit(X_train, Y_train)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# import for verification\n",
"MAX_DEPTH = 15\n",
"LAYER_NAME = 'CallTypeLayer'\n",
"REF_LAYER_NAME = 'DayTypeLayer'\n",
"\n",
"X_train, Y_train, X_test, Y_test = get_training_data(layer_name=LAYER_NAME, reference_layer_name=REF_LAYER_NAME)\n",
"\n",
"import pickle \n",
"with open(f'output/layer_metrics/{MAX_DEPTH}/{LAYER_NAME}_{REF_LAYER_NAME}.model', 'rb') as file:\n",
" classifier = pickle.load(file)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# verify\n",
"import sklearn\n",
"\n",
"pred_Y = classifier.predict(X_test)\n",
"\n",
"print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))"
]
},
{
"source": [
"## Naive Classifiers"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import sklearn\n",
"import statistics as stat\n",
"import random\n",
"\n",
"def show_majority_class_prediction():\n",
" print(\"### Majority Class Prediction: ###\")\n",
"\n",
" majority_class = stat.mode(Y_train)\n",
" try:\n",
" print(f\"Training majority class = {stat.mode(Y_train)}, Test majority class = {stat.mode(Y_test)}\") \n",
" except stat.StatisticsError:\n",
" print(f\"Label Majority Class: no unique mode; found 2 equally common values\")\n",
"\n",
" pred_Y = len(Y_test) * [majority_class]\n",
" print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))\n",
"\n",
" \n",
"def show_random_prediction():\n",
" print(\"### Random Class Prediction: ###\")\n",
"\n",
" classes = list(set(Y_train))\n",
" print(f\"Classes: {classes}\")\n",
"\n",
" pred_Y = random.choices(classes, k=len(Y_test))\n",
" print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))\n",
"\n",
"\n",
"show_majority_class_prediction()\n",
"show_random_prediction()"
]
},
{
"source": [
"# Debug"
],
"cell_type": "markdown",
"metadata": {}
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"from typing import List, Dict, Any\n",
"import json\n",
"from entities import Cluster\n",
"\n",
"with open(f'input/metrics/OriginCallLayer.json') as file:\n",
" cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]\n",
" cluster_ids = {c.cluster_id for c in cluster_metrics}\n",
" cluster_metrics2: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cluster_ids = {}\n",
"for c in cluster_metrics:\n",
" if c.time_window_id not in cluster_ids:\n",
" cluster_ids[c.time_window_id] = []\n",
" \n",
" cluster_ids[c.time_window_id].append(c.cluster_id)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for key, value in cluster_ids.items():\n",
" print(cluster_metrics2[(key, '908')].size)\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"with open(f'input/timeslices/OriginCallLayer/2013_50.json') as file:\n",
" window = json.loads(file.read())\n",
"'908' in window['clusters'].keys()"
]
}
]
}
\ No newline at end of file
# models trained by the `train.sh` and `train.py` scripts
/cluster_metrics/**/*.model
# models trained by the `train_layer.sh` and `train_layer.py` scripts
/layer_metrics/**/*.model
import warnings
from abc import ABC, abstractmethod
from typing import Dict, List, Any, Tuple
import numpy as np
from scipy.spatial import ConvexHull, qhull, distance
from math import sqrt
from statistics import mean
warnings.simplefilter(action='ignore', category=UserWarning)
# UserWarning: geopandas not available. Some functionality will be disabled.
from pointpats.centrography import std_distance
warnings.simplefilter(action='default', category=UserWarning)
class ClusterMetricsCalculator(ABC):
def __init__(self, cluster_nodes: List[dict], nr_layer_nodes: int, layer_diversity: int):
self.cluster_nodes = cluster_nodes
self.nr_layer_nodes = nr_layer_nodes
self.layer_diversity = layer_diversity
def get_size(self) -> int:
'''Returns the size of the cluster.'''
return len(self.cluster_nodes)
@abstractmethod
def get_standard_deviation(self) -> float:
'''Returns the std dev from the center of the distribution.'''
pass
@abstractmethod
def get_scarcity(self) -> float:
'''
Returns the scarcity of the data points regarding the complete range for possible points.
High scarcity indicates low density.
'''
pass
def get_importance1(self) -> float:
'''Returns the ratio of cluster_nodes to layer_nodes.'''
return float(len(self.cluster_nodes)) / self.nr_layer_nodes if len(self.cluster_nodes) > 0 else 0
def get_importance2(self) -> float:
'''Returns the inverse of the layer_diversity, where layer_diversity = number of clusters with #nodes > 0.'''
return 1.0 / self.layer_diversity if len(self.cluster_nodes) > 0 else 0
def _convert_feature_to_float(self, feature_value) -> float:
return float(feature_value if feature_value is not "" else 0)
class ClusterMetricsCalculator1D(ClusterMetricsCalculator):
'''Metrics calculator for clusters which were clustered based on 1 feature (1d clustering).'''
def __init__(self, cluster_nodes: List[dict], cluster_feature_name: str, nr_layer_nodes: int, layer_diversity: int):
super().__init__(cluster_nodes, nr_layer_nodes, layer_diversity)
self.feature_values: List[Any] = [self._convert_feature_to_float(node[cluster_feature_name])
for node in cluster_nodes]
def get_standard_deviation(self):
return np.std(self.feature_values) if len(self.feature_values) > 0 else 0
def get_scarcity(self):
'''Returns the scarcity as cluster_range / cluster_size, or 0 if len(nodes)=0.'''
if len(self.feature_values) == 0:
return 0
range_ = max(self.feature_values) - min(self.feature_values)
return float(range_) / self.get_size()
class ClusterMetricsCalculator2D(ClusterMetricsCalculator):
'''Metrics calculator for clusters which were clustered based on 2 features (2d clustering).'''
def __init__(self, cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int):
assert len(cluster_feature_names) == 2, "This class is for 2d cluster results only!"
super().__init__(cluster_nodes, nr_layer_nodes, layer_diversity)
self.feature_values: List[Tuple[Any]] = [
(self._convert_feature_to_float(node[cluster_feature_names[0]]), self._convert_feature_to_float(node[cluster_feature_names[1]]))
for node in cluster_nodes
]
def get_standard_deviation(self):
if len(self.feature_values) == 0:
return 0
warnings.simplefilter(action='ignore', category=RuntimeWarning)
std_dist = std_distance(self.feature_values)
warnings.simplefilter(action='default', category=RuntimeWarning)
if np.isnan(std_dist):
return 0 # somehow std_dist=nan if all feature values are same with many decimals
return std_dist
def get_scarcity(self):
'''Returns the scarcity as cluster_range / cluster_size, or 0 if len(nodes)=0.'''
if len(self.feature_values) == 0:
return 0
if len(self.feature_values) == 1:
# exactly 1 element gives inf density
return 0
if len(self.feature_values) == 2:
# cannot calculate area with 2 points - just use 2d distance as range instead
range_ = distance.euclidean(self.feature_values[0], self.feature_values[1])
return float(range_) / self.get_size()
try:
# calculate range as 2d area
points = self._get_polygon_border_points(self.feature_values)
range_ = self._calc_polygon_area(points)
# use sqrt to compare with 1d scarcity
return sqrt(float(range_) / self.get_size())
except qhull.QhullError as err:
# possible reasons that there is no hull with real area:
# 1. all points are at the same location
# 2. all points have the same x or y coordinates (lie on one hori/vert line)
points = np.asarray(self.feature_values)
same_x = len(set(points[:,0])) == 1
if same_x:
# use only y feature
features = points[:,1]
range_ = max(features) - min(features)
return float(range_) / self.get_size()
same_y = len(set(points[:,1])) == 1
if same_y:
# use only x feature
features = points[:,0]
range_ = max(features) - min(features)
return float(range_) / self.get_size()
print("Scarcity calc did not work with 1d feature")
return 0
def _get_polygon_border_points(self, points: List[List[float]]) -> 'np.array':
points = np.asarray(points)
hull = ConvexHull(points)
return points[hull.vertices]
def _calc_polygon_area(self, border_points: 'np.array') -> float:
x: 'np.array' = border_points[:,0]
y: 'np.array' = border_points[:,1]
# https://en.wikipedia.org/wiki/Shoelace_formula
area = 0.5 * np.abs(np.dot(x, np.roll(y,1)) - np.dot(y, np.roll(x,1)))
return float(area)
class ClusterMetricsCalculatorFactory:
@staticmethod
def create_metrics_calculator(cluster_nodes: List[dict], cluster_feature_names: List[str], nr_layer_nodes: int, layer_diversity: int) -> ClusterMetricsCalculator:
"""
This factory creates a class which contains metrics about a single cluster based on
its nodes, feature values, its layer total node number and its layer diversity.
:param cluster_nodes: all nodes from the cluster
:param cluster_feature_names: all field names which where used during clustering
:param nr_layer_nodes: the number of total layer nodes
:param layer_diversity: the diversity of the layer calculated as: number of clusters with nodes > 0
"""
if isinstance(cluster_feature_names, str):
return ClusterMetricsCalculator1D(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)
if len(cluster_feature_names) == 1:
return ClusterMetricsCalculator1D(cluster_nodes, cluster_feature_names[0], nr_layer_nodes, layer_diversity)
if len(cluster_feature_names) == 2:
return ClusterMetricsCalculator2D(cluster_nodes, cluster_feature_names, nr_layer_nodes, layer_diversity)
from processing.ClusterMetricsCalculator import ClusterMetricsCalculator, ClusterMetricsCalculator1D, ClusterMetricsCalculator2D, ClusterMetricsCalculatorFactory
\ No newline at end of file
backcall==0.2.0
beautifulsoup4==4.9.3
branca==0.4.2
certifi==2020.12.5
chardet==4.0.0
colorama==0.4.4
cycler==0.10.0
cython==0.28.5
decorator==4.4.2
folium==0.11.0
icecream
idna==2.10
# ipykernel==5.4.2
# ipython==7.19.0
# ipython-genutils==0.2.0
jedi==0.18.0
Jinja2==2.11.2
joblib==1.0.0
jupyter-client==6.1.7
jupyter-core==4.7.0
kiwisolver==1.3.1
libpysal==4.3.0
MarkupSafe==1.1.1
matplotlib==3.2.0
numpy==1.19.3
opencv-contrib-python==4.5.1.48
pandas
parso==0.8.1
pickleshare==0.7.5
Pillow==8.1.0
pointpats==2.2.0
prompt-toolkit==3.0.8
Pygments==2.7.3
pyparsing==2.4.7
python-dateutil==2.8.1
pytz==2020.5
# pywin32==300
pyzmq==20.0.0
requests==2.25.1
scikit-build
scikit-learn==0.24.0
scipy
six==1.15.0
sklearn==0.0
soupsieve==2.1
threadpoolctl==2.1.0
tornado==6.1
# traitlets==5.0.5
urllib3==1.26.2
wcwidth==0.2.5
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing import ClusterMetricsCalculator2D
class TestClusterMetricsCalculator(unittest.TestCase):
def test__get_standard_deviation__same_points_many_decimals__zero_and_not_nan(self):
nodes = [{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567},
{'f1': -8.58564, 'f2': 41.148567}]
calc = ClusterMetricsCalculator2D(nodes, ['f1','f2'], len(nodes), 1)
self.assertAlmostEqual(0, calc.get_standard_deviation())
if __name__ == '__main__':
unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from entities import Cluster, TimeWindow
from typing import Any, Tuple
from datetime import date, datetime
import json
from math import sqrt
import statistics as stat
class TestCluster(unittest.TestCase):
def test__init__single_cluster__all_values_set(self):
tw = self._get_timewindow_single_cluster_same_feature()
c = Cluster("time_abc", "clusterId 1", list(tw.clusters.values())[0], "feature", nr_layer_nodes=3, layer_diversity=1)
self.assertEqual("time_abc", c.time_window_id)
self.assertEqual("clusterId 1", c.cluster_id)
self.assert_cluster((3, 0, 0, 1, 1), c)
def test__create_multiple_from_time_window__single_cluster__all_values_set(self):
tw = self._get_timewindow_single_cluster_same_feature()
clusters = list(Cluster.create_multiple_from_time_window(tw, "feature"))
self.assertEqual(1, len(clusters))
c = clusters[0]
self.assertEqual("KW1", c.time_window_id)
self.assertEqual("1", c.cluster_id)
self.assert_cluster((3, 0, 0, 1, 1), c)
def test__create_multiple_from_time_window__two_clusters__correct_time_id_cluster_id(self):
tw = self._get_timewindow_two_clusters_same_feature()
clusters = Cluster.create_multiple_from_time_window(tw, "feature")
expected = [("KW1", "1"), ("KW1", "2")]
for c, exp in zip(clusters, expected):
self.assertEqual(exp[0], c.time_window_id)
self.assertEqual(exp[1], c.cluster_id)
def test__create_multiple_from_time_window__two_clusters_same_features__correct_calculation(self):
tw = self._get_timewindow_two_clusters_same_feature()
clusters = Cluster.create_multiple_from_time_window(tw, "feature")
expected = [(3, 0, 0, 3/5, 1/2), (2, 0, 0, 2/5, 1/2)]
for c, exp in zip(clusters, expected):
self.assert_cluster(exp, c)
def test__create_multiple_from_time_window__two_clusters_same_features_and_feature_names_list__correct_calculation(self):
tw = self._get_timewindow_two_clusters_same_feature()
clusters = Cluster.create_multiple_from_time_window(tw, ["feature"])
expected = [(3, 0, 0, 3/5, 1/2), (2, 0, 0, 2/5, 1/2)]
for c, exp in zip(clusters, expected):
self.assert_cluster(exp, c)
def test__create_multiple_from_time_window__two_clusters_different_features__correct_calculation(self):
tw = TimeWindow("CW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":2})
tw.add_node_to_cluster("1", {"feature":3})
tw.add_node_to_cluster("2", {"feature":70})
tw.add_node_to_cluster("2", {"feature":75})
clusters = Cluster.create_multiple_from_time_window(tw, "feature")
# variance for stddev calculated with: http://www.alcula.com/calculators/statistics/variance/
expected = [(3, sqrt(2.0/3), 2.0/3, 3/5, 1/2), (2, sqrt(6.25), 5.0/2, 2/5, 1/2)]
for cluster, exp in zip(clusters, expected):
self.assert_cluster(exp, cluster)
def test__create_multiple_from_time_window__empty_cluster__all_zero_for_empty_cluster(self):
tw = TimeWindow("CW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":2})
tw.add_node_to_cluster("1", {"feature":3})
tw.add_node_to_cluster("2", {"feature":70})
tw.add_node_to_cluster("2", {"feature":75})
tw.clusters["3"] = []
clusters = Cluster.create_multiple_from_time_window(tw, "feature")
expected = [(3, sqrt(2.0/3), 2.0/3, 3/5, 1/2), # diversity is still 2 as len=0 is ignored
(2, sqrt(6.25), 5.0/2, 2/5, 1/2),
(0, 0, 0, 0, 0)] # len 0 -> everything 0
for cluster, exp in zip(clusters, expected):
self.assert_cluster(exp, cluster)
def test__create_multiple_from_time_window__2d_clustering_single_feature_value__no_stddev_no_scarcity(self):
tw = TimeWindow("CW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"f1":1, "f2":1})
tw.add_node_to_cluster("1", {"f1":1, "f2":1})
tw.add_node_to_cluster("1", {"f1":1, "f2":1})
tw.add_node_to_cluster("2", {"f1":70, "f2":70})
tw.add_node_to_cluster("2", {"f1":70, "f2":70})
clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
expected = [(3, 0, 0, 3/5, 1/2), (2, 0, 0, 2/5, 1/2)]
for cluster, exp in zip(clusters, expected):
self.assert_cluster(exp, cluster)
def test__create_multiple_from_time_window__2d_clustering__correct_stddev_and_scarcity(self):
tw = TimeWindow("CW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"f1":1, "f2":1})
tw.add_node_to_cluster("1", {"f1":2, "f2":1})
tw.add_node_to_cluster("1", {"f1":1, "f2":3})
tw.add_node_to_cluster("2", {"f1":70, "f2":70})
tw.add_node_to_cluster("2", {"f1":72, "f2":75})
clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
# stddev calculated manually as in: https://glenbambrick.com/tag/standard-distance/
# area of the polygon calculated with: https://www.mathopenref.com/coordpolygonareacalc.html
expected = [(3, sqrt(2/9+8/9), sqrt(1/3), 3/5, 1/2), (2, sqrt(7.25), sqrt(2*2+5*5)/2, 2/5, 1/2)]
for cluster, exp in zip(clusters, expected):
self.assert_cluster(exp, cluster)
def test__create_multiple_from_time_window__2d_clustering_complex__correct_stddev_and_scarcity(self):
tw = TimeWindow("CW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"f1":0, "f2":0})
tw.add_node_to_cluster("1", {"f1":1, "f2":3})
tw.add_node_to_cluster("1", {"f1":3, "f2":2})
tw.add_node_to_cluster("1", {"f1":0, "f2":2})
tw.add_node_to_cluster("1", {"f1":1, "f2":2}) # inside the convex hull
tw.add_node_to_cluster("1", {"f1":2, "f2":2}) # inside the convex hull
tw.add_node_to_cluster("1", {"f1":2, "f2":1})
clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
# stddev calculated manually as in: https://glenbambrick.com/tag/standard-distance/
X = [0,1,3,0,1,2,2]
Y = [0,3,2,2,2,2,1]
x_mean = stat.mean(X)
y_mean = stat.mean(Y)
sum_x = 0
for x in X:
sum_x += (x - x_mean)**2
sum_y = 0
for y in Y:
sum_y += (y - y_mean)**2
sd = sqrt(sum_x/7 + sum_y/7)
# area of the polygon calculated with: https://www.mathopenref.com/coordpolygonareacalc.html
area = 5
scarcity = sqrt(area / 7)
expected = [[7, sd, scarcity, 1, 1]]
for cluster, exp in zip(clusters, expected):
self.assert_cluster(exp, cluster)
def test__create_multiple_from_time_window__2d_clustering_1d_single_feature_value__correct_calculation(self):
tw = TimeWindow("CW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"f1":1, "f2":1})
tw.add_node_to_cluster("1", {"f1":1, "f2":2})
tw.add_node_to_cluster("1", {"f1":1, "f2":3})
tw.add_node_to_cluster("2", {"f1":70, "f2":70})
tw.add_node_to_cluster("2", {"f1":75, "f2":70})
tw.add_node_to_cluster("2", {"f1":72, "f2":70})
tw.add_node_to_cluster("2", {"f1":71, "f2":70})
clusters = Cluster.create_multiple_from_time_window(tw, ["f1", "f2"])
# variance/stddev calculated as for 1d cluster (as f1/f2 is always the same)
# scarcity calculated as for 1d cluster
expected = [(3, sqrt(2/3), 2/3, 3/7, 1/2),
(4, sqrt(3.5), 5/4, 4/7, 1/2)]
for cluster, exp in zip(clusters, expected):
self.assert_cluster(exp, cluster)
#region setup methods
def _get_timewindow_single_cluster_same_feature(self) -> TimeWindow:
'''Returns a TimeWindow with time=KW1 and three nodes in cluster 1, all feature values = 1.'''
tw = TimeWindow("KW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":1})
return tw
def _get_timewindow_two_clusters_same_feature(self) -> TimeWindow:
'''
Returns a TimeWindow with time=KW1 and:
Three nodes in cluster 1, all feature values = 1.
Two nodes in cluster 2, all feature values = 2.
'''
tw = TimeWindow("KW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("2", {"feature":2})
tw.add_node_to_cluster("2", {"feature":2})
return tw
#endregion setup methods
#region custom asserts
def assert_cluster(self, expected_values: Tuple[Any], cluster: Cluster):
"""
Checks if the cluster values equal the expected_values.
:param expected_values: A tuple (exp_size, exp_stddev, exp_scarcity, exp_import1, exp_import2)
"""
self.assertEqual(expected_values[0], cluster.size)
self.assertAlmostEqual(expected_values[1], cluster.std_dev)
self.assertAlmostEqual(expected_values[2], cluster.scarcity)
self.assertAlmostEqual(expected_values[3], cluster.importance1)
self.assertAlmostEqual(expected_values[4], cluster.importance2)
#endregion custom asserts
if __name__ == '__main__':
unittest.main()
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from entities import Layer, TimeWindow
from entities.layer import InternalCluster
from typing import Any, Tuple, List
from datetime import date, datetime
import json
from math import sqrt
import statistics as stat
class TestInternalCluster(unittest.TestCase):
def test__init__1d_features__all_values_set(self):
cluster_nodes = [{"feature":1}, {"feature":1}, {"feature":1}]
c = InternalCluster("123", cluster_nodes, feature_names=["feature"], global_cluster_center=(1.5,0))
self.assert_internal_cluster(c, '123', 3, .5)
def test__init__2d_features__all_values_set(self):
cluster_nodes = [{"feature1":1,'feature2':1}, {"feature1":1,'feature2':1}, {"feature1":1,'feature2':1}]
c = InternalCluster("123", cluster_nodes, feature_names=["feature1", 'feature2'], global_cluster_center=(1.5,1.5))
# distance: https://www.calculatorsoup.com/calculators/geometry-plane/distance-two-points.php
self.assert_internal_cluster(c, '123', 3, sqrt(.5))
def test__get_current_cluster_center__1d(self):
cluster_nodes = [{"feature":1}, {"feature":2}, {"feature":3}]
c = InternalCluster("123", cluster_nodes, feature_names=["feature"], global_cluster_center=(2, 0))
self.assert_internal_cluster(c, '123', 3, 0)
def test__get_current_cluster_center__1d_weighted_result(self):
cluster_nodes = [{"feature":1}, {"feature":1}, {"feature":3}]
c = InternalCluster("123", cluster_nodes, feature_names=["feature"], global_cluster_center=(5/3, 0))
self.assert_internal_cluster(c, '123', 3, 0)
def test__get_current_cluster_center__2d_weighted_result(self):
cluster_nodes = [{"feature1":1,"feature2":1},
{"feature1":1,"feature2":1},
{"feature1":2,"feature2":2},
{"feature1":3,"feature2":1}]
c = InternalCluster("123", cluster_nodes, feature_names=["feature1", 'feature2'], global_cluster_center=(1.75, 1.25))
self.assert_internal_cluster(c, '123', 4, 0)
def assert_internal_cluster(self, actual_cluster: InternalCluster, expected_id, expected_size, expected_distance):
self.assertEqual(expected_id, actual_cluster.cluster_id)
self.assertEqual(expected_size, actual_cluster.size)
self.assertAlmostEqual(expected_distance, actual_cluster.global_center_distance)
class TestLayer(unittest.TestCase):
def test__init__1d_single_cluster(self):
cluster_nodes = list(self._get_timewindow_single_cluster_1d_same_feature().clusters.values())[0]
c = InternalCluster("123", cluster_nodes, feature_names=["feature"], global_cluster_center=(1,0))
l = Layer('123', [c])
self.assert_layer(l, [1], 0, [0])
def test__create_from_time_window__1d_single_cluster(self):
tw = self._get_timewindow_single_cluster_1d_same_feature()
l = Layer.create_from_time_window(tw, feature_names=['feature'], global_cluster_centers={'1': (1,0)})
self.assert_layer(l, [1], 0, [0])
def test__create_from_time_window__2d_single_cluster(self):
tw = self._get_timewindow_single_cluster_2d_same_feature()
l = Layer.create_from_time_window(tw, feature_names=['feature1', 'feature2'], global_cluster_centers={'1': (1,1)})
self.assert_layer(l, [1], 0, [0])
def test__create_from_time_window__1d_two_clusters(self):
tw = TimeWindow("KW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature1":1})
tw.add_node_to_cluster("1", {"feature1":1})
tw.add_node_to_cluster("2", {"feature1":5})
tw.add_node_to_cluster("2", {"feature1":5})
tw.add_node_to_cluster("2", {"feature1":7})
tw.add_node_to_cluster("2", {"feature1":6})
l = Layer.create_from_time_window(tw, feature_names=['feature1'], global_cluster_centers={'1': (1.5,0), '2': (5,0)})
# entropy: https://planetcalc.com/2476/
# distance: https://www.calculatorsoup.com/calculators/geometry-plane/distance-two-points.php
self.assert_layer(l, [2/6, 4/6], 0.91829583, [.5, .75])
def test__create_from_time_window__2d_two_clusters(self):
tw = TimeWindow("KW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature1":1,"feature2":1})
tw.add_node_to_cluster("1", {"feature1":1,"feature2":2})
tw.add_node_to_cluster("1", {"feature1":1,"feature2":2})
tw.add_node_to_cluster("2", {"feature1":5,"feature2":5})
tw.add_node_to_cluster("2", {"feature1":7,"feature2":4})
l = Layer.create_from_time_window(tw, feature_names=['feature1', 'feature2'], global_cluster_centers={'1': (1,1), '2': (6.5,5)})
# entropy: https://planetcalc.com/2476/
# distance: https://www.calculatorsoup.com/calculators/geometry-plane/distance-two-points.php
self.assert_layer(l, [3/5, 2/5], 0.97095059, [2/3, sqrt(.5)])
#region setup methods
def _get_timewindow_single_cluster_1d_same_feature(self) -> TimeWindow:
'''Returns a TimeWindow with time=KW1 and three nodes in cluster 1, all feature values = 1.'''
tw = TimeWindow("KW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":1})
tw.add_node_to_cluster("1", {"feature":1})
return tw
def _get_timewindow_single_cluster_2d_same_feature(self) -> TimeWindow:
'''Returns a TimeWindow with time=KW1 and three nodes in cluster 1, all feature1 & feature2 values = 1.'''
tw = TimeWindow("KW1", "uc", "uct", "ln")
tw.add_node_to_cluster("1", {"feature1":1, "feature2":1})
tw.add_node_to_cluster("1", {"feature1":1, "feature2":1})
tw.add_node_to_cluster("1", {"feature1":1, "feature2":1})
return tw
#endregion setup methods
def assert_layer(self, actual_layer: Layer, relative_sizes: List[float], entropy: float, center_dist: List[float]):
self.assertEqual(len(actual_layer.relative_cluster_sizes), len(relative_sizes))
for i in range(len(relative_sizes)):
self.assertAlmostEqual(relative_sizes[i], actual_layer.relative_cluster_sizes[i])
self.assertAlmostEqual(entropy, actual_layer.entropy)
self.assertEqual(len(actual_layer.distances_from_global_centers), len(center_dist))
for i in range(len(center_dist)):
self.assertAlmostEqual(center_dist[i], actual_layer.distances_from_global_centers[i])
if __name__ == '__main__':
unittest.main()
LAYER_NAME = 'CallTypeLayer'
import sys
if len(sys.argv) > 1:
LAYER_NAME = sys.argv[1]
print(f"Working on {LAYER_NAME}")
##########
import json
from entities import Cluster
import collections
import numpy as np
from typing import Iterable
def get_evolution_label(old_size: int, new_size: int) -> int:
'''Returns the evolution label as int by mapping 0..4 to {continuing, shrinking, growing, dissolving, forming}.'''
if old_size == new_size:
return 0 # continuing
if old_size == 0 and new_size != 0:
return 4 # forming
if old_size != 0 and new_size == 0:
return 3 # dissolving
if old_size > new_size:
return 1 # shrinking
if old_size < new_size:
return 2 # growing
def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> (float, float):
return (np.sin(2*np.pi*time/max_time_value),
np.cos(2*np.pi*time/max_time_value))
def create_metrics_training_data(N: int = 3, layer_name: str = 'CallTypeLayer') -> Iterable:
"""
A single metrics training data point should look like this:
(cluster_size, cluster_std_dev, cluster_scarcity, cluster_import1, cluster_import2, time_info) ^ N, evolution_label
time_info ... the time as 2d cyclic feature, i.e. time_info := (time_f1, time_f2)
The first tuple represents metrics from the cluster in t_i-(N-1).
The Nth tuple represents metrics from the cluster in t_i.
The label is one of {continuing, shrinking, growing, dissolving, forming} \ {splitting, merging} and identifies the change for t_i+1.
:param N: number of cluster metric tuples
"""
path_in = f"input/metrics/{layer_name}.json"
with open(path_in, 'r') as file:
data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]
data.sort(key=lambda cl: (cl.cluster_id, cl.time_window_id))
# manually prepare deque with N metric_tuples + evolution label
tuples = []
prev_cluster_id = -1
for i, cur_cluster in enumerate(data[:-1]):
if cur_cluster.cluster_id != data[i+1].cluster_id:
# next cluster slice in list will be another cluster id -> restart deque and skip adding the current (last) cluster slice
tuples = []
continue
cur_metrics = (cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2, get_cyclic_time_feature(cur_cluster.get_time_info()))
# deque function: adding N+1st element will remove oldest one
if len(tuples) == N:
tuples.pop(0)
tuples.append(cur_metrics)
label = get_evolution_label(cur_cluster.size, data[i+1].size)
if len(tuples) == N:
yield list(tuples) + [label]
###########
def flatten_metrics_datapoint(datapoint: list) -> ('X', 'Y'):
'''
Flattens a single metrics data point in the form:
[(cluster_size, cluster_variance, cluster_density, cluster_import1, cluster_import2, (time_f1, time_f2))^N, evolution_label]
to:
(X: np.array, evolution_label)
'''
flat_list = []
for entry in datapoint[:-1]: # for all x
flat_list.extend(entry[:-1]) # add all number features except the time tuple
flat_list.extend(entry[-1]) # add time tuple
# flat_list.append(datapoint[-1]) # add y
return np.asarray(flat_list), datapoint[-1]
##########
def convert_metrics_data_for_training(data: Iterable) -> ('nparray with Xs', 'nparray with Ys'):
'''Flattens and splits metrics data to match ML conventions.'''
X = []
Y = []
for element in data:
x, y = flatten_metrics_datapoint(element)
X.append(x)
Y.append(y)
return (np.asarray(X), np.asarray(Y))
##########
import numpy as np
import pandas as pd
import collections
import statistics as stat
def balance_dataset(X: np.array, Y: np.array, imbalance_threshold=.3) -> ('X: np.array', 'Y: np.array'):
'''Balances an unbalanced dataset by ignoring elements from the majority label, so that majority-label data size = median of other cluster sizes.'''
y = Y.tolist()
counter = collections.Counter(y)
print(f"Label Occurrences: Total = {counter}")
# find key with max values
max_key = max(counter, key=lambda k: counter[k])
max_val = counter[max_key]
unbalanced_labels = all([v < max_val * (1-imbalance_threshold) for k, v in counter.items() if k != max_key])
if unbalanced_labels: # if all other labels are >=30% less frequent than max_key
median_rest = int(stat.median([v for k, v in counter.items() if k != max_key]))
print(f"Labels are unbalanced, keeping {median_rest} for label {max_key}")
# merge X and Y
data = np.append(X, Y.reshape(Y.shape[0], 1), 1)
df = pd.DataFrame(data, columns=['_']*21+['label'])
# take only median_rest for the max_key label
max_labeled_data = df.loc[df['label'] == max_key].sample(n=median_rest)
other_labeled_data = df.loc[df['label'] != max_key]
balanced_data = pd.concat([max_labeled_data, other_labeled_data])
balanced_data = balanced_data.sample(frac=1) # shuffle
X = balanced_data.loc[:, balanced_data.columns != 'label'].to_numpy()
Y = balanced_data.loc[:, balanced_data.columns == 'label'].to_numpy()
Y = Y.reshape(Y.shape[0],).astype(int)
return X, Y
def get_training_data(layer_name='CallTypeLayer', test_dataset_frac=.2) -> '(X_train, Y_train, X_test, Y_test)':
# load metrics data from disk
data: Iterable = create_metrics_training_data(layer_name=layer_name)
# convert to X and Y
X, Y = convert_metrics_data_for_training(data)
X, Y = balance_dataset(X, Y)
# split in training and test set
test_size = int(X.shape[0] * test_dataset_frac)
X_train = X[test_size:]
Y_train = Y[test_size:]
X_test = X[:test_size]
Y_test = Y[:test_size]
print(f"\nWorking with: {X_train.shape[0]} training points + {X_test.shape[0]} test points ({X_test.shape[0]/(X_train.shape[0]+X_test.shape[0])}).")
print(f"Label Occurrences: Total = {collections.Counter(Y_train.tolist() + Y_test.tolist())}, "\
f"Training = {collections.Counter(Y_train)}, Test = {collections.Counter(Y_test)}")
try:
print(f"Label Majority Class: Training = {stat.mode(Y_train)}, Test = {stat.mode(Y_test)}\n")
except stat.StatisticsError:
print(f"Label Majority Class: no unique mode; found 2 equally common values")
return X_train, Y_train, X_test, Y_test
X_train, Y_train, X_test, Y_test = get_training_data(LAYER_NAME)
###########
# train
from sklearn import svm
svc = svm.SVC(kernel='linear')
svc.fit(X_train, Y_train)
# verify
import sklearn
pred_Y = svc.predict(X_test)
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
# export
import pickle
import os
if not os.path.exists('output'):
os.makedirs('output')
with open(f'output/{LAYER_NAME}.model', 'wb') as file:
b = pickle.dump(svc, file)
#! /bin/bash
source venv/bin/activate
for layer in CallTypeLayer DayTypeLayer EndLocationLayer OriginCallLayer OriginStandLayer StartLocationLayer TaxiIdLayer
do
python3 train.py $layer
done
\ No newline at end of file
MAX_DEPTHS: int = [5, 10, 15]
LAYER_NAME: str = 'CallTypeLayer'
REFERENCE_LAYER_NAME: str = 'DayTypeLayer'
import sys
if len(sys.argv) > 1:
LAYER_NAME = sys.argv[1]
REFERENCE_LAYER_NAME = sys.argv[2]
print(f"Working with params:")
from icecream import ic
ic(LAYER_NAME)
ic(REFERENCE_LAYER_NAME)
#######################
# TODO remove dup code
import numpy as np
def get_evolution_label(old_size: int, new_size: int) -> int:
'''Returns the evolution label as int by mapping 0..4 to {continuing, shrinking, growing, dissolving, forming}.'''
if old_size == new_size:
return 0 # continuing
if old_size == 0 and new_size != 0:
return 4 # forming
if old_size != 0 and new_size == 0:
return 3 # dissolving
if old_size > new_size:
return 1 # shrinking
if old_size < new_size:
return 2 # growing
def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> (float, float):
return (np.sin(2*np.pi*time/max_time_value),
np.cos(2*np.pi*time/max_time_value))
def get_cyclic_time_feature_from_time_window(time: str) -> (float, float):
return get_cyclic_time_feature(int(time.replace('(', '').replace(')', '').split(',')[1]))
#########
from typing import Iterable, List, Dict, Any
import json
from entities import Layer, Cluster
def create_metrics_training_data(N: int = 2, layer_name: str = 'CallTypeLayer', reference_layer: str = 'CallTypeLayer') -> Iterable:
"""
Loads the metrics training data for an individual layer from disk.
A single metrics training data point should look like this:
[((relative_cluster_size) ^ M, entropy, (distance_from_global_center) ^ M, (time1, time2)) ^ N, cluster_number, evolution_label]
The first tuple represents metrics from the reference layer in t_i-(N-1).
The Nth tuple represents metrics from the reference layer in t_i.
The reference_layer has M clusters in total, this might differ from the number of clusters in layer_name.
The cluster number identifies the cluster for which the evolution_label holds.
The label is one of {continuing, shrinking, growing, dissolving, forming} \ {splitting, merging} and identifies the change for a cluster in the layer layer_name for t_i.
# TODO what exactly should the classifier predict?
# all cluster changes, this would mean that cluster information has to be provided
# TODO N is not implemented and fixed to 2
"""
with open(f'input/metrics/{layer_name}.json') as file:
cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
cluster_ids = {c.cluster_id for c in cluster_metrics}
cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}
with open(f'input/layer_metrics/{reference_layer}.json') as file:
layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}
# load the time keys chronologically
ordered_time_keys = list(layer_metrics.keys())
ordered_time_keys.sort(key=lambda x: [int(v) for v in x.replace('(', '').replace(')', '').split(',')])
# go through all time windows once...
prev_time_key = ordered_time_keys[0]
for current_time_key in ordered_time_keys[1:]:
# ...and load the current and previous layer metrics in the reference_layer
current_layer_metric = layer_metrics[current_time_key]
prev_layer_metric = layer_metrics[prev_time_key]
current_layer_metric_tuple = (current_layer_metric.relative_cluster_sizes, current_layer_metric.entropy, current_layer_metric.distances_from_global_centers, get_cyclic_time_feature_from_time_window(current_layer_metric.time_window_id))
prev_layer_metric_tuple = (prev_layer_metric.relative_cluster_sizes, prev_layer_metric.entropy, prev_layer_metric.distances_from_global_centers, get_cyclic_time_feature_from_time_window(prev_layer_metric.time_window_id))
# ...then load the current and previous cluster metrics for all clusters in the layer_name
for cluster_id in cluster_ids:
current_cluster_metric = cluster_metrics[(current_time_key, cluster_id)]
prev_cluster_metric = cluster_metrics[(prev_time_key, cluster_id)]
evolution_label = get_evolution_label(prev_cluster_metric.size, current_cluster_metric.size)
# yield each combination of reference layer metrics to clusters
yield [prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id), evolution_label]
prev_time_key = current_time_key
########
def flatten_metrics_datapoint(datapoint: list) -> ('X', 'Y'):
'''
Flattens a single layer metrics data point in the form:
[((relative_cluster_size) ^ M, entropy, (distance_from_global_center) ^ M, (time1, time2)) ^ N, cluster_number, evolution_label]
to:
(X: np.array, evolution_label)
'''
flat_list = []
for layer_metric_tuple in datapoint[:-2]:
flat_list.extend(layer_metric_tuple[0]) # sizes
flat_list.append(layer_metric_tuple[1]) # entropy
flat_list.extend(layer_metric_tuple[2]) # distances
flat_list.extend(layer_metric_tuple[3]) # time1/2
flat_list.append(datapoint[-2]) # cluster num
return np.asarray(flat_list), datapoint[-1]
#########
# TODO remove dup code
def convert_metrics_data_for_training(data: Iterable) -> ('nparray with Xs', 'nparray with Ys'):
'''Flattens and splits metrics data to match ML conventions.'''
X = []
Y = []
for element in data:
x, y = flatten_metrics_datapoint(element)
X.append(x)
Y.append(y)
return (np.asarray(X), np.asarray(Y))
###########
# TODO remove dup code
import numpy as np
import pandas as pd
import collections
import statistics as stat
def balance_dataset(X: np.array, Y: np.array, imbalance_threshold=.3) -> ('X: np.array', 'Y: np.array'):
'''Balances an unbalanced dataset by ignoring elements from the majority label, so that majority-label data size = median of other cluster sizes.'''
y = Y.tolist()
counter = collections.Counter(y)
print(f"Label Occurrences: Total = {counter}")
# find key with max values
max_key = max(counter, key=lambda k: counter[k])
max_val = counter[max_key]
unbalanced_labels = all([v < max_val * (1-imbalance_threshold) for k, v in counter.items() if k != max_key])
if unbalanced_labels: # if all other labels are >=30% less frequent than max_key
median_rest = int(stat.median([v for k, v in counter.items() if k != max_key]))
print(f"Labels are unbalanced, keeping {median_rest} for label {max_key}")
# merge X and Y
data = np.append(X, Y.reshape(Y.shape[0], 1), 1)
df = pd.DataFrame(data, columns=['_']*X.shape[1]+['label'])
# take only median_rest for the max_key label
max_labeled_data = df.loc[df['label'] == max_key].sample(n=median_rest)
other_labeled_data = df.loc[df['label'] != max_key]
balanced_data = pd.concat([max_labeled_data, other_labeled_data])
balanced_data = balanced_data.sample(frac=1) # shuffle
X = balanced_data.loc[:, balanced_data.columns != 'label'].to_numpy()
Y = balanced_data.loc[:, balanced_data.columns == 'label'].to_numpy()
Y = Y.reshape(Y.shape[0],).astype(int)
return X, Y
def get_training_data(layer_name='CallTypeLayer', reference_layer_name='CallTypeLayer', test_dataset_frac=.2) -> '(X_train, Y_train, X_test, Y_test)':
# load metrics data from disk
data: Iterable = create_metrics_training_data(layer_name=layer_name, reference_layer=reference_layer_name)
# convert to X and Y
X, Y = convert_metrics_data_for_training(data)
X, Y = balance_dataset(X, Y)
# split in training and test set
test_size = int(X.shape[0] * test_dataset_frac)
X_train = X[test_size:]
Y_train = Y[test_size:]
X_test = X[:test_size]
Y_test = Y[:test_size]
print(f"\nWorking with: {X_train.shape[0]} training points + {X_test.shape[0]} test points ({X_test.shape[0]/(X_train.shape[0]+X_test.shape[0])}).")
print(f"Label Occurrences: Total = {collections.Counter(Y_train.tolist() + Y_test.tolist())}, "\
f"Training = {collections.Counter(Y_train)}, Test = {collections.Counter(Y_test)}")
try:
print(f"Label Majority Class: Training = {stat.mode(Y_train)}, Test = {stat.mode(Y_test)}\n")
except stat.StatisticsError:
print(f"Label Majority Class: no unique mode; found 2 equally common values")
return X_train, Y_train, X_test, Y_test
#########
X_train, Y_train, X_test, Y_test = get_training_data(layer_name=LAYER_NAME, reference_layer_name=REFERENCE_LAYER_NAME)
#########
for depth in MAX_DEPTHS:
from sklearn.ensemble import RandomForestClassifier
classifier = RandomForestClassifier(max_depth=depth)
classifier.fit(X_train, Y_train)
# export
import pickle
with open(f'output/layer_metrics/{depth}/{LAYER_NAME}_{REFERENCE_LAYER_NAME}.model', 'wb') as file:
b = pickle.dump(classifier, file)
# verify
import sklearn
pred_Y = classifier.predict(X_test)
print(f"### Layer={LAYER_NAME}, Ref Layer={REFERENCE_LAYER_NAME}, Depth={depth} ###")
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
###########
import sklearn
import statistics as stat
import random
def show_majority_class_prediction():
print("### Majority Class Prediction: ###")
majority_class = stat.mode(Y_train)
try:
print(f"Training majority class = {stat.mode(Y_train)}, Test majority class = {stat.mode(Y_test)}")
except stat.StatisticsError:
print(f"Label Majority Class: no unique mode; found 2 equally common values")
pred_Y = len(Y_test) * [majority_class]
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
def show_random_prediction():
print("### Random Class Prediction: ###")
classes = list(set(Y_train))
print(f"Classes: {classes}")
pred_Y = random.choices(classes, k=len(Y_test))
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
show_majority_class_prediction()
show_random_prediction()
\ No newline at end of file
#! /bin/bash
source venv/bin/activate
# create result folders
mkdir output/layer_metrics/5
mkdir output/layer_metrics/10
mkdir output/layer_metrics/15
# train
python3 train_layer.py CallTypeLayer DayTypeLayer
python3 train_layer.py OriginCallLayer CallTypeLayer
python3 train_layer.py OriginStandLayer CallTypeLayer
python3 train_layer.py TaxiIdLayer OriginCallLayer
python3 train_layer.py StartLocationLayer OriginCallLayer
python3 train_layer.py EndLocationLayer OriginCallLayer
python3 train_layer.py TaxiIdLayer OriginStandLayer
python3 train_layer.py StartLocationLayer OriginStandLayer
python3 train_layer.py EndLocationLayer OriginStandLayer
from icecream import ic
#######################
# TODO remove dup code
import numpy as np
def get_evolution_label(old_size: int, new_size: int) -> int:
'''Returns the evolution label as int by mapping 0..4 to {continuing, shrinking, growing, dissolving, forming}.'''
if old_size == new_size:
return 0 # continuing
if old_size == 0 and new_size != 0:
return 4 # forming
if old_size != 0 and new_size == 0:
return 3 # dissolving
if old_size > new_size:
return 1 # shrinking
if old_size < new_size:
return 2 # growing
def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> (float, float):
return (np.sin(2*np.pi*time/max_time_value),
np.cos(2*np.pi*time/max_time_value))
def get_cyclic_time_feature_from_time_window(time: str) -> (float, float):
return get_cyclic_time_feature(int(time.replace('(', '').replace(')', '').split(',')[1]))
#########
from typing import Iterable, List, Dict, Any
import json
from entities import Layer, Cluster
def create_metrics_training_data(N: int = 2, layer_name: str = 'CallTypeLayer', reference_layer: str = 'CallTypeLayer') -> Iterable:
"""
Loads the metrics training data for an individual layer from disk.
A single metrics training data point should look like this:
[((relative_cluster_size) ^ M, entropy, (distance_from_global_center) ^ M, (time1, time2)) ^ N, cluster_number, evolution_label]
The first tuple represents metrics from the reference layer in t_i-(N-1).
The Nth tuple represents metrics from the reference layer in t_i.
The reference_layer has M clusters in total, this might differ from the number of clusters in layer_name.
The cluster number identifies the cluster for which the evolution_label holds.
The label is one of {continuing, shrinking, growing, dissolving, forming} \ {splitting, merging} and identifies the change for a cluster in the layer layer_name for t_i.
# TODO what exactly should the classifier predict?
# all cluster changes, this would mean that cluster information has to be provided
# TODO N is not implemented and fixed to 2
"""
with open(f'input/metrics/{layer_name}.json') as file:
cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
cluster_ids = {c.cluster_id for c in cluster_metrics}
cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}
with open(f'input/layer_metrics/{reference_layer}.json') as file:
layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}
# load the time keys chronologically
ordered_time_keys = list(layer_metrics.keys())
ordered_time_keys.sort(key=lambda x: [int(v) for v in x.replace('(', '').replace(')', '').split(',')])
# go through all time windows once...
prev_time_key = ordered_time_keys[0]
for current_time_key in ordered_time_keys[1:]:
# ...and load the current and previous layer metrics in the reference_layer
current_layer_metric = layer_metrics[current_time_key]
prev_layer_metric = layer_metrics[prev_time_key]
current_layer_metric_tuple = (current_layer_metric.relative_cluster_sizes, current_layer_metric.entropy, current_layer_metric.distances_from_global_centers, get_cyclic_time_feature_from_time_window(current_layer_metric.time_window_id))
prev_layer_metric_tuple = (prev_layer_metric.relative_cluster_sizes, prev_layer_metric.entropy, prev_layer_metric.distances_from_global_centers, get_cyclic_time_feature_from_time_window(prev_layer_metric.time_window_id))
# ...then load the current and previous cluster metrics for all clusters in the layer_name
for cluster_id in cluster_ids:
current_cluster_metric = cluster_metrics[(current_time_key, cluster_id)]
prev_cluster_metric = cluster_metrics[(prev_time_key, cluster_id)]
evolution_label = get_evolution_label(prev_cluster_metric.size, current_cluster_metric.size)
# yield each combination of reference layer metrics to clusters
yield [prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id), evolution_label]
prev_time_key = current_time_key
########
def flatten_metrics_datapoint(datapoint: list) -> ('X', 'Y'):
'''
Flattens a single layer metrics data point in the form:
[((relative_cluster_size) ^ M, entropy, (distance_from_global_center) ^ M, (time1, time2)) ^ N, cluster_number, evolution_label]
to:
(X: np.array, evolution_label)
'''
flat_list = []
for layer_metric_tuple in datapoint[:-2]:
flat_list.extend(layer_metric_tuple[0]) # sizes
flat_list.append(layer_metric_tuple[1]) # entropy
flat_list.extend(layer_metric_tuple[2]) # distances
flat_list.extend(layer_metric_tuple[3]) # time1/2
flat_list.append(datapoint[-2]) # cluster num
return np.asarray(flat_list), datapoint[-1]
#########
# TODO remove dup code
def convert_metrics_data_for_training(data: Iterable) -> ('nparray with Xs', 'nparray with Ys'):
'''Flattens and splits metrics data to match ML conventions.'''
X = []
Y = []
for element in data:
x, y = flatten_metrics_datapoint(element)
X.append(x)
Y.append(y)
return (np.asarray(X), np.asarray(Y))
###########
# TODO remove dup code
import numpy as np
import pandas as pd
import collections
import statistics as stat
def balance_dataset(X: np.array, Y: np.array, imbalance_threshold=.3) -> ('X: np.array', 'Y: np.array'):
'''Balances an unbalanced dataset by ignoring elements from the majority label, so that majority-label data size = median of other cluster sizes.'''
y = Y.tolist()
counter = collections.Counter(y)
print(f"Label Occurrences: Total = {counter}")
# find key with max values
max_key = max(counter, key=lambda k: counter[k])
max_val = counter[max_key]
unbalanced_labels = all([v < max_val * (1-imbalance_threshold) for k, v in counter.items() if k != max_key])
if unbalanced_labels: # if all other labels are >=30% less frequent than max_key
median_rest = int(stat.median([v for k, v in counter.items() if k != max_key]))
print(f"Labels are unbalanced, keeping {median_rest} for label {max_key}")
# merge X and Y
data = np.append(X, Y.reshape(Y.shape[0], 1), 1)
df = pd.DataFrame(data, columns=['_']*X.shape[1]+['label'])
# take only median_rest for the max_key label
max_labeled_data = df.loc[df['label'] == max_key].sample(n=median_rest)
other_labeled_data = df.loc[df['label'] != max_key]
balanced_data = pd.concat([max_labeled_data, other_labeled_data])
balanced_data = balanced_data.sample(frac=1) # shuffle
X = balanced_data.loc[:, balanced_data.columns != 'label'].to_numpy()
Y = balanced_data.loc[:, balanced_data.columns == 'label'].to_numpy()
Y = Y.reshape(Y.shape[0],).astype(int)
return X, Y
def get_training_data(layer_name='CallTypeLayer', reference_layer_name='CallTypeLayer', test_dataset_frac=.2) -> '(X_train, Y_train, X_test, Y_test)':
# load metrics data from disk
data: Iterable = create_metrics_training_data(layer_name=layer_name, reference_layer=reference_layer_name)
# convert to X and Y
X, Y = convert_metrics_data_for_training(data)
X, Y = balance_dataset(X, Y)
# split in training and test set
test_size = int(X.shape[0] * test_dataset_frac)
X_train = X[test_size:]
Y_train = Y[test_size:]
X_test = X[:test_size]
Y_test = Y[:test_size]
print(f"\nWorking with: {X_train.shape[0]} training points + {X_test.shape[0]} test points ({X_test.shape[0]/(X_train.shape[0]+X_test.shape[0])}).")
print(f"Label Occurrences: Total = {collections.Counter(Y_train.tolist() + Y_test.tolist())}, "\
f"Training = {collections.Counter(Y_train)}, Test = {collections.Counter(Y_test)}")
try:
print(f"Label Majority Class: Training = {stat.mode(Y_train)}, Test = {stat.mode(Y_test)}\n")
except stat.StatisticsError:
print(f"Label Majority Class: no unique mode; found 2 equally common values")
return X_train, Y_train, X_test, Y_test
#########
# import for verification
LAYER_NAME = 'StartLocationLayer'
REF_LAYER_NAME = 'OriginCallLayer'
X_train, Y_train, X_test, Y_test = get_training_data(layer_name=LAYER_NAME, reference_layer_name=REF_LAYER_NAME)
ic("loaded")
for depth in [5,10,15]:
import pickle
with open(f'output/layer_metrics/{depth}/{LAYER_NAME}_{REF_LAYER_NAME}.model', 'rb') as file:
classifier = pickle.load(file)
# verify
import sklearn
pred_Y = classifier.predict(X_test)
print(f"### Layer={LAYER_NAME}, Ref Layer={REF_LAYER_NAME}, Depth={depth} ###")
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
###########
import sklearn
import statistics as stat
import random
def show_majority_class_prediction():
print("### Majority Class Prediction: ###")
majority_class = stat.mode(Y_train)
try:
print(f"Training majority class = {stat.mode(Y_train)}, Test majority class = {stat.mode(Y_test)}")
except stat.StatisticsError:
print(f"Label Majority Class: no unique mode; found 2 equally common values")
pred_Y = len(Y_test) * [majority_class]
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
def show_random_prediction():
print("### Random Class Prediction: ###")
classes = list(set(Y_train))
print(f"Classes: {classes}")
pred_Y = random.choices(classes, k=len(Y_test))
print(sklearn.metrics.classification_report(y_true=Y_test, y_pred=pred_Y))
show_majority_class_prediction()
show_random_prediction()
\ No newline at end of file
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
'200':
description: "Successful echo of request data"
\ No newline at end of file
...@@ -11,20 +11,9 @@ produces: ...@@ -11,20 +11,9 @@ produces:
basePath: "/api" basePath: "/api"
# Import security definitions from global security definition
securityDefinitions:
$ref: '../security/security.yml#securityDefinitions'
paths: paths:
/debug: $ref: 'routes.yml#paths'
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
swagger: "2.0"
info:
title: Proactive Community Detection microservice
description: This is the documentation for the proactive community detection microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
# Import security definitions from global security definition
securityDefinitions:
$ref: '../../../../modules/security/security_local.yml#securityDefinitions'
paths:
$ref: 'routes.yml#paths'
# add modules folder to interpreter path
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
### init logging ###
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
#############################
import connexion import connexion
from security import swagger_util
from pathlib import Path
import env_info
from flask import request
from flask import redirect
from flask_cors import CORS
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml') CORS(app.app)
@app.route('/', methods=['GET']) @app.route('/', methods=['GET'])
def api_root(): def api_root():
return 'Endpoint of proactive-community-detection-microservice!' return redirect('/api/ui')
if not env_info.is_running_locally():
swagger_path = "configs/swagger.yml"
# SSL configuration
certificate_path = env_info.get_resources_path()
context = (os.path.normpath(f'{certificate_path}/articonf1.crt'), os.path.normpath(f'{certificate_path}/articonf1.key')) # certificate and key files
else:
print("Running locally...")
swagger_path = "configs/swagger_local.yml"
context = None
app.add_api(swagger_util.get_bundled_specs(Path(swagger_path)),
resolver = connexion.RestyResolver("cms_rest_api"))
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True) app.run(host='0.0.0.0', port=5000, ssl_context=context)
from security.token_manager import TokenManager
import network_constants
from db.entities.layer import Layer
from db.repository import Repository
from typing import List, Dict
import requests
import json
def _fetch_use_cases() -> List[str]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch use-cases from business-logic microservice, statuscode: {response.status_code}!")
data = json.loads(response.text)
return [row["name"] for row in data]
def _fetch_tables(use_case: str) -> List[str]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch tables for {use_case} from business-logic microservice, statuscode: {response.status_code}!")
data = json.loads(response.text)
return [row["name"] for row in data]
def _fetch_layers(use_case: str, table: str) -> List[Layer]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables/{table}/layers'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch layers for {use_case}//{table} from business-logic microservice, statuscode: {response.status_code}!")
data = json.loads(response.text)
return [Layer.from_business_logic_dict(row) for row in data]
def _fetch_nodes(use_case: str, table: str, layer_name: str) -> List[Dict]:
jwt = TokenManager.getInstance().getToken()
url = f'https://{network_constants.SEMANTIC_LINKING_HOSTNAME}:{network_constants.SEMANTIC_LINKING_REST_PORT}/api/use-cases/{use_case}/tables/{table}/layers/{layer_name}/nodes'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = {"Authorization": f"Bearer {jwt}"}
)
if response.status_code != 200:
raise ConnectionError(f"Could not fetch nodes for {use_case}//{table}//{layer_name} from semantic-linking microservice, statuscode: {response.status_code}!")
return response.json()
def fetch_nodes_from_semantic_linking(selected_use_cases: List[str] = None, selected_use_case_tables: List[str] = None):
'''Empties the db and inserts layers and nodes from BusinessLogic and SemanticLinking'''
repository = Repository()
# please dont delete all layers/ nodes anymore @10.11.2020
# repository.delete_all_layers()
# repository.delete_all_nodes()
use_cases = _fetch_use_cases()
for use_case in use_cases:
if selected_use_cases is not None and use_case not in selected_use_cases:
continue
print(f"Fetching for use-case {use_case}")
tables = _fetch_tables(use_case)
for table in tables:
if selected_use_case_tables is not None and table not in selected_use_case_tables:
continue
layers = _fetch_layers(use_case, table)
for layer in layers:
try:
print(f"Fetching nodes for layer {use_case}//{table}//{layer.layer_name}.")
# check if layer already exists in DB, add it if not
reference_layer = repository.get_layer_by_name(use_case, table, layer.layer_name)
if reference_layer == None:
repository.add_layer(layer)
else:
raise Exception(f"Layer should be unique, but was not: {reference_layer}")
nodes = _fetch_nodes(use_case, table, layer.layer_name)
for node in nodes:
node['use_case_table'] = node['table']
del node['table']
for node in nodes:
repository.add_layer_node(node)
except ConnectionError as e:
print(str(e))
continue
\ No newline at end of file
attrs==21.2.0
certifi==2021.5.30
chardet==4.0.0
charset-normalizer==2.0.3
click==7.1.2
clickclick==20.10.2
colorama==0.4.4
connexion==2.9.0
Flask==1.1.4
Flask-Cors==3.0.10
idna==3.2
importlib-metadata==4.6.1
inflection==0.5.1
isodate==0.6.0
itsdangerous==1.1.0
Jinja2==2.11.3
jsonschema==3.2.0
MarkupSafe==2.0.1
openapi-schema-validator==0.1.5
openapi-spec-validator==0.3.1
prance==0.21.2
pyrsistent==0.18.0
PyYAML==5.4.1
requests==2.26.0
semver==2.13.0
six==1.16.0
swagger-ui-bundle==0.0.8
typing-extensions==3.10.0.0
urllib3==1.26.6
Werkzeug==1.0.1
zipp==3.5.0
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment