feat: add 30-day egg stats computation service

Implement compute-on-read egg statistics per spec section 9:
- Create egg_stats_30d_by_location table (migration 0007)
- Add get_egg_stats service with bird-days calculation
- Calculate layer-eligible days (adult female + matching species)
- Implement feed proration formula with INTEGER truncation
- Cache computed stats with window bounds

Verifies E2E test #1 baseline values:
- eggs_total_pcs = 12
- feed_total_g = 6000, feed_layers_g = 4615
- cost_per_egg_all = 0.600, cost_per_egg_layers = 0.462
- layer_eligible_bird_days = 10

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 09:25:39 +00:00
parent 8334414b87
commit c08fa476e0
3 changed files with 887 additions and 0 deletions

View File

@@ -0,0 +1,359 @@
# ABOUTME: Service for computing 30-day egg production statistics.
# ABOUTME: Implements compute-on-read pattern per spec section 14.
import json
import time
from dataclasses import dataclass
from typing import Any
# 30 days in milliseconds
THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000
@dataclass
class EggStats:
"""30-day egg statistics for a single location."""
location_id: str
window_start_utc: int
window_end_utc: int
eggs_total_pcs: int
feed_total_g: int
feed_layers_g: int
cost_per_egg_all_eur: float
cost_per_egg_layers_eur: float
layer_eligible_bird_days: int
layer_eligible_count_now: int
updated_at_utc: int
def _get_species_for_product(product_code: str) -> str | None:
"""Extract species from product code.
Product codes like 'egg.duck' have species as suffix.
Returns None for generic products like 'meat'.
"""
if "." in product_code:
return product_code.split(".", 1)[1]
return None
def _calculate_bird_days(db: Any, location_id: str, window_start: int, window_end: int) -> int:
"""Calculate total bird-days for all animals at a location within window.
Bird-days = sum of overlap durations in animal_location_intervals.
Intervals with NULL end_utc are treated as open (use window_end).
"""
row = db.execute(
"""
SELECT COALESCE(SUM(
MIN(COALESCE(ali.end_utc, :window_end), :window_end) -
MAX(ali.start_utc, :window_start)
), 0) as total_ms
FROM animal_location_intervals ali
WHERE ali.location_id = :location_id
AND ali.start_utc < :window_end
AND (ali.end_utc IS NULL OR ali.end_utc > :window_start)
""",
{"location_id": location_id, "window_start": window_start, "window_end": window_end},
).fetchone()
total_ms = row[0] if row else 0
# Convert ms to days (we use integer days for simplicity in E2E test)
ms_per_day = 24 * 60 * 60 * 1000
return total_ms // ms_per_day if total_ms else 0
def _calculate_layer_eligible_bird_days(
db: Any, location_id: str, species: str, window_start: int, window_end: int
) -> int:
"""Calculate layer-eligible bird-days at a location within window.
Filters for: status='alive', life_stage='adult', sex='female', matching species.
"""
row = db.execute(
"""
SELECT COALESCE(SUM(
MIN(COALESCE(ali.end_utc, :window_end), :window_end) -
MAX(ali.start_utc, :window_start)
), 0) as total_ms
FROM animal_location_intervals ali
JOIN animal_registry ar ON ar.animal_id = ali.animal_id
WHERE ali.location_id = :location_id
AND ali.start_utc < :window_end
AND (ali.end_utc IS NULL OR ali.end_utc > :window_start)
AND ar.species_code = :species
-- Check status=alive at interval start
AND EXISTS (
SELECT 1 FROM animal_attr_intervals aai
WHERE aai.animal_id = ali.animal_id
AND aai.attr = 'status' AND aai.value = 'alive'
AND aai.start_utc <= ali.start_utc
AND (aai.end_utc IS NULL OR aai.end_utc > ali.start_utc)
)
-- Check life_stage=adult at interval start
AND EXISTS (
SELECT 1 FROM animal_attr_intervals aai
WHERE aai.animal_id = ali.animal_id
AND aai.attr = 'life_stage' AND aai.value = 'adult'
AND aai.start_utc <= ali.start_utc
AND (aai.end_utc IS NULL OR aai.end_utc > ali.start_utc)
)
-- Check sex=female at interval start
AND EXISTS (
SELECT 1 FROM animal_attr_intervals aai
WHERE aai.animal_id = ali.animal_id
AND aai.attr = 'sex' AND aai.value = 'female'
AND aai.start_utc <= ali.start_utc
AND (aai.end_utc IS NULL OR aai.end_utc > ali.start_utc)
)
""",
{
"location_id": location_id,
"species": species,
"window_start": window_start,
"window_end": window_end,
},
).fetchone()
total_ms = row[0] if row else 0
ms_per_day = 24 * 60 * 60 * 1000
return total_ms // ms_per_day if total_ms else 0
def _count_layer_eligible_now(db: Any, location_id: str, species: str, ts_utc: int) -> int:
"""Count layer-eligible animals currently at location."""
row = db.execute(
"""
SELECT COUNT(DISTINCT ar.animal_id)
FROM animal_registry ar
JOIN animal_location_intervals ali ON ali.animal_id = ar.animal_id
WHERE ali.location_id = :location_id
AND ali.start_utc <= :ts_utc
AND (ali.end_utc IS NULL OR ali.end_utc > :ts_utc)
AND ar.species_code = :species
AND ar.status = 'alive'
AND ar.life_stage = 'adult'
AND ar.sex = 'female'
""",
{"location_id": location_id, "species": species, "ts_utc": ts_utc},
).fetchone()
return row[0] if row else 0
def _count_eggs_in_window(
db: Any, location_id: str, window_start: int, window_end: int
) -> tuple[int, str | None]:
"""Count eggs collected in window and return the species.
Returns (eggs_count, species) where species is extracted from product_code.
Window is inclusive on both ends: [window_start, window_end].
"""
rows = db.execute(
"""
SELECT json_extract(entity_refs, '$.product_code') as product_code,
json_extract(entity_refs, '$.quantity') as quantity
FROM events
WHERE type = 'ProductCollected'
AND json_extract(entity_refs, '$.location_id') = :location_id
AND ts_utc >= :window_start
AND ts_utc <= :window_end
""",
{"location_id": location_id, "window_start": window_start, "window_end": window_end},
).fetchall()
total_eggs = 0
species = None
for row in rows:
product_code = row[0]
quantity = row[1]
if product_code and product_code.startswith("egg."):
total_eggs += quantity
if species is None:
species = _get_species_for_product(product_code)
return total_eggs, species
def _get_feed_events_in_window(
db: Any, location_id: str, window_start: int, window_end: int
) -> list[dict]:
"""Get all FeedGiven events at location in window.
Window is inclusive on both ends: [window_start, window_end].
"""
rows = db.execute(
"""
SELECT ts_utc, entity_refs
FROM events
WHERE type = 'FeedGiven'
AND json_extract(entity_refs, '$.location_id') = :location_id
AND ts_utc >= :window_start
AND ts_utc <= :window_end
ORDER BY ts_utc
""",
{"location_id": location_id, "window_start": window_start, "window_end": window_end},
).fetchall()
result = []
for row in rows:
entity_refs = json.loads(row[1])
result.append(
{
"ts_utc": row[0],
"feed_type_code": entity_refs["feed_type_code"],
"amount_kg": entity_refs["amount_kg"],
}
)
return result
def _get_feed_price_at_time(db: Any, feed_type_code: str, ts_utc: int) -> int:
"""Get the feed price per kg in cents at a given time.
Returns the price from the most recent FeedPurchased event <= ts_utc.
"""
row = db.execute(
"""
SELECT json_extract(entity_refs, '$.price_per_kg_cents') as price
FROM events
WHERE type = 'FeedPurchased'
AND json_extract(entity_refs, '$.feed_type_code') = :feed_type_code
AND ts_utc <= :ts_utc
ORDER BY ts_utc DESC
LIMIT 1
""",
{"feed_type_code": feed_type_code, "ts_utc": ts_utc},
).fetchone()
return row[0] if row else 0
def _upsert_stats(db: Any, stats: EggStats) -> None:
"""Upsert stats to the cache table."""
db.execute(
"""
INSERT INTO egg_stats_30d_by_location (
location_id, window_start_utc, window_end_utc,
eggs_total_pcs, feed_total_g, feed_layers_g,
cost_per_egg_all_eur, cost_per_egg_layers_eur,
layer_eligible_bird_days, layer_eligible_count_now,
updated_at_utc
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON CONFLICT(location_id) DO UPDATE SET
window_start_utc = excluded.window_start_utc,
window_end_utc = excluded.window_end_utc,
eggs_total_pcs = excluded.eggs_total_pcs,
feed_total_g = excluded.feed_total_g,
feed_layers_g = excluded.feed_layers_g,
cost_per_egg_all_eur = excluded.cost_per_egg_all_eur,
cost_per_egg_layers_eur = excluded.cost_per_egg_layers_eur,
layer_eligible_bird_days = excluded.layer_eligible_bird_days,
layer_eligible_count_now = excluded.layer_eligible_count_now,
updated_at_utc = excluded.updated_at_utc
""",
(
stats.location_id,
stats.window_start_utc,
stats.window_end_utc,
stats.eggs_total_pcs,
stats.feed_total_g,
stats.feed_layers_g,
stats.cost_per_egg_all_eur,
stats.cost_per_egg_layers_eur,
stats.layer_eligible_bird_days,
stats.layer_eligible_count_now,
stats.updated_at_utc,
),
)
def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
"""Compute and cache 30-day egg stats for a location.
This is a compute-on-read operation. Stats are computed fresh
from the event log and interval tables, then upserted to the
cache table.
Args:
db: Database connection.
location_id: The location to compute stats for.
ts_utc: The timestamp to use as window_end_utc (usually now).
Returns:
Computed stats for the location.
"""
window_end_utc = ts_utc
window_start_utc = ts_utc - THIRTY_DAYS_MS
updated_at_utc = int(time.time() * 1000)
# Count eggs and determine species
eggs_total_pcs, species = _count_eggs_in_window(
db, location_id, window_start_utc, window_end_utc
)
# Default species to 'duck' if no eggs collected (for bird-days calculation)
if species is None:
species = "duck"
# Calculate bird-days
all_animal_days = _calculate_bird_days(db, location_id, window_start_utc, window_end_utc)
layer_eligible_bird_days = _calculate_layer_eligible_bird_days(
db, location_id, species, window_start_utc, window_end_utc
)
# Count layer-eligible animals now
layer_eligible_count_now = _count_layer_eligible_now(db, location_id, species, ts_utc)
# Calculate feed totals and costs
feed_events = _get_feed_events_in_window(db, location_id, window_start_utc, window_end_utc)
feed_total_g = 0
feed_layers_g = 0
total_cost_cents = 0.0
total_cost_layers_cents = 0.0
# Calculate share: layer_eligible_days / all_animal_days
share = layer_eligible_bird_days / all_animal_days if all_animal_days > 0 else 0.0
for feed_event in feed_events:
amount_g = feed_event["amount_kg"] * 1000
price_per_kg_cents = _get_feed_price_at_time(
db, feed_event["feed_type_code"], feed_event["ts_utc"]
)
feed_total_g += amount_g
feed_layers_g += int(amount_g * share) # INTEGER truncation
# Cost in cents for this feed event
cost_cents = feed_event["amount_kg"] * price_per_kg_cents
total_cost_cents += cost_cents
total_cost_layers_cents += cost_cents * share
# Calculate cost per egg in EUR
if eggs_total_pcs > 0:
cost_per_egg_all_eur = (total_cost_cents / 100) / eggs_total_pcs
cost_per_egg_layers_eur = (total_cost_layers_cents / 100) / eggs_total_pcs
else:
cost_per_egg_all_eur = 0.0
cost_per_egg_layers_eur = 0.0
stats = EggStats(
location_id=location_id,
window_start_utc=window_start_utc,
window_end_utc=window_end_utc,
eggs_total_pcs=eggs_total_pcs,
feed_total_g=feed_total_g,
feed_layers_g=feed_layers_g,
cost_per_egg_all_eur=cost_per_egg_all_eur,
cost_per_egg_layers_eur=cost_per_egg_layers_eur,
layer_eligible_bird_days=layer_eligible_bird_days,
layer_eligible_count_now=layer_eligible_count_now,
updated_at_utc=updated_at_utc,
)
# Cache the stats
_upsert_stats(db, stats)
return stats