Commit d80bca29 authored by Alexander Lercher's avatar Alexander Lercher

Added rest-gateway; Error handling for rabbit-mq connection errors

parent cc35cc82
# Message Broker
The message broker enables loose coupling between the microservices by only sending asynchronous messages.
## Technologies
- Rabbit MQ (out-of-the-box Docker image)
- Docker
- Kubernetes
## Exchanges and Queues
The individual microservices create and subscribe to the following exchanges:
- inhub
Used to send messages to the services in the *SMART transaction hub (in)*.
- datahub
Used to send messages to the services in the *Evolutionary semantic contextual data hub*.
- outhub
Used to send messages to the services in the *SMART transaction hub (out)*.
- rest-gateway
Used to send messages to the *SMART RESTful API Gateway*.
All exchanges are of type direct with names of the services as routing keys. For instance, to send a message to the Trace Retrieval microservice you have to use the exchange "inhub" with the routing key "trace-retrieval".
\ No newline at end of file
...@@ -16,6 +16,7 @@ class MessageReceiver: ...@@ -16,6 +16,7 @@ class MessageReceiver:
_connection: pika.SelectConnection = None _connection: pika.SelectConnection = None
_message_received_callback = None _message_received_callback = None
_error_callback = None
def __init__(self, exchange_name='beacon', exchange_type='fanout', rabbit_mq_ip='143.205.173.36', rabbit_mq_port=30302, queue_name='', auto_ack=True): def __init__(self, exchange_name='beacon', exchange_type='fanout', rabbit_mq_ip='143.205.173.36', rabbit_mq_port=30302, queue_name='', auto_ack=True):
self._exchange_name = exchange_name self._exchange_name = exchange_name
...@@ -29,15 +30,21 @@ class MessageReceiver: ...@@ -29,15 +30,21 @@ class MessageReceiver:
def _connect(self) -> pika.SelectConnection: def _connect(self) -> pika.SelectConnection:
connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port), connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port),
on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error) on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error_callback, on_close_callback=self._connection_closed_callback)
return connection return connection
def _connection_opened_callback(self, connection: pika.SelectConnection): def _connection_opened_callback(self, connection: pika.SelectConnection):
# Create channel # Create channel
connection.channel(on_open_callback=self._channel_created_callback) connection.channel(on_open_callback=self._channel_created_callback)
def _connection_opened_error(self, connection, err): def _connection_opened_error_callback(self, connection, err):
LOGGER.error(f"RabbitMQ connection could not be established: {str(err)}") LOGGER.error(f"RabbitMQ connection could not be established: {str(err)}")
if self._error_callback != None:
self._error_callback("Connection could not be established")
def _connection_closed_callback(self, conn, error):
if self._error_callback != None:
self._error_callback("Connection closed")
def _channel_created_callback(self, channel: pika.channel.Channel): def _channel_created_callback(self, channel: pika.channel.Channel):
# Create exchange # Create exchange
...@@ -46,9 +53,10 @@ class MessageReceiver: ...@@ -46,9 +53,10 @@ class MessageReceiver:
def _exchange_created_callback(self, answer_message, userdata): def _exchange_created_callback(self, answer_message, userdata):
# Create queue # Create queue
channel = userdata channel: pika.channel.Channel = userdata
cb = functools.partial(self._queue_created_callback, userdata=channel) cb = functools.partial(self._queue_created_callback, userdata=channel)
channel.queue_declare(queue=self._queue_name, exclusive=False, callback=cb) exclusive_access = (self._queue_name == '')
channel.queue_declare(queue=self._queue_name, exclusive=exclusive_access, callback=cb, auto_delete=exclusive_access)
def _queue_created_callback(self, answer_message: pika.frame.Method, userdata): def _queue_created_callback(self, answer_message: pika.frame.Method, userdata):
queue_name = answer_message.method.queue queue_name = answer_message.method.queue
...@@ -63,9 +71,10 @@ class MessageReceiver: ...@@ -63,9 +71,10 @@ class MessageReceiver:
#endregion Connection establishment #endregion Connection establishment
def start(self, message_received_callback): def start(self, message_received_callback, error_callback=None):
'''Connects to RabbitMQ and starts listening for messages''' '''Connects to RabbitMQ and starts listening for messages'''
self._message_received_callback = message_received_callback self._message_received_callback = message_received_callback
self._error_callback = error_callback
self._connection = self._connect() self._connection = self._connect()
Thread(target=self._connection.ioloop.start).start() Thread(target=self._connection.ioloop.start).start()
......
FROM python:3
LABEL maintainer="Alexander Lercher"
ENV http_proxy http://proxy.uni-klu.ac.at:3128/
ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update
RUN pip install flask
RUN pip install connexion[swagger-ui]
RUN pip install pika
EXPOSE 5000
WORKDIR /app
COPY rest-gateway/app/ /app/
COPY modules/ /app/
RUN chmod a+x main.py
CMD ["python", "./main.py"]
\ No newline at end of file
# SMART RESTful API Gateway
The RESTful API Gateway is used as the central access point. It works as a abstraction layer to the individual microservices of SMART.
## Technologies
- Python 3.x
(Check Dockerfile for used Python modules)
- Docker
- Kubernetes
\ No newline at end of file
swagger: "2.0"
info:
title: RESTful API Gateway
description: This is the documentation for the RESTful API gateway.
version: "1.0.0"
consumes:
- "application/json"
produces:
- "application/json"
basePath: "/api"
# Paths supported by the server application
paths:
/debug:
post:
operationId: "debug.echo"
tags:
- "Echo"
summary: "Echo function for debugging purposes"
description: "Echoes the input back to the caller."
parameters:
- in: body
name: "Object"
required: true
schema:
type: object
responses:
200:
description: "Successful echo of request data"
\ No newline at end of file
from flask import request
def echo():
return request.json
\ No newline at end of file
# add modules folder to interpreter path
import sys
import os
modules_path = '../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
# init logging to file
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(
filename='error.log',
level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
import connexion
from multiprocessing import Process
from messaging.MessageReceiver import MessageReceiver
from messaging.MessageSender import MessageSender
RABBIT_MQ_DNS_NAME = 'rabbit-mq'
RABBIT_MQ_PORT = '5672'
message_sender = None
# init message handler
def message_received_callback(channel, method, properties, body):
print(f"### Received: {body}")
def pika_error_callback(error):
print("Rabbit MQ error!")
global flask_server
flask_server.terminate()
sys.exit()
# load swagger config
app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml')
@app.route('/', methods=['GET'])
def api_root():
return 'Endpoint of SMART RESTful API Gateway!'
def run_flask():
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
# start app
if __name__ == '__main__':
message_rec = MessageReceiver(
#rabbit_mq_ip=RABBIT_MQ_DNS_NAME, rabbit_mq_port=RABBIT_MQ_PORT,
exchange_name='rest-gateway', exchange_type='direct', queue_name='rest-gateway', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
global flask_server
flask_server = Process(target=run_flask)
flask_server.start()
flask_server.join()
\ No newline at end of file
apiVersion: apps/v1
kind: Deployment
metadata:
name: rest-gateway
spec:
replicas: 2
selector:
matchLabels:
app: rest-gateway
template:
metadata:
labels:
app: rest-gateway
spec:
containers:
- name: rest-gateway
image: 172.16.1.20:5000/rest-gateway
ports:
- containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: rest-gateway
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: rest-gateway
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30401
protocol: TCP
\ No newline at end of file
...@@ -3,7 +3,6 @@ The trace retrieval microservice is used as an interface to the blockchain compo ...@@ -3,7 +3,6 @@ The trace retrieval microservice is used as an interface to the blockchain compo
## Technologies ## Technologies
- Python 3.x - Python 3.x
- Python module Flask (Check Dockerfile for used Python modules)
- Python module Connexion with Swagger
- Docker - Docker
- Kubernetes - Kubernetes
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment