Commit 186681b8 authored by Alexander Lercher's avatar Alexander Lercher

Merge branch 'feature/user-graph-creation' into develop

parents a04c3301 60dfb9eb
...@@ -130,6 +130,19 @@ paths: ...@@ -130,6 +130,19 @@ paths:
# 200: # 200:
# description: "Successful operation" # description: "Successful operation"
/user-cluster-graphs:
get:
operationId: "rest.user_cluster.get"
tags:
- "User Graphs"
summary: "Get user graphs per layer per cluster"
parameters: []
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/UserClusterGraphCollection"
definitions: definitions:
Location: Location:
type: "object" type: "object"
...@@ -163,8 +176,6 @@ definitions: ...@@ -163,8 +176,6 @@ definitions:
type: array type: array
items: items:
$ref: "#/definitions/Location" $ref: "#/definitions/Location"
# example:
# 0: [1dc61b1a0602de0eaee9dba7eece9279c2844202, b4b31bbe5e12f55737e3a910827c81595fbca3eb]
LocationClusterCollection: LocationClusterCollection:
type: array type: array
...@@ -186,10 +197,31 @@ definitions: ...@@ -186,10 +197,31 @@ definitions:
type: array type: array
items: items:
$ref: "#/definitions/Location" $ref: "#/definitions/Location"
# example:
# 0: [1dc61b1a0602de0eaee9dba7eece9279c2844202, b4b31bbe5e12f55737e3a910827c81595fbca3eb]
TimeClusterCollection: TimeClusterCollection:
type: array type: array
items: items:
$ref: "#/definitions/TimeCluster" $ref: "#/definitions/TimeCluster"
UserClusterGraph:
type: object
properties:
nodes:
type: array
items:
type: string
edges:
type: array
items:
type: array
items:
type: string
example:
- user1
- user2
- weight
UserClusterGraphCollection:
type: array
items:
$ref: "#/definitions/UserClusterGraph"
\ No newline at end of file
import sys
import os
modules_paths = ['../../../modules/']
for modules_path in modules_paths:
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from typing import List, Tuple, Any
from networkx import Graph
from db.entities import LocationCluster, UserClusterGraph, Cluster
from db.repository import Repository
from processing.user_graph_generator import UserGraphGenerator
repo = Repository()
def get_edges_with_weights(g: Graph) -> List[Tuple[Any, Any, int]]:
res = []
for e in g.edges:
res.append((*e, g.edges[e]['weight']))
return res
def create_graphs_for_location_clusters():
graphs_for_clusters = []
ug = UserGraphGenerator()
clusters: Cluster = repo.get_location_clusters()
for cluster in clusters:
user_ids = [n['user'] for n in cluster.nodes]
graph: Graph = ug.create_graph_from_nodes(user_ids)
vertices = list(graph.nodes)
edges = get_edges_with_weights(graph)
cluster_graph = UserClusterGraph(vertices, edges)
graphs_for_clusters.append(cluster_graph)
store_graphs(graphs_for_clusters)
def store_graphs(graphs: List):
for g in graphs:
repo.add_user_cluster_graph(g)
if __name__ == "__main__":
create_graphs_for_location_clusters()
from db.entities.location import Location from db.entities.location import Location
from db.entities.popular_location import PopularLocation from db.entities.popular_location import PopularLocation
from db.entities.cluster import LocationCluster, TimeCluster from db.entities.cluster import Cluster, LocationCluster, TimeCluster
\ No newline at end of file from db.entities.user_cluster_graph import UserClusterGraph
\ No newline at end of file
import json
from typing import Dict, List
class UserClusterGraph:
'''This class represents a graph of users with weighted connections for a single cluster of one single layer'''
def __init__(self, nodes: List = None, edges: List = None, dict_: Dict = None, from_db=False):
self.nodes = nodes
self.edges = edges
if dict_ is not None:
self.from_serializable_dict(dict_, from_db)
def to_serializable_dict(self, for_db=False) -> Dict:
return {
"nodes": json.dumps(self.nodes) if for_db else self.nodes,
"edges": json.dumps(self.edges) if for_db else self.edges
}
def from_serializable_dict(self, dict_: Dict, from_db=False):
self.nodes = json.loads(dict_["nodes"]) \
if from_db else dict_["nodes"]
self.edges = json.loads(dict_["edges"]) \
if from_db else dict_["edges"]
def __repr__(self):
return json.dumps(self.to_serializable_dict())
def __str__(self):
return f"UserClusterGraph({repr(self)})"
...@@ -5,7 +5,7 @@ import json ...@@ -5,7 +5,7 @@ import json
from db.agi.agi_repository import AgiRepository from db.agi.agi_repository import AgiRepository
from db.entities import Location, TimeCluster, PopularLocation, LocationCluster from db.entities import Location, TimeCluster, PopularLocation, LocationCluster, UserClusterGraph
from typing import List from typing import List
...@@ -20,6 +20,7 @@ class Repository(MongoRepositoryBase): ...@@ -20,6 +20,7 @@ class Repository(MongoRepositoryBase):
self._location_collection = 'location' self._location_collection = 'location'
self._location_cluster_collection = 'location_cluster' self._location_cluster_collection = 'location_cluster'
self._time_cluster_collection = 'time_cluster' self._time_cluster_collection = 'time_cluster'
self._user_cluster_graph_collection = 'user_cluster_graph'
self.agi_repo = AgiRepository() self.agi_repo = AgiRepository()
...@@ -49,3 +50,11 @@ class Repository(MongoRepositoryBase): ...@@ -49,3 +50,11 @@ class Repository(MongoRepositoryBase):
def get_time_clusters(self) -> List[TimeCluster]: def get_time_clusters(self) -> List[TimeCluster]:
clusters = super().get_entries(self._time_cluster_collection) clusters = super().get_entries(self._time_cluster_collection)
return [TimeCluster(time_dict=c, from_db=True) for c in clusters] return [TimeCluster(time_dict=c, from_db=True) for c in clusters]
def add_user_cluster_graph(self, user_graph: UserClusterGraph):
super().insert_entry(self._user_cluster_graph_collection,
user_graph.to_serializable_dict(for_db=True))
def get_user_cluster_graphs(self) -> List[UserClusterGraph]:
user_graphs = super().get_entries(self._user_cluster_graph_collection)
return [UserClusterGraph(dict_=u, from_db=True) for u in user_graphs]
import itertools
from typing import List, Dict, Tuple, Any
from networkx import Graph
class UserGraphGenerator:
def __init__(self):
pass
def count_edges(self, nodes: List) -> Dict[Tuple, int]:
edge_counts = {}
coms = itertools.combinations(nodes, 2)
for first, second in coms:
if first == second: # dont process reflexive connections
continue
if (first, second) in edge_counts:
edge_counts[first, second] += 1
else:
edge_counts[first, second] = 1
return edge_counts
def create_edges_with_weights(self, edge_counts: Dict[Tuple[Any, Any], int]) -> List[Tuple[Any, Any, Dict]]:
edges = []
for (key1, key2), value in edge_counts.items():
edge = (key1, key2, {'weight': value})
edges.append(edge)
return edges
def create_fully_connected_edges_for_nodes(self, nodes: List) -> List[Tuple[Any, Any, Dict]]:
return self.create_edges_with_weights(self.count_edges(nodes))
def create_graph_from_nodes(self, nodes: List) -> Graph:
'''Creates a networkx.Graph with distinct nodes and weighted edges between these nodes'''
g = Graph()
g.add_nodes_from(nodes)
g.add_edges_from(self.create_fully_connected_edges_for_nodes(nodes))
return g
from flask import request, Response
from db.repository import Repository
repo = Repository()
def get():
data = repo.get_user_cluster_graphs()
return [d.to_serializable_dict() for d in data]
import unittest
import sys
for path in ['../', './']:
sys.path.insert(1, path)
# python -m unittest discover
from processing.user_graph_generator import UserGraphGenerator
import networkx as nx
class TestUserGraphGenerator(unittest.TestCase):
def setUp(self):
self.user_graph = UserGraphGenerator()
def test_count_edges_oneNode(self):
count_res = {}
self.assertEqual(count_res, self.user_graph.count_edges([1]))
def test_count_edges_threeDistinctNodes_threeEdges(self):
count_res = {(1, 2): 1, (1, 3): 1, (2, 3): 1}
self.assertEqual(count_res, self.user_graph.count_edges([1, 2, 3]))
def test_count_edges_twoNodesWithDups_notReflexive(self):
count_res = {}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1]))
def test_count_edges_threeNodesWithDups_countGtOne_notReflexive(self):
count_res = {(1, 3): 2}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1, 3]))
def test_count_edges_fourNodesWithDups_countGtOne_notReflexive(self):
count_res = {(1, 3): 2, (1, 4): 2, (3, 4): 1}
self.assertEqual(count_res, self.user_graph.count_edges([1, 1, 3, 4]))
def test_count_edges_fourStringNodesWithDups_countGtOne_notReflexive(self):
count_res = {('test', 'test2'): 2, ('test', '4'): 2, ('test2', '4'): 1}
self.assertEqual(count_res,
self.user_graph.count_edges(['test', 'test', 'test2', '4']))
def test_count_edges_fourDistinctStringNodes_fullyConnectedEdges(self):
count_res = {
('1', '2'): 1, ('1', '3'): 1, ('1', '4'): 1,
('2', '3'): 1, ('2', '4'): 1, ('3', '4'): 1
}
self.assertEqual(count_res,
self.user_graph.count_edges(['1', '2', '3', '4']))
def test_create_edges_with_weights_SingleEdge(self):
counts = {('a', 'b'): 1}
edge_result = [('a', 'b', {'weight': 1})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_edges_with_weights_SingleEdgeWeightTwo(self):
counts = {('a', 'b'): 2}
edge_result = [('a', 'b', {'weight': 2})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_edges_with_weights_TwoEdgesWithWeights(self):
counts = {('a', 'b'): 2, ('b', 'c'): 1}
edge_result = [('a', 'b', {'weight': 2}), ('b', 'c', {'weight': 1})]
self.assertEqual(edge_result,
self.user_graph.create_edges_with_weights(counts))
def test_create_graph_from_nodes_singleNode(self):
nodes = [1]
edges = []
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_twoDistinctNodes(self):
nodes = [1, 2]
edges = [(1, 2, {'weight': 1})]
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_threeDistinctNodes(self):
nodes = [1, 2, 3]
edges = [(1, 2, {'weight': 1}), (1, 3, {'weight': 1}),
(2, 3, {'weight': 1})]
self.assertGraph(nodes, edges,
self.user_graph.create_graph_from_nodes(nodes))
def test_create_graph_from_nodes_threeNodesWithDuplicates_TwoNodes_EdgesWithAccordingWeight(self):
nodes = [1, 1, 3]
edges = [(1, 3, {'weight': 2})]
self.assertGraph(list(set(nodes)), edges,
self.user_graph.create_graph_from_nodes(nodes))
# unittest custom assertions
def assertGraph(self, nodes, edges, g: nx.Graph):
self.assertEqual(len(nodes), g.number_of_nodes())
self.assertEqual(len(edges), g.number_of_edges())
for i in range(len(nodes)):
self.assertEqual(nodes[i], list(g.nodes)[i])
for i in range(len(edges)):
graph_edge = list(g.edges)[i]
first, second, weight = edges[i]
self.assertEqual((first, second), graph_edge)
self.assertEqual(weight, g.edges[graph_edge])
if __name__ == '__main__':
unittest.main()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment