Commit 3f61d953 authored by Manuel

Merge branch 'feature/enums' into develop

parents 391ba2ad 87ec9330
@@ -54,15 +54,21 @@ class MessageHandler:
layers = [Layer.from_business_logic_dict(row) for row in json.loads(response.text)]
LOGGER.info(f"Received {len(layers)} layers")
# update local DB, insert each layer that does not already exist
for layer in layers:
print(f"Add layer to DB: {layer.to_serializable_dict(for_db=True)}")
self._repository.delete_layer(layer)
self._repository.add_layer(layer)
if len(layers) == 0:
LOGGER.error("no schema information found")
return layers
def handle_new_trace(self, content: Dict):
LOGGER.info("new trace!")
if "use_case" not in content or "id" not in content or "properties" not in content or "table" not in content:
LOGGER.error(f"Missing fields in trace, required fields: (use_case, id, properties, table), given fields: ({content.keys()})")
return
@@ -82,6 +88,7 @@ class MessageHandler:
LOGGER.warning(f"No layers available for '{use_case}'.'{table}', ignoring trace.")
return
LOGGER.info(f"{len(layers)} layers available")
nodes = []
for layer in layers:
@@ -97,7 +104,12 @@ class MessageHandler:
nodes.append(node)
if len(nodes) > 0:
LOGGER.info(f"{len(layers)} layers available")
self._repository.add_layer_nodes(nodes)
else:
LOGGER.error("did NOT add nodes...")
LOGGER.info("done")
def handle_new_traces_available(self):
# get all traces and call the Processor
...
@@ -29,7 +29,7 @@ def add_use_case(use_case: str):
print(url+": "+str(response.content))
if __name__ == "__main__":
use_case = "crowd-journalism-enum"
# disable ssl warnings :)
requests.packages.urllib3.disable_warnings()
...
@@ -18,6 +18,7 @@ def add_table(use_case: str, table_name: str):
columns = { c : c for c in columns }
columns["UniqueID"] = "userid+videoid"
columns["objecttype"] = "enum(objecttype)"
table = {
"name": table_name,
...
@@ -17,7 +17,8 @@ def add_table(use_case: str, table_name: str):
columns = { c : c for c in columns }
columns["UniqueID"] = "eventid"
columns["firstTag"] = "enum(tags[0])"
columns["objecttype"] = "enum(objecttype)"
table = {
"name": table_name,
...
@@ -17,6 +17,10 @@ def add_table(use_case: str, table_name: str):
columns = { c : c for c in columns }
columns["UniqueID"] = "userid+videoid+ownerid"
columns["objecttype"] = "enum(objecttype)"
columns["userid"] = "enum(userid)"
columns["ownerid"] = "enum(ownerid)"
table = {
"name": table_name,
...
@@ -5,13 +5,9 @@ def add_table(use_case: str, table_name: str):
replace all "/"'s in the internal representation with a "_"
'''
columns = {}
columns["tag"] = "enum(tag)"
columns["objecttype"] = "enum(objecttype)"
columns["UniqueID"] = "objecttype+tag"
table = {
...
@@ -33,7 +33,10 @@ def add_table(use_case: str, table_name: str):
columns["UniqueID"] = "videoid"
columns["encodedAudio"] = "codec//audio"
columns["encodedVideo"] = "codec//video"
columns["objecttype"] = "enum(objecttype)"
columns["duration"] = "enum(duration)"
columns["firstTag"] = "enum(tags[0])"
table = {
"name": table_name,
...
{
"ApplicationType": "debug",
"docType": "pizza",
"id": 1,
"dough": {
"type": "wheat",
"spinach": false
},
"toppings": [
{
"name": "cheese",
"price": 0.99
}
],
"name": "Margherita"
}
\ No newline at end of file
@@ -64,6 +64,7 @@ def main(use_case: str):
{
"UniqueID": "id",
"name": "name",
"nameIndex": "enum(name)",
"doughType": "dough//type",
"hasSpinach": "dough//spinach",
"firstTopping": "toppings[0]//name",
@@ -81,10 +82,11 @@ def main(use_case: str):
"name": "Price_Layer",
"properties": [
"UniqueID",
"firstToppingPrice",
"nameIndex"
],
"cluster_properties": [
"firstToppingPrice",
]
},
{
...
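For illustration only (not part of the commit): with the new "nameIndex": "enum(name)" mapping, the flattened node built from the pizza sample above would look roughly like the sketch below. The exact field set and the index value are assumptions based on the mappings visible in this hunk; the index depends on insertion order in the enum store.

# hypothetical flattened pizza node, assuming "Margherita" is the first value
# registered for the enum(name) mapping
flattened_node = {
    "UniqueID": "<sha256 of the resolved id>",
    "name": "Margherita",
    "nameIndex": 0,  # integer index returned by the business-logic enum endpoint
    "doughType": "wheat",
    "hasSpinach": False,
    "firstTopping": "cheese",
    "firstToppingPrice": 0.99,
}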
import sys
import os
import json
from pathlib import Path
from typing import Dict, Any
import requests
requests.packages.urllib3.disable_warnings()
modules_paths = ['.', '../../../modules/']
for modules_path in modules_paths:
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import network_constants as nc
from security.token_manager import TokenManager
use_case_from = "crowd-journalism"
use_case_to = "crowd-journalism-enum"
# 1. get all tables for the use-case
from db.entities.table import Table
jwt = TokenManager.getInstance().getToken()
url = f"https://articonf1.itec.aau.at:30420/api/use-cases/{use_case_from}/tables"
response = requests.get(
url,
verify=False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt}"},
)
tables = json.loads(response.content)
tables = [Table.from_serializable_dict(row) for row in tables]
print(f"|tables|={len(tables)}")
#2. fetch layers for tables
from db.entities.layer_adapter import LayerAdapter
url = f"https://articonf1.itec.aau.at:30420/api/use-cases/{use_case_from}/layers"
response = requests.get(
url,
verify=False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt}"},
)
layers = json.loads(response.content)
layers = [LayerAdapter.from_serializable_dict(row) for row in layers]
print(f"|layers|={len(layers)}")
# 3. fetch transactions for use-case
url = f"https://articonf1.itec.aau.at:30001/api/use_cases/{use_case_from}/transactions"
response = requests.get(
url,
verify=False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt}"},
)
transactions = json.loads(response.content)
print(f"|transactions|={len(transactions)}")
ignored_fields = ["ApplicationType","docType","UniqueID"]
for table in tables:
for transaction in transactions:
if transaction["table"] != table.name:
continue
data = {
"ApplicationType": use_case_to,
"docType": table.name,
}
if table.name == "video":
data["codec"] = {
"audio": transaction["properties"]["encodedAudio"],
"video": transaction["properties"]["encodedVideo"]
}
data["tags"] = [transaction["properties"]["firstTag"]]
if table.name == "event":
data["tags"] = [transaction["properties"]["firstTag"]]
for key,value in transaction["properties"].items():
if key not in ignored_fields:
data[key] = value
# post the new trace
url = f"https://articonf1.itec.aau.at:30401/api/trace"
response = requests.post(
url,
verify=False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt}"},
json = data
)
print(url+": "+str(response.status_code))
@@ -515,6 +515,109 @@ paths:
description: "Field in request is missing"
'403':
description: "Confirmation required"
#####
# END LAYERS
#####
#####
# ENUMS
#####
/enums:
get:
security:
- JwtRegular: []
operationId: "routes.enum.all"
tags:
- "Enums"
summary: "Retrieve all Enums from the DB"
description: "Retrieve all Enums from the DB"
responses:
'200':
description: "Successful Request"
/use-cases/{use_case}/enums:
get:
security:
- JwtRegular: []
operationId: "routes.enum.get_all_for_use_case"
tags:
- "Enums"
summary: "Retrieve one Enum from the DB"
description: "Retrieve one Enum from the DB"
parameters:
- name: "use_case"
in: "path"
description: "Name of the Use-Case the Enum belongs to"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
'404':
description: "Enum does not exist"
/use-cases/{use_case}/tables/{table}/enum/{name}:
put:
security:
- JwtRegular: []
operationId: "routes.enum.put_new"
tags:
- "Enums"
summary: "Updates an existing Enum with a new value."
description: "Updates an existing Enum with a new value."
parameters:
- name: "use_case"
in: "path"
description: "Name of the Use-Case the Enum belongs to"
required: true
type: "string"
- name: "table"
in: "path"
description: "Name of the Table the Enum belongs to"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the Enum"
required: true
type: "string"
- name: value
in: query
description: Value of the Enum.
required: true
type: string
responses:
'200':
description: "Successful Request"
'404':
description: "Enum does not exist"
get:
security:
- JwtRegular: []
operationId: "routes.enum.one"
tags:
- "Enums"
summary: "Retrieve one Enum from the DB"
description: "Retrieve one Enum from the DB"
parameters:
- name: "use_case"
in: "path"
description: "Name of the Use-Case the Enum belongs to"
required: true
type: "string"
- name: "table"
in: "path"
description: "Name of the Table the Enum belongs to"
required: true
type: "string"
- name: "name"
in: "path"
description: "Name of the Enum"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
'404':
description: "Enum does not exist"
definitions:
LayerMapping:
...
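A minimal client sketch for the new enum endpoint (illustration only; host, port, use-case and token handling are assumptions based on the other scripts in this merge request):

import requests

jwt = "<JWT obtained from TokenManager>"  # assumed: same auth flow as the migration script above
url = "https://articonf1.itec.aau.at:30420/api/use-cases/crowd-journalism-enum/tables/video/enum/objecttype?value=video"

# PUT registers the value if it is new and returns the Enum together with its index
response = requests.put(url, verify=False, headers={"Authorization": f"Bearer {jwt}"})
print(response.status_code, response.json())  # e.g. 200 {"use_case": ..., "value": "video", "index": 0}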
from typing import Dict
class Enum:
def __init__(self, use_case: str, table: str, name: str, value: str, index: int):
self.use_case = use_case
self.table = table
self.name = name
self.value = value
self.index = index
def to_serializable_dict(self) -> Dict:
return {
"use_case": self.use_case,
"table": self.table,
"name": self.name,
"value": self.value,
"index": self.index,
}
@staticmethod
def from_serializable_dict(enum_dict: Dict):
'''
creates an Enum object from a dictionary. has to have the following keys:
- use_case
- table
- name
- value
- index
'''
return Enum(
enum_dict["use_case"],
enum_dict["table"],
enum_dict["name"],
enum_dict["value"],
enum_dict["index"],
)
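A small usage sketch of the entity (illustrative values only):

e = Enum("crowd-journalism-enum", "video", "objecttype", "video", 0)
d = e.to_serializable_dict()
assert Enum.from_serializable_dict(d).to_serializable_dict() == d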
# global imports (don't worry, red is normal)
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from db.entities.enum import Enum
from typing import List
class EnumRepository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
def __init__(self):
super().__init__(netconst.BUSINESS_LOGIC_DB_HOSTNAME,
netconst.BUSINESS_LOGIC_DB_PORT,
'business-logic-db')
self._enum_collection = 'enums'
def all(self) -> List[Enum]:
result = super().get_entries(
self._enum_collection, projection={'_id': False})
return [Enum.from_serializable_dict(row) for row in list(result)]
def all_for_use_case(self, use_case: str) -> List[Enum]:
result = super().get_entries(self._enum_collection, projection={
'_id': False}, selection={"use_case": use_case})
return [Enum.from_serializable_dict(row) for row in list(result)]
def all_for_use_case_and_table(self, use_case: str, table: str) -> List[Enum]:
result = super().get_entries(self._enum_collection, projection={
'_id': False}, selection={"use_case": use_case, "table": table})
return [Enum.from_serializable_dict(row) for row in list(result)]
def all_for_use_case_and_table_and_name(self, name: str, use_case: str, table: str) -> List[Enum]:
result = list(super().get_entries(self._enum_collection, selection={
"name": name, "use_case": use_case, "table": table}))
return [Enum.from_serializable_dict(row) for row in result]
def delete_all(self):
collection = self._database[self._enum_collection]
collection.delete_many({})
def add(self, enum: Enum):
super().insert_entry(self._enum_collection, enum.to_serializable_dict())
def update_use_case(self, enum: Enum, use_case: str):
collection = self._database[self._enum_collection]
collection.update_one({"name": enum.name, "use_case": use_case, "table": enum.table}, {
"$set": enum.to_serializable_dict()})
def update(self, enum: Enum):
collection = self._database[self._enum_collection]
collection.update_one({"name": enum.name, "use_case": enum.use_case, "table": enum.table}, {
"$set": enum.to_serializable_dict()})
def delete(self, enum: Enum):
collection = self._database[self._enum_collection]
collection.delete_many(
{"name": enum.name, "use_case": enum.use_case, "table": enum.table})
@@ -13,6 +13,7 @@ import connexion
from security import swagger_util
from env_info import is_running_locally, get_resources_path
from flask import request
from flask_cors import CORS
from flask import redirect
from flask_cors import CORS
...
#global imports
from db.entities.enum import Enum
from db.entities.layer_adapter import LayerAdapter
from db.repository import Repository
from db.table_repository import TableRepository
from db.use_case_repository import UseCaseRepository
from db.enum_repository import EnumRepository
import json
from flask import Response, request
table_repository = TableRepository()
use_case_repository = UseCaseRepository()
enum_repository = EnumRepository()
def all():
return [enum.to_serializable_dict() for enum in enum_repository.all()]
def get_all_for_use_case(use_case: str):
'''
get all enums assigned to the given use_case
'''
use_case_repository.put(use_case)
return [enum.to_serializable_dict() for enum in enum_repository.all_for_use_case(use_case)]
def one(use_case: str, table: str, name: str):
'''
fetch a single enum from the DB
@params:
use_case - Required : String-identifier for the Use-Case the Enum belongs to
table - Required : unique identifier of the Table the Enum belongs to
name - Required : unique identifier for the Enum
'''
enums = enum_repository.all_for_use_case_and_table_and_name(name, use_case, table)
return Response(status=200, response=json.dumps([enum.to_serializable_dict() for enum in enums]))
def put_new(use_case: str, table: str, name: str, value: str):
'''
Put a new Enum to the DB. If the value for this enum already exists, nothing happens.
@params:
use_case - Required : String-identifier for the Use-Case the Enum belongs to
table - Required : unique identifier of the Table the Enum belongs to
name - Required : unique identifier for the Enum
value - Required : value that should be added to the Enum if it is not yet present
'''
existing_enums = enum_repository.all_for_use_case_and_table_and_name(name, use_case, table)
enum_target = None
for enum in existing_enums:
if enum.value == value:
enum_target = enum
break
if enum_target is None:
enum_target = Enum(use_case, table, name, value, len(existing_enums))
enum_repository.add(enum_target)
return Response(status=200, response=json.dumps(enum_target.to_serializable_dict()))
\ No newline at end of file
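put_new assigns indices in insertion order: the first distinct value of an enum gets index 0, the next gets 1, and a repeated value returns the already-stored entry. A rough illustration (return payloads shown as plain dicts, ignoring the Flask Response wrapper; values are assumptions):

put_new("crowd-journalism-enum", "video", "objecttype", "video")      # -> {..., "value": "video", "index": 0}
put_new("crowd-journalism-enum", "video", "objecttype", "classroom")  # -> {..., "value": "classroom", "index": 1}
put_new("crowd-journalism-enum", "video", "objecttype", "video")      # -> existing entry again, "index": 0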
from typing import Dict
from security.token_manager import TokenManager
import network_constants
from database.entities.transaction import Transaction
@@ -10,11 +11,10 @@ import logging
import requests
requests.packages.urllib3.disable_warnings()
LOGGER = logging.getLogger(__name__)
class MessageHandler:
MSG_NOT_JSON = "Message is not in JSON format and is ignored"
MSG_NO_TYPE = "Message has no type field and is ignored"
@@ -25,14 +25,13 @@ class MessageHandler:
_message_sender = None
_rest_fetcher = None
def __init__(self, mongo_repo, message_sender, rest_fetcher: RestFetcher):
self._mongo_repo = mongo_repo
self._message_sender = message_sender
self._message_sender.create_message_destination('datahub', 'direct')
self._rest_fetcher = rest_fetcher
def handle_generic(self, body):
result = None
message = None
@@ -49,7 +48,7 @@ class MessageHandler:
LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
return result
if message['type'] == 'blockchain-transaction':
self.handle_blockchain_transaction(message['content'])
result = self.MSG_TRACE_PROCESSED
@@ -57,41 +56,11 @@ class MessageHandler:
result = self.MSG_NOT_PROCESSED
LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
# LOGGER.info(result) #too much spam
return result
def _flatten_transaction(self, transaction: Dict, mappings: Dict, use_case: str, table: str):
'''
Resolves all paths in [mappings] with the data from [transaction] and creates
a non-nested flattened element.
@@ -101,34 +70,83 @@ class MessageHandler:
mappings - Required: contains string->path mappings, describing how the flattened object is built
'''
print("yey new trace ^.^")
flattened = {}
# iterate over schema mappings and resolve paths
for mapping_name in mappings.keys():
full_path = mappings[mapping_name]
concat_paths = full_path.split("+")
values = []
# resolve individual path parts
for path in concat_paths:
resolved_value = self._resolve_path(mapping_name, transaction, path)
if path.startswith("enum("):
resolved_value = self._resolve_enum(use_case, table, mapping_name, resolved_value)
values.append(resolved_value)
# rejoin individual parts
if len(values) > 1:
final_value = "".join([str(value) for value in values])
else:
final_value = values[0]
flattened[mapping_name] = final_value
flattened["UniqueID"] = hashlib.sha256(
str(flattened["UniqueID"]).encode("utf-8")).hexdigest()
return flattened
def _resolve_enum(self, use_case: str, table: str, enum_name: str, enum_value: str):
return RestFetcher().fetch_enum_value(use_case, table, enum_name, enum_value)
def _resolve_path(self, attribute_name: str, data: Dict, path: str) -> Dict:
'''
resolves a path WITHOUT concatenation in a json dictionary. ignores enum
calculation and returns the value to be enumified instead.
@params
data - Required: Dictionary that is the decoded json string
path - Required: path of multiple keys separated by "//" and list indices "[5]"
'''
# cutoff of enum if present
if path.startswith("enum("):
path = path[5:-1]
path_pieces = path.split("//")
value = data
# resolve all pieces of the path in order
for i in range(0, len(path_pieces)):
piece = path_pieces[i]
value = self._resolve_path_part(value, piece)
return value
def _resolve_path_part(self, data: Dict, path_part: str):
value = None
# is the current path piece in the form attribute[index]?
if path_part[-1] == "]":
start = path_part.index("[")
# stem ... attribute name
# index ... list index
stem = path_part[:start]
index = int(path_part[start+1:-1])
value = data[stem][index]
else:
value = data[path_part]
return value
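# --- illustration only (not part of this commit) ----------------------------
# How the path syntax described in the docstrings above resolves against the
# pizza sample earlier in this merge request, assuming `handler` is a
# MessageHandler instance and `pizza` is the decoded JSON document:
#   handler._resolve_path("doughType", pizza, "dough//type")            # -> "wheat"
#   handler._resolve_path("firstTopping", pizza, "toppings[0]//name")   # -> "cheese"
#   handler._resolve_path("nameIndex", pizza, "enum(name)")             # -> "Margherita"
# For enum(...) mappings, _flatten_transaction afterwards calls _resolve_enum,
# which replaces the raw value ("Margherita") with the integer index returned
# by the business-logic service.
# -----------------------------------------------------------------------------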
def handle_blockchain_transaction(self, transaction_message: Dict):
'''
Processes a blockchain transaction. The schema gets fetched from the business-logic microservice and a flattened
@@ -140,14 +158,16 @@ class MessageHandler:
# check if there is a use-case in the message
if "ApplicationType" not in transaction_message:
LOGGER.error(
"Transaction has no ApplicationType, storing it under use-case 'unknown'.")
transaction_message["ApplicationType"] = "unknown"
self._mongo_repo.add_failed_transaction(transaction_message)
return
# check if there is a table in the message
if "docType" not in transaction_message:
LOGGER.error(
"Transaction has no docType, storing it under table 'unknown'.")
transaction_message["docType"] = "unknown"
self._mongo_repo.add_failed_transaction(transaction_message)
return
@@ -156,9 +176,11 @@ class MessageHandler:
transaction_table = transaction_message["docType"]
try:
tables = self._rest_fetcher.fetch_schema_information(
transaction_use_case)
except ValueError as e:
LOGGER.error(
f"{e}\nCould not fetch schema, storing it as a failed transaction..")
self._mongo_repo.add_failed_transaction(transaction_message)
return
@@ -168,30 +190,40 @@ class MessageHandler:
if table["name"] == transaction_table:
target_table = table
break
# abort if table does not exist.
if target_table == None:
LOGGER.error(
f"There is no table '{transaction_table}', storing it as a failed transaction.")
self._mongo_repo.add_failed_transaction(transaction_message)
return
mappings = table["mappings"]
try:
flattened = self._flatten_transaction(
transaction_message,
mappings,
transaction_use_case,
transaction_table
)
except KeyError as e:
LOGGER.error(
f"Failed while flattening with KeyError: {str(e)}, storing it as a failed transaction.")
self._mongo_repo.add_failed_transaction(transaction_message)
return
transaction = Transaction(
transaction_use_case, target_table["name"], flattened)
# check for duplicates
try:
references = self._mongo_repo.get_transaction_with_id(
transaction.id(), transaction_use_case, transaction_table)
if references != None:
LOGGER.info(
"Found duplicate, storing it as a duplicated transaction.")
self._mongo_repo.add_duplicated_transaction(transaction)
return
except ValueError as e:
LOGGER.error(f"{e}, could not insert duplicated node.")
return
@@ -200,16 +232,16 @@ class MessageHandler:
self._mongo_repo.add_transaction(transaction)
except KeyError as e:
LOGGER.error(f"{e}, ignored {transaction_message}")
# self._mongo_repo.add_failed_transaction(transaction_message)
return
msg = {
"type": "new-trace",
"content": transaction.to_serializable_dict()
}
msg_string = json.dumps(msg)
# inform semantic linking microservice
self._message_sender.send_message(
'datahub', 'semantic-linking', msg_string)
@@ -2,23 +2,48 @@ from security.token_manager import TokenManager
import network_constants
from typing import List
import json
import requests
class RestFetcher:
def fetch_enum_value(self, use_case: str, table: str, enum_name: str, enum_value: str) -> int:
jwt_token = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables/{table}/enum/{enum_name}?value={enum_value}'
print(f"calling {url}")
# register the value for this enum (the business-logic service creates it if it is new)
response = requests.put(
url,
verify=False,
proxies={"http": None, "https": None},
headers={"Authorization": f"Bearer {jwt_token}"},
)
if response.status_code != 200:
raise ValueError(
f"Error while retrieving Enum information.\tStatus-code:{response.status_code}")
enum = json.loads(response.text)
return enum["index"]
def fetch_schema_information(self, use_case: str) -> List:
jwt_token = TokenManager.getInstance().getToken()
# query tables for use-case
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables'
response = requests.get(
url,
verify=False,
proxies={"http": None, "https": None},
headers={"Authorization": f"Bearer {jwt_token}"}
)
if response.status_code != 200:
raise ValueError(
f"Error while retrieving schema information.\tStatus-code:{response.status_code}")
tables = json.loads(response.text)
return tables
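Taken together, the enum feature works end to end roughly as sketched below (illustration only; it assumes the business-logic service is reachable as configured in network_constants and that "video" happens to be the first value registered for this enum):

# 1. A schema column maps to an enum path, e.g. "objecttype": "enum(objecttype)".
# 2. While flattening a transaction, MessageHandler._resolve_path() strips the
#    enum(...) wrapper and resolves the inner path to the raw value, e.g. "video".
# 3. MessageHandler._resolve_enum() delegates to RestFetcher.fetch_enum_value(),
#    which PUTs the value to the business-logic service and gets back its index.
# 4. The flattened transaction stores that integer index instead of the raw string.
fetcher = RestFetcher()
index = fetcher.fetch_enum_value("crowd-journalism-enum", "video", "objecttype", "video")
print(index)  # e.g. 0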