Commit 6d186827 authored by Manuel's avatar Manuel

[reputation, semantic-linking] added MQ communication

parent 03f0f4a7
......@@ -13,23 +13,29 @@ class MessageHandler:
self.user_trust_repository = user_trust_repository
self.trust_trace_repository = trust_trace_repository
def handle(self, body: str):
# decode the message
data: Dict = None
try:
data = json.loads(body)
except json.decoder.JSONDecodeError:
raise ValueError("Invalid Message: Not in JSON format")
def handle_generic(self, body: str):
LOGGER.info(f"Received message: {body}")
if "type" not in data.keys() or "content" not in data.keys():
raise ValueError(
"Invalid Message: Missing fields \"type\" or \"content\"")
message = None
try:
message = json.loads(body)
except ValueError:
LOGGER.warning("Message is not in JSON format and is ignored")
return
if data["type"] != "new-trace":
if not 'type' in message:
LOGGER.warning("Message has no type field and is ignored")
return
content: Dict = data["content"]
if message['type'] == 'new-trace':
self.handle(message)
elif message['type'] == 'new-traces-available':
self.handle_new_traces_available()
else:
LOGGER.info("Message Type could not be processed")
def handle(self, content: Dict):
# decode the message
if "use_case" not in content.keys() or "table" not in content.keys() or "properties" not in content.keys():
raise ValueError(
"Invalid Message: Missing fields \"use_case\" or \"table\" or \"properties\"")
......
# add modules folder to interpreter path
from lib.trust.message_handler import MessageHandler
import sys
import os
from pathlib import Path
......@@ -18,6 +19,7 @@ import connexion
from security import swagger_util
from pathlib import Path
import env_info
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
from flask import request
from flask import redirect
......@@ -47,7 +49,23 @@ else:
app.add_api(swagger_util.get_bundled_specs(Path(swagger_path)),
resolver = connexion.RestyResolver("cms_rest_api"))
from lib.database.repositories.trust_adapter_repository import TrustAdapterRepository
from lib.database.repositories.trust_trace_repository import TrustTraceRepository
from lib.database.repositories.user_repository import UserRepository
from lib.database.repositories.user_trust_repository import UserTrustRepository
message_handler: MessageHandler = MessageHandler(
trust_adapter_repository=TrustAdapterRepository(),
trust_trace_repository=TrustTraceRepository(),
user_repository=UserRepository(),
user_trust_repository=UserTrustRepository()
)
def message_received_callback(channel, method, properties, body):
message_handler.handle_generic(body)
# start app
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, ssl_context=context)#, debug=True)
\ No newline at end of file
message_manager = ReconnectingMessageManager.getInstance()
message_manager.start_consuming('datahub', 'direct', 'reputation-calculation', True, message_received_callback)
app.run(host='0.0.0.0', port=5000, ssl_context=context)
\ No newline at end of file
......@@ -70,7 +70,6 @@ class MessageHandler:
mappings - Required: contains string->path mappings, describing how the flattened object is built
'''
print("yey new trace ^.^")
flattened = {}
# iterate over schema mappings and resolve paths
......@@ -245,3 +244,6 @@ class MessageHandler:
# inform semantic linking microservice
self._message_sender.send_message(
'datahub', 'semantic-linking', msg_string)
# inform reputation-calculation microservice
self._message_sender.send_message(
'datahub', 'reputation-calculation', msg_string)
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment