236 lines
6.7 KiB
Python
236 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Data processing"""
|
|
|
|
import datetime
|
|
import typing as t
|
|
from collections import Counter, defaultdict
|
|
|
|
from .db import Event, EventType, vd
|
|
from .util import VimLoadingBar, get_median
|
|
|
|
|
|
class GeneralStats:
    """Running tallies built up from a stream of events.

    Feed events one at a time through :meth:`update`; the public attributes
    can be read at any point.
    """

    def __init__(self) -> None:
        """Create an empty set of tallies."""
        # Keyed tallies.
        self.langs: Counter[str] = Counter()  # added + deleted per language
        self.files: Counter[str] = Counter()  # events seen per file
        self.cmds: Counter[str] = Counter()  # occurrences per command payload

        # Activity histograms, keyed by the parts of ``ev.utc_dt``.
        self.hour: Counter[int] = Counter()
        self.day: Counter[int] = Counter()  # keyed by ``utc_dt.weekday()``
        self.month: Counter[int] = Counter()

        # Session begin / end events.
        self.opens: int = 0
        self.closes: int = 0

        # Totals taken from write payloads.
        self.adds: int = 0
        self.dels: int = 0

        # Clipboard events.
        self.copies: int = 0
        self.pastes: int = 0

        # Events whose payload was missing or unusable.
        self.invalid_commands: int = 0
        self.invalid_writes: int = 0

    @staticmethod
    def _parse_write_counts(data: str) -> t.Optional[t.Tuple[int, int]]:
        """Parse a write payload of the form ``"<adds>,<dels>"``.

        Returns the two integers, or ``None`` when *data* is not exactly two
        comma-separated integers.
        """
        try:
            added, deleted = map(int, data.split(","))
        except ValueError:
            # Raised both for non-numeric fields and for a wrong field count.
            return None
        return added, deleted

    def update(self, ev: Event) -> "GeneralStats":
        """Fold one event into the tallies.

        Every event counts towards the file and hour/day/month histograms;
        the remaining counters depend on ``ev.type``.  Command events with an
        empty payload and write events with an empty or malformed payload are
        counted as invalid instead of raising.

        Returns ``self`` so calls can be chained.
        """
        self.files[ev.file] += 1

        self.hour[ev.utc_dt.hour] += 1
        self.day[ev.utc_dt.weekday()] += 1
        self.month[ev.utc_dt.month] += 1

        if ev.type == EventType.begin_s:
            self.opens += 1
        elif ev.type == EventType.end_s:
            self.closes += 1
        elif ev.type == EventType.command:
            if ev.data:
                self.cmds[ev.data] += 1
            else:
                self.invalid_commands += 1
        elif ev.type == EventType.write:
            counts = self._parse_write_counts(ev.data) if ev.data else None
            if counts is None:
                # One bad record must not abort the whole statistics pass.
                self.invalid_writes += 1
            else:
                added, deleted = counts
                self.adds += added
                self.dels += deleted
                self.langs[ev.language] += added + deleted
        elif ev.type == EventType.copy:
            self.copies += 1
        elif ev.type == EventType.paste:
            self.pastes += 1

        return self
|
|
|
|
|
|
class SessionDuration:
    """Time span and language set of a single session."""

    def __init__(self, start: datetime.datetime, lang: str) -> None:
        """Record when the session began and the first language seen in it.

        ``end`` stays ``None`` until some other code assigns it.
        """
        self.end: t.Optional[datetime.datetime] = None
        self.start: datetime.datetime = start

        # Begin with just the opening language; more may be added later.
        self.langs: t.Set[str] = set()
        self.langs.add(lang)
|
|
|
|
|
|
class BehaviourStats:
    """Session-oriented tallies built up from a stream of events."""

    def __init__(self) -> None:
        """Create an empty set of tallies."""
        # Sessions seen so far, keyed by session id, and the summed length
        # (in seconds) of the ones that have been closed.
        self.sessions: t.Dict[str, SessionDuration] = {}
        self.editing_time: float = 0

        # Not touched by this class; filled in from outside.
        self.data_range: float = 0

        # Non-session events counted per event type.
        self.evts: Counter[EventType] = Counter()

        # Things that did not line up.
        self.invalid_session_closes: int = 0  # end event for an unknown session
        self.invalid_events: int = 0  # event for an unknown session
        self.invalid_writes: int = 0  # write event with an empty payload
        self.invalid_commands: int = 0  # command event with an empty payload

        # Averages; not touched by this class, filled in from outside.
        self.avg_langs: int = 0
        self.avg_sessions: int = 0
        self.avg_duration: int = 0
        self.med_duration: int = 0

    def manage_session(self, ev: Event) -> "BehaviourStats":
        """Open or close the session that *ev* refers to.

        A begin event registers (or replaces) the session under its id.  An
        end event stamps the end time and adds the session length to
        ``editing_time``; if the id is unknown it is only counted in
        ``invalid_session_closes``.

        Raises:
            ValueError: if *ev* is neither a begin nor an end event.

        Returns ``self`` so calls can be chained.
        """
        if ev.type == EventType.begin_s:
            self.sessions[ev.session_id] = SessionDuration(ev.utc_dt, ev.language)
            return self

        if ev.type != EventType.end_s:
            raise ValueError("Non-session event passed to a session manager")

        session = self.sessions.get(ev.session_id)
        if session is None:
            self.invalid_session_closes += 1
            return self

        session.end = ev.utc_dt
        self.editing_time += ev.utc_dt.timestamp() - session.start.timestamp()
        return self

    def update(self, ev: Event) -> "BehaviourStats":
        """Fold one event into the tallies.

        Begin/end events are handed to :meth:`manage_session`.  Any other
        event is counted by type; if its session is known, its language is
        added to that session and an empty write/command payload is counted
        as invalid.  Events for unknown sessions only bump ``invalid_events``.

        Returns ``self`` so calls can be chained.
        """
        is_session_event = (
            ev.type == EventType.begin_s or ev.type == EventType.end_s
        )
        if is_session_event:
            return self.manage_session(ev)

        self.evts[ev.type] += 1

        session = self.sessions.get(ev.session_id)
        if session is None:
            self.invalid_events += 1
            return self

        session.langs.add(ev.language)

        if not ev.data:
            if ev.type == EventType.write:
                self.invalid_writes += 1
            elif ev.type == EventType.command:
                self.invalid_commands += 1

        return self
|
|
|
|
|
|
class UserBehaviour:
    """Container for a user's behaviour model."""

    def __init__(self) -> None:
        """Start with no samples and a similarity of zero."""
        self.average_similarity: float = 0

        # Two independent sample lists; nothing in this class fills them.
        self.time_bw_sessions: t.List[float] = list()
        self.session_durations: t.List[float] = list()
|
|
|
|
|
|
def count_of_records_yr() -> int:
    """Return how many ``event`` rows are dated within the last year.

    "Within the last year" is decided by the query itself, which compares
    ``utc_dt`` against ``DATE('now', '-1 year')``.
    """

    query = "SELECT COUNT(*) FROM event WHERE utc_dt >= DATE('now', '-1 year');"

    with vd() as db:
        row = db.execute(query).fetchone()
        # COUNT(*) yields a single row whose first column is the count.
        return row[0]
|
|
|
|
|
|
def process_general_statistics(bar: VimLoadingBar) -> GeneralStats:
    """Build :class:`GeneralStats` from the last year's events.

    Each row returned by the query is converted with ``Event.from_query`` and
    fed to the stats object; *bar* is advanced once per row.
    """

    query = "SELECT * FROM event WHERE utc_dt >= DATE('now', '-1 year');"
    stats: GeneralStats = GeneralStats()

    with vd() as db:
        for row in db.execute(query):
            stats.update(Event.from_query(row))
            bar.update()

    return stats
|
|
|
|
|
|
def process_behaviour_statistics(bar: VimLoadingBar) -> BehaviourStats:
    """Build :class:`BehaviourStats` from the last year's events.

    Each row returned by the query is converted with ``Event.from_query`` and
    fed to the stats object; *bar* is advanced once per row.  Afterwards
    ``data_range`` is set to the timestamp difference between the last and
    the first row, in the order the query returned them.  With no rows,
    ``data_range`` keeps its initial value.
    """

    stats: BehaviourStats = BehaviourStats()

    first_ts: t.Optional[float] = None
    last_event: t.Optional[Event] = None

    # NOTE(review): the query has no ORDER BY, so "first" and "last" depend on
    # the order the database returns rows in - confirm that is chronological.
    query = "SELECT * FROM event WHERE utc_dt >= DATE('now', '-1 year');"

    with vd() as db:
        for row in db.execute(query):
            last_event = Event.from_query(row)
            if first_ts is None:
                first_ts = last_event.utc_dt.timestamp()
            stats.update(last_event)
            bar.update()

    if first_ts is None or last_event is None:
        return stats

    stats.data_range = last_event.utc_dt.timestamp() - first_ts
    return stats
|
|
|
|
|
|
def _closed_session_durations(
    sessions: t.Iterable[SessionDuration],
) -> t.Tuple[int, ...]:
    """Return the length, in whole seconds, of every session that has an end.

    Uses ``timedelta.total_seconds()`` so that sessions spanning one or more
    days keep their full length (``timedelta.seconds`` alone drops the days).
    """
    return tuple(
        int((session.end - session.start).total_seconds())
        for session in sessions
        if session.end is not None
    )


def process_avg_behaviour_statistics(
    bar: VimLoadingBar, b: BehaviourStats
) -> BehaviourStats:
    """Fill in the average/median fields of *b* from its recorded sessions.

    Sets ``avg_duration`` and ``med_duration`` (closed sessions only, whole
    seconds), ``avg_sessions`` (sessions per day on which a session started)
    and ``avg_langs``.  All four are truncated to ``int`` and fall back to 0
    when there is nothing to average.  *bar* has its status rendered once and
    is advanced once per session.

    Returns the same *b* that was passed in.
    """

    bar.render_status()

    durations: t.Tuple[int, ...] = _closed_session_durations(b.sessions.values())

    b.avg_duration = int(sum(durations) / len(durations)) if durations else 0
    b.med_duration = int(get_median(durations)) if durations else 0

    # Group sessions by the calendar date they started on.
    daily_sessions: t.Dict[datetime.date, int] = defaultdict(int)
    daily_languages: t.Dict[datetime.date, t.Set[str]] = defaultdict(set)

    for session in b.sessions.values():
        session_date: datetime.date = session.start.date()
        daily_sessions[session_date] += 1
        daily_languages[session_date].update(session.langs)
        bar.update()

    total_days: int = len(daily_sessions)
    total_sessions: int = sum(daily_sessions.values())
    b.avg_sessions = int(total_sessions / total_days) if total_days > 0 else 0

    # NOTE(review): this divides the number of distinct languages over the
    # whole period by the number of active days; it is not the mean of the
    # per-day language counts.  Kept as-is - confirm which one is intended.
    total_unique_languages: t.Set[str] = set().union(*daily_languages.values())
    b.avg_langs = (
        int(len(total_unique_languages) / total_days) if total_days > 0 else 0
    )

    return b
|