Commit fde49aef authored by Alexander Lercher's avatar Alexander Lercher

Trace Retrieval: restarting message_receiver; cleanup

parent eaa08e45
import pymongo
MONGO_DB_HOST = '143.205.173.36'
MONGO_DB_PORT = '30003'
class MongoRepository:
# TODO extract to docker env var
_username = 'root'
_password = 'root'
_collection : pymongo.collection.Collection = None
def __init__(self, username=_username, password=_password):
myclient = pymongo.MongoClient(f"mongodb://{username}:{password}@{MONGO_DB_HOST}:{MONGO_DB_PORT}/")
database = myclient['traceRetrievalDB'] # trace retrieval
self._collection = database['traces']
def insert_trace(self, content: dict):
self._collection.insert_one(content)
def get_traces(self, selection: dict = {}):
return self._collection.find(selection)
traces = MongoRepository().get_traces()
for t in traces:
print(str(t))
\ No newline at end of file
...@@ -15,7 +15,7 @@ basePath: "/api" ...@@ -15,7 +15,7 @@ basePath: "/api"
paths: paths:
/debug: /debug:
post: post:
operationId: "debug.echo" operationId: "rest.debug.echo"
tags: tags:
- "Echo" - "Echo"
summary: "Echo function for debugging purposes" summary: "Echo function for debugging purposes"
...@@ -32,7 +32,7 @@ paths: ...@@ -32,7 +32,7 @@ paths:
/trace: /trace:
post: post:
operationId: "blockchain_trace.receive" operationId: "rest.blockchain_trace.receive"
tags: tags:
- "Blockchain Trace" - "Blockchain Trace"
summary: "Add a new blockchain trace to SMART" summary: "Add a new blockchain trace to SMART"
......
import pymongo import pymongo
MONGO_DB_HOST = 'trace-retrieval-db' MONGO_DB_HOST = 'trace-retrieval-db'
# MONGO_DB_HOST = '143.205.173.36'
MONGO_DB_PORT = '27017' MONGO_DB_PORT = '27017'
# MONGO_DB_PORT = '30003'
class MongoRepository: class MongoRepository:
# TODO extract to docker env var # TODO extract to docker env var
......
...@@ -8,24 +8,31 @@ if os.path.exists(modules_path): ...@@ -8,24 +8,31 @@ if os.path.exists(modules_path):
# init logging to file # init logging to file
import logging import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s') LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(filename='error.log', level=logging.WARNING, format=LOG_FORMAT) logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
#############################
import connexion import connexion
from messaging.MessageHandler import MessageHandler from messaging.MessageHandler import MessageHandler
from messaging.MessageReceiver import MessageReceiver from messaging.MessageReceiver import MessageReceiver
from messaging.MessageSender import MessageSender
RABBIT_MQ_DNS_NAME = 'rabbit-mq' message_rec: MessageReceiver = None
RABBIT_MQ_PORT = '5672'
message_sender = None
# init message handler # init message handler
message_handler = MessageHandler() message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
message_handler.handle_message(body) LOGGER.info(f"Received new message: {body}")
message_sender.send('rest-gateway', str(body) + ' (sent from trace-retrieval-microservice)', 'rest-gateway') message_handler.handle_generic(body)
def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver
message_rec.stop()
init_message_receiver()
def init_message_receiver():
message_rec = MessageReceiver(exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -37,11 +44,6 @@ def api_root(): ...@@ -37,11 +44,6 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
message_rec = MessageReceiver(rabbit_mq_ip=RABBIT_MQ_DNS_NAME, rabbit_mq_port=RABBIT_MQ_PORT, exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True) init_message_receiver()
message_rec.start(message_received_callback)
message_sender = MessageSender(rabbit_mq_ip=RABBIT_MQ_DNS_NAME, rabbit_mq_port=RABBIT_MQ_PORT)
message_sender.connect()
message_sender.create_exchange('rest-gateway', 'direct')
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) # disable reloader so only subscribed once to rabbitmq
...@@ -10,11 +10,11 @@ class MessageHandler: ...@@ -10,11 +10,11 @@ class MessageHandler:
def __init__(self): def __init__(self):
self._mongo_repo = MongoRepository() self._mongo_repo = MongoRepository()
def handle_message(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}") LOGGER.info(f"Received message: {body}")
message = json.loads(body) message = json.loads(body)
if not 'type' in message: if not 'type' in message:
LOGGER.warning(f"Message has no type field") LOGGER.warning(f"Message has no type field -> ignored")
return return
if message['type'] == 'blockchain-transaction': if message['type'] == 'blockchain-transaction':
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment