Commit 9c22b103 authored by Alexander Lercher's avatar Alexander Lercher

Created async RabbitMQ message handler

parent 28354233
import connexion import connexion
from messages.MessageHandler import MessageHandler
# init message handler
def message_received_callback(channel, method, properties, body):
print(f"### NICE: {body}")
channel.basic_ack(delivery_tag=method.delivery_tag)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -10,4 +16,7 @@ def api_root(): ...@@ -10,4 +16,7 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True) message_handler = MessageHandler()
message_handler.start(message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
import pika
import functools
from threading import Thread
import logging
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) '
'-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
class MessageHandler:
_exchange_name = 'beacon'
_rabbit_mq_ip = '143.205.173.36'
_rabbit_mq_port = 30302
def __init__(self, exchange_name=None, rabbit_mq_ip=None, rabbit_mq_port=None):
if exchange_name != None:
self._exchange_name = exchange_name
if rabbit_mq_ip != None:
self._rabbit_mq_ip = rabbit_mq_ip
if rabbit_mq_port != None:
self._rabbit_mq_port = rabbit_mq_port
def _pika_connect(self):
connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port),
on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error)
return connection
def _connection_opened_callback(self, connection: pika.SelectConnection):
# Create channel
connection.channel(on_open_callback=self._channel_created_callback)
def _connection_opened_error(self, connection, err):
print('## Error: ' + str(err))
def _channel_created_callback(self, channel: pika.channel.Channel):
# Create exchange
cb = functools.partial(self._exchange_created_callback, userdata=channel)
channel.exchange_declare(exchange=self._exchange_name, exchange_type='fanout', callback=cb)
def _exchange_created_callback(self, answer_message, userdata):
# Create queue
channel = userdata
cb = functools.partial(self._queue_created_callback, userdata=channel)
channel.queue_declare(queue='', exclusive=True, callback=cb)
def _queue_created_callback(self, answer_message: pika.frame.Method, userdata):
queue_name = answer_message.method.queue
channel = userdata
# Bind queue to exchange
channel.queue_bind(exchange=self._exchange_name, queue=queue_name, routing_key='')
LOGGER.info(f"Rabbit MQ connection to exchange '{self._exchange_name}' established")
# Consume messages
channel.basic_consume(queue=queue_name, auto_ack=False, on_message_callback=_message_received_callback)
def start(self, message_received_callback):
'''Starts the message handling'''
global _message_received_callback
_message_received_callback = message_received_callback
global _connection
_connection = self._pika_connect()
Thread(target=_connection.ioloop.start).start()
def stop(self):
_connection.ioloop.stop()
_connection.close()
LOGGER.info(f"Rabbit MQ connection closed")
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment