Commit ad1ef89c authored by Alexander's avatar Alexander

Merge branch 'feature/clustering-in-time-slices' into develop

parents fe2835b5 a4d43462
......@@ -29,7 +29,7 @@ paths:
200:
description: "Successful echo of request data"
/location:
/locations:
post:
operationId: "rest.location.post"
tags:
......@@ -57,27 +57,46 @@ paths:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/Location"
$ref: "#/definitions/LocationCollection"
/cluster:
/location-collections:
post:
operationId: "rest.location.post_many"
tags:
- "Locations"
summary: "Add new location data collection"
parameters:
- in: body
name: "Locations"
description: "The location data collection to be added"
required: true
schema:
$ref: "#/definitions/LocationCollection"
responses:
201:
description: "Successful operation"
400:
description: "Invalid input"
/clusters:
get:
operationId: "rest.cluster.get"
tags:
- "Clusters"
summary: "Get clustered data"
summary: "Get user communities per date per hour"
parameters: []
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/ClusterValue"
$ref: "#/definitions/UserClusterCollection"
/cluster/cluster.png:
/clusters/cluster.png:
get:
operationId: "rest.cluster.get_image"
tags:
- "Clusters"
summary: "Get clustered data as image"
summary: "Get user communities per date per hour as image"
parameters: []
produces:
- "image/png"
......@@ -85,26 +104,25 @@ paths:
200:
description: "Successful operation"
/agi/cluster:
/agi/clusters:
get:
operationId: "rest.agi_cluster.get"
tags:
- "Clusters"
summary: "Get clustered data"
summary: "Get user communities per date per hour from agi data"
parameters: []
responses:
200:
description: "Successful operation"
schema:
$ref: "#/definitions/ClusterValue"
$ref: "#/definitions/UserClusterCollection"
/agi/cluster/cluster.png:
/agi/clusters/cluster.png:
get:
operationId: "rest.agi_cluster.get_image"
tags:
- "Clusters"
summary: "Get clustered data as image"
summary: "Get user communities per date per hour from agi data as image"
parameters: []
produces:
- "image/png"
......@@ -118,29 +136,41 @@ definitions:
properties:
id:
type: string
format: uuid
user:
type: "string"
latitude:
type: "number"
format: float
longitude:
type: "number"
format: float
timestamp:
type: "number"
ClusterValue:
LocationCollection:
type: array
items:
$ref: "#/definitions/Location"
UserCluster:
type: "object"
properties:
id:
type: string
format: uuid
cluster_label:
type: number
latitude:
type: number
longitude:
type: number
timestamp:
type: number
user:
date:
type: string
\ No newline at end of file
hour:
type: number
clusters:
type: object
additionalProperties:
type: array
items:
type: string
example:
0: [1dc61b1a0602de0eaee9dba7eece9279c2844202, b4b31bbe5e12f55737e3a910827c81595fbca3eb]
UserClusterCollection:
type: array
items:
$ref: "#/definitions/UserCluster"
\ No newline at end of file
......@@ -3,7 +3,7 @@ from typing import List, Dict
import hashlib
class AgiRepository:
def getLocations(self) -> List:
def getLocations(self) -> List[Dict]:
locations = []
travels = self.readDataFromFile()
......
from db.entities.location import Location
from db.entities.popular_location import PopularLocation
from db.entities.user_cluster import UserCluster
\ No newline at end of file
import json
from datetime import datetime
class Location:
    '''A single GPS location report of one user, parsed from a raw info dict.'''

    def __init__(self, location_info=None):
        super().__init__()
        # guard clause: allow constructing an empty instance
        if location_info is None:
            return
        self.latitude = float(location_info['latitude'])
        self.longitude = float(location_info['longitude'])
        # keep both the parsed datetime and the raw value for serialization
        self.timestamp = datetime.fromtimestamp(location_info['timestamp'])
        self.timestamp_raw = location_info['timestamp']
        self.user = location_info['user']
        self.id = f'{self.user}-{self.timestamp_raw}'

    def to_serializable_dict(self):
        '''Return a JSON-serializable dict with the raw (numeric) timestamp.'''
        return dict(
            id=self.id,
            latitude=self.latitude,
            longitude=self.longitude,
            timestamp=self.timestamp_raw,
            user=self.user,
        )

    def __repr__(self):
        serialized = self.to_serializable_dict()
        return json.dumps(serialized)

    def __str__(self):
        return f"Location({repr(self)})"
import json
class PopularLocation:
    '''The most-visited locations for one date.'''

    def __init__(self, date, top_locations: list):
        super().__init__()
        self.date = date
        self.top_locations = top_locations

    def to_serializable_dict(self, for_db=False):
        '''Return a dict; with for_db=True the location list is JSON-encoded.'''
        locations = json.dumps(self.top_locations) if for_db else self.top_locations
        return {
            "date": str(self.date),
            "top-locations": locations,
        }

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"PopularLocation({repr(self)})"
import json
class UserCluster:
    '''User communities detected for one (date, hour) time slice.'''

    def __init__(self, date, hour, clusters):
        super().__init__()
        self.date = date
        self.hour = hour
        self.clusters = clusters
        # composite key: unique per (date, hour) slot
        self.id = f'{self.date}-{self.hour}'

    def to_serializable_dict(self, for_db=False):
        '''Return a dict; with for_db=True the cluster mapping is JSON-encoded.'''
        cluster_value = json.dumps(self.clusters) if for_db else self.clusters
        return {
            "id": self.id,
            "date": str(self.date),
            "hour": self.hour,
            "clusters": cluster_value,
        }

    def __repr__(self):
        return json.dumps(self.to_serializable_dict())

    def __str__(self):
        return f"UserCluster({repr(self)})"
from db.location_datastore import LocationDatastore
import pymongo
import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
import json
class Repository:
def __init__(self):
self.store = LocationDatastore.get_instance()
from db.agi.agi_repository import AgiRepository
def addLocation(self, location):
self.store.add(location)
from db.entities import Location, UserCluster, PopularLocation
from typing import List
def getLocations(self):
return self.store.get()
class Repository(MongoRepositoryBase):
    '''Persistence layer for locations and user clusters in the community-detection MongoDB.'''

    def __init__(self, agi_data=False):
        super().__init__(netconst.COMMUNITY_DETECTION_DB_HOSTNAME,
                         netconst.COMMUNITY_DETECTION_DB_PORT, 'communityDetectionDb')
        # AGI data lives in separate collections next to the regular ones
        self._location_collection = 'location_agi' if agi_data else 'location'
        self._cluster_collection = 'cluster_agi' if agi_data else 'cluster'
        self.agi_repo = AgiRepository()

    def add_location(self, location: Location):
        '''Store a single location entity.'''
        super().insert_entry(self._location_collection, location.to_serializable_dict())

    def get_locations(self) -> List[Location]:
        '''Return all stored locations as entities.'''
        entries = super().get_entries(self._location_collection)
        return [Location(entry) for entry in entries]

    def get_agi_locations(self) -> List[Location]:
        '''Return locations read from the AGI source as entities.'''
        raw_locations = self.agi_repo.getLocations()
        return [Location(raw) for raw in raw_locations]

    def add_user_cluster(self, cluster: UserCluster):
        '''Store a single user cluster (clusters JSON-encoded for the DB).'''
        super().insert_entry(self._cluster_collection, cluster.to_serializable_dict(for_db=True))

    def get_user_clusters(self) -> List[UserCluster]:
        '''Return all stored user clusters, decoding the JSON cluster mapping.'''
        entries = super().get_entries(self._cluster_collection)
        return [UserCluster(entry['date'], int(entry['hour']), json.loads(entry['clusters']))
                for entry in entries]

    def add_popular_location(self, popular_location: PopularLocation):
        # not implemented yet - popular locations are currently discarded
        pass
# add modules folder to interpreter path
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
### init logging ###
import logging
LOG_FORMAT = ('%(levelname) -5s %(asctime)s %(name)s:%(funcName) -35s %(lineno) -5d: %(message)s')
......
import io
from flask import request, Response
from db.agi.agi_repository import AgiRepository
from db.repository import Repository
from processing.clusterer import Clusterer
from matplotlib.backends.backend_agg import FigureCanvasAgg as FigureCanvas
repo = AgiRepository()
repo = Repository(agi_data=True)
clusterer = Clusterer()
def get():
    '''Return all stored user clusters (AGI data) as JSON-serializable dicts.

    Fix: the merge left the pre-merge body (fetching raw locations and
    clustering on the fly) in place before the new return, making the new
    repository-backed implementation dead code; the stale lines are removed.
    '''
    clusters = repo.get_user_clusters()
    return [c.to_serializable_dict() for c in clusters]
def get_image():
return Response(status=501)
# todo
locations = repo.getLocations()
fig = clusterer.draw_locations(locations)
......
......@@ -8,13 +8,13 @@ repo = Repository()
clusterer = Clusterer()
def get():
    '''Return all stored user clusters as JSON-serializable dicts.

    Fix: the merge left the pre-merge body (on-the-fly clustering of raw
    locations) before the new return statement, so the repository-backed
    lines were unreachable; the stale lines are removed.
    '''
    clusters = repo.get_user_clusters()
    return [c.to_serializable_dict() for c in clusters]
def get_image():
return Response(status=501)
# todo
locations = repo.getLocations()
fig = clusterer.draw_locations(locations)
......
from flask import request, Response
from db.repository import Repository
from db.entities import Location
repo = Repository()
def post():
    '''Add a single location posted as JSON; responds 201 on success.

    Fix: the merge left the old call `repo.addLocation(body)` next to the new
    `insert_location(body)` - the old Repository API no longer exists and the
    duplicate call would either crash or store the location twice; only the
    new helper call is kept.
    '''
    body = request.json
    insert_location(body)
    return Response(status=201)
def post_many():
    '''Add every location contained in the posted JSON array; responds 201.'''
    locations = request.json
    for location_data in locations:
        insert_location(location_data)
    return Response(status=201)
def get():
    '''Return all stored locations as JSON-serializable dicts.

    Fix: the merge left the stale `return repo.getLocations()` (old API,
    removed) before the new return, which made the new line unreachable and
    would raise AttributeError at runtime; the stale return is removed.
    '''
    return [l.to_serializable_dict() for l in repo.get_locations()]
def insert_location(location_data: dict):
    # Wrap the raw dict in a Location entity so the normalization done in
    # Location.__init__ (float casts, id generation) is applied before storage.
    repo.add_location(Location(location_data))
import sys
import os
modules_path = '../../../modules/'
if os.path.exists(modules_path):
sys.path.insert(1, modules_path)
from processing.clusterer import Clusterer
from db.repository import Repository
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
from db.entities import Location, PopularLocation, UserCluster
import statistics
from collections import Counter
import json
DEBUG = False
NR_DECIMAL_FOR_BEST_LOCATIONS = 4
# used to cluster locations of a single user to detect main location per time slice
main_loc_clusterer = Clusterer()
# used to cluster the users based on their main location
user_clusterer = Clusterer()
time_slices = list(range(24))
repo = Repository(agi_data=True)
def run_location_clustering():
    '''Cluster all stored location traces into user communities per (date, hour).

    For every date and hour slice: determine each user's main location,
    cluster users by those main locations, and count location popularity.
    Results are persisted via store_user_clusters / store_popular_locations.
    '''
    user_clusters: List[UserCluster] = []
    popular_locations: List[PopularLocation] = []

    all_location_traces = repo.get_locations()

    # for each date in timestamp list
    dates = {trace.timestamp.date() for trace in all_location_traces}
    for cur_date in dates:
        traces_for_cur_date = [
            trace for trace in all_location_traces if trace.timestamp.date() == cur_date]

        # counts how often each rounded (lat, long) appears as a main location on cur_date
        location_counter: Dict[str, int] = {}

        # for each hour of that day
        for cur_hour in time_slices:
            traces_for_time_slice = [
                trace for trace in traces_for_cur_date if trace.timestamp.hour - cur_hour == 0]
            if len(traces_for_time_slice) == 0:
                continue

            main_locations = []
            # store the main location for each user
            users = {trace.user for trace in traces_for_time_slice}
            for user in users:
                main_loc = get_main_location_for_user(
                    traces_for_time_slice, user)
                main_loc['user'] = user
                main_locations.append(main_loc)

            # cluster the main locations for all users
            cluster_result = user_clusterer.run(main_locations)
            clusters = {}
            for key, vals in cluster_result.items():
                clusters[key] = [v['user'] for v in vals]
                # print(f"{cur_date} @ {cur_hour}h-{cur_hour+1}h (Group #{key}): {[v['user'] for v in vals]}")

            # add the clusters for the cur_hour to the global cluster list
            user_clusters.append(UserCluster(cur_date, cur_hour, clusters))

            # add locations for cur_hour to location counter
            # rounded coordinates serve as the counter key so nearby points aggregate
            for main_l in main_locations:
                key = json.dumps({'lat': round(main_l['latitude'], NR_DECIMAL_FOR_BEST_LOCATIONS),
                                  'long': round(main_l['longitude'], NR_DECIMAL_FOR_BEST_LOCATIONS)})
                if key not in location_counter:
                    location_counter[key] = 0
                location_counter[key] += 1
            # print(f"{cur_date} @ {cur_hour}h-{cur_hour+1}h: {main_locations}")

        # add the top three locations to the global popular location list
        top_locations = get_top_three_locations(location_counter)
        top_locations = [json.loads(l[0]) for l in top_locations]
        popular_locations.append(PopularLocation(cur_date, top_locations))

    store_user_clusters(user_clusters)
    store_popular_locations(popular_locations)
def get_main_location_for_user(location_traces: List[Location], user: str) -> dict:
    '''Return the center of the user's largest location cluster in the given traces.

    The user's traces are clustered spatially; the cluster with the most
    member locations is taken as the user's main area, and its centroid
    (dict with 'latitude'/'longitude') is returned.

    Improvement: the manual scan tracking {'id', 'size'} is replaced with the
    idiomatic max(..., key=len); ties keep the first cluster in iteration
    order, matching the previous strict-greater comparison.
    '''
    # cluster based on locations
    locations_for_user = [t for t in location_traces if t.user == user]
    clusters = main_loc_clusterer.run([l.__dict__
                                       for l in locations_for_user])

    # largest cluster has most locations
    locations_of_largest_cluster = max(clusters.values(), key=len)

    # calculate center of the location from the largest cluster
    return get_center_of_2d_points(locations_of_largest_cluster)
def get_center_of_2d_points(points, nr_decimal_places=5) -> dict:
    '''Return the arithmetic centroid of the given points.

    points: iterable of dicts with 'latitude' and 'longitude' keys.
    nr_decimal_places: rounding precision applied to both coordinates.
    '''
    lat_mean = statistics.mean(p['latitude'] for p in points)
    long_mean = statistics.mean(p['longitude'] for p in points)
    return {
        'latitude': round(lat_mean, nr_decimal_places),
        'longitude': round(long_mean, nr_decimal_places),
    }
def get_top_three_locations(location_counts: Dict[str, int]) -> List[Tuple[str, int]]:
    '''Return up to three (location_key, count) pairs with the highest counts.'''
    return Counter(location_counts).most_common(3)
def store_user_clusters(user_clusters: List[UserCluster]):
    '''Persist all user clusters, or only print them when DEBUG is set.'''
    if DEBUG:
        print(user_clusters)
        return
    for cluster in user_clusters:
        repo.add_user_cluster(cluster)
def store_popular_locations(popular_locations: List[PopularLocation]):
    '''Persist all popular locations, or only print them when DEBUG is set.'''
    if DEBUG:
        print(popular_locations)
        return
    for popular_location in popular_locations:
        repo.add_popular_location(popular_location)
if __name__ == "__main__":
run_location_clustering()
......@@ -31,4 +31,44 @@ spec:
- name: community-detection
image: alexx882/community-detection-microservice
ports:
- containerPort: 5000
\ No newline at end of file
- containerPort: 5000
---
apiVersion: v1
kind: Service
metadata:
name: community-detection-db
spec:
type: LoadBalancer
selector:
app: community-detection-db
ports:
- name: http
port: 27017
targetPort: 27017
nodePort: 30110
protocol: TCP
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: community-detection-db
spec:
replicas: 1
selector:
matchLabels:
app: community-detection-db
template:
metadata:
labels:
app: community-detection-db
spec:
containers:
- name: community-detection-db
image: mongo
env:
- name: MONGO_INITDB_ROOT_USERNAME
value: root
- name: MONGO_INITDB_ROOT_PASSWORD
value: root
ports:
- containerPort: 27017
\ No newline at end of file
import pymongo
from pymongo import MongoClient, cursor
import network_constants as netconst
class MongoRepositoryBase:
    '''Base class to connect to, insert into and read from a MongoDB database.

    Subclasses supply host/port/database name in __init__ and address
    individual collections by name on each call.

    Fix: the merge left BOTH the old (per-collection) and new (per-database)
    versions of __init__/insert_entry/get_entries in the class; the old
    definitions appeared later and shadowed the new ones, breaking callers
    such as Repository that use super().__init__(hostname, port, db_name).
    Only the new API is kept.
    '''

    # TODO extract to docker env var
    _username = 'root'
    _password = 'root'

    _mongo_client: pymongo.MongoClient = None

    def __init__(self, hostname, port, database_name, username=_username, password=_password):
        self._mongo_client = MongoClient(f"mongodb://{username}:{password}@{hostname}:{port}/")
        self._database = self._mongo_client[database_name]

    def insert_entry(self, collection_name, content: dict):
        '''Insert a single document into the named collection.'''
        collection = self._database[collection_name]
        collection.insert_one(content)

    def get_entries(self, collection_name, selection: dict = {}, projection: dict = {'_': 0}) -> cursor.Cursor:
        '''Return a cursor over documents in the named collection matching selection.

        NOTE(review): the default projection key '_' looks like a typo for
        '_id' - callers pass {'_id': 0} explicitly; confirm before relying on
        the default. The mutable default dicts are never mutated here.
        '''
        collection = self._database[collection_name]
        return collection.find(selection, projection)

    def close_connection(self):
        '''Close the underlying MongoDB client connection.'''
        self._mongo_client.close()
        self._mongo_client = None
### inside k8s
## Rabbit MQ
RABBIT_MQ_HOSTNAME = 'rabbit-mq'
RABBIT_MQ_PORT = 5672
# RABBIT_MQ_HOSTNAME = 'articonf1.itec.aau.at'
# RABBIT_MQ_PORT = 30302
MONGO_DB_HOSTNAME = 'trace-retrieval-db'
MONGO_DB_PORT = 27017
## Trace Retrieval
TRACE_RETRIEVAL_HOSTNAME = 'trace-retrieval'
TRACE_RETRIEVAL_REST_PORT = 80
TRACE_RETRIEVAL_DB_HOSTNAME = 'trace-retrieval-db'
TRACE_RETRIEVAL_DB_PORT = 27017
# TRACE_RETRIEVAL_DB_HOSTNAME = 'articonf1.itec.aau.at'
# TRACE_RETRIEVAL_DB_PORT = 30003
### outside k8s
# HOST_IP = '143.205.173.102'
# RABBIT_MQ_HOSTNAME = HOST_IP
# RABBIT_MQ_PORT = 30302
# MONGO_DB_HOSTNAME = HOST_IP
# MONGO_DB_PORT = 30003
# TRACE_RETRIEVAL_HOSTNAME = HOST_IP
# TRACE_RETRIEVAL_REST_PORT = 30001
\ No newline at end of file
## Community Detection
COMMUNITY_DETECTION_HOSTNAME = 'community-detection'
COMMUNITY_DETECTION_REST_PORT = 80
COMMUNITY_DETECTION_DB_HOSTNAME = 'community-detection-db'
COMMUNITY_DETECTION_DB_PORT = 27017
# COMMUNITY_DETECTION_DB_HOSTNAME = 'localhost'
# COMMUNITY_DETECTION_DB_PORT = 30110
\ No newline at end of file
......@@ -3,19 +3,15 @@ import network_constants as netconst
from database.MongoRepositoryBase import MongoRepositoryBase
class MongoRepository(MongoRepositoryBase):
    '''Repository for request traces stored in the traceRetrievalDB database.

    Fix: the merge left the old implementation (class-level credentials and a
    __init__ calling the previous base-class signature) interleaved with the
    new one; only the new implementation, matching the hostname/port-based
    MongoRepositoryBase API, is kept.
    '''

    def __init__(self):
        super().__init__(netconst.TRACE_RETRIEVAL_DB_HOSTNAME,
                         netconst.TRACE_RETRIEVAL_DB_PORT, 'traceRetrievalDB')
        # all traces live in a single collection
        self._collection_name = 'traces'

    def insert_trace(self, content: dict):
        '''Store a single trace document.'''
        super().insert_entry(self._collection_name, content)

    def get_traces(self, selection: dict = {}, projection: dict = {'_': 0}) -> pymongo.cursor.Cursor:
        '''Return a cursor over stored traces matching selection.'''
        return super().get_entries(self._collection_name, selection, projection)
......@@ -4,7 +4,7 @@ from database.MongoRepository import MongoRepository
mongo_repo = MongoRepository()
def post():
    '''Reject direct trace submission; traces must go through the RESTful Gateway.

    Fix: the merge left the stale `return Response(status=501)` before the new
    405 response, making the informative response unreachable; the stale
    return is removed.
    '''
    return Response(response='Use the RESTful Gateway instead', status=405)
def get():
    # Materialize the cursor into a list; '_id' is excluded so the result is
    # JSON-serializable without Mongo's ObjectId.
    return list(mongo_repo.get_traces(projection={'_id': 0}))
\ No newline at end of file
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment