UNI-KLU / SMART
Commit dcb2b221, authored Aug 02, 2021 by Alexander Lercher
Added support for multiple use-case tables
Parent: ab96c298
Showing 13 changed files with 109 additions and 112 deletions (+109, -112)
repository.py (...ive-community-detection-microservice/app/db/repository.py): +5, -3
cluster_metrics_calc.py (...oservice/app/processing/data_prep/cluster_metrics_calc.py): +16, -16
layer_metrics_calc.py (...croservice/app/processing/data_prep/layer_metrics_calc.py): +15, -15
main.py (...y-detection-microservice/app/processing/data_prep/main.py): +9, -13
predict_base.py (...-detection-microservice/app/processing/ml/predict_base.py): +2, -2
predict_cross_context.py (...n-microservice/app/processing/ml/predict_cross_context.py): +6, -5
predict_single_context.py (...-microservice/app/processing/ml/predict_single_context.py): +5, -4
train_cross_context.py (...ion-microservice/app/processing/ml/train_cross_context.py): +8, -8
train_single_context.py (...on-microservice/app/processing/ml/train_single_context.py): +6, -6
run_dataprep.py (...tive-community-detection-microservice/app/run_dataprep.py): +0, -12
run_node_fetching.py (...community-detection-microservice/app/run_node_fetching.py): +4, -2
run_prediction.py (...ve-community-detection-microservice/app/run_prediction.py): +0, -22
run_training.py (...tive-community-detection-microservice/app/run_training.py): +33, -4
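The central change is that every artifact path gains a per-table segment: data/{use_case}/... becomes data/{use_case}/{table}/.... A minimal sketch of the resulting layout, assuming hypothetical table and layer names ('some-table' and 'some-layer' are not taken from the repository):

    # Illustrative sketch only: old vs. new on-disk layout implied by the diffs below.
    from pathlib import Path

    use_case = 'community-prediction-youtube-n'
    table, layer_name = 'some-table', 'some-layer'   # hypothetical names for illustration

    old_path = Path(f'data/{use_case}/cluster_metrics/{layer_name}.json')
    new_path = Path(f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json')
    print(old_path)  # data/community-prediction-youtube-n/cluster_metrics/some-layer.json
    print(new_path)  # data/community-prediction-youtube-n/some-table/cluster_metrics/some-layer.json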
src/data-hub/proactive-community-detection-microservice/app/db/repository.py (view file @ dcb2b221)

@@ -34,14 +34,13 @@ class Repository(MongoRepositoryBase):
         super().drop_collection(collection_)

-    #region LayerPair
+    #region Use Case
     def add_use_case(self, use_case: str):
         super().insert_entry(self._use_case_collection, {'name': use_case})

     def get_use_cases(self) -> List[str]:
         entries = super().get_entries(self._use_case_collection)
         return [e['name'] for e in entries]
     #endregion

     #region Layers

@@ -53,7 +52,7 @@ class Repository(MongoRepositoryBase):
         entries = super().get_entries(self._layer_collection, projection={'_id': 0})
         return [LayerDao(e) for e in entries]

-    def get_layers_for_use_case(self, use_case: str) -> LayerDao:
+    def get_layers_for_use_case(self, use_case: str) -> List[LayerDao]:
         entries = super().get_entries(self._layer_collection, selection={'use_case': use_case})
         return [LayerDao(e) for e in entries]

@@ -138,4 +137,7 @@ class Repository(MongoRepositoryBase):
     def delete_all_prediction_results(self):
         super().drop_collection(self._prediction_result_collection)

+    def delete_prediction_results(self, use_case: str):
+        super().delete_many(self._prediction_result_collection, selection={'use_case': use_case})
     #endregion
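For orientation, a hedged sketch of how the adjusted repository API is used elsewhere in this commit; the LayerDao attributes (use_case, use_case_table, layer_name, properties) follow the calls in the data-prep scripts below, everything else is an assumption:

    # Sketch under assumptions: Repository and LayerDao behave as the diffs in this commit suggest.
    from db.repository import Repository

    repo = Repository()
    for use_case in repo.get_use_cases():                 # names registered via add_use_case(...)
        layers = repo.get_layers_for_use_case(use_case)   # now returns List[LayerDao]
        for layer in layers:
            # every layer carries its own use case and use-case table
            print(layer.use_case, layer.use_case_table, layer.layer_name, layer.properties)
        # prediction results can now be cleared per use case instead of dropping the whole collection
        repo.delete_prediction_results(use_case)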
src/data-hub/proactive-community-detection-microservice/app/processing/data_prep/cluster_metrics_calc.py (view file @ dcb2b221)

@@ -7,7 +7,7 @@ import json
 import os
 from entities import TimeWindow, Cluster

-def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: List[str]):
+def store_metrics_for_clusters(use_case: str, table: str, layer_name: str, feature_names: List[str]):
     '''
     :param layer_name: Name of the layer for which multiple time windows exist
     :param feature_names: Features of the layer

@@ -15,7 +15,7 @@ def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: Li
     print(f"Working on {layer_name} cluster metrics")

     # load global cluster centers
-    path_in = f'data/{use_case}/raw/clusters/{layer_name}.json'
+    path_in = f'data/{use_case}/{table}/raw/clusters/{layer_name}.json'
     with open(path_in, 'r') as file:
         clusters = json.loads(file.read())
         cluster_centers: Dict[str, Tuple[float]] = {

@@ -24,9 +24,9 @@ def store_metrics_for_clusters(use_case: str, layer_name: str, feature_names: Li
         if cluster['label'] != 'noise'
     }

-    path_in = f'data/{use_case}/raw/timeslices/{layer_name}'
-    Path(f'data/{use_case}/cluster_metrics/').mkdir(parents=True, exist_ok=True)
-    path_out = f'data/{use_case}/cluster_metrics/{layer_name}.json'
+    path_in = f'data/{use_case}/{table}/raw/timeslices/{layer_name}'
+    Path(f'data/{use_case}/{table}/cluster_metrics/').mkdir(parents=True, exist_ok=True)
+    path_out = f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json'

     complete_clusters: List[Cluster] = []

@@ -54,7 +54,7 @@ import collections
 import numpy as np
 from typing import Iterable, Tuple

-def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) -> Iterable[list]:
+def create_metrics_training_data(use_case: str, table: str, layer_name: str, N: int = 3) -> Iterable[list]:
     """
     Loads the metrics training data for an individual layer from disk.
     A single metrics training data point should look like this:

@@ -70,7 +70,7 @@ def create_metrics_training_data(use_case: str, layer_name: str, N: int = 3) ->
     :param layer_name: the name of the layer metrics json file
     """
-    path_in = f"data/{use_case}/cluster_metrics/{layer_name}.json"
+    path_in = f"data/{use_case}/{table}/cluster_metrics/{layer_name}.json"
     with open(path_in, 'r') as file:
         data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]

@@ -122,9 +122,9 @@ def balance_dataset(df: DataFrame) -> DataFrame:
     # nothing happening here, balance only on real training, not during prep
     return df

-def store_training_data(use_case: str, layer_name: str):
+def store_training_data(use_case: str, table: str, layer_name: str):
     # load metrics data from disk
-    data: Iterable = create_metrics_training_data(use_case=use_case, layer_name=layer_name)
+    data: Iterable = create_metrics_training_data(use_case=use_case, table=table, layer_name=layer_name)

     # flatten and convert to df
     df = convert_metrics_data_to_dataframe(data, columns=COLUMNS, flattening_method=flatten_metrics_datapoint)

@@ -135,8 +135,8 @@ def store_training_data(use_case: str, layer_name: str):
     # shuffle
     df = df.sample(frac=1).reset_index(drop=True)

-    Path(f'data/{use_case}/ml_input/single_context/').mkdir(parents=True, exist_ok=True)
-    df.to_csv(f'data/{use_case}/ml_input/single_context/{layer_name}.csv')
+    Path(f'data/{use_case}/{table}/ml_input/single_context/').mkdir(parents=True, exist_ok=True)
+    df.to_csv(f'data/{use_case}/{table}/ml_input/single_context/{layer_name}.csv')
 #######################

@@ -159,13 +159,13 @@ def run(use_case=None):
     for use_case in use_cases:
         print(f"Executing cluster metrics calc for use case {use_case}")

-        layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
+        layers = repo.get_layers_for_use_case(use_case)
         ##################
         for layer in layers:
-            store_metrics_for_clusters(use_case, layer[0], layer[1])
+            store_metrics_for_clusters(layer.use_case, layer.use_case_table, layer.layer_name, layer.properties)
         ###################
-        for name, _ in layers:
-            print(f"Storing training data for {name}")
-            store_training_data(use_case, layer_name=name)
+        for layer in layers:
+            print(f"Storing training data for {layer.layer_name}")
+            store_training_data(layer.use_case, layer.use_case_table, layer.layer_name)
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/processing/data_prep/layer_metrics_calc.py (view file @ dcb2b221)

@@ -9,11 +9,11 @@ import os
 from entities import TimeWindow, Layer
 from processing import ClusterMetricsCalculatorFactory

-def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List[str]):
+def store_metrics_for_layers(use_case: str, table: str, layer_name: str, feature_names: List[str]):
     print(f"Working on {layer_name} layer metrics")

     # load global cluster centers
-    path_in = f'data/{use_case}/raw/clusters/{layer_name}.json'
+    path_in = f'data/{use_case}/{table}/raw/clusters/{layer_name}.json'
     with open(path_in, 'r') as file:
         clusters = json.loads(file.read())
         cluster_centers: Dict[str, Tuple[float]] = {

@@ -24,7 +24,7 @@ def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List
     # load time windows
     all_layers: List[Layer] = []
-    path_in = f'data/{use_case}/raw/timeslices/{layer_name}'
+    path_in = f'data/{use_case}/{table}/raw/timeslices/{layer_name}'
     for root, _, files in os.walk(path_in):
         for f in files:
             with open(os.path.join(root, f), 'r') as file:

@@ -35,8 +35,8 @@ def store_metrics_for_layers(use_case: str, layer_name: str, feature_names: List
         all_layers.append(layer)

     # store the layer metrics
-    Path(f'data/{use_case}/layer_metrics/').mkdir(parents=True, exist_ok=True)
-    path_out = f'data/{use_case}/layer_metrics/{layer_name}.json'
+    Path(f'data/{use_case}/{table}/layer_metrics/').mkdir(parents=True, exist_ok=True)
+    path_out = f'data/{use_case}/{table}/layer_metrics/{layer_name}.json'
     with open(path_out, 'w') as file:
         file.write(json.dumps([l.__dict__ for l in all_layers]))
 #########################

@@ -63,7 +63,7 @@ from typing import Iterable, List, Dict, Any
 import json
 from entities import Layer, Cluster

-def create_layer_metrics_training_data(use_case: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable:
+def create_layer_metrics_training_data(use_case: str, table: str, layer_name: str, reference_layer: str, N: int = 2) -> Iterable:
     """
     Loads the metrics training data for an individual layer from disk.

@@ -83,12 +83,12 @@ def create_layer_metrics_training_data(use_case: str, layer_name: str, reference
     if N != 2:
         raise NotImplementedError("N is not implemented and fixed to 2!")

-    with open(f'data/{use_case}/cluster_metrics/{layer_name}.json') as file:
+    with open(f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json') as file:
         cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
     cluster_ids = {c.cluster_id for c in cluster_metrics}
     cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}

-    with open(f'data/{use_case}/layer_metrics/{reference_layer}.json') as file:
+    with open(f'data/{use_case}/{table}/layer_metrics/{reference_layer}.json') as file:
         layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
     layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}

@@ -150,9 +150,9 @@ def balance_dataset(df: DataFrame) -> DataFrame:
     # nothing happening here, balance only on real training, not during prep
     return df

-def store_training_data(use_case: str, layer_name: str, reference_layer_name: str):
+def store_training_data(use_case: str, table: str, layer_name: str, reference_layer_name: str):
     # load metrics data from disk
-    data: Iterable = create_layer_metrics_training_data(use_case=use_case, layer_name=layer_name, reference_layer=reference_layer_name)
+    data: Iterable = create_layer_metrics_training_data(use_case=use_case, table=table, layer_name=layer_name, reference_layer=reference_layer_name)

     # convert to X and Y
     df = convert_metrics_data_to_dataframe(data, columns=get_columns(N=2), flattening_method=flatten_layer_metrics_datapoint)

@@ -163,8 +163,8 @@ def store_training_data(use_case: str, layer_name: str, reference_layer_name: st
     # shuffle
     df = df.sample(frac=1).reset_index(drop=True)

-    Path(f'data/{use_case}/ml_input/cross_context/').mkdir(parents=True, exist_ok=True)
-    df.to_csv(f'data/{use_case}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv')
+    Path(f'data/{use_case}/{table}/ml_input/cross_context/').mkdir(parents=True, exist_ok=True)
+    df.to_csv(f'data/{use_case}/{table}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv')
 #########################

@@ -187,16 +187,16 @@ def run(use_case=None):
     for use_case in use_cases:
         print(f"Executing layer metrics calc for use case {use_case}")

-        layers = [[l.layer_name, l.properties] for l in repo.get_layers_for_use_case(use_case)]
+        layers = repo.get_layers_for_use_case(use_case)
         layer_pairs = repo.get_layer_pairs(use_case)
         ################
         for layer in layers:
             try:
-                store_metrics_for_layers(use_case, layer[0], layer[1])
+                store_metrics_for_layers(layer.use_case, layer.use_case_table, layer.layer_name, layer.properties)
             except FileNotFoundError:
                 pass
         ###############
         for ld in layer_pairs:
             print(f"Storing training data for {ld.layer} with L_R={ld.reference_layer}")
-            store_training_data(use_case, layer_name=ld.layer, reference_layer_name=ld.reference_layer)
+            store_training_data(ld.use_case, table=ld.table, layer_name=ld.layer, reference_layer_name=ld.reference_layer)
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/processing/data_prep/main.py (view file @ dcb2b221)

@@ -4,6 +4,7 @@ from processing.data_prep.layer_metrics_calc import run as lrun
 from pathlib import Path
 import json
 import os
+from typing import List

 from db.repository import Repository

@@ -12,11 +13,11 @@ repo = Repository()
 def store_clusters_as_files(use_case):
-    path_ = f'data/{use_case}/raw/clusters/'
-    Path(path_).mkdir(parents=True, exist_ok=True)
     layers = repo.get_layers_for_use_case(use_case)
     for l in layers:
+        path_ = f'data/{l.use_case}/{l.use_case_table}/raw/clusters/'
+        Path(path_).mkdir(parents=True, exist_ok=True)
         clusters = repo.get_clusters_for_layer(use_case, l.use_case_table, l.layer_name)
         with open(os.path.join(path_, f'{l.layer_name}.json'), 'w') as file_:

@@ -24,30 +25,25 @@ def store_clusters_as_files(use_case):
 def store_time_slices_as_files(use_case):
-    path_ = f'data/{use_case}/raw/timeslices/'
     layers = repo.get_layers_for_use_case(use_case)
     for l in layers:
-        Path(os.path.join(path_, l.layer_name)).mkdir(parents=True, exist_ok=True)
+        path_ = f'data/{l.use_case}/{l.use_case_table}/raw/timeslices/{l.layer_name}/'
+        Path(path_).mkdir(parents=True, exist_ok=True)
         time_slices = repo.get_time_slices_for_layer(use_case, l.use_case_table, l.layer_name)
         for ts in time_slices:
-            with open(os.path.join(path_, l.layer_name, f'{ts.time}.json'), 'w') as file_:
+            with open(os.path.join(path_, f'{ts.time}.json'), 'w') as file_:
                 file_.write(json.dumps(ts.to_serializable_dict()))

-def run(use_case=None):
+def run(use_cases: List[str] = None):
     '''Prepares training data for single and cross-context using the file system (data/)'''
-    if use_case is not None:
-        use_cases = [use_case]
-    else:
+    if use_cases is None:
         use_cases = repo.get_use_cases()

     for use_case in use_cases:
         store_clusters_as_files(use_case)
         store_time_slices_as_files(use_case)
         crun(use_case)
         lrun(use_case)
src/data-hub/proactive-community-detection-microservice/app/processing/ml/predict_base.py (view file @ dcb2b221)

@@ -11,8 +11,8 @@ def increase_time_window(time_window_id: str) -> str:
 from typing import Tuple
 import pickle

-def load_ml_models(use_case, method, layer_name, reference_layer_name=None) -> Tuple['scaler', 'clf']:
-    path_ = f'data/{use_case}/ml_output/{method}/{layer_name}'
+def load_ml_models(use_case, table, method, layer_name, reference_layer_name=None) -> Tuple['scaler', 'clf']:
+    path_ = f'data/{use_case}/{table}/ml_output/{method}/{layer_name}'

     if method == 'single_context':
         with open(f'{path_}.model', 'rb') as file:
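load_ml_models now keys the model path on the table as well. A small, hedged sketch of the file names this composes, assuming the training 'approach' directory and the prediction 'method' directory are the same (the '_scaler' suffix comes from export_model in the training scripts below; the table and layer names are hypothetical):

    # Illustrative only: how the per-table model path is assembled after this change.
    use_case = 'community-prediction-youtube-n'
    table, layer_name = 'some-table', 'some-layer'    # hypothetical names
    method = 'single_context'

    path_ = f'data/{use_case}/{table}/ml_output/{method}/{layer_name}'
    print(f'{path_}.model')           # the pickled regressor
    print(f'{path_}_scaler.model')    # the pickled scaler, written via export_model(..., scaler=True)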
src/data-hub/proactive-community-detection-microservice/app/processing/ml/predict_cross_context.py (view file @ dcb2b221)

@@ -44,18 +44,19 @@ repo = Repository()
 def run_prediction(use_case: str):
     for layerpair in repo.get_layer_pairs(use_case):
+        table = layerpair.table
         layer_name = layerpair.layer
         reference_layer_name = layerpair.reference_layer
-        print(f"Predicting {method} for {use_case}//{layer_name} based on {reference_layer_name}")
+        print(f"Predicting {method} for {use_case}//{table}//{layer_name} based on {reference_layer_name}")
         ##########################
-        with open(f'data/{use_case}/cluster_metrics/{layer_name}.json') as file:
+        with open(f'data/{use_case}/{table}/cluster_metrics/{layer_name}.json') as file:
             cluster_metrics: List[Cluster] = [Cluster.create_from_dict(e) for e in json.loads(file.read())]
         cluster_ids = {c.cluster_id for c in cluster_metrics}
         cluster_metrics: Dict[Any, Cluster] = {(c.time_window_id, c.cluster_id): c for c in cluster_metrics}

-        with open(f'data/{use_case}/layer_metrics/{reference_layer_name}.json') as file:
+        with open(f'data/{use_case}/{table}/layer_metrics/{reference_layer_name}.json') as file:
             layer_metrics: List[Layer] = [Layer.create_from_dict(e) for e in json.loads(file.read())]
         layer_metrics: Dict[Any, Layer] = {l.time_window_id: l for l in layer_metrics}
         ######################

@@ -77,7 +78,7 @@ def run_prediction(use_case: str):
             # yield each combination of reference layer metrics to clusters
             prediction_metrics_raw.append([prev_layer_metric_tuple, current_layer_metric_tuple, int(cluster_id)])
         #######################
-        scaler, svc = load_ml_models(use_case, method, layer_name, reference_layer_name)
+        scaler, svc = load_ml_models(use_case, table, method, layer_name, reference_layer_name)
         ################
         prediction_cluster_ids = []
         prediction_time_window = increase_time_window(ordered_time_keys[1])

@@ -95,5 +96,5 @@ def run_prediction(use_case: str):
         prediction_results = np.rint(prediction_results)  # round to full numbers
         for i in range(len(prediction_cluster_ids)):
-            res = PredictionResult(use_case, use_case, method, layer_name, reference_layer_name, prediction_cluster_ids[i], prediction_time_window, prediction_results[i])
+            res = PredictionResult(use_case, table, method, layer_name, reference_layer_name, prediction_cluster_ids[i], prediction_time_window, prediction_results[i])
             repo.add_prediction_result(res)
src/data-hub/proactive-community-detection-microservice/app/processing/ml/predict_single_context.py (view file @ dcb2b221)

@@ -37,11 +37,12 @@ repo = Repository()
 def run_prediction(use_case: str):
     for layer in repo.get_layers_for_use_case(use_case):
+        table = layer.use_case_table
         layer_name = layer.layer_name
-        print(f"Predicting {method} for {use_case}//{layer_name}")
+        print(f"Predicting {method} for {use_case}//{table}//{layer_name}")
         #################
-        path_in = f"data/{use_case}/cluster_metrics/{layer_name}.json"
+        path_in = f"data/{use_case}/{table}/cluster_metrics/{layer_name}.json"
         with open(path_in, 'r') as file:
             data = [Cluster.create_from_dict(cl_d) for cl_d in json.loads(file.read())]

@@ -57,7 +58,7 @@ def run_prediction(use_case: str):
             cluster_map[id_].append(cluster)
         ####################
-        scaler, svc = load_ml_models(use_case, method, layer_name)
+        scaler, svc = load_ml_models(use_case, table, method, layer_name)
         #####################
         # store id, future time window, and flattened metrics to combine the latter during prediction
         prediction_cluster_ids = []

@@ -78,5 +79,5 @@ def run_prediction(use_case: str):
         prediction_results = np.rint(prediction_results)  # round to full numbers
         for i in range(len(prediction_cluster_ids)):
-            res = PredictionResult(use_case, use_case, method, layer_name, None, prediction_cluster_ids[i], prediction_time_windows[i], prediction_results[i])
+            res = PredictionResult(use_case, table, method, layer_name, None, prediction_cluster_ids[i], prediction_time_windows[i], prediction_results[i])
             repo.add_prediction_result(res)
src/data-hub/proactive-community-detection-microservice/app/processing/ml/train_cross_context.py (view file @ dcb2b221)

@@ -9,8 +9,8 @@ max_sampling_size = 20000
 import pickle
 from pathlib import Path

-def export_model(model, use_case, layer_name, reference_layer_name, scaler=False):
-    fpath = f'data/{use_case}/ml_output/{approach}'
+def export_model(model, use_case, table, layer_name, reference_layer_name, scaler=False):
+    fpath = f'data/{use_case}/{table}/ml_output/{approach}'
     Path(fpath).mkdir(parents=True, exist_ok=True)
     with open(f'{fpath}/{layer_name}_{reference_layer_name}{"_scaler" if scaler else ""}.model', 'wb') as f:
         pickle.dump(model, f)

@@ -39,11 +39,11 @@ repo = Repository()
 def run_training(use_case):
     for layerpair in repo.get_layer_pairs(use_case):
+        table = layerpair.table
         layer_name = layerpair.layer
         reference_layer_name = layerpair.reference_layer
-        df: DataFrame = pd.read_csv(f'data/{use_case}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv', index_col=0)
+        df: DataFrame = pd.read_csv(f'data/{use_case}/{table}/ml_input/cross_context/{layer_name}_{reference_layer_name}.csv', index_col=0)
         #######################
         training, testing = split_data(df, shuffle=False)
         #####################

@@ -58,7 +58,7 @@ def run_training(use_case):
         test_X = scaler.transform(testing[testing.columns[:-1]])  # all except y
         test_Y = testing[testing.columns[-1]]
-        export_model(scaler, use_case, layer_name, reference_layer_name, scaler=True)
+        export_model(scaler, use_case, table, layer_name, reference_layer_name, scaler=True)
         ########################
         # RF is a lot better than SVM, but I did not tune hyperparameters for regression
         rfc = RandomForestRegressor(n_estimators=n_estimators, criterion=criterion, max_depth=max_depth,

@@ -67,8 +67,8 @@ def run_training(use_case):
         # rfc = LinearSVR(loss=loss, C=c, dual=dual, tol=tol)
         rfc.fit(train_X, train_Y)
         ####################
         print_regression_report(rfc, test_X, test_Y, f"{layer_name} based on {reference_layer_name}")
         ####################
-        export_model(rfc, use_case, layer_name, reference_layer_name)
+        export_model(rfc, use_case, table, layer_name, reference_layer_name)
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/processing/ml/train_single_context.py (view file @ dcb2b221)

@@ -9,8 +9,8 @@ max_sampling_size = 20000
 import pickle
 from pathlib import Path

-def export_model(model, use_case, layer_name, scaler=False):
-    fpath = f'data/{use_case}/ml_output/{approach}'
+def export_model(model, use_case, table, layer_name, scaler=False):
+    fpath = f'data/{use_case}/{table}/ml_output/{approach}'
     Path(fpath).mkdir(parents=True, exist_ok=True)
     with open(f'{fpath}/{layer_name}{"_scaler" if scaler else ""}.model', 'wb') as f:
         pickle.dump(model, f)

@@ -39,10 +39,10 @@ repo = Repository()
 def run_training(use_case):
     for layer in repo.get_layers_for_use_case(use_case):
+        table = layer.use_case_table
         layer_name = layer.layer_name
-        df: DataFrame = pd.read_csv(f'data/{use_case}/ml_input/single_context/{layer_name}.csv', index_col=0)
+        df: DataFrame = pd.read_csv(f'data/{use_case}/{table}/ml_input/single_context/{layer_name}.csv', index_col=0)
         #######################
         training, testing = split_data(df, shuffle=False)
         #####################

@@ -57,7 +57,7 @@ def run_training(use_case):
         test_X = scaler.transform(testing[testing.columns[:-1]])  # all except y
         test_Y = testing[testing.columns[-1]]
-        export_model(scaler, use_case, layer_name, scaler=True)
+        export_model(scaler, use_case, table, layer_name, scaler=True)
         ########################
         # RF is 10-20% better compared to SVM, but I did not tune hyperparameters for regression
         rfc = RandomForestRegressor(n_estimators=n_estimators, criterion=criterion, max_depth=max_depth,

@@ -69,5 +69,5 @@ def run_training(use_case):
         ####################
         print_regression_report(rfc, test_X, test_Y, layer_name)
         ####################
-        export_model(rfc, use_case, layer_name)
+        export_model(rfc, use_case, table, layer_name)
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/run_dataprep.py (deleted, 100644 → 0; last content at ab96c298)

import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
    sys.path.insert(1, modules_path)

from processing.data_prep.main import run

if __name__ == '__main__':
    '''Creates data/raw files'''
    run(use_case='community-prediction-youtube-n')
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/run_node_fetching.py (view file @ dcb2b221)

@@ -9,8 +9,10 @@ import urllib3
 urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

 from processing.fetching import fetching
+from db.repository import Repository

 if __name__ == "__main__":
     '''Fetches all required data from business-logic and role-stage-discovery.'''
-    fetching.fetch(selected_use_cases=['community-prediction-youtube-n'], selected_use_case_tables=None)
+    Repository().DROP(confirm=True)
+    use_cases = ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum'] + ['community-prediction-youtube-n', 'community-prediction-taxi']
+    fetching.fetch(selected_use_cases=use_cases, selected_use_case_tables=None)
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/run_prediction.py (deleted, 100644 → 0; last content at ab96c298)

import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
    sys.path.insert(1, modules_path)

from db.repository import Repository
from processing.ml.predict_single_context import run_prediction as run_single_prediction
from processing.ml.predict_cross_context import run_prediction as run_cross_prediction

if __name__ == '__main__':
    '''Executes the predictions.'''
    use_case = 'community-prediction-youtube-n'

    repo = Repository()
    repo.delete_all_prediction_results()

    run_single_prediction(use_case)
    run_cross_prediction(use_case)
\ No newline at end of file
src/data-hub/proactive-community-detection-microservice/app/run_training.py (view file @ dcb2b221)

@@ -4,13 +4,42 @@ modules_path = '../../../modules/'
 if os.path.exists(modules_path):
     sys.path.insert(1, modules_path)

+from typing import List
+from db.repository import Repository
+repo = Repository()
+
+from processing.data_prep.main import run as run_data_prep
+def _run_data_preparation(use_cases: List[str] = None):
+    '''Creates data/raw, data/cluster_metrics, data/layer_metrics, and data/ml_input files.'''
+    run_data_prep(use_cases)
+
 from processing.ml.train_single_context import run_training as run_single_training
 from processing.ml.train_cross_context import run_training as run_cross_training
+def _run_training(use_cases: List[str] = None):
+    '''Executes the training and creates data/ml_output files.'''
+    for use_case in use_cases:
+        run_single_training(use_case)
+        run_cross_training(use_case)
+
+from processing.ml.predict_single_context import run_prediction as run_single_prediction
+from processing.ml.predict_cross_context import run_prediction as run_cross_prediction
+def _run_prediction(use_cases: List[str] = None):
+    '''Executes the predictions and stores them in the DB.'''
+    for use_case in use_cases:
+        repo.delete_prediction_results(use_case)
+        run_single_prediction(use_case)
+        run_cross_prediction(use_case)

 if __name__ == '__main__':
     '''Executes the training.'''
-    use_case = 'community-prediction-youtube-n'
-    run_single_training(use_case)
-    run_cross_training(use_case)
+    use_cases = ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']
+    use_cases = ['community-prediction-youtube-n', 'community-prediction-taxi']
+
+    _run_data_preparation(use_cases)
+    _run_training(use_cases)
+    _run_prediction(use_cases)
+    # TODO file cleanup
\ No newline at end of file