Commit 6e26b92b authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'bugfix/raw-use-case-data-upload' into 'develop'

Direct upload of smart-energy and bank-app data to semantic-linking.
Use-Case(-Table) selection for node fetching and clustering scripts.
parents 3afa128e e2deb911
@@ -34,7 +34,7 @@ class Repository(MongoRepositoryBase):
     def get_layers(self) -> List[Layer]:
         '''Retrieves all layers from the db, independent of use-case.'''
-        entries = super().get_entries(self._layer_collection)
+        entries = super().get_entries(self._layer_collection, projection={'_id': 0})
         return [Layer(e) for e in entries]

     def get_layers_for_use_case(self, use_case: str) -> Layer:
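For context on the new projection argument: it asks MongoDB to drop its auto-generated `_id` field from the returned documents, so only the layer's own fields reach `Layer(e)`. A minimal pymongo sketch of the same idea (client setup and collection names are illustrative, not taken from this repo):

from pymongo import MongoClient

collection = MongoClient()['semanticLinkingDb']['layers']  # assumes a local MongoDB instance

doc = collection.find_one()                             # carries the ObjectId under '_id'
doc_clean = collection.find_one(projection={'_id': 0})  # only the stored layer fields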
@@ -81,6 +81,9 @@ class Repository(MongoRepositoryBase):
     #endregion

     #region Clusters
+    def add_cluster(self, cluster: Cluster):
+        super().insert_entry(self._clusters_collection, cluster.to_serializable_dict(for_db=True))
+
     def add_clusters(self, clusters: List[Cluster]):
         cluster_dicts = [c.to_serializable_dict(for_db=True) for c in clusters]
         super().insert_many(self._clusters_collection, cluster_dicts)
...
@@ -86,24 +86,31 @@ def _fetch_nodes(use_case: str, table: str, layer_name: str) -> List[Dict]:
         return response.json()

-def fetch_nodes_from_semantic_linking():
+def fetch_nodes_from_semantic_linking(selected_use_cases: List[str] = None, selected_use_case_tables: List[str] = None):
     '''Empties the db and inserts layers and nodes from BusinessLogic and SemanticLinking'''
     repository = Repository()
-    repository.delete_all_layers()
-    repository.delete_all_nodes()
+    # please don't delete all layers/nodes anymore @10.11.2020
+    # repository.delete_all_layers()
+    # repository.delete_all_nodes()

     use_cases = _fetch_use_cases()
     for use_case in use_cases:
+        if selected_use_cases is not None and use_case not in selected_use_cases:
+            continue
+
         print(f"Fetching for use-case {use_case}")
         tables = _fetch_tables(use_case)
         for table in tables:
+            if selected_use_case_tables is not None and table not in selected_use_case_tables:
+                continue
+
             layers = _fetch_layers(use_case, table)
             for layer in layers:
                 try:
                     print(f"Fetching nodes for layer {use_case}//{table}//{layer.layer_name}.")
                     # check if layer already exists in DB, add it if not
                     reference_layer = repository.get_layer_by_name(use_case, table, layer.layer_name)
                     if reference_layer == None:
...
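A brief usage sketch of the extended entry point (the use-case and table names are illustrative): passing `None` for a filter keeps the old fetch-everything behaviour, while a list restricts fetching to the named use-cases or tables.

# fetch everything, as before
fetch_nodes_from_semantic_linking()

# fetch only one use-case, all of its tables
fetch_nodes_from_semantic_linking(selected_use_cases=['bank-app'])

# restrict both use-case and table
fetch_nodes_from_semantic_linking(selected_use_cases=['smart-energy'],
                                  selected_use_case_tables=['smart-energy'])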
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
    sys.path.insert(1, modules_path)

import json
from typing import List
from db.entities import Cluster
from db.repository import Repository

# local cluster dumps to upload, one json file per layer
LAYER_FILES = ['User_Demand_Layer.json']

repo = Repository()

def get_clusters(layer_file) -> List[Cluster]:
    with open(layer_file, 'r') as file:
        clusters = json.loads(file.read())
    return [Cluster(cluster_dict=cluster, from_db=False) for cluster in clusters]

def store_generic_clusters(clusters: List[Cluster], layer):
    # keep a local copy of the serialized clusters before uploading
    # (note: layer is the source file name here, so the copy lands in '<name>.json.json')
    try:
        with open(f'{layer}.json', 'w') as file:
            cluster_dicts = [c.to_serializable_dict(for_db=False) for c in clusters]
            file.write(json.dumps(cluster_dicts))
    except Exception as e:
        print(f"failed writing {layer}: {e}")

    # upload the clusters one by one via the new Repository.add_cluster
    try:
        for cluster in clusters:
            repo.add_cluster(cluster)
    except Exception as e:
        print(f"failed uploading {layer}: {e}")

for layer in LAYER_FILES:
    clusts: List[Cluster] = get_clusters(layer)
    store_generic_clusters(clusts, layer)
\ No newline at end of file
@@ -14,20 +14,25 @@ from processing.clustering import Clusterer, ClusterResult

 repo = Repository()

-def run_generic_clustering():
+def run_generic_clustering(selected_use_cases: List[str] = None, selected_use_case_tables: List[str] = None, selected_layer_names: List[str] = None):
     '''Runs the clustering for all layers found in the repository.'''
     all_layers:List[Layer] = repo.get_layers()
+    layers = [l for l in all_layers
+              if (selected_use_cases is None or l.use_case in selected_use_cases)
+              and (selected_use_case_tables is None or l.use_case_table in selected_use_case_tables)
+              and (selected_layer_names is None or l.layer_name in selected_layer_names)
+              ]

-    for layer in all_layers:
+    for layer in layers:
         print(f"Clustering {layer.use_case}//{layer.use_case_table}//{layer.layer_name}.")

         if layer.properties is None or len(layer.properties) == 0:
-            print("skipping")
+            print("skipping, no properties to cluster")
             continue

         try:
             clusters = run_clustering_for_layer(layer)
-            store_generic_clusters(clusters)
+            store_generic_clusters(clusters, layer.layer_name)
         except Exception as e:
             print(str(e))
@@ -46,10 +51,22 @@ def run_clustering_for_layer(layer: Layer) -> List[Cluster]:
             for key, cluster_result in res.items()]

-def store_generic_clusters(clusters: List[Cluster]):
-    repo.add_clusters(clusters)
+def store_generic_clusters(clusters: List[Cluster], layer):
+    # keep a local json copy of the clusters before uploading
+    try:
+        with open(f'{layer}.json', 'w') as file:
+            cluster_dicts = [c.to_serializable_dict(for_db=False) for c in clusters]
+            file.write(json.dumps(cluster_dicts))
+    except Exception as e:
+        print(f"Error while writing json for {layer}: {e}")
+
+    try:
+        for cluster in clusters:
+            repo.add_cluster(cluster)
+    except Exception as e:
+        print(f"Error while storing cluster in db for {layer}: {e}")

 if __name__ == "__main__":
-    repo.delete_all_clusters()
-    run_generic_clustering()
+    # please don't delete all clusters anymore @10.11.2020
+    # repo.delete_all_clusters()
+    run_generic_clustering(selected_use_cases=[], selected_use_case_tables=[], selected_layer_names=[])
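One caveat on the new selection parameters: `None` means no filter, but an empty list matches nothing, since `x in []` is always False. The `__main__` call above therefore clusters zero layers until real names are filled in; the same holds for `fetch_nodes_from_semantic_linking`. A short usage sketch (the use-case and layer names are examples, not prescribed by the repo):

# cluster every layer in the repository
run_generic_clustering()

# cluster only one layer of one use-case
run_generic_clustering(selected_use_cases=['smart-energy'],
                       selected_use_case_tables=['smart-energy'],
                       selected_layer_names=['User_Demand_Layer'])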
@@ -11,4 +11,4 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

 import processing.fetching.fetching as f

 if __name__ == "__main__":
-    f.fetch_nodes_from_semantic_linking()
+    f.fetch_nodes_from_semantic_linking(selected_use_cases=[], selected_use_case_tables=[])
\ No newline at end of file
@@ -21,8 +21,6 @@ class Repository(MongoRepositoryBase):
                          'semanticLinkingDb')

         self._layer_collection = 'layers'
         self._layer_nodes_collection = 'layer_nodes'
-        self._clusters_collection = 'clusters'
-        self._time_slice_collection = 'time_slices'

     # region Layers
...
This source diff could not be displayed because it is too large.
import csv
import hashlib
import sys
import os
modules_paths = ['.', '../../../modules/']
for modules_path in modules_paths:
    if os.path.exists(modules_path):
        sys.path.insert(1, modules_path)

from messaging.MessageHandler import MessageHandler

# file to read the data from
CSV_FILE = r'dummy_upload/bank_app/bank_data.csv'

handler = MessageHandler()

def upload_transaction(transaction):
    # deterministic id: the same transaction and timestamp always hash to the same UniqueID
    uid = hashlib.sha256(f"{transaction['Transaction_ID']}{transaction['Timestamp']}".encode("utf-8")).hexdigest()
    transaction['UniqueID'] = uid
    transaction['ApplicationType'] = 'bank-app'
    transaction['docType'] = 'bank-app'

    t = {
        'use_case': transaction['ApplicationType'],
        'table': transaction['docType'],
        'id': uid,
        'properties': transaction,
    }
    handler.handle_new_trace(t)

# map the categorical transaction types to numeric codes for clustering
type_mapping = { 'House Rent': 1, 'Payback Loan': 2, 'Initial Credit': 3, 'Emergency Help': 4, 'Friendly Help': 5 }

if __name__ == '__main__':
    with open(CSV_FILE, 'r') as file:
        reader = csv.reader(file)
        titles = next(reader)

        for row in reader:
            # build a column-title -> value dict for the row
            transaction = {}
            for idx in range(len(row)):
                transaction[titles[idx]] = row[idx]

            transaction['Transaction_Type'] = type_mapping[transaction['Transaction_Type']]
            upload_transaction(transaction)
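As a quick illustration of the id scheme (the input values below are made up): concatenating `Transaction_ID` and `Timestamp` and hashing yields a stable 64-character hex digest, so re-running the upload reproduces the same `UniqueID` instead of minting duplicates.

import hashlib

uid_1 = hashlib.sha256("T-00012013-06-30 00:00:00".encode("utf-8")).hexdigest()
uid_2 = hashlib.sha256("T-00012013-06-30 00:00:00".encode("utf-8")).hexdigest()
assert uid_1 == uid_2   # deterministic across runs
print(len(uid_1))       # 64 hex characters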
This source diff could not be displayed because it is too large.
import csv
import hashlib
import sys
import os
modules_paths = ['.', '../../../modules/']
for modules_path in modules_paths:
    if os.path.exists(modules_path):
        sys.path.insert(1, modules_path)

from messaging.MessageHandler import MessageHandler

# file to read the data from
CSV_FILE = r'Energy_Dataset.csv'

handler = MessageHandler()

def upload_transaction(transaction):
    '''Example of the resulting trace:
    {"type": "new-trace",
    "content": {"use_case": "smart-energy", "table": "smart-energy", "id": "dd2c5146c919b046d77a32a5cf553d5133163562f7b7e1298c878c575d516025",
    "properties": {"Customer": "297", "Energy_Consumption_kWh": "0.177", "Heating_Consumption_kWh": "0.0", "Latitude": "-33.362679", "Longitude": "151.447302", "Postcode": "2261", "Price_AUD_MWh": "58.05", "Solar_Production_kWh": "0.0", "Timestamp": "2013-06-30 00:00:00", "Total_Demand_MWh": "8154.14", "UniqueID": "dd2c5146c919b046d77a32a5cf553d5133163562f7b7e1298c878c575d516025"}
    }}'''
    # deterministic id per customer and timestamp
    uid = hashlib.sha256(f"{transaction['Customer']}{transaction['Timestamp']}".encode("utf-8")).hexdigest()
    transaction['UniqueID'] = uid

    t = {
        'use_case': transaction['ApplicationType'],
        'table': transaction['docType'],
        'id': uid,
        'properties': transaction,
    }
    handler.handle_new_trace(t)

if __name__ == '__main__':
    with open(CSV_FILE, 'r') as file:
        reader = csv.reader(file)
        titles = next(reader)

        old_c = None
        for row in reader:
            transaction = {}
            transaction['ApplicationType'] = 'smart-energy'
            transaction['docType'] = 'smart-energy-paper'
            for idx in range(len(row)):
                transaction[titles[idx]] = row[idx]

            # also include the per-user demand, as Total_Demand_MWh is grid-wide, not per user
            energy_cons = float(transaction['Energy_Consumption_kWh']) if transaction['Energy_Consumption_kWh'] is not None and transaction['Energy_Consumption_kWh'] != "" else 0
            heating_cons = float(transaction['Heating_Consumption_kWh']) if transaction['Heating_Consumption_kWh'] is not None and transaction['Heating_Consumption_kWh'] != "" else 0
            transaction['User_Demand_kWh'] = heating_cons + energy_cons

            # print progress once per customer; every row is still uploaded
            if transaction['Customer'] != old_c:
                old_c = transaction['Customer']
                print(f"uploading for {old_c}")

            upload_transaction(transaction)
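The empty-string guard above matters because the dataset leaves some consumption cells blank, and `float("")` raises a `ValueError`. A compact equivalent (the `to_kwh` helper is hypothetical, introduced here only for illustration; the sample values come from the docstring above):

def to_kwh(value: str) -> float:
    # blank CSV cells count as zero consumption (hypothetical helper)
    return float(value) if value not in (None, "") else 0.0

print(to_kwh("0.0") + to_kwh("0.177"))  # 0.177 -> stored as User_Demand_kWh
print(to_kwh(""))                       # 0.0 instead of a ValueError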
@@ -63,7 +63,7 @@ class MessageHandler:
         return layers

     def handle_new_trace(self, content: Dict):
-        if "use_case" not in content.keys() or "id" not in content.keys() or "properties" not in content.keys() or "table" not in content.keys():
+        if "use_case" not in content or "id" not in content or "properties" not in content or "table" not in content:
             LOGGER.error(f"Missing fields in trace, required fields: (use_case, id, properties, table), given fields: ({content.keys()})")
             return
...
@@ -20,7 +20,8 @@ def add_table(use_case: str, table_name: str):
         "Price_AUD/MWh",
         "Total_Demand_MWh",
         "Latitude",
-        "Longitude"
+        "Longitude",
+        "User_Demand_kWh",
     ]

     columns = { c.replace("/", "_") : c for c in columns }
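For context, the comprehension builds a sanitized-name to original-name mapping, presumably because a `/` in a column like `Price_AUD/MWh` is awkward in downstream field names (the rationale is inferred, not stated in the commit):

columns = ["Price_AUD/MWh", "Total_Demand_MWh", "User_Demand_kWh"]
print({ c.replace("/", "_"): c for c in columns })
# {'Price_AUD_MWh': 'Price_AUD/MWh', 'Total_Demand_MWh': 'Total_Demand_MWh', 'User_Demand_kWh': 'User_Demand_kWh'}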
@@ -119,7 +120,7 @@ def add_layers(use_case:str, table_name: str):
     },{
         "use_case": use_case,
         "table": table_name,
-        "name": "Position_Layer",
+        "name": "Location_Layer",
         "properties": [
             "UniqueID",
             "Customer",
@@ -133,6 +134,21 @@ def add_layers(use_case:str, table_name: str):
             "Longitude",
         ]
     },
+    {
+        "use_case": use_case,
+        "table": table_name,
+        "name": "User_Demand_Layer",
+        "properties": [
+            "UniqueID",
+            "Customer",
+            "Postcode",
+            "Timestamp",
+            "User_Demand_kWh"
+        ],
+        "cluster_properties": [
+            "User_Demand_kWh",
+        ]
+    },
     ]

     jwt = TokenManager.getInstance().getToken()
@@ -150,10 +166,8 @@ def add_layers(use_case:str, table_name: str):
     print(url+": "+str(response.status_code))

-def main(use_case: str):
+def main(use_case: str = "smart-energy", table_name: str = "smart-energy"):
     print("SMART-ENERGY")
-    table_name = "smart-energy"
     add_table(use_case, table_name)
     add_layers(use_case, table_name)
\ No newline at end of file