Commit 4cda6e98 authored by Luca Braun's avatar Luca Braun

implemented publisher side message confirmation

parent bd115b86
import pika
from pika import spec
from threading import Thread
import network_constants as netconst
......@@ -47,10 +48,18 @@ class MessageManager:
if self._error_callback != None:
self._error_callback("Connection closed")
def _message_confirmed_callback(self, frame):
if isinstance(frame.method, spec.Basic.Ack):
LOGGER.debug("message acknowledged")
return
else:
LOGGER.warning("Message was rejected by broker!")
def _channel_created_callback(self, channel: pika.channel.Channel):
# Assign both channels
if self._send_channel == None:
self._send_channel = channel
self._send_channel.confirm_delivery(ack_nack_callback=self._message_confirmed_callback)
else:
self._receive_channel = channel
LOGGER.info("RabbitMQ connection established")
......@@ -91,8 +100,11 @@ class MessageManager:
def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange'''
self._send_channel.basic_publish(exchange_name, routing_key, message)
try:
self._send_channel.basic_publish(exchange_name, routing_key, message, mandatory=True)
except pika.exceptions.UnroutableError:
raise pika.exceptions.UnroutableError
def disconnect(self):
'''Stops listening for messages and closes the connection'''
try:
......@@ -100,4 +112,4 @@ class MessageManager:
self._connection.close()
LOGGER.info("RabbitMQ connection closed")
except pika.exceptions.ConnectionWrongStateError:
LOGGER.warning("RabbitMQ connection already closed")
LOGGER.warning("RabbitMQ connection already closed")
\ No newline at end of file
......@@ -3,6 +3,8 @@ import time
from messaging.MessageManager import MessageManager
import logging
import json
import pika
LOGGER = logging.getLogger(__name__)
class ReconnectingMessageManager:
......@@ -91,5 +93,13 @@ class ReconnectingMessageManager:
'''Sends a message to the exchange'''
try:
self._message_manager.send_message(exchange_name, routing_key, message)
except pika.exceptions.UnroutableError:
message = json.loads(message)
if message["retries"] < 10:
message["retries"] += 1
message = json.dumps(message)
self.send_message(exchange_name, routing_key, message)
else:
LOGGER.error("Message went through too many send retries and was not delivered")
except:
LOGGER.error("Error while sending message")
......@@ -9,7 +9,7 @@ def receive():
body = request.json
if isBlockchainTraceValid(body):
message = {'type': 'blockchain-transaction', 'content': body}
message = {'type': 'blockchain-transaction', 'retries': 0, 'content': body}
message_sender.send_message('inhub', 'trace-retrieval', json.dumps(message))
return Response(status=201)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment