Commit b734c8c5 authored by Manuel's avatar Manuel

traceRetrieval: treating failed transaction

when a transaction failes it gets stored in its own collection
updated requirements.txt
parent 27db2926
...@@ -23,6 +23,21 @@ paths: ...@@ -23,6 +23,21 @@ paths:
responses: responses:
'200': '200':
description: "Successful Request" description: "Successful Request"
/transactions-failed/use_case/{use_case}:
get:
operationId: "routes.transactions.all_failed_for_use_case"
tags:
- "Transactions"
summary: "Retrieves all Transactions in the given Use-Case"
description: "Retrieves all Transactions in the given Use-Case"
parameters:
- in: path
name: "use_case"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
/transactions/use_case/{use_case}: /transactions/use_case/{use_case}:
get: get:
operationId: "routes.transactions.all_for_use_case" operationId: "routes.transactions.all_for_use_case"
......
swagger: "2.0"
info:
title: Trace Retrieval microservice
description: This is the documentation for the trace retrieval microservice.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
# Paths supported by the server application
paths:
/transactions:
delete:
operationId: "routes.transactions.delete_all_transactions"
tags:
- "Transactions"
summary: "Delete all Transactions in the DB"
description: "Delete all Transactions in the DB"
responses:
'200':
description: "Successful Request"
/transactions-failed/use_case/{use_case}:
delete:
operationId: "routes.transactions.delete_all_failed_for_use_case"
tags:
- "Transactions"
summary: "Deletes all failed Transactions in the given Use-Case"
description: "Deletes all failed Transactions in the given Use-Case"
parameters:
- in: path
name: "use_case"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
get:
operationId: "routes.transactions.all_failed_for_use_case"
tags:
- "Transactions"
summary: "Retrieves all failed Transactions in the given Use-Case"
description: "Retrieves all failed Transactions in the given Use-Case"
parameters:
- in: path
name: "use_case"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
/transactions/use_case/{use_case}:
get:
operationId: "routes.transactions.all_for_use_case"
tags:
- "Transactions"
summary: "Retrieves all Transactions in the given Use-Case"
description: "Retrieves all Transactions in the given Use-Case"
parameters:
- in: path
name: "use_case"
required: true
type: "string"
responses:
'200':
description: "Successful Request"
/debug:
post:
operationId: "routes.debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
'200':
description: "Successful echo of request data"
/trace:
post:
operationId: "routes.blockchain_trace.post"
tags:
- "Blockchain Trace"
summary: "Add a new blockchain trace to SMART"
description: "Receives a new blockchain trace to store in SMART."
parameters:
- in: body
name: "BlockchainTrace"
description: "The trace to be added"
required: true
schema:
$ref: "#/definitions/BlockchainTrace"
responses:
'201':
description: "Successful operation"
'400':
description: "Invalid input"
get:
operationId: "routes.blockchain_trace.get"
tags:
- "Blockchain Trace"
summary: "Get blockchain traces"
description: "Returns all blockchain traces in the database"
parameters: []
responses:
'200':
description: "Successful operation"
schema:
$ref: "#/definitions/BlockchainTrace"
definitions:
BlockchainTrace:
type: "object"
properties:
TransactionId:
type: string
format: uuid
Timestamp:
type: "string"
format: "date-time"
ApplicationType:
type: "string"
TransactionFrom:
type: "string"
format: "uuid"
TransactionTo:
type: "string"
format: "uuid"
TransferredAsset:
type: "string"
ResourceIds:
type: "string"
ResourceMd5:
type: "string"
ResourceState:
type: "string"
Metadata:
type: "string"
\ No newline at end of file
...@@ -5,6 +5,7 @@ from database.entities.transaction import Transaction ...@@ -5,6 +5,7 @@ from database.entities.transaction import Transaction
import pymongo import pymongo
import json import json
import time
from typing import List, Dict from typing import List, Dict
class Repository(MongoRepositoryBase): class Repository(MongoRepositoryBase):
...@@ -16,6 +17,7 @@ class Repository(MongoRepositoryBase): ...@@ -16,6 +17,7 @@ class Repository(MongoRepositoryBase):
'rest-gateway-db') 'rest-gateway-db')
self._transaction_collection = 'transactions' self._transaction_collection = 'transactions'
self._failed_transaction_collection = 'transactions_failed'
def delete_all_transactions(self): def delete_all_transactions(self):
collection = self._database[self._transaction_collection] collection = self._database[self._transaction_collection]
...@@ -36,4 +38,21 @@ class Repository(MongoRepositoryBase): ...@@ -36,4 +38,21 @@ class Repository(MongoRepositoryBase):
if len(result) == 1: if len(result) == 1:
return Transaction.from_serializable_dict(result[0]) return Transaction.from_serializable_dict(result[0])
return None return None
\ No newline at end of file
def add_failed_transaction(self, transaction: Dict):
transaction["timestamp"] = time.time()
super().insert_entry(self._failed_transaction_collection, transaction)
def all_failed_transactions_for_use_case(self, use_case: str) -> List[Dict]:
result = super().get_entries(
self._failed_transaction_collection,
projection={'_id': False},
selection={"ApplicationType": use_case}
).sort("timestamp", 1)
return list(result)
def delete_all_failed_transactions(self, use_case:str):
collection = self._database[self._failed_transaction_collection]
collection.delete_many({"ApplicationType": use_case})
\ No newline at end of file
# add modules folder to interpreter path # add modules folder to interpreter path
import sys import sys
import os import os
import prance
from pathlib import Path
modules_path = '../../../modules/' modules_path = '../../../modules/'
if os.path.exists(modules_path): if os.path.exists(modules_path):
sys.path.insert(1, modules_path) sys.path.insert(1, modules_path)
...@@ -13,6 +16,8 @@ LOGGER = logging.getLogger(__name__) ...@@ -13,6 +16,8 @@ LOGGER = logging.getLogger(__name__)
############################# #############################
import connexion import connexion
from security import swagger_util
from env_info import is_running_locally
from database.MongoRepository import MongoRepository from database.MongoRepository import MongoRepository
from messaging.MessageHandler import MessageHandler from messaging.MessageHandler import MessageHandler
...@@ -25,15 +30,31 @@ def message_received_callback(channel, method, properties, body): ...@@ -25,15 +30,31 @@ def message_received_callback(channel, method, properties, body):
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET']) @app.route('/', methods=['GET'])
def api_root(): def api_root():
return 'Endpoint of trace-retrieval-microservice!' return 'Endpoint of trace-retrieval-microservice!'
# SSL configuration
try:
certificate_path = os.environ['ARTICONF_CERTIFICATE_PATH']
except KeyError:
certificate_path = '/srv/articonf/'
context = (os.path.normpath(f'{certificate_path}/articonf1.crt'), os.path.normpath(f'{certificate_path}/articonf1.key')) # certificate and key files
if is_running_locally():
# Local Mode
print("Running with local settings...")
app.add_api(swagger_util.get_bundled_specs(Path("configs/swagger_local.yml")),
resolver = connexion.RestyResolver("cms_rest_api"))
context = 'adhoc'
else:
app.add_api(swagger_util.get_bundled_specs(Path("configs/swagger.yml")),
resolver = connexion.RestyResolver("cms_rest_api"))
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
message_manager = ReconnectingMessageManager.getInstance() message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('inhub', 'direct', 'trace-retrieval', True, message_received_callback) message_manager.start_consuming('inhub', 'direct', 'trace-retrieval', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False, ssl_context=context) # disable reloader so only subscribed once to rabbitmq
...@@ -83,30 +83,12 @@ class MessageHandler: ...@@ -83,30 +83,12 @@ class MessageHandler:
value = value[piece] value = value[piece]
return value return value
def handle_blockchain_transaction(self, transaction): def _flatten_transaction(self, transaction: Dict, mappings: Dict):
jwt_token = TokenManager.getInstance().getToken() '''
takes the (possibly nested) dictionary and resolves the given paths
transaction_data = json.loads(transaction) returns a map which is no longer nested with the keys of the
mappings parameter
use_case = transaction_data["content"]["ApplicationType"] '''
# query schema information
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/schema'
print(f"CALLING: {url}")
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt_token}"}
)
print(f"RESPONSE: {response.text}")
data = json.loads(response.text)
mappings = data["mappings"]
flattened = {} flattened = {}
# iterate over schema mappings and resolve paths # iterate over schema mappings and resolve paths
...@@ -118,7 +100,7 @@ class MessageHandler: ...@@ -118,7 +100,7 @@ class MessageHandler:
for path in concat_paths: for path in concat_paths:
values.append( values.append(
self._resolve_path(transaction_data["content"], path) self._resolve_path(transaction, path)
) )
if len(values) > 1: if len(values) > 1:
...@@ -129,9 +111,44 @@ class MessageHandler: ...@@ -129,9 +111,44 @@ class MessageHandler:
flattened[mapping] = final_value flattened[mapping] = final_value
flattened["UniqueID"] = hashlib.sha256(flattened["UniqueID"].encode("utf-8")).hexdigest() flattened["UniqueID"] = hashlib.sha256(flattened["UniqueID"].encode("utf-8")).hexdigest()
return flattened
def handle_blockchain_transaction(self, transaction: Dict):
jwt_token = TokenManager.getInstance().getToken()
transaction_data = transaction
use_case = transaction_data["ApplicationType"]
# query schema information
url = f'https://{network_constants.BUSINESS_LOGIC_HOSTNAME}:{network_constants.BUSINESS_LOGIC_REST_PORT}/api/use-cases/{use_case}/schema'
print(f"CALLING: {url}")
response = requests.get(
url,
verify = False,
proxies = { "http":None, "https":None },
headers = { "Authorization": f"Bearer {jwt_token}"}
)
print(f"RESPONSE: {response.text}")
if response.status_code != 200:
MessageHandler._repository.add_failed_transaction(transaction)
return
data = json.loads(response.text)
mappings = data["mappings"]
flattened = self._flatten_transaction(transaction_data, mappings)
transaction = Transaction(use_case, flattened) transaction = Transaction(use_case, flattened)
MessageHandler._repository.add_transaction(transaction) MessageHandler._repository.add_transaction(transaction)
msg = json.dumps(transaction.to_serializable_dict())
print(msg)
# inform semantic linking microservice # inform semantic linking microservice
self._message_sender.send_message('datahub', 'semantic-linking', json.dumps(transaction.to_serializable_dict())) # self._message_sender.send_message('datahub', 'semantic-linking', msg)
\ No newline at end of file \ No newline at end of file
flask astroid==2.4.2
connexion[swagger-ui] attrs==20.1.0
pika certifi==2020.6.20
pymongo cffi==1.14.2
deprecated chardet==3.0.4
click==7.1.2
clickclick==1.2.2
colorama==0.4.3
connexion==2.7.0
cryptography==3.1
Deprecated==1.2.10
Flask==1.1.2
idna==2.10
importlib-metadata==1.7.0
inflection==0.5.1
isort==5.4.2
itsdangerous==1.1.0
Jinja2==2.11.2
jsonschema==3.2.0
lazy-object-proxy==1.4.3
MarkupSafe==1.1.1
mccabe==0.6.1
openapi-spec-validator==0.2.9
pika==1.1.0
prance==0.19.0
pycparser==2.20
pylint==2.6.0
pymongo==3.11.0
pyrsistent==0.16.0
PyYAML==5.3.1
requests==2.24.0
semver==2.10.2
six==1.15.0
swagger-ui-bundle==0.0.8
toml==0.10.1
typed-ast==1.4.1
urllib3==1.25.10
Werkzeug==1.0.1
wrapt==1.12.1
zipp==3.1.0
...@@ -13,4 +13,11 @@ def all_for_use_case(use_case: str): ...@@ -13,4 +13,11 @@ def all_for_use_case(use_case: str):
def delete_all_transactions(): def delete_all_transactions():
_repository.delete_all_transactions() _repository.delete_all_transactions()
return Response(status=200)
def all_failed_for_use_case(use_case: str):
return _repository.all_failed_transactions_for_use_case(use_case)
def delete_all_failed_for_use_case(use_case: str):
_repository.delete_all_failed_transactions(use_case)
return Response(status=200) return Response(status=200)
\ No newline at end of file
...@@ -76,7 +76,7 @@ class Test_MessageHandler(unittest.TestCase): ...@@ -76,7 +76,7 @@ class Test_MessageHandler(unittest.TestCase):
print("STARTING THE TEST...") print("STARTING THE TEST...")
msg = self._get_pizza_message() msg = self._get_pizza_message()
_ = self.handler.handle_blockchain_transaction(msg) _ = self.handler.handle_blockchain_transaction(json.loads(msg)["content"])
if __name__ == '__main__': if __name__ == '__main__':
unittest.main() unittest.main()
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment