Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
S
SMART
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
3
Issues
3
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UNI-KLU
SMART
Commits
e4fb7c90
Commit
e4fb7c90
authored
Mar 24, 2021
by
Bogdan
Browse files
Options
Browse Files
Download
Email Patches
Plain Diff
Implemented use_case/table params for message handler
parent
c6d69c93
Changes
3
Show whitespace changes
Inline
Side-by-side
Showing
3 changed files
with
37 additions
and
35 deletions
+37
-35
repository.py
...n/trace-retrieval-microservice/app/database/repository.py
+6
-9
MessageHandler.py
...ce-retrieval-microservice/app/messaging/MessageHandler.py
+26
-21
test_MessageHandler.py
...e-retrieval-microservice/app/tests/test_MessageHandler.py
+5
-5
No files found.
src/transaction-hub-in/trace-retrieval-microservice/app/database/repository.py
View file @
e4fb7c90
...
...
@@ -31,7 +31,9 @@ class Repository(MongoRepositoryBase):
@throws
KeyError - Duplicate transaction ID
'''
reference
=
self
.
get_transaction_with_id
(
transaction
.
id
())
use_case
=
transaction
.
use_case
table
=
transaction
.
table
reference
=
self
.
get_transaction_with_id
(
transaction
.
id
(),
use_case
,
table
)
if
reference
==
None
:
super
()
.
insert_entry
(
self
.
_transaction_collection
,
transaction
.
to_serializable_dict
())
else
:
...
...
@@ -41,16 +43,11 @@ class Repository(MongoRepositoryBase):
result
=
super
()
.
get_entries
(
self
.
_transaction_collection
,
projection
=
{
'_id'
:
False
},
selection
=
{
"use_case"
:
use_case
})
return
[
Transaction
.
from_serializable_dict
(
row
)
for
row
in
list
(
result
)]
def
get_transaction_with_id
(
self
,
unique_id
:
str
)
->
Transaction
:
result
=
list
(
super
()
.
get_entries
(
self
.
_transaction_collection
,
projection
=
{
'_id'
:
False
},
selection
=
{
"UniqueID"
:
unique_id
}))
return
None
def
get_transaction_with_id
(
self
,
unique_id
:
str
,
use_case
:
str
,
table
:
str
)
->
Transaction
:
result
=
list
(
super
()
.
get_entries
(
self
.
_transaction_collection
,
projection
=
{
'_id'
:
False
},
selection
=
{
"id"
:
unique_id
,
"use_case"
:
use_case
,
"table"
:
table
}))
if
len
(
result
)
>=
1
:
returnList
=
[]
for
item
in
result
:
returnList
.
append
(
item
)
return
Transaction
.
from_serializable_dict
(
returnList
)
return
Transaction
.
from_serializable_dict
(
result
[
0
])
return
None
...
...
src/transaction-hub-in/trace-retrieval-microservice/app/messaging/MessageHandler.py
View file @
e4fb7c90
...
...
@@ -8,9 +8,11 @@ import json
import
hashlib
import
logging
import
requests
requests
.
packages
.
urllib3
.
disable_warnings
()
from
typing
import
Dict
LOGGER
=
logging
.
getLogger
(
__name__
)
class
MessageHandler
:
...
...
@@ -30,7 +32,7 @@ class MessageHandler:
self
.
_rest_fetcher
=
rest_fetcher
def
handle_generic
(
self
,
body
):
LOGGER
.
info
(
f
"Received message: {body}"
)
result
=
None
message
=
None
...
...
@@ -39,11 +41,13 @@ class MessageHandler:
except
(
ValueError
,
TypeError
):
result
=
self
.
MSG_NOT_JSON
LOGGER
.
warning
(
result
)
LOGGER
.
info
(
f
"Received message: {body}"
)
return
result
if
not
'type'
in
message
:
result
=
self
.
MSG_NO_TYPE
LOGGER
.
warning
(
result
)
LOGGER
.
info
(
f
"Received message: {body}"
)
return
result
if
message
[
'type'
]
==
'blockchain-transaction'
:
...
...
@@ -51,8 +55,11 @@ class MessageHandler:
result
=
self
.
MSG_TRACE_PROCESSED
else
:
result
=
self
.
MSG_NOT_PROCESSED
LOGGER
.
warning
(
result
)
LOGGER
.
info
(
f
"Received message: {body}"
)
LOGGER
.
info
(
result
)
#LOGGER.info(result) #too much spam
return
result
def
_resolve_path
(
self
,
data
:
Dict
,
path
:
str
)
->
Dict
:
...
...
@@ -132,39 +139,39 @@ class MessageHandler:
'''
# check if there is a use-case in the message
if
"ApplicationType"
not
in
transaction_message
.
keys
()
:
if
"ApplicationType"
not
in
transaction_message
:
LOGGER
.
error
(
"Transaction has no ApplicationType, storing it under use-case 'unknown'."
)
transaction_message
[
"ApplicationType"
]
=
"unknown"
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
# check if there is a
doctyp
e in the message
if
"docType"
not
in
transaction_message
.
keys
()
:
LOGGER
.
error
(
"Transaction has no docType, storing it under
docTyp
e 'unknown'."
)
# check if there is a
tabl
e in the message
if
"docType"
not
in
transaction_message
:
LOGGER
.
error
(
"Transaction has no docType, storing it under
tabl
e 'unknown'."
)
transaction_message
[
"docType"
]
=
"unknown"
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
use_case
=
transaction_message
[
"ApplicationType"
]
docTyp
e
=
transaction_message
[
"docType"
]
transaction_
use_case
=
transaction_message
[
"ApplicationType"
]
transaction_tabl
e
=
transaction_message
[
"docType"
]
try
:
tables
=
self
.
_rest_fetcher
.
fetch_schema_information
(
use_case
)
tables
=
self
.
_rest_fetcher
.
fetch_schema_information
(
transaction_
use_case
)
except
ValueError
as
e
:
LOGGER
.
error
(
f
"{e}
\n
Storing it as a failed transaction
."
)
LOGGER
.
error
(
f
"{e}
\n
Could not fetch schema, storing it as a failed transaction.
."
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
target_table
=
None
# find correct table
for
table
in
tables
:
if
table
[
"name"
]
==
docTyp
e
:
if
table
[
"name"
]
==
transaction_tabl
e
:
target_table
=
table
break
# abort if table does not exist.
if
target_table
==
None
:
LOGGER
.
error
(
f
"There is no table '{
docTyp
e}', storing it as a failed transaction."
)
LOGGER
.
error
(
f
"There is no table '{
transaction_tabl
e}', storing it as a failed transaction."
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
...
...
@@ -172,19 +179,17 @@ class MessageHandler:
try
:
flattened
=
self
.
_flatten_transaction
(
transaction_message
,
mappings
)
except
KeyError
as
e
:
LOGGER
.
error
(
f
"Failed while flattening with KeyError: {str(e)}"
)
LOGGER
.
error
(
f
"Failed while flattening with KeyError: {str(e)}
, storing it as a failed transaction.
"
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
transaction
=
Transaction
(
use_case
,
target_table
[
"name"
],
flattened
)
transaction
=
Transaction
(
transaction_
use_case
,
target_table
[
"name"
],
flattened
)
#check for duplicates
try
:
references
=
self
.
_mongo_repo
.
get_transaction_with_id
(
transaction
.
id
())
references
=
self
.
_mongo_repo
.
get_transaction_with_id
(
transaction
.
id
()
,
transaction_use_case
,
transaction_table
)
if
references
!=
None
:
for
item
in
references
:
if
(
item
.
table
==
transaction
.
table
)
and
(
item
.
use_case
==
transaction
.
use_case
):
LOGGER
.
error
(
"Found duplicate"
)
LOGGER
.
info
(
"Found duplicate, storing it as a duplicated transaction."
)
self
.
_mongo_repo
.
add_duplicated_transaction
(
transaction
)
return
except
ValueError
as
e
:
...
...
@@ -201,7 +206,7 @@ class MessageHandler:
msg
=
{
"type"
:
"new-trace"
,
"content"
:
transaction
.
to_serializable_dict
()
,
"content"
:
transaction
.
to_serializable_dict
()
}
msg_string
=
json
.
dumps
(
msg
)
...
...
src/transaction-hub-in/trace-retrieval-microservice/app/tests/test_MessageHandler.py
View file @
e4fb7c90
...
...
@@ -18,14 +18,14 @@ class DummyMongoRepo:
def
add_transaction
(
self
,
transaction
):
self
.
added_transactions
.
append
(
transaction
)
def
get_transaction_with_id
(
self
,
unique_id
:
str
):
def
get_transaction_with_id
(
self
,
unique_id
:
str
,
use_case
,
table
):
result
=
[]
for
trans
in
self
.
added_transactions
:
transID
=
trans
.
id
()
if
transID
==
unique_id
:
if
transID
==
unique_id
and
trans
.
use_case
==
use_case
and
trans
.
table
==
table
:
result
.
append
(
trans
)
if
len
(
result
)
>
0
:
return
result
return
result
[
0
]
return
None
...
...
@@ -198,8 +198,8 @@ class Test_MessageHandler(unittest.TestCase):
self
.
handler
.
handle_blockchain_transaction
(
msg2
[
'content'
])
self
.
assertEqual
(
len
(
self
.
repo
.
added_transactions
),
2
)
def
test_handleBlockchainTransaction_multipleTransactions_3AddedUnique2Duplicate
(
self
):
print
(
"Entered Test: 3Unique 2Dupli"
)
def
test_handleBlockchainTransaction_multipleTransactions
3SameIdDiffUseCaseTable
_3AddedUnique2Duplicate
(
self
):
#
print("Entered Test: 3Unique 2Dupli")
msg
=
self
.
_get_valid_message
()
msg2
=
self
.
_get_valid_message2
()
msg3
=
self
.
_get_valid_message3
()
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment