Commit 4644e420 authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'bugfix/traceIdDeduplication' into 'develop'

Bugfix/trace id deduplication

See merge request !20
parents bd115b86 3ca272b5
...@@ -90,6 +90,7 @@ class MessageHandler: ...@@ -90,6 +90,7 @@ class MessageHandler:
for prop in layer.total_properties: for prop in layer.total_properties:
node[prop] = content["properties"][prop] node[prop] = content["properties"][prop]
node["layer_name"] = layer.layer_name node["layer_name"] = layer.layer_name
node["table"] = layer.table node["table"] = layer.table
node["use_case"] = layer.use_case node["use_case"] = layer.use_case
......
...@@ -11,6 +11,7 @@ from typing import List ...@@ -11,6 +11,7 @@ from typing import List
class DummyMongoRepo: class DummyMongoRepo:
'''Dummy class to be used for testing the MessageHandler''' '''Dummy class to be used for testing the MessageHandler'''
last_trace = None last_trace = None
layernodes = []
def insert_trace(self, trace): def insert_trace(self, trace):
self.last_trace = trace self.last_trace = trace
...@@ -34,7 +35,9 @@ class DummyMongoRepo: ...@@ -34,7 +35,9 @@ class DummyMongoRepo:
] ]
def add_layer_nodes(self, nodes: List): def add_layer_nodes(self, nodes: List):
pass self.layernodes.extend(nodes)
return
class Test_Pipeline(unittest.TestCase): class Test_Pipeline(unittest.TestCase):
handler = None handler = None
...@@ -68,6 +71,7 @@ class Test_Pipeline(unittest.TestCase): ...@@ -68,6 +71,7 @@ class Test_Pipeline(unittest.TestCase):
def testTraceProcessing(self): def testTraceProcessing(self):
msg = self._buildTraceMessage() msg = self._buildTraceMessage()
self.handler.handle_new_trace(msg["content"]) self.handler.handle_new_trace(msg["content"])
self.assertEqual(len(self.handler._repository.layernodes),1)
if __name__ == '__main__': if __name__ == '__main__':
......
...@@ -62,6 +62,39 @@ paths: ...@@ -62,6 +62,39 @@ paths:
responses: responses:
'200': '200':
description: "Successful Request" description: "Successful Request"
/use_cases/{use_case}/transactions-duplicated:
delete:
security:
- JwtRegular: []
operationId: "routes.transactions.delete_all_duplicated_for_use_case"
tags:
- "Transactions"
summary: "Deletes all duplicated Transactions in the given Use-Case"
description: "Deletes all duplicated Transactions in the given Use-Case"
parameters:
- in: path
name: "use_case"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
get:
security:
- JwtRegular: []
operationId: "routes.transactions.all_duplicated_for_use_case"
tags:
- "Transactions"
summary: "Retrieves all duplicated Transactions in the given Use-Case"
description: "Retrieves all duplicated Transactions in the given Use-Case"
parameters:
- in: path
name: "use_case"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
/debug: /debug:
post: post:
......
...@@ -18,6 +18,7 @@ class Repository(MongoRepositoryBase): ...@@ -18,6 +18,7 @@ class Repository(MongoRepositoryBase):
self._transaction_collection = 'transactions' self._transaction_collection = 'transactions'
self._failed_transaction_collection = 'transactions_failed' self._failed_transaction_collection = 'transactions_failed'
self._duplicated_transaction_collection = "transactions_duplicated"
def delete_all_transactions(self): def delete_all_transactions(self):
collection = self._database[self._transaction_collection] collection = self._database[self._transaction_collection]
...@@ -58,3 +59,15 @@ class Repository(MongoRepositoryBase): ...@@ -58,3 +59,15 @@ class Repository(MongoRepositoryBase):
def delete_all_failed_transactions(self, use_case:str): def delete_all_failed_transactions(self, use_case:str):
collection = self._database[self._failed_transaction_collection] collection = self._database[self._failed_transaction_collection]
collection.delete_many({"ApplicationType": use_case}) collection.delete_many({"ApplicationType": use_case})
def add_duplicated_transaction(self, transaction: Transaction):
#transaction["timestamp"] = time.time()
super().insert_entry(self._duplicated_transaction_collection, transaction.to_serializable_dict())
def all_duplicated_transactions_for_use_case(self, use_case: str) -> List[Dict]:
result = super().get_entries(self._duplicated_transaction_collection, projection={'_id': False}, selection={"use_case": use_case})
return [Transaction.from_serializable_dict(row) for row in list(result)]
def delete_all_duplicated_transactions(self, use_case:str):
collection = self._database[self._duplicated_transaction_collection]
collection.delete_many({"ApplicationType": use_case})
\ No newline at end of file
...@@ -174,6 +174,19 @@ class MessageHandler: ...@@ -174,6 +174,19 @@ class MessageHandler:
return return
transaction = Transaction(use_case, target_table["name"], flattened) transaction = Transaction(use_case, target_table["name"], flattened)
#check for duplicates
try:
reference = self._mongo_repo.get_transaction_with_id(transaction.id())
if reference != None:
if (reference[0].table == transaction.table) and (reference[0].use_case == transaction.use_case):
LOGGER.error("Found duplicate")
self._mongo_repo.add_duplicated_transaction(transaction)
return
except ValueError as e:
LOGGER.error(f"{e}, could not insert duplicated node.")
return
try: try:
self._mongo_repo.add_transaction(transaction) self._mongo_repo.add_transaction(transaction)
except ValueError as e: except ValueError as e:
......
...@@ -2,7 +2,10 @@ from messaging.rest_fetcher import RestFetcher ...@@ -2,7 +2,10 @@ from messaging.rest_fetcher import RestFetcher
class DummyRestFetcher(RestFetcher): class DummyRestFetcher(RestFetcher):
def fetch_schema_information(self, use_case: str): def fetch_schema_information(self, use_case: str):
return [ returnList = []
if use_case == "string":
returnList =[
{ {
"name": "string", "name": "string",
"use_case": "string", "use_case": "string",
...@@ -10,5 +13,33 @@ class DummyRestFetcher(RestFetcher): ...@@ -10,5 +13,33 @@ class DummyRestFetcher(RestFetcher):
"UniqueID": "ResourceIds", "UniqueID": "ResourceIds",
"RIds": "ResourceIds" "RIds": "ResourceIds"
} }
},
{
"name": "string2",
"use_case": "string",
"mappings": {
"UniqueID": "ResourceIds",
"RIds": "ResourceIds"
}
}
]
else:
returnList = [
{
"name": "string",
"use_case": "string2",
"mappings": {
"UniqueID": "ResourceIds",
"RIds": "ResourceIds"
}
},
{
"name": "string2",
"use_case": "string2",
"mappings": {
"UniqueID": "ResourceIds",
"RIds": "ResourceIds"
}
} }
] ]
return returnList
\ No newline at end of file
...@@ -21,3 +21,10 @@ def all_failed_for_use_case(use_case: str): ...@@ -21,3 +21,10 @@ def all_failed_for_use_case(use_case: str):
def delete_all_failed_for_use_case(use_case: str): def delete_all_failed_for_use_case(use_case: str):
_repository.delete_all_failed_transactions(use_case) _repository.delete_all_failed_transactions(use_case)
return Response(status=200) return Response(status=200)
def all_duplicated_for_use_case(use_case: str):
return _repository.all_duplicated_transactions_for_use_case(use_case)
def delete_all_duplicated_for_use_case(use_case: str):
_repository.delete_all_duplicated_transactions(use_case)
return Response(status=200)
\ No newline at end of file
...@@ -10,6 +10,7 @@ class DummyMongoRepo: ...@@ -10,6 +10,7 @@ class DummyMongoRepo:
def __init__(self): def __init__(self):
self.added_transactions = [] self.added_transactions = []
self.duplicated_transactions = []
def insert_trace(self, trace): def insert_trace(self, trace):
self.last_trace = trace self.last_trace = trace
...@@ -17,6 +18,22 @@ class DummyMongoRepo: ...@@ -17,6 +18,22 @@ class DummyMongoRepo:
def add_transaction(self, transaction): def add_transaction(self, transaction):
self.added_transactions.append(transaction) self.added_transactions.append(transaction)
def get_transaction_with_id(self, unique_id: str):
result = []
for trans in self.added_transactions:
transID = trans.id()
if transID == unique_id:
result.append(trans)
if len(result) > 0:
return result
return None
def add_duplicated_transaction(self, transaction):
self.duplicated_transactions.append(transaction)
from messaging.DummyMessageManager import DummyMessageManager as DummyMessageSender from messaging.DummyMessageManager import DummyMessageManager as DummyMessageSender
from messaging.dummy_rest_fetcher import DummyRestFetcher from messaging.dummy_rest_fetcher import DummyRestFetcher
...@@ -53,6 +70,48 @@ class Test_MessageHandler(unittest.TestCase): ...@@ -53,6 +70,48 @@ class Test_MessageHandler(unittest.TestCase):
} }
} }
return json.dumps(message_values) return json.dumps(message_values)
def _get_valid_message2(self) -> str:
message_values = \
{ 'type': 'blockchain-transaction',
'content':
{
"ApplicationType": "string2",
"docType": "string",
"Metadata": {},
"ResourceIds": "string",
"ResourceMd5": "string",
"ResourceState": "string",
"Timestamp": "2019-08-27T14:00:48.587Z",
"TransactionFrom": "string",
"TransactionFromLatLng": "string",
"TransactionId": "string",
"TransactionTo": "string",
"TransactionToLatLng": "string",
"TransferredAsset": "string"
}
}
return json.dumps(message_values)
def _get_valid_message3(self) -> str:
message_values = \
{ 'type': 'blockchain-transaction',
'content':
{
"ApplicationType": "string",
"docType": "string2",
"Metadata": {},
"ResourceIds": "string",
"ResourceMd5": "string",
"ResourceState": "string",
"Timestamp": "2019-08-27T14:00:48.587Z",
"TransactionFrom": "string",
"TransactionFromLatLng": "string",
"TransactionId": "string",
"TransactionTo": "string",
"TransactionToLatLng": "string",
"TransferredAsset": "string"
}
}
return json.dumps(message_values)
def test_handleGeneric_emptyMessage_NotJsonError(self): def test_handleGeneric_emptyMessage_NotJsonError(self):
res = self.handler.handle_generic('') res = self.handler.handle_generic('')
...@@ -111,5 +170,32 @@ class Test_MessageHandler(unittest.TestCase): ...@@ -111,5 +170,32 @@ class Test_MessageHandler(unittest.TestCase):
self.assertEqual('semantic-linking', self.msg_sender.last_message['key']) self.assertEqual('semantic-linking', self.msg_sender.last_message['key'])
self.assertEqual('new-trace', json.loads(self.msg_sender.last_message['msg'])["type"]) self.assertEqual('new-trace', json.loads(self.msg_sender.last_message['msg'])["type"])
def test_handleblockchain_duplicateTrace(self):
msg = self._get_valid_message()
msg2 = self._get_valid_message()
msg = eval(msg)
msg2 = eval(msg2)
self.handler.handle_blockchain_transaction(msg['content'])
self.handler.handle_blockchain_transaction(msg2['content'])
self.assertEqual(len(self.repo.added_transactions),len(self.repo.duplicated_transactions))
def test_handleblockchain_duplicateTraceDifferentTable(self):
msg = self._get_valid_message()
msg2 = self._get_valid_message2()
msg = eval(msg)
msg2 = eval(msg2)
self.handler.handle_blockchain_transaction(msg['content'])
self.handler.handle_blockchain_transaction(msg2['content'])
self.assertEqual(len(self.repo.added_transactions),2)
def test_handleblockchain_duplicateTraceDifferentUseCase(self):
msg = self._get_valid_message()
msg2 = self._get_valid_message3()
msg = eval(msg)
msg2 = eval(msg2)
self.handler.handle_blockchain_transaction(msg['content'])
self.handler.handle_blockchain_transaction(msg2['content'])
self.assertEqual(len(self.repo.added_transactions),2)
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment