Commit 34ef4d75 authored by Alexander Lercher's avatar Alexander Lercher

Semantic Linking: traces are loaded via REST to Trace Retrieval

Fixed workaround from commit bf2c831c. 
Traces are loaded via REST Get instead of direct DB access.
parent 71e8e785
import pymongo
import network_constants as netconst
class MongoRepository:
# TODO extract to docker env var
_username = 'root'
_password = 'root'
_collection: pymongo.collection.Collection = None
_mongo_client: pymongo.MongoClient = None
def __init__(self, username=_username, password=_password):
self._mongo_client = pymongo.MongoClient(f"mongodb://{username}:{password}@{netconst.MONGO_DB_HOSTNAME}:{netconst.MONGO_DB_PORT}/")
database = self._mongo_client['traceRetrievalDB']
self._collection = database['traces']
def insert_trace(self, content: dict):
self._collection.insert_one(content)
def get_traces(self, selection: dict = {}, projection: dict = {'_': 0}) -> pymongo.cursor.Cursor:
return self._collection.find(selection, projection)
def close_connection(self):
self._mongo_client.close()
self._collection = None
self._mongo_client = None
...@@ -13,28 +13,14 @@ LOGGER = logging.getLogger(__name__) ...@@ -13,28 +13,14 @@ LOGGER = logging.getLogger(__name__)
################################# #################################
import connexion import connexion
from messaging.MessageReceiver import MessageReceiver from messaging.ReconnectingMessageManager import ReconnectingMessageManager
from messaging.MessageHandler import MessageHandler from messaging.MessageHandler import MessageHandler
# init message handler # init message handler
message_rec: MessageReceiver = None
message_handler = MessageHandler() message_handler = MessageHandler()
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
message_handler.handle_generic(body) message_handler.handle_generic(body)
def pika_error_callback(error):
LOGGER.warning(f"RabbitMQ stopped with error: {error}")
# restart receiver
global message_rec
message_rec.stop()
init_message_receiver()
def init_message_receiver():
global message_rec
message_rec = MessageReceiver(exchange_name='datahub', exchange_type='direct', queue_name='semantic-linking', auto_ack=True)
message_rec.start(message_received_callback, pika_error_callback)
# load swagger config # load swagger config
app = connexion.App(__name__, specification_dir='configs/') app = connexion.App(__name__, specification_dir='configs/')
app.add_api('swagger.yml') app.add_api('swagger.yml')
...@@ -45,5 +31,7 @@ def api_root(): ...@@ -45,5 +31,7 @@ def api_root():
# start app # start app
if __name__ == '__main__': if __name__ == '__main__':
init_message_receiver() message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('datahub', 'direct', 'semantic-linking', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False) app.run(host='0.0.0.0', port=5000, debug=True, use_reloader=False)
import json import json
import requests import requests
from threading import Thread
import network_constants as netconst import network_constants as netconst
from db.MongoRepository import MongoRepository
from intelligence_zahra.Processor import Processor from intelligence_zahra.Processor import Processor
import logging import logging
...@@ -33,19 +33,16 @@ class MessageHandler: ...@@ -33,19 +33,16 @@ class MessageHandler:
LOGGER.info("Message Type could not be processed") LOGGER.info("Message Type could not be processed")
def handle_new_traces_available(self): def handle_new_traces_available(self):
# TODO remove workaround # get all traces and call the Processor
repo = MongoRepository()
traces = list(repo.get_traces(projection={'_id':0}))
self._processor.process(traces)
repo.close_connection()
return
# get traces
url = f'http://{netconst.TRACE_RETRIEVAL_HOSTNAME}:{netconst.TRACE_RETRIEVAL_REST_PORT}/api/trace' url = f'http://{netconst.TRACE_RETRIEVAL_HOSTNAME}:{netconst.TRACE_RETRIEVAL_REST_PORT}/api/trace'
response = requests.get(url)
# disable university http(s) proxy for name resolution and routing
session = requests.Session()
session.trust_env = False
response = session.get(url)
if response.status_code == 200: if response.status_code == 200:
traces = response.json() traces = response.json()
self._processor.process(traces) Thread(target=self._processor.process(traces)).start()
else: else:
LOGGER.error(f"Could not retrieve JSON from {url} with GET request ({response.status_code})") LOGGER.error(f"Could not retrieve JSON from {url} with GET request ({response.status_code})")
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment