Commit 7b6acafd authored by Alexander Lercher's avatar Alexander Lercher

MessageManager initialization is async

parent 939c61bc
...@@ -55,11 +55,12 @@ class MessageManager: ...@@ -55,11 +55,12 @@ class MessageManager:
self._receive_channel = channel self._receive_channel = channel
LOGGER.info("RabbitMQ connection established") LOGGER.info("RabbitMQ connection established")
def create_exchange_with_queue(self, exchange_name, exchange_type, queue_name): def create_exchange_with_queue(self, exchange_name, exchange_type, queue_name, callback):
'''Creates exchange and queue and binds them''' '''Creates exchange and queue and binds them'''
self._prepare_receive_parameters = {'exchange_name': exchange_name, self._prepare_receive_parameters = {'exchange_name': exchange_name,
'exchange_type': exchange_type, 'exchange_type': exchange_type,
'queue_name': queue_name} 'queue_name': queue_name,
'callback': callback}
self._receive_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, callback=self._exchange_created_callback) self._receive_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, callback=self._exchange_created_callback)
...@@ -74,8 +75,11 @@ class MessageManager: ...@@ -74,8 +75,11 @@ class MessageManager:
exchange_name = self._prepare_receive_parameters['exchange_name'] exchange_name = self._prepare_receive_parameters['exchange_name']
# Bind queue to exchange # Bind queue to exchange
self._receive_channel.queue_bind(exchange=exchange_name, queue=queue_name) self._receive_channel.queue_bind(exchange=exchange_name, queue=queue_name, callback=self._queue_bound_callback)
LOGGER.info(f"RabbitMQ connection to exchange '{exchange_name}' established")
def _queue_bound_callback(self, _):
if self._prepare_receive_parameters['callback'] != None:
self._prepare_receive_parameters['callback']()
def start_consuming(self, queue_name, auto_ack, message_received_callback): def start_consuming(self, queue_name, auto_ack, message_received_callback):
'''Starts listening for messages''' '''Starts listening for messages'''
......
...@@ -53,8 +53,12 @@ class ReconnectingMessageManager: ...@@ -53,8 +53,12 @@ class ReconnectingMessageManager:
'message_received_callback': message_received_callback} 'message_received_callback': message_received_callback}
self._consuming = True self._consuming = True
self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name) self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name, self._exchange_created_callback)
self._message_manager.start_consuming(queue_name, auto_ack, message_received_callback)
def _exchange_created_callback(self):
self._message_manager.start_consuming(self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback'])
def create_message_destination(self, exchange_name, exchange_type): def create_message_destination(self, exchange_name, exchange_type):
'''Creates the exchange''' '''Creates the exchange'''
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment