Commit 71e8e785 authored by Alexander Lercher's avatar Alexander Lercher

Trace Retrieval / Rest Gateway: message are sent async

Fixed workaround from commits d6ec9dd5 and ee789e29.
Messages are now sent asynchronously and connection is reused with multiple channels
parent bf2c831c
import pika
from threading import Thread
import network_constants as netconst
import logging
LOGGER = logging.getLogger(__name__)
class MessageManager:
'''The MessageManager is used for sending and receiving messages'''
_rabbit_mq_ip = None
_rabbit_mq_port = None
_prepare_receive_parameters = None
_connection: pika.SelectConnection = None
_send_channel: pika.channel.Channel = None
_receive_channel: pika.channel.Channel = None
def __init__(self, rabbit_mq_ip=netconst.RABBIT_MQ_HOSTNAME, rabbit_mq_port=netconst.RABBIT_MQ_PORT):
self._rabbit_mq_ip = rabbit_mq_ip
self._rabbit_mq_port = rabbit_mq_port
def connect(self, error_callback=None):
'''Creates a connection with two channels to RabbitMQ'''
self._error_callback = error_callback
self._connection = self._connect_async()
Thread(target=self._connection.ioloop.start).start()
def _connect_async(self) -> pika.SelectConnection:
connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port),
on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error_callback, on_close_callback=self._connection_closed_callback)
return connection
def _connection_opened_callback(self, connection: pika.SelectConnection):
# Create channels
connection.channel(on_open_callback=self._channel_created_callback)
connection.channel(on_open_callback=self._channel_created_callback)
def _connection_opened_error_callback(self, connection, error):
LOGGER.error(f"RabbitMQ connection could not be established: {str(error)}")
if self._error_callback != None:
self._error_callback("Connection could not be established")
def _connection_closed_callback(self, connection, error):
LOGGER.warning(f"RabbitMQ connection closed: {str(error)}")
if self._error_callback != None:
self._error_callback("Connection closed")
def _channel_created_callback(self, channel: pika.channel.Channel):
# Assign both channels
if self._send_channel == None:
self._send_channel = channel
else:
self._receive_channel = channel
LOGGER.info("RabbitMQ connection established")
def create_exchange_with_queue(self, exchange_name, exchange_type, queue_name):
'''Creates exchange and queue and binds them'''
self._prepare_receive_parameters = {'exchange_name': exchange_name,
'exchange_type': exchange_type,
'queue_name': queue_name}
self._receive_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, callback=self._exchange_created_callback)
def _exchange_created_callback(self, answer_message):
# Create queue
queue_name = self._prepare_receive_parameters['queue_name']
exclusive_access = (queue_name == '')
self._receive_channel.queue_declare(queue=queue_name, exclusive=exclusive_access, callback=self._queue_created_callback, auto_delete=exclusive_access)
def _queue_created_callback(self, answer_message: pika.frame.Method):
queue_name = answer_message.method.queue
exchange_name = self._prepare_receive_parameters['exchange_name']
# Bind queue to exchange
self._receive_channel.queue_bind(exchange=exchange_name, queue=queue_name)
LOGGER.info(f"RabbitMQ connection to exchange '{exchange_name}' established")
def start_consuming(self, queue_name, auto_ack, message_received_callback):
'''Starts listening for messages'''
self._receive_channel.basic_consume(queue=queue_name, auto_ack=auto_ack, on_message_callback=message_received_callback)
def create_exchange(self, exchange_name, exchange_type):
'''Creates the exchange'''
self._send_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange'''
if self._send_channel == None:
LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established")
self._send_channel.basic_publish(exchange_name, routing_key, message)
def disconnect(self):
'''Stops listening for messages and closes the connection'''
try:
self._connection.ioloop.stop()
self._connection.close()
LOGGER.info("RabbitMQ connection closed")
except pika.exceptions.ConnectionWrongStateError:
LOGGER.warning("RabbitMQ connection already closed")
import pika
import functools
from threading import Thread
from deprecated import deprecated
import network_constants as netconst
import logging
LOGGER = logging.getLogger(__name__)
@deprecated(reason='Can only receive messages with full connection. Use ReconnectingMessageManager for connection reuse instead.')
class MessageReceiver:
_exchange_name = None
......
import pika
import functools
from threading import Thread
from deprecated import deprecated
import network_constants as netconst
import logging
LOGGER = logging.getLogger(__name__)
@deprecated(reason='BlockingConnection has connection timeout. Use ReconnectingMessageManager instead.')
class MessageSender:
_rabbit_mq_ip = None
......
from __future__ import annotations
import time
from messaging.MessageManager import MessageManager
import logging
LOGGER = logging.getLogger(__name__)
class ReconnectingMessageManager:
'''The ReconnectingMessageManager Singleton handles connection errors by itself'''
__instance = None
_message_manager: MessageManager = None
_consuming = False
_consumption_parameters = None
@staticmethod
def getInstance() -> ReconnectingMessageManager:
''' Static access method. '''
if ReconnectingMessageManager.__instance == None:
ReconnectingMessageManager.__instance = ReconnectingMessageManager()
return ReconnectingMessageManager.__instance
def __init__(self):
if ReconnectingMessageManager.__instance != None:
raise Exception("This class is a singleton!")
ReconnectingMessageManager.__instance = self
self._init_message_manager()
def _init_message_manager(self):
self._message_manager = MessageManager()
self._message_manager.connect(self._pika_error_callback)
time.sleep(1)
def _pika_error_callback(self, error):
# restart receiver
self._message_manager.disconnect()
self._init_message_manager()
if self._consuming:
self._restart_consuming()
def _restart_consuming(self):
self.start_consuming(self._consumption_parameters['exchange_name'],
self._consumption_parameters['exchange_type'],
self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback'])
def start_consuming(self, exchange_name, exchange_type, queue_name, auto_ack, message_received_callback):
'''Creates exchange and queue and starts to listen to new messages'''
self._consumption_parameters = {'exchange_name': exchange_name, 'exchange_type': exchange_type,
'queue_name': queue_name, 'auto_ack': auto_ack,
'message_received_callback': message_received_callback}
self._consuming = True
self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name)
self._message_manager.start_consuming(queue_name, auto_ack, message_received_callback)
def create_message_destination(self, exchange_name, exchange_type):
'''Creates the exchange'''
self._message_manager.create_exchange(exchange_name, exchange_type)
def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange'''
self._message_manager.send_message(exchange_name, routing_key, message)
......@@ -8,6 +8,7 @@ RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
RUN pip install pika
RUN pip install deprecated
EXPOSE 5000
......
......@@ -13,28 +13,6 @@ LOGGER = logging.getLogger(__name__)
#################################
import connexion
from messaging.MessageReceiver import MessageReceiver
from messaging.MessageSender import MessageSender
from MessageList import MessageList
message_rec: MessageReceiver = None
# init message handler
def message_received_callback(channel, method, properties, body):
print(f"### Received: {body}")
# TODO
def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver
global message_rec
message_rec.stop()
init_message_receiver()
def init_message_receiver():
global message_rec
message_rec = MessageReceiver(exchange_name='rest-gateway', exchange_type='direct', queue_name='rest-gateway', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
......
from flask import request, Response
from messaging.MessageSender import MessageSender
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
import json
message_sender = ReconnectingMessageManager.getInstance()
message_sender.create_message_destination('inhub', 'direct')
def receive():
body = request.json
if isBlockchainTraceValid(body):
# TODO remove workaround
message_sender = MessageSender()
message_sender.connect()
message_sender.create_exchange('inhub', 'direct')
message = {'type': 'blockchain-transaction', 'content': body}
message_sender.send('inhub', json.dumps(message), 'trace-retrieval')
message_sender.disconnect()
message_sender.send_message('inhub', 'trace-retrieval', json.dumps(message))
return Response(status=201)
return Response(status=400)
......
......@@ -9,6 +9,7 @@ RUN pip install flask
RUN pip install connexion[swagger-ui]
RUN pip install pika
RUN pip install pymongo
RUN pip install deprecated
EXPOSE 5000
......
......@@ -14,27 +14,13 @@ LOGGER = logging.getLogger(__name__)
#############################
import connexion
from messaging.MessageHandler import MessageHandler
from messaging.MessageReceiver import MessageReceiver
message_rec: MessageReceiver = None
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
# init message handler
message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body):
message_handler.handle_generic(body)
def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver
global message_rec
message_rec.stop()
init_message_receiver()
def init_message_receiver():
global message_rec
message_rec = MessageReceiver(exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
......@@ -45,6 +31,7 @@ def api_root():
# start app
if __name__ == '__main__':
init_message_receiver()
message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('inhub', 'direct', 'trace-retrieval', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
from db.MongoRepository import MongoRepository
from messaging.MessageSender import MessageSender
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
import json
import logging
......@@ -11,12 +11,11 @@ class MessageHandler:
def __init__(self):
self._mongo_repo = MongoRepository()
# self._init_message_sender()
self._init_message_sender()
def _init_message_sender(self):
self._message_sender = MessageSender()
self._message_sender.connect()
self._message_sender.create_exchange('datahub', 'direct')
self._message_sender = ReconnectingMessageManager.getInstance()
self._message_sender.create_message_destination('datahub', 'direct')
def handle_generic(self, body):
LOGGER.info(f"Received message: {body}")
......@@ -40,13 +39,6 @@ class MessageHandler:
def handle_blockchain_transaction(self, transaction):
self._mongo_repo.insert_trace(transaction)
# TODO remove workaround
self._message_sender = MessageSender()
self._message_sender.connect()
self._message_sender.create_exchange('datahub', 'direct')
# inform semantic linking microservice
msg = {'type': 'new-traces-available'}
self._message_sender.send('datahub', json.dumps(msg), 'semantic-linking')
self._message_sender.disconnect()
self._message_sender.send_message('datahub', 'semantic-linking', json.dumps(msg))
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment