Commit d6ec9dd5 authored by Alexander Lercher's avatar Alexander Lercher

Workaround for MessageSender Error

Pika BlockingConnection in MessageSender somehow doesn't send heartbeats to RabbitMQ correctly. Opening new connection for each message now
parent 4fe1c07d
...@@ -6,4 +6,4 @@ class Processor: ...@@ -6,4 +6,4 @@ class Processor:
pass pass
def process(self, traces: list): def process(self, traces: list):
LOGGER.info("called processing") LOGGER.info(f"called processing with: {str(traces)}")
\ No newline at end of file \ No newline at end of file
...@@ -20,7 +20,7 @@ class MessageSender: ...@@ -20,7 +20,7 @@ class MessageSender:
def connect(self): def connect(self):
'''Connects to RabbitMQ''' '''Connects to RabbitMQ'''
self._connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port)) self._connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port, heartbeat=60, blocked_connection_timeout=30))
self._channel = self._connection.channel() self._channel = self._connection.channel()
LOGGER.info(f"RabbitMQ connection established") LOGGER.info(f"RabbitMQ connection established")
...@@ -33,7 +33,8 @@ class MessageSender: ...@@ -33,7 +33,8 @@ class MessageSender:
if self._channel == None: if self._channel == None:
LOGGER.error("Tried to send before connection to RabbitMQ was established") LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established, call MessageSender.connect() first") raise RuntimeError("Connection to RabbitMQ not established, call MessageSender.connect() first")
# TODO connection is closed after 60s
self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message) self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message)
def disconnect(self): def disconnect(self):
......
...@@ -11,12 +11,12 @@ class MessageHandler: ...@@ -11,12 +11,12 @@ class MessageHandler:
def __init__(self): def __init__(self):
self._mongo_repo = MongoRepository() self._mongo_repo = MongoRepository()
self._init_message_sender() # self._init_message_sender()
def _init_message_sender(self): def _init_message_sender(self):
self._message_sender = MessageSender() self._message_sender = MessageSender()
self._message_sender.connect() self._message_sender.connect()
self._message_sender.create_exchange('inhub', 'direct') self._message_sender.create_exchange('datahub', 'direct')
def handle_generic(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}") LOGGER.info(f"Received message: {body}")
...@@ -40,6 +40,13 @@ class MessageHandler: ...@@ -40,6 +40,13 @@ class MessageHandler:
def handle_blockchain_transaction(self, transaction): def handle_blockchain_transaction(self, transaction):
self._mongo_repo.insert_trace(transaction) self._mongo_repo.insert_trace(transaction)
# TODO remove workaround
self._message_sender = MessageSender()
self._message_sender.connect()
self._message_sender.create_exchange('datahub', 'direct')
# inform semantic linking microservice # inform semantic linking microservice
msg = {'type': 'new-traces-available'} msg = {'type': 'new-traces-available'}
self._message_sender.send('datahub', json.dumps(msg), 'semantic-linking') self._message_sender.send('datahub', json.dumps(msg), 'semantic-linking')
self._message_sender.disconnect()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment