Commit b57623ef authored by Alexander Lercher

added user clustering based on date and hour

parent c38ae421
from db.location_datastore import LocationDatastore
from db.entities import Location, UserCluster, PopularLocation
from typing import List
from db.agi.agi_repository import AgiRepository
class Repository:
    """Single access point for location data used by the clustering code.

    Holds the shared ``LocationDatastore`` instance and an ``AgiRepository``.
    Both snake_case methods and the older camelCase methods are provided.
    """

    def __init__(self):
        self.store = LocationDatastore.get_instance()
        self.agi_repo = AgiRepository()

    def add_location(self, location: Location):
        """Add ``location`` to the local datastore."""
        self.store.add(location)

    def addLocation(self, location):
        """Legacy camelCase spelling of :meth:`add_location`.

        NOTE(review): looks like a pre-rename leftover that had no body of
        its own; kept so existing callers keep working. Confirm whether it
        can be removed.
        """
        self.add_location(location)

    def getLocations(self):
        """Legacy accessor returning the result of the local datastore's ``get()``.

        NOTE(review): unlike :meth:`get_locations`, this reads from the local
        datastore rather than the AGI repository. Confirm whether it is still
        needed.
        """
        return self.store.get()

    def get_locations(self) -> List[Location]:
        """Fetch locations from the AGI repository, each wrapped in ``Location``."""
        agi_locations = self.agi_repo.getLocations()
        return [Location(agi_loc) for agi_loc in agi_locations]

    def add_user_cluster(self, cluster: UserCluster):
        """Accept a user cluster; currently it is only printed."""
        print(cluster)

    def add_popular_location(self, popular_location: PopularLocation):
        """Accept a popular location; currently it is only printed."""
        print(popular_location)
from processing.clusterer import Clusterer
from db.repository import Repository
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
from db.entities import Location, PopularLocation, UserCluster
import statistics
from collections import Counter
import json
# When True, the store_* helpers print their input and return without
# passing anything to the repository.
DEBUG = False
# Number of decimal places latitude/longitude are rounded to before the
# main locations are counted for the popular-location ranking.
NR_DECIMAL_FOR_BEST_LOCATIONS = 4
# used to cluster locations of a single user to detect main location per time slice
main_loc_clusterer = Clusterer()
# used to cluster the users based on their main location
user_clusterer = Clusterer()
# Hours of the day (0..23); each hour is handled as one time slice.
time_slices = list(range(24))
# Repository instance shared by all functions in this module.
repo = Repository()
def run_clustering():
    """Cluster users per date and hour and rank each date's top locations.

    For every date present in the location traces and every hour in
    ``time_slices`` that has traces:

    * each user's main location is determined,
    * the users are clustered by those main locations and the resulting
      groups are recorded as a ``UserCluster`` for that date and hour,
    * the main locations (rounded to ``NR_DECIMAL_FOR_BEST_LOCATIONS``
      decimals) are counted towards that date's location ranking.

    After all hours of a date are processed, its three most frequent
    locations are recorded as a ``PopularLocation``. Finally both result
    lists are handed to the ``store_*`` helpers.
    """
    user_clusters: List[UserCluster] = []
    popular_locations: List[PopularLocation] = []

    # Bucket the traces by date and hour in one pass instead of re-filtering
    # the complete trace list for every date and every hour.
    traces_by_date = {}
    for trace in repo.get_locations():
        stamp = trace.timestamp
        traces_by_date.setdefault(stamp.date(), {}) \
                      .setdefault(stamp.hour, []) \
                      .append(trace)

    # Sorted so results come out chronologically and identically on every
    # run (iterating a set of dates gives no guaranteed order).
    for cur_date in sorted(traces_by_date):
        traces_by_hour = traces_by_date[cur_date]
        location_counter: Dict[str, int] = Counter()

        for cur_hour in time_slices:
            traces_for_time_slice = traces_by_hour.get(cur_hour)
            if not traces_for_time_slice:
                continue

            # Distinct users in first-seen order, so the clusterer input is
            # reproducible between runs.
            users = dict.fromkeys(
                trace.user for trace in traces_for_time_slice)

            # store the main location for each user
            main_locations = []
            for user in users:
                main_loc = get_main_location_for_user(
                    traces_for_time_slice, user)
                main_loc['user'] = user
                main_locations.append(main_loc)

            # cluster the main locations for all users and keep only the
            # user ids of each resulting group
            cluster_result = user_clusterer.run(main_locations)
            clusters = {
                key: [member['user'] for member in members]
                for key, members in cluster_result.items()
            }
            user_clusters.append(UserCluster(cur_date, cur_hour, clusters))

            # Count the rounded coordinates; the JSON string serves as a
            # hashable key that can be decoded again afterwards.
            for main_l in main_locations:
                key = json.dumps({'lat': round(main_l['latitude'], NR_DECIMAL_FOR_BEST_LOCATIONS),
                                  'long': round(main_l['longitude'], NR_DECIMAL_FOR_BEST_LOCATIONS)})
                location_counter[key] += 1

        # add the top three locations of this date to the global list
        top_locations = [
            json.loads(encoded_location)
            for encoded_location, _ in get_top_three_locations(location_counter)
        ]
        popular_locations.append(PopularLocation(cur_date, top_locations))

    store_user_clusters(user_clusters)
    store_popular_locations(popular_locations)
def get_main_location_for_user(location_traces: List[Location], user: str) -> dict:
    """Return the center of the largest location cluster of ``user``.

    The traces belonging to ``user`` are clustered with
    ``main_loc_clusterer``; the cluster holding the most entries is taken as
    the main location and its center is returned as a dict with
    ``'latitude'`` and ``'longitude'``.
    """
    # the clusterer is fed the attribute dict of every trace of this user
    user_trace_dicts = [
        trace.__dict__ for trace in location_traces if trace.user == user
    ]
    clusters = main_loc_clusterer.run(user_trace_dicts)

    # pick the first cluster with the strictly largest member count
    largest_id, largest_size = -1, 0
    for cluster_id, members in clusters.items():
        member_count = len(members)
        if member_count > largest_size:
            largest_id, largest_size = cluster_id, member_count

    # the main location is the center of that cluster's points
    return get_center_of_2d_points(clusters[largest_id])
def get_center_of_2d_points(points, nr_decimal_places=5) -> dict:
    """Return the mean latitude and longitude of ``points``.

    Each point is a mapping with ``'latitude'`` and ``'longitude'`` entries.
    Both means are rounded to ``nr_decimal_places`` decimals.
    """
    return {
        axis: round(
            statistics.mean([point[axis] for point in points]),
            nr_decimal_places,
        )
        for axis in ('latitude', 'longitude')
    }
def get_top_three_locations(location_counts: Dict[str, int]) -> List[Tuple[str, int]]:
    """Return up to three ``(location, count)`` pairs with the highest counts.

    The pairs are ordered from most to least frequent.
    """
    return Counter(location_counts).most_common(3)
def store_user_clusters(user_clusters: List[UserCluster]):
    """Hand every user cluster to the repository, or print them in DEBUG mode."""
    if not DEBUG:
        for cluster in user_clusters:
            repo.add_user_cluster(cluster)
    else:
        # debug runs only show the result and store nothing
        print(user_clusters)
def store_popular_locations(popular_locations: List[PopularLocation]):
    """Hand every popular location to the repository, or print them in DEBUG mode."""
    if not DEBUG:
        for popular_location in popular_locations:
            repo.add_popular_location(popular_location)
    else:
        # debug runs only show the result and store nothing
        print(popular_locations)
# Start the clustering only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    run_clustering()
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment