Dynamic window metrics for cold start scenarios
All checks were successful
Deploy / deploy (push) Successful in 2m37s

Calculate metrics from first relevant event to now (capped at 30 days)
instead of a fixed 30-day window. This fixes inaccurate metrics for new
users who have only a few days of data.

Changes:
- Add _get_first_event_ts() and _calculate_window() helpers to stats.py
- Add window_days field to EggStats dataclass
- Update routes/eggs.py and routes/feed.py to use dynamic window
- Update templates to display "N-day avg" instead of "30-day avg"
- Use ceiling division for window_days to ensure first event is included

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2026-01-10 19:06:00 +00:00
parent 4c62840cdf
commit 86dc3a13d2
8 changed files with 456 additions and 59 deletions

View File

@@ -8,15 +8,105 @@ from typing import Any
# 30 days in milliseconds
THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000
MS_PER_DAY = 24 * 60 * 60 * 1000
def _get_first_event_ts(
db: Any,
event_type: str,
product_prefix: str | None = None,
location_id: str | None = None,
) -> int | None:
"""Get timestamp of first event of given type.
For ProductCollected, optionally filter by product_code prefix (e.g., 'egg.').
Optionally filter by location_id.
Excludes tombstoned (deleted) events.
Args:
db: Database connection.
event_type: Event type to search for (e.g., 'FeedGiven', 'ProductCollected').
product_prefix: Optional prefix filter for product_code in entity_refs.
location_id: Optional location_id filter in entity_refs.
Returns:
Timestamp in ms of first event, or None if no events exist.
"""
params: dict = {"event_type": event_type}
# Build filter conditions
conditions = [
"e.type = :event_type",
"t.target_event_id IS NULL",
]
if product_prefix:
conditions.append("json_extract(e.entity_refs, '$.product_code') LIKE :prefix")
params["prefix"] = f"{product_prefix}%"
if location_id:
conditions.append("json_extract(e.entity_refs, '$.location_id') = :location_id")
params["location_id"] = location_id
where_clause = " AND ".join(conditions)
row = db.execute(
f"""
SELECT MIN(e.ts_utc)
FROM events e
LEFT JOIN event_tombstones t ON e.id = t.target_event_id
WHERE {where_clause}
""",
params,
).fetchone()
return row[0] if row and row[0] is not None else None
def _calculate_window(
ts_utc: int, first_event_ts: int | None, max_days: int = 30
) -> tuple[int, int, int]:
"""Calculate dynamic window based on first event timestamp.
Determines window_days based on time since first event (capped at max_days),
then returns a window ending at ts_utc with that duration.
Args:
ts_utc: Current timestamp (window end) in ms.
first_event_ts: Timestamp of first relevant event in ms, or None.
max_days: Maximum window size in days (default 30).
Returns:
Tuple of (window_start_utc, window_end_utc, window_days).
"""
max_window_ms = max_days * MS_PER_DAY
if first_event_ts is None:
# No events - use max window (metrics will be 0/None)
return ts_utc - max_window_ms, ts_utc, max_days
window_duration_ms = ts_utc - first_event_ts
if window_duration_ms >= max_window_ms:
# Cap at max_days
return ts_utc - max_window_ms, ts_utc, max_days
# Calculate days using ceiling division (ensures first event is included), minimum 1
window_days = max(1, (window_duration_ms + MS_PER_DAY - 1) // MS_PER_DAY)
# Window spans window_days back from ts_utc (not from first_event_ts)
window_start = ts_utc - (window_days * MS_PER_DAY)
return window_start, ts_utc, window_days
@dataclass
class EggStats:
"""30-day egg statistics for a single location."""
"""Egg statistics for a single location over a dynamic window."""
location_id: str
window_start_utc: int
window_end_utc: int
window_days: int
eggs_total_pcs: int
feed_total_g: int
feed_layers_g: int
@@ -279,12 +369,15 @@ def _upsert_stats(db: Any, stats: EggStats) -> None:
def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
"""Compute and cache 30-day egg stats for a location.
"""Compute and cache egg stats for a location over a dynamic window.
This is a compute-on-read operation. Stats are computed fresh
from the event log and interval tables, then upserted to the
cache table.
The window is dynamic: it starts from the first egg collection event
and extends to now, capped at 30 days.
Args:
db: Database connection.
location_id: The location to compute stats for.
@@ -293,8 +386,11 @@ def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
Returns:
Computed stats for the location.
"""
window_end_utc = ts_utc
window_start_utc = ts_utc - THIRTY_DAYS_MS
# Calculate dynamic window based on first egg event at this location
first_egg_ts = _get_first_event_ts(
db, "ProductCollected", product_prefix="egg.", location_id=location_id
)
window_start_utc, window_end_utc, window_days = _calculate_window(ts_utc, first_egg_ts)
updated_at_utc = int(time.time() * 1000)
# Count eggs and determine species
@@ -352,6 +448,7 @@ def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
location_id=location_id,
window_start_utc=window_start_utc,
window_end_utc=window_end_utc,
window_days=window_days,
eggs_total_pcs=eggs_total_pcs,
feed_total_g=feed_total_g,
feed_layers_g=feed_layers_g,

View File

@@ -24,10 +24,11 @@ from animaltrack.repositories.products import ProductRepository
from animaltrack.repositories.user_defaults import UserDefaultsRepository
from animaltrack.repositories.users import UserRepository
from animaltrack.services.products import ProductService, ValidationError
from animaltrack.services.stats import _calculate_window, _get_first_event_ts
from animaltrack.web.templates import render_page, render_page_post
from animaltrack.web.templates.eggs import eggs_page
# 30 days in milliseconds
# 30 days in milliseconds (kept for reference)
THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000
@@ -142,22 +143,28 @@ def _get_recent_events(db: Any, event_type: str, limit: int = 10):
]
def _get_eggs_per_day(db: Any, now_ms: int) -> float | None:
"""Calculate eggs per day over 30-day window.
def _get_eggs_per_day(db: Any, now_ms: int) -> tuple[float | None, int]:
"""Calculate eggs per day over dynamic window.
Uses a dynamic window based on the first egg collection event,
capped at 30 days.
Args:
db: Database connection.
now_ms: Current timestamp in milliseconds.
Returns:
Eggs per day average, or None if no data.
Tuple of (eggs_per_day, window_days). eggs_per_day is None if no data.
"""
window_start = now_ms - THIRTY_DAYS_MS
# Calculate dynamic window based on first egg event
first_egg_ts = _get_first_event_ts(db, "ProductCollected", product_prefix="egg.")
window_start, window_end, window_days = _calculate_window(now_ms, first_egg_ts)
event_store = EventStore(db)
events = event_store.list_events(
event_type=PRODUCT_COLLECTED,
since_utc=window_start,
until_utc=now_ms,
until_utc=window_end,
limit=10000,
)
@@ -168,33 +175,37 @@ def _get_eggs_per_day(db: Any, now_ms: int) -> float | None:
total_eggs += event.entity_refs.get("quantity", 0)
if total_eggs == 0:
return None
return None, window_days
return total_eggs / 30.0
return total_eggs / window_days, window_days
def _get_global_cost_per_egg(db: Any, now_ms: int) -> float | None:
"""Calculate global cost per egg over 30-day window.
def _get_global_cost_per_egg(db: Any, now_ms: int) -> tuple[float | None, int]:
"""Calculate global cost per egg over dynamic window.
Aggregates feed costs and egg counts across all locations.
Uses a dynamic window based on the first egg collection event.
Args:
db: Database connection.
now_ms: Current timestamp in milliseconds.
Returns:
Cost per egg in EUR, or None if no eggs collected.
Tuple of (cost_per_egg, window_days). cost_per_egg is None if no eggs.
"""
from animaltrack.events import FEED_GIVEN
window_start = now_ms - THIRTY_DAYS_MS
# Calculate dynamic window based on first egg event
first_egg_ts = _get_first_event_ts(db, "ProductCollected", product_prefix="egg.")
window_start, window_end, window_days = _calculate_window(now_ms, first_egg_ts)
event_store = EventStore(db)
# Count eggs across all locations
egg_events = event_store.list_events(
event_type=PRODUCT_COLLECTED,
since_utc=window_start,
until_utc=now_ms,
until_utc=window_end,
limit=10000,
)
@@ -205,13 +216,13 @@ def _get_global_cost_per_egg(db: Any, now_ms: int) -> float | None:
total_eggs += event.entity_refs.get("quantity", 0)
if total_eggs == 0:
return None
return None, window_days
# Sum feed costs across all locations
feed_events = event_store.list_events(
event_type=FEED_GIVEN,
since_utc=window_start,
until_utc=now_ms,
until_utc=window_end,
limit=10000,
)
@@ -239,7 +250,7 @@ def _get_global_cost_per_egg(db: Any, now_ms: int) -> float | None:
price_per_kg_cents = price_row[0] if price_row else 0
total_cost_cents += amount_kg * price_per_kg_cents
return (total_cost_cents / 100) / total_eggs
return (total_cost_cents / 100) / total_eggs, window_days
def _get_sales_stats(db: Any, now_ms: int) -> dict | None:
@@ -289,14 +300,17 @@ def _get_eggs_display_data(db: Any, locations: list) -> dict:
Returns:
Dict with harvest_events, sell_events, eggs_per_day, cost_per_egg,
sales_stats, location_names.
eggs_window_days, sales_stats, location_names.
"""
now_ms = int(time.time() * 1000)
eggs_per_day, eggs_window_days = _get_eggs_per_day(db, now_ms)
cost_per_egg, _ = _get_global_cost_per_egg(db, now_ms)
return {
"harvest_events": _get_recent_events(db, PRODUCT_COLLECTED, limit=10),
"sell_events": _get_recent_events(db, PRODUCT_SOLD, limit=10),
"eggs_per_day": _get_eggs_per_day(db, now_ms),
"cost_per_egg": _get_global_cost_per_egg(db, now_ms),
"eggs_per_day": eggs_per_day,
"cost_per_egg": cost_per_egg,
"eggs_window_days": eggs_window_days,
"sales_stats": _get_sales_stats(db, now_ms),
"location_names": {loc.id: loc.name for loc in locations},
}

View File

@@ -22,6 +22,7 @@ from animaltrack.repositories.locations import LocationRepository
from animaltrack.repositories.user_defaults import UserDefaultsRepository
from animaltrack.repositories.users import UserRepository
from animaltrack.services.feed import FeedService, ValidationError
from animaltrack.services.stats import _calculate_window, _get_first_event_ts
from animaltrack.web.templates import render_page, render_page_post
from animaltrack.web.templates.feed import feed_page
@@ -111,32 +112,35 @@ def _get_recent_events(db: Any, event_type: str, limit: int = 10):
]
def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> float | None:
"""Calculate feed consumption per bird per day over 30-day window.
def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> tuple[float | None, int]:
"""Calculate feed consumption per bird per day over dynamic window.
Uses global bird-days across all locations.
Window is dynamic based on first FeedGiven event, capped at 30 days.
Args:
db: Database connection.
now_ms: Current timestamp in milliseconds.
Returns:
Feed consumption in grams per bird per day, or None if no data.
Tuple of (feed_per_bird_per_day, window_days). Value is None if no data.
"""
window_start = now_ms - THIRTY_DAYS_MS
# Calculate dynamic window based on first feed event
first_feed_ts = _get_first_event_ts(db, "FeedGiven")
window_start, window_end, window_days = _calculate_window(now_ms, first_feed_ts)
# Get total feed given in window (all locations)
event_store = EventStore(db)
events = event_store.list_events(
event_type=FEED_GIVEN,
since_utc=window_start,
until_utc=now_ms,
until_utc=window_end,
limit=10000,
)
total_kg = sum(e.entity_refs.get("amount_kg", 0) for e in events)
if total_kg == 0:
return None
return None, window_days
total_g = total_kg * 1000
@@ -153,7 +157,7 @@ def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> float | None:
AND (ali.end_utc IS NULL OR ali.end_utc > :window_start)
AND ar.status = 'alive'
""",
{"window_start": window_start, "window_end": now_ms},
{"window_start": window_start, "window_end": window_end},
).fetchone()
total_ms = row[0] if row else 0
@@ -161,24 +165,27 @@ def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> float | None:
bird_days = total_ms // ms_per_day if total_ms else 0
if bird_days == 0:
return None
return None, window_days
return total_g / bird_days
return total_g / bird_days, window_days
def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
"""Calculate feed cost per bird per day over 30-day window.
def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> tuple[float | None, int]:
"""Calculate feed cost per bird per day over dynamic window.
Uses global bird-days and feed costs across all locations.
Window is dynamic based on first FeedGiven event, capped at 30 days.
Args:
db: Database connection.
now_ms: Current timestamp in milliseconds.
Returns:
Feed cost in EUR per bird per day, or None if no data.
Tuple of (cost_per_bird_per_day, window_days). Value is None if no data.
"""
window_start = now_ms - THIRTY_DAYS_MS
# Calculate dynamic window based on first feed event
first_feed_ts = _get_first_event_ts(db, "FeedGiven")
window_start, window_end, window_days = _calculate_window(now_ms, first_feed_ts)
# Get total bird-days across all locations
row = db.execute(
@@ -193,7 +200,7 @@ def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
AND (ali.end_utc IS NULL OR ali.end_utc > :window_start)
AND ar.status = 'alive'
""",
{"window_start": window_start, "window_end": now_ms},
{"window_start": window_start, "window_end": window_end},
).fetchone()
total_ms = row[0] if row else 0
@@ -201,19 +208,19 @@ def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
bird_days = total_ms // ms_per_day if total_ms else 0
if bird_days == 0:
return None
return None, window_days
# Get total feed cost in window (all locations)
event_store = EventStore(db)
events = event_store.list_events(
event_type=FEED_GIVEN,
since_utc=window_start,
until_utc=now_ms,
until_utc=window_end,
limit=10000,
)
if not events:
return None
return None, window_days
total_cost_cents = 0.0
for event in events:
@@ -240,7 +247,7 @@ def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
total_cost_cents += amount_kg * price_per_kg_cents
# Convert to EUR and divide by bird-days
return (total_cost_cents / 100) / bird_days
return (total_cost_cents / 100) / bird_days, window_days
def _get_purchase_stats(db: Any, now_ms: int) -> dict | None:
@@ -294,11 +301,14 @@ def _get_feed_display_data(db: Any, locations: list, feed_types: list) -> dict:
Dict with display data for feed page.
"""
now_ms = int(time.time() * 1000)
feed_per_bird, feed_window_days = _get_feed_per_bird_per_day(db, now_ms)
cost_per_bird, _ = _get_cost_per_bird_per_day(db, now_ms)
return {
"give_events": _get_recent_events(db, FEED_GIVEN, limit=10),
"purchase_events": _get_recent_events(db, FEED_PURCHASED, limit=10),
"feed_per_bird_per_day_g": _get_feed_per_bird_per_day(db, now_ms),
"cost_per_bird_per_day": _get_cost_per_bird_per_day(db, now_ms),
"feed_per_bird_per_day_g": feed_per_bird,
"cost_per_bird_per_day": cost_per_bird,
"feed_window_days": feed_window_days,
"purchase_stats": _get_purchase_stats(db, now_ms),
"location_names": {loc.id: loc.name for loc in locations},
"feed_type_names": {ft.code: ft.name for ft in feed_types},

View File

@@ -35,6 +35,7 @@ def eggs_page(
sell_events: list[tuple[Event, bool]] | None = None,
eggs_per_day: float | None = None,
cost_per_egg: float | None = None,
eggs_window_days: int = 30,
sales_stats: dict | None = None,
location_names: dict[str, str] | None = None,
# Field value preservation on errors
@@ -59,8 +60,9 @@ def eggs_page(
sell_action: Route function or URL for sell form.
harvest_events: Recent ProductCollected events (most recent first).
sell_events: Recent ProductSold events (most recent first).
eggs_per_day: 30-day average eggs per day.
cost_per_egg: 30-day average cost per egg in EUR.
eggs_per_day: Average eggs per day over window.
cost_per_egg: Average cost per egg in EUR over window.
eggs_window_days: Actual window size in days for the metrics.
sales_stats: Dict with 'total_qty', 'total_cents', and 'avg_price_per_egg_cents'.
location_names: Dict mapping location_id to location name for display.
harvest_quantity: Preserved quantity value on error.
@@ -97,6 +99,7 @@ def eggs_page(
recent_events=harvest_events,
eggs_per_day=eggs_per_day,
cost_per_egg=cost_per_egg,
eggs_window_days=eggs_window_days,
location_names=location_names,
default_quantity=harvest_quantity,
default_notes=harvest_notes,
@@ -131,6 +134,7 @@ def harvest_form(
recent_events: list[tuple[Event, bool]] | None = None,
eggs_per_day: float | None = None,
cost_per_egg: float | None = None,
eggs_window_days: int = 30,
location_names: dict[str, str] | None = None,
default_quantity: str | None = None,
default_notes: str | None = None,
@@ -143,8 +147,9 @@ def harvest_form(
error: Optional error message to display.
action: Route function or URL string for form submission.
recent_events: Recent (Event, is_deleted) tuples, most recent first.
eggs_per_day: 30-day average eggs per day.
cost_per_egg: 30-day average cost per egg in EUR.
eggs_per_day: Average eggs per day over window.
cost_per_egg: Average cost per egg in EUR over window.
eggs_window_days: Actual window size in days for the metrics.
location_names: Dict mapping location_id to location name for display.
default_quantity: Preserved quantity value on error.
default_notes: Preserved notes value on error.
@@ -194,7 +199,7 @@ def harvest_form(
stat_parts.append(f"{eggs_per_day:.1f} eggs/day")
if cost_per_egg is not None:
stat_parts.append(f"{cost_per_egg:.3f}/egg cost")
stat_text = " | ".join(stat_parts) + " (30-day avg)" if stat_parts else None
stat_text = " | ".join(stat_parts) + f" ({eggs_window_days}-day avg)" if stat_parts else None
form = Form(
H2("Harvest Eggs", cls="text-xl font-bold mb-4"),

View File

@@ -37,6 +37,7 @@ def feed_page(
purchase_events: list[tuple[Event, bool]] | None = None,
feed_per_bird_per_day_g: float | None = None,
cost_per_bird_per_day: float | None = None,
feed_window_days: int = 30,
purchase_stats: dict | None = None,
location_names: dict[str, str] | None = None,
feed_type_names: dict[str, str] | None = None,
@@ -59,6 +60,7 @@ def feed_page(
purchase_events: Recent FeedPurchased events (most recent first).
feed_per_bird_per_day_g: Average feed consumption in g/bird/day.
cost_per_bird_per_day: Average feed cost per bird per day in EUR.
feed_window_days: Actual window size in days for the metrics.
purchase_stats: Dict with 'total_kg' and 'avg_price_per_kg_cents'.
location_names: Dict mapping location_id to location name.
feed_type_names: Dict mapping feed_type_code to feed type name.
@@ -96,6 +98,7 @@ def feed_page(
recent_events=give_events,
feed_per_bird_per_day_g=feed_per_bird_per_day_g,
cost_per_bird_per_day=cost_per_bird_per_day,
feed_window_days=feed_window_days,
location_names=location_names,
feed_type_names=feed_type_names,
),
@@ -129,6 +132,7 @@ def give_feed_form(
recent_events: list[tuple[Event, bool]] | None = None,
feed_per_bird_per_day_g: float | None = None,
cost_per_bird_per_day: float | None = None,
feed_window_days: int = 30,
location_names: dict[str, str] | None = None,
feed_type_names: dict[str, str] | None = None,
) -> Div:
@@ -146,6 +150,7 @@ def give_feed_form(
recent_events: Recent (Event, is_deleted) tuples, most recent first.
feed_per_bird_per_day_g: Average feed consumption in g/bird/day.
cost_per_bird_per_day: Average feed cost per bird per day in EUR.
feed_window_days: Actual window size in days for the metrics.
location_names: Dict mapping location_id to location name.
feed_type_names: Dict mapping feed_type_code to feed type name.
@@ -218,7 +223,7 @@ def give_feed_form(
stat_parts.append(f"{feed_per_bird_per_day_g:.1f}g/bird/day")
if cost_per_bird_per_day is not None:
stat_parts.append(f"{cost_per_bird_per_day:.3f}/bird/day cost")
stat_text = " | ".join(stat_parts) + " (30-day avg)" if stat_parts else None
stat_text = " | ".join(stat_parts) + f" ({feed_window_days}-day avg)" if stat_parts else None
form = Form(
H2("Give Feed", cls="text-xl font-bold mb-4"),

View File

@@ -462,11 +462,13 @@ class TestE2EStatsProgression:
Implementation produces different value due to:
1. Integer bird-day truncation
2. Timeline differences (1 day advance for Strip 2 bird-days)
3. Dynamic window uses ceiling for window_days (2-day window)
With timeline adjusted, we get layer_eligible_bird_days=15 for Strip 1.
With timeline adjusted, we get layer_eligible_bird_days=14 for Strip 1.
share = 14/35 = 0.4, feed_layers_g = int(20000 * 0.4) = 8000
"""
stats = get_egg_stats(seeded_db, test3_state["strip1"], test3_state["ts_utc"])
assert stats.feed_layers_g == 8570
assert stats.feed_layers_g == 8000
def test_3_strip1_cost_per_egg_all(self, seeded_db, test3_state):
"""E2E #3: Strip 1 cost_per_egg_all should be 0.889 +/- 0.001."""
@@ -479,9 +481,12 @@ class TestE2EStatsProgression:
Spec value: 0.448
Implementation value differs due to timeline adjustments and integer truncation.
Dynamic window with ceiling gives share = 14/35 = 0.4.
layer_cost = 24 EUR * 0.4 = 9.60 EUR
cost_per_egg_layers = 9.60 / 27 = 0.356
"""
stats = get_egg_stats(seeded_db, test3_state["strip1"], test3_state["ts_utc"])
assert stats.cost_per_egg_layers_eur == pytest.approx(0.381, abs=0.001)
assert stats.cost_per_egg_layers_eur == pytest.approx(0.356, abs=0.001)
def test_3_strip2_eggs(self, seeded_db, test3_state):
"""E2E #3: Strip 2 eggs should be 6."""
@@ -581,9 +586,12 @@ class TestE2EStatsProgression:
Spec value: 0.345
Implementation value differs due to timeline adjustments for bird-days.
Dynamic window with ceiling gives share = 14/35 = 0.4.
layer_cost = 24 EUR * 0.4 = 9.60 EUR
cost_per_egg_layers = 9.60 / 35 = 0.274
"""
stats = get_egg_stats(seeded_db, test4_state["strip1"], test4_state["ts_utc"])
assert stats.cost_per_egg_layers_eur == pytest.approx(0.294, abs=0.001)
assert stats.cost_per_egg_layers_eur == pytest.approx(0.274, abs=0.001)
# =========================================================================
# Test #5: Edit egg event
@@ -647,9 +655,12 @@ class TestE2EStatsProgression:
Spec value: 0.366
Implementation value differs due to timeline adjustments for bird-days.
Dynamic window with ceiling gives share = 14/35 = 0.4.
layer_cost = 24 EUR * 0.4 = 9.60 EUR
cost_per_egg_layers = 9.60 / 33 = 0.291
"""
stats = get_egg_stats(seeded_db, test5_state["strip1"], test5_state["ts_utc"])
assert stats.cost_per_egg_layers_eur == pytest.approx(0.312, abs=0.001)
assert stats.cost_per_egg_layers_eur == pytest.approx(0.291, abs=0.001)
def test_5_event_version_incremented(self, seeded_db, services, test5_state):
"""E2E #5: Edited event version should be 2."""

View File

@@ -489,7 +489,7 @@ class TestEggStatsCaching:
def test_cached_stats_have_window_bounds(self, seeded_db, e2e_test1_setup):
"""Cached stats include window_start_utc and window_end_utc."""
ts_utc = e2e_test1_setup["ts_utc"]
get_egg_stats(seeded_db, e2e_test1_setup["location_id"], ts_utc)
stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], ts_utc)
row = seeded_db.execute(
"""
@@ -500,7 +500,6 @@ class TestEggStatsCaching:
).fetchone()
assert row is not None
assert row[1] == ts_utc # window_end_utc
# Window is 30 days
thirty_days_ms = 30 * 24 * 60 * 60 * 1000
assert row[0] == ts_utc - thirty_days_ms # window_start_utc
# Cached bounds should match what get_egg_stats returned
assert row[0] == stats.window_start_utc
assert row[1] == stats.window_end_utc

View File

@@ -0,0 +1,256 @@
# ABOUTME: Tests for dynamic window calculation in stats service.
# ABOUTME: Verifies metrics use actual tracking period instead of fixed 30 days.
import time
from ulid import ULID
from animaltrack.services.stats import (
_calculate_window,
_get_first_event_ts,
)
# Constants for test calculations
MS_PER_DAY = 24 * 60 * 60 * 1000
class TestCalculateWindow:
"""Tests for _calculate_window() helper function."""
def test_no_first_event_returns_30_day_window(self):
"""When no events exist, window should be 30 days."""
now_ms = int(time.time() * 1000)
window_start, window_end, window_days = _calculate_window(now_ms, None)
assert window_days == 30
assert window_end == now_ms
assert window_start == now_ms - (30 * MS_PER_DAY)
def test_first_event_1_day_ago_returns_1_day_window(self):
"""When first event was 1 day ago, window should be 1 day."""
now_ms = int(time.time() * 1000)
first_event_ts = now_ms - (1 * MS_PER_DAY)
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
assert window_days == 1
assert window_end == now_ms
# Window spans 1 day back from now_ms
assert window_start == now_ms - (1 * MS_PER_DAY)
def test_first_event_15_days_ago_returns_15_day_window(self):
"""When first event was 15 days ago, window should be 15 days."""
now_ms = int(time.time() * 1000)
first_event_ts = now_ms - (15 * MS_PER_DAY)
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
assert window_days == 15
assert window_end == now_ms
# Window spans 15 days back from now_ms
assert window_start == now_ms - (15 * MS_PER_DAY)
def test_first_event_45_days_ago_caps_at_30_days(self):
"""When first event was 45 days ago, window should cap at 30 days."""
now_ms = int(time.time() * 1000)
first_event_ts = now_ms - (45 * MS_PER_DAY)
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
assert window_days == 30
assert window_end == now_ms
# Window start should be 30 days back, not at first_event_ts
assert window_start == now_ms - (30 * MS_PER_DAY)
def test_first_event_exactly_30_days_ago_returns_30_day_window(self):
"""When first event was exactly 30 days ago, window should be 30 days."""
now_ms = int(time.time() * 1000)
first_event_ts = now_ms - (30 * MS_PER_DAY)
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
assert window_days == 30
assert window_end == now_ms
# Window spans 30 days back from now_ms
assert window_start == now_ms - (30 * MS_PER_DAY)
def test_first_event_today_returns_1_day_minimum(self):
"""Window should be at least 1 day even for same-day events."""
now_ms = int(time.time() * 1000)
# First event is just 1 hour ago (less than 1 day)
first_event_ts = now_ms - (1 * 60 * 60 * 1000)
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
# Minimum window is 1 day
assert window_days == 1
assert window_end == now_ms
def test_custom_max_days(self):
"""Window can use custom max_days value."""
now_ms = int(time.time() * 1000)
first_event_ts = now_ms - (60 * MS_PER_DAY)
window_start, window_end, window_days = _calculate_window(
now_ms, first_event_ts, max_days=7
)
assert window_days == 7
assert window_start == now_ms - (7 * MS_PER_DAY)
class TestGetFirstEventTs:
"""Tests for _get_first_event_ts() helper function."""
def test_no_events_returns_none(self, seeded_db):
"""When no matching events exist, returns None."""
# seeded_db is empty initially
result = _get_first_event_ts(seeded_db, "FeedGiven")
assert result is None
def test_finds_first_feed_given_event(self, seeded_db):
"""First FeedGiven event is correctly identified."""
# Insert two FeedGiven events at different times
now_ms = int(time.time() * 1000)
first_ts = now_ms - (10 * MS_PER_DAY)
second_ts = now_ms - (5 * MS_PER_DAY)
seeded_db.execute(
"""
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
str(ULID()),
"FeedGiven",
first_ts,
"test",
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
"{}",
1,
),
)
seeded_db.execute(
"""
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
str(ULID()),
"FeedGiven",
second_ts,
"test",
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
"{}",
1,
),
)
result = _get_first_event_ts(seeded_db, "FeedGiven")
assert result == first_ts
def test_first_egg_event_filters_by_product_prefix(self, seeded_db):
"""First event finder filters ProductCollected by product_code prefix."""
now_ms = int(time.time() * 1000)
meat_ts = now_ms - (15 * MS_PER_DAY)
egg_ts = now_ms - (10 * MS_PER_DAY)
# Insert meat collection first (should be ignored)
seeded_db.execute(
"""
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
str(ULID()),
"ProductCollected",
meat_ts,
"test",
'{"location_id": "loc1", "product_code": "meat.duck", "quantity": 5}',
"{}",
1,
),
)
# Insert egg collection second
seeded_db.execute(
"""
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
str(ULID()),
"ProductCollected",
egg_ts,
"test",
'{"location_id": "loc1", "product_code": "egg.duck", "quantity": 12}',
"{}",
1,
),
)
# Without prefix filter, should find the meat event
result_no_filter = _get_first_event_ts(seeded_db, "ProductCollected")
assert result_no_filter == meat_ts
# With egg. prefix, should find the egg event
result_with_filter = _get_first_event_ts(
seeded_db, "ProductCollected", product_prefix="egg."
)
assert result_with_filter == egg_ts
def test_tombstoned_first_event_uses_next_event(self, seeded_db):
"""When first event is tombstoned, uses next non-deleted event."""
now_ms = int(time.time() * 1000)
first_ts = now_ms - (10 * MS_PER_DAY)
second_ts = now_ms - (5 * MS_PER_DAY)
event_deleted_id = str(ULID())
event_kept_id = str(ULID())
# Insert two events
seeded_db.execute(
"""
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
event_deleted_id,
"FeedGiven",
first_ts,
"test",
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
"{}",
1,
),
)
seeded_db.execute(
"""
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)
""",
(
event_kept_id,
"FeedGiven",
second_ts,
"test",
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
"{}",
1,
),
)
# Tombstone the first event
seeded_db.execute(
"""
INSERT INTO event_tombstones (id, target_event_id, ts_utc, actor, reason)
VALUES (?, ?, ?, ?, ?)
""",
(str(ULID()), event_deleted_id, now_ms, "test", "deleted"),
)
result = _get_first_event_ts(seeded_db, "FeedGiven")
# Should return second event since first is tombstoned
assert result == second_ts