Commit c8376756 authored by Alexander Lercher's avatar Alexander Lercher

Semantic Linking: receives messages and loads traces from trace retrieval

parent 0652b32f
...@@ -14,7 +14,7 @@ basePath: "/api" ...@@ -14,7 +14,7 @@ basePath: "/api"
paths: paths:
/debug: /debug:
post: post:
operationId: "debug.echo" operationId: "rest.debug.echo"
tags: tags:
- "Echo" - "Echo"
summary: "Echo function for debugging purposes" summary: "Echo function for debugging purposes"
......
import logging
LOGGER = logging.getLogger(__name__)
class Processor:
def __init__(self):
pass
def process(self, traces: list):
LOGGER.info("called processing")
\ No newline at end of file
# add modules folder to interpreter path
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
# init logging to file
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
logging.basicConfig(level=logging.INFO, format=LOG_FORMAT)
LOGGER = logging.getLogger(__name__)
#################################
import connexion import connexion
from messaging.MessageReceiver import MessageReceiver
from messaging.MessageHandler import MessageHandler
# init message handler
message_rec: MessageReceiver = None
message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body):
message_handler.handle_generic(body)
def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver
global message_rec
message_rec.stop()
init_message_receiver()
def init_message_receiver():
global message_rec
message_rec = MessageReceiver(exchange_name='datahub', exchange_type='direct', queue_name='semantic-linking', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
...@@ -10,4 +45,5 @@ def api_root(): ...@@ -10,4 +45,5 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=True) init_message_receiver()
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
import json
import requests
import network_constants as netconst
from intelligence_zahra.Processor import Processor
import logging
LOGGER = logging.getLogger(__name__)
class MessageHandler:
_processor: Processor = None
def __init__(self):
self._processor = Processor()
def handle_generic(self, body):
LOGGER.info(f"Received message: {body}")
message = None
try:
message = json.loads(body)
except ValueError:
LOGGER.warning("Message is not in JSON format and is ignored")
return
if not 'type' in message:
LOGGER.warning("Message has no type field and is ignored")
return
if message['type'] == 'new-traces-available':
self.handle_new_traces_available()
else:
LOGGER.info("Message Type could not be processed")
def handle_new_traces_available(self):
# get traces
url = f'http://{netconst.TRACE_RETRIEVAL_HOSTNAME}:{netconst.TRACE_RETRIEVAL_REST_PORT}/api/trace'
response = requests.get(url)
if response.status_code == 200:
traces = response.json()
self._processor.process(traces)
else:
LOGGER.error(f"Could not retrieve JSON from {url} with GET request")
...@@ -2,9 +2,12 @@ ...@@ -2,9 +2,12 @@
RABBIT_MQ_HOSTNAME = 'rabbit-mq' RABBIT_MQ_HOSTNAME = 'rabbit-mq'
RABBIT_MQ_PORT = 5672 RABBIT_MQ_PORT = 5672
MONGO_DB_HOST = 'trace-retrieval-db' MONGO_DB_HOSTNAME = 'trace-retrieval-db'
MONGO_DB_PORT = 27017 MONGO_DB_PORT = 27017
TRACE_RETRIEVAL_HOSTNAME = 'trace-retrieval'
TRACE_RETRIEVAL_REST_PORT = 80
### outside k8s ### outside k8s
# HOST_IP = '143.205.173.36' # HOST_IP = '143.205.173.36'
...@@ -12,4 +15,7 @@ MONGO_DB_PORT = 27017 ...@@ -12,4 +15,7 @@ MONGO_DB_PORT = 27017
# RABBIT_MQ_PORT = 30302 # RABBIT_MQ_PORT = 30302
# MONGO_DB_HOST = HOST_IP # MONGO_DB_HOST = HOST_IP
# MONGO_DB_PORT = 30003 # MONGO_DB_PORT = 30003
\ No newline at end of file
# TRACE_RETRIEVAL_HOSTNAME = HOST_IP
# TRACE_RETRIEVAL_REST_PORT = 30001
\ No newline at end of file
...@@ -10,7 +10,7 @@ class MongoRepository: ...@@ -10,7 +10,7 @@ class MongoRepository:
_mongo_client: pymongo.MongoClient = None _mongo_client: pymongo.MongoClient = None
def __init__(self, username=_username, password=_password): def __init__(self, username=_username, password=_password):
self._mongo_client = pymongo.MongoClient(f"mongodb://{username}:{password}@{netconst.MONGO_DB_HOST}:{netconst.MONGO_DB_PORT}/") self._mongo_client = pymongo.MongoClient(f"mongodb://{username}:{password}@{netconst.MONGO_DB_HOSTNAME}:{netconst.MONGO_DB_PORT}/")
database = self._mongo_client['traceRetrievalDB'] database = self._mongo_client['traceRetrievalDB']
self._collection = database['traces'] self._collection = database['traces']
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment