Commit 09289708 authored by Alexander Lercher's avatar Alexander Lercher

Trace Retrieval: message handling while running flask

Working message sending and receiving while running the flask service
parent 9c22b103
**/__pycache__ **/__pycache__
*.log
\ No newline at end of file
import pika import pika
EXCHANGE_NAME = 'beacon' EXCHANGE_NAME = 'rest-gateway'
EXCHANGE_TYPE = 'direct'
connection = pika.BlockingConnection(pika.ConnectionParameters('143.205.173.36', 30302, heartbeat=600, blocked_connection_timeout=300)) connection = pika.BlockingConnection(pika.ConnectionParameters('143.205.173.36', 30302, heartbeat=600, blocked_connection_timeout=300))
channel = connection.channel() channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type='fanout') channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type=EXCHANGE_TYPE)
result = channel.queue_declare(queue='', exclusive=True) result = channel.queue_declare(queue='rest-gateway', exclusive=True, auto_delete=True)
queue_name = result.method.queue queue_name = result.method.queue
channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name, routing_key='') channel.queue_bind(exchange=EXCHANGE_NAME, queue=queue_name, routing_key='rest-gateway')
def callback(ch, method, properties, body): def callback(ch, method, properties, body):
print(f"### Received: {body}") print(f"### Received: {body}")
......
import pika import pika
import functools import functools
from threading import Thread from threading import Thread
import logging
LOG_FORMAT = ('%(levelname) -10s %(asctime)s %(name) -30s %(funcName) ' import logging
'-35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MessageHandler: class MessageReceiver:
_exchange_name = 'beacon'
_rabbit_mq_ip = '143.205.173.36' _exchange_name = None
_rabbit_mq_port = 30302 _exchange_type = None
_rabbit_mq_ip = None
_rabbit_mq_port = None
_auto_ack = None
_queue_name = None
def __init__(self, exchange_name=None, rabbit_mq_ip=None, rabbit_mq_port=None): _connection: pika.SelectConnection = None
if exchange_name != None: _message_received_callback = None
def __init__(self, exchange_name='beacon', exchange_type='fanout', rabbit_mq_ip='143.205.173.36', rabbit_mq_port=30302, queue_name='', auto_ack=True):
self._exchange_name = exchange_name self._exchange_name = exchange_name
if rabbit_mq_ip != None: self._exchange_type = exchange_type
self._rabbit_mq_ip = rabbit_mq_ip self._rabbit_mq_ip = rabbit_mq_ip
if rabbit_mq_port != None:
self._rabbit_mq_port = rabbit_mq_port self._rabbit_mq_port = rabbit_mq_port
self._auto_ack = auto_ack
self._queue_name = queue_name
#region Connection establishment
def _pika_connect(self): def _connect(self) -> pika.SelectConnection:
connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port), connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port),
on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error) on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error)
return connection return connection
...@@ -31,40 +37,41 @@ class MessageHandler: ...@@ -31,40 +37,41 @@ class MessageHandler:
connection.channel(on_open_callback=self._channel_created_callback) connection.channel(on_open_callback=self._channel_created_callback)
def _connection_opened_error(self, connection, err): def _connection_opened_error(self, connection, err):
print('## Error: ' + str(err)) LOGGER.error(f"RabbitMQ connection could not be established: {str(err)}")
def _channel_created_callback(self, channel: pika.channel.Channel): def _channel_created_callback(self, channel: pika.channel.Channel):
# Create exchange # Create exchange
cb = functools.partial(self._exchange_created_callback, userdata=channel) cb = functools.partial(self._exchange_created_callback, userdata=channel)
channel.exchange_declare(exchange=self._exchange_name, exchange_type='fanout', callback=cb) channel.exchange_declare(exchange=self._exchange_name, exchange_type=self._exchange_type, callback=cb)
def _exchange_created_callback(self, answer_message, userdata): def _exchange_created_callback(self, answer_message, userdata):
# Create queue # Create queue
channel = userdata channel = userdata
cb = functools.partial(self._queue_created_callback, userdata=channel) cb = functools.partial(self._queue_created_callback, userdata=channel)
channel.queue_declare(queue='', exclusive=True, callback=cb) channel.queue_declare(queue=self._queue_name, exclusive=False, callback=cb)
def _queue_created_callback(self, answer_message: pika.frame.Method, userdata): def _queue_created_callback(self, answer_message: pika.frame.Method, userdata):
queue_name = answer_message.method.queue queue_name = answer_message.method.queue
channel = userdata channel = userdata
# Bind queue to exchange # Bind queue to exchange
channel.queue_bind(exchange=self._exchange_name, queue=queue_name, routing_key='') channel.queue_bind(exchange=self._exchange_name, queue=queue_name)
LOGGER.info(f"Rabbit MQ connection to exchange '{self._exchange_name}' established") LOGGER.info(f"RabbitMQ connection to exchange '{self._exchange_name}' established")
# Consume messages # Consume messages
channel.basic_consume(queue=queue_name, auto_ack=False, on_message_callback=_message_received_callback) channel.basic_consume(queue=queue_name, auto_ack=self._auto_ack, on_message_callback=self._message_received_callback)
#endregion Connection establishment
def start(self, message_received_callback): def start(self, message_received_callback):
'''Starts the message handling''' '''Connects to RabbitMQ and starts listening for messages'''
global _message_received_callback self._message_received_callback = message_received_callback
_message_received_callback = message_received_callback
global _connection self._connection = self._connect()
_connection = self._pika_connect() Thread(target=self._connection.ioloop.start).start()
Thread(target=_connection.ioloop.start).start()
def stop(self): def stop(self):
_connection.ioloop.stop() '''Stops listening for messages and closes the connection'''
_connection.close() self._connection.ioloop.stop()
LOGGER.info(f"Rabbit MQ connection closed") self._connection.close()
\ No newline at end of file LOGGER.info(f"RabbitMQ connection closed")
\ No newline at end of file
import pika
import functools
from threading import Thread
import logging
LOGGER = logging.getLogger(__name__)
class MessageSender:
_rabbit_mq_ip = None
_rabbit_mq_port = None
_connection: pika.BlockingConnection = None
_channel: pika.channel.Channel = None
def __init__(self, rabbit_mq_ip='143.205.173.36', rabbit_mq_port=30302):
self._rabbit_mq_ip = rabbit_mq_ip
self._rabbit_mq_port = rabbit_mq_port
def connect(self):
'''Connects to RabbitMQ'''
self._connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port))
self._channel = self._connection.channel()
LOGGER.info(f"RabbitMQ connection established")
def create_exchange(self, exchange_name, exchange_type):
'''Creates the exchange if not existent'''
self._channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
def send(self, exchange_name, message, routing_key=''):
'''Sends a message to the exchange'''
if self._channel == None:
LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established, call MessageSender.connect() first")
self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message)
def disconnect(self):
'''Disconnects from RabbitMQ'''
if self._connection == None:
LOGGER.error("Tried to close connection before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established")
self._connection.close()
LOGGER.info(f"RabbitMQ connection closed")
\ No newline at end of file
import sys
sys.path.insert(1, '../../../modules/')
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(filename='trace-retrieval.log', level=logging.WARNING, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
import connexion import connexion
from messages.MessageHandler import MessageHandler from messaging.MessageReceiver import MessageReceiver
from messaging.MessageSender import MessageSender
message_sender = None
# init message handler # init message handler
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
print(f"### NICE: {body}") print(f"### Received: {body}")
channel.basic_ack(delivery_tag=method.delivery_tag) # channel.basic_ack(delivery_tag=method.delivery_tag)
message_sender.send('rest-gateway', str(body) + ' (sent from trace-retrieval-microservice)', 'rest-gateway')
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -16,7 +28,11 @@ def api_root(): ...@@ -16,7 +28,11 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
message_handler = MessageHandler() message_rec = MessageReceiver(exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True)
message_handler.start(message_received_callback) message_rec.start(message_received_callback)
message_sender = MessageSender()
message_sender.connect()
message_sender.create_exchange('rest-gateway', 'direct')
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment