Commit 07074316 authored by Alexander Lercher's avatar Alexander Lercher

Rest Gateway: removed message list and changed messaging

Fixed trace forwarding to Trace Retrieval; disabled message receiver
parent 1ae81b41
...@@ -15,7 +15,7 @@ basePath: "/api" ...@@ -15,7 +15,7 @@ basePath: "/api"
paths: paths:
/debug: /debug:
post: post:
operationId: "debug.echo" operationId: "rest.debug.echo"
tags: tags:
- "Echo" - "Echo"
summary: "Echo function for debugging purposes" summary: "Echo function for debugging purposes"
...@@ -30,17 +30,6 @@ paths: ...@@ -30,17 +30,6 @@ paths:
200: 200:
description: "Successful echo of request data" description: "Successful echo of request data"
/messages:
get:
operationId: "debug.get_messages"
tags:
- "Received Messages"
summary: "List of all received messages"
description: "Lists all messages received from Rabbit MQ."
responses:
200:
description: "Message list"
/trace: /trace:
post: post:
operationId: "rest.blockchain_trace.receive" operationId: "rest.blockchain_trace.receive"
......
...@@ -8,28 +8,33 @@ if os.path.exists(modules_path): ...@@ -8,28 +8,33 @@ if os.path.exists(modules_path):
# init logging to file # init logging to file
import logging import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s') LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig( logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
filename='error.log',
level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
#################################
import connexion import connexion
from multiprocessing import Process
from messaging.MessageReceiver import MessageReceiver from messaging.MessageReceiver import MessageReceiver
from messaging.MessageSender import MessageSender from messaging.MessageSender import MessageSender
from MessageList import MessageList from MessageList import MessageList
messages = MessageList.getInstance() message_rec: MessageReceiver = None
# init message handler # init message handler
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
messages.appendMessage(body)
print(f"### Received: {body}") print(f"### Received: {body}")
# TODO
def pika_error_callback(error): def pika_error_callback(error):
# TODO gracefully handle error LOGGER.warning(f"RabbitMQ stopped with error: {error}")
print("Rabbit MQ error!") # restart receiver
os._exit(1) global message_rec
message_rec.stop()
init_message_receiver()
def init_message_receiver():
global message_rec
message_rec = MessageReceiver(exchange_name='rest-gateway', exchange_type='direct', queue_name='rest-gateway', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -41,7 +46,4 @@ def api_root(): ...@@ -41,7 +46,4 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
message_rec = MessageReceiver(exchange_name='rest-gateway', exchange_type='direct', queue_name='rest-gateway', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
\ No newline at end of file
from flask import request, Response from flask import request, Response
from messaging.MessageSender import MessageSender from messaging.MessageSender import MessageSender
import json import json
# import main
message_sender = MessageSender() message_sender = MessageSender()
message_sender.connect() message_sender.connect()
...@@ -9,8 +8,9 @@ message_sender.create_exchange('inhub', 'direct') ...@@ -9,8 +8,9 @@ message_sender.create_exchange('inhub', 'direct')
def receive(): def receive():
body = request.json body = request.json
if isBlockchainTraceValid(body): if isBlockchainTraceValid(body):
message = {'type': 'blockchain-transaction', 'content': json.dumps(body)} message = {'type': 'blockchain-transaction', 'content': body}
message_sender.send('inhub', json.dumps(message), 'trace-retrieval') message_sender.send('inhub', json.dumps(message), 'trace-retrieval')
return Response(status=201) return Response(status=201)
...@@ -29,5 +29,3 @@ def isBlockchainTraceValid(trace) -> bool: ...@@ -29,5 +29,3 @@ def isBlockchainTraceValid(trace) -> bool:
and 'ResourceMd5' in trace \ and 'ResourceMd5' in trace \
and 'ResourceState' in trace \ and 'ResourceState' in trace \
and 'Metadata' in trace and 'Metadata' in trace
...@@ -17,15 +17,17 @@ class MessageHandler: ...@@ -17,15 +17,17 @@ class MessageHandler:
try: try:
message = json.loads(body) message = json.loads(body)
except ValueError: except ValueError:
LOGGER.warning(f"Message is not in JSON format and is ignored") LOGGER.warning("Message is not in JSON format and is ignored")
return return
if not 'type' in message: if not 'type' in message:
LOGGER.warning(f"Message has no type field and is ignored") LOGGER.warning("Message has no type field and is ignored")
return return
if message['type'] == 'blockchain-transaction': if message['type'] == 'blockchain-transaction':
self.handle_blockchain_transaction(message['content']) self.handle_blockchain_transaction(message['content'])
else:
LOGGER.info("Message Type could not be processed")
def handle_blockchain_transaction(self, transaction): def handle_blockchain_transaction(self, transaction):
self._mongo_repo.insert_trace(transaction) self._mongo_repo.insert_trace(transaction)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment