Commit 3edcd509 authored by Alexander Lercher's avatar Alexander Lercher

Preparing ml input for single-context

parent e0a467cf
...@@ -18,11 +18,30 @@ class Repository(MongoRepositoryBase):
netconst.PROACTIVE_COMMUNITY_DETECTION_DB_PORT,
'proactiveCommunityDb')
self._use_case_collection = 'use_cases'
self._layer_collection = 'layers'
self._layer_pair_collection = 'layer_pairs'
self._clusters_collection = 'clusters'
self._time_slice_collection = 'time_slices'
def DROP(self, confirm:bool=False):
assert confirm, 'WONT DELETE WHOLE DB WITHOUT CONFIRMATION'
for collection_ in [self._use_case_collection, self._layer_collection, self._layer_pair_collection,
self._clusters_collection, self._time_slice_collection]:
super().drop_collection(collection_)
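# usage note: DROP must be called explicitly with confirm=True, e.g. Repository().DROP(confirm=True),
# and irreversibly removes all five collections listed above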
#region UseCase
def add_use_case(self, use_case: str):
super().insert_entry(self._use_case_collection, {'name':use_case})
def get_use_cases(self) -> List[str]:
entries = super().get_entries(self._use_case_collection)
return [e['name'] for e in entries]
#endregion
#region Layers
def add_layer(self, layer: LayerDao):
...@@ -84,7 +103,7 @@ class Repository(MongoRepositoryBase):
entries = super().get_entries(self._time_slice_collection)
return [TimeSliceDao(None, None, time_slice_dict=e, from_db=True) for e in entries]
def get_time_slices_by_name(self, use_case: str, use_case_table: str, layer_name: str) -> List[TimeSliceDao]:
def get_time_slices_for_layer(self, use_case: str, use_case_table: str, layer_name: str) -> List[TimeSliceDao]:
'''Returns all time slices with the given layer_name.'''
entries = super().get_entries(self._time_slice_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
return [TimeSliceDao(time_slice_dict=e, from_db=True) for e in entries]
...
from processing.data_prep.metrics_base import calculate_center, get_cyclic_time_feature, get_evolution_label, convert_metrics_data_to_dataframe
from pathlib import Path
#############################
from typing import List, Dict, Tuple
import json
import os
from entities import TimeWindow, Cluster
def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: List[str]):
'''
:param use_case: Name of the use case the layer belongs to
:param layer_name: Name of the layer for which multiple time windows exist
:param feature_names: Features of the layer
'''
print(f"Working on {layer_name} cluster metrics")
# load global cluster centers
path_in = f'data/{use_case}/raw/clusters/{layer_name}.json'
with open(path_in, 'r') as file:
clusters = json.loads(file.read())
cluster_centers: Dict[str, Tuple[float]] = {
str(cluster['cluster_label']): calculate_center(cluster, feature_names)
for cluster in clusters
if cluster['label'] != 'noise'
}
path_in = f'data/{use_case}/raw/timeslices/{layer_name}'
Path(f'data/{use_case}/cluster_metrics/').mkdir(parents=True, exist_ok=True)
path_out = f'data/{use_case}/cluster_metrics/{layer_name}.json'
complete_clusters: List[Cluster] = []
for root, _, files in os.walk(path_in):
for f in files:
with open(os.path.join(root, f), 'r') as file:
# for each time window json
json_slice = json.loads(file.read())
time_window = TimeWindow.create_from_serializable_dict(json_slice)
# create all clusters + metrics for one time window
clusters = Cluster.create_multiple_from_time_window(time_window, feature_names, cluster_centers)
complete_clusters.extend(clusters)
# store the cluster metrics
with open(path_out, 'w') as file:
file.write(json.dumps([cl.__dict__ for cl in complete_clusters]))
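# result: one json file per layer at data/{use_case}/cluster_metrics/{layer_name}.json containing
# the metrics of every cluster across all time windows of that layer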
######################
COLUMNS = ['cluster_size', 'cluster_variance', 'cluster_density', 'cluster_import1', 'cluster_import2',
'cluster_area', 'cluster_center_distance', 'time_f1', 'time_f2']*3 + ['evolution_label']
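# 9 values per time window (7 cluster metrics + the 2 cyclic time features), repeated for the
# N=3 windows of a training sample, plus the target label -> 28 columns in total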
######################
import json
from entities import Cluster
import collections
import numpy as np
from typing import Iterable, Tuple
def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) -> Iterable[list]:
"""
Loads the metrics training data for an individual layer from disk.
A single metrics training data point should look like this:
(cluster_size, cluster_std_dev, cluster_scarcity, cluster_import1, cluster_import2, cluster_range, cluster_center_distance, time_info) ^ N, evolution_label
time_info ... the time as 2d cyclic feature, i.e. time_info := (time_f1, time_f2)
The first tuple represents metrics from the cluster in t_i-(N-1).
The Nth tuple represents metrics from the cluster in t_i.
The label is one of {continuing, shrinking, growing, dissolving, forming} and identifies the change towards t_i+1; splitting and merging are not considered.
:param use_case: the use case the layer belongs to
:param layer_name: the name of the layer metrics json file
:param N: number of cluster metric tuples per training data point
"""
path_in = f"data/{use_case}/cluster_metrics/{layer_name}.json"
with open(path_in, 'r') as file:
data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]
data.sort(key=lambda cl: (cl.cluster_id, cl.time_window_id))
# manually prepare deque with N metric_tuples + evolution label
tuples = []
for i, cur_cluster in enumerate(data[:-1]):
if cur_cluster.cluster_id != data[i+1].cluster_id:
# next cluster slice in list will be another cluster id -> restart deque and skip adding the current (last) cluster slice
tuples = []
continue
cur_metrics = (cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2, cur_cluster.range_, cur_cluster.global_center_distance, get_cyclic_time_feature(cur_cluster.get_time_info()))
# emulate a fixed-size deque: adding the (N+1)st element removes the oldest one
if len(tuples) == N:
tuples.pop(0)
tuples.append(cur_metrics)
if len(tuples) == N:
label = get_evolution_label(cur_cluster.size, data[i+1].size)
yield list(tuples) + [label]
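# Illustrative shape of a yielded item for N=3:
# [ (size, std_dev, scarcity, import1, import2, range_, center_distance, (time_f1, time_f2)),  # t_i-2
#   (...),                                                                                      # t_i-1
#   (...),                                                                                      # t_i
#   evolution_label ]                                                                           # change towards t_i+1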
############################
def flatten_metrics_datapoint(datapoint: list) -> np.ndarray:
'''
Flattens a single metrics data point in the form:
[(cluster_size, cluster_variance, cluster_density, cluster_import1, cluster_import2, cluster_range, cluster_center, (time_f1, time_f2))^N, evolution_label]
to a single flat np.array whose last element is the evolution label (y).
'''
flat_list = []
for entry in datapoint[:-1]: # for all x
flat_list.extend(entry[:-1]) # add all number features except the time tuple
flat_list.extend(entry[-1]) # add time tuple
flat_list.append(datapoint[-1]) # y
return np.asarray(flat_list)
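# e.g. with N=3 the result has 3 * (7 + 2) feature values followed by the label,
# i.e. 28 entries, matching the COLUMNS defined above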
################################
import numpy as np
import pandas as pd
from pandas import DataFrame
import collections
import statistics as stat
def balance_dataset(df: DataFrame) -> DataFrame:
# intentionally a no-op: balancing is done during the actual training, not during data prep
return df
def store_training_data(use_case: str, layer_name: str):
# load metrics data from disk
data: Iterable = create_metrics_training_data(use_case=use_case, layer_name=layer_name)
# flatten and convert to df
df = convert_metrics_data_to_dataframe(data, columns=COLUMNS, flattening_method=flatten_metrics_datapoint)
# balance df
df = balance_dataset(df)
# shuffle
df = df.sample(frac=1).reset_index(drop=True)
Path(f'data/{use_case}/ml_input/single_context/').mkdir(parents=True, exist_ok=True)
df.to_csv(f'data/{use_case}/ml_input/single_context/{layer_name}.csv')
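# the resulting csv at data/{use_case}/ml_input/single_context/{layer_name}.csv holds one shuffled
# training sample per row with the 28 COLUMNS (plus the default pandas index column)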
#######################
from db.repository import Repository
from db.dao import LayerDao
from pathlib import Path

repo = Repository()
def store_clusters_as_files(use_case):
path_ = f'data/{use_case}/raw/clusters/'
Path(path_).mkdir(parents=True, exist_ok=True)
layers = repo.get_layers_for_use_case(use_case)
for l in layers:
clusters = repo.get_clusters_for_layer(use_case, l.use_case_table, l.layer_name)
with open(os.path.join(path_, f'{l.layer_name}.json'), 'w') as file_:
file_.write(json.dumps([c.to_serializable_dict() for c in clusters]))
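# writes one file per layer: data/{use_case}/raw/clusters/{layer_name}.json with all clusters of that layer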
def store_time_slices_as_files(use_case):
path_ = f'data/{use_case}/raw/timeslices/'
layers = repo.get_layers_for_use_case(use_case)
for l in layers:
Path(os.path.join(path_, l.layer_name)).mkdir(parents=True, exist_ok=True)
time_slices = repo.get_time_slices_for_layer(use_case, l.use_case_table, l.layer_name)
for ts in time_slices:
with open(os.path.join(path_, l.layer_name, f'{ts.time}.json'), 'w') as file_:
file_.write(json.dumps(ts.to_serializable_dict()))
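# writes one file per time slice: data/{use_case}/raw/timeslices/{layer_name}/{time}.json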
def start(use_case=None):
if use_case is not None:
use_cases = [use_case]
else:
use_cases = repo.get_use_cases()
for use_case in use_cases:
print(f"Executing for use case {use_case}")
# store_clusters_as_files(use_case)
store_time_slices_as_files(use_case)
layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
##################
for layer in layers:
store_metrics_for_clusters(use_case, layer[0], layer[1])
###################
for name, _ in layers:
print(f"Creating training data for {name}")
store_training_data(use_case, layer_name=name)
\ No newline at end of file
#############################
from typing import Tuple
from processing import ClusterMetricsCalculatorFactory
def calculate_center(cluster: dict, features: list) -> Tuple[float]:
calc = ClusterMetricsCalculatorFactory.create_metrics_calculator(cluster['nodes'], features, 1, 1)
return calc.get_center()
#####################
import numpy as np
def get_cyclic_time_feature(time: int, max_time_value: int = 52) -> Tuple[float, float]:
return (np.sin(2*np.pi*time/max_time_value),
np.cos(2*np.pi*time/max_time_value))
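# maps a (weekly) time index onto the unit circle so that week 52 and week 1 end up close together,
# e.g. get_cyclic_time_feature(13) == (1.0, ~0.0) and get_cyclic_time_feature(52) == (~0.0, 1.0)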
####################
def get_evolution_label(old_size: int, new_size: int) -> int:
'''Returns the evolution label as int by mapping 0..4 to {continuing, shrinking, growing, dissolving, forming}; returns -1 if the cluster is empty in both time windows.'''
if old_size == 0 and new_size == 0:
return -1 # STILL EMPTY
if old_size == new_size:
return 0 # continuing
if old_size == 0 and new_size > 0:
return 4 # forming
if old_size > 0 and new_size == 0:
return 3 # dissolving
if old_size > new_size:
return 1 # shrinking
if old_size < new_size:
return 2 # growing
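# examples: get_evolution_label(10, 10) -> 0 (continuing), get_evolution_label(10, 4) -> 1 (shrinking),
# get_evolution_label(0, 5) -> 4 (forming), get_evolution_label(5, 0) -> 3 (dissolving)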
#########################
from typing import Iterable
import pandas as pd
def convert_metrics_data_to_dataframe(data: Iterable, columns: list, flattening_method: 'callable') -> pd.DataFrame:
'''Flattens each metrics data point and collects the results into a DataFrame with the given columns.'''
training_data = []
for element in data:
xy: 'np.array' = flattening_method(element)
training_data.append(xy)
return pd.DataFrame(data=training_data, columns=columns)
\ No newline at end of file
...@@ -117,16 +117,15 @@ def _fetch_layerpairs(use_case: str, table: str) -> List[Dict]:
def fetch(selected_use_cases: List[str] = None, selected_use_case_tables: List[str] = None):
'''Empties the db and inserts layers and nodes from BusinessLogic and SemanticLinking'''
'''Fetches all the required data: use cases, tables, layers, clusters, and time slices.'''
repo = Repository()
# please dont delete all layers/ nodes anymore @10.11.2020
# repository.delete_all_layers()
# repository.delete_all_nodes()
for use_case in _fetch_use_cases():
if selected_use_cases is not None and use_case not in selected_use_cases:
continue
repo.add_use_case(use_case)
for table in _fetch_tables(use_case):
if selected_use_case_tables is not None and table not in selected_use_case_tables:
continue
...@@ -148,7 +147,7 @@ def fetch(selected_use_cases: List[str] = None, selected_use_case_tables: List[s
if db_layer == None:
repo.add_layer(layer)
else:
print(f"Layer already exists, skipping cluster and timeslice fetching: {db_layer}")
print(f"Layer already exists, skipping cluster and timeslice fetching: {db_layer.layer_name}")
continue
try:
...
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from processing.data_prep.cluster_metrics_main import start
start(use_case='community-prediction-youtube-n')
\ No newline at end of file
...@@ -129,4 +129,4 @@ def run_time_slicing(selected_use_cases: List[str] = None, selected_use_case_tab
if __name__ == "__main__":
# repo.remove_all_time_slices()
run_time_slicing(selected_use_cases=['community-prediction-taxi'])
run_time_slicing(selected_use_cases=['community-prediction-youtube-n'])
\ No newline at end of file