Commit 13dd065e authored by Alexander Lercher

Merge branch 'feature/community-prediction' into 'develop'

Feature/community prediction

See merge request !48
parents 13427b9e 784399e0
......@@ -77,7 +77,7 @@ Contains the clustering results. Clustering is performed on all nodes inside one
```GET https://articonf1.itec.aau.at:30103/api/use-cases/{use_case}/tables/{table}/layers/{layer_name}/clusters``` returns the identified clusters.
```GET https://articonf1.itec.aau.at:30103/api/use-cases/{use_case}/tables/{table}/layers/{layer_name}/timeslices``` returns the identified clusters partitioned based on their nodes' timestamps.
```GET https://articonf1.itec.aau.at:30103/api/use-cases/{use_case}/tables/{table}/layers/{layer_name}/timeslices``` returns the identified clusters partitioned based on their nodes' [timestamps](schema_information.md#time-slices).
## RunId
When a similarity computation is executed, it has an associated RunId which is used to uniquely identify that execution.
......@@ -93,3 +93,14 @@ Returns the computed similarity. Two clusters belonging to the SAME layer will b
Intermediary data-structure used only by the function which computes the similarity. Clusters are connected only to other clusters belonging to a DIFFERENT layer.
```GET https://articonf1.itec.aau.at:30103/api/use_cases/{use_case}/tables/{table}/connectedClusters``` returns all connected clusters for the given use-case and table.
# Proactive Community Detection Microservice
https://articonf1.itec.aau.at:30105/api/ui/
This microservice contains size predictions for the clusters in the [role stage discovery microservice](https://articonf1.itec.aau.at:30103/api/ui/#!/Clusters/routes_clustersets_get_by_name) for the week following the latest data in SMART. The timestamps used for temporal division are documented [here](schema_information.md#time-slices).
Example: Layer $L$ contains 3 clusters with sizes 3, 0, 7 in the most recent week $t$. SMART predicts the sizes in the following week $t+1$ as 5, 0, 6 based on each cluster's structural changes over the last $N=3$ weeks, i.e. $t,\ t-1,\ t-2$.
```GET https://articonf1.itec.aau.at:30105/api/use-cases/{use_case}/tables/{table}/layers/{layer_name}/predictions```
contains the size predictions for all clusters of a layer derived as described above.
\ No newline at end of file
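For illustration, fetching these predictions could look like the following minimal sketch (Python `requests`; the use case, table, and layer names are placeholders, and disabling certificate verification mirrors the repository's own helper scripts rather than a documented requirement):

```python
import requests

# placeholder path parameters, not real SMART identifiers
use_case, table, layer_name = "my-use-case", "my-table", "my-layer"
url = (f"https://articonf1.itec.aau.at:30105/api/use-cases/{use_case}"
       f"/tables/{table}/layers/{layer_name}/predictions")

response = requests.get(url, verify=False)  # self-signed certificate assumed, as in the repo scripts
response.raise_for_status()
predictions = response.json()
print(f"Received {len(predictions)} cluster size predictions")
```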
......@@ -119,4 +119,41 @@ __Example:__ The node for the initial trace of the pizzashop and the layer ```na
"fullName": "name+description",
"firstTopping": "toppings[0]//name"
}
}
\ No newline at end of file
}
# Clusters
## Time Slices
Time slices are clusters split into time windows. SMART currently creates one time slice per week for each cluster.
The timestamp information is currently _not_ integrated into the table schema mapping and _not_ converted into a UNIX timestamp during upload.
The following fields are considered timestamps during the partitioning:
```yaml
vialog-enum:
video: created
change: /
car-sharing-official:
car: /
hash: /
media: /
offer: available
publication: date
travel: startDate
travelCancelledBy: moment
travelFinishedBy: moment
travelStartedBy: moment
travelSuggestedEndPlaces: /
travelUsers: /
user: /
offerEndPlaces: /
smart-energy:
smart-energy: Timestamp
crowd-journalism-enum:
video: creationTimestamp
tag: /
classification: lastUpdate
event: /
purchase: timestamp
```
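As a rough sketch of this weekly partitioning (a hypothetical helper, assuming nodes are plain dictionaries and the timestamp field already holds a UNIX timestamp in seconds):

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, List, Tuple

def split_nodes_into_weekly_slices(nodes: List[dict], time_field: str) -> Dict[Tuple[int, int], List[dict]]:
    """Groups nodes by the (ISO year, ISO week) of their timestamp field."""
    slices: Dict[Tuple[int, int], List[dict]] = defaultdict(list)
    for node in nodes:
        time = datetime.utcfromtimestamp(float(node[time_field]))  # UNIX seconds assumed
        year, week, _ = time.isocalendar()
        slices[(year, week)].append(node)
    return dict(slices)
```

SMART itself derives the (year, week) key the same way via `isocalendar()` in its time slicing module.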
# contains raw data for machine learning
data/
# backup data for machine learning debugging
data_bak/
\ No newline at end of file
......@@ -34,14 +34,13 @@ class Repository(MongoRepositoryBase):
super().drop_collection(collection_)
#region LayerPair
#region Use Case
def add_use_case(self, use_case: str):
super().insert_entry(self._use_case_collection, {'name':use_case})
def get_use_cases(self) -> List[str]:
entries = super().get_entries(self._use_case_collection)
return [e['name'] for e in entries]
#endregion
#region Layers
......@@ -53,7 +52,7 @@ class Repository(MongoRepositoryBase):
entries = super().get_entries(self._layer_collection, projection={'_id': 0})
return [LayerDao(e) for e in entries]
def get_layers_for_use_case(self, use_case: str) -> LayerDao:
def get_layers_for_use_case(self, use_case: str) -> List[LayerDao]:
entries = super().get_entries(self._layer_collection, selection={'use_case': use_case})
return [LayerDao(e) for e in entries]
......@@ -132,6 +131,13 @@ class Repository(MongoRepositoryBase):
entries = super().get_entries(self._prediction_result_collection, selection={'use_case': use_case}, projection={'_id': 0})
return [PredictionResult.create_from_dict(e) for e in entries]
def get_prediction_results_for_layer(self, use_case: str, use_case_table: str, layer_name: str) -> List[PredictionResult]:
entries = super().get_entries(self._prediction_result_collection, selection={'use_case': use_case, 'table': use_case_table, 'layer': layer_name}, projection={'_id': 0})
return [PredictionResult.create_from_dict(e) for e in entries]
def delete_all_prediction_results(self):
super().drop_collection(self._prediction_result_collection)
def delete_prediction_results(self, use_case: str):
super().delete_many(self._prediction_result_collection, selection={'use_case': use_case})
#endregion
......@@ -7,7 +7,7 @@ import json
import os
from entities import TimeWindow, Cluster
def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: List[str]):
def store_metrics_for_clusters(use_case: str, table: str, layer_name: str, feature_names: List[str]):
'''
:param layer_name: Name of the layer for which multiple time windows exist
:param feature_names: Features of the layer
......@@ -15,7 +15,7 @@ def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: Li
print(f"Working on {layer_name} cluster metrics")
# load global cluster centers
path_in = f'data/{use_case}/raw/clusters/{layer_name}.json'
path_in = f'data/{use_case}/{table}/raw/clusters/{layer_name}.json'
with open(path_in, 'r') as file:
clusters = json.loads(file.read())
cluster_centers: Dict[str, Tuple[float]] = {
......@@ -24,9 +24,9 @@ def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: Li
if cluster['label'] != 'noise'
}
path_in = f'data/{use_case}/raw/timeslices/{layer_name}'
Path(f'data/{use_case}/cluster_metrics/').mkdir(parents=True, exist_ok=True)
path_out = f'data/{use_case}/cluster_metrics/{layer_name}.json'
path_in = f'data/{use_case}/{table}/raw/timeslices/{layer_name}'
Path(f'data/{use_case}/{table}/cluster_metrics/').mkdir(parents=True, exist_ok=True)
path_out = f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json'
complete_clusters: List[Cluster] = []
......@@ -54,7 +54,7 @@ import collections
import numpy as np
from typing import Iterable, Tuple
def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) -> Iterable[list]:
def create_metrics_training_data(use_case: str, table: str, layer_name: str, N: int = 3) -> Iterable[list]:
"""
Loads the metrics training data for an individual layer from disk.
A single metrics training data point should look like this:
......@@ -70,7 +70,7 @@ def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) ->
:param layer_name: the name of the layer metrics json file
"""
path_in = f"data/{use_case}/cluster_metrics/{layer_name}.json"
path_in = f"data/{use_case}/{table}/cluster_metrics/{layer_name}.json"
with open(path_in, 'r') as file:
data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]
......@@ -94,7 +94,7 @@ def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) ->
tuples.append(cur_metrics)
if len(tuples) == N:
label = get_evolution_label(cur_cluster.size, data[i+1].size)
label = data[i+1].size # get_evolution_label(cur_cluster.size, data[i+1].size)
yield list(tuples) + [label]
############################
def flatten_metrics_datapoint(datapoint: list) -> Tuple['X', np.array]:
......@@ -122,9 +122,9 @@ def balance_dataset(df: DataFrame) -> DataFrame:
# nothing happening here, balance only on real training, not during prep
return df
def store_training_data(use_case: str, layer_name: str):
def store_training_data(use_case: str, table: str, layer_name: str):
# load metrics data from disk
data: Iterable = create_metrics_training_data(use_case=use_case, layer_name=layer_name)
data: Iterable = create_metrics_training_data(use_case=use_case, table=table, layer_name=layer_name)
# flatten and convert to df
df = convert_metrics_data_to_dataframe(data, columns=COLUMNS, flattening_method=flatten_metrics_datapoint)
......@@ -135,8 +135,8 @@ def store_training_data(use_case: str, layer_name: str):
# shuffle
df = df.sample(frac=1).reset_index(drop=True)
Path(f'data/{use_case}/ml_input/single_context/').mkdir(parents=True, exist_ok=True)
df.to_csv(f'data/{use_case}/ml_input/single_context/{layer_name}.csv')
Path(f'data/{use_case}/{table}/ml_input/single_context/').mkdir(parents=True, exist_ok=True)
df.to_csv(f'data/{use_case}/{table}/ml_input/single_context/{layer_name}.csv')
#######################
......@@ -159,13 +159,13 @@ def run(use_case=None):
for use_case in use_cases:
print(f"Executing cluster metrics calc for use case {use_case}")
layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
layers = repo.get_layers_for_use_case(use_case)
##################
for layer in layers:
store_metrics_for_clusters(use_case, layer[0], layer[1])
store_metrics_for_clusters(layer.use_case, layer.use_case_table, layer.layer_name, layer.properties)
###################
for name, _ in layers:
print(f"Storing training data for {name}")
store_training_data(use_case, layer_name=name)
for layer in layers:
print(f"Storing training data for {layer.layer_name}")
store_training_data(layer.use_case, layer.use_case_table, layer.layer_name)
\ No newline at end of file
......@@ -9,11 +9,11 @@ import os
from entities import TimeWindow, Layer
from processing import ClusterMetricsCalculatorFactory
def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List[str]):
def store_metrics_for_layers(use_case: str, table: str, layer_name: str, feature_names: List[str]):
print(f"Working on {layer_name} layer metrics")
# load global cluster centers
path_in = f'data/{use_case}/raw/clusters/{layer_name}.json'
path_in = f'data/{use_case}/{table}/raw/clusters/{layer_name}.json'
with open(path_in, 'r') as file:
clusters = json.loads(file.read())
cluster_centers: Dict[str, Tuple[float]] = {
......@@ -24,7 +24,7 @@ def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List
# load time windows
all_layers: List[Layer] = []
path_in = f'data/{use_case}/raw/timeslices/{layer_name}'
path_in = f'data/{use_case}/{table}/raw/timeslices/{layer_name}'
for root, _, files in os.walk(path_in):
for f in files:
with open(os.path.join(root, f), 'r') as file:
......@@ -35,8 +35,8 @@ def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List
all_layers.append(layer)
# store the layer metrics
Path(f'data/{use_case}/layer_metrics/').mkdir(parents=True, exist_ok=True)
path_out = f'data/{use_case}/layer_metrics/{layer_name}.json'
Path(f'data/{use_case}/{table}/layer_metrics/').mkdir(parents=True, exist_ok=True)
path_out = f'data/{use_case}/{table}/layer_metrics/{layer_name}.json'
with open(path_out, 'w') as file:
file.write(json.dumps([l.__dict__ for l in all_layers]))
#########################
......@@ -63,7 +63,7 @@ from typing import Iterable, List, Dict, Any
import json
from entities import Layer, Cluster
def create_layer_metrics_training_data(use_case: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable:
def create_layer_metrics_training_data(use_case: str, table: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable:
"""
Loads the metrics training data for an individual layer from disk.
......@@ -83,12 +83,12 @@ def create_layer_metrics_training_data(use_case: str, layer_name: str, reference
if N != 2:
raise NotImplementedError("N is not implemented and fixed to 2!")
with open(f'data/{use_case}/cluster_metrics/{layer_name}.json') as file:
with open(f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json') as file:
cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
cluster_ids = {c.cluster_id for c in cluster_metrics}
cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}
with open(f'data/{use_case}/layer_metrics/{reference_layer}.json') as file:
with open(f'data/{use_case}/{table}/layer_metrics/{reference_layer}.json') as file:
layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}
......@@ -98,7 +98,7 @@ def create_layer_metrics_training_data(use_case: str, layer_name: str, reference
# go through all time windows once...
prev_time_key = ordered_time_keys[0]
for current_time_key in ordered_time_keys[1:]:
for idx, current_time_key in enumerate(ordered_time_keys[1:-1]):
# ...and load the current and previous layer metrics in the reference_layer
current_layer_metric = layer_metrics[current_time_key]
prev_layer_metric = layer_metrics[prev_time_key]
......@@ -110,7 +110,8 @@ def create_layer_metrics_training_data(use_case: str, layer_name: str, reference
for cluster_id in cluster_ids:
current_cluster_metric = cluster_metrics[(current_time_key, cluster_id)]
prev_cluster_metric = cluster_metrics[(prev_time_key, cluster_id)]
evolution_label = get_evolution_label(prev_cluster_metric.size, current_cluster_metric.size)
next_time_key = ordered_time_keys[idx+2]
evolution_label = cluster_metrics[(next_time_key, cluster_id)].size # get_evolution_label(prev_cluster_metric.size, current_cluster_metric.size)
# yield each combination of reference layer metrics to clusters
yield [prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id), evolution_label]
......@@ -149,9 +150,9 @@ def balance_dataset(df: DataFrame) -> DataFrame:
# nothing happening here, balance only on real training, not during prep
return df
def store_training_data(use_case: str, layer_name: str, reference_layer_name: str):
def store_training_data(use_case: str, table: str, layer_name: str, reference_layer_name: str):
# load metrics data from disk
data: Iterable = create_layer_metrics_training_data(use_case=use_case, layer_name=layer_name, reference_layer=reference_layer_name)
data: Iterable = create_layer_metrics_training_data(use_case=use_case, table=table, layer_name=layer_name, reference_layer=reference_layer_name)
# convert to X and Y
df = convert_metrics_data_to_dataframe(data, columns=get_columns(N=2), flattening_method=flatten_layer_metrics_datapoint)
......@@ -162,8 +163,8 @@ def store_training_data(use_case: str, layer_name: str, reference_layer_name: st
# shuffle
df = df.sample(frac=1).reset_index(drop=True)
Path(f'data/{use_case}/ml_input/cross_context/').mkdir(parents=True, exist_ok=True)
df.to_csv(f'data/{use_case}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv')
Path(f'data/{use_case}/{table}/ml_input/cross_context/').mkdir(parents=True, exist_ok=True)
df.to_csv(f'data/{use_case}/{table}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv')
#########################
......@@ -186,16 +187,16 @@ def run(use_case=None):
for use_case in use_cases:
print(f"Executing layer metrics calc for use case {use_case}")
layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
layers = repo.get_layers_for_use_case(use_case)
layer_pairs = repo.get_layer_pairs(use_case)
################
for layer in layers:
try:
store_metrics_for_layers(use_case, layer[0], layer[1])
store_metrics_for_layers(layer.use_case, layer.use_case_table, layer.layer_name, layer.properties)
except FileNotFoundError:
pass
###############
for ld in layer_pairs:
print(f"Storing training data for {ld.layer} with L_R={ld.reference_layer}")
store_training_data(use_case, layer_name=ld.layer, reference_layer_name=ld.reference_layer)
\ No newline at end of file
store_training_data(ld.use_case, table=ld.table, layer_name=ld.layer, reference_layer_name=ld.reference_layer)
\ No newline at end of file
......@@ -4,6 +4,7 @@ from processing.data_prep.layer_metrics_calc import run as lrun
from pathlib import Path
import json
import os
from typing import List
from db.repository import Repository
......@@ -12,11 +13,11 @@ repo = Repository()
def store_clusters_as_files(use_case):
path_ = f'data/{use_case}/raw/clusters/'
Path(path_).mkdir(parents=True, exist_ok=True)
layers = repo.get_layers_for_use_case(use_case)
for l in layers:
path_ = f'data/{l.use_case}/{l.use_case_table}/raw/clusters/'
Path(path_).mkdir(parents=True, exist_ok=True)
clusters = repo.get_clusters_for_layer(use_case, l.use_case_table, l.layer_name)
with open(os.path.join(path_, f'{l.layer_name}.json'), 'w') as file_:
......@@ -24,30 +25,25 @@ def store_clusters_as_files(use_case):
def store_time_slices_as_files(use_case):
path_ = f'data/{use_case}/raw/timeslices/'
layers = repo.get_layers_for_use_case(use_case)
for l in layers:
Path(os.path.join(path_, l.layer_name)).mkdir(parents=True, exist_ok=True)
path_ = f'data/{l.use_case}/{l.use_case_table}/raw/timeslices/{l.layer_name}/'
Path(path_).mkdir(parents=True, exist_ok=True)
time_slices = repo.get_time_slices_for_layer(use_case, l.use_case_table, l.layer_name)
for ts in time_slices:
with open(os.path.join(path_, l.layer_name, f'{ts.time}.json'), 'w') as file_:
with open(os.path.join(path_, f'{ts.time}.json'), 'w') as file_:
file_.write(json.dumps(ts.to_serializable_dict()))
def run(use_case=None):
def run(use_cases: List[str] = None):
'''Prepares training data for single and cross-context using the file system (data/)'''
if use_case is not None:
use_cases = [use_case]
else:
if use_cases is None:
use_cases = repo.get_use_cases()
for use_case in use_cases:
store_clusters_as_files(use_case)
store_time_slices_as_files(use_case)
crun(use_case)
lrun(use_case)
......@@ -11,8 +11,8 @@ def increase_time_window(time_window_id: str) -> str:
from typing import Tuple
import pickle
def load_ml_models(use_case, method, layer_name, reference_layer_name=None) -> Tuple['scaler', 'clf']:
path_ = f'data/{use_case}/ml_output/{method}/{layer_name}'
def load_ml_models(use_case, table, method, layer_name, reference_layer_name=None) -> Tuple['scaler', 'clf']:
path_ = f'data/{use_case}/{table}/ml_output/{method}/{layer_name}'
if method == 'single_context':
with open(f'{path_}.model', 'rb') as file:
......
......@@ -44,18 +44,19 @@ repo = Repository()
def run_prediction(use_case: str):
for layerpair in repo.get_layer_pairs(use_case):
table = layerpair.table
layer_name = layerpair.layer
reference_layer_name = layerpair.reference_layer
print(f"Predicting {method} for {use_case}//{layer_name} based on {reference_layer_name}")
print(f"Predicting {method} for {use_case}//{table}//{layer_name} based on {reference_layer_name}")
##########################
with open(f'data/{use_case}/cluster_metrics/{layer_name}.json') as file:
with open(f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json') as file:
cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
cluster_ids = {c.cluster_id for c in cluster_metrics}
cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}
with open(f'data/{use_case}/layer_metrics/{reference_layer_name}.json') as file:
with open(f'data/{use_case}/{table}/layer_metrics/{reference_layer_name}.json') as file:
layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}
######################
......@@ -77,7 +78,7 @@ def run_prediction(use_case: str):
# yield each combination of reference layer metrics to clusters
prediction_metrics_raw.append([prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id)])
#######################
scaler, svc = load_ml_models(use_case, method, layer_name, reference_layer_name)
scaler, svc = load_ml_models(use_case, table, method, layer_name, reference_layer_name)
################
prediction_cluster_ids = []
prediction_time_window = increase_time_window(ordered_time_keys[1])
......@@ -91,8 +92,9 @@ def run_prediction(use_case: str):
prediction_metrics.append(flat_)
prediction_results = svc.predict(scaler.transform(np.array(prediction_metrics)))
print(np.unique(prediction_results, return_counts=True))
# print(np.unique(prediction_results, return_counts=True))
prediction_results = np.rint(prediction_results) # round to full numbers
for i in range(len(prediction_cluster_ids)):
res = PredictionResult(use_case, use_case, method, layer_name, reference_layer_name, prediction_cluster_ids[i], prediction_time_window, prediction_results[i])
res = PredictionResult(use_case, table, method, layer_name, reference_layer_name, prediction_cluster_ids[i], prediction_time_window, prediction_results[i])
repo.add_prediction_result(res)
......@@ -37,14 +37,19 @@ repo = Repository()
def run_prediction(use_case: str):
for layer in repo.get_layers_for_use_case(use_case):
table = layer.use_case_table
layer_name = layer.layer_name
print(f"Predicting {method} for {use_case}//{layer_name}")
print(f"Predicting {method} for {use_case}//{table}//{layer_name}")
#################
path_in = f"data/{use_case}/cluster_metrics/{layer_name}.json"
path_in = f"data/{use_case}/{table}/cluster_metrics/{layer_name}.json"
with open(path_in, 'r') as file:
data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]
if len(data) == 0:
print(f"No data for predicting {use_case}//{table}//{layer_name}.")
continue
data.sort(key=lambda cl: (eval(cl.cluster_id), eval(cl.time_window_id)))
#####################
cluster_map: Dict['cluster_id', 'time_windows'] = {}
......@@ -57,7 +62,7 @@ def run_prediction(use_case: str):
cluster_map[id_].append(cluster)
####################
scaler, svc = load_ml_models(use_case, method, layer_name)
scaler, svc = load_ml_models(use_case, table, method, layer_name)
#####################
# store id, future time window, and flattened metrics to combine the latter during prediction
prediction_cluster_ids = []
......@@ -74,8 +79,9 @@ def run_prediction(use_case: str):
# predict all at once for speedup
prediction_results = svc.predict(scaler.transform(np.array(prediction_metrics)))
print(np.unique(prediction_results, return_counts=True))
# print(np.unique(prediction_results, return_counts=True))
prediction_results = np.rint(prediction_results) # round to full numbers
for i in range(len(prediction_cluster_ids)):
res = PredictionResult(use_case, use_case, method, layer_name, None, prediction_cluster_ids[i], prediction_time_windows[i], prediction_results[i])
res = PredictionResult(use_case, table, method, layer_name, None, prediction_cluster_ids[i], prediction_time_windows[i], prediction_results[i])
repo.add_prediction_result(res)
......@@ -29,15 +29,29 @@ def remove_empty_community_class(df):
########################
import sklearn.metrics
def print_report(clfs: list, test_Xs: list, test_Y: 'y', titles: list):
def print_classification_report(clf, test_X, test_Y, title):
"""
Prints all reports.
:param clfs: list of classifiers to evaluate
:param test_Xs: list of test_X for the corresponding classifier at idx
:param test_Y: true classes
:param titles: list of titles for the classifiers at idx
Prints a classification report.
:param clf: classifier to evaluate
:param test_X: input features X
:param test_Y: true classes Y
:param title: title for the report
"""
for clf, test_X, title in zip(clfs, test_Xs, titles):
pred_Y = clf.predict(test_X)
print(f"### {title} ###\n", sklearn.metrics.classification_report(y_true=test_Y, y_pred=pred_Y))
pred_Y = clf.predict(test_X)
print(f"### {title} ###\n", sklearn.metrics.classification_report(y_true=test_Y, y_pred=pred_Y))
def print_regression_report(clf, test_X, test_Y, title):
"""
Prints a regression report.
:param clf: regressor to evaluate
:param test_X: input features X
:param test_Y: true prediction values
:param title: title for the report
"""
pred_Y = clf.predict(test_X)
pred_Y = np.rint(pred_Y) # round to full numbers
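# 'sanity' below is the MSE of a naive all-zero prediction, printed as a baseline for comparison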
print(f"### {title} ###\nR2-score={sklearn.metrics.r2_score(y_true=test_Y, y_pred=pred_Y)}, " \
f"MSE={sklearn.metrics.mean_squared_error(y_true=test_Y, y_pred=pred_Y)}, " \
f"sanity={sklearn.metrics.mean_squared_error(y_true=test_Y, y_pred=[0]*len(pred_Y))}")
########################
\ No newline at end of file
import pandas as pd
from pandas import DataFrame
from processing.ml.train_base import split_data, remove_empty_community_class, print_report
from processing.ml.train_base import split_data, remove_empty_community_class, print_regression_report
approach = 'cross_context'
max_sampling_size = 20000
#######################
import pickle
from pathlib import Path
def export_model(model, use_case, layer_name, reference_layer_name, scaler=False):
fpath = f'data/{use_case}/ml_output/{approach}'
def export_model(model, use_case, table, layer_name, reference_layer_name, scaler=False):
fpath = f'data/{use_case}/{table}/ml_output/{approach}'
Path(fpath).mkdir(parents=True, exist_ok=True)
with open(f'{fpath}/{layer_name}_{reference_layer_name}{"_scaler" if scaler else ""}.model', 'wb') as f:
pickle.dump(model, f)
###################
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import RandomForestRegressor
n_estimators = 50
criterion = 'gini'
criterion = 'mse'
max_depth = None
min_samples_leaf = 2
min_impurity_decrease= 1E-5
bootstrap=True
####################
from sklearn.svm import LinearSVR
tol = 1E-4
c = 1
loss = 'squared_epsilon_insensitive'
dual = False
###############
......@@ -32,16 +39,19 @@ repo = Repository()
def run_training(use_case):
for layerpair in repo.get_layer_pairs(use_case):
table = layerpair.table
layer_name = layerpair.layer
reference_layer_name = layerpair.reference_layer
df: DataFrame = pd.read_csv(f'data/{use_case}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv', index_col=0)
df: DataFrame = pd.read_csv(f'data/{use_case}/{table}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv', index_col=0)
if df.empty:
print(f"No data for training {use_case}//{table}//{layer_name} on {reference_layer_name}.")
continue
#######################
training, testing = split_data(df, shuffle=False)
#####################
training = remove_empty_community_class(training)
testing = remove_empty_community_class(testing)
training.sample(frac=min(1, max_sampling_size/len(training))).reset_index(drop=True)
#####################
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
......@@ -52,22 +62,17 @@ def run_training(use_case):
test_X = scaler.transform(testing[testing.columns[:-1]]) # all except y
test_Y = testing[testing.columns[-1]]
export_model(scaler, use_case, layer_name, reference_layer_name, scaler=True)
export_model(scaler, use_case, table, layer_name, reference_layer_name, scaler=True)
########################
from processing import DataSampler
sampler = DataSampler()
try:
train_X, train_Y = sampler.sample_median_size(train_X, train_Y, max_size=100000)
except ValueError as e: # not enough points for oversampling
print(f"Could not sample training data, using original distribution: {e}")
####################
rfc = RandomForestClassifier(n_estimators=n_estimators, criterion=criterion, max_depth=max_depth,
# RF is a lot better than SVM, but I did not tune hyperparameters for regression
rfc = RandomForestRegressor(n_estimators=n_estimators, criterion=criterion, max_depth=max_depth,
min_samples_leaf=min_samples_leaf, min_impurity_decrease=min_impurity_decrease,
bootstrap=bootstrap)
# rfc = LinearSVR(loss=loss, C=c, dual=dual, tol=tol)
rfc.fit(train_X, train_Y)
print_report([rfc], [test_X], test_Y, ["X"])
export_model(rfc, use_case, layer_name, reference_layer_name)
####################
print_regression_report(rfc, test_X, test_Y, f"{layer_name} based on {reference_layer_name}")
####################
export_model(rfc, use_case, table, layer_name, reference_layer_name)
\ No newline at end of file
import pandas as pd
from pandas import DataFrame
from processing.ml.train_base import split_data, remove_empty_community_class, print_report
from processing.ml.train_base import split_data, remove_empty_community_class, print_regression_report
approach = 'single_context'
max_sampling_size = 20000
#######################
import pickle
from pathlib import Path
def export_model(model, use_case, layer_name, scaler=False):
fpath = f'data/{use_case}/ml_output/{approach}'
def export_model(model, use_case, table, layer_name, scaler=False):
fpath = f'data/{use_case}/{table}/ml_output/{approach}'
Path(fpath).mkdir(parents=True, exist_ok=True)
with open(f'{fpath}/{layer_name}{"_scaler" if scaler else ""}.model', 'wb') as f:
pickle.dump(model, f)
#####################
from sklearn.ensemble import RandomForestClassifier
from sklearn.ensemble import RandomForestRegressor
n_estimators = 100
criterion = 'gini'
criterion = 'mse'
max_depth = None
min_samples_leaf = 2
min_impurity_decrease = 1E-5
bootstrap=True
####################
from sklearn.svm import LinearSVR
tol = 1E-4
c = 1
loss = 'squared_epsilon_insensitive'
dual = False
###############
......@@ -32,15 +39,18 @@ repo = Repository()
def run_training(use_case):
for layer in repo.get_layers_for_use_case(use_case):
table = layer.use_case_table
layer_name = layer.layer_name
df: DataFrame = pd.read_csv(f'data/{use_case}/ml_input/single_context/{layer_name}.csv', index_col=0)
df: DataFrame = pd.read_csv(f'data/{use_case}/{table}/ml_input/single_context/{layer_name}.csv', index_col=0)
if df.empty:
print(f"No data for training {use_case}//{table}//{layer_name}.")
continue
#######################
training, testing = split_data(df, shuffle=False)
#####################
training = remove_empty_community_class(training)
testing = remove_empty_community_class(testing)
training.sample(frac=min(1, max_sampling_size/len(training))).reset_index(drop=True)
#####################
from sklearn.preprocessing import StandardScaler
scaler = StandardScaler()
......@@ -51,22 +61,17 @@ def run_training(use_case):
test_X = scaler.transform(testing[testing.columns[:-1]]) # all except y
test_Y = testing[testing.columns[-1]]
export_model(scaler, use_case, layer_name, scaler=True)
export_model(scaler, use_case, table, layer_name, scaler=True)
########################
from processing import DataSampler
sampler = DataSampler()
try:
train_X, train_Y = sampler.sample_median_size(train_X, train_Y, max_size=100000)
except ValueError as e: # not enough points for oversampling
print(f"Could not sample training data, using original distribution: {e}")
####################
rfc = RandomForestClassifier(n_estimators=n_estimators, criterion=criterion, max_depth=max_depth,
# RF is 10-20% better compared to SVM, but I did not tune hyperparameters for regression
rfc = RandomForestRegressor(n_estimators=n_estimators, criterion=criterion, max_depth=max_depth,
min_samples_leaf=min_samples_leaf, min_impurity_decrease=min_impurity_decrease,
bootstrap=bootstrap)
# rfc = LinearSVR(loss=loss, C=c, dual=dual, tol=tol)
rfc.fit(train_X, train_Y)
####################
print_report([rfc], [test_X], test_Y, ["X"])
print_regression_report(rfc, test_X, test_Y, layer_name)
####################
export_model(rfc, use_case, layer_name)
export_model(rfc, use_case, table, layer_name)
\ No newline at end of file
......@@ -5,7 +5,7 @@ from db.dao import PredictionResult
repo = Repository()
def get(use_case, table, layer_name):
res = repo.get_prediction_results(use_case)
res = repo.get_prediction_results_for_layer(use_case, table, layer_name)
if res is None or len(res) == 0:
return Response(status=404)
else:
......
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from processing.data_prep.main import run
if __name__ == '__main__':
'''Creates data/raw files'''
run(use_case='community-prediction-youtube-n')
\ No newline at end of file
......@@ -9,8 +9,10 @@ import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from processing.fetching import fetching
from db.repository import Repository
if __name__ == "__main__":
'''Fetches all required data from business-logic and role-stage-discovery.'''
fetching.fetch(selected_use_cases=['community-prediction-youtube-n'], selected_use_case_tables=None)
\ No newline at end of file
Repository().DROP(confirm=True)
use_cases = ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']+['community-prediction-youtube-n', 'community-prediction-taxi']
fetching.fetch(selected_use_cases=use_cases, selected_use_case_tables=None)
\ No newline at end of file
......@@ -4,19 +4,52 @@ modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import shutil
from typing import List
from db.repository import Repository
repo = Repository()
from processing.data_prep.main import run as run_data_prep
def _run_data_preparation(use_cases: List[str] = None):
'''Creates data/raw, data/cluster_metrics, data/layer_metrics, and data/ml_input files.'''
run_data_prep(use_cases)
from processing.ml.train_single_context import run_training as run_single_training
from processing.ml.train_cross_context import run_training as run_cross_training
def _run_training(use_cases: List[str] = None):
'''Executes the training and creates data/ml_output files.'''
for use_case in use_cases:
run_single_training(use_case)
run_cross_training(use_case)
from processing.ml.predict_single_context import run_prediction as run_single_prediction
from processing.ml.predict_cross_context import run_prediction as run_cross_prediction
def _run_prediction(use_cases: List[str] = None):
'''Executes the predictions and stores them in the DB.'''
for use_case in use_cases:
repo.delete_prediction_results(use_case)
run_single_prediction(use_case)
# 20210803: don't execute cross-context for use-cases
# run_cross_prediction(use_case)
if __name__ == '__main__':
'''Executes the predictions.'''
def _run_cleanup(use_cases: List[str] = None):
'''Deletes all files in data/ for the use-cases'''
for use_case in use_cases:
path_ = f'data/{use_case}/'
if os.path.exists(path_):
shutil.rmtree(path_)
use_case='community-prediction-youtube-n'
repo = Repository()
repo.delete_all_prediction_results()
if __name__ == '__main__':
use_cases = ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']
# use_cases = ['community-prediction-youtube-n', 'community-prediction-taxi']
run_single_prediction(use_case)
run_cross_prediction(use_case)
\ No newline at end of file
_run_data_preparation(use_cases)
_run_training(use_cases)
_run_prediction(use_cases)
_run_cleanup(use_cases)
\ No newline at end of file
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from processing.ml.train_single_context import run_training as run_single_training
from processing.ml.train_cross_context import run_training as run_cross_training
if __name__ == '__main__':
'''Executes the training.'''
use_case='community-prediction-youtube-n'
run_single_training(use_case)
run_cross_training(use_case)
\ No newline at end of file
......@@ -32,6 +32,15 @@ spec:
image: alexx882/proactive-community-detection-microservice
ports:
- containerPort: 5000
imagePullPolicy: Always
volumeMounts:
- mountPath: /srv/articonf
name: articonf
volumes:
- name: articonf
hostPath:
path: /srv/articonf
type: Directory
---
apiVersion: v1
kind: Service
......
......@@ -111,6 +111,9 @@ class Repository(MongoRepositoryBase):
entries = super().get_entries(self._time_slice_collection, selection={'use_case': use_case, 'use_case_table': use_case_table, 'layer_name': layer_name})
return [TimeSlice(time_slice_dict=e, from_db=True) for e in entries]
def delete_time_slices(self, use_case: str):
super().delete_many(self._time_slice_collection, selection={'use_case': use_case})
def remove_all_time_slices(self):
super().drop_collection(self._time_slice_collection)
......
......@@ -13,16 +13,61 @@ from typing import Tuple, Dict, Any, List
TimeSliceKey = Tuple[int, int]
# TODO extract information about time features (maybe from table mapping)
TIME_PROPERTY_NAMES = ['timestamp']
TIME_PROPERTY_NAMES = [
# vialog-enum
'created',
# car-sharing-official
'available',
'date',
'startDate',
'moment',
# smart-energy
'Timestamp',
# crowd-journalism-enum
'creationTimestamp',
'lastUpdate',
'timestamp',
# community-prediction-youtube-n
'timestamp',
# community-prediction-taxi
'timestamp',
]
repo = Repository()
def try_convert_string_to_time(timestamp: str) -> datetime:
'''
Tries to convert a timestamp string by applying a few common conversion methods,
e.g. unix timestamp, human readable, ISO 8601
'''
try:
return datetime.utcfromtimestamp(float(timestamp))
except ValueError:
pass
try:
return datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(timestamp, '%d-%m-%Y %H:%M:%S')
except ValueError:
pass
try:
return datetime.strptime(timestamp, "%Y-%m-%dT%H:%M:%SZ")
except ValueError:
pass
raise ValueError(f"No conversion available for time data: {timestamp}")
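# Illustrative usage (added for clarity, not part of the original file):
# try_convert_string_to_time('1609459200')          -> datetime(2021, 1, 1, 0, 0)  (UNIX seconds)
# try_convert_string_to_time('2021-01-01 00:00:00') -> datetime(2021, 1, 1, 0, 0)  (human readable)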
def convert_to_time_slice_key(timestamp: str) -> TimeSliceKey:
'''Returns the tuple (year, week_of_year) from a timestamp. This is used as the key for the slicing.'''
# time = datetime.utcfromtimestamp(float(timestamp[0:10]))
# time = datetime.strptime(timestamp, '%Y-%m-%d %H:%M:%S')
time = datetime.utcfromtimestamp(float(timestamp))
time = try_convert_string_to_time(timestamp)
# print(time)
(y, w, _) = time.isocalendar()
return (y, w)
......@@ -42,7 +87,7 @@ def split_clusterset_by_time(layer: Layer, clusters: List[Cluster]) -> Dict[Time
for cluster in clusters:
if cluster.cluster_label == -1:
print("Noise cluster was ignored.")
# print("Noise cluster was ignored.")
continue
for node in cluster.nodes:
......@@ -67,38 +112,21 @@ def get_layers() -> List[Layer]:
return repo.get_layers()
def get_clusters_for_layer(use_case, use_case_table, layer_name)-> List[Cluster]:
# return repo.get_clusters_for_layer(use_case, use_case_table, layer_name)
json_path = f'_predictions/clusters/{layer_name}.json'
if os.path.exists(json_path):
with open(json_path, 'r') as file:
return [Cluster(cluster_dict=e, from_db=False) for e in json.loads(file.read())]
return []
return repo.get_clusters_for_layer(use_case, use_case_table, layer_name)
def get_layer_nodes(use_case, use_case_table, layer_name)-> List[dict]:
# return repo.get_layer_nodes(use_case, use_case_table, layer_name)
return []
return repo.get_layer_nodes(use_case, use_case_table, layer_name)
def add_time_slice(timeslice):
try:
repo.add_time_slice(timeslice)
pass
except:
print(f"Error while storing time slice in db for {timeslice.layer_name}")
# try:
# json_path = f'_predictions/timeslices/{timeslice.layer_name}/{timeslice.time}.json'.replace(', ', '_').replace('(', '').replace(')', '')
# if not os.path.exists(os.path.dirname(json_path)):
# os.makedirs(os.path.dirname(json_path))
# with open(json_path, 'w') as file:
# file.write(json.dumps(timeslice.to_serializable_dict(for_db=False)))
# except Exception as e:
# print(f"Error while writing json for {timeslice.layer_name}: {e}")
def run_time_slicing(selected_use_cases: List[str] = None, selected_use_case_tables: List[str] = None, selected_layer_names: List[str] = None):
layers = get_layers()
for layer in layers:
layer_name = layer.layer_name
use_case = layer.use_case
......@@ -128,5 +156,6 @@ def run_time_slicing(selected_use_cases: List[str] = None, selected_use_case_tab
if __name__ == "__main__":
# repo.remove_all_time_slices()
run_time_slicing(selected_use_cases=['community-prediction-youtube-n'])
\ No newline at end of file
use_case = 'community-prediction-youtube-n'
repo.delete_time_slices(use_case)
run_time_slicing(selected_use_cases=[use_case])
\ No newline at end of file
......@@ -27,6 +27,10 @@ class MongoRepositoryBase:
collection = self._database[collection_name]
return collection.find(selection, projection)
def delete_many(self, collection_name, selection: dict = {'__confirm__': '__false__'}):
collection = self._database[collection_name]
collection.delete_many(selection)
def close_connection(self):
self._mongo_client.close()
self._mongo_client = None
......
......@@ -59,7 +59,7 @@ else:
if server:
PROACTIVE_COMMUNITY_DETECTION_HOSTNAME = 'proactive-community-detection'
PROACTIVE_COMMUNITY_DETECTION_REST_PORT = 80
PROACTIVE_COMMUNITY_DETECTION_DB_HOSTNAME = f'{ROLESTAGE_DISCOVERY_HOSTNAME}-db'
PROACTIVE_COMMUNITY_DETECTION_DB_HOSTNAME = f'{PROACTIVE_COMMUNITY_DETECTION_HOSTNAME}-db'
PROACTIVE_COMMUNITY_DETECTION_DB_PORT = 27017
else:
PROACTIVE_COMMUNITY_DETECTION_HOSTNAME = 'articonf1.itec.aau.at'
......
......@@ -20,13 +20,13 @@ class TokenManager:
def __init__(self):
self._token = None
def getToken(self) -> str:
def getToken(self, admin=False) -> str:
if self._token == None:
credentials_path = get_resources_path()
print("Looking for credentials at ... "+str(credentials_path))
with open(f'{credentials_path}/regular_user_credentials.json') as file_handler:
with open(f"{credentials_path}/{'admin' if admin else 'regular'}_user_credentials.json") as file_handler:
credentials = json.loads(file_handler.read())
url = f'https://{network_constants.REST_GATEWAY_HOSTNAME}:{network_constants.REST_GATEWAY_REST_PORT}/api/tokens'
......
......@@ -6,7 +6,7 @@ from typing import List
def postTableToSwagger(use_case:str, table:dict ):
jwt = TokenManager.getInstance().getToken()
jwt = TokenManager.getInstance().getToken(admin=True)
url = f"https://articonf1.itec.aau.at:30420/api/use-cases/{use_case}/tables"
......@@ -23,7 +23,7 @@ def postTableToSwagger(use_case:str, table:dict ):
def postLayersToSwagger(use_case:str, layers: List[dict]):
jwt = TokenManager.getInstance().getToken()
jwt = TokenManager.getInstance().getToken(admin=True)
for layer in layers:
url = f"https://articonf1.itec.aau.at:30420/api/layers"
......
import sys
import os
from pathlib import Path
from typing import Dict, Any
import requests
modules_paths = ['.', '../../../modules/']
for modules_path in modules_paths:
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from _add_use_case_scripts.vialog.tables import add_user, add_video, add_change
import network_constants as nc
from security.token_manager import TokenManager
def add_use_case(use_case: str):
#use_case = "vialog"
jwt = TokenManager.getInstance().getToken()
url = f"https://articonf1.itec.aau.at:30420/api/use-cases"
response = requests.post(
url,
verify=False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt}"},
json = {"name": use_case}
)
print(url+": "+str(response.content))
if __name__ == "__main__":
use_case = "vialog"
# disable ssl warnings :)
requests.packages.urllib3.disable_warnings()
add_use_case(use_case)
add_user.main(use_case)
add_video.main(use_case)
add_change.main(use_case)
\ No newline at end of file
......@@ -17,7 +17,7 @@ from security.token_manager import TokenManager
def add_use_case(use_case: str):
#use_case = "vialog"
jwt = TokenManager.getInstance().getToken()
jwt = TokenManager.getInstance().getToken(admin=True)
url = f"https://articonf1.itec.aau.at:30420/api/use-cases"
response = requests.post(
url,
......@@ -30,7 +30,7 @@ def add_use_case(use_case: str):
print(url+": "+str(response.content))
if __name__ == "__main__":
use_case = "vialog-new-enum"
use_case = "vialog-enum"
# disable ssl warnings :)
requests.packages.urllib3.disable_warnings()
......