Commit 0f7bfca8 authored by Manuel Herold's avatar Manuel Herold

[traceRetrieval] implemented enum parsing

parent b38aa85f
{
"ApplicationType": "debug",
"docType": "pizza",
"id": 1,
"dough": {
"type": "wheat",
"spinach": false
},
"toppings": [
{
"name": "cheese",
"price": 0.99
}
],
"name": "Margherita"
}
\ No newline at end of file
......@@ -64,6 +64,7 @@ def main(use_case: str):
{
"UniqueID": "id",
"name": "name",
"nameIndex": "enum(name)",
"doughType": "dough//type",
"hasSpinach": "dough//spinach",
"firstTopping": "toppings[0]//name",
......@@ -81,10 +82,11 @@ def main(use_case: str):
"name": "Price_Layer",
"properties": [
"UniqueID",
"firstToppingPrice"
"firstToppingPrice",
"nameIndex"
],
"cluster_properties": [
"firstToppingPrice"
"firstToppingPrice",
]
},
{
......
from typing import Dict
from security.token_manager import TokenManager
import network_constants
from database.entities.transaction import Transaction
......@@ -10,11 +11,10 @@ import logging
import requests
requests.packages.urllib3.disable_warnings()
from typing import Dict
LOGGER = logging.getLogger(__name__)
class MessageHandler:
MSG_NOT_JSON = "Message is not in JSON format and is ignored"
MSG_NO_TYPE = "Message has no type field and is ignored"
......@@ -25,14 +25,13 @@ class MessageHandler:
_message_sender = None
_rest_fetcher = None
def __init__(self, mongo_repo, message_sender, rest_fetcher:RestFetcher):
def __init__(self, mongo_repo, message_sender, rest_fetcher: RestFetcher):
self._mongo_repo = mongo_repo
self._message_sender = message_sender
self._message_sender.create_message_destination('datahub', 'direct')
self._rest_fetcher = rest_fetcher
def handle_generic(self, body):
result = None
message = None
......@@ -49,7 +48,7 @@ class MessageHandler:
LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
return result
if message['type'] == 'blockchain-transaction':
self.handle_blockchain_transaction(message['content'])
result = self.MSG_TRACE_PROCESSED
......@@ -57,41 +56,11 @@ class MessageHandler:
result = self.MSG_NOT_PROCESSED
LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
#LOGGER.info(result) #too much spam
# LOGGER.info(result) #too much spam
return result
def _resolve_path(self, data: Dict, path:str) -> Dict:
'''
resolves a path without concatenation in a json dictionary
@params
data - Required: Dictionary that is the decoded json string
path - Required: path of multiple keys seperated by "//" and list indizes "[5]"
'''
path_pieces = path.split("//")
value = data
# resolve all pieces of the path in order
for i in range(0,len(path_pieces)):
piece = path_pieces[i]
# is the current path piece in the form attribute[index]?
if piece[-1] == "]":
start = piece.index("[")
# stem ... attribute name
# index ... list index
stem = piece[:start]
index = int(piece[start+1:-1])
value = value[stem][index]
else:
value = value[piece]
return value
def _flatten_transaction(self, transaction: Dict, mappings: Dict):
def _flatten_transaction(self, transaction: Dict, mappings: Dict, use_case: str, table: str):
'''
Resolves all paths in [mappings] with the data from [transaction] and creates
a non-nested flattened element.
......@@ -101,34 +70,83 @@ class MessageHandler:
mappings - Required: contains string->path mappings, describing how the flattened object is built
'''
# print("TRANSACTION: "+str(transaction))
# print("MAPPINGS: "+str(mappings))
print("yey new trace ^.^")
flattened = {}
# iterate over schema mappings and resolve paths
for mapping in mappings.keys():
full_path = mappings[mapping]
for mapping_name in mappings.keys():
full_path = mappings[mapping_name]
concat_paths = full_path.split("+")
values = []
# resolve individual path parts
for path in concat_paths:
values.append(
self._resolve_path(transaction, path)
)
resolved_value = self._resolve_path(mapping_name, transaction, path)
if path.startswith("enum("):
resolved_value = self._resolve_enum(use_case, table, mapping_name, resolved_value)
values.append(resolved_value)
# rejoin individual parts
if len(values) > 1:
final_value = "".join([str(value) for value in values])
else:
final_value = values[0]
flattened[mapping] = final_value
flattened["UniqueID"] = hashlib.sha256(str(flattened["UniqueID"]).encode("utf-8")).hexdigest()
flattened[mapping_name] = final_value
flattened["UniqueID"] = hashlib.sha256(
str(flattened["UniqueID"]).encode("utf-8")).hexdigest()
return flattened
def _resolve_enum(self, use_case: str, table: str, enum_name: str, enum_value: str):
return RestFetcher().fetch_enum_value(use_case, table, enum_name, enum_value)
def _resolve_path(self, attribute_name: str, data: Dict, path: str) -> Dict:
'''
resolves a path WITHOUT concatenation in a json dictionary. ignores enum
calculation and returns the value to be enumified instead.
@params
data - Required: Dictionary that is the decoded json string
path - Required: path of multiple keys seperated by "//" and list indizes "[5]"
'''
# cutoff of enum if present
if path.startswith("enum("):
path = path[5:-1]
path_pieces = path.split("//")
value = data
# resolve all pieces of the path in order
for i in range(0, len(path_pieces)):
piece = path_pieces[i]
value = self._resolve_path_part(value, piece)
return value
def _resolve_path_part(self, data: Dict, path_part: str):
value = None
# is the current path piece in the form attribute[index]?
if path_part[-1] == "]":
start = path_part.index("[")
# stem ... attribute name
# index ... list index
stem = path_part[:start]
index = int(path_part[start+1:-1])
value = data[stem][index]
else:
value = data[path_part]
return value
def handle_blockchain_transaction(self, transaction_message: Dict):
'''
Proccesses a blockchain transaction. The schema gets fetched from the business-logic microservice and a flattened
......@@ -140,14 +158,16 @@ class MessageHandler:
# check if there is a use-case in the message
if "ApplicationType" not in transaction_message:
LOGGER.error("Transaction has no ApplicationType, storing it under use-case 'unknown'.")
LOGGER.error(
"Transaction has no ApplicationType, storing it under use-case 'unknown'.")
transaction_message["ApplicationType"] = "unknown"
self._mongo_repo.add_failed_transaction(transaction_message)
return
# check if there is a table in the message
if "docType" not in transaction_message:
LOGGER.error("Transaction has no docType, storing it under table 'unknown'.")
LOGGER.error(
"Transaction has no docType, storing it under table 'unknown'.")
transaction_message["docType"] = "unknown"
self._mongo_repo.add_failed_transaction(transaction_message)
return
......@@ -156,9 +176,11 @@ class MessageHandler:
transaction_table = transaction_message["docType"]
try:
tables = self._rest_fetcher.fetch_schema_information(transaction_use_case)
tables = self._rest_fetcher.fetch_schema_information(
transaction_use_case)
except ValueError as e:
LOGGER.error(f"{e}\nCould not fetch schema, storing it as a failed transaction..")
LOGGER.error(
f"{e}\nCould not fetch schema, storing it as a failed transaction..")
self._mongo_repo.add_failed_transaction(transaction_message)
return
......@@ -168,30 +190,40 @@ class MessageHandler:
if table["name"] == transaction_table:
target_table = table
break
# abort if table does not exist.
if target_table == None:
LOGGER.error(f"There is no table '{transaction_table}', storing it as a failed transaction.")
LOGGER.error(
f"There is no table '{transaction_table}', storing it as a failed transaction.")
self._mongo_repo.add_failed_transaction(transaction_message)
return
mappings = table["mappings"]
try:
flattened = self._flatten_transaction(transaction_message, mappings)
flattened = self._flatten_transaction(
transaction_message,
mappings,
transaction_use_case,
transaction_table
)
except KeyError as e:
LOGGER.error(f"Failed while flattening with KeyError: {str(e)}, storing it as a failed transaction.")
LOGGER.error(
f"Failed while flattening with KeyError: {str(e)}, storing it as a failed transaction.")
self._mongo_repo.add_failed_transaction(transaction_message)
return
transaction = Transaction(transaction_use_case, target_table["name"], flattened)
transaction = Transaction(
transaction_use_case, target_table["name"], flattened)
#check for duplicates
# check for duplicates
try:
references = self._mongo_repo.get_transaction_with_id(transaction.id(),transaction_use_case,transaction_table)
references = self._mongo_repo.get_transaction_with_id(
transaction.id(), transaction_use_case, transaction_table)
if references != None:
LOGGER.info("Found duplicate, storing it as a duplicated transaction.")
LOGGER.info(
"Found duplicate, storing it as a duplicated transaction.")
self._mongo_repo.add_duplicated_transaction(transaction)
return
return
except ValueError as e:
LOGGER.error(f"{e}, could not insert duplicated node.")
return
......@@ -200,16 +232,16 @@ class MessageHandler:
self._mongo_repo.add_transaction(transaction)
except KeyError as e:
LOGGER.error(f"{e}, ignored {transaction_message}")
# self._mongo_repo.add_failed_transaction(transaction_message)
# self._mongo_repo.add_failed_transaction(transaction_message)
return
msg = {
"type": "new-trace",
"content": transaction.to_serializable_dict()
}
msg_string = json.dumps(msg)
# inform semantic linking microservice
self._message_sender.send_message('datahub', 'semantic-linking', msg_string)
\ No newline at end of file
self._message_sender.send_message(
'datahub', 'semantic-linking', msg_string)
......@@ -2,23 +2,48 @@ from security.token_manager import TokenManager
import network_constants
from typing import List
import json, requests
import json
import requests
class RestFetcher:
def fetch_enum_value(self, use_case: str, table: str, enum_name: str, enum_value: str) -> int:
jwt_token = TokenManager.getInstance().getToken()
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables/{table}/enum/{enum_name}?value={enum_value}'
print(f"calling {url}")
# query tables for use-case
url = url
response = requests.put(
url,
verify=False,
proxies={"http": None, "https": None},
headers={"Authorization": f"Bearer {jwt_token}"},
)
if response.status_code != 200:
raise ValueError(
f"Error while retrieving Enum information.\tStatus-code:{response.status_code}")
enum = json.loads(response.text)
return enum["index"]
def fetch_schema_information(self, use_case: str) -> List:
jwt_token = TokenManager.getInstance().getToken()
# query tables for use-case
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/tables'
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt_token}"}
url,
verify=False,
proxies={"http": None, "https": None},
headers={"Authorization": f"Bearer {jwt_token}"}
)
if response.status_code != 200:
raise ValueError(f"Error while retrieving schema information.\tStatus-code:{response.status_code}")
raise ValueError(
f"Error while retrieving schema information.\tStatus-code:{response.status_code}")
tables = json.loads(response.text)
return tables
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment