Commit 643da815 authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'bugfix/duplicate-trace' into 'develop'

Bugfix/duplicate trace

See merge request !36
parents 5ed3c13e 6b8fd3ed
...@@ -31,7 +31,9 @@ class Repository(MongoRepositoryBase): ...@@ -31,7 +31,9 @@ class Repository(MongoRepositoryBase):
@throws @throws
KeyError - Duplicate transaction ID KeyError - Duplicate transaction ID
''' '''
reference = self.get_transaction_with_id(transaction.id()) use_case= transaction.use_case
table = transaction.table
reference = self.get_transaction_with_id(transaction.id(),use_case, table)
if reference == None: if reference == None:
super().insert_entry(self._transaction_collection, transaction.to_serializable_dict()) super().insert_entry(self._transaction_collection, transaction.to_serializable_dict())
else: else:
...@@ -41,11 +43,11 @@ class Repository(MongoRepositoryBase): ...@@ -41,11 +43,11 @@ class Repository(MongoRepositoryBase):
result = super().get_entries(self._transaction_collection, projection={'_id': False}, selection={"use_case": use_case}) result = super().get_entries(self._transaction_collection, projection={'_id': False}, selection={"use_case": use_case})
return [Transaction.from_serializable_dict(row) for row in list(result)] return [Transaction.from_serializable_dict(row) for row in list(result)]
def get_transaction_with_id(self, unique_id: str) -> Transaction: def get_transaction_with_id(self, unique_id: str, use_case:str, table:str ) -> Transaction:
result = list(super().get_entries(self._transaction_collection, projection={'_id': False}, selection={"UniqueID": unique_id})) result = list(super().get_entries(self._transaction_collection, projection={'_id': False}, selection={"id": unique_id,"use_case": use_case, "table":table}))
if len(result) >= 1: if len(result) >= 1:
return Transaction.from_serializable_dict(result) return Transaction.from_serializable_dict(result[0])
return None return None
......
...@@ -8,9 +8,11 @@ import json ...@@ -8,9 +8,11 @@ import json
import hashlib import hashlib
import logging import logging
import requests import requests
requests.packages.urllib3.disable_warnings()
from typing import Dict from typing import Dict
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MessageHandler: class MessageHandler:
...@@ -30,7 +32,7 @@ class MessageHandler: ...@@ -30,7 +32,7 @@ class MessageHandler:
self._rest_fetcher = rest_fetcher self._rest_fetcher = rest_fetcher
def handle_generic(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}")
result = None result = None
message = None message = None
...@@ -39,11 +41,13 @@ class MessageHandler: ...@@ -39,11 +41,13 @@ class MessageHandler:
except (ValueError, TypeError): except (ValueError, TypeError):
result = self.MSG_NOT_JSON result = self.MSG_NOT_JSON
LOGGER.warning(result) LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
return result return result
if not 'type' in message: if not 'type' in message:
result = self.MSG_NO_TYPE result = self.MSG_NO_TYPE
LOGGER.warning(result) LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
return result return result
if message['type'] == 'blockchain-transaction': if message['type'] == 'blockchain-transaction':
...@@ -51,8 +55,11 @@ class MessageHandler: ...@@ -51,8 +55,11 @@ class MessageHandler:
result = self.MSG_TRACE_PROCESSED result = self.MSG_TRACE_PROCESSED
else: else:
result = self.MSG_NOT_PROCESSED result = self.MSG_NOT_PROCESSED
LOGGER.warning(result)
LOGGER.info(f"Received message: {body}")
LOGGER.info(result) #LOGGER.info(result) #too much spam
return result return result
def _resolve_path(self, data: Dict, path:str) -> Dict: def _resolve_path(self, data: Dict, path:str) -> Dict:
...@@ -132,39 +139,39 @@ class MessageHandler: ...@@ -132,39 +139,39 @@ class MessageHandler:
''' '''
# check if there is a use-case in the message # check if there is a use-case in the message
if "ApplicationType" not in transaction_message.keys(): if "ApplicationType" not in transaction_message:
LOGGER.error("Transaction has no ApplicationType, storing it under use-case 'unknown'.") LOGGER.error("Transaction has no ApplicationType, storing it under use-case 'unknown'.")
transaction_message["ApplicationType"] = "unknown" transaction_message["ApplicationType"] = "unknown"
self._mongo_repo.add_failed_transaction(transaction_message) self._mongo_repo.add_failed_transaction(transaction_message)
return return
# check if there is a doctype in the message # check if there is a table in the message
if "docType" not in transaction_message.keys(): if "docType" not in transaction_message:
LOGGER.error("Transaction has no docType, storing it under docType 'unknown'.") LOGGER.error("Transaction has no docType, storing it under table 'unknown'.")
transaction_message["docType"] = "unknown" transaction_message["docType"] = "unknown"
self._mongo_repo.add_failed_transaction(transaction_message) self._mongo_repo.add_failed_transaction(transaction_message)
return return
use_case = transaction_message["ApplicationType"] transaction_use_case = transaction_message["ApplicationType"]
docType = transaction_message["docType"] transaction_table = transaction_message["docType"]
try: try:
tables = self._rest_fetcher.fetch_schema_information(use_case) tables = self._rest_fetcher.fetch_schema_information(transaction_use_case)
except ValueError as e: except ValueError as e:
LOGGER.error(f"{e}\nStoring it as a failed transaction.") LOGGER.error(f"{e}\nCould not fetch schema, storing it as a failed transaction..")
self._mongo_repo.add_failed_transaction(transaction_message) self._mongo_repo.add_failed_transaction(transaction_message)
return return
target_table = None target_table = None
# find correct table # find correct table
for table in tables: for table in tables:
if table["name"] == docType: if table["name"] == transaction_table:
target_table = table target_table = table
break break
# abort if table does not exist. # abort if table does not exist.
if target_table == None: if target_table == None:
LOGGER.error(f"There is no table '{docType}', storing it as a failed transaction.") LOGGER.error(f"There is no table '{transaction_table}', storing it as a failed transaction.")
self._mongo_repo.add_failed_transaction(transaction_message) self._mongo_repo.add_failed_transaction(transaction_message)
return return
...@@ -172,21 +179,19 @@ class MessageHandler: ...@@ -172,21 +179,19 @@ class MessageHandler:
try: try:
flattened = self._flatten_transaction(transaction_message, mappings) flattened = self._flatten_transaction(transaction_message, mappings)
except KeyError as e: except KeyError as e:
LOGGER.error(f"Failed while flattening with KeyError: {str(e)}") LOGGER.error(f"Failed while flattening with KeyError: {str(e)}, storing it as a failed transaction.")
self._mongo_repo.add_failed_transaction(transaction_message) self._mongo_repo.add_failed_transaction(transaction_message)
return return
transaction = Transaction(use_case, target_table["name"], flattened) transaction = Transaction(transaction_use_case, target_table["name"], flattened)
#check for duplicates #check for duplicates
try: try:
references = self._mongo_repo.get_transaction_with_id(transaction.id()) references = self._mongo_repo.get_transaction_with_id(transaction.id(),transaction_use_case,transaction_table)
if references != None: if references != None:
for item in references: LOGGER.info("Found duplicate, storing it as a duplicated transaction.")
if (item.table == transaction.table) and (item.use_case == transaction.use_case): self._mongo_repo.add_duplicated_transaction(transaction)
LOGGER.error("Found duplicate") return
self._mongo_repo.add_duplicated_transaction(transaction)
return
except ValueError as e: except ValueError as e:
LOGGER.error(f"{e}, could not insert duplicated node.") LOGGER.error(f"{e}, could not insert duplicated node.")
return return
...@@ -194,14 +199,14 @@ class MessageHandler: ...@@ -194,14 +199,14 @@ class MessageHandler:
try: try:
self._mongo_repo.add_transaction(transaction) self._mongo_repo.add_transaction(transaction)
except KeyError as e: except KeyError as e:
LOGGER.error(f"{e}") LOGGER.error(f"{e}, ignored {transaction_message}")
self._mongo_repo.add_failed_transaction(transaction_message) # self._mongo_repo.add_failed_transaction(transaction_message)
return return
msg = { msg = {
"type": "new-trace", "type": "new-trace",
"content": transaction.to_serializable_dict(), "content": transaction.to_serializable_dict()
} }
msg_string = json.dumps(msg) msg_string = json.dumps(msg)
......
...@@ -23,7 +23,8 @@ def delete_all_failed_for_use_case(use_case: str): ...@@ -23,7 +23,8 @@ def delete_all_failed_for_use_case(use_case: str):
return Response(status=200) return Response(status=200)
def all_duplicated_for_use_case(use_case: str): def all_duplicated_for_use_case(use_case: str):
return _repository.all_duplicated_transactions_for_use_case(use_case) transactions = _repository.all_duplicated_transactions_for_use_case(use_case)
return [t.to_serializable_dict() for t in transactions]
def delete_all_duplicated_for_use_case(use_case: str): def delete_all_duplicated_for_use_case(use_case: str):
_repository.delete_all_duplicated_transactions(use_case) _repository.delete_all_duplicated_transactions(use_case)
......
...@@ -18,14 +18,14 @@ class DummyMongoRepo: ...@@ -18,14 +18,14 @@ class DummyMongoRepo:
def add_transaction(self, transaction): def add_transaction(self, transaction):
self.added_transactions.append(transaction) self.added_transactions.append(transaction)
def get_transaction_with_id(self, unique_id: str): def get_transaction_with_id(self, unique_id: str, use_case,table):
result = [] result = []
for trans in self.added_transactions: for trans in self.added_transactions:
transID = trans.id() transID = trans.id()
if transID == unique_id: if transID == unique_id and trans.use_case == use_case and trans.table == table:
result.append(trans) result.append(trans)
if len(result) > 0: if len(result) > 0:
return result return result[0]
return None return None
...@@ -198,7 +198,8 @@ class Test_MessageHandler(unittest.TestCase): ...@@ -198,7 +198,8 @@ class Test_MessageHandler(unittest.TestCase):
self.handler.handle_blockchain_transaction(msg2['content']) self.handler.handle_blockchain_transaction(msg2['content'])
self.assertEqual(len(self.repo.added_transactions),2) self.assertEqual(len(self.repo.added_transactions),2)
def test_handleBlockchainTransaction_multipleTransactions_3AddedUnique2Duplicate(self): def test_handleBlockchainTransaction_multipleTransactions3SameIdDiffUseCaseTable_3AddedUnique2Duplicate(self):
#print("Entered Test: 3Unique 2Dupli")
msg = self._get_valid_message() msg = self._get_valid_message()
msg2 = self._get_valid_message2() msg2 = self._get_valid_message2()
msg3 = self._get_valid_message3() msg3 = self._get_valid_message3()
......
...@@ -57,7 +57,7 @@ if __name__ == '__main__': ...@@ -57,7 +57,7 @@ if __name__ == '__main__':
"ApplicationType": "debug", "ApplicationType": "debug",
"docType": "pizza", "docType": "pizza",
"id": 1, "id": 1,
"name": "MEXICAANA", "name": "Quatro Formagi",
"dough": { "dough": {
"type": "wheat", "type": "wheat",
"spinach": False "spinach": False
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment