Commit d94b70d7 authored by Alexander Lercher

Single context prediction

parent c1dc19d0
......@@ -14,4 +14,58 @@ paths:
type: object
responses:
'200':
description: "Successful echo of request data"
\ No newline at end of file
description: "Successful echo of request data"
/use-cases/{use_case}/tables/{table}/layers/{layer_name}/predictions:
  # Read access to the stored predictions; handled by routes.predictions.get.
  get:
    operationId: "routes.predictions.get"
    security:
      - JwtRegular: []
    tags:
      - "Predictions"
    summary: "Get predictions"
    parameters:
      - name: "use_case"
        in: "path"
        description: "Name of the use-case"
        required: true
        type: "string"
      - name: "table"
        in: "path"
        description: "Name of the table"
        required: true
        type: "string"
      - name: "layer_name"
        in: "path"
        description: "Name of the layer"
        required: true
        type: "string"
    responses:
      '200':
        description: "Successful operation"
        schema:
          # The handler returns a JSON list with one object per stored prediction,
          # so the response is an array of Prediction rather than a single object.
          type: array
          items:
            $ref: "#/definitions/Prediction"
      '404':
        description: "Predictions not found"
definitions:
  # One stored prediction as returned by routes.predictions.get.
  # The properties mirror the attributes of the PredictionResult DAO,
  # which the handler serializes via its __dict__.
  Prediction:
    type: object
    properties:
      use_case:
        type: string
      table:
        type: string
      method:
        type: string
      layer:
        type: string
      reference_layer:
        type: string
      # Named cluster_id (not cluster_label) to match the serialized attribute name.
      cluster_id:
        type: string
      time_window:
        type: string
      prediction:
        type: integer
......@@ -2,3 +2,4 @@ from db.dao.cluster import Cluster as ClusterDao
from db.dao.layer import Layer as LayerDao
from db.dao.timeslice import TimeSlice as TimeSliceDao
from db.dao.layer_pair import LayerPair as LayerPairDao
from db.dao.prediction_result import PredictionResult
from typing import List, Dict
class PredictionResult:
    '''Plain data holder for one prediction of a cluster in a time window.'''

    def __init__(self, use_case: str, table: str, method: str,
                 layer: str, reference_layer: str, cluster_id: str,
                 time_window: str, prediction: int):
        # Origin of the prediction.
        self.use_case, self.table = use_case, table
        self.method = method
        # What the prediction refers to.
        self.layer, self.reference_layer = layer, reference_layer
        self.cluster_id = cluster_id
        self.time_window = time_window
        # The predicted value itself.
        self.prediction = prediction

    @staticmethod
    def create_from_dict(dict_) -> 'PredictionResult':
        '''Builds an instance whose attributes are overwritten/extended by the entries of dict_.'''
        placeholders = (None,) * 8  # one placeholder per constructor argument
        instance = PredictionResult(*placeholders)
        vars(instance).update(dict_)
        return instance
......@@ -23,6 +23,7 @@ class Repository(MongoRepositoryBase):
self._layer_pair_collection = 'layer_pairs'
self._clusters_collection = 'clusters'
self._time_slice_collection = 'time_slices'
self._prediction_result_collection = 'prediction_results'
def DROP(self, confirm:bool=False):
......@@ -120,6 +121,14 @@ class Repository(MongoRepositoryBase):
def get_layer_pairs(self, use_case: str) -> List[LayerPairDao]:
    '''Fetches all layer pair entries of the use-case and converts them to LayerPairDao objects.'''
    query = {'use_case': use_case}
    raw_pairs = super().get_entries(self._layer_pair_collection, selection=query)
    return list(map(LayerPairDao.create_from_dict, raw_pairs))
#endregion
#region PredictionResult
def add_prediction_result(self, prediction_result: PredictionResult):
    '''Inserts the attributes of prediction_result as one entry into the prediction result collection.'''
    # The attribute dict itself (not a copy) is handed to the base class.
    document = prediction_result.__dict__
    super().insert_entry(self._prediction_result_collection, document)
def get_prediction_results(self, use_case: str) -> List[PredictionResult]:
    '''Fetches all prediction result entries of the use-case and converts them to PredictionResult objects.'''
    stored = super().get_entries(
        self._prediction_result_collection,
        selection={'use_case': use_case},
        projection={'_id': 0},
    )
    return list(map(PredictionResult.create_from_dict, stored))
#endregion
from processing.ClusterMetricsCalculator import ClusterMetricsCalculator, ClusterMetricsCalculator1D, ClusterMetricsCalculator2D, ClusterMetricsCalculatorFactory
from processing.DataSampler import DataSampler
from processing.fetching import fetching
\ No newline at end of file
from processing.data_prep.metrics_base import get_cyclic_time_feature
# Number of most recent time windows per cluster whose metrics form one prediction input.
N = 3 # Currently N is fixed to 3
# Name of the prediction method; part of the model file path and stored with every prediction result.
method = 'single_context'
####################
import pandas as pd
from pandas import DataFrame
#####################
import json
from entities import Cluster
import collections
import numpy as np
from typing import Iterable, Tuple
######################
from typing import Dict
from typing import Tuple
def get_metrics(cur_cluster: Cluster) -> Tuple:
    '''
    Collects the metrics of one cluster as a tuple.

    The numeric metrics come first; the last element is the cyclic time feature
    computed from the cluster's time info.
    '''
    numeric_metrics = (
        cur_cluster.size,
        cur_cluster.std_dev,
        cur_cluster.scarcity,
        cur_cluster.importance1,
        cur_cluster.importance2,
        cur_cluster.range_,
        cur_cluster.global_center_distance,
    )
    return numeric_metrics + (get_cyclic_time_feature(cur_cluster.get_time_info()),)
####################
import pickle
#####################
def flatten_metrics_datapoint(datapoint: list) -> np.ndarray:
    '''
    Flattens a single metrics data point in the form:
    [(metric_1, ..., metric_k, (time_f1, time_f2))^N]
    to the one-dimensional feature vector X.

    :param datapoint: list of metric tuples; the last element of each tuple is the time tuple
    :returns: 1-D array holding, per entry, its number features followed by its time features
    '''
    flat_list = []
    for entry in datapoint: # for all x
        flat_list.extend(entry[:-1]) # add all number features except the time tuple
        flat_list.extend(entry[-1]) # add time tuple
    return np.asarray(flat_list)
######################
def increase_time_window(time_window_id: str) -> str:
    '''
    Returns the id of the time window that follows time_window_id.

    :param time_window_id: string form of a (year, week) tuple, e.g. '(2018, 52)'
    :returns: string form of the next (year, week) tuple, e.g. '(2019, 1)'
    '''
    # Local import; literal_eval parses the tuple literal without executing arbitrary code.
    from ast import literal_eval

    parsed = literal_eval(time_window_id)
    year, week = parsed[0], parsed[1]
    # NOTE(review): week 52 is treated as the last week of every year; confirm
    # whether ids with week 53 can occur in the data.
    if week == 52:
        # 1st week next year
        next_window = (year + 1, 1)
    else:
        # next week
        next_window = (year, week + 1)
    # Both branches return a string so the result has the same format as the input id.
    return str(next_window)
#########################
from db.repository import Repository
from db.dao import PredictionResult
# Module-level repository used by run_prediction to read the layers and store the prediction results.
repo = Repository()
def run_prediction(use_case: str):
    '''
    Runs the single-context prediction for every layer of the use-case.

    For each layer the cluster metrics and the pickled model are loaded from the data folder.
    For each cluster the metrics of its last N time windows are fed to the model and the
    prediction for the following time window is stored in the repository.

    :param use_case: name of the use-case whose layers are predicted
    '''
    # Local import; literal_eval parses the id literals without executing arbitrary code.
    from ast import literal_eval

    for layer in repo.get_layers_for_use_case(use_case):
        layer_name = layer.layer_name

        # load the cluster metrics of the layer, ordered by cluster and time window
        path_in = f"data/{use_case}/cluster_metrics/{layer_name}.json"
        with open(path_in, 'r') as file:
            data = [Cluster.create_from_dict(cl_d) for cl_d in json.load(file)]
        data.sort(key=lambda cl: (literal_eval(cl.cluster_id), literal_eval(cl.time_window_id)))

        # group the (already sorted) time windows by cluster
        cluster_map: Dict[str, list] = {}
        for cluster in data:
            cluster_map.setdefault(cluster.cluster_id, []).append(cluster)

        # NOTE(security): unpickling executes code from the file; only load model files
        # that were written by the project's own training step.
        with open(f'data/{use_case}/ml_output/{method}/{layer_name}.model', 'rb') as file:
            svc = pickle.load(file)

        for cluster_id, time_windows in cluster_map.items():
            if len(time_windows) < N:
                # Fewer than N windows would yield a shorter feature vector than the
                # N-window input built below; presumably the model cannot handle
                # that - TODO confirm that skipping is the desired behavior.
                continue

            v = [get_metrics(c) for c in time_windows[-N:]] # metrics for last N time windows
            v_flattened = flatten_metrics_datapoint(v).reshape(1, -1) # reshape for ML with only 1 pred value

            # numpy scalars are not BSON-serializable, so store a built-in int
            prediction = int(svc.predict(v_flattened)[0])
            # str() guarantees the window id is stored as a string for every week
            next_time_window = str(increase_time_window(time_windows[-1].time_window_id))

            # NOTE(review): the use-case name is also passed as the table - confirm this is intended.
            res = PredictionResult(use_case, use_case, method, layer_name, None, cluster_id,
                                   next_time_window, prediction)
            repo.add_prediction_result(res)
#####################
from flask import request, Response
from db.repository import Repository
from db.dao import PredictionResult
# Module-level repository used by the route handler to read the stored prediction results.
repo = Repository()
def get(use_case, table, layer_name):
    '''
    Returns the stored predictions for one layer of a use-case.

    :param use_case: name of the use-case
    :param table: name of the table (currently not used for filtering)
    :param layer_name: name of the layer whose predictions are returned
    :returns: list with one dict per prediction, or an empty 404 response if there is none
    '''
    predictions = repo.get_prediction_results(use_case) or []

    # The repository only selects by use-case, so restrict to the requested layer here.
    # NOTE(review): `table` is not filtered on because the single-context prediction
    # passes the use-case name as the table - confirm before filtering on it.
    matching = [p.__dict__ for p in predictions if p.layer == layer_name]

    if not matching:
        return Response(status=404)
    return matching
......@@ -7,4 +7,6 @@ if os.path.exists(modules_path):
from processing.data_prep.main import run
run(use_case='community-prediction-youtube-n')
\ No newline at end of file
if __name__ == '__main__':
    # Creates data/raw files
    run(use_case='community-prediction-youtube-n')
\ No newline at end of file
......@@ -68,6 +68,8 @@ def upload_layerpair(layerpair:LayerPairDao):
if __name__ == '__main__':
'''Uploads the cross-context dependencies for all use-cases.'''
assert False, 'replace with true to upload now'
for lp in get_youtube_dependencies():
......
......@@ -11,4 +11,6 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
from processing.fetching import fetching
if __name__ == "__main__":
    # Fetches all required data from business-logic and role-stage-discovery.
    fetching.fetch(
        selected_use_cases=['community-prediction-youtube-n'],
        selected_use_case_tables=None,
    )
\ No newline at end of file
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from processing.ml.predict_single_context import run_prediction as run_single_prediction
# from processing.ml.predict_cross_context import run_prediction as run_cross_prediction
if __name__ == '__main__':
    # Executes the predictions.
    use_case = 'community-prediction-youtube-n'

    run_single_prediction(use_case)
    # run_cross_prediction(use_case)
\ No newline at end of file
......@@ -5,10 +5,11 @@ if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from processing.ml.train_single_context import run_single_training
from processing.ml.train_cross_context import run_cross_training
from processing.ml.train_single_context import run_training as run_single_training
from processing.ml.train_cross_context import run_training as run_cross_training
if __name__ == '__main__':
'''Executes the training.'''
use_case='community-prediction-youtube-n'
run_single_training(use_case)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment