Skip to content
Projects
Groups
Snippets
Help
Loading...
Help
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
S
SMART
Project
Project
Details
Activity
Releases
Cycle Analytics
Repository
Repository
Files
Commits
Branches
Tags
Contributors
Graph
Compare
Charts
Issues
3
Issues
3
List
Board
Labels
Milestones
Merge Requests
0
Merge Requests
0
CI / CD
CI / CD
Pipelines
Jobs
Schedules
Charts
Registry
Registry
Wiki
Wiki
Snippets
Snippets
Members
Members
Collapse sidebar
Close sidebar
Activity
Graph
Charts
Create a new issue
Jobs
Commits
Issue Boards
Open sidebar
UNI-KLU
SMART
Commits
c67bbdff
Commit
c67bbdff
authored
Apr 07, 2021
by
Lubber
Browse files
Options
Browse Files
Download
Plain Diff
Merge remote-tracking branch 'origin/develop' into develop
parents
ba7a3462
b50f87f5
Changes
7
Show whitespace changes
Inline
Side-by-side
Showing
7 changed files
with
191 additions
and
66 deletions
+191
-66
repository.py
...n/trace-retrieval-microservice/app/database/repository.py
+6
-4
MessageHandler.py
...ce-retrieval-microservice/app/messaging/MessageHandler.py
+28
-23
transactions.py
...n/trace-retrieval-microservice/app/routes/transactions.py
+2
-1
test_MessageHandler.py
...e-retrieval-microservice/app/tests/test_MessageHandler.py
+5
-4
check_mog.py
tools/check-use-case-data/check_mog.py
+116
-0
reupload_failed_mog.py
tools/check-use-case-data/reupload_failed_mog.py
+34
-0
upload_data.py
tools/reddit-upload/upload_data.py
+0
-34
No files found.
src/transaction-hub-in/trace-retrieval-microservice/app/database/repository.py
View file @
c67bbdff
...
...
@@ -31,7 +31,9 @@ class Repository(MongoRepositoryBase):
@throws
KeyError - Duplicate transaction ID
'''
reference
=
self
.
get_transaction_with_id
(
transaction
.
id
())
use_case
=
transaction
.
use_case
table
=
transaction
.
table
reference
=
self
.
get_transaction_with_id
(
transaction
.
id
(),
use_case
,
table
)
if
reference
==
None
:
super
()
.
insert_entry
(
self
.
_transaction_collection
,
transaction
.
to_serializable_dict
())
else
:
...
...
@@ -41,11 +43,11 @@ class Repository(MongoRepositoryBase):
result
=
super
()
.
get_entries
(
self
.
_transaction_collection
,
projection
=
{
'_id'
:
False
},
selection
=
{
"use_case"
:
use_case
})
return
[
Transaction
.
from_serializable_dict
(
row
)
for
row
in
list
(
result
)]
def
get_transaction_with_id
(
self
,
unique_id
:
str
)
->
Transaction
:
result
=
list
(
super
()
.
get_entries
(
self
.
_transaction_collection
,
projection
=
{
'_id'
:
False
},
selection
=
{
"
UniqueID"
:
unique_id
}))
def
get_transaction_with_id
(
self
,
unique_id
:
str
,
use_case
:
str
,
table
:
str
)
->
Transaction
:
result
=
list
(
super
()
.
get_entries
(
self
.
_transaction_collection
,
projection
=
{
'_id'
:
False
},
selection
=
{
"
id"
:
unique_id
,
"use_case"
:
use_case
,
"table"
:
table
}))
if
len
(
result
)
>=
1
:
return
Transaction
.
from_serializable_dict
(
result
)
return
Transaction
.
from_serializable_dict
(
result
[
0
]
)
return
None
...
...
src/transaction-hub-in/trace-retrieval-microservice/app/messaging/MessageHandler.py
View file @
c67bbdff
...
...
@@ -8,9 +8,11 @@ import json
import
hashlib
import
logging
import
requests
requests
.
packages
.
urllib3
.
disable_warnings
()
from
typing
import
Dict
LOGGER
=
logging
.
getLogger
(
__name__
)
class
MessageHandler
:
...
...
@@ -30,7 +32,7 @@ class MessageHandler:
self
.
_rest_fetcher
=
rest_fetcher
def
handle_generic
(
self
,
body
):
LOGGER
.
info
(
f
"Received message: {body}"
)
result
=
None
message
=
None
...
...
@@ -39,11 +41,13 @@ class MessageHandler:
except
(
ValueError
,
TypeError
):
result
=
self
.
MSG_NOT_JSON
LOGGER
.
warning
(
result
)
LOGGER
.
info
(
f
"Received message: {body}"
)
return
result
if
not
'type'
in
message
:
result
=
self
.
MSG_NO_TYPE
LOGGER
.
warning
(
result
)
LOGGER
.
info
(
f
"Received message: {body}"
)
return
result
if
message
[
'type'
]
==
'blockchain-transaction'
:
...
...
@@ -51,8 +55,11 @@ class MessageHandler:
result
=
self
.
MSG_TRACE_PROCESSED
else
:
result
=
self
.
MSG_NOT_PROCESSED
LOGGER
.
warning
(
result
)
LOGGER
.
info
(
f
"Received message: {body}"
)
LOGGER
.
info
(
result
)
#LOGGER.info(result) #too much spam
return
result
def
_resolve_path
(
self
,
data
:
Dict
,
path
:
str
)
->
Dict
:
...
...
@@ -132,39 +139,39 @@ class MessageHandler:
'''
# check if there is a use-case in the message
if
"ApplicationType"
not
in
transaction_message
.
keys
()
:
if
"ApplicationType"
not
in
transaction_message
:
LOGGER
.
error
(
"Transaction has no ApplicationType, storing it under use-case 'unknown'."
)
transaction_message
[
"ApplicationType"
]
=
"unknown"
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
# check if there is a
doctyp
e in the message
if
"docType"
not
in
transaction_message
.
keys
()
:
LOGGER
.
error
(
"Transaction has no docType, storing it under
docTyp
e 'unknown'."
)
# check if there is a
tabl
e in the message
if
"docType"
not
in
transaction_message
:
LOGGER
.
error
(
"Transaction has no docType, storing it under
tabl
e 'unknown'."
)
transaction_message
[
"docType"
]
=
"unknown"
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
use_case
=
transaction_message
[
"ApplicationType"
]
docTyp
e
=
transaction_message
[
"docType"
]
transaction_
use_case
=
transaction_message
[
"ApplicationType"
]
transaction_tabl
e
=
transaction_message
[
"docType"
]
try
:
tables
=
self
.
_rest_fetcher
.
fetch_schema_information
(
use_case
)
tables
=
self
.
_rest_fetcher
.
fetch_schema_information
(
transaction_
use_case
)
except
ValueError
as
e
:
LOGGER
.
error
(
f
"{e}
\n
Storing it as a failed transaction
."
)
LOGGER
.
error
(
f
"{e}
\n
Could not fetch schema, storing it as a failed transaction.
."
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
target_table
=
None
# find correct table
for
table
in
tables
:
if
table
[
"name"
]
==
docTyp
e
:
if
table
[
"name"
]
==
transaction_tabl
e
:
target_table
=
table
break
# abort if table does not exist.
if
target_table
==
None
:
LOGGER
.
error
(
f
"There is no table '{
docTyp
e}', storing it as a failed transaction."
)
LOGGER
.
error
(
f
"There is no table '{
transaction_tabl
e}', storing it as a failed transaction."
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
...
...
@@ -172,19 +179,17 @@ class MessageHandler:
try
:
flattened
=
self
.
_flatten_transaction
(
transaction_message
,
mappings
)
except
KeyError
as
e
:
LOGGER
.
error
(
f
"Failed while flattening with KeyError: {str(e)}"
)
LOGGER
.
error
(
f
"Failed while flattening with KeyError: {str(e)}
, storing it as a failed transaction.
"
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
return
transaction
=
Transaction
(
use_case
,
target_table
[
"name"
],
flattened
)
transaction
=
Transaction
(
transaction_
use_case
,
target_table
[
"name"
],
flattened
)
#check for duplicates
try
:
references
=
self
.
_mongo_repo
.
get_transaction_with_id
(
transaction
.
id
())
references
=
self
.
_mongo_repo
.
get_transaction_with_id
(
transaction
.
id
()
,
transaction_use_case
,
transaction_table
)
if
references
!=
None
:
for
item
in
references
:
if
(
item
.
table
==
transaction
.
table
)
and
(
item
.
use_case
==
transaction
.
use_case
):
LOGGER
.
error
(
"Found duplicate"
)
LOGGER
.
info
(
"Found duplicate, storing it as a duplicated transaction."
)
self
.
_mongo_repo
.
add_duplicated_transaction
(
transaction
)
return
except
ValueError
as
e
:
...
...
@@ -194,14 +199,14 @@ class MessageHandler:
try
:
self
.
_mongo_repo
.
add_transaction
(
transaction
)
except
KeyError
as
e
:
LOGGER
.
error
(
f
"{e}"
)
self
.
_mongo_repo
.
add_failed_transaction
(
transaction_message
)
LOGGER
.
error
(
f
"{e}
, ignored {transaction_message}
"
)
#
self._mongo_repo.add_failed_transaction(transaction_message)
return
msg
=
{
"type"
:
"new-trace"
,
"content"
:
transaction
.
to_serializable_dict
()
,
"content"
:
transaction
.
to_serializable_dict
()
}
msg_string
=
json
.
dumps
(
msg
)
...
...
src/transaction-hub-in/trace-retrieval-microservice/app/routes/transactions.py
View file @
c67bbdff
...
...
@@ -23,7 +23,8 @@ def delete_all_failed_for_use_case(use_case: str):
return
Response
(
status
=
200
)
def
all_duplicated_for_use_case
(
use_case
:
str
):
return
_repository
.
all_duplicated_transactions_for_use_case
(
use_case
)
transactions
=
_repository
.
all_duplicated_transactions_for_use_case
(
use_case
)
return
[
t
.
to_serializable_dict
()
for
t
in
transactions
]
def
delete_all_duplicated_for_use_case
(
use_case
:
str
):
_repository
.
delete_all_duplicated_transactions
(
use_case
)
...
...
src/transaction-hub-in/trace-retrieval-microservice/app/tests/test_MessageHandler.py
View file @
c67bbdff
...
...
@@ -18,14 +18,14 @@ class DummyMongoRepo:
def
add_transaction
(
self
,
transaction
):
self
.
added_transactions
.
append
(
transaction
)
def
get_transaction_with_id
(
self
,
unique_id
:
str
):
def
get_transaction_with_id
(
self
,
unique_id
:
str
,
use_case
,
table
):
result
=
[]
for
trans
in
self
.
added_transactions
:
transID
=
trans
.
id
()
if
transID
==
unique_id
:
if
transID
==
unique_id
and
trans
.
use_case
==
use_case
and
trans
.
table
==
table
:
result
.
append
(
trans
)
if
len
(
result
)
>
0
:
return
result
return
result
[
0
]
return
None
...
...
@@ -198,7 +198,8 @@ class Test_MessageHandler(unittest.TestCase):
self
.
handler
.
handle_blockchain_transaction
(
msg2
[
'content'
])
self
.
assertEqual
(
len
(
self
.
repo
.
added_transactions
),
2
)
def
test_handleBlockchainTransaction_multipleTransactions_3AddedUnique2Duplicate
(
self
):
def
test_handleBlockchainTransaction_multipleTransactions3SameIdDiffUseCaseTable_3AddedUnique2Duplicate
(
self
):
#print("Entered Test: 3Unique 2Dupli")
msg
=
self
.
_get_valid_message
()
msg2
=
self
.
_get_valid_message2
()
msg3
=
self
.
_get_valid_message3
()
...
...
tools/check-use-case-data/check_mog.py
0 → 100644
View file @
c67bbdff
import
requests
requests
.
packages
.
urllib3
.
disable_warnings
()
from
icecream
import
ic
def
httpget
(
url
):
token
=
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6InJlZ3VsYXJAaXRlYy5hYXUuYXQiLCJjcmVhdGVkX2F0IjoiMjAyMS0wMy0yNCAxMDoxMzo1MS4wMjkwNDkiLCJ2YWxpZF91bnRpbCI6IjIwMjEtMDMtMjUgMTA6MTM6NTEuMDI5MDQ5In0.V6kYV5Lmb_tUIsF-6AKNB8_lIifmJP_Dm8gHhGa5w_o'
res
=
requests
.
get
(
url
,
verify
=
False
,
headers
=
{
"Authorization"
:
f
"Bearer {token}"
})
return
res
# list tables
res
=
httpget
(
url
=
'https://articonf1.itec.aau.at:30420/api/use-cases/crowd-journalism/tables'
)
print
(
"Tables: "
,
[
entry
[
'name'
]
for
entry
in
res
.
json
()])
# count pushed data
def
count_data
(
json_res
,
table_identifier
=
'table'
):
tables
=
{}
for
entry
in
json_res
:
key
=
entry
[
table_identifier
]
if
key
not
in
tables
:
tables
[
key
]
=
0
tables
[
key
]
+=
1
ic
(
tables
)
res
=
httpget
(
url
=
'https://articonf1.itec.aau.at:30001/api/use_cases/crowd-journalism/transactions'
)
count_data
(
res
.
json
())
res_f
=
httpget
(
url
=
'https://articonf1.itec.aau.at:30001/api/use_cases/crowd-journalism/transactions-failed'
)
count_data
(
res_f
.
json
(),
'docType'
)
# failed tags: the "tag" is missing, but is called name
# failed purchases: duplicate keys generated from (userid, videoid, ownerid)
# failed classifications: impact is missing
# visualize content
import
matplotlib.pyplot
as
plt
def
visualize_video_coordinates
():
geolocations
=
[]
for
entry
in
res
.
json
():
if
entry
[
'table'
]
!=
'video'
:
continue
loc_
=
entry
[
'properties'
][
'geolocation'
]
.
split
(
','
)
if
loc_
[
0
]
==
'undefined'
or
loc_
[
1
]
==
'undefined'
:
continue
geolocations
.
append
(
loc_
)
plt
.
scatter
([
float
(
coor
[
0
])
for
coor
in
geolocations
],
[
float
(
coor
[
1
])
for
coor
in
geolocations
])
plt
.
axis
(
'off'
)
plt
.
show
()
# visualize_video_coordinates()
def
visualize_video_prices
():
price
=
[]
for
entry
in
res
.
json
():
if
entry
[
'table'
]
!=
'video'
:
continue
price
.
append
(
entry
[
'properties'
][
'price'
])
from
collections
import
Counter
print
(
Counter
(
price
))
plt
.
hist
(
price
,
bins
=
len
(
set
(
price
)))
plt
.
show
()
# visualize_video_prices()
def
visualize_content_ratings
():
impact
=
[]
informative
=
[]
trustiness
=
[]
for
entry
in
res
.
json
():
if
entry
[
'table'
]
!=
'classification'
:
continue
if
entry
[
'properties'
][
'impact'
]
is
not
None
:
impact
.
append
(
entry
[
'properties'
][
'impact'
])
if
entry
[
'properties'
][
'informative'
]
is
not
None
:
informative
.
append
(
entry
[
'properties'
][
'informative'
])
if
entry
[
'properties'
][
'trustiness'
]
is
not
None
:
trustiness
.
append
(
entry
[
'properties'
][
'trustiness'
])
from
collections
import
Counter
print
(
Counter
(
impact
))
print
(
Counter
(
informative
))
print
(
Counter
(
trustiness
))
fig
,
(
ax1
,
ax2
,
ax3
)
=
plt
.
subplots
(
3
)
ax1
.
hist
(
impact
,
bins
=
len
(
set
(
impact
)))
ax1
.
set_title
(
'impact'
)
ax2
.
hist
(
informative
,
bins
=
len
(
set
(
informative
)))
ax2
.
set_title
(
'informative'
)
ax3
.
hist
(
trustiness
,
bins
=
len
(
set
(
trustiness
)))
ax3
.
set_title
(
'trustiness'
)
plt
.
show
()
# visualize_content_ratings()
# counting duplicate entries for 'purchase'
working_purchase_ids
=
[(
entry
[
'properties'
][
'userid'
],
entry
[
'properties'
][
'videoid'
],
entry
[
'properties'
][
'ownerid'
])
for
entry
in
res
.
json
()
if
entry
[
'table'
]
==
'purchase'
]
failed_purchase_ids
=
[(
entry
[
'userid'
],
entry
[
'videoid'
],
entry
[
'ownerid'
])
for
entry
in
res_f
.
json
()
if
entry
[
'docType'
]
==
'purchase'
]
ic
(
len
(
working_purchase_ids
))
ic
(
len
(
failed_purchase_ids
))
cnt
=
0
for
failed_id
in
failed_purchase_ids
:
if
failed_id
in
working_purchase_ids
:
cnt
+=
1
ic
(
cnt
)
\ No newline at end of file
tools/check-use-case-data/reupload_failed_mog.py
0 → 100644
View file @
c67bbdff
import
requests
requests
.
packages
.
urllib3
.
disable_warnings
()
from
icecream
import
ic
token
=
'eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJ1c2VybmFtZSI6InJlZ3VsYXJAaXRlYy5hYXUuYXQiLCJjcmVhdGVkX2F0IjoiMjAyMS0wMy0yNCAxMDoxMzo1MS4wMjkwNDkiLCJ2YWxpZF91bnRpbCI6IjIwMjEtMDMtMjUgMTA6MTM6NTEuMDI5MDQ5In0.V6kYV5Lmb_tUIsF-6AKNB8_lIifmJP_Dm8gHhGa5w_o'
def
httpget
(
url
):
res
=
requests
.
get
(
url
,
verify
=
False
,
headers
=
{
"Authorization"
:
f
"Bearer {token}"
})
return
res
res_f
=
httpget
(
url
=
'https://articonf1.itec.aau.at:30001/api/use_cases/crowd-journalism/transactions-failed'
)
failed_purchases
=
[]
for
entry
in
res_f
.
json
():
if
entry
[
'docType'
]
==
'purchase'
:
failed_purchases
.
append
(
entry
)
print
(
len
(
failed_purchases
))
# upload again
def
httppost_gateway
(
content_
):
url
=
'https://articonf1.itec.aau.at:30401/api/trace'
res
=
requests
.
post
(
url
,
verify
=
False
,
headers
=
{
"Authorization"
:
f
"Bearer {token}"
},
json
=
content_
)
return
res
for
purchase
in
failed_purchases
:
res
=
httppost_gateway
(
purchase
)
print
(
res
)
\ No newline at end of file
tools/reddit-upload/upload_data.py
View file @
c67bbdff
...
...
@@ -50,40 +50,6 @@ if __name__ == '__main__':
for
key
,
value
in
obj_dict
.
items
():
transaction
[
key
]
=
value
#####################TEEEEEEEEEEST###########
transaction
=
{
#"type": "blockchain-transaction",
"ApplicationType"
:
"debug"
,
"docType"
:
"pizza"
,
"id"
:
1
,
"name"
:
"MEXICAANA"
,
"dough"
:
{
"type"
:
"wheat"
,
"spinach"
:
False
},
"toppings"
:
[
{
"name"
:
"Tomato Sauce"
,
"price"
:
1.00
},
{
"name"
:
"Cheese"
,
"price"
:
0.50
},
{
"name"
:
"Chilli Oil"
,
"price"
:
0.50
},
{
"name"
:
"Peppers"
,
"price"
:
1.50
}
]
}
###################FIN TEEEEST ###############
send_transaction_to_rest_gateway
(
transaction
)
summ
+=
1
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment