# ABOUTME: Tests for dynamic window calculation in stats service.
# ABOUTME: Verifies metrics use actual tracking period instead of fixed 30 days.

import time

from ulid import ULID

from animaltrack.services.stats import (
    _calculate_window,
    _get_first_event_ts,
)

# Constants for test calculations
MS_PER_DAY = 24 * 60 * 60 * 1000
MS_PER_HOUR = 60 * 60 * 1000

# Single source of truth for the event row shape used by every DB-backed test.
_INSERT_EVENT_SQL = """
    INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
    VALUES (?, ?, ?, ?, ?, ?, ?)
"""

# JSON stored in the entity_refs column by the tests below.
_FEED_REFS = '{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}'
_MEAT_REFS = '{"location_id": "loc1", "product_code": "meat.duck", "quantity": 5}'
_EGG_REFS = '{"location_id": "loc1", "product_code": "egg.duck", "quantity": 12}'


def _now_ms() -> int:
    """Return the current wall-clock time as integer epoch milliseconds."""
    return int(time.time() * 1000)


def _insert_event(db, event_type: str, ts_utc: int, entity_refs: str) -> str:
    """Insert one event row and return the id that was generated for it.

    Args:
        db: Database handle exposing ``execute(sql, params)`` (the
            ``seeded_db`` fixture in these tests).
        event_type: Value for the ``type`` column, e.g. ``"FeedGiven"``.
        ts_utc: Event timestamp in epoch milliseconds.
        entity_refs: JSON text stored in the ``entity_refs`` column.

    Returns:
        The freshly generated ULID string used as the row's ``id``.
    """
    event_id = str(ULID())
    # actor, payload and version are fixed: no test here varies them.
    db.execute(
        _INSERT_EVENT_SQL,
        (event_id, event_type, ts_utc, "test", entity_refs, "{}", 1),
    )
    return event_id


class TestCalculateWindow:
    """Tests for _calculate_window() helper function."""

    def test_no_first_event_returns_30_day_window(self):
        """When no events exist, window should be 30 days."""
        now_ms = _now_ms()

        window_start, window_end, window_days = _calculate_window(now_ms, None)

        assert window_days == 30
        assert window_end == now_ms
        assert window_start == now_ms - (30 * MS_PER_DAY)

    def test_first_event_1_day_ago_returns_1_day_window(self):
        """When first event was 1 day ago, window should be 1 day."""
        now_ms = _now_ms()
        first_event_ts = now_ms - (1 * MS_PER_DAY)

        window_start, window_end, window_days = _calculate_window(
            now_ms, first_event_ts
        )

        assert window_days == 1
        assert window_end == now_ms
        # Window spans 1 day back from now_ms
        assert window_start == now_ms - (1 * MS_PER_DAY)

    def test_first_event_15_days_ago_returns_15_day_window(self):
        """When first event was 15 days ago, window should be 15 days."""
        now_ms = _now_ms()
        first_event_ts = now_ms - (15 * MS_PER_DAY)

        window_start, window_end, window_days = _calculate_window(
            now_ms, first_event_ts
        )

        assert window_days == 15
        assert window_end == now_ms
        # Window spans 15 days back from now_ms
        assert window_start == now_ms - (15 * MS_PER_DAY)

    def test_first_event_45_days_ago_caps_at_30_days(self):
        """When first event was 45 days ago, window should cap at 30 days."""
        now_ms = _now_ms()
        first_event_ts = now_ms - (45 * MS_PER_DAY)

        window_start, window_end, window_days = _calculate_window(
            now_ms, first_event_ts
        )

        assert window_days == 30
        assert window_end == now_ms
        # Window start should be 30 days back, not at first_event_ts
        assert window_start == now_ms - (30 * MS_PER_DAY)

    def test_first_event_exactly_30_days_ago_returns_30_day_window(self):
        """When first event was exactly 30 days ago, window should be 30 days."""
        now_ms = _now_ms()
        first_event_ts = now_ms - (30 * MS_PER_DAY)

        window_start, window_end, window_days = _calculate_window(
            now_ms, first_event_ts
        )

        assert window_days == 30
        assert window_end == now_ms
        # Window spans 30 days back from now_ms
        assert window_start == now_ms - (30 * MS_PER_DAY)

    def test_first_event_today_returns_1_day_minimum(self):
        """Window should be at least 1 day even for same-day events."""
        now_ms = _now_ms()
        # First event is just 1 hour ago (less than 1 day)
        first_event_ts = now_ms - (1 * MS_PER_HOUR)

        # window_start is intentionally not checked by this test.
        _, window_end, window_days = _calculate_window(now_ms, first_event_ts)

        # Minimum window is 1 day
        assert window_days == 1
        assert window_end == now_ms

    def test_custom_max_days(self):
        """Window can use custom max_days value."""
        now_ms = _now_ms()
        first_event_ts = now_ms - (60 * MS_PER_DAY)

        # window_end is intentionally not checked by this test.
        window_start, _, window_days = _calculate_window(
            now_ms, first_event_ts, max_days=7
        )

        assert window_days == 7
        assert window_start == now_ms - (7 * MS_PER_DAY)


class TestGetFirstEventTs:
    """Tests for _get_first_event_ts() helper function."""

    def test_no_events_returns_none(self, seeded_db):
        """When no matching events exist, returns None."""
        # seeded_db is empty initially
        result = _get_first_event_ts(seeded_db, "FeedGiven")

        assert result is None

    def test_finds_first_feed_given_event(self, seeded_db):
        """First FeedGiven event is correctly identified."""
        # Insert two FeedGiven events at different times
        now_ms = _now_ms()
        first_ts = now_ms - (10 * MS_PER_DAY)
        second_ts = now_ms - (5 * MS_PER_DAY)

        _insert_event(seeded_db, "FeedGiven", first_ts, _FEED_REFS)
        _insert_event(seeded_db, "FeedGiven", second_ts, _FEED_REFS)

        result = _get_first_event_ts(seeded_db, "FeedGiven")

        assert result == first_ts

    def test_first_egg_event_filters_by_product_prefix(self, seeded_db):
        """First event finder filters ProductCollected by product_code prefix."""
        now_ms = _now_ms()
        meat_ts = now_ms - (15 * MS_PER_DAY)
        egg_ts = now_ms - (10 * MS_PER_DAY)

        # Insert meat collection first (should be ignored)
        _insert_event(seeded_db, "ProductCollected", meat_ts, _MEAT_REFS)
        # Insert egg collection second
        _insert_event(seeded_db, "ProductCollected", egg_ts, _EGG_REFS)

        # Without prefix filter, should find the meat event
        result_no_filter = _get_first_event_ts(seeded_db, "ProductCollected")
        assert result_no_filter == meat_ts

        # With egg. prefix, should find the egg event
        result_with_filter = _get_first_event_ts(
            seeded_db, "ProductCollected", product_prefix="egg."
        )
        assert result_with_filter == egg_ts

    def test_tombstoned_first_event_uses_next_event(self, seeded_db):
        """When first event is tombstoned, uses next non-deleted event."""
        now_ms = _now_ms()
        first_ts = now_ms - (10 * MS_PER_DAY)
        second_ts = now_ms - (5 * MS_PER_DAY)

        # Insert two events; only the earlier one's id is needed afterwards
        event_deleted_id = _insert_event(
            seeded_db, "FeedGiven", first_ts, _FEED_REFS
        )
        _insert_event(seeded_db, "FeedGiven", second_ts, _FEED_REFS)

        # Tombstone the first event
        seeded_db.execute(
            """
            INSERT INTO event_tombstones (id, target_event_id, ts_utc, actor, reason)
            VALUES (?, ?, ?, ?, ?)
            """,
            (str(ULID()), event_deleted_id, now_ms, "test", "deleted"),
        )

        result = _get_first_event_ts(seeded_db, "FeedGiven")

        # Should return second event since first is tombstoned
        assert result == second_ts