Files
animaltrack/tests/test_service_stats_dynamic_window.py
Petru Paler 86dc3a13d2
All checks were successful
Deploy / deploy (push) Successful in 2m37s
Dynamic window metrics for cold start scenarios
Calculate metrics from first relevant event to now (capped at 30 days)
instead of a fixed 30-day window. This fixes inaccurate metrics for new
users who have only a few days of data.

Changes:
- Add _get_first_event_ts() and _calculate_window() helpers to stats.py
- Add window_days field to EggStats dataclass
- Update routes/eggs.py and routes/feed.py to use dynamic window
- Update templates to display "N-day avg" instead of "30-day avg"
- Use ceiling division for window_days to ensure first event is included

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-10 19:06:00 +00:00

257 lines
8.7 KiB
Python

# ABOUTME: Tests for dynamic window calculation in stats service.
# ABOUTME: Verifies metrics use actual tracking period instead of fixed 30 days.
import time
from ulid import ULID
from animaltrack.services.stats import (
_calculate_window,
_get_first_event_ts,
)
# Milliseconds in one day; the tests below build event timestamps by
# subtracting multiples of this from the current epoch-millisecond time.
MS_PER_DAY = 1000 * 60 * 60 * 24  # ms/s * s/min * min/h * h/day
class TestCalculateWindow:
    """Exercise _calculate_window() for empty, short, capped and custom-limit histories.

    Each test unpacks the helper's result as ``(start, end, days)`` and
    compares it with values derived from the current time in epoch
    milliseconds.
    """

    @staticmethod
    def _now_ms():
        """Return the current wall-clock time as integer epoch milliseconds."""
        return int(time.time() * 1000)

    def test_no_first_event_returns_30_day_window(self):
        """Passing None as the first-event timestamp yields a 30-day window."""
        now = self._now_ms()

        start, end, days = _calculate_window(now, None)

        assert days == 30
        assert end == now
        assert start == now - (30 * MS_PER_DAY)

    def test_first_event_1_day_ago_returns_1_day_window(self):
        """A first event exactly one day old yields a 1-day window."""
        now = self._now_ms()
        span_ms = 1 * MS_PER_DAY

        start, end, days = _calculate_window(now, now - span_ms)

        assert days == 1
        assert end == now
        # The window reaches back one full day from "now".
        assert start == now - span_ms

    def test_first_event_15_days_ago_returns_15_day_window(self):
        """A first event exactly fifteen days old yields a 15-day window."""
        now = self._now_ms()
        span_ms = 15 * MS_PER_DAY

        start, end, days = _calculate_window(now, now - span_ms)

        assert days == 15
        assert end == now
        # The window reaches back fifteen full days from "now".
        assert start == now - span_ms

    def test_first_event_45_days_ago_caps_at_30_days(self):
        """A first event older than 30 days is clamped to a 30-day window."""
        now = self._now_ms()
        oldest_event = now - (45 * MS_PER_DAY)

        start, end, days = _calculate_window(now, oldest_event)

        assert days == 30
        assert end == now
        # The start is 30 days back, not the (older) first-event timestamp.
        assert start == now - (30 * MS_PER_DAY)

    def test_first_event_exactly_30_days_ago_returns_30_day_window(self):
        """A first event sitting exactly on the 30-day boundary yields 30 days."""
        now = self._now_ms()
        span_ms = 30 * MS_PER_DAY

        start, end, days = _calculate_window(now, now - span_ms)

        assert days == 30
        assert end == now
        # The window reaches back thirty full days from "now".
        assert start == now - span_ms

    def test_first_event_today_returns_1_day_minimum(self):
        """A first event less than a day old still yields a 1-day window."""
        now = self._now_ms()
        # One hour before "now", i.e. well inside the current day.
        an_hour_ago = now - (MS_PER_DAY // 24)

        # The start of the window is not checked by this test.
        _start, end, days = _calculate_window(now, an_hour_ago)

        # The window never shrinks below one day.
        assert days == 1
        assert end == now

    def test_custom_max_days(self):
        """The max_days keyword replaces the default cap."""
        now = self._now_ms()
        oldest_event = now - (60 * MS_PER_DAY)

        # The end of the window is not checked by this test.
        start, _end, days = _calculate_window(now, oldest_event, max_days=7)

        assert days == 7
        assert start == now - (7 * MS_PER_DAY)
class TestGetFirstEventTs:
    """Tests for the _get_first_event_ts() helper function.

    Every test receives the ``seeded_db`` fixture, writes rows directly into
    the ``events`` table (and ``event_tombstones`` for the deletion case) and
    then checks which timestamp the helper reports.
    """

    # JSON stored in the entity_refs column of every FeedGiven row below.
    _FEED_REFS = (
        '{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}'
    )

    @staticmethod
    def _insert_event(db, event_type, ts_utc, entity_refs):
        """Insert a single row into ``events`` and return its generated id.

        Args:
            db: Database handle exposing ``execute(sql, params)``.
            event_type: Value for the ``type`` column, e.g. ``"FeedGiven"``.
            ts_utc: Event timestamp in epoch milliseconds.
            entity_refs: JSON text stored in the ``entity_refs`` column.

        Returns:
            The freshly generated ULID string used as the row id.
        """
        event_id = str(ULID())
        db.execute(
            """
            INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
            VALUES (?, ?, ?, ?, ?, ?, ?)
            """,
            # actor, payload and version are fixed: these tests never vary them.
            (event_id, event_type, ts_utc, "test", entity_refs, "{}", 1),
        )
        return event_id

    def test_no_events_returns_none(self, seeded_db):
        """When no matching events exist, returns None."""
        # No FeedGiven rows have been inserted at this point.
        result = _get_first_event_ts(seeded_db, "FeedGiven")

        assert result is None

    def test_finds_first_feed_given_event(self, seeded_db):
        """First FeedGiven event is correctly identified."""
        now_ms = int(time.time() * 1000)
        first_ts = now_ms - (10 * MS_PER_DAY)
        second_ts = now_ms - (5 * MS_PER_DAY)

        # Two FeedGiven events at different times; the older one must win.
        self._insert_event(seeded_db, "FeedGiven", first_ts, self._FEED_REFS)
        self._insert_event(seeded_db, "FeedGiven", second_ts, self._FEED_REFS)

        result = _get_first_event_ts(seeded_db, "FeedGiven")

        assert result == first_ts

    def test_first_egg_event_filters_by_product_prefix(self, seeded_db):
        """First event finder filters ProductCollected by product_code prefix."""
        now_ms = int(time.time() * 1000)
        meat_ts = now_ms - (15 * MS_PER_DAY)
        egg_ts = now_ms - (10 * MS_PER_DAY)

        # The meat collection is older; it must be ignored once the prefix
        # filter is applied.
        self._insert_event(
            seeded_db,
            "ProductCollected",
            meat_ts,
            '{"location_id": "loc1", "product_code": "meat.duck", "quantity": 5}',
        )
        # The egg collection is newer than the meat one.
        self._insert_event(
            seeded_db,
            "ProductCollected",
            egg_ts,
            '{"location_id": "loc1", "product_code": "egg.duck", "quantity": 12}',
        )

        # Without prefix filter, should find the meat event.
        result_no_filter = _get_first_event_ts(seeded_db, "ProductCollected")
        assert result_no_filter == meat_ts

        # With the "egg." prefix, should find the egg event.
        result_with_filter = _get_first_event_ts(
            seeded_db, "ProductCollected", product_prefix="egg."
        )
        assert result_with_filter == egg_ts

    def test_tombstoned_first_event_uses_next_event(self, seeded_db):
        """When first event is tombstoned, uses next non-deleted event."""
        now_ms = int(time.time() * 1000)
        first_ts = now_ms - (10 * MS_PER_DAY)
        second_ts = now_ms - (5 * MS_PER_DAY)

        # Insert two events; only the older one is tombstoned afterwards.
        event_deleted_id = self._insert_event(
            seeded_db, "FeedGiven", first_ts, self._FEED_REFS
        )
        self._insert_event(seeded_db, "FeedGiven", second_ts, self._FEED_REFS)

        # Tombstone the first event.
        seeded_db.execute(
            """
            INSERT INTO event_tombstones (id, target_event_id, ts_utc, actor, reason)
            VALUES (?, ?, ?, ?, ?)
            """,
            (str(ULID()), event_deleted_id, now_ms, "test", "deleted"),
        )

        result = _get_first_event_ts(seeded_db, "FeedGiven")

        # Should return second event since first is tombstoned.
        assert result == second_ts