Commit 678477f0 authored by Alexander Lercher

Preparing ml input for cross-context

parent 3edcd509
@@ -117,8 +117,8 @@ class Repository(MongoRepositoryBase):
     def add_layer_pair(self, layer_pair: LayerPairDao):
         super().insert_entry(self._layer_pair_collection, layer_pair.__dict__)

-    def get_layer_pairs(self, use_case: str, use_case_table: str) -> List[LayerPairDao]:
-        entries = super().get_entries(self._layer_pair_collection)
+    def get_layer_pairs(self, use_case: str) -> List[LayerPairDao]:
+        entries = super().get_entries(self._layer_pair_collection, selection={'use_case': use_case})
         return [LayerPairDao.create_from_dict(e) for e in entries]

     #endregion
...
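For reference, a hedged sketch of how the narrowed lookup is consumed downstream (the same call appears in layer_metrics_calc.run later in this commit; the use case name is the one used elsewhere in the diff):

from db.repository import Repository

repo = Repository()
# get_layer_pairs now filters by use case on the database side;
# the former use_case_table argument is gone.
for pair in repo.get_layer_pairs('community-prediction-youtube-n'):
    print(pair.layer, pair.reference_layer)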
@@ -7,7 +7,7 @@ import json
 import os
 from entities import TimeWindow, Cluster

-def store_metrics_for_clusters(use_case:str, layer_name: str, feature_names: List[str]):
+def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: List[str]):
     '''
     :param layer_name: Name of the layer for which multiple time windows exist
     :param feature_names: Features of the layer
@@ -119,7 +119,7 @@ import collections
 import statistics as stat

 def balance_dataset(df: DataFrame) -> DataFrame:
-    # nothing happening here, balance only on real training not during prep
+    # nothing happening here, balance only on real training, not during prep
     return df

 def store_training_data(use_case: str, layer_name: str):
@@ -139,48 +139,25 @@ def store_training_data(use_case: str, layer_name: str):
     df.to_csv(f'data/{use_case}/ml_input/single_context/{layer_name}.csv')

 #######################
 from db.repository import Repository
 from db.dao import LayerDao
-repo = Repository()
 from pathlib import Path
+repo = Repository()

-def store_clusters_as_files(use_case):
-    path_ = f'data/{use_case}/raw/clusters/'
-    Path(path_).mkdir(parents=True, exist_ok=True)
-
-    layers = repo.get_layers_for_use_case(use_case)
-    for l in layers:
-        clusters = repo.get_clusters_for_layer(use_case, l.use_case_table, l.layer_name)
-        with open(os.path.join(path_, f'{l.layer_name}.json'), 'w') as file_:
-            file_.write(json.dumps([c.to_serializable_dict() for c in clusters]))
-
-def store_time_slices_as_files(use_case):
-    path_ = f'data/{use_case}/raw/timeslices/'
-
-    layers = repo.get_layers_for_use_case(use_case)
-    for l in layers:
-        Path(os.path.join(path_, l.layer_name)).mkdir(parents=True, exist_ok=True)
-        time_slices = repo.get_time_slices_for_layer(use_case, l.use_case_table, l.layer_name)
-        for ts in time_slices:
-            with open(os.path.join(path_, l.layer_name, f'{ts.time}.json'), 'w') as file_:
-                file_.write(json.dumps(ts.to_serializable_dict()))
-
-def start(use_case=None):
-    '''
-    Requires raw jsons for clusters and time slices.
-    Working directory: data/
-    '''
+def run(use_case=None):
     if use_case is not None:
         use_cases = [use_case]
     else:
         use_cases = repo.get_use_cases()

     for use_case in use_cases:
-        print(f"Executing for use case {use_case}")
-
-        # store_clusters_as_files(use_case)
-        store_time_slices_as_files(use_case)
+        print(f"Executing cluster metrics calc for use case {use_case}")

         layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
@@ -189,6 +166,6 @@ def start(use_case=None):
         store_metrics_for_clusters(use_case, layer[0], layer[1])

         ###################
         for name, _ in layers:
-            print(f"Creating training data for {name}")
+            print(f"Storing training data for {name}")
             store_training_data(use_case, layer_name=name)
\ No newline at end of file
from processing.data_prep.metrics_base import calculate_center, get_cyclic_time_feature, get_evolution_label, convert_metrics_data_to_dataframe
from pathlib import Path

#################
from typing import List, Dict, Tuple
import statistics as stat
import json
import os

from entities import TimeWindow, Layer
from processing import ClusterMetricsCalculatorFactory

def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List[str]):
    print(f"Working on {layer_name} layer metrics")

    # load global cluster centers
    path_in = f'data/{use_case}/raw/clusters/{layer_name}.json'
    with open(path_in, 'r') as file:
        clusters = json.loads(file.read())
    cluster_centers: Dict[str, Tuple[float]] = {
        str(cluster['cluster_label']): calculate_center(cluster, feature_names)
        for cluster in clusters
        if cluster['label'] != 'noise'
    }

    # load time windows
    all_layers: List[Layer] = []
    path_in = f'data/{use_case}/raw/timeslices/{layer_name}'
    for root, _, files in os.walk(path_in):
        for f in files:
            with open(os.path.join(root, f), 'r') as file:
                json_time_slice = json.loads(file.read())
                time_window = TimeWindow.create_from_serializable_dict(json_time_slice)
                layer = Layer.create_from_time_window(time_window, feature_names, cluster_centers)
                all_layers.append(layer)

    # store the layer metrics
    Path(f'data/{use_case}/layer_metrics/').mkdir(parents=True, exist_ok=True)
    path_out = f'data/{use_case}/layer_metrics/{layer_name}.json'
    with open(path_out, 'w') as file:
        file.write(json.dumps([l.__dict__ for l in all_layers]))
#########################
from typing import List

def get_columns(N) -> List[str]:
    '''Returns the columns for the data, depending on N (number of time windows) and independent of M (number of clusters in L_R).'''
    cols = ['n_nodes', 'n_clusters', 'entropy']
    for v in ['sizes', 'relative_sizes', 'center_dist']:
        cols += [f'{v}_min', f'{v}_max', f'{v}_avg', f'{v}_sum']
    # cols.extend(['relative_cluster_sizes']*M)
    # cols.extend(['cluster_centers']*M)
    # cols.extend(['distance_from_global_centers']*M)
    cols.extend(['time_f1', 'time_f2'])

    cols = cols * N
    return cols + ['cluster_id'] + ['evolution_label']
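# Illustrative sanity check of the resulting layout for the fixed N=2 used below
# (not part of the pipeline itself):
_example_cols = get_columns(N=2)
assert len(_example_cols) == 2 * 17 + 2   # 17 metrics per time window, plus cluster_id and evolution_label
assert _example_cols[:3] == ['n_nodes', 'n_clusters', 'entropy']
assert _example_cols[-2:] == ['cluster_id', 'evolution_label']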
######################
def get_cyclic_time_feature_from_time_window(time: str) -> Tuple[float, float]:
    return get_cyclic_time_feature(int(time.replace('(', '').replace(')', '').split(',')[1]))
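# Hedged example: time-window ids are assumed to look like '(2019, 42)', i.e.
# (year, window index); only the second component enters the cyclic encoding,
# yielding the two values stored as time_f1/time_f2.
_t1, _t2 = get_cyclic_time_feature_from_time_window('(2019, 42)')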
#######################
from typing import Iterable, List, Dict, Any
import json
from entities import Layer, Cluster

def get_layer_metrics(layer: Layer) -> Iterable:
    res = [layer.n_nodes, layer.n_clusters, layer.entropy]
    res += [layer.cluster_size_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
    res += [layer.cluster_relative_size_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
    res += [layer.cluster_center_distance_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
    res.append(get_cyclic_time_feature_from_time_window(layer.time_window_id))
    return res
def create_layer_metrics_training_data(use_case: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable:
    """
    Loads the metrics training data for an individual layer from disk.
    A single metrics training data point should look like this:

    [(n_nodes, n_clusters, entropy,
      (relative_cluster_size)^M, (cluster_centers)^M, (distance_from_global_centers)^M,
      (time1, time2))^N,
     cluster_number, evolution_label]

    The first tuple represents metrics from the reference layer in t_i-(N-1).
    The Nth tuple represents metrics from the reference layer in t_i.
    The reference layer has M clusters in total, which may differ from the number of clusters in layer_name.
    The cluster number identifies the cluster for which the evolution_label holds.
    The label is one of {continuing, shrinking, growing, dissolving, forming} \ {splitting, merging} and identifies the change for a cluster in the layer layer_name for t_i.
    """
    if N != 2:
        raise NotImplementedError("N is not implemented and fixed to 2!")

    with open(f'data/{use_case}/cluster_metrics/{layer_name}.json') as file:
        cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
        cluster_ids = {c.cluster_id for c in cluster_metrics}
        cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}

    with open(f'data/{use_case}/layer_metrics/{reference_layer}.json') as file:
        layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
        layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}

    # load the time keys chronologically
    ordered_time_keys = list(layer_metrics.keys())
    ordered_time_keys.sort(key=lambda x: [int(v) for v in x.replace('(', '').replace(')', '').split(',')])

    # go through all time windows once...
    prev_time_key = ordered_time_keys[0]
    for current_time_key in ordered_time_keys[1:]:
        # ...and load the current and previous layer metrics in the reference_layer
        current_layer_metric = layer_metrics[current_time_key]
        prev_layer_metric = layer_metrics[prev_time_key]

        current_layer_metric_tuple = get_layer_metrics(current_layer_metric)
        prev_layer_metric_tuple = get_layer_metrics(prev_layer_metric)

        # ...then load the current and previous cluster metrics for all clusters in the layer_name
        for cluster_id in cluster_ids:
            current_cluster_metric = cluster_metrics[(current_time_key, cluster_id)]
            prev_cluster_metric = cluster_metrics[(prev_time_key, cluster_id)]
            evolution_label = get_evolution_label(prev_cluster_metric.size, current_cluster_metric.size)

            # yield each combination of reference layer metrics to clusters
            yield [prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id), evolution_label]

        prev_time_key = current_time_key
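# Hedged usage sketch (layer names are taken from the YouTube layer pairs defined
# elsewhere in this commit; requires the metrics JSONs under data/ to exist):
def _example_iterate_training_data():
    for prev_ref, cur_ref, cluster_id, label in create_layer_metrics_training_data(
            use_case='community-prediction-youtube-n',
            layer_name='LikesLayer',
            reference_layer='ViewsLayer'):
        # each of the two reference-layer tuples holds 15 scalars plus a
        # (time_f1, time_f2) tuple, cf. get_layer_metrics(); label is the
        # evolution class of cluster_id in the current time window
        print(cluster_id, label)
        break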
#####################
import numpy as np

def flatten_layer_metrics_datapoint(datapoint: list) -> Tuple['X', np.array]:
    '''
    Flattens a single layer metrics data point in the form:
    [(n_nodes, n_clusters, entropy,
      (relative_cluster_size)^M, (distance_from_global_centers)^M,
      (time1, time2))^N,
     cluster_number, evolution_label]
    to:
    (X, y: np.array)
    '''
    flat_list = []
    for layer_metric_tuple in datapoint[:-2]:  # for all x
        flat_list.extend(layer_metric_tuple[0:-1])  # everything before time
        flat_list.extend(layer_metric_tuple[-1])  # time1/2
    flat_list.append(datapoint[-2])  # cluster num
    flat_list.append(datapoint[-1])  # y

    return np.asarray(flat_list)
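# Quick illustration with dummy values: flattening one datapoint yields a single
# row that lines up with get_columns(N=2), i.e. 2*17 metric values followed by
# cluster_id and evolution_label.
_dummy_window = [1, 1, 0.0] + [0.0] * 12 + [(0.5, 0.5)]
_row = flatten_layer_metrics_datapoint([_dummy_window, _dummy_window, 3, 2])
assert len(_row) == len(get_columns(N=2))  # 36 values in total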
#########################
import numpy as np
import pandas as pd
from pandas import DataFrame
import collections
import statistics as stat

def balance_dataset(df: DataFrame) -> DataFrame:
    # nothing happening here, balance only on real training, not during prep
    return df

def store_training_data(use_case: str, layer_name: str, reference_layer_name: str):
    # load metrics data from disk
    data: Iterable = create_layer_metrics_training_data(use_case=use_case, layer_name=layer_name, reference_layer=reference_layer_name)

    # convert to X and Y
    df = convert_metrics_data_to_dataframe(data, columns=get_columns(N=2), flattening_method=flatten_layer_metrics_datapoint)

    # balance df
    df = balance_dataset(df)

    # shuffle
    df = df.sample(frac=1).reset_index(drop=True)

    Path(f'data/{use_case}/ml_input/cross_context/').mkdir(parents=True, exist_ok=True)
    df.to_csv(f'data/{use_case}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv')
#########################
from db.repository import Repository
from db.dao import LayerDao
from pathlib import Path

repo = Repository()

def run(use_case=None):
    '''
    Requires raw jsons and cluster metrics.
    Working directory: data/
    '''
    if use_case is not None:
        use_cases = [use_case]
    else:
        use_cases = repo.get_use_cases()

    for use_case in use_cases:
        print(f"Executing layer metrics calc for use case {use_case}")

        layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
        layer_pairs = repo.get_layer_pairs(use_case)

        ################
        for layer in layers:
            try:
                store_metrics_for_layers(use_case, layer[0], layer[1])
            except FileNotFoundError:
                pass

        ###############
        for ld in layer_pairs:
            print(f"Storing training data for {ld.layer} with L_R={ld.reference_layer}")
            store_training_data(use_case, layer_name=ld.layer, reference_layer_name=ld.reference_layer)
\ No newline at end of file
from processing.data_prep.cluster_metrics_calc import run as crun
from processing.data_prep.layer_metrics_calc import run as lrun

from pathlib import Path
import json
import os
from db.repository import Repository

repo = Repository()

def store_clusters_as_files(use_case):
    path_ = f'data/{use_case}/raw/clusters/'
    Path(path_).mkdir(parents=True, exist_ok=True)

    layers = repo.get_layers_for_use_case(use_case)
    for l in layers:
        clusters = repo.get_clusters_for_layer(use_case, l.use_case_table, l.layer_name)
        with open(os.path.join(path_, f'{l.layer_name}.json'), 'w') as file_:
            file_.write(json.dumps([c.to_serializable_dict() for c in clusters]))

def store_time_slices_as_files(use_case):
    path_ = f'data/{use_case}/raw/timeslices/'

    layers = repo.get_layers_for_use_case(use_case)
    for l in layers:
        Path(os.path.join(path_, l.layer_name)).mkdir(parents=True, exist_ok=True)
        time_slices = repo.get_time_slices_for_layer(use_case, l.use_case_table, l.layer_name)
        for ts in time_slices:
            with open(os.path.join(path_, l.layer_name, f'{ts.time}.json'), 'w') as file_:
                file_.write(json.dumps(ts.to_serializable_dict()))

def run(use_case=None):
    '''Prepares training data for single and cross-context using the file system (data/).'''
    if use_case is not None:
        use_cases = [use_case]
    else:
        use_cases = repo.get_use_cases()

    for use_case in use_cases:
        store_clusters_as_files(use_case)
        store_time_slices_as_files(use_case)

        crun(use_case)
        lrun(use_case)
'''
These functions are utilized for both single and cross-context data preparation, i.e.
- cluster_metrics_main.py
- layer_metrics_main.py
'''
#############################
from typing import Tuple

from processing import ClusterMetricsCalculatorFactory
...
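The converter shared by both preparation paths lives in metrics_base.py and its body is not part of this diff; below is a minimal sketch of what the call sites above imply, assuming it merely applies the supplied flattening method per datapoint and tabulates the rows (the name is marked as a stand-in):

from pandas import DataFrame

def convert_metrics_data_to_dataframe_sketch(data, columns, flattening_method) -> DataFrame:
    # hypothetical stand-in, not the actual metrics_base implementation
    rows = [flattening_method(datapoint) for datapoint in data]
    return DataFrame(data=rows, columns=columns)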
@@ -5,6 +5,6 @@ if os.path.exists(modules_path):
     sys.path.insert(1, modules_path)

-from processing.data_prep.cluster_metrics_main import start
+from processing.data_prep.main import run

-start(use_case='community-prediction-youtube-n')
+run(use_case='community-prediction-youtube-n')
\ No newline at end of file
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
    sys.path.insert(1, modules_path)

from security.token_manager import TokenManager
import network_constants
from db.repository import Repository
from db.dao import LayerPairDao

from typing import List, Dict
import requests
import json

def get_youtube_dependencies() -> List[LayerPairDao]:
    uc = 'community-prediction-youtube-n'
    layer_dependencies = [
        ('CategoryLayer', 'CountryLayer'),
        ('ViewsLayer', 'CountryLayer'),
        ('ViewsLayer', 'CategoryLayer'),
        ('LikesLayer', 'ViewsLayer'),
        ('DislikesLayer', 'ViewsLayer'),
        ('CommentCountLayer', 'ViewsLayer'),
        ('TrendDelayLayer', 'ViewsLayer'),
    ]
    return [LayerPairDao(uc, uc, ld[0], ld[1]) for ld in layer_dependencies]

def get_taxi_dependencies() -> List[LayerPairDao]:
    uc = 'community-prediction-taxi'
    layer_dependencies = [
        ('CallTypeLayer', 'DayTypeLayer'),
        ('OriginCallLayer', 'CallTypeLayer'),
        ('OriginStandLayer', 'CallTypeLayer'),
        ('TaxiIdLayer', 'OriginStandLayer'),
        ('StartLocationLayer', 'OriginStandLayer'),
        ('EndLocationLayer', 'OriginStandLayer'),
        ('StartLocationLayer', 'DayTypeLayer'),
        ('EndLocationLayer', 'DayTypeLayer'),
    ]
    return [LayerPairDao(uc, uc, ld[0], ld[1]) for ld in layer_dependencies]

def upload_layerpair(layerpair: LayerPairDao):
    jwt = TokenManager.getInstance().getToken()
    url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{layerpair.use_case}/tables/{layerpair.table}/layer-pairs'

    response = requests.post(
        url,
        verify=False,
        proxies={"http": None, "https": None},
        headers={"Authorization": f"Bearer {jwt}"},
        json=layerpair.__dict__
    )

    if response.status_code != 200:
        raise ConnectionError(f"Could not upload layer pair, status code: {response.status_code}!")

if __name__ == '__main__':
    assert False, 'Replace False with True to upload now.'

    for lp in get_youtube_dependencies():
        upload_layerpair(lp)
    for lp in get_taxi_dependencies():
        upload_layerpair(lp)
\ No newline at end of file