Commit b1f74f85 authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'develop'

parents fca98b09 5f88eba2
Pipeline #13 failed with stages
**/__pycache__ **/__pycache__
*.log **/.vscode
\ No newline at end of file **/.idea
*.log
...@@ -15,3 +15,10 @@ The *images* folder holds the images used in this document. ...@@ -15,3 +15,10 @@ The *images* folder holds the images used in this document.
## Overall microservice architecture ## Overall microservice architecture
![SMART architecture image](images/smart-architecture.png) ![SMART architecture image](images/smart-architecture.png)
## Trace input handling
1. New traces are POSTed to the REST gateway
1. The gateway fowards the new trace to the message broker
1. The trace retrieval microservice receives the message, stores the trace in its document-based database and sends a notification to the message broker
1. The semantic linking microservice receives the notification and GETs all traces (including the new one) from the trace retrieval microservice
1. All traces can now be processed
![Input handling image](images/input-handling.png)
apiVersion: v1
kind: Service
metadata:
name: agent-discovery
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: agent-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30104
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: agent-discovery name: agent-discovery
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: agent-discovery app: agent-discovery
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: agent-discovery - name: agent-discovery
image: 172.16.1.20:5000/agent-discovery-microservice image: 143.205.173.97:5000/agent-discovery-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: agent-discovery
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: agent-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30104
protocol: TCP
\ No newline at end of file
...@@ -11,7 +11,7 @@ RUN pip install connexion[swagger-ui] ...@@ -11,7 +11,7 @@ RUN pip install connexion[swagger-ui]
EXPOSE 5000 EXPOSE 5000
WORKDIR /app WORKDIR /app
COPY data-hub/community-detecion-microservice/app/ /app/ COPY data-hub/community-detection-microservice/app/ /app/
RUN chmod a+x main.py RUN chmod a+x main.py
CMD ["python", "./main.py"] CMD ["python", "./main.py"]
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: community-detection
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: community-detection
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30109
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: community-detection name: community-detection
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: community-detection app: community-detection
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: community-detection - name: community-detection
image: 172.16.1.20:5000/community-detection-microservice image: 143.205.173.97:5000/community-detection-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: community-detection
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: community-detection
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30109
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: democratic-reasoning
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: democratic-reasoning
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30106
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: democratic-reasoning name: democratic-reasoning
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: democratic-reasoning app: democratic-reasoning
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: democratic-reasoning - name: democratic-reasoning
image: 172.16.1.20:5000/democratic-reasoning-microservice image: 143.205.173.97:5000/democratic-reasoning-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: democratic-reasoning
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: democratic-reasoning
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30106
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: geo-profiling
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: geo-profiling
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30105
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: geo-profiling name: geo-profiling
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: geo-profiling app: geo-profiling
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: geo-profiling - name: geo-profiling
image: 172.16.1.20:5000/geo-profiling-microservice image: 143.205.173.97:5000/geo-profiling-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: geo-profiling
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: geo-profiling
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30105
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: pareto-trust
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: pareto-trust
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30108
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: pareto-trust name: pareto-trust
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: pareto-trust app: pareto-trust
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: pareto-trust - name: pareto-trust
image: 172.16.1.20:5000/pareto-trust-microservice image: 143.205.173.97:5000/pareto-trust-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: pareto-trust
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: pareto-trust
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30108
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: reputation-calculation
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: reputation-calculation
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30107
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: reputation-calculation name: reputation-calculation
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: reputation-calculation app: reputation-calculation
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: reputation-calculation - name: reputation-calculation
image: 172.16.1.20:5000/reputation-calculation-microservice image: 143.205.173.97:5000/reputation-calculation-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: reputation-calculation
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: reputation-calculation
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30107
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: role-discovery
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: role-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30102
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: role-discovery name: role-discovery
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: role-discovery app: role-discovery
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: role-discovery - name: role-discovery
image: 172.16.1.20:5000/role-discovery-microservice image: 143.205.173.97:5000/role-discovery-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: role-discovery
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: role-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30102
protocol: TCP
\ No newline at end of file
import json
nodeIds = []
destIds = []
clusterlabels = []
destclusterlabel = []
cluster = []
labalvlues = []
def classify():
df_nodes = load_values()
for row in df_nodes:
for j in range(len(row['TransactionFrom'])):
print(" Input Ids: ", row['TransactionFrom'][j])
nodeIds.append(row['TransactionFrom'])
print("This is nodes: ", nodeIds)
for row in df_nodes:
destIds.append(row['TransactionTo'])
for row in range(len(nodeIds)):
print(nodeIds[row])
print("Finish InputIDs")
i = 0
for row in range(len(nodeIds)):
clusterlabels.append(row)
i += 1
print(i)
"""" classifying Inputs"""
"""" Labaling inputs"""
for row in range(len(nodeIds)):
for rown in range(len(nodeIds[row])):
for row1 in range(len(nodeIds)):
for rown1 in range(len(nodeIds[row1])):
if(nodeIds[row][rown]==nodeIds[row1][rown1]):
# print("row: ",row,"row1: ",row1)
if(row < row1):
for row2 in clusterlabels:
if( clusterlabels[row1]== clusterlabels[row2]):
clusterlabels[row2]=clusterlabels[row]
clusterlabels[row1] = clusterlabels[row]
else:
for row2 in clusterlabels:
if (clusterlabels[row] == clusterlabels[row2]):
clusterlabels[row2] = clusterlabels[row1]
clusterlabels[row] = clusterlabels[row1]
print(clusterlabels)
print("cluster labels:", len(clusterlabels))
print("NodeIDs: ", len(nodeIds))
"""" Calculating the number of clusters"""
clusternum = 1
labalvlues.append(clusterlabels[0])
for row in range(len(clusterlabels)):
flag = True
for row1 in range(len(labalvlues)):
if(clusterlabels[row]== labalvlues[row1]):
flag = False
if (flag):
clusternum = + 1
labalvlues.append(clusterlabels[row])
print("label values (source Ids in the network): ", labalvlues, " and the number of clusters is: ", len(labalvlues))
"""" clustering Ids according to their labels"""
for row in range(len(labalvlues)):
cluster.append([])
for row3 in range(len(nodeIds)):
if (labalvlues[row] == clusterlabels[row3]):
cluster[row].extend(nodeIds[row3])
print("clusters: ", cluster)
""" Removing duplicating items in cluster"""
flag = True
while(flag):
for row in range(len(cluster)):
flag= False
for row1 in range(len(cluster[row])):
flag= False
for row2 in range (len(cluster[row])):
if(row1 != row2):
if(cluster[row][row1] == cluster[row][row2]):
del cluster[row][row2]
flag=True
break
if(flag):
break
if(flag):
break
print("cluster:", cluster)
"""" Clustering Destination Ids """
for row in range(len(destIds)):
destclusterlabel.append([])
for row2 in range(len(destIds[row])):
flag = True
for rownum in range(len(labalvlues)):
for row1 in range(len(cluster[rownum])):
if(destIds[row][row2]== cluster[rownum][row1]):
destclusterlabel[row].append(labalvlues[rownum])
flag = False
if(flag):
destclusterlabel.append(destIds[row][row2])
print("destination labels (destination Ids): ", destclusterlabel)
def load_values():
with open("mult_in_out_large.json", "r") as json_file:
df_nodes = json.load(json_file)
return df_nodes
\ No newline at end of file
import networkx as nx
import matplotlib.pyplot as plt
from collections import Counter
import HyperGraph as hg
import warnings
# pip install networkx
# pip install matplotlib
## pip install pandas
## pip install community
## pip install mplleaflet
## pip install values
class SemanticLinking:
def __init__(self):
hg.classify()
def _color_network(self, G):
"""Colors the network so that neighboring nodes all have distinct colors.
Returns a dict keyed by color to a set of nodes with that color.
"""
coloring = dict() # color => set(node)
colors = nx.coloring.greedy_color(G)
for node, color in colors.items():
if color in coloring:
coloring[color].add(node)
else:
coloring[color] = set([node])
return coloring
def _labeling_complete(self, labeling, G):
"""Determines whether or not LPA is done.
Label propagation is complete when all nodes have a label that is
in the set of highest frequency labels amongst its neighbors.
Nodes with no neighbors are considered complete.
"""
return all(labeling[v] in self._most_frequent_labels(v, labeling, G)
for v in G if len(G[v]) > 0)
def _most_frequent_labels(self, node, labeling, G):
"""Returns a set of all labels with maximum frequency in `labeling`.
Input `labeling` should be a dict keyed by node to labels.
"""
if not G[node]:
# Nodes with no neighbors are themselves a community and are labeled
# accordingly, hence the immediate if statement.
return {labeling[node]}
# Compute the frequencies of all neighbours of node
freqs = Counter(labeling[q] for q in G[node])
max_freq = max(freqs.values())
return {label for label, freq in freqs.items() if freq == max_freq}
def _update_label(self, node, labeling, G):
"""Updates the label of a node using the Prec-Max tie breaking algorithm
The algorithm is explained in: 'Community Detection via Semi-Synchronous
Label Propagation Algorithms' Cordasco and Gargano, 2011
"""
high_labels = self._most_frequent_labels(node, labeling, G)
if len(high_labels) == 1:
labeling[node] = high_labels.pop()
elif len(high_labels) > 1:
# Prec-Max
if labeling[node] not in high_labels:
labeling[node] = max(high_labels)
warnings.filterwarnings('ignore')
#G = nx.DiGraph(directed=True)
G = nx.MultiDiGraph(day="Stackoverflow")
df_nodes = hg.clusterlabels
destf_nodes = hg.destclusterlabel
color_map = {1: '#f09494', 2: '#eebcbc', 3: '#72bbd0', 4: '#91f0a1', 5: '#629fff', 6: '#bcc2f2',
7: '#eebcbc', 8: '#f1f0c0', 9: '#d2ffe7', 10: '#caf3a6', 11: '#ffdf55', 12: '#ef77aa',
13: '#d6dcff', 14: '#d2f5f0'}
i=0
graphedge = []
weigth = []
sourcedestination = []
source = []
dest = []
edge_width = []
weight1 = []
node_adjacencies = []
labeling = {}
def drawedges(self):
"""drawing edges in graph"""
for drow in range(len(self.df_nodes)):
for row in range(len(self.destf_nodes[drow])):
self.G.add_edge(self.df_nodes[drow], self.destf_nodes[drow][row])
for row in range(len(hg.labalvlues)):
for row1 in range(len(hg.labalvlues)):
self.weight1.append(self.G.number_of_edges(hg.labalvlues[row], hg.labalvlues[row1]))
print("The number of coccurance from node ", hg.labalvlues[row],"to node ", hg.labalvlues[row1], ": ", self.weight1[row1])
self.G.__setattr__('weight', self.weight1)
def dolabeling(self):
"""label_propagation_communities(G) """
coloring = self._color_network(self.G)
# Create a unique label for each node in the graph
labeling = {v: k for k, v in enumerate(self.G)}
print("lable value: ", labeling.values())
while not self._labeling_complete(labeling, self.G):
# Update the labels of every node with the same color.
print("lable value: ", labeling.values())
for color, nodes in coloring.items():
for n in nodes:
self._update_label(n, labeling, self.G)
for label in set(labeling.values()):
print("lable value: ", labeling.values())
self.labeling = labeling
def findigneighbors(self):
""" findig nodes' adjecencies"""
node_text = []
for node, adjacencies in enumerate(self.G.adjacency()):
self.node_adjacencies.append(len(adjacencies[1]))
node_text.append('# of connections: '+str(len(adjacencies[1])))
self.G.color = self.node_adjacencies
def result(self):
plt.figure(figsize=(25, 25))
options = {
'with_labels': True,
'font_weight': 'regular',
}
# colors = [color_map[G.node[node][1]] for node in G]
# sizes = [G.node[node]['Timestamp'] * 10 for node in G]
d = nx.degree_centrality(self.G)
d_list = list(d.values())
print("node centrality: ", d_list)
print("node adjacencies: ", self.node_adjacencies)
for row in range(len(self.weigth)):
self.edge_width.append([])
for drow in range(len(self.weigth[row])):
self.edge_width[row].append(self.weigth[row][drow])
node_size = [v * 80 for v in d.values()] # setting node size based on node centrality
edge_width = [row * 0.5 for row in self.weight1]
print("Nodes' Degree: ", nx.degree(self.G))
print("Nodes' Betweeness ", nx.edge_betweenness_centrality(self.G))
print("Nodes' Betweeness-centrality: ", nx.betweenness_centrality(self.G))
"""
Using the spring layout :
- k controls the distance between the nodes and varies between 0 and 1
- iterations is the number of times simulated annealing is run
default k=0.1 and iterations=50
"""
labels2 = {}
for idx, edge in enumerate(self.G.edges):
labels2[edge] = "s"
pos_nodes = nx.spring_layout(self.G, k=0.25, iterations=50)
nx.draw(self.G, pos_nodes, node_color=self.node_adjacencies, node_size=node_size, width=2, arrowstyle='->',
arrowsize=10, weight=self.weight1, edge_color='gray', **options)
edge_labels = nx.get_edge_attributes(self.G, 'weight')
pos_attrs = {}
for node, coords in pos_nodes.items():
pos_attrs[node] = (coords[0], coords[1] + 0.02)
nx.draw_networkx_edge_labels(self.G, pos_nodes, edge_labels=edge_labels, font_size=10, font_color='red')
nx.draw_networkx_labels(self.G, pos_attrs, labels=self.labeling, font_size=10, font_color='red')
ax = plt.gca()
ax.collections[0].set_edgecolor("#555555")
plt.show()
def main(self):
self.drawedges()
self.dolabeling()
self.findigneighbors()
self.result()
linking = SemanticLinking()
linking.main()
\ No newline at end of file
import logging
LOGGER = logging.getLogger(__name__)
class Processor:
def __init__(self):
pass
def process(self, traces: list):
LOGGER.info(f"called processing with: {str(traces)}")
\ No newline at end of file
...@@ -2,16 +2,13 @@ import json ...@@ -2,16 +2,13 @@ import json
import requests import requests
from threading import Thread from threading import Thread
import network_constants as netconst import network_constants as netconst
from intelligence_zahra.Processor import Processor
import logging import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MessageHandler: class MessageHandler:
_processor: Processor = None
def __init__(self): def __init__(self):
self._processor = Processor() pass
def handle_generic(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}") LOGGER.info(f"Received message: {body}")
...@@ -43,6 +40,7 @@ class MessageHandler: ...@@ -43,6 +40,7 @@ class MessageHandler:
if response.status_code == 200: if response.status_code == 200:
traces = response.json() traces = response.json()
Thread(target=self._processor.process(traces)).start() # TODO integrate zahras code
# Thread(target=self._processor.process(traces)).start()
else: else:
LOGGER.error(f"Could not retrieve JSON from {url} with GET request ({response.status_code})") LOGGER.error(f"Could not retrieve JSON from {url} with GET request ({response.status_code})")
...@@ -5,7 +5,7 @@ metadata: ...@@ -5,7 +5,7 @@ metadata:
spec: spec:
type: LoadBalancer type: LoadBalancer
externalIPs: externalIPs:
- 143.205.173.36 - 143.205.173.102
selector: selector:
app: semantic-linking app: semantic-linking
ports: ports:
...@@ -31,6 +31,6 @@ spec: ...@@ -31,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: semantic-linking - name: semantic-linking
image: 172.16.1.20:5000/semantic-linking-microservice image: 143.205.173.97:5000/semantic-linking-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: stage-discovery
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: stage-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30103
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: stage-discovery name: stage-discovery
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: stage-discovery app: stage-discovery
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: stage-discovery - name: stage-discovery
image: 172.16.1.20:5000/stage-discovery-microservice image: 143.205.173.97:5000/stage-discovery-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: stage-discovery
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: stage-discovery
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30103
protocol: TCP
\ No newline at end of file
import os import os
import sys
docker_reg = '172.16.1.20:5000' kube_command = 'apply'
if len(sys.argv) > 1:
kube_command = sys.argv[1]
paths = [] paths = []
for p, _, f in os.walk('./'): for p, _, f in os.walk('./'):
...@@ -9,4 +12,4 @@ for p, _, f in os.walk('./'): ...@@ -9,4 +12,4 @@ for p, _, f in os.walk('./'):
paths.append(os.path.join(p, '')) paths.append(os.path.join(p, ''))
for path in paths: for path in paths:
os.system(f"kubectl apply -f {path}") os.system(f"kubectl {kube_command} -f {path}")
\ No newline at end of file
...@@ -5,7 +5,7 @@ metadata: ...@@ -5,7 +5,7 @@ metadata:
spec: spec:
type: LoadBalancer type: LoadBalancer
externalIPs: externalIPs:
- 143.205.173.36 - 143.205.173.102
selector: selector:
app: rabbit-mq app: rabbit-mq
ports: ports:
......
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
class DummyMessageManager(ReconnectingMessageManager):
'''Dummy class to be used for testing - sets itself as singleton instance if called before ReconnectingMessageManager'''
@staticmethod
def get_instance():
if ReconnectingMessageManager._instance == None:
ReconnectingMessageManager._instance = DummyMessageManager()
assert isinstance(ReconnectingMessageManager._instance, DummyMessageManager)
return ReconnectingMessageManager._instance
last_message = {}
def _init_message_manager(self):
pass
def create_message_destination(self, exch, exch_type):
pass
def send_message(self, exch, key, mess):
self.last_message = {'ex': exch, 'key': key, 'msg': mess}
...@@ -6,7 +6,7 @@ import logging ...@@ -6,7 +6,7 @@ import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MessageManager: class MessageManager:
'''The MessageManager is used for sending and receiving messages''' '''This Class is used for sending and receiving messages with RabbitMQ'''
_rabbit_mq_ip = None _rabbit_mq_ip = None
_rabbit_mq_port = None _rabbit_mq_port = None
...@@ -21,7 +21,7 @@ class MessageManager: ...@@ -21,7 +21,7 @@ class MessageManager:
self._rabbit_mq_port = rabbit_mq_port self._rabbit_mq_port = rabbit_mq_port
def connect(self, error_callback=None): def connect(self, error_callback=None):
'''Creates a connection with two channels to RabbitMQ''' '''Creates a connection with two channels'''
self._error_callback = error_callback self._error_callback = error_callback
self._connection = self._connect_async() self._connection = self._connect_async()
...@@ -55,11 +55,12 @@ class MessageManager: ...@@ -55,11 +55,12 @@ class MessageManager:
self._receive_channel = channel self._receive_channel = channel
LOGGER.info("RabbitMQ connection established") LOGGER.info("RabbitMQ connection established")
def create_exchange_with_queue(self, exchange_name, exchange_type, queue_name): def create_exchange_with_queue(self, exchange_name, exchange_type, queue_name, callback):
'''Creates exchange and queue and binds them''' '''Creates exchange and queue and binds them'''
self._prepare_receive_parameters = {'exchange_name': exchange_name, self._prepare_receive_parameters = {'exchange_name': exchange_name,
'exchange_type': exchange_type, 'exchange_type': exchange_type,
'queue_name': queue_name} 'queue_name': queue_name,
'callback': callback}
self._receive_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, callback=self._exchange_created_callback) self._receive_channel.exchange_declare(exchange=exchange_name, exchange_type=exchange_type, callback=self._exchange_created_callback)
...@@ -74,8 +75,11 @@ class MessageManager: ...@@ -74,8 +75,11 @@ class MessageManager:
exchange_name = self._prepare_receive_parameters['exchange_name'] exchange_name = self._prepare_receive_parameters['exchange_name']
# Bind queue to exchange # Bind queue to exchange
self._receive_channel.queue_bind(exchange=exchange_name, queue=queue_name) self._receive_channel.queue_bind(exchange=exchange_name, queue=queue_name, callback=self._queue_bound_callback)
LOGGER.info(f"RabbitMQ connection to exchange '{exchange_name}' established")
def _queue_bound_callback(self, _):
if self._prepare_receive_parameters['callback'] != None:
self._prepare_receive_parameters['callback']()
def start_consuming(self, queue_name, auto_ack, message_received_callback): def start_consuming(self, queue_name, auto_ack, message_received_callback):
'''Starts listening for messages''' '''Starts listening for messages'''
...@@ -87,10 +91,6 @@ class MessageManager: ...@@ -87,10 +91,6 @@ class MessageManager:
def send_message(self, exchange_name, routing_key, message): def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange''' '''Sends a message to the exchange'''
if self._send_channel == None:
LOGGER.error("Tried to send before connection to RabbitMQ was established")
raise RuntimeError("Connection to RabbitMQ not established")
self._send_channel.basic_publish(exchange_name, routing_key, message) self._send_channel.basic_publish(exchange_name, routing_key, message)
def disconnect(self): def disconnect(self):
......
...@@ -6,25 +6,27 @@ import logging ...@@ -6,25 +6,27 @@ import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class ReconnectingMessageManager: class ReconnectingMessageManager:
'''The ReconnectingMessageManager Singleton handles connection errors by itself''' '''This Class can be used to access RabbitMQ. It handles connection errors by trying to reconnect every second'''
__instance = None _instance = None
_message_manager: MessageManager = None _message_manager: MessageManager = None
_consuming = False _consuming = False
_consumption_parameters = None _consumption_parameters = None
_sending_parameters = None
@staticmethod @staticmethod
def getInstance() -> ReconnectingMessageManager: def getInstance() -> ReconnectingMessageManager:
''' Static access method. ''' ''' Static access method. '''
if ReconnectingMessageManager.__instance == None: if ReconnectingMessageManager._instance == None:
ReconnectingMessageManager.__instance = ReconnectingMessageManager() ReconnectingMessageManager._instance = ReconnectingMessageManager()
return ReconnectingMessageManager.__instance return ReconnectingMessageManager._instance
def __init__(self): def __init__(self):
if ReconnectingMessageManager.__instance != None: '''Do not use the constructor as it is a Singleton!'''
if ReconnectingMessageManager._instance != None:
raise Exception("This class is a singleton!") raise Exception("This class is a singleton!")
ReconnectingMessageManager.__instance = self ReconnectingMessageManager._instance = self
self._init_message_manager() self._init_message_manager()
def _init_message_manager(self): def _init_message_manager(self):
...@@ -36,30 +38,58 @@ class ReconnectingMessageManager: ...@@ -36,30 +38,58 @@ class ReconnectingMessageManager:
# restart receiver # restart receiver
self._message_manager.disconnect() self._message_manager.disconnect()
self._init_message_manager() self._init_message_manager()
if self._consuming: if self._consuming:
self._restart_consuming() self._restart_consuming()
if self._sending_parameters != None:
self._reinit_sending()
def _restart_consuming(self): def _restart_consuming(self):
self.start_consuming(self._consumption_parameters['exchange_name'], if self._consumption_parameters != None:
self.start_consuming(self._consumption_parameters['exchange_name'],
self._consumption_parameters['exchange_type'], self._consumption_parameters['exchange_type'],
self._consumption_parameters['queue_name'], self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'], self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback']) self._consumption_parameters['message_received_callback'])
def _reinit_sending(self):
if self._sending_parameters != None:
self.create_message_destination(self._sending_parameters['exchange_name'],
self._sending_parameters['exchange_type'])
def start_consuming(self, exchange_name, exchange_type, queue_name, auto_ack, message_received_callback): def start_consuming(self, exchange_name, exchange_type, queue_name, auto_ack, message_received_callback):
'''Creates exchange and queue and starts to listen to new messages''' '''Creates exchange and queue and starts to listen to new messages'''
self._consumption_parameters = {'exchange_name': exchange_name, 'exchange_type': exchange_type, self._consumption_parameters = {'exchange_name': exchange_name,
'queue_name': queue_name, 'auto_ack': auto_ack, 'exchange_type': exchange_type,
'queue_name': queue_name,
'auto_ack': auto_ack,
'message_received_callback': message_received_callback} 'message_received_callback': message_received_callback}
self._consuming = True self._consuming = True
self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name) try:
self._message_manager.start_consuming(queue_name, auto_ack, message_received_callback) self._message_manager.create_exchange_with_queue(exchange_name, exchange_type, queue_name, self._exchange_created_callback)
except:
LOGGER.error("Error while creating exchange and queue")
def _exchange_created_callback(self):
LOGGER.info("Exchange and queue set up")
self._message_manager.start_consuming(self._consumption_parameters['queue_name'],
self._consumption_parameters['auto_ack'],
self._consumption_parameters['message_received_callback'])
def create_message_destination(self, exchange_name, exchange_type): def create_message_destination(self, exchange_name, exchange_type):
'''Creates the exchange''' '''Creates the exchange for sending messages'''
self._message_manager.create_exchange(exchange_name, exchange_type) self._sending_parameters = {'exchange_name': exchange_name,
'exchange_type': exchange_type}
try:
self._message_manager.create_exchange(exchange_name, exchange_type)
except:
LOGGER.error("Error while creating exchange")
def send_message(self, exchange_name, routing_key, message): def send_message(self, exchange_name, routing_key, message):
'''Sends a message to the exchange''' '''Sends a message to the exchange'''
self._message_manager.send_message(exchange_name, routing_key, message) try:
self._message_manager.send_message(exchange_name, routing_key, message)
except:
LOGGER.error("Error while sending message")
...@@ -9,12 +9,12 @@ TRACE_RETRIEVAL_HOSTNAME = 'trace-retrieval' ...@@ -9,12 +9,12 @@ TRACE_RETRIEVAL_HOSTNAME = 'trace-retrieval'
TRACE_RETRIEVAL_REST_PORT = 80 TRACE_RETRIEVAL_REST_PORT = 80
### outside k8s ### outside k8s
# HOST_IP = '143.205.173.36' # HOST_IP = '143.205.173.102'
# RABBIT_MQ_HOSTNAME = HOST_IP # RABBIT_MQ_HOSTNAME = HOST_IP
# RABBIT_MQ_PORT = 30302 # RABBIT_MQ_PORT = 30302
# MONGO_DB_HOST = HOST_IP # MONGO_DB_HOSTNAME = HOST_IP
# MONGO_DB_PORT = 30003 # MONGO_DB_PORT = 30003
# TRACE_RETRIEVAL_HOSTNAME = HOST_IP # TRACE_RETRIEVAL_HOSTNAME = HOST_IP
......
from __future__ import annotations
class MessageList:
__instance = None
_messages = []
@staticmethod
def getInstance() -> MessageList:
""" Static access method. """
if MessageList.__instance == None:
MessageList.__instance = MessageList()
return MessageList.__instance
def __init__(self):
""" Virtually private constructor. """
if MessageList.__instance != None:
raise Exception("This class is a singleton!")
else:
MessageList.__instance = self
def appendMessage(self, message):
self._messages.append(message)
def getMessages(self):
return self._messages
\ No newline at end of file
from MessageList import MessageList
from flask import request from flask import request
def echo(): def echo():
return request.json return request.json
def get_messages():
messages = MessageList.getInstance().getMessages()
return str(messages)
...@@ -5,7 +5,7 @@ metadata: ...@@ -5,7 +5,7 @@ metadata:
spec: spec:
type: LoadBalancer type: LoadBalancer
externalIPs: externalIPs:
- 143.205.173.36 - 143.205.173.102
selector: selector:
app: rest-gateway app: rest-gateway
ports: ports:
...@@ -31,6 +31,6 @@ spec: ...@@ -31,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: rest-gateway - name: rest-gateway
image: 172.16.1.20:5000/rest-gateway image: 143.205.173.97:5000/rest-gateway
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
import unittest import unittest
import manage_sys_paths import manage_sys_paths
from messaging.DummyMessageManager import DummyMessageManager
# init dummy message manager so no connection to rabbitmq is established
_ = DummyMessageManager.get_instance()
import rest.blockchain_trace as blockchain_trace import rest.blockchain_trace as blockchain_trace
class Test_BlockchainTrace(unittest.TestCase): class Test_BlockchainTrace(unittest.TestCase):
...@@ -25,15 +29,15 @@ class Test_BlockchainTrace(unittest.TestCase): ...@@ -25,15 +29,15 @@ class Test_BlockchainTrace(unittest.TestCase):
input = self._get_valid_input() input = self._get_valid_input()
self.assertTrue(blockchain_trace.isBlockchainTraceValid(input), "Trace should be valid") self.assertTrue(blockchain_trace.isBlockchainTraceValid(input), "Trace should be valid")
def test_isBlockchainTraceValid_invalidMetadataInputType(self): # def test_isBlockchainTraceValid_invalidMetadataInputType(self):
input = self._get_valid_input() # input = self._get_valid_input()
input["Metadata"] = "string" # input["Metadata"] = "string"
self.assertFalse(blockchain_trace.isBlockchainTraceValid(input), "Metadata type should be invalid") # self.assertFalse(blockchain_trace.isBlockchainTraceValid(input), "Metadata type should be invalid")
def test_isBlockchainTraceValid_invalidTransactionFromLatLngInputType(self): # def test_isBlockchainTraceValid_invalidTransactionFromLatLngInputType(self):
input = self._get_valid_input() # input = self._get_valid_input()
input["TransactionFromLatLng"] = ["55.1", "44.1"] # input["TransactionFromLatLng"] = ["55.1", "44.1"]
self.assertFalse(blockchain_trace.isBlockchainTraceValid(input), "TransactionFromLatLng type should be invalid") # self.assertFalse(blockchain_trace.isBlockchainTraceValid(input), "TransactionFromLatLng type should be invalid")
def test_isBlockchainTraceValid_emptyInput(self): def test_isBlockchainTraceValid_emptyInput(self):
input = {} input = {}
......
# add modules folder to interpreter path
import sys
import os
modules_path = '../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import requests import requests
import random import random
import json import json
import network_constants
ports = [] ports = []
# transaction hub in # transaction hub in
...@@ -12,15 +20,18 @@ ports.extend(range(30201, 30205)) ...@@ -12,15 +20,18 @@ ports.extend(range(30201, 30205))
print(f"Checking {len(ports)} ports") print(f"Checking {len(ports)} ports")
network_constants.HOST_IP
home_url = 'http://143.205.173.36:{port}/' home_url = 'http://' + str(network_constants.HOST_IP) + ':{port}/'
debug_url = home_url + 'api/debug' debug_url = home_url + 'api/debug'
print(debug_url) print(debug_url)
for port in ports: for port in ports:
response = requests.get(home_url.replace("{port}", str(port))) try:
print(response.text) response = requests.get(home_url.replace("{port}", str(port)))
print(response.text)
data = {'data': f"Echo {str(random.random())}"} data = {'data': f"Echo {str(random.random())}"}
response = requests.post(debug_url.replace("{port}", str(port)), json=data) response = requests.post(debug_url.replace("{port}", str(port)), json=data)
print(response.text) # print(response.text)
\ No newline at end of file except:
print(f"Didnt work for port {port}")
\ No newline at end of file
# add modules folder to interpreter path
import sys
import os
modules_path = '../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import pika import pika
import network_constants
EXCHANGE_NAME = 'rest-gateway' EXCHANGE_NAME = 'inhub'
EXCHANGE_TYPE = 'direct' EXCHANGE_TYPE = 'direct'
ROUTING_KEY = 'rest-gateway' ROUTING_KEY = 'trace-retrieval'
HOST = network_constants.HOST_IP
connection = pika.BlockingConnection(pika.ConnectionParameters('143.205.173.36', 30302, heartbeat=600, blocked_connection_timeout=300)) connection = pika.BlockingConnection(pika.ConnectionParameters(HOST, 30302, heartbeat=60, blocked_connection_timeout=30))
channel = connection.channel() channel = connection.channel()
channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type=EXCHANGE_TYPE) channel.exchange_declare(exchange=EXCHANGE_NAME, exchange_type=EXCHANGE_TYPE)
......
# add modules folder to interpreter path
import sys
import os
modules_path = '../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
import pymongo import pymongo
import network_constants
MONGO_DB_HOST = '143.205.173.36' MONGO_DB_HOST = network_constants.HOST_IP
MONGO_DB_PORT = '30003' MONGO_DB_PORT = '30003'
class MongoRepository: class MongoRepository:
......
apiVersion: v1
kind: Service
metadata:
name: network-metrics-retrieval
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: network-metrics-retrieval
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30002
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: network-metrics-retrieval name: network-metrics-retrieval
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: network-metrics-retrieval app: network-metrics-retrieval
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: network-metrics-retrieval - name: network-metrics-retrieval
image: 172.16.1.20:5000/network-metrics-retrieval-microservice image: 143.205.173.97:5000/network-metrics-retrieval-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: network-metrics-retrieval
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: network-metrics-retrieval
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30002
protocol: TCP
\ No newline at end of file
...@@ -13,11 +13,13 @@ LOGGER = logging.getLogger(__name__) ...@@ -13,11 +13,13 @@ LOGGER = logging.getLogger(__name__)
############################# #############################
import connexion import connexion
from db.MongoRepository import MongoRepository
from messaging.MessageHandler import MessageHandler from messaging.MessageHandler import MessageHandler
from messaging.ReconnectingMessageManager import ReconnectingMessageManager from messaging.ReconnectingMessageManager import ReconnectingMessageManager
# init message handler # init message handler
message_handler = MessageHandler() message_handler = MessageHandler(MongoRepository(), ReconnectingMessageManager.getInstance())
def message_received_callback(channel, method, properties, body): def message_received_callback(channel, method, properties, body):
message_handler.handle_generic(body) message_handler.handle_generic(body)
......
from db.MongoRepository import MongoRepository
from messaging.ReconnectingMessageManager import ReconnectingMessageManager
import json import json
import logging import logging
LOGGER = logging.getLogger(__name__) LOGGER = logging.getLogger(__name__)
class MessageHandler: class MessageHandler:
MSG_NOT_JSON = "Message is not in JSON format and is ignored"
MSG_NO_TYPE = "Message has no type field and is ignored"
MSG_NOT_PROCESSED = "Message Type could not be processed"
MSG_TRACE_PROCESSED = "Message handled as blockchain-transaction"
_mongo_repo = None _mongo_repo = None
_message_sender = None _message_sender = None
def __init__(self): def __init__(self, mongo_repo, message_sender):
self._mongo_repo = MongoRepository() self._mongo_repo = mongo_repo
self._init_message_sender() self._message_sender = message_sender
def _init_message_sender(self):
self._message_sender = ReconnectingMessageManager.getInstance()
self._message_sender.create_message_destination('datahub', 'direct') self._message_sender.create_message_destination('datahub', 'direct')
def handle_generic(self, body): def handle_generic(self, body):
LOGGER.info(f"Received message: {body}") LOGGER.info(f"Received message: {body}")
result = None
message = None message = None
try: try:
message = json.loads(body) message = json.loads(body)
except ValueError: except (ValueError, TypeError):
LOGGER.warning("Message is not in JSON format and is ignored") result = self.MSG_NOT_JSON
return LOGGER.warning(result)
return result
if not 'type' in message: if not 'type' in message:
LOGGER.warning("Message has no type field and is ignored") result = self.MSG_NO_TYPE
return LOGGER.warning(result)
return result
if message['type'] == 'blockchain-transaction': if message['type'] == 'blockchain-transaction':
self.handle_blockchain_transaction(message['content']) self.handle_blockchain_transaction(message['content'])
result = self.MSG_TRACE_PROCESSED
else: else:
LOGGER.info("Message Type could not be processed") result = self.MSG_NOT_PROCESSED
LOGGER.info(result)
return result
def handle_blockchain_transaction(self, transaction): def handle_blockchain_transaction(self, transaction):
self._mongo_repo.insert_trace(transaction) self._mongo_repo.insert_trace(transaction)
......
...@@ -5,7 +5,7 @@ metadata: ...@@ -5,7 +5,7 @@ metadata:
spec: spec:
type: LoadBalancer type: LoadBalancer
externalIPs: externalIPs:
- 143.205.173.36 - 143.205.173.102
selector: selector:
app: trace-retrieval app: trace-retrieval
ports: ports:
...@@ -31,7 +31,7 @@ spec: ...@@ -31,7 +31,7 @@ spec:
spec: spec:
containers: containers:
- name: trace-retrieval - name: trace-retrieval
image: 172.16.1.20:5000/trace-retrieval-microservice image: 143.205.173.97:5000/trace-retrieval-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
imagePullPolicy: Always imagePullPolicy: Always
...@@ -43,7 +43,7 @@ metadata: ...@@ -43,7 +43,7 @@ metadata:
spec: spec:
type: LoadBalancer type: LoadBalancer
externalIPs: externalIPs:
- 143.205.173.36 - 143.205.173.102
selector: selector:
app: trace-retrieval-db app: trace-retrieval-db
ports: ports:
......
# add modules folder to interpreter path
import sys
import os
modules_paths = ['../app/', '../../../modules/']
for path in modules_paths:
if os.path.exists(path):
sys.path.insert(1, path)
print(f"added {path}")
import unittest
import manage_sys_paths
import json
from messaging.MessageHandler import MessageHandler
class DummyMongoRepo:
'''Dummy class to be used for testing the MessageHandler'''
last_trace = None
def insert_trace(self, trace):
self.last_trace = trace
from messaging.DummyMessageManager import DummyMessageManager as DummyMessageSender
class Test_MessageHandler(unittest.TestCase):
handler = None
repo = None
msg_sender = None
def setUp(self):
self.repo = DummyMongoRepo()
self.msg_sender = DummyMessageSender.get_instance()
self.handler = MessageHandler(self.repo, self.msg_sender)
def test_handleGeneric_emptyMessage_NotJsonError(self):
res = self.handler.handle_generic('')
self.assertEqual(self.handler.MSG_NOT_JSON, res)
def test_handleGeneric_noneMessage_NotJsonError(self):
res = self.handler.handle_generic(None)
self.assertEqual(self.handler.MSG_NOT_JSON, res)
def test_handleGeneric_emptyJson1_NoTypeFieldError(self):
message = json.dumps({})
res = self.handler.handle_generic(message)
self.assertEqual(self.handler.MSG_NO_TYPE, res)
def test_handleGeneric_emptyJson2_NoTypeFieldError(self):
message = json.dumps('')
res = self.handler.handle_generic(message)
self.assertEqual(self.handler.MSG_NO_TYPE, res)
def test_handleGeneric_missingTypeJson_NoTypeFieldError(self):
message = json.dumps({'test': 'content'})
res = self.handler.handle_generic(message)
self.assertEqual(self.handler.MSG_NO_TYPE, res)
def test_handleGeneric_randomType1_NotProcessed(self):
message = json.dumps({'type': ''})
res = self.handler.handle_generic(message)
self.assertEqual(self.handler.MSG_NOT_PROCESSED, res)
def test_handleGeneric_randomType2_NotProcessed(self):
message = json.dumps({'type': 'test'})
res = self.handler.handle_generic(message)
self.assertEqual(self.handler.MSG_NOT_PROCESSED, res)
def test_handleGeneric_randomTypeWithOtherField_NotProcessed(self):
message = json.dumps({'type': 'test', 'other': 'content'})
res = self.handler.handle_generic(message)
self.assertEqual(self.handler.MSG_NOT_PROCESSED, res)
def _get_valid_message(self) -> str:
message_values = \
{ 'type': 'blockchain-transaction',
'content':
{
"ApplicationType": "string",
"Metadata": {},
"ResourceIds": "string",
"ResourceMd5": "string",
"ResourceState": "string",
"Timestamp": "2019-08-27T14:00:48.587Z",
"TransactionFrom": "string",
"TransactionFromLatLng": "string",
"TransactionId": "string",
"TransactionTo": "string",
"TransactionToLatLng": "string",
"TransferredAsset": "string"
}
}
return json.dumps(message_values)
def test_handleGeneric_correctTraceContent_ProcessedResult(self):
res = self.handler.handle_generic(self._get_valid_message())
self.assertEqual(self.handler.MSG_TRACE_PROCESSED, res)
def test_handleGeneric_correctTraceContent_AddedToRepo(self):
msg = self._get_valid_message()
_ = self.handler.handle_generic(msg)
trace = json.loads(msg)['content']
self.assertEqual(trace, self.repo.last_trace)
def test_handleGeneric_correctTraceContent_NotificationSentCorrectly(self):
msg = self._get_valid_message()
_ = self.handler.handle_generic(msg)
self.assertEqual('datahub', self.msg_sender.last_message['ex'])
self.assertEqual('semantic-linking', self.msg_sender.last_message['key'])
self.assertEqual(json.dumps({'type': 'new-traces-available'}), self.msg_sender.last_message['msg'])
if __name__ == '__main__':
unittest.main()
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: data-access
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: data-access
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30205
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: data-access name: data-access
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: data-access app: data-access
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: data-access - name: data-access
image: 172.16.1.20:5000/data-access-microservice image: 143.205.173.97:5000/data-access-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: data-access
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: data-access
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30205
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: event-detection
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: event-detection
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30204
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: event-detection name: event-detection
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: event-detection app: event-detection
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: event-detection - name: event-detection
image: 172.16.1.20:5000/event-detection-microservice image: 143.205.173.97:5000/event-detection-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: event-detection
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: event-detection
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30204
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: smart-matching
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: smart-matching
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30203
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: smart-matching name: smart-matching
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: smart-matching app: smart-matching
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: smart-matching - name: smart-matching
image: 172.16.1.20:5000/smart-matching-microservice image: 143.205.173.97:5000/smart-matching-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: smart-matching
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: smart-matching
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30203
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: tokenized-decision-making
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: tokenized-decision-making
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30201
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: tokenized-decision-making name: tokenized-decision-making
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: tokenized-decision-making app: tokenized-decision-making
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: tokenized-decision-making - name: tokenized-decision-making
image: 172.16.1.20:5000/tokenized-decision-making-microservice image: 143.205.173.97:5000/tokenized-decision-making-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: tokenized-decision-making
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: tokenized-decision-making
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30201
protocol: TCP
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: trust-sla
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.102
selector:
app: trust-sla
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30202
protocol: TCP
---
apiVersion: apps/v1 apiVersion: apps/v1
kind: Deployment kind: Deployment
metadata: metadata:
name: trust-sla name: trust-sla
spec: spec:
replicas: 2 replicas: 1
selector: selector:
matchLabels: matchLabels:
app: trust-sla app: trust-sla
...@@ -16,6 +31,6 @@ spec: ...@@ -16,6 +31,6 @@ spec:
spec: spec:
containers: containers:
- name: trust-sla - name: trust-sla
image: 172.16.1.20:5000/trust-sla-microservice image: 143.205.173.97:5000/trust-sla-microservice
ports: ports:
- containerPort: 5000 - containerPort: 5000
\ No newline at end of file
apiVersion: v1
kind: Service
metadata:
name: trust-sla
spec:
type: LoadBalancer
externalIPs:
- 143.205.173.36
selector:
app: trust-sla
ports:
- name: http
port: 80
targetPort: 5000
nodePort: 30202
protocol: TCP
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment