Commit 06ee0e8e authored by Manuel's avatar Manuel

[reputation_calculation] added logic for trust computation and message_handler

parent 6ff4bfac
......@@ -4,7 +4,7 @@ paths:
#####
/use-cases/{use_case}/contexts/{context}/trust-adapters:
get:
operationId: "routes.trust_adapter.all_for_use_case_and_context"
operationId: "lib.routes.trust_adapter.all_for_use_case_and_context"
tags:
- "Trust Adapter"
summary: "Get all trust adapters for a use case"
......@@ -26,7 +26,7 @@ paths:
"404":
description: "Use-Case or Context was not found"
post:
operationId: "routes.trust_adapter.add"
operationId: "lib.routes.trust_adapter.add"
tags:
- "Trust Adapter"
summary: "Add a new trust adapters to a use case/context"
......@@ -57,7 +57,7 @@ paths:
#####
/use-cases/{use_case}/users:
get:
operationId: "routes.user.all_for_use_case"
operationId: "lib.routes.user.all_for_use_case"
tags:
- "User"
summary: "Get all users for a use case"
......@@ -76,7 +76,7 @@ paths:
"400":
description: "Wrong or missing parameters"
post:
operationId: "routes.user.add"
operationId: "lib.routes.user.add"
tags:
- "User"
summary: "Adds a new user to the system"
......@@ -100,7 +100,7 @@ paths:
#####
/use-cases/{use_case}/contexts:
get:
operationId: "routes.context.all_for_use_case"
operationId: "lib.routes.context.all_for_use_case"
tags:
- "Context"
summary: "Get all contexts for a use case"
......@@ -117,7 +117,7 @@ paths:
"404":
description: "Use-Case was not found"
post:
operationId: "routes.context.add"
operationId: "lib.routes.context.add"
tags:
- "Context"
summary: "Adds a new context to the system"
......@@ -182,6 +182,7 @@ definitions:
required:
- use_case
- context
- table
- user_source
- volume_source
- conversion
......@@ -196,6 +197,9 @@ definitions:
context:
type: string
example: "context1"
table:
type: string
example: "travel"
user_source:
type: string
example: "user//id"
......
......@@ -2,12 +2,17 @@ from typing import Dict
class TrustAdapter:
def __init__(self, use_case: str, context: str, user_source: str, volume_source: str,
conversion: float, certainty: float, cutoff: float, bias_negative: float, bias_positive: float):
def __init__(self, use_case: str, context: str, table: str, user_source: str, volume_source: str,
rating_source: str, rating_lower: float, rating_upper: float, conversion: float,
certainty: float, cutoff: float, bias_negative: float, bias_positive: float):
self.use_case = use_case
self.context = context
self.table = table
self.user_source = user_source
self.volume_source = volume_source
self.rating_source = rating_source
self.rating_lower = rating_lower
self.rating_upper = rating_upper
self.conversion = conversion
self.certainty = certainty
self.cutoff = cutoff
......@@ -18,8 +23,12 @@ class TrustAdapter:
return {
"use_case": self.use_case,
"context": self.context,
"table": self.table,
"user_source": self.user_source,
"volume_source": self.volume_source,
"rating_source": self.rating_source,
"rating_lower": self.rating_lower,
"rating_upper": self.rating_upper,
"conversion": self.conversion,
"certainty": self.certainty,
"cutoff": self.cutoff,
......@@ -29,14 +38,18 @@ class TrustAdapter:
@staticmethod
def from_serializable_dict(data: Dict):
if "use_case" not in data.keys() or "context" not in data.keys() or "user_source" not in data.keys() or "volume_source" not in data.keys() or "conversion" not in data.keys() or "certainty" not in data.keys() or "cutoff" not in data.keys() or "bias_negative" not in data.keys() or "bias_positive" not in data.keys():
if "use_case" not in data.keys() or "context" not in data.keys() or "table" not in data.keys() or "user_source" not in data.keys() or "volume_source" not in data.keys() or "rating_source" not in data.keys() or "conversion" not in data.keys() or "certainty" not in data.keys() or "cutoff" not in data.keys() or "bias_negative" not in data.keys() or "bias_positive" not in data.keys():
raise ValueError("Missing fields for TrustAdapter!")
return TrustAdapter(
data["use_case"],
data["context"],
data["table"],
data["user_source"],
data["volume_source"],
data["rating_source"],
data["rating_lower"],
data["rating_upper"],
data["conversion"],
data["certainty"],
data["cutoff"],
......
from datetime import datetime
from lib.trust.transaction import Transaction
from typing import Dict
class TrustTrace:
def __init__(self, use_case: str, context: str, user: str, volume: float, rating: float):
class TrustTrace(Transaction):
def __init__(self, use_case: str, context: str, user: str, table: str, volume: float, timestamp: datetime, is_trustworthy: float = 1):
super().__init__(timestamp, volume, is_trustworthy=is_trustworthy)
self.use_case = use_case
self.context = context
self.user = user
self.volume = volume
self.oracle_rating = rating
def to_serializable_dict(self):
return {
"volume": self.volume,
"trustworthy": self.trustworthy,
"timestamp": self.timestamp.isoformat(),
"use_case": self.use_case,
"context": self.user,
"user": self.volume,
"volume": self.volume,
"oracle_rating": self.oracle_rating
"table": self.table,
}
@staticmethod
def from_serializable_dict(data: Dict):
if "use_case" not in data.keys() or "context" not in data.keys() or "user" not in data.keys() or "volume" not in data.keys() or "oracle_rating" not in data.keys():
if "volume" not in data.keys() or "trustworthy" not in data.keys() or "timestamp" not in data.keys() or "use_case" not in data.keys() or "context" not in data.keys() or "user" not in data.keys() or "table" not in data.keys():
raise ValueError("Missing fields for User!")
return TrustTrace(
data["use_case"],
data["context"],
data["user"],
data["table"],
data["volume"],
data["oracle_rating"],
datetime.fromisoformat(data["timestamp"]),
is_trustworthy=data["trustworthy"]
)
from lib.database.entities.user_trust import UserTrust
from datetime import datetime
from typing import Dict, List
class User:
def __init__(self, use_case: str, name: str, context_trust: List[UserTrust] = []):
self.use_case = use_case
self.name = name
self.context_trust = context_trust
def to_serializable_dict(self, include_trust: bool = False):
if include_trust:
return {
"use_case": self.use_case,
"name": self.name,
"context_trust": [t.to_serializable_dict() for t in self.context_trust]
}
else:
return {
"use_case": self.use_case,
"name": self.name
}
@staticmethod
def from_serializable_dict(data: Dict):
if "use_case" not in data.keys() or "name" not in data.keys():
raise ValueError("Missing fields for User!")
return User(data["use_case"], data["name"])
from datetime import datetime
from typing import Dict
class User:
def __init__(self, use_case:str, name:str):
class UserTrust:
def __init__(self, use_case: str, context: str, user: str, t: float, last_update: datetime):
self.use_case = use_case
self.name = name
self.context = context
self.user = user
self.t = t
self.last_update = last_update
def to_serializable_dict(self):
return {
"use_case": self.use_case,
"name": self.name
"context": self.context,
"user": self.user,
"t": self.t,
"last_update": self.last_update
}
@staticmethod
def from_serializable_dict(data: Dict):
if "use_case" not in data.keys() or "name" not in data.keys():
if "use_case" not in data.keys() or "context" not in data.keys() or "user" not in data.keys() or "t" not in data.keys() or "last_update" not in data.keys():
raise ValueError("Missing fields for User!")
return User(data["use_case"], data["name"])
\ No newline at end of file
return UserTrust(data["use_case"], data["context"], data["user"], data["t"], data["last_update"])
\ No newline at end of file
......@@ -2,7 +2,7 @@
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from database.entities.context import Context
from lib.database.entities.context import Context
from typing import List, Dict
class ContextRepository(MongoRepositoryBase):
......@@ -11,7 +11,7 @@ class ContextRepository(MongoRepositoryBase):
def __init__(self):
super().__init__(netconst.REPUTATION_CALCULATION_DB_HOSTNAME,
netconst.REPUTATION_CALCULATION_DB_PORT,
'rest-gateway-db')
'reputation-computation-db')
self._collection = 'context'
def add(self, context: Context):
......
......@@ -2,7 +2,7 @@
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from database.entities.trust_adapter import TrustAdapter
from lib.database.entities.trust_adapter import TrustAdapter
from typing import List, Dict
class TrustAdapterRepository(MongoRepositoryBase):
......@@ -11,7 +11,7 @@ class TrustAdapterRepository(MongoRepositoryBase):
def __init__(self):
super().__init__(netconst.REPUTATION_CALCULATION_DB_HOSTNAME,
netconst.REPUTATION_CALCULATION_DB_PORT,
'rest-gateway-db')
'reputation-computation-db')
self._collection = 'trust_adapter'
def add(self, adapter: TrustAdapter):
......@@ -22,16 +22,36 @@ class TrustAdapterRepository(MongoRepositoryBase):
return list(result)
def one_for_use_case_and_context(self, use_case: str, context: str) -> List[Dict]:
def all_for_use_case_and_context(self, use_case: str, context: str) -> List[Dict]:
result = super().get_entries(
self._collection,
projection={'_id': False},
selection={"use_case": use_case, "context": context}
)
rows = list(result)
return list(result)
def all_for_use_case_and_table(self, use_case: str, table: str) -> List[Dict]:
result = super().get_entries(
self._collection,
projection={'_id': False},
selection={"use_case": use_case, "table": table}
)
return list(result)
if len(rows) == 0:
def one_for_use_case_and_context_and_table(self, use_case: str, context: str, table: str) -> Dict:
result = super().get_entries(
self._collection,
projection={'_id': False},
selection={"use_case": use_case, "context": context, "table": table}
)
result = list(result)
if len(result) != 1:
return None
return rows[0]
\ No newline at end of file
return result[0]
\ No newline at end of file
......@@ -2,23 +2,25 @@
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from database.entities.trust_trace import TrustTrace
from lib.database.entities.trust_trace import TrustTrace
from typing import List, Dict
class TrustTraceRepository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
def __init__(self):
super().__init__(netconst.REPUTATION_CALCULATION_DB_HOSTNAME,
netconst.REPUTATION_CALCULATION_DB_PORT,
'rest-gateway-db')
'reputation-computation-db')
self._collection = 'trust_trace'
def add(self, trace: TrustTrace):
super().insert_entry(self._collection, trace.to_serializable_dict())
def all(self) -> List[Dict]:
result = super().get_entries(self._collection, projection={'_id': False})
result = super().get_entries(
self._collection, projection={'_id': False})
return list(result)
......
......@@ -2,27 +2,58 @@
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from database.entities.user import User
from lib.database.entities.user import User
from lib.database.entities.user_trust import UserTrust
from lib.database.repositories.user_trust_repository import UserTrustRepository
from typing import List, Dict
class UserRepository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
_user_trust_repository = UserTrustRepository()
def __init__(self):
super().__init__(netconst.REPUTATION_CALCULATION_DB_HOSTNAME,
netconst.REPUTATION_CALCULATION_DB_PORT,
'rest-gateway-db')
'reputation-computation-db')
self._collection = 'user'
def add(self, user: User):
super().insert_entry(self._collection, user.to_serializable_dict())
def all(self) -> List[Dict]:
result = super().get_entries(self._collection, projection={'_id': False})
def all(self) -> List[User]:
result = super().get_entries(
self._collection, projection={'_id': False})
result = [User.from_serializable_dict(row) for row in list(result)]
for user in result:
self._enrich_user(user)
return result
def all_for_use_case(self, use_case: str) -> List[User]:
result = super().get_entries(self._collection, projection={
'_id': False}, selection={"use_case": use_case})
result = [User.from_serializable_dict(row) for row in list(result)]
for user in result:
self._enrich_user(user)
return result
def one_by_use_case_and_username(self, use_case: str, username: str) -> User:
result = super().get_entries(self._collection, projection={
'_id': False}, selection={"use_case": use_case, "name": username})
if len(result) != 1:
return None
return list(result)
user: User = User.from_serializable_dict(result[0])
self._enrich_user(user)
def all_for_use_case(self, use_case: str) -> List[Dict]:
result = super().get_entries(self._collection, projection={'_id': False}, selection={"use_case": use_case})
return user
return list(result)
\ No newline at end of file
def _enrich_user(self, user: User) -> User:
user.context_trust = self._user_trust_repository.all_for_user(user)
# global imports (dont't worry, red is normal)
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
from lib.database.entities.user import User
from lib.database.entities.user_trust import UserTrust
from typing import List, Dict
class UserTrustRepository(MongoRepositoryBase):
'''This is a repository for MongoDb.'''
def __init__(self):
super().__init__(netconst.REPUTATION_CALCULATION_DB_HOSTNAME,
netconst.REPUTATION_CALCULATION_DB_PORT,
'reputation-computation-db')
self._collection = 'user_trust'
def add(self, user_trust: UserTrust):
super().insert_entry(self._collection, user_trust.to_serializable_dict())
def all(self) -> List[UserTrust]:
result = super().get_entries(
self._collection, projection={'_id': False})
return [UserTrust.from_serializable_dict(row) for row in list(result)]
def all_for_use_case(self, use_case: str) -> List[UserTrust]:
result = super().get_entries(self._collection, projection={
'_id': False}, selection={"use_case": use_case})
return [UserTrust.from_serializable_dict(row) for row in list(result)]
def all_for_user(self, user: User) -> List[UserTrust]:
result = super().get_entries(self._collection, projection={
'_id': False}, selection={"use_case": user.use_case, "user": user.name})
return [UserTrust.from_serializable_dict(row) for row in list(result)]
def one_for_user_and_context(self, user: User, context: str) -> UserTrust:
result = super().get_entries(self._collection, projection={'_id': False}, selection={
"use_case": user.use_case, "user": user.name, "context": context})
result = list(result)
if len(result) != 1:
return None
return UserTrust.from_serializable_dict(result[0])
def update(self, user_trust: UserTrust):
collection = self._database[self._collection]
collection.update_one({"use_case": user_trust.use_case, "context": user_trust.context, "user": user_trust.user}, {
"$set": user_trust.to_serializable_dict()})
......@@ -5,21 +5,18 @@ from database.repositories.context_repository import ContextRepository
from database.repositories.trust_adapter_repository import TrustAdapterRepository
from flask import request, Response
from typing import List
from typing import List, Dict
context_repository: ContextRepository = ContextRepository()
trust_adapter_repository: TrustAdapterRepository = TrustAdapterRepository()
def add(use_case: str, context: str):
_ensure_context_exists_for_use_case(use_case, context)
data = request.json
adapter_reference = trust_adapter_repository.one_for_use_case_and_context(use_case, context)
if adapter_reference != None:
return Response(status=400, response="There already exists a TrustAdapter.")
adapter_new = None
adapter_new: TrustAdapter = None
try:
adapter_new = TrustAdapter.from_serializable_dict(data)
except ValueError:
......@@ -31,6 +28,11 @@ def add(use_case: str, context: str):
if adapter_new.use_case != use_case:
return Response(status=400, response="Use-Cases in body and url do not match!")
adapter_reference = trust_adapter_repository.one_for_use_case_and_context_and_table(
use_case, context, adapter_new.table)
if adapter_reference != None:
return Response(status=400, response="There already exists a TrustAdapter for this use-case/context/table")
trust_adapter_repository.add(adapter_new)
return adapter_new.to_serializable_dict()
......@@ -39,7 +41,8 @@ def add(use_case: str, context: str):
def all_for_use_case_and_context(use_case: str, context: str):
_ensure_context_exists_for_use_case(use_case, context)
return trust_adapter_repository.one_for_use_case_and_context(use_case, context)
return trust_adapter_repository.all_for_use_case_and_context(use_case, context)
def _ensure_context_exists_for_use_case(use_case: str, context: str) -> Context:
contexts: List[Context] = context_repository.all_for_use_case(use_case)
......
......@@ -21,4 +21,4 @@ def add(use_case: str):
return user_new.to_serializable_dict()
def all_for_use_case(use_case: str):
return user_repository.all_for_use_case(use_case)
return [user.to_serializable_dict(include_trust=True) for user in user_repository.all_for_use_case(use_case)]
from lib.database.entities.trust_trace import TrustTrace
from lib.database.entities.user_trust import UserTrust
from lib.trust.oracle import TrustOracle
from lib.database.entities.trust_adapter import TrustAdapter
from lib.database.entities.user import User
from typing import Dict, List
import logging
from datetime import datetime
LOGGER = logging.getLogger(__name__)
class TrustService:
def __init__(self, adapter_repository, user_repository, user_trust_repository, trust_trace_repository):
self._adapter_repository = adapter_repository
self._user_repository = user_repository
self._user_trust_repository = user_trust_repository
self._trust_trace_repository = trust_trace_repository
def updateSystem(self, use_case: str, table: str, properties: Dict):
adapters: List[TrustAdapter] = self._adapter_repository.all_for_use_case_and_table(
use_case, table)
if len(adapters) == 0:
LOGGER.warning(
f"No Trust-Adapters found for Trace(use-case:{use_case}, table:{table}), aborting...")
return
for adapter in adapters:
if adapter.user_source not in properties.keys() or adapter.volume_source not in properties.keys() or adapter.rating_source not in properties.keys():
raise ValueError(
f"Trace does not contain needed fields for trust-computation. Required fields: {adapter.user_source}, {adapter.volume_source}. Present fields: {properties.keys()}")
username: str = properties[adapter.user_source]
volume: float = properties[adapter.volume_source]
rating: float = properties[adapter.rating_source]
# retrieve user (create one if it doesnt exist)
user: User = self._user_repository.one_by_use_case_and_username(
use_case, username)
if user == None:
user = User(use_case, username)
self._user_repository.add(user)
trace: TrustTrace = TrustTrace(
use_case,
adapter.context,
user.name,
adapter.table,
volume,
datetime.now(),
is_trustworthy=(((rating - adapter.rating_lower) /
(adapter.rating_upper - adapter.rating_lower)) * 2) - 1
)
# retrieve user_trust for the trace (create one if it doesnt exist)
target_trust: UserTrust = None
for user_trust in user.context_trust:
if user_trust.context == adapter.context:
target_trust = user_trust
break
if target_trust == None:
target_trust = UserTrust(
use_case, adapter.context, user.name, 0, datetime(1, 1, 1, 0, 0))
self._user_trust_repository.add(target_trust)
user.context_trust.append(target_trust)
oracle: TrustOracle = TrustOracle(
adapter.conversion, adapter.certainty)
delta_t = oracle.calculate_trust_single(trace)
target_trust.t += delta_t
target_trust.last_update = trace.timestamp
self._trust_trace_repository.add(trace)
self._user_trust_repository.update(target_trust)
from lib.services.trust_service import TrustService
from lib.database.entities.trust_trace import TrustTrace
from lib.database.entities.user_trust import UserTrust
from lib.trust.oracle import TrustOracle
from lib.trust.transaction import Transaction
from lib.database.entities.trust_adapter import TrustAdapter
from lib.database.entities.user import User
from typing import Dict, List
import json
import logging
from datetime import datetime
LOGGER = logging.getLogger(__name__)
class MessageHandler:
def __init__(self, trust_adapter_repository, user_repository, user_trust_repository, trust_trace_repository):
self.trust_adapter_repository = trust_adapter_repository
self.user_repository = user_repository
self.user_trust_repository = user_trust_repository
self.trust_trace_repository = trust_trace_repository
def handle(self, body: str):
# decode the message
data: Dict = None
try:
data = json.loads(body)
except json.decoder.JSONDecodeError:
raise ValueError("Invalid Message: Not in JSON format")
if "type" not in data.keys() or "content" not in data.keys():
raise ValueError(
"Invalid Message: Missing fields \"type\" or \"content\"")
if data["type"] != "new-trace":
return
content: Dict = data["content"]
if "use_case" not in content.keys() or "table" not in content.keys() or "properties" not in content.keys():
raise ValueError(
"Invalid Message: Missing fields \"use_case\" or \"table\" or \"properties\"")
use_case: str = content["use_case"]
table: str = content["table"]
properties: Dict = content["properties"]
trust_service: TrustService = TrustService(
self.trust_adapter_repository,
self.user_repository,
self.user_trust_repository,
self.trust_trace_repository
)
trust_service.updateSystem(use_case, table, properties)
# adapters: List[TrustAdapter] = self.trust_adapter_repository.all_for_use_case_and_table(
# use_case, table)
# if len(adapters) == 0:
# LOGGER.warning(
# f"No Trust-Adapters found for Trace(use-case:{use_case}, table:{table}), aborting...")
# return
# for adapter in adapters:
# if adapter.user_source not in properties.keys() or adapter.volume_source not in properties.keys() or adapter.rating_source not in properties.keys():
# raise ValueError(
# f"Trace does not contain needed fields for trust-computation. Required fields: {adapter.user_source}, {adapter.volume_source}. Present fields: {properties.keys()}")
# username: str = properties[adapter.user_source]
# volume: float = properties[adapter.volume_source]
# rating: float = properties[adapter.rating_source]
# trace: TrustTrace = TrustTrace(
# use_case,
# adapter.context,
# user.name,
# adapter.table,
# volume,
# datetime.now(),
# is_trustworthy=(((rating - adapter.rating_lower) /
# (adapter.rating_upper - adapter.rating_lower)) * 2) - 1
# )
# # retrieve user (create one if it doesnt exist)
# user: User = self.user_repository.one_by_use_case_and_username(
# use_case, username)
# if user == None:
# user = User(use_case, username)
# self.user_repository.add(user)
# # retrieve user_trust for the trace (create one if it doesnt exist)
# target_trust: UserTrust = None
# for user_trust in user.context_trust:
# if user_trust.context == adapter.context:
# target_trust = user_trust
# break
# if target_trust == None:
# target_trust = UserTrust(
# use_case, adapter.context, user.name, 0, datetime(1, 1, 1, 0, 0))
# self.user_trust_repository.add(target_trust)
# user.context_trust.append(target_trust)
# oracle: TrustOracle = TrustOracle(
# adapter.conversion, adapter.certainty)
# delta_t = oracle.calculate_trust_single(trace)
# target_trust.t += delta_t
# target_trust.last_update = trace.timestamp
# self.trust_trace_repository.add(trace)
# self.user_trust_repository.update(target_trust)
class Node:
def __init__(self, initial_t=0):
self.t = initial_t
\ No newline at end of file
import math
from enum import Enum
from lib.trust.transaction import Transaction
from lib.trust.node import Node
from datetime import datetime
from typing import List, Dict
class TrustLabel(Enum):
UNTRUSTWORTHY = 1
UNSURE = 2
TRUSTWORTHY = 3
class TrustOracle:
def __init__(self, translate_value: float, knowledge_border: float):
# translates transaction volume into unitless x-axis value for sigmoid input
self.translate_value = translate_value
# nodes must reach a value of at least |trust(node)| = knowledge_border to get a definite label
self.knowledge_border = knowledge_border
def rate_node(self, node: Node) -> TrustLabel:
if node.t < -self.knowledge_border:
return TrustLabel.UNTRUSTWORTHY
if node.t > self.knowledge_border:
return TrustLabel.TRUSTWORTHY
return TrustLabel.UNSURE
def calculate_trust_single(self, node_transaction: Transaction):
return self.calculate_trust([node_transaction])
def calculate_trust(self, node_transactions: List[Transaction]):
trust_total = 0
for transaction in node_transactions:
transaction_t = abs(transaction.volume / self.translate_value)
trust_total += transaction_t
return self.translate_t_to_trust_value(trust_total)
def translate_t_to_trust_value(self, t_value: float):
# classic sigmoid function as in https://en.wikipedia.org/wiki/Sigmoid_function
sig = 1 / (1 + math.exp(-t_value))
# rescale it st.:
# values are between ]-1;1[
sig *= 2
sig -= 1
return sig
class Transaction:
def __init__(self, timestamp, volume=1, is_trustworthy: float = 1):
self.volume = volume
self.trustworthy = is_trustworthy
self.timestamp = timestamp
from lib.database.entities.trust_trace import TrustTrace
from lib.database.entities.user_trust import UserTrust
from lib.database.entities.user import User
from lib.database.entities.trust_adapter import TrustAdapter
from lib.trust.message_handler import MessageHandler
from typing import List, Dict
import unittest
import json
class DummyTrustAdapterRepo:
_adapters: List[TrustAdapter] = [
TrustAdapter(
"car-sharing",
"context1",
"travel",
"user",
"cost",
"rating",
1,
5,
2000,
0.75,
8,
1,
1
)
]
def add(self, adapter: TrustAdapter):
self._adapters.append(adapter)
def all(self) -> List[Dict]:
return self._adapters
def all_for_use_case_and_context(self, use_case: str, context: str) -> List[Dict]:
return [adapter for adapter in self._adapters if (adapter.use_case == use_case and adapter.context == context)]
def all_for_use_case_and_table(self, use_case: str, table: str) -> List[Dict]:
return [adapter for adapter in self._adapters if (adapter.use_case == use_case and adapter.table == table)]
def one_for_use_case_and_context_and_table(self, use_case: str, context: str, table: str) -> Dict:
result = [adapter for adapter in self._adapters if (
adapter.use_case == use_case and adapter.context == context and adapter.table == table)]
if len(result) != 1:
return None
return result[0]
class DummyUserRepo:
_users: List[User] = []
def clear(self):
self._users.clear()
def add(self, user: User):
self._users.append(user)
def all(self) -> List[User]:
return self._users
def all_for_use_case(self, use_case: str) -> List[User]:
return [user for user in self._users if (user.use_case == use_case)]
def one_by_use_case_and_username(self, use_case: str, username: str) -> User:
result = [user for user in self._users if (
user.use_case == use_case and user.name == username)]
if len(result) != 1:
return None
return result[0]
class DummyUserTrustRepo:
_user_trusts: List[UserTrust] = []
def add(self, user_trust: UserTrust):
self._user_trusts.append(user_trust)
def all(self) -> List[UserTrust]:
return self._user_trusts
def all_for_use_case(self, use_case: str) -> List[UserTrust]:
return [user_trust for user_trust in self._user_trusts if (user_trust.use_case == use_case)]
def all_for_user(self, user: User) -> List[UserTrust]:
return [user_trust for user_trust in self._user_trusts if (user_trust.use_case == user.use_case and user_trust.user == user.name)]
def one_for_user_and_context(self, user: User, context: str) -> UserTrust:
result = [user_trust for user_trust in self.user_trust if (
user_trust.use_case == user.use_case and user_trust.name == user.name and user_trust.context == context)]
if len(result) != 1:
return None
return result[0]
def update(self, user_trust: UserTrust):
pass
class DummyTrustTraceRepository:
_traces: List[TrustTrace] = []
def add(self, trace: TrustTrace):
self._traces.append(trace)
def all(self) -> List[Dict]:
return [trace.to_serializable_dict() for trace in self._traces]
def all_for_use_case_and_context(self, use_case: str, context: str) -> List[Dict]:
result = [trace for trace in self._traces if (
trace.use_case == use_case and trace.context == context)]
return [trace.to_serializable_dict() for trace in result]
class Test_Messages(unittest.TestCase):
user_repo = None
message_handler: MessageHandler = None
def setUp(self):
self.user_repo = DummyUserRepo()
self.message_handler = MessageHandler(
DummyTrustAdapterRepo(),
self.user_repo,
DummyUserTrustRepo(),
DummyTrustTraceRepository()
)
def _buildTrace(self):
return json.dumps({
"type": "new-trace",
"content": {
"use_case": "car-sharing",
"table": "travel",
"id": "092870924809481f",
"properties": {
"user": "Bob",
"cost": 201.12,
"rating": 4
}
}
})
def test_message_no_json(self):
message: str = "abcdefg"
try:
self.message_handler.handle(message)
except ValueError:
pass
def test_message_valid(self):
message = self._buildTrace()
self.user_repo.clear()
self.assertEqual(0, len(self.user_repo.all()))
self.message_handler.handle(message)
self.assertEqual(1, len(self.user_repo.all())) # ensure user was added
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment