Commit 31f6aadc authored by Alexander Lercher

Changes for prediction pipeline

parent f2b72d8f
...@@ -30,7 +30,7 @@ class Repository(MongoRepositoryBase): ...@@ -30,7 +30,7 @@ class Repository(MongoRepositoryBase):
assert confirm, 'WONT DELETE WHOLE DB WITHOUT CONFIRMATION' assert confirm, 'WONT DELETE WHOLE DB WITHOUT CONFIRMATION'
for collection_ in [self._use_case_collection, self._layer_collection, self._layer_pair_collection, for collection_ in [self._use_case_collection, self._layer_collection, self._layer_pair_collection,
self._clusters_collection, self._time_slice_collection]: self._clusters_collection, self._time_slice_collection, self._prediction_result_collection]:
super().drop_collection(collection_) super().drop_collection(collection_)
......
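
For context, a minimal sketch of how such a guarded DROP could sit on top of pymongo. The base-class internals, connection string and collection names below are assumptions for illustration, not the project's actual code:

from pymongo import MongoClient

class MongoRepositoryBase:
    '''Hypothetical base class; the real project likely wires the connection differently.'''
    def __init__(self, db_name='role-stage-discovery'):
        self._db = MongoClient('mongodb://localhost:27017')[db_name]

    def drop_collection(self, collection_name: str):
        self._db.drop_collection(collection_name)

class Repository(MongoRepositoryBase):
    def __init__(self):
        super().__init__()
        # collection names are illustrative placeholders
        self._use_case_collection = 'use_cases'
        self._layer_collection = 'layers'
        self._layer_pair_collection = 'layer_pairs'
        self._clusters_collection = 'clusters'
        self._time_slice_collection = 'time_slices'
        self._prediction_result_collection = 'prediction_results'

    def DROP(self, confirm: bool = False):
        assert confirm, 'WONT DELETE WHOLE DB WITHOUT CONFIRMATION'
        for collection_ in [self._use_case_collection, self._layer_collection, self._layer_pair_collection,
                            self._clusters_collection, self._time_slice_collection,
                            self._prediction_result_collection]:
            super().drop_collection(collection_)
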
@@ -13,6 +13,7 @@ from db.repository import Repository
 if __name__ == "__main__":
     '''Fetches all required data from business-logic and role-stage-discovery.'''
-    Repository().DROP(confirm=True)
-    use_cases = ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']+['community-prediction-youtube-n', 'community-prediction-taxi']
+    # Repository().DROP(confirm=True)
+    use_cases = ['smart-energy']
+    # ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']+['community-prediction-youtube-n', 'community-prediction-taxi']
     fetching.fetch(selected_use_cases=use_cases, selected_use_case_tables=None)
\ No newline at end of file
@@ -46,7 +46,7 @@ def _run_cleanup(use_cases: List[str] = None):
 if __name__ == '__main__':
-    use_cases = ['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']
+    use_cases = ['smart-energy'] #['vialog-enum', 'car-sharing-official', 'smart-energy', 'crowd-journalism-enum']
     # use_cases = ['community-prediction-youtube-n', 'community-prediction-taxi']
     _run_data_preparation(use_cases)
@@ -69,4 +69,4 @@ def store_generic_clusters(clusters: List[Cluster], layer):
 if __name__ == "__main__":
     # please dont delete all clusters anymore @10.11.2020
     # repo.delete_all_clusters()
-    run_generic_clustering(selected_use_cases=[], selected_use_case_tables=[], selected_layer_names=[])
+    run_generic_clustering(selected_use_cases=['smart-energy'], selected_use_case_tables=None, selected_layer_names=None)
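
The switch from empty lists to None presumably distinguishes "no filter" from "select nothing"; a hedged sketch of that convention (the helper name is hypothetical):

from typing import List, Optional

def _filter_selection(selected: Optional[List[str]], available: List[str]) -> List[str]:
    # assumed convention: None means "no filter, take everything", [] means "take nothing"
    if selected is None:
        return available
    return [item for item in available if item in selected]

# _filter_selection(None, ['layer_a', 'layer_b']) -> ['layer_a', 'layer_b']
# _filter_selection([],   ['layer_a', 'layer_b']) -> []
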
@@ -11,4 +11,4 @@ urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
 import processing.fetching.fetching as f
 if __name__ == "__main__":
-    f.fetch_nodes_from_semantic_linking(selected_use_cases=[], selected_use_case_tables=[])
+    f.fetch_nodes_from_semantic_linking(selected_use_cases=['smart-energy'], selected_use_case_tables=None)
\ No newline at end of file
@@ -16,12 +16,14 @@ TimeSliceKey = Tuple[int, int]
 TIME_PROPERTY_NAMES = [
     # vialog-enum
     'created',
+    'moderationDate',
+    'timestamp',
     # car-sharing-official
     'available',
     'date',
     'startDate',
     'moment',
-    # smart-energy
+    # smart-energy / bank-app
     'Timestamp',
     # crowd-journalism-enum
     'creationTimestamp',

@@ -156,6 +158,6 @@ def run_time_slicing(selected_use_cases: List[str] = None, selected_use_case_tab
 if __name__ == "__main__":
-    use_case = 'community-prediction-youtube-n'
+    use_case = 'smart-energy'
     repo.delete_time_slices(use_case)
     run_time_slicing(selected_use_cases=[use_case])
\ No newline at end of file
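
TimeSliceKey = Tuple[int, int] and the per-use-case TIME_PROPERTY_NAMES suggest that nodes are bucketed by a (year, week)-style key derived from whichever time property a node carries; a sketch under that assumption (helper names are illustrative, not the project's implementation):

from datetime import datetime
from typing import Dict, List, Tuple

TimeSliceKey = Tuple[int, int]

def to_time_slice_key(timestamp: float) -> TimeSliceKey:
    # assumption: slices are keyed by (ISO year, ISO week) of the node's time property
    iso = datetime.utcfromtimestamp(timestamp).isocalendar()
    return (iso[0], iso[1])

def group_nodes_by_slice(nodes: List[dict], time_property: str) -> Dict[TimeSliceKey, List[dict]]:
    slices: Dict[TimeSliceKey, List[dict]] = {}
    for node in nodes:
        if time_property not in node:
            continue
        slices.setdefault(to_time_slice_key(float(node[time_property])), []).append(node)
    return slices
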
@@ -9,10 +9,12 @@ for modules_path in modules_paths:
     sys.path.insert(1, modules_path)
 from messaging.MessageHandler import MessageHandler
+from db.repository import Repository

 # file to read the data from
 CSV_FILE = r'dummy_upload/bank_app/bank_data.csv'
-handler = MessageHandler()
+repo = Repository()
+handler = MessageHandler(repo)

 def upload_transaction(transaction):

@@ -31,7 +33,7 @@ def upload_transaction(transaction):
     handler.handle_new_trace(t)
-type_mapping = { 'House Rent': 1, 'Payback Loan': 2, 'Initial Credit': 3, 'Emergency Help': 4, 'Friendly Help': 5 }
+type_mapping = { 'House Rent': 0, 'Payback Loan': 1, 'Initial Credit': 2, 'Emergency Help': 3, 'Friendly Help': 4 }
 if __name__ == '__main__':
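
The re-indexed type_mapping now starts at 0, the usual convention for class labels in a prediction pipeline. A minimal sketch of applying it while reading the CSV; the 'Type' column name and the -1 fallback are assumptions, not taken from the source:

import csv

# zero-based labels as in the updated mapping
type_mapping = { 'House Rent': 0, 'Payback Loan': 1, 'Initial Credit': 2, 'Emergency Help': 3, 'Friendly Help': 4 }

def load_transactions(csv_file: str):
    with open(csv_file, newline='') as f:
        for row in csv.DictReader(f):
            row['Type'] = type_mapping.get(row.get('Type'), -1)  # unseen types fall back to -1
            yield row
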
@@ -30,9 +30,9 @@ def upload_transaction(transaction):
         'id': uid,
         'properties': transaction,
     }
-    # handler.handle_new_trace(t)
-    processed_transactions.append(t)
+    handler.handle_new_trace(t)
+    # processed_transactions.append(t)

 def store_transactions_for_mirsat():

@@ -67,7 +67,7 @@ if __name__ == '__main__':
     for row in reader:
         transaction = {}
         transaction['ApplicationType'] = 'smart-energy'
-        transaction['docType'] = 'smart-energy-paper'
+        transaction['docType'] = 'smart-energy'
         for idx in range(len(row)):
             transaction[titles[idx]] = row[idx]

@@ -82,4 +82,4 @@ if __name__ == '__main__':
         upload_transaction(transaction)
-    store_transactions_for_mirsat()
+    # store_transactions_for_mirsat()
\ No newline at end of file
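
handle_new_trace (below) requires use_case, id, properties and table, so each uploaded row is wrapped into a trace of that shape before being handed to the handler; a sketch of such a wrapper, where the id derivation and table value are illustrative assumptions:

import hashlib

def build_trace(transaction: dict) -> dict:
    # id derived from the row content for illustration; the real script may compute uid differently
    uid = hashlib.sha256(str(sorted(transaction.items())).encode()).hexdigest()
    return {
        'use_case': 'smart-energy',
        'table': transaction['docType'],  # 'smart-energy' after this commit
        'id': uid,
        'properties': transaction,
    }

# handler.handle_new_trace(build_trace(transaction))
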
@@ -68,7 +68,7 @@ class MessageHandler:
         return layers

     def handle_new_trace(self, content: Dict):
-        LOGGER.info("new trace!")
+        # LOGGER.info(f"Received message: {str(content)}")
         if "use_case" not in content or "id" not in content or "properties" not in content or "table" not in content:
             LOGGER.error(f"Missing fields in trace, required fields: (use_case, id, properties, table), given fields: ({content.keys()})")
             return

@@ -88,7 +88,6 @@ class MessageHandler:
             LOGGER.warning(f"No layers available for '{use_case}'.'{table}', ignoring trace.")
             return
-        LOGGER.info(f"{len(layers)} layers available")
         nodes = []
         for layer in layers:

@@ -104,12 +103,10 @@ class MessageHandler:
             nodes.append(node)
         if len(nodes) > 0:
-            LOGGER.info(f"{len(layers)} layers available")
             self._repository.add_layer_nodes(nodes)
         else:
             LOGGER.error(f"did NOT add nodes...")
-        LOGGER.info("done")

     def handle_new_traces_available(self):
         # get all traces and call the Processor
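
Reading the touched lines together, the handler validates the incoming trace, resolves the layers for its use case and table, projects the trace's properties onto per-layer nodes and stores them. A hedged reconstruction under those assumptions; only the lines shown in the diff come from the source, the layer lookup and node fields are guesses:

import logging
from typing import Dict

LOGGER = logging.getLogger(__name__)

class MessageHandler:
    def __init__(self, repository):
        self._repository = repository

    def handle_new_trace(self, content: Dict):
        # LOGGER.info(f"Received message: {str(content)}")
        if "use_case" not in content or "id" not in content or "properties" not in content or "table" not in content:
            LOGGER.error(f"Missing fields in trace, required fields: (use_case, id, properties, table), given fields: ({content.keys()})")
            return

        use_case, table = content["use_case"], content["table"]
        layers = self._repository.get_layers_for_table(use_case, table)  # hypothetical lookup
        if len(layers) == 0:
            LOGGER.warning(f"No layers available for '{use_case}'.'{table}', ignoring trace.")
            return

        nodes = []
        for layer in layers:
            # assumption: a node keeps only the properties the layer declares relevant
            node = {key: content["properties"].get(key) for key in layer.properties}
            node["layer_name"] = layer.layer_name
            node["use_case"] = use_case
            node["table"] = table
            nodes.append(node)

        if len(nodes) > 0:
            self._repository.add_layer_nodes(nodes)
        else:
            LOGGER.error("did NOT add nodes...")
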