Commit e7c1391d authored by Alexander Lercher's avatar Alexander Lercher

Robuster MessageManager

parent 3125ed93
import os import os
import sys
kube_command = 'apply'
if len(sys.argv) > 1:
kube_command = sys.argv[1]
paths = [] paths = []
for p, _, f in os.walk('./'): for p, _, f in os.walk('./'):
...@@ -7,4 +12,4 @@ for p, _, f in os.walk('./'): ...@@ -7,4 +12,4 @@ for p, _, f in os.walk('./'):
paths.append(os.path.join(p, '')) paths.append(os.path.join(p, ''))
for path in paths: for path in paths:
os.system(f"kubectl apply -f {path}") os.system(f"kubectl {kube_command} -f {path}")
\ No newline at end of file
...@@ -6,7 +6,7 @@ import logging ...@@ -6,7 +6,7 @@ import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MessageManager: class MessageManager:
'''The MessageManager is used for sending and receiving messages''' '''This Class is used for sending and receiving messages with RabbitMQ'''
_rabbit_mq_ip = None _rabbit_mq_ip = None
_rabbit_mq_port = None _rabbit_mq_port = None
...@@ -21,7 +21,7 @@ class MessageManager: ...@@ -21,7 +21,7 @@ class MessageManager:
self._rabbit_mq_port = rabbit_mq_port self._rabbit_mq_port = rabbit_mq_port
def connect(self, error_callback=None): def connect(self, error_callback=None):
'''Creates a connection with two channels to RabbitMQ''' '''Creates a connection with two channels'''
self._error_callback = error_callback self._error_callback = error_callback
self._connection = self._connect_async() self._connection = self._connect_async()
...@@ -91,10 +91,6 @@ class MessageManager: ...@@ -91,10 +91,6 @@ class MessageManager:
def send_message(self, exchange_name, routing_key, message): def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange''' '''Sends a message to the exchange'''
if self._send_channel == None:
LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established")
self._send_channel.basic_publish(exchange_name, routing_key, message) self._send_channel.basic_publish(exchange_name, routing_key, message)
def disconnect(self): def disconnect(self):
......
...@@ -6,12 +6,13 @@ import logging ...@@ -6,12 +6,13 @@ import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class ReconnectingMessageManager: class ReconnectingMessageManager:
'''The ReconnectingMessageManager Singleton handles connection errors by itself''' '''This Class can be used to access RabbitMQ. It handles connection errors by trying to reconnect every second'''
_instance = None _instance = None
_message_manager: MessageManager = None _message_manager: MessageManager = None
_consuming = False _consuming = False
_consumption_parameters = None _consumption_parameters = None
_sending_parameters = None
@staticmethod @staticmethod
def getInstance() -> ReconnectingMessageManager: def getInstance() -> ReconnectingMessageManager:
...@@ -21,6 +22,7 @@ class ReconnectingMessageManager: ...@@ -21,6 +22,7 @@ class ReconnectingMessageManager:
return ReconnectingMessageManager._instance return ReconnectingMessageManager._instance
def __init__(self): def __init__(self):
'''Do not use the constructor as it is a Singleton!'''
if ReconnectingMessageManager._instance != None: if ReconnectingMessageManager._instance != None:
raise Exception("This class is a singleton!") raise Exception("This class is a singleton!")
...@@ -36,34 +38,58 @@ class ReconnectingMessageManager: ...@@ -36,34 +38,58 @@ class ReconnectingMessageManager:
# restart receiver # restart receiver
self._message_manager.disconnect() self._message_manager.disconnect()
self._init_message_manager() self._init_message_manager()
if self._consuming: if self._consuming:
self._restart_consuming() self._restart_consuming()
if self._sending_parameters != None:
self._reinit_sending()
def _restart_consuming(self): def _restart_consuming(self):
self.start_consuming(self._consumption_parameters['exchange_name'], if self._consumption_parameters != None:
self.start_consuming(self._consumption_parameters['exchange_name'],
self._consumption_parameters['exchange_type'], self._consumption_parameters['exchange_type'],
self._consumption_parameters['queue_name'], self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'], self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback']) self._consumption_parameters['message_received_callback'])
def _reinit_sending(self):
if self._sending_parameters != None:
self.create_message_destination(self._sending_parameters['exchange_name'],
self._sending_parameters['exchange_type'])
def start_consuming(self, exchange_name, exchange_type, queue_name, auto_ack, message_received_callback): def start_consuming(self, exchange_name, exchange_type, queue_name, auto_ack, message_received_callback):
'''Creates exchange and queue and starts to listen to new messages''' '''Creates exchange and queue and starts to listen to new messages'''
self._consumption_parameters = {'exchange_name': exchange_name, 'exchange_type': exchange_type, self._consumption_parameters = {'exchange_name': exchange_name,
'queue_name': queue_name, 'auto_ack': auto_ack, 'exchange_type': exchange_type,
'queue_name': queue_name,
'auto_ack': auto_ack,
'message_received_callback': message_received_callback} 'message_received_callback': message_received_callback}
self._consuming = True self._consuming = True
self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name, self._exchange_created_callback) try:
self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name, self._exchange_created_callback)
except:
LOGGER.error("Error while creating exchange and queue")
def _exchange_created_callback(self): def _exchange_created_callback(self):
LOGGER.info("Exchange and queue set up")
self._message_manager.start_consuming(self._consumption_parameters['queue_name'], self._message_manager.start_consuming(self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'], self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback']) self._consumption_parameters['message_received_callback'])
def create_message_destination(self, exchange_name, exchange_type): def create_message_destination(self, exchange_name, exchange_type):
'''Creates the exchange''' '''Creates the exchange for sending messages'''
self._message_manager.create_exchange(exchange_name, exchange_type) self._sending_parameters = {'exchange_name': exchange_name,
'exchange_type': exchange_type}
try:
self._message_manager.create_exchange(exchange_name, exchange_type)
except:
LOGGER.error("Error while creating exchange")
def send_message(self, exchange_name, routing_key, message): def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange''' '''Sends a message to the exchange'''
self._message_manager.send_message(exchange_name, routing_key, message) try:
self._message_manager.send_message(exchange_name, routing_key, message)
except:
LOGGER.error("Error while sending message")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment