Commit fd3515f4 authored by Alexander Lercher's avatar Alexander Lercher

Trace Retrieval: More robust message handling

parent fde49aef
...@@ -7,11 +7,13 @@ class MongoRepository: ...@@ -7,11 +7,13 @@ class MongoRepository:
# TODO extract to docker env var # TODO extract to docker env var
_username = 'root' _username = 'root'
_password = 'root' _password = 'root'
_collection : pymongo.collection.Collection = None
_collection: pymongo.collection.Collection = None
_mongo_client: pymongo.MongoClient = None
def __init__(self, username=_username, password=_password): def __init__(self, username=_username, password=_password):
myclient = pymongo.MongoClient(f"mongodb://{username}:{password}@{MONGO_DB_HOST}:{MONGO_DB_PORT}/") self._mongo_client = pymongo.MongoClient(f"mongodb://{username}:{password}@{MONGO_DB_HOST}:{MONGO_DB_PORT}/")
database = myclient['traceRetrievalDB'] # trace retrieval database = self._mongo_client['traceRetrievalDB']
self._collection = database['traces'] self._collection = database['traces']
def insert_trace(self, content: dict): def insert_trace(self, content: dict):
...@@ -19,3 +21,8 @@ class MongoRepository: ...@@ -19,3 +21,8 @@ class MongoRepository:
def get_traces(self, selection: dict = {}): def get_traces(self, selection: dict = {}):
return self._collection.find(selection) return self._collection.find(selection)
def close_connection(self):
self._mongo_client.close()
self._collection = None
self._mongo_client = None
...@@ -21,16 +21,17 @@ message_rec: MessageReceiver = None ...@@ -21,16 +21,17 @@ message_rec: MessageReceiver = None
# init message handler # init message handler
message_handler = MessageHandler() message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
LOGGER.info(f"Received new message: {body}")
message_handler.handle_generic(body) message_handler.handle_generic(body)
def pika_error_callback(error): def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}") LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver # restart receiver
global message_rec
message_rec.stop() message_rec.stop()
init_message_receiver() init_message_receiver()
def init_message_receiver(): def init_message_receiver():
global message_rec
message_rec = MessageReceiver(exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True) message_rec = MessageReceiver(exchange_name='inhub', exchange_type='direct', queue_name='trace-retrieval', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback) message_rec.start(message_received_callback, pika_error_callback)
......
...@@ -12,9 +12,16 @@ class MessageHandler: ...@@ -12,9 +12,16 @@ class MessageHandler:
def handle_generic(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}") LOGGER.info(f"Received message: {body}")
message = json.loads(body)
message = None
try:
message = json.loads(body)
except ValueError:
LOGGER.warning(f"Message is not in JSON format and is ignored")
return
if not 'type' in message: if not 'type' in message:
LOGGER.warning(f"Message has no type field -> ignored") LOGGER.warning(f"Message has no type field and is ignored")
return return
if message['type'] == 'blockchain-transaction': if message['type'] == 'blockchain-transaction':
......
...@@ -20,7 +20,7 @@ kind: Deployment ...@@ -20,7 +20,7 @@ kind: Deployment
metadata: metadata:
name: trace-retrieval name: trace-retrieval
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: trace-retrieval app: trace-retrieval
...@@ -34,6 +34,7 @@ spec: ...@@ -34,6 +34,7 @@ spec:
image: 172.16.1.20:5000/trace-retrieval-microservice image: 172.16.1.20:5000/trace-retrieval-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
imagePullPolicy: Always
--- ---
apiVersion: v1 apiVersion: v1
kind: Service kind: Service
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment