Commit 495390ac authored by Alexander Lercher's avatar Alexander Lercher

Cross context prediction

parent a878064e
...@@ -131,4 +131,7 @@ class Repository(MongoRepositoryBase): ...@@ -131,4 +131,7 @@ class Repository(MongoRepositoryBase):
def get_prediction_results(self, use_case: str) -> List[PredictionResult]: def get_prediction_results(self, use_case: str) -> List[PredictionResult]:
entries = super().get_entries(self._prediction_result_collection, selection={'use_case': use_case}, projection={'_id': 0}) entries = super().get_entries(self._prediction_result_collection, selection={'use_case': use_case}, projection={'_id': 0})
return [PredictionResult.create_from_dict(e) for e in entries] return [PredictionResult.create_from_dict(e) for e in entries]
def delete_all_prediction_results(self):
super().drop_collection(self._prediction_result_collection)
#endregion #endregion
...@@ -33,8 +33,7 @@ class Cluster: ...@@ -33,8 +33,7 @@ class Cluster:
def get_time_info(self) -> int: def get_time_info(self) -> int:
'''Returns the week of the time tuple str, eg. 25 for "(2014, 25)".''' '''Returns the week of the time tuple str, eg. 25 for "(2014, 25)".'''
str_tuple = self.time_window_id return eval(self.time_window_id)[1]
return int(str_tuple.split(',')[1].strip()[:-1])
def __repr__(self): def __repr__(self):
return str(self.__dict__) return str(self.__dict__)
......
...@@ -53,6 +53,10 @@ class Layer: ...@@ -53,6 +53,10 @@ class Layer:
self.distances_from_global_centers = self.get_distances_from_global_center(active_clusters) self.distances_from_global_centers = self.get_distances_from_global_center(active_clusters)
self.cluster_center_distance_agg_metrics = self.get_center_distance_min_max_avg_sum(active_clusters) self.cluster_center_distance_agg_metrics = self.get_center_distance_min_max_avg_sum(active_clusters)
def get_time_info(self) -> int:
'''Returns the week of the time tuple str, eg. 25 for "(2014, 25)".'''
return eval(self.time_window_id)[1]
def get_size_min_max_avg_sum(self, clusters: List[InternalCluster]) -> dict: def get_size_min_max_avg_sum(self, clusters: List[InternalCluster]) -> dict:
'''Returns min, max, avg, and sum of the cluster's absolute sizes.''' '''Returns min, max, avg, and sum of the cluster's absolute sizes.'''
if len(clusters) == 0: if len(clusters) == 0:
......
from processing.data_prep.metrics_base import calculate_center, get_cyclic_time_feature, get_evolution_label, convert_metrics_data_to_dataframe from processing.data_prep.metrics_base import calculate_center, get_cyclic_time_feature, get_evolution_label, convert_metrics_data_to_dataframe, get_cluster_metrics
from pathlib import Path from pathlib import Path
############################# #############################
...@@ -86,7 +86,7 @@ def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) -> ...@@ -86,7 +86,7 @@ def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) ->
tuples = [] tuples = []
continue continue
cur_metrics = (cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2, cur_cluster.range_, cur_cluster.global_center_distance, get_cyclic_time_feature(cur_cluster.get_time_info())) cur_metrics = get_cluster_metrics(cur_cluster)
# deque function: adding N+1st element will remove oldest one # deque function: adding N+1st element will remove oldest one
if len(tuples) == N: if len(tuples) == N:
......
from processing.data_prep.metrics_base import calculate_center, get_cyclic_time_feature, get_evolution_label,convert_metrics_data_to_dataframe from processing.data_prep.metrics_base import calculate_center, get_cyclic_time_feature, get_evolution_label,convert_metrics_data_to_dataframe, get_layer_metrics
from pathlib import Path from pathlib import Path
################# #################
...@@ -59,21 +59,10 @@ def get_columns(N) -> List[str]: ...@@ -59,21 +59,10 @@ def get_columns(N) -> List[str]:
cols = cols * N cols = cols * N
return cols + ['cluster_id'] + ['evolution_label'] return cols + ['cluster_id'] + ['evolution_label']
###################### ######################
def get_cyclic_time_feature_from_time_window(time: str) -> Tuple[float, float]:
return get_cyclic_time_feature(int(time.replace('(', '').replace(')', '').split(',')[1]))
#######################
from typing import Iterable, List, Dict, Any from typing import Iterable, List, Dict, Any
import json import json
from entities import Layer, Cluster from entities import Layer, Cluster
def get_layer_metrics(layer: Layer) -> Iterable:
res = [layer.n_nodes, layer.n_clusters, layer.entropy]
res += [layer.cluster_size_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
res += [layer.cluster_relative_size_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
res += [layer.cluster_center_distance_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
res.append(get_cyclic_time_feature_from_time_window(layer.time_window_id))
return res
def create_layer_metrics_training_data(use_case: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable: def create_layer_metrics_training_data(use_case: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable:
""" """
Loads the metrics training data for an individual layer from disk. Loads the metrics training data for an individual layer from disk.
......
...@@ -45,4 +45,22 @@ def convert_metrics_data_to_dataframe(data: Iterable, columns: list, flattening_ ...@@ -45,4 +45,22 @@ def convert_metrics_data_to_dataframe(data: Iterable, columns: list, flattening_
training_data.append(xy) training_data.append(xy)
return pd.DataFrame(data=training_data, columns=columns) return pd.DataFrame(data=training_data, columns=columns)
\ No newline at end of file
####################
from entities import Cluster, Layer
from typing import Dict, Tuple
def get_cluster_metrics(cur_cluster: Cluster) -> Tuple:
return (cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2,
cur_cluster.range_, cur_cluster.global_center_distance, get_cyclic_time_feature(cur_cluster.get_time_info()))
####################
def get_layer_metrics(layer: Layer) -> Iterable:
res = [layer.n_nodes, layer.n_clusters, layer.entropy]
res += [layer.cluster_size_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
res += [layer.cluster_relative_size_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
res += [layer.cluster_center_distance_agg_metrics[k] for k in ['min', 'max', 'avg', 'sum']]
res.append(get_cyclic_time_feature(layer.get_time_info()))
return res
###################
\ No newline at end of file
def increase_time_window(time_window_id: str) -> str:
tuple_ = eval(time_window_id)
if tuple_[1] == 52:
# 1st week next year
return (tuple_[0]+1 , 1)
else:
# next week
return str((tuple_[0], tuple_[1]+1))
######################
from typing import Tuple
import pickle
def load_ml_models(use_case, method, layer_name, reference_layer_name=None) -> Tuple['scaler', 'clf']:
path_ = f'data/{use_case}/ml_output/{method}/{layer_name}'
if method == 'single_context':
with open(f'{path_}.model', 'rb') as file:
svc = pickle.load(file)
with open(f'{path_}_scaler.model', 'rb') as file:
scaler = pickle.load(file)
elif method == 'cross_context':
with open(f'{path_}_{reference_layer_name}.model', 'rb') as file:
svc = pickle.load(file)
with open(f'{path_}_{reference_layer_name}_scaler.model', 'rb') as file:
scaler = pickle.load(file)
else:
raise NotImplementedError('Prediction method is not implemented')
return scaler, svc
\ No newline at end of file
from processing.data_prep.metrics_base import get_cyclic_time_feature, get_layer_metrics
from processing.ml.predict_base import increase_time_window, load_ml_models
method = 'cross_context'
N = 2 # Currently N is fixed to 2
####################
import pandas as pd
from pandas import DataFrame
#####################
import json
from entities import Layer, Cluster
import collections
import numpy as np
from typing import Iterable, Tuple, List, Dict, Any
####################
import pickle
#####################
import numpy as np
def flatten_layer_metrics_datapoint(datapoint: list) -> np.array:
'''
Flattens a single layer metrics data point in the form:
[(n_nodes, n_clusters, entropy,
(relative_cluster_size)^M, (distance_from_global_centers)^M,
(time1, time2))^N,
cluster_number]
to:
(X)
'''
flat_list = []
for layer_metric_tuple in datapoint[:-1]: # for all x
flat_list.extend(layer_metric_tuple[0:-1]) # everything before time
flat_list.extend(layer_metric_tuple[-1]) # time1/2
flat_list.append(datapoint[-1]) # cluster num
return np.asarray(flat_list)
#########################
from db.repository import Repository
from db.dao import PredictionResult
repo = Repository()
def run_prediction(use_case: str):
for layerpair in repo.get_layer_pairs(use_case):
layer_name = layerpair.layer
reference_layer_name = layerpair.reference_layer
print(f"Predicting {method} for {use_case}//{layer_name} based on {reference_layer_name}")
##########################
with open(f'data/{use_case}/cluster_metrics/{layer_name}.json') as file:
cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
cluster_ids = {c.cluster_id for c in cluster_metrics}
cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}
with open(f'data/{use_case}/layer_metrics/{reference_layer_name}.json') as file:
layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}
######################
# load the time keys chronologically
ordered_time_keys = list(layer_metrics.keys())
ordered_time_keys.sort(key=lambda x: eval(x))
######################
ordered_time_keys = ordered_time_keys[-N:]
#################
prediction_metrics_raw = []
current_layer_metric = layer_metrics[ordered_time_keys[1]]
prev_layer_metric = layer_metrics[ordered_time_keys[0]]
current_layer_metric_tuple = get_layer_metrics(current_layer_metric)
prev_layer_metric_tuple = get_layer_metrics(prev_layer_metric)
for cluster_id in cluster_ids:
# yield each combination of reference layer metrics to clusters
prediction_metrics_raw.append([prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id)])
#######################
scaler, svc = load_ml_models(use_case, method, layer_name, reference_layer_name)
################
prediction_cluster_ids = []
prediction_time_window = increase_time_window(ordered_time_keys[1])
prediction_metrics = []
for pred in prediction_metrics_raw:
cluster_id = pred[-1]
prediction_cluster_ids.append(cluster_id)
flat_ = flatten_layer_metrics_datapoint(pred)
prediction_metrics.append(flat_)
prediction_results = svc.predict(scaler.transform(np.array(prediction_metrics)))
print(np.unique(prediction_results, return_counts=True))
for i in range(len(prediction_cluster_ids)):
res = PredictionResult(use_case, use_case, method, layer_name, reference_layer_name, prediction_cluster_ids[i], prediction_time_window, prediction_results[i])
repo.add_prediction_result(res)
from processing.data_prep.metrics_base import get_cyclic_time_feature from processing.data_prep.metrics_base import get_cyclic_time_feature, get_cluster_metrics
from processing.ml.predict_base import increase_time_window, load_ml_models
N = 3 # Currently N is fixed to 3 N = 3 # Currently N is fixed to 3
method = 'single_context' method = 'single_context'
...@@ -11,18 +12,11 @@ import json ...@@ -11,18 +12,11 @@ import json
from entities import Cluster from entities import Cluster
import collections import collections
import numpy as np import numpy as np
from typing import Iterable, Tuple from typing import Iterable, Tuple, Dict, List
###################### ######################
from typing import Dict
from typing import Tuple
def get_metrics(cur_cluster: Cluster) -> Tuple:
return (cur_cluster.size, cur_cluster.std_dev, cur_cluster.scarcity, cur_cluster.importance1, cur_cluster.importance2,
cur_cluster.range_, cur_cluster.global_center_distance, get_cyclic_time_feature(cur_cluster.get_time_info()))
####################
import pickle import pickle
##################### #####################
def flatten_metrics_datapoint(datapoint: list) -> Tuple['X', np.array]: def flatten_metrics_datapoint(datapoint: list) -> np.array:
''' '''
Flattens a single metrics data point in the form: Flattens a single metrics data point in the form:
[(cluster_size, cluster_variance, cluster_density, cluster_import1, cluster_import2, cluster_range, cluster_center, (time_f1, time_f2))^N] [(cluster_size, cluster_variance, cluster_density, cluster_import1, cluster_import2, cluster_range, cluster_center, (time_f1, time_f2))^N]
...@@ -35,16 +29,6 @@ def flatten_metrics_datapoint(datapoint: list) -> Tuple['X', np.array]: ...@@ -35,16 +29,6 @@ def flatten_metrics_datapoint(datapoint: list) -> Tuple['X', np.array]:
flat_list.extend(entry[-1]) # add time tuple flat_list.extend(entry[-1]) # add time tuple
return np.asarray(flat_list) return np.asarray(flat_list)
######################
def increase_time_window(time_window_id: str):
tuple_ = eval(time_window_id)
if tuple_[1] == 52:
# 1st week next year
return (tuple_[0]+1 , 1)
else:
# next week
return str((tuple_[0], tuple_[1]+1))
######################### #########################
from db.repository import Repository from db.repository import Repository
from db.dao import PredictionResult from db.dao import PredictionResult
...@@ -72,12 +56,8 @@ def run_prediction(use_case: str): ...@@ -72,12 +56,8 @@ def run_prediction(use_case: str):
cluster_map[id_] = [] cluster_map[id_] = []
cluster_map[id_].append(cluster) cluster_map[id_].append(cluster)
#################### ####################
with open(f'data/{use_case}/ml_output/{method}/{layer_name}.model', 'rb') as file: scaler, svc = load_ml_models(use_case, method, layer_name)
svc = pickle.load(file)
####################
with open(f'data/{use_case}/ml_output/{method}/{layer_name}_scaler.model', 'rb') as file:
scaler = pickle.load(file)
##################### #####################
# store id, future time window, and flattened metrics to combine the latter during prediction # store id, future time window, and flattened metrics to combine the latter during prediction
prediction_cluster_ids = [] prediction_cluster_ids = []
...@@ -85,7 +65,7 @@ def run_prediction(use_case: str): ...@@ -85,7 +65,7 @@ def run_prediction(use_case: str):
prediction_metrics = [] prediction_metrics = []
for cluster_id, time_windows in cluster_map.items(): for cluster_id, time_windows in cluster_map.items():
v = [get_metrics(c) for c in time_windows[-N:]] # metrics for last N time windows v = [get_cluster_metrics(c) for c in time_windows[-N:]] # metrics for last N time windows
v_flattened = flatten_metrics_datapoint(v) v_flattened = flatten_metrics_datapoint(v)
prediction_cluster_ids.append(cluster_id) prediction_cluster_ids.append(cluster_id)
......
...@@ -8,10 +8,10 @@ approach = 'cross_context' ...@@ -8,10 +8,10 @@ approach = 'cross_context'
import pickle import pickle
from pathlib import Path from pathlib import Path
def export_model(model, use_case, layer_name, reference_layer_name): def export_model(model, use_case, layer_name, reference_layer_name, scaler=False):
fpath = f'data/{use_case}/ml_output/{approach}' fpath = f'data/{use_case}/ml_output/{approach}'
Path(fpath).mkdir(parents=True, exist_ok=True) Path(fpath).mkdir(parents=True, exist_ok=True)
with open(f'{fpath}/{layer_name}_{reference_layer_name}.model', 'wb') as f: with open(f'{fpath}/{layer_name}_{reference_layer_name}{"_scaler" if scaler else ""}.model', 'wb') as f:
pickle.dump(model, f) pickle.dump(model, f)
################### ###################
from sklearn.ensemble import RandomForestClassifier from sklearn.ensemble import RandomForestClassifier
...@@ -46,11 +46,13 @@ def run_training(use_case): ...@@ -46,11 +46,13 @@ def run_training(use_case):
from sklearn.preprocessing import StandardScaler from sklearn.preprocessing import StandardScaler
scaler = StandardScaler() scaler = StandardScaler()
train_X = scaler.fit_transform(training)[:,:-1] # all except y train_X = scaler.fit_transform(training[training.columns[:-1]]) # all except y
train_Y = training[training.columns[-1]] train_Y = training[training.columns[-1]]
test_X = scaler.transform(testing)[:,:-1] # all except y test_X = scaler.transform(testing[testing.columns[:-1]]) # all except y
test_Y = testing[testing.columns[-1]] test_Y = testing[testing.columns[-1]]
export_model(scaler, use_case, layer_name, reference_layer_name, scaler=True)
######################## ########################
from processing import DataSampler from processing import DataSampler
......
...@@ -5,12 +5,18 @@ if os.path.exists(modules_path): ...@@ -5,12 +5,18 @@ if os.path.exists(modules_path):
sys.path.insert(1, modules_path) sys.path.insert(1, modules_path)
from db.repository import Repository
from processing.ml.predict_single_context import run_prediction as run_single_prediction from processing.ml.predict_single_context import run_prediction as run_single_prediction
# from processing.ml.predict_cross_context import run_prediction as run_cross_prediction from processing.ml.predict_cross_context import run_prediction as run_cross_prediction
if __name__ == '__main__': if __name__ == '__main__':
'''Executes the predictions.''' '''Executes the predictions.'''
use_case='community-prediction-youtube-n' use_case='community-prediction-youtube-n'
repo = Repository()
repo.delete_all_prediction_results()
run_single_prediction(use_case) run_single_prediction(use_case)
# run_cross_prediction(use_case) run_cross_prediction(use_case)
\ No newline at end of file \ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment