Commit d7d2b38b authored by alelercher's avatar alelercher

Merge branch 'trace-input-handling' into 'master'

Trace input handling

Rest Gateway is used for HTTP POSTing new trace - sends message with trace to Trace Retrieval - TR saves trace in MongoDb and informs Semantic Linking - SL loads all traces via HTTP Get from TR and calls processing method.

See merge request !1
parents fde49aef 34ef4d75
...@@ -7,11 +7,13 @@ ENV https_proxy http://proxy.uni-klu.ac.at:3128/ ...@@ -7,11 +7,13 @@ ENV https_proxy http://proxy.uni-klu.ac.at:3128/
RUN apt-get update RUN apt-get update
RUN pip install flask RUN pip install flask
RUN pip install connexion[swagger-ui] RUN pip install connexion[swagger-ui]
RUN pip install pika
EXPOSE 5000 EXPOSE 5000
WORKDIR /app WORKDIR /app
COPY data-hub/semantic-linking-microservice/app/ /app/ COPY data-hub/semantic-linking-microservice/app/ /app/
COPY modules/ /app/
RUN chmod a+x main.py RUN chmod a+x main.py
CMD ["python", "./main.py"] CMD ["python", "./main.py"]
\ No newline at end of file
...@@ -14,7 +14,7 @@ basePath: "/api" ...@@ -14,7 +14,7 @@ basePath: "/api"
paths: paths:
/debug: /debug:
post: post:
operationId: "debug.echo" operationId: "rest.debug.echo"
tags: tags:
- "Echo" - "Echo"
summary: "Echo function for debugging purposes" summary: "Echo function for debugging purposes"
......
import logging
LOGGER = logging.getLogger(__name__)
class Processor:
def __init__(self):
pass
def process(self, traces: list):
LOGGER.info(f"called processing with: {str(traces)}")
\ No newline at end of file
# add modules folder to interpreter path
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
# init logging to file
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
#################################
import connexion import connexion
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
from messaging.MessageHandler import MessageHandler
# init message handler
message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body):
message_handler.handle_generic(body)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -10,4 +31,7 @@ def api_root(): ...@@ -10,4 +31,7 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True) message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('datahub', 'direct', 'semantic-linking', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
import json
import requests
from threading import Thread
import network_constants as netconst
from intelligence_zahra.Processor import Processor
import logging
LOGGER = logging.getLogger(__name__)
class MessageHandler:
_processor: Processor = None
def __init__(self):
self._processor = Processor()
def handle_generic(self, body):
LOGGER.info(f"Received message: {body}")
message = None
try:
message = json.loads(body)
except ValueError:
LOGGER.warning("Message is not in JSON format and is ignored")
return
if not 'type' in message:
LOGGER.warning("Message has no type field and is ignored")
return
if message['type'] == 'new-traces-available':
self.handle_new_traces_available()
else:
LOGGER.info("Message Type could not be processed")
def handle_new_traces_available(self):
# get all traces and call the Processor
url = f'http://{netconst.TRACE_RETRIEVAL_HOSTNAME}:{netconst.TRACE_RETRIEVAL_REST_PORT}/api/trace'
# disable university http(s) proxy for name resolution and routing
session = requests.Session()
session.trust_env = False
response = session.get(url)
if response.status_code == 200:
traces = response.json()
Thread(target=self._processor.process(traces)).start()
else:
LOGGER.error(f"Could not retrieve JSON from {url} with GET request ({response.status_code})")
apiVersion: v1
kind: Service
metadata:
name: semantic-linking
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: semantic-linking
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30101
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: semantic-linking name: semantic-linking
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: semantic-linking app: semantic-linking
......
apiVersion: v1
kind: Service
metadata:
name: semantic-linking
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: semantic-linking
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30101
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: rabbit-mq
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: rabbit-mq
ports:
- name: management
port: 15672
targetPort: 15672
nodePort: 30301
protocol: TCP
- name: message-broker
port: 5672
targetPort: 5672
nodePort: 30302
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: rabbit-mq name: rabbit-mq
spec: spec:
replicas: 1 replicas: 1
selector: selector:
......
apiVersion: v1
kind: Service
metadata:
name: rabbit-mq
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: rabbit-mq
ports:
- name: management
port: 15672
targetPort: 15672
nodePort: 30301
protocol: TCP
- name: message-broker
port: 5672
targetPort: 5672
nodePort: 30302
protocol: TCP
\ No newline at end of file
import pika
from threading import Thread
import network_constants as netconst
import logging
LOGGER = logging.getLogger(__name__)
class MessageManager:
'''The MessageManager is used for sending and receiving messages'''
_rabbit_mq_ip = None
_rabbit_mq_port = None
_prepare_receive_parameters = None
_connection: pika.SelectConnection = None
_send_channel: pika.channel.Channel = None
_receive_channel: pika.channel.Channel = None
def __init__(self, rabbit_mq_ip=netconst.RABBIT_MQ_HOSTNAME, rabbit_mq_port=netconst.RABBIT_MQ_PORT):
self._rabbit_mq_ip = rabbit_mq_ip
self._rabbit_mq_port = rabbit_mq_port
def connect(self, error_callback=None):
'''Creates a connection with two channels to RabbitMQ'''
self._error_callback = error_callback
self._connection = self._connect_async()
Thread(target=self._connection.ioloop.start).start()
def _connect_async(self) -> pika.SelectConnection:
connection = pika.SelectConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port),
on_open_callback=self._connection_opened_callback, on_open_error_callback=self._connection_opened_error_callback, on_close_callback=self._connection_closed_callback)
return connection
def _connection_opened_callback(self, connection: pika.SelectConnection):
# Create channels
connection.channel(on_open_callback=self._channel_created_callback)
connection.channel(on_open_callback=self._channel_created_callback)
def _connection_opened_error_callback(self, connection, error):
LOGGER.error(f"RabbitMQ connection could not be established: {str(error)}")
if self._error_callback != None:
self._error_callback("Connection could not be established")
def _connection_closed_callback(self, connection, error):
LOGGER.warning(f"RabbitMQ connection closed: {str(error)}")
if self._error_callback != None:
self._error_callback("Connection closed")
def _channel_created_callback(self, channel: pika.channel.Channel):
# Assign both channels
if self._send_channel == None:
self._send_channel = channel
else:
self._receive_channel = channel
LOGGER.info("RabbitMQ connection established")
def create_exchange_with_queue(self, exchange_name, exchange_type, queue_name):
'''Creates exchange and queue and binds them'''
self._prepare_receive_parameters = {'exchange_name': exchange_name,
'exchange_type': exchange_type,
'queue_name': queue_name}
self._receive_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, callback=self._exchange_created_callback)
def _exchange_created_callback(self, answer_message):
# Create queue
queue_name = self._prepare_receive_parameters['queue_name']
exclusive_access = (queue_name == '')
self._receive_channel.queue_declare(queue=queue_name, exclusive=exclusive_access, callback=self._queue_created_callback, auto_delete=exclusive_access)
def _queue_created_callback(self, answer_message: pika.frame.Method):
queue_name = answer_message.method.queue
exchange_name = self._prepare_receive_parameters['exchange_name']
# Bind queue to exchange
self._receive_channel.queue_bind(exchange=exchange_name, queue=queue_name)
LOGGER.info(f"RabbitMQ connection to exchange '{exchange_name}' established")
def start_consuming(self, queue_name, auto_ack, message_received_callback):
'''Starts listening for messages'''
self._receive_channel.basic_consume(queue=queue_name, auto_ack=auto_ack, on_message_callback=message_received_callback)
def create_exchange(self, exchange_name, exchange_type):
'''Creates the exchange'''
self._send_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type)
def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange'''
if self._send_channel == None:
LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established")
self._send_channel.basic_publish(exchange_name, routing_key, message)
def disconnect(self):
'''Stops listening for messages and closes the connection'''
try:
self._connection.ioloop.stop()
self._connection.close()
LOGGER.info("RabbitMQ connection closed")
except pika.exceptions.ConnectionWrongStateError:
LOGGER.warning("RabbitMQ connection already closed")
import pika import pika
import functools import functools
from threading import Thread from threading import Thread
from deprecated import deprecated
import network_constants as netconst
import logging import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@deprecated(reason='Can only receive messages with full connection. Use ReconnectingMessageManager for connection reuse instead.')
class MessageReceiver: class MessageReceiver:
_exchange_name = None _exchange_name = None
...@@ -18,7 +21,7 @@ class MessageReceiver: ...@@ -18,7 +21,7 @@ class MessageReceiver:
_message_received_callback = None _message_received_callback = None
_error_callback = None _error_callback = None
def __init__(self, exchange_name='beacon', exchange_type='fanout', rabbit_mq_ip='143.205.173.36', rabbit_mq_port=30302, queue_name='', auto_ack=True): def __init__(self, exchange_name='beacon', exchange_type='fanout', rabbit_mq_ip=netconst.RABBIT_MQ_HOSTNAME, rabbit_mq_port=netconst.RABBIT_MQ_PORT, queue_name='', auto_ack=True):
self._exchange_name = exchange_name self._exchange_name = exchange_name
self._exchange_type = exchange_type self._exchange_type = exchange_type
self._rabbit_mq_ip = rabbit_mq_ip self._rabbit_mq_ip = rabbit_mq_ip
...@@ -81,6 +84,9 @@ class MessageReceiver: ...@@ -81,6 +84,9 @@ class MessageReceiver:
def stop(self): def stop(self):
'''Stops listening for messages and closes the connection''' '''Stops listening for messages and closes the connection'''
try:
self._connection.ioloop.stop() self._connection.ioloop.stop()
self._connection.close() self._connection.close()
LOGGER.info(f"RabbitMQ connection closed") LOGGER.info("RabbitMQ connection closed")
\ No newline at end of file except pika.exceptions.ConnectionWrongStateError:
LOGGER.warning("RabbitMQ connection already closed")
import pika import pika
import functools import functools
from threading import Thread from threading import Thread
from deprecated import deprecated
import network_constants as netconst
import logging import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
@deprecated(reason='BlockingConnection has connection timeout. Use ReconnectingMessageManager instead.')
class MessageSender: class MessageSender:
_rabbit_mq_ip = None _rabbit_mq_ip = None
...@@ -13,13 +16,13 @@ class MessageSender: ...@@ -13,13 +16,13 @@ class MessageSender:
_connection: pika.BlockingConnection = None _connection: pika.BlockingConnection = None
_channel: pika.channel.Channel = None _channel: pika.channel.Channel = None
def __init__(self, rabbit_mq_ip='rabbit-mq', rabbit_mq_port=5672): def __init__(self, rabbit_mq_ip=netconst.RABBIT_MQ_HOSTNAME, rabbit_mq_port=netconst.RABBIT_MQ_PORT):
self._rabbit_mq_ip = rabbit_mq_ip self._rabbit_mq_ip = rabbit_mq_ip
self._rabbit_mq_port = rabbit_mq_port self._rabbit_mq_port = rabbit_mq_port
def connect(self): def connect(self):
'''Connects to RabbitMQ''' '''Connects to RabbitMQ'''
self._connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port)) self._connection = pika.BlockingConnection(parameters=pika.ConnectionParameters(self._rabbit_mq_ip, self._rabbit_mq_port, heartbeat=60, blocked_connection_timeout=30))
self._channel = self._connection.channel() self._channel = self._connection.channel()
LOGGER.info(f"RabbitMQ connection established") LOGGER.info(f"RabbitMQ connection established")
...@@ -33,6 +36,7 @@ class MessageSender: ...@@ -33,6 +36,7 @@ class MessageSender:
LOGGER.error("Tried to send before connection to RabbitMQ was established") LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established, call MessageSender.connect() first") raise RuntimeError("Connection to RabbitMQ not established, call MessageSender.connect() first")
# TODO connection is closed after 60s
self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message) self._channel.basic_publish(exchange=exchange_name, routing_key=routing_key, body=message)
def disconnect(self): def disconnect(self):
......
from __future__ import annotations
import time
from messaging.MessageManager import MessageManager
import logging
LOGGER = logging.getLogger(__name__)
class ReconnectingMessageManager:
'''The ReconnectingMessageManager Singleton handles connection errors by itself'''
__instance = None
_message_manager: MessageManager = None
_consuming = False
_consumption_parameters = None
@staticmethod
def getInstance() -> ReconnectingMessageManager:
''' Static access method. '''
if ReconnectingMessageManager.__instance == None:
ReconnectingMessageManager.__instance = ReconnectingMessageManager()
return ReconnectingMessageManager.__instance
def __init__(self):
if ReconnectingMessageManager.__instance != None:
raise Exception("This class is a singleton!")
ReconnectingMessageManager.__instance = self
self._init_message_manager()
def _init_message_manager(self):
self._message_manager = MessageManager()
self._message_manager.connect(self._pika_error_callback)
time.sleep(1)
def _pika_error_callback(self, error):
# restart receiver
self._message_manager.disconnect()
self._init_message_manager()
if self._consuming:
self._restart_consuming()
def _restart_consuming(self):
self.start_consuming(self._consumption_parameters['exchange_name'],
self._consumption_parameters['exchange_type'],
self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback'])
def start_consuming(self, exchange_name, exchange_type, queue_name, auto_ack, message_received_callback):
'''Creates exchange and queue and starts to listen to new messages'''
self._consumption_parameters = {'exchange_name': exchange_name, 'exchange_type': exchange_type,
'queue_name': queue_name, 'auto_ack': auto_ack,
'message_received_callback': message_received_callback}
self._consuming = True
self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name)
self._message_manager.start_consuming(queue_name, auto_ack, message_received_callback)
def create_message_destination(self, exchange_name, exchange_type):
'''Creates the exchange'''
self._message_manager.create_exchange(exchange_name, exchange_type)
def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange'''
self._message_manager.send_message(exchange_name, routing_key, message)
### inside k8s
RABBIT_MQ_HOSTNAME = 'rabbit-mq'
RABBIT_MQ_PORT = 5672
MONGO_DB_HOSTNAME = 'trace-retrieval-db'
MONGO_DB_PORT = 27017
TRACE_RETRIEVAL_HOSTNAME = 'trace-retrieval'
TRACE_RETRIEVAL_REST_PORT = 80
### outside k8s
# HOST_IP = '143.205.173.36'
# RABBIT_MQ_HOSTNAME = HOST_IP
# RABBIT_MQ_PORT = 30302
# MONGO_DB_HOST = HOST_IP
# MONGO_DB_PORT = 30003
# TRACE_RETRIEVAL_HOSTNAME = HOST_IP
# TRACE_RETRIEVAL_REST_PORT = 30001
\ No newline at end of file
...@@ -8,6 +8,7 @@ RUN apt-get update ...@@ -8,6 +8,7 @@ RUN apt-get update
RUN pip install flask RUN pip install flask
RUN pip install connexion[swagger-ui] RUN pip install connexion[swagger-ui]
RUN pip install pika RUN pip install pika
RUN pip install deprecated
EXPOSE 5000 EXPOSE 5000
......
...@@ -15,7 +15,7 @@ basePath: "/api" ...@@ -15,7 +15,7 @@ basePath: "/api"
paths: paths:
/debug: /debug:
post: post:
operationId: "debug.echo" operationId: "rest.debug.echo"
tags: tags:
- "Echo" - "Echo"
summary: "Echo function for debugging purposes" summary: "Echo function for debugging purposes"
...@@ -30,17 +30,6 @@ paths: ...@@ -30,17 +30,6 @@ paths:
200: 200:
description: "Successful echo of request data" description: "Successful echo of request data"
/messages:
get:
operationId: "debug.get_messages"
tags:
- "Received Messages"
summary: "List of all received messages"
description: "Lists all messages received from Rabbit MQ."
responses:
200:
description: "Message list"
/trace: /trace:
post: post:
operationId: "rest.blockchain_trace.receive" operationId: "rest.blockchain_trace.receive"
......
...@@ -8,28 +8,11 @@ if os.path.exists(modules_path): ...@@ -8,28 +8,11 @@ if os.path.exists(modules_path):
# init logging to file # init logging to file
import logging import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s') LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig( logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
filename='error.log',
level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
#################################
import connexion import connexion
from multiprocessing import Process
from messaging.MessageReceiver import MessageReceiver
from messaging.MessageSender import MessageSender
from MessageList import MessageList
messages = MessageList.getInstance()
# init message handler
def message_received_callback(channel, method, properties, body):
messages.appendMessage(body)
print(f"### Received: {body}")
def pika_error_callback(error):
# TODO gracefully handle error
print("Rabbit MQ error!")
os._exit(1)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -41,7 +24,4 @@ def api_root(): ...@@ -41,7 +24,4 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
message_rec = MessageReceiver(exchange_name='rest-gateway', exchange_type='direct', queue_name='rest-gateway', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
\ No newline at end of file
from flask import request, Response from flask import request, Response
from messaging.MessageSender import MessageSender from messaging.ReconnectingMessageManager import ReconnectingMessageManager
import json import json
# import main
message_sender = MessageSender() message_sender = ReconnectingMessageManager.getInstance()
message_sender.connect() message_sender.create_message_destination('inhub', 'direct')
message_sender.create_exchange('inhub', 'direct')
def receive(): def receive():
body = request.json body = request.json
if isBlockchainTraceValid(body): if isBlockchainTraceValid(body):
message = {'type': 'blockchain-transaction', 'content': json.dumps(body)} message = {'type': 'blockchain-transaction', 'content': body}
message_sender.send('inhub', json.dumps(message), 'trace-retrieval') message_sender.send_message('inhub', 'trace-retrieval', json.dumps(message))
return Response(status=201) return Response(status=201)
return Response(status=400) return Response(status=400)
...@@ -29,5 +28,3 @@ def isBlockchainTraceValid(trace) -> bool: ...@@ -29,5 +28,3 @@ def isBlockchainTraceValid(trace) -> bool:
and 'ResourceMd5' in trace \ and 'ResourceMd5' in trace \
and 'ResourceState' in trace \ and 'ResourceState' in trace \
and 'Metadata' in trace and 'Metadata' in trace
apiVersion: v1
kind: Service
metadata:
name: rest-gateway
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: rest-gateway
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30401
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: rest-gateway name: rest-gateway
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: rest-gateway app: rest-gateway
......
apiVersion: v1
kind: Service
metadata:
name: rest-gateway
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: rest-gateway
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30401
protocol: TCP
\ No newline at end of file
...@@ -20,6 +20,11 @@ class MongoRepository: ...@@ -20,6 +20,11 @@ class MongoRepository:
def get_traces(self, selection: dict = {}): def get_traces(self, selection: dict = {}):
return self._collection.find(selection) return self._collection.find(selection)
def drop_collection(self):
self._collection.drop()
traces = MongoRepository().get_traces() traces = MongoRepository().get_traces()
for t in traces: for t in traces:
print(str(t)) print(str(t))
# MongoRepository().drop_collection()
\ No newline at end of file
...@@ -9,6 +9,7 @@ RUN pip install flask ...@@ -9,6 +9,7 @@ RUN pip install flask
RUN pip install connexion[swagger-ui] RUN pip install connexion[swagger-ui]
RUN pip install pika RUN pip install pika
RUN pip install pymongo RUN pip install pymongo
RUN pip install deprecated
EXPOSE 5000 EXPOSE 5000
......
...@@ -32,7 +32,7 @@ paths: ...@@ -32,7 +32,7 @@ paths:
/trace: /trace:
post: post:
operationId: "rest.blockchain_trace.receive" operationId: "rest.blockchain_trace.post"
tags: tags:
- "Blockchain Trace" - "Blockchain Trace"
summary: "Add a new blockchain trace to SMART" summary: "Add a new blockchain trace to SMART"
...@@ -49,6 +49,18 @@ paths: ...@@ -49,6 +49,18 @@ paths:
description: "Successful operation" description: "Successful operation"
400: 400:
description: "Invalid input" description: "Invalid input"
get:
operationId: "rest.blockchain_trace.get"
tags:
- "Blockchain Trace"
summary: "Get blockchain traces"
description: "Returns all blockchain traces in the database"
parameters: []
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/BlockchainTrace"
definitions: definitions:
BlockchainTrace: BlockchainTrace:
......
import pymongo import pymongo
import network_constants as netconst
MONGO_DB_HOST = 'trace-retrieval-db'
MONGO_DB_PORT = '27017'
class MongoRepository: class MongoRepository:
# TODO extract to docker env var # TODO extract to docker env var
_username = 'root' _username = 'root'
_password = 'root' _password = 'root'
_collection : pymongo.collection.Collection = None
_collection: pymongo.collection.Collection = None
_mongo_client: pymongo.MongoClient = None
def __init__(self, username=_username, password=_password): def __init__(self, username=_username, password=_password):
myclient = pymongo.MongoClient(f"mongodb://{username}:{password}@{MONGO_DB_HOST}:{MONGO_DB_PORT}/") self._mongo_client = pymongo.MongoClient(f"mongodb://{username}:{password}@{netconst.MONGO_DB_HOSTNAME}:{netconst.MONGO_DB_PORT}/")
database = myclient['traceRetrievalDB'] # trace retrieval database = self._mongo_client['traceRetrievalDB']
self._collection = database['traces'] self._collection = database['traces']
def insert_trace(self, content: dict): def insert_trace(self, content: dict):
self._collection.insert_one(content) self._collection.insert_one(content)
def get_traces(self, selection: dict = {}): def get_traces(self, selection: dict = {}, projection: dict = {'_': 0}) -> pymongo.cursor.Cursor:
return self._collection.find(selection) return self._collection.find(selection, projection)
def close_connection(self):
self._mongo_client.close()
self._collection = None
self._mongo_client = None
...@@ -14,26 +14,13 @@ LOGGER = logging.getLogger(__name__) ...@@ -14,26 +14,13 @@ LOGGER = logging.getLogger(__name__)
############################# #############################
import connexion import connexion
from messaging.MessageHandler import MessageHandler from messaging.MessageHandler import MessageHandler
from messaging.MessageReceiver import MessageReceiver from messaging.ReconnectingMessageManager import ReconnectingMessageManager
message_rec: MessageReceiver = None
# init message handler # init message handler
message_handler = MessageHandler() message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
LOGGER.info(f"Received new message: {body}")
message_handler.handle_generic(body) message_handler.handle_generic(body)
def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver
message_rec.stop()
init_message_receiver()
def init_message_receiver():
message_rec = MessageReceiver(exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml') app.add_api('swagger.yml')
...@@ -44,6 +31,7 @@ def api_root(): ...@@ -44,6 +31,7 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
init_message_receiver() message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('inhub', 'direct', 'trace-retrieval', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
from db.MongoRepository import MongoRepository from db.MongoRepository import MongoRepository
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
import json import json
import logging import logging
...@@ -6,19 +7,38 @@ LOGGER = logging.getLogger(__name__) ...@@ -6,19 +7,38 @@ LOGGER = logging.getLogger(__name__)
class MessageHandler: class MessageHandler:
_mongo_repo = None _mongo_repo = None
_message_sender = None
def __init__(self): def __init__(self):
self._mongo_repo = MongoRepository() self._mongo_repo = MongoRepository()
self._init_message_sender()
def _init_message_sender(self):
self._message_sender = ReconnectingMessageManager.getInstance()
self._message_sender.create_message_destination('datahub', 'direct')
def handle_generic(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}") LOGGER.info(f"Received message: {body}")
message = None
try:
message = json.loads(body) message = json.loads(body)
except ValueError:
LOGGER.warning("Message is not in JSON format and is ignored")
return
if not 'type' in message: if not 'type' in message:
LOGGER.warning(f"Message has no type field -> ignored") LOGGER.warning("Message has no type field and is ignored")
return return
if message['type'] == 'blockchain-transaction': if message['type'] == 'blockchain-transaction':
self.handle_blockchain_transaction(json.loads(message['content'])) self.handle_blockchain_transaction(message['content'])
else:
LOGGER.info("Message Type could not be processed")
def handle_blockchain_transaction(self, transaction): def handle_blockchain_transaction(self, transaction):
self._mongo_repo.insert_trace(transaction) self._mongo_repo.insert_trace(transaction)
# inform semantic linking microservice
msg = {'type': 'new-traces-available'}
self._message_sender.send_message('datahub', 'semantic-linking', json.dumps(msg))
\ No newline at end of file
from flask import request, Response from flask import request, Response
from db.MongoRepository import MongoRepository
def receive(): mongo_repo = MongoRepository()
def post():
return Response(status=501) return Response(status=501)
def get():
return list(mongo_repo.get_traces(projection={'_id': 0}))
\ No newline at end of file
...@@ -20,7 +20,7 @@ kind: Deployment ...@@ -20,7 +20,7 @@ kind: Deployment
metadata: metadata:
name: trace-retrieval name: trace-retrieval
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: trace-retrieval app: trace-retrieval
...@@ -34,6 +34,7 @@ spec: ...@@ -34,6 +34,7 @@ spec:
image: 172.16.1.20:5000/trace-retrieval-microservice image: 172.16.1.20:5000/trace-retrieval-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
imagePullPolicy: Always
--- ---
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment