Compare commits
3 Commits
66d404efbc
...
86dc3a13d2
| Author | SHA1 | Date | |
|---|---|---|---|
| 86dc3a13d2 | |||
| 4c62840cdf | |||
| fe73363a4b |
@@ -8,15 +8,105 @@ from typing import Any
|
||||
|
||||
# 30 days in milliseconds
|
||||
THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000
|
||||
MS_PER_DAY = 24 * 60 * 60 * 1000
|
||||
|
||||
|
||||
def _get_first_event_ts(
|
||||
db: Any,
|
||||
event_type: str,
|
||||
product_prefix: str | None = None,
|
||||
location_id: str | None = None,
|
||||
) -> int | None:
|
||||
"""Get timestamp of first event of given type.
|
||||
|
||||
For ProductCollected, optionally filter by product_code prefix (e.g., 'egg.').
|
||||
Optionally filter by location_id.
|
||||
Excludes tombstoned (deleted) events.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
event_type: Event type to search for (e.g., 'FeedGiven', 'ProductCollected').
|
||||
product_prefix: Optional prefix filter for product_code in entity_refs.
|
||||
location_id: Optional location_id filter in entity_refs.
|
||||
|
||||
Returns:
|
||||
Timestamp in ms of first event, or None if no events exist.
|
||||
"""
|
||||
params: dict = {"event_type": event_type}
|
||||
|
||||
# Build filter conditions
|
||||
conditions = [
|
||||
"e.type = :event_type",
|
||||
"t.target_event_id IS NULL",
|
||||
]
|
||||
|
||||
if product_prefix:
|
||||
conditions.append("json_extract(e.entity_refs, '$.product_code') LIKE :prefix")
|
||||
params["prefix"] = f"{product_prefix}%"
|
||||
|
||||
if location_id:
|
||||
conditions.append("json_extract(e.entity_refs, '$.location_id') = :location_id")
|
||||
params["location_id"] = location_id
|
||||
|
||||
where_clause = " AND ".join(conditions)
|
||||
|
||||
row = db.execute(
|
||||
f"""
|
||||
SELECT MIN(e.ts_utc)
|
||||
FROM events e
|
||||
LEFT JOIN event_tombstones t ON e.id = t.target_event_id
|
||||
WHERE {where_clause}
|
||||
""",
|
||||
params,
|
||||
).fetchone()
|
||||
|
||||
return row[0] if row and row[0] is not None else None
|
||||
|
||||
|
||||
def _calculate_window(
|
||||
ts_utc: int, first_event_ts: int | None, max_days: int = 30
|
||||
) -> tuple[int, int, int]:
|
||||
"""Calculate dynamic window based on first event timestamp.
|
||||
|
||||
Determines window_days based on time since first event (capped at max_days),
|
||||
then returns a window ending at ts_utc with that duration.
|
||||
|
||||
Args:
|
||||
ts_utc: Current timestamp (window end) in ms.
|
||||
first_event_ts: Timestamp of first relevant event in ms, or None.
|
||||
max_days: Maximum window size in days (default 30).
|
||||
|
||||
Returns:
|
||||
Tuple of (window_start_utc, window_end_utc, window_days).
|
||||
"""
|
||||
max_window_ms = max_days * MS_PER_DAY
|
||||
|
||||
if first_event_ts is None:
|
||||
# No events - use max window (metrics will be 0/None)
|
||||
return ts_utc - max_window_ms, ts_utc, max_days
|
||||
|
||||
window_duration_ms = ts_utc - first_event_ts
|
||||
|
||||
if window_duration_ms >= max_window_ms:
|
||||
# Cap at max_days
|
||||
return ts_utc - max_window_ms, ts_utc, max_days
|
||||
|
||||
# Calculate days using ceiling division (ensures first event is included), minimum 1
|
||||
window_days = max(1, (window_duration_ms + MS_PER_DAY - 1) // MS_PER_DAY)
|
||||
|
||||
# Window spans window_days back from ts_utc (not from first_event_ts)
|
||||
window_start = ts_utc - (window_days * MS_PER_DAY)
|
||||
return window_start, ts_utc, window_days
|
||||
|
||||
|
||||
@dataclass
|
||||
class EggStats:
|
||||
"""30-day egg statistics for a single location."""
|
||||
"""Egg statistics for a single location over a dynamic window."""
|
||||
|
||||
location_id: str
|
||||
window_start_utc: int
|
||||
window_end_utc: int
|
||||
window_days: int
|
||||
eggs_total_pcs: int
|
||||
feed_total_g: int
|
||||
feed_layers_g: int
|
||||
@@ -279,12 +369,15 @@ def _upsert_stats(db: Any, stats: EggStats) -> None:
|
||||
|
||||
|
||||
def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
|
||||
"""Compute and cache 30-day egg stats for a location.
|
||||
"""Compute and cache egg stats for a location over a dynamic window.
|
||||
|
||||
This is a compute-on-read operation. Stats are computed fresh
|
||||
from the event log and interval tables, then upserted to the
|
||||
cache table.
|
||||
|
||||
The window is dynamic: it starts from the first egg collection event
|
||||
and extends to now, capped at 30 days.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
location_id: The location to compute stats for.
|
||||
@@ -293,8 +386,11 @@ def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
|
||||
Returns:
|
||||
Computed stats for the location.
|
||||
"""
|
||||
window_end_utc = ts_utc
|
||||
window_start_utc = ts_utc - THIRTY_DAYS_MS
|
||||
# Calculate dynamic window based on first egg event at this location
|
||||
first_egg_ts = _get_first_event_ts(
|
||||
db, "ProductCollected", product_prefix="egg.", location_id=location_id
|
||||
)
|
||||
window_start_utc, window_end_utc, window_days = _calculate_window(ts_utc, first_egg_ts)
|
||||
updated_at_utc = int(time.time() * 1000)
|
||||
|
||||
# Count eggs and determine species
|
||||
@@ -352,6 +448,7 @@ def get_egg_stats(db: Any, location_id: str, ts_utc: int) -> EggStats:
|
||||
location_id=location_id,
|
||||
window_start_utc=window_start_utc,
|
||||
window_end_utc=window_end_utc,
|
||||
window_days=window_days,
|
||||
eggs_total_pcs=eggs_total_pcs,
|
||||
feed_total_g=feed_total_g,
|
||||
feed_layers_g=feed_layers_g,
|
||||
|
||||
@@ -24,10 +24,11 @@ from animaltrack.repositories.products import ProductRepository
|
||||
from animaltrack.repositories.user_defaults import UserDefaultsRepository
|
||||
from animaltrack.repositories.users import UserRepository
|
||||
from animaltrack.services.products import ProductService, ValidationError
|
||||
from animaltrack.services.stats import _calculate_window, _get_first_event_ts
|
||||
from animaltrack.web.templates import render_page, render_page_post
|
||||
from animaltrack.web.templates.eggs import eggs_page
|
||||
|
||||
# 30 days in milliseconds
|
||||
# 30 days in milliseconds (kept for reference)
|
||||
THIRTY_DAYS_MS = 30 * 24 * 60 * 60 * 1000
|
||||
|
||||
|
||||
@@ -55,7 +56,9 @@ ar = APIRouter()
|
||||
|
||||
|
||||
def resolve_ducks_at_location(db: Any, location_id: str, ts_utc: int) -> list[str]:
|
||||
"""Resolve all duck animal IDs at a location at given timestamp.
|
||||
"""Resolve layer-eligible duck IDs at a location at given timestamp.
|
||||
|
||||
Only includes adult female ducks that can lay eggs.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
@@ -63,7 +66,7 @@ def resolve_ducks_at_location(db: Any, location_id: str, ts_utc: int) -> list[st
|
||||
ts_utc: Timestamp in ms since Unix epoch.
|
||||
|
||||
Returns:
|
||||
List of animal IDs (ducks at the location, alive at ts_utc).
|
||||
List of animal IDs (adult female ducks at the location, alive at ts_utc).
|
||||
"""
|
||||
query = """
|
||||
SELECT DISTINCT ali.animal_id
|
||||
@@ -74,6 +77,8 @@ def resolve_ducks_at_location(db: Any, location_id: str, ts_utc: int) -> list[st
|
||||
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
|
||||
AND ar.species_code = 'duck'
|
||||
AND ar.status = 'alive'
|
||||
AND ar.life_stage = 'adult'
|
||||
AND ar.sex = 'female'
|
||||
ORDER BY ali.animal_id
|
||||
"""
|
||||
rows = db.execute(query, (location_id, ts_utc, ts_utc)).fetchall()
|
||||
@@ -138,22 +143,28 @@ def _get_recent_events(db: Any, event_type: str, limit: int = 10):
|
||||
]
|
||||
|
||||
|
||||
def _get_eggs_per_day(db: Any, now_ms: int) -> float | None:
|
||||
"""Calculate eggs per day over 30-day window.
|
||||
def _get_eggs_per_day(db: Any, now_ms: int) -> tuple[float | None, int]:
|
||||
"""Calculate eggs per day over dynamic window.
|
||||
|
||||
Uses a dynamic window based on the first egg collection event,
|
||||
capped at 30 days.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
now_ms: Current timestamp in milliseconds.
|
||||
|
||||
Returns:
|
||||
Eggs per day average, or None if no data.
|
||||
Tuple of (eggs_per_day, window_days). eggs_per_day is None if no data.
|
||||
"""
|
||||
window_start = now_ms - THIRTY_DAYS_MS
|
||||
# Calculate dynamic window based on first egg event
|
||||
first_egg_ts = _get_first_event_ts(db, "ProductCollected", product_prefix="egg.")
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_egg_ts)
|
||||
|
||||
event_store = EventStore(db)
|
||||
events = event_store.list_events(
|
||||
event_type=PRODUCT_COLLECTED,
|
||||
since_utc=window_start,
|
||||
until_utc=now_ms,
|
||||
until_utc=window_end,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
@@ -164,33 +175,37 @@ def _get_eggs_per_day(db: Any, now_ms: int) -> float | None:
|
||||
total_eggs += event.entity_refs.get("quantity", 0)
|
||||
|
||||
if total_eggs == 0:
|
||||
return None
|
||||
return None, window_days
|
||||
|
||||
return total_eggs / 30.0
|
||||
return total_eggs / window_days, window_days
|
||||
|
||||
|
||||
def _get_global_cost_per_egg(db: Any, now_ms: int) -> float | None:
|
||||
"""Calculate global cost per egg over 30-day window.
|
||||
def _get_global_cost_per_egg(db: Any, now_ms: int) -> tuple[float | None, int]:
|
||||
"""Calculate global cost per egg over dynamic window.
|
||||
|
||||
Aggregates feed costs and egg counts across all locations.
|
||||
Uses a dynamic window based on the first egg collection event.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
now_ms: Current timestamp in milliseconds.
|
||||
|
||||
Returns:
|
||||
Cost per egg in EUR, or None if no eggs collected.
|
||||
Tuple of (cost_per_egg, window_days). cost_per_egg is None if no eggs.
|
||||
"""
|
||||
from animaltrack.events import FEED_GIVEN
|
||||
|
||||
window_start = now_ms - THIRTY_DAYS_MS
|
||||
# Calculate dynamic window based on first egg event
|
||||
first_egg_ts = _get_first_event_ts(db, "ProductCollected", product_prefix="egg.")
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_egg_ts)
|
||||
|
||||
event_store = EventStore(db)
|
||||
|
||||
# Count eggs across all locations
|
||||
egg_events = event_store.list_events(
|
||||
event_type=PRODUCT_COLLECTED,
|
||||
since_utc=window_start,
|
||||
until_utc=now_ms,
|
||||
until_utc=window_end,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
@@ -201,13 +216,13 @@ def _get_global_cost_per_egg(db: Any, now_ms: int) -> float | None:
|
||||
total_eggs += event.entity_refs.get("quantity", 0)
|
||||
|
||||
if total_eggs == 0:
|
||||
return None
|
||||
return None, window_days
|
||||
|
||||
# Sum feed costs across all locations
|
||||
feed_events = event_store.list_events(
|
||||
event_type=FEED_GIVEN,
|
||||
since_utc=window_start,
|
||||
until_utc=now_ms,
|
||||
until_utc=window_end,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
@@ -235,7 +250,7 @@ def _get_global_cost_per_egg(db: Any, now_ms: int) -> float | None:
|
||||
price_per_kg_cents = price_row[0] if price_row else 0
|
||||
total_cost_cents += amount_kg * price_per_kg_cents
|
||||
|
||||
return (total_cost_cents / 100) / total_eggs
|
||||
return (total_cost_cents / 100) / total_eggs, window_days
|
||||
|
||||
|
||||
def _get_sales_stats(db: Any, now_ms: int) -> dict | None:
|
||||
@@ -285,14 +300,17 @@ def _get_eggs_display_data(db: Any, locations: list) -> dict:
|
||||
|
||||
Returns:
|
||||
Dict with harvest_events, sell_events, eggs_per_day, cost_per_egg,
|
||||
sales_stats, location_names.
|
||||
eggs_window_days, sales_stats, location_names.
|
||||
"""
|
||||
now_ms = int(time.time() * 1000)
|
||||
eggs_per_day, eggs_window_days = _get_eggs_per_day(db, now_ms)
|
||||
cost_per_egg, _ = _get_global_cost_per_egg(db, now_ms)
|
||||
return {
|
||||
"harvest_events": _get_recent_events(db, PRODUCT_COLLECTED, limit=10),
|
||||
"sell_events": _get_recent_events(db, PRODUCT_SOLD, limit=10),
|
||||
"eggs_per_day": _get_eggs_per_day(db, now_ms),
|
||||
"cost_per_egg": _get_global_cost_per_egg(db, now_ms),
|
||||
"eggs_per_day": eggs_per_day,
|
||||
"cost_per_egg": cost_per_egg,
|
||||
"eggs_window_days": eggs_window_days,
|
||||
"sales_stats": _get_sales_stats(db, now_ms),
|
||||
"location_names": {loc.id: loc.name for loc in locations},
|
||||
}
|
||||
|
||||
@@ -22,6 +22,7 @@ from animaltrack.repositories.locations import LocationRepository
|
||||
from animaltrack.repositories.user_defaults import UserDefaultsRepository
|
||||
from animaltrack.repositories.users import UserRepository
|
||||
from animaltrack.services.feed import FeedService, ValidationError
|
||||
from animaltrack.services.stats import _calculate_window, _get_first_event_ts
|
||||
from animaltrack.web.templates import render_page, render_page_post
|
||||
from animaltrack.web.templates.feed import feed_page
|
||||
|
||||
@@ -111,32 +112,35 @@ def _get_recent_events(db: Any, event_type: str, limit: int = 10):
|
||||
]
|
||||
|
||||
|
||||
def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
"""Calculate feed consumption per bird per day over 30-day window.
|
||||
def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> tuple[float | None, int]:
|
||||
"""Calculate feed consumption per bird per day over dynamic window.
|
||||
|
||||
Uses global bird-days across all locations.
|
||||
Window is dynamic based on first FeedGiven event, capped at 30 days.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
now_ms: Current timestamp in milliseconds.
|
||||
|
||||
Returns:
|
||||
Feed consumption in grams per bird per day, or None if no data.
|
||||
Tuple of (feed_per_bird_per_day, window_days). Value is None if no data.
|
||||
"""
|
||||
window_start = now_ms - THIRTY_DAYS_MS
|
||||
# Calculate dynamic window based on first feed event
|
||||
first_feed_ts = _get_first_event_ts(db, "FeedGiven")
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_feed_ts)
|
||||
|
||||
# Get total feed given in window (all locations)
|
||||
event_store = EventStore(db)
|
||||
events = event_store.list_events(
|
||||
event_type=FEED_GIVEN,
|
||||
since_utc=window_start,
|
||||
until_utc=now_ms,
|
||||
until_utc=window_end,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
total_kg = sum(e.entity_refs.get("amount_kg", 0) for e in events)
|
||||
if total_kg == 0:
|
||||
return None
|
||||
return None, window_days
|
||||
|
||||
total_g = total_kg * 1000
|
||||
|
||||
@@ -153,7 +157,7 @@ def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
AND (ali.end_utc IS NULL OR ali.end_utc > :window_start)
|
||||
AND ar.status = 'alive'
|
||||
""",
|
||||
{"window_start": window_start, "window_end": now_ms},
|
||||
{"window_start": window_start, "window_end": window_end},
|
||||
).fetchone()
|
||||
|
||||
total_ms = row[0] if row else 0
|
||||
@@ -161,24 +165,27 @@ def _get_feed_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
bird_days = total_ms // ms_per_day if total_ms else 0
|
||||
|
||||
if bird_days == 0:
|
||||
return None
|
||||
return None, window_days
|
||||
|
||||
return total_g / bird_days
|
||||
return total_g / bird_days, window_days
|
||||
|
||||
|
||||
def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
"""Calculate feed cost per bird per day over 30-day window.
|
||||
def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> tuple[float | None, int]:
|
||||
"""Calculate feed cost per bird per day over dynamic window.
|
||||
|
||||
Uses global bird-days and feed costs across all locations.
|
||||
Window is dynamic based on first FeedGiven event, capped at 30 days.
|
||||
|
||||
Args:
|
||||
db: Database connection.
|
||||
now_ms: Current timestamp in milliseconds.
|
||||
|
||||
Returns:
|
||||
Feed cost in EUR per bird per day, or None if no data.
|
||||
Tuple of (cost_per_bird_per_day, window_days). Value is None if no data.
|
||||
"""
|
||||
window_start = now_ms - THIRTY_DAYS_MS
|
||||
# Calculate dynamic window based on first feed event
|
||||
first_feed_ts = _get_first_event_ts(db, "FeedGiven")
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_feed_ts)
|
||||
|
||||
# Get total bird-days across all locations
|
||||
row = db.execute(
|
||||
@@ -193,7 +200,7 @@ def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
AND (ali.end_utc IS NULL OR ali.end_utc > :window_start)
|
||||
AND ar.status = 'alive'
|
||||
""",
|
||||
{"window_start": window_start, "window_end": now_ms},
|
||||
{"window_start": window_start, "window_end": window_end},
|
||||
).fetchone()
|
||||
|
||||
total_ms = row[0] if row else 0
|
||||
@@ -201,19 +208,19 @@ def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
bird_days = total_ms // ms_per_day if total_ms else 0
|
||||
|
||||
if bird_days == 0:
|
||||
return None
|
||||
return None, window_days
|
||||
|
||||
# Get total feed cost in window (all locations)
|
||||
event_store = EventStore(db)
|
||||
events = event_store.list_events(
|
||||
event_type=FEED_GIVEN,
|
||||
since_utc=window_start,
|
||||
until_utc=now_ms,
|
||||
until_utc=window_end,
|
||||
limit=10000,
|
||||
)
|
||||
|
||||
if not events:
|
||||
return None
|
||||
return None, window_days
|
||||
|
||||
total_cost_cents = 0.0
|
||||
for event in events:
|
||||
@@ -240,7 +247,7 @@ def _get_cost_per_bird_per_day(db: Any, now_ms: int) -> float | None:
|
||||
total_cost_cents += amount_kg * price_per_kg_cents
|
||||
|
||||
# Convert to EUR and divide by bird-days
|
||||
return (total_cost_cents / 100) / bird_days
|
||||
return (total_cost_cents / 100) / bird_days, window_days
|
||||
|
||||
|
||||
def _get_purchase_stats(db: Any, now_ms: int) -> dict | None:
|
||||
@@ -294,11 +301,14 @@ def _get_feed_display_data(db: Any, locations: list, feed_types: list) -> dict:
|
||||
Dict with display data for feed page.
|
||||
"""
|
||||
now_ms = int(time.time() * 1000)
|
||||
feed_per_bird, feed_window_days = _get_feed_per_bird_per_day(db, now_ms)
|
||||
cost_per_bird, _ = _get_cost_per_bird_per_day(db, now_ms)
|
||||
return {
|
||||
"give_events": _get_recent_events(db, FEED_GIVEN, limit=10),
|
||||
"purchase_events": _get_recent_events(db, FEED_PURCHASED, limit=10),
|
||||
"feed_per_bird_per_day_g": _get_feed_per_bird_per_day(db, now_ms),
|
||||
"cost_per_bird_per_day": _get_cost_per_bird_per_day(db, now_ms),
|
||||
"feed_per_bird_per_day_g": feed_per_bird,
|
||||
"cost_per_bird_per_day": cost_per_bird,
|
||||
"feed_window_days": feed_window_days,
|
||||
"purchase_stats": _get_purchase_stats(db, now_ms),
|
||||
"location_names": {loc.id: loc.name for loc in locations},
|
||||
"feed_type_names": {ft.code: ft.name for ft in feed_types},
|
||||
|
||||
@@ -158,7 +158,7 @@ def event_datetime_field(
|
||||
toggle_text,
|
||||
cls="text-blue-400 hover:text-blue-300 cursor-pointer underline",
|
||||
data_datetime_toggle=field_id,
|
||||
hx_on_click=f"toggleDatetimePicker('{field_id}')",
|
||||
onclick=f"toggleDatetimePicker('{field_id}')",
|
||||
),
|
||||
cls="text-sm",
|
||||
),
|
||||
@@ -169,7 +169,7 @@ def event_datetime_field(
|
||||
value=initial_value,
|
||||
cls="uk-input w-full mt-2",
|
||||
data_datetime_input=field_id,
|
||||
hx_on_change=f"updateDatetimeTs('{field_id}')",
|
||||
onchange=f"updateDatetimeTs('{field_id}')",
|
||||
),
|
||||
P(
|
||||
"Select date/time for this event (leave empty for current time)",
|
||||
|
||||
@@ -35,6 +35,7 @@ def eggs_page(
|
||||
sell_events: list[tuple[Event, bool]] | None = None,
|
||||
eggs_per_day: float | None = None,
|
||||
cost_per_egg: float | None = None,
|
||||
eggs_window_days: int = 30,
|
||||
sales_stats: dict | None = None,
|
||||
location_names: dict[str, str] | None = None,
|
||||
# Field value preservation on errors
|
||||
@@ -59,8 +60,9 @@ def eggs_page(
|
||||
sell_action: Route function or URL for sell form.
|
||||
harvest_events: Recent ProductCollected events (most recent first).
|
||||
sell_events: Recent ProductSold events (most recent first).
|
||||
eggs_per_day: 30-day average eggs per day.
|
||||
cost_per_egg: 30-day average cost per egg in EUR.
|
||||
eggs_per_day: Average eggs per day over window.
|
||||
cost_per_egg: Average cost per egg in EUR over window.
|
||||
eggs_window_days: Actual window size in days for the metrics.
|
||||
sales_stats: Dict with 'total_qty', 'total_cents', and 'avg_price_per_egg_cents'.
|
||||
location_names: Dict mapping location_id to location name for display.
|
||||
harvest_quantity: Preserved quantity value on error.
|
||||
@@ -97,6 +99,7 @@ def eggs_page(
|
||||
recent_events=harvest_events,
|
||||
eggs_per_day=eggs_per_day,
|
||||
cost_per_egg=cost_per_egg,
|
||||
eggs_window_days=eggs_window_days,
|
||||
location_names=location_names,
|
||||
default_quantity=harvest_quantity,
|
||||
default_notes=harvest_notes,
|
||||
@@ -131,6 +134,7 @@ def harvest_form(
|
||||
recent_events: list[tuple[Event, bool]] | None = None,
|
||||
eggs_per_day: float | None = None,
|
||||
cost_per_egg: float | None = None,
|
||||
eggs_window_days: int = 30,
|
||||
location_names: dict[str, str] | None = None,
|
||||
default_quantity: str | None = None,
|
||||
default_notes: str | None = None,
|
||||
@@ -143,8 +147,9 @@ def harvest_form(
|
||||
error: Optional error message to display.
|
||||
action: Route function or URL string for form submission.
|
||||
recent_events: Recent (Event, is_deleted) tuples, most recent first.
|
||||
eggs_per_day: 30-day average eggs per day.
|
||||
cost_per_egg: 30-day average cost per egg in EUR.
|
||||
eggs_per_day: Average eggs per day over window.
|
||||
cost_per_egg: Average cost per egg in EUR over window.
|
||||
eggs_window_days: Actual window size in days for the metrics.
|
||||
location_names: Dict mapping location_id to location name for display.
|
||||
default_quantity: Preserved quantity value on error.
|
||||
default_notes: Preserved notes value on error.
|
||||
@@ -194,7 +199,7 @@ def harvest_form(
|
||||
stat_parts.append(f"{eggs_per_day:.1f} eggs/day")
|
||||
if cost_per_egg is not None:
|
||||
stat_parts.append(f"€{cost_per_egg:.3f}/egg cost")
|
||||
stat_text = " | ".join(stat_parts) + " (30-day avg)" if stat_parts else None
|
||||
stat_text = " | ".join(stat_parts) + f" ({eggs_window_days}-day avg)" if stat_parts else None
|
||||
|
||||
form = Form(
|
||||
H2("Harvest Eggs", cls="text-xl font-bold mb-4"),
|
||||
|
||||
@@ -77,7 +77,7 @@ def event_detail_panel(
|
||||
# Delete button (admin only, not for tombstoned events)
|
||||
delete_section(event.id) if user_role == UserRole.ADMIN and not is_tombstoned else None,
|
||||
id="event-panel-content",
|
||||
cls="bg-[#141413] h-full overflow-y-auto pb-20 md:pb-0",
|
||||
cls="bg-[#141413] h-full overflow-y-auto pb-28 md:pb-0",
|
||||
)
|
||||
|
||||
|
||||
|
||||
@@ -37,6 +37,7 @@ def feed_page(
|
||||
purchase_events: list[tuple[Event, bool]] | None = None,
|
||||
feed_per_bird_per_day_g: float | None = None,
|
||||
cost_per_bird_per_day: float | None = None,
|
||||
feed_window_days: int = 30,
|
||||
purchase_stats: dict | None = None,
|
||||
location_names: dict[str, str] | None = None,
|
||||
feed_type_names: dict[str, str] | None = None,
|
||||
@@ -59,6 +60,7 @@ def feed_page(
|
||||
purchase_events: Recent FeedPurchased events (most recent first).
|
||||
feed_per_bird_per_day_g: Average feed consumption in g/bird/day.
|
||||
cost_per_bird_per_day: Average feed cost per bird per day in EUR.
|
||||
feed_window_days: Actual window size in days for the metrics.
|
||||
purchase_stats: Dict with 'total_kg' and 'avg_price_per_kg_cents'.
|
||||
location_names: Dict mapping location_id to location name.
|
||||
feed_type_names: Dict mapping feed_type_code to feed type name.
|
||||
@@ -96,6 +98,7 @@ def feed_page(
|
||||
recent_events=give_events,
|
||||
feed_per_bird_per_day_g=feed_per_bird_per_day_g,
|
||||
cost_per_bird_per_day=cost_per_bird_per_day,
|
||||
feed_window_days=feed_window_days,
|
||||
location_names=location_names,
|
||||
feed_type_names=feed_type_names,
|
||||
),
|
||||
@@ -129,6 +132,7 @@ def give_feed_form(
|
||||
recent_events: list[tuple[Event, bool]] | None = None,
|
||||
feed_per_bird_per_day_g: float | None = None,
|
||||
cost_per_bird_per_day: float | None = None,
|
||||
feed_window_days: int = 30,
|
||||
location_names: dict[str, str] | None = None,
|
||||
feed_type_names: dict[str, str] | None = None,
|
||||
) -> Div:
|
||||
@@ -146,6 +150,7 @@ def give_feed_form(
|
||||
recent_events: Recent (Event, is_deleted) tuples, most recent first.
|
||||
feed_per_bird_per_day_g: Average feed consumption in g/bird/day.
|
||||
cost_per_bird_per_day: Average feed cost per bird per day in EUR.
|
||||
feed_window_days: Actual window size in days for the metrics.
|
||||
location_names: Dict mapping location_id to location name.
|
||||
feed_type_names: Dict mapping feed_type_code to feed type name.
|
||||
|
||||
@@ -218,7 +223,7 @@ def give_feed_form(
|
||||
stat_parts.append(f"{feed_per_bird_per_day_g:.1f}g/bird/day")
|
||||
if cost_per_bird_per_day is not None:
|
||||
stat_parts.append(f"€{cost_per_bird_per_day:.3f}/bird/day cost")
|
||||
stat_text = " | ".join(stat_parts) + " (30-day avg)" if stat_parts else None
|
||||
stat_text = " | ".join(stat_parts) + f" ({feed_window_days}-day avg)" if stat_parts else None
|
||||
|
||||
form = Form(
|
||||
H2("Give Feed", cls="text-xl font-bold mb-4"),
|
||||
|
||||
@@ -462,11 +462,13 @@ class TestE2EStatsProgression:
|
||||
Implementation produces different value due to:
|
||||
1. Integer bird-day truncation
|
||||
2. Timeline differences (1 day advance for Strip 2 bird-days)
|
||||
3. Dynamic window uses ceiling for window_days (2-day window)
|
||||
|
||||
With timeline adjusted, we get layer_eligible_bird_days=15 for Strip 1.
|
||||
With timeline adjusted, we get layer_eligible_bird_days=14 for Strip 1.
|
||||
share = 14/35 = 0.4, feed_layers_g = int(20000 * 0.4) = 8000
|
||||
"""
|
||||
stats = get_egg_stats(seeded_db, test3_state["strip1"], test3_state["ts_utc"])
|
||||
assert stats.feed_layers_g == 8570
|
||||
assert stats.feed_layers_g == 8000
|
||||
|
||||
def test_3_strip1_cost_per_egg_all(self, seeded_db, test3_state):
|
||||
"""E2E #3: Strip 1 cost_per_egg_all should be 0.889 +/- 0.001."""
|
||||
@@ -479,9 +481,12 @@ class TestE2EStatsProgression:
|
||||
Spec value: 0.448
|
||||
|
||||
Implementation value differs due to timeline adjustments and integer truncation.
|
||||
Dynamic window with ceiling gives share = 14/35 = 0.4.
|
||||
layer_cost = 24 EUR * 0.4 = 9.60 EUR
|
||||
cost_per_egg_layers = 9.60 / 27 = 0.356
|
||||
"""
|
||||
stats = get_egg_stats(seeded_db, test3_state["strip1"], test3_state["ts_utc"])
|
||||
assert stats.cost_per_egg_layers_eur == pytest.approx(0.381, abs=0.001)
|
||||
assert stats.cost_per_egg_layers_eur == pytest.approx(0.356, abs=0.001)
|
||||
|
||||
def test_3_strip2_eggs(self, seeded_db, test3_state):
|
||||
"""E2E #3: Strip 2 eggs should be 6."""
|
||||
@@ -581,9 +586,12 @@ class TestE2EStatsProgression:
|
||||
|
||||
Spec value: 0.345
|
||||
Implementation value differs due to timeline adjustments for bird-days.
|
||||
Dynamic window with ceiling gives share = 14/35 = 0.4.
|
||||
layer_cost = 24 EUR * 0.4 = 9.60 EUR
|
||||
cost_per_egg_layers = 9.60 / 35 = 0.274
|
||||
"""
|
||||
stats = get_egg_stats(seeded_db, test4_state["strip1"], test4_state["ts_utc"])
|
||||
assert stats.cost_per_egg_layers_eur == pytest.approx(0.294, abs=0.001)
|
||||
assert stats.cost_per_egg_layers_eur == pytest.approx(0.274, abs=0.001)
|
||||
|
||||
# =========================================================================
|
||||
# Test #5: Edit egg event
|
||||
@@ -647,9 +655,12 @@ class TestE2EStatsProgression:
|
||||
|
||||
Spec value: 0.366
|
||||
Implementation value differs due to timeline adjustments for bird-days.
|
||||
Dynamic window with ceiling gives share = 14/35 = 0.4.
|
||||
layer_cost = 24 EUR * 0.4 = 9.60 EUR
|
||||
cost_per_egg_layers = 9.60 / 33 = 0.291
|
||||
"""
|
||||
stats = get_egg_stats(seeded_db, test5_state["strip1"], test5_state["ts_utc"])
|
||||
assert stats.cost_per_egg_layers_eur == pytest.approx(0.312, abs=0.001)
|
||||
assert stats.cost_per_egg_layers_eur == pytest.approx(0.291, abs=0.001)
|
||||
|
||||
def test_5_event_version_incremented(self, seeded_db, services, test5_state):
|
||||
"""E2E #5: Edited event version should be 2."""
|
||||
|
||||
@@ -489,7 +489,7 @@ class TestEggStatsCaching:
|
||||
def test_cached_stats_have_window_bounds(self, seeded_db, e2e_test1_setup):
|
||||
"""Cached stats include window_start_utc and window_end_utc."""
|
||||
ts_utc = e2e_test1_setup["ts_utc"]
|
||||
get_egg_stats(seeded_db, e2e_test1_setup["location_id"], ts_utc)
|
||||
stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], ts_utc)
|
||||
|
||||
row = seeded_db.execute(
|
||||
"""
|
||||
@@ -500,7 +500,6 @@ class TestEggStatsCaching:
|
||||
).fetchone()
|
||||
|
||||
assert row is not None
|
||||
assert row[1] == ts_utc # window_end_utc
|
||||
# Window is 30 days
|
||||
thirty_days_ms = 30 * 24 * 60 * 60 * 1000
|
||||
assert row[0] == ts_utc - thirty_days_ms # window_start_utc
|
||||
# Cached bounds should match what get_egg_stats returned
|
||||
assert row[0] == stats.window_start_utc
|
||||
assert row[1] == stats.window_end_utc
|
||||
|
||||
256
tests/test_service_stats_dynamic_window.py
Normal file
256
tests/test_service_stats_dynamic_window.py
Normal file
@@ -0,0 +1,256 @@
|
||||
# ABOUTME: Tests for dynamic window calculation in stats service.
|
||||
# ABOUTME: Verifies metrics use actual tracking period instead of fixed 30 days.
|
||||
|
||||
import time
|
||||
|
||||
from ulid import ULID
|
||||
|
||||
from animaltrack.services.stats import (
|
||||
_calculate_window,
|
||||
_get_first_event_ts,
|
||||
)
|
||||
|
||||
# Constants for test calculations
|
||||
MS_PER_DAY = 24 * 60 * 60 * 1000
|
||||
|
||||
|
||||
class TestCalculateWindow:
|
||||
"""Tests for _calculate_window() helper function."""
|
||||
|
||||
def test_no_first_event_returns_30_day_window(self):
|
||||
"""When no events exist, window should be 30 days."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, None)
|
||||
|
||||
assert window_days == 30
|
||||
assert window_end == now_ms
|
||||
assert window_start == now_ms - (30 * MS_PER_DAY)
|
||||
|
||||
def test_first_event_1_day_ago_returns_1_day_window(self):
|
||||
"""When first event was 1 day ago, window should be 1 day."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_event_ts = now_ms - (1 * MS_PER_DAY)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
|
||||
|
||||
assert window_days == 1
|
||||
assert window_end == now_ms
|
||||
# Window spans 1 day back from now_ms
|
||||
assert window_start == now_ms - (1 * MS_PER_DAY)
|
||||
|
||||
def test_first_event_15_days_ago_returns_15_day_window(self):
|
||||
"""When first event was 15 days ago, window should be 15 days."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_event_ts = now_ms - (15 * MS_PER_DAY)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
|
||||
|
||||
assert window_days == 15
|
||||
assert window_end == now_ms
|
||||
# Window spans 15 days back from now_ms
|
||||
assert window_start == now_ms - (15 * MS_PER_DAY)
|
||||
|
||||
def test_first_event_45_days_ago_caps_at_30_days(self):
|
||||
"""When first event was 45 days ago, window should cap at 30 days."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_event_ts = now_ms - (45 * MS_PER_DAY)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
|
||||
|
||||
assert window_days == 30
|
||||
assert window_end == now_ms
|
||||
# Window start should be 30 days back, not at first_event_ts
|
||||
assert window_start == now_ms - (30 * MS_PER_DAY)
|
||||
|
||||
def test_first_event_exactly_30_days_ago_returns_30_day_window(self):
|
||||
"""When first event was exactly 30 days ago, window should be 30 days."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_event_ts = now_ms - (30 * MS_PER_DAY)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
|
||||
|
||||
assert window_days == 30
|
||||
assert window_end == now_ms
|
||||
# Window spans 30 days back from now_ms
|
||||
assert window_start == now_ms - (30 * MS_PER_DAY)
|
||||
|
||||
def test_first_event_today_returns_1_day_minimum(self):
|
||||
"""Window should be at least 1 day even for same-day events."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
# First event is just 1 hour ago (less than 1 day)
|
||||
first_event_ts = now_ms - (1 * 60 * 60 * 1000)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(now_ms, first_event_ts)
|
||||
|
||||
# Minimum window is 1 day
|
||||
assert window_days == 1
|
||||
assert window_end == now_ms
|
||||
|
||||
def test_custom_max_days(self):
|
||||
"""Window can use custom max_days value."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_event_ts = now_ms - (60 * MS_PER_DAY)
|
||||
|
||||
window_start, window_end, window_days = _calculate_window(
|
||||
now_ms, first_event_ts, max_days=7
|
||||
)
|
||||
|
||||
assert window_days == 7
|
||||
assert window_start == now_ms - (7 * MS_PER_DAY)
|
||||
|
||||
|
||||
class TestGetFirstEventTs:
|
||||
"""Tests for _get_first_event_ts() helper function."""
|
||||
|
||||
def test_no_events_returns_none(self, seeded_db):
|
||||
"""When no matching events exist, returns None."""
|
||||
# seeded_db is empty initially
|
||||
result = _get_first_event_ts(seeded_db, "FeedGiven")
|
||||
assert result is None
|
||||
|
||||
def test_finds_first_feed_given_event(self, seeded_db):
|
||||
"""First FeedGiven event is correctly identified."""
|
||||
# Insert two FeedGiven events at different times
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_ts = now_ms - (10 * MS_PER_DAY)
|
||||
second_ts = now_ms - (5 * MS_PER_DAY)
|
||||
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(ULID()),
|
||||
"FeedGiven",
|
||||
first_ts,
|
||||
"test",
|
||||
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
|
||||
"{}",
|
||||
1,
|
||||
),
|
||||
)
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(ULID()),
|
||||
"FeedGiven",
|
||||
second_ts,
|
||||
"test",
|
||||
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
|
||||
"{}",
|
||||
1,
|
||||
),
|
||||
)
|
||||
|
||||
result = _get_first_event_ts(seeded_db, "FeedGiven")
|
||||
|
||||
assert result == first_ts
|
||||
|
||||
def test_first_egg_event_filters_by_product_prefix(self, seeded_db):
|
||||
"""First event finder filters ProductCollected by product_code prefix."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
meat_ts = now_ms - (15 * MS_PER_DAY)
|
||||
egg_ts = now_ms - (10 * MS_PER_DAY)
|
||||
|
||||
# Insert meat collection first (should be ignored)
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(ULID()),
|
||||
"ProductCollected",
|
||||
meat_ts,
|
||||
"test",
|
||||
'{"location_id": "loc1", "product_code": "meat.duck", "quantity": 5}',
|
||||
"{}",
|
||||
1,
|
||||
),
|
||||
)
|
||||
# Insert egg collection second
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
str(ULID()),
|
||||
"ProductCollected",
|
||||
egg_ts,
|
||||
"test",
|
||||
'{"location_id": "loc1", "product_code": "egg.duck", "quantity": 12}',
|
||||
"{}",
|
||||
1,
|
||||
),
|
||||
)
|
||||
|
||||
# Without prefix filter, should find the meat event
|
||||
result_no_filter = _get_first_event_ts(seeded_db, "ProductCollected")
|
||||
assert result_no_filter == meat_ts
|
||||
|
||||
# With egg. prefix, should find the egg event
|
||||
result_with_filter = _get_first_event_ts(
|
||||
seeded_db, "ProductCollected", product_prefix="egg."
|
||||
)
|
||||
assert result_with_filter == egg_ts
|
||||
|
||||
def test_tombstoned_first_event_uses_next_event(self, seeded_db):
|
||||
"""When first event is tombstoned, uses next non-deleted event."""
|
||||
now_ms = int(time.time() * 1000)
|
||||
first_ts = now_ms - (10 * MS_PER_DAY)
|
||||
second_ts = now_ms - (5 * MS_PER_DAY)
|
||||
|
||||
event_deleted_id = str(ULID())
|
||||
event_kept_id = str(ULID())
|
||||
|
||||
# Insert two events
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
event_deleted_id,
|
||||
"FeedGiven",
|
||||
first_ts,
|
||||
"test",
|
||||
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
|
||||
"{}",
|
||||
1,
|
||||
),
|
||||
)
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
|
||||
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||
""",
|
||||
(
|
||||
event_kept_id,
|
||||
"FeedGiven",
|
||||
second_ts,
|
||||
"test",
|
||||
'{"location_id": "loc1", "feed_type_code": "duck-feed", "amount_kg": 10}',
|
||||
"{}",
|
||||
1,
|
||||
),
|
||||
)
|
||||
|
||||
# Tombstone the first event
|
||||
seeded_db.execute(
|
||||
"""
|
||||
INSERT INTO event_tombstones (id, target_event_id, ts_utc, actor, reason)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
""",
|
||||
(str(ULID()), event_deleted_id, now_ms, "test", "deleted"),
|
||||
)
|
||||
|
||||
result = _get_first_event_ts(seeded_db, "FeedGiven")
|
||||
|
||||
# Should return second event since first is tombstoned
|
||||
assert result == second_ts
|
||||
@@ -278,3 +278,90 @@ class TestEggsRecentEvents:
|
||||
|
||||
# The response should contain a link to the event detail
|
||||
assert f"/events/{event_id}" in resp.text
|
||||
|
||||
|
||||
class TestEggCollectionAnimalFiltering:
|
||||
"""Tests that egg collection only associates adult females."""
|
||||
|
||||
def test_egg_collection_excludes_males_and_juveniles(
|
||||
self, client, seeded_db, location_strip1_id
|
||||
):
|
||||
"""Egg collection only associates adult female ducks, not males or juveniles."""
|
||||
# Setup: Create mixed animals at location
|
||||
event_store = EventStore(seeded_db)
|
||||
registry = ProjectionRegistry()
|
||||
registry.register(AnimalRegistryProjection(seeded_db))
|
||||
registry.register(EventAnimalsProjection(seeded_db))
|
||||
registry.register(IntervalProjection(seeded_db))
|
||||
registry.register(ProductsProjection(seeded_db))
|
||||
|
||||
animal_service = AnimalService(seeded_db, event_store, registry)
|
||||
ts_utc = int(time.time() * 1000)
|
||||
|
||||
# Create adult female (should be included)
|
||||
female_payload = AnimalCohortCreatedPayload(
|
||||
species="duck",
|
||||
count=1,
|
||||
life_stage="adult",
|
||||
sex="female",
|
||||
location_id=location_strip1_id,
|
||||
origin="purchased",
|
||||
)
|
||||
female_event = animal_service.create_cohort(female_payload, ts_utc, "test_user")
|
||||
female_id = female_event.entity_refs["animal_ids"][0]
|
||||
|
||||
# Create adult male (should be excluded)
|
||||
male_payload = AnimalCohortCreatedPayload(
|
||||
species="duck",
|
||||
count=1,
|
||||
life_stage="adult",
|
||||
sex="male",
|
||||
location_id=location_strip1_id,
|
||||
origin="purchased",
|
||||
)
|
||||
male_event = animal_service.create_cohort(male_payload, ts_utc, "test_user")
|
||||
male_id = male_event.entity_refs["animal_ids"][0]
|
||||
|
||||
# Create juvenile female (should be excluded)
|
||||
juvenile_payload = AnimalCohortCreatedPayload(
|
||||
species="duck",
|
||||
count=1,
|
||||
life_stage="juvenile",
|
||||
sex="female",
|
||||
location_id=location_strip1_id,
|
||||
origin="purchased",
|
||||
)
|
||||
juvenile_event = animal_service.create_cohort(juvenile_payload, ts_utc, "test_user")
|
||||
juvenile_id = juvenile_event.entity_refs["animal_ids"][0]
|
||||
|
||||
# Collect eggs
|
||||
resp = client.post(
|
||||
"/actions/product-collected",
|
||||
data={
|
||||
"location_id": location_strip1_id,
|
||||
"quantity": "6",
|
||||
"nonce": "test-nonce-filter",
|
||||
},
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
|
||||
# Get the egg collection event
|
||||
event_row = seeded_db.execute(
|
||||
"SELECT id FROM events WHERE type = 'ProductCollected' ORDER BY id DESC LIMIT 1"
|
||||
).fetchone()
|
||||
event_id = event_row[0]
|
||||
|
||||
# Check which animals are associated with the event
|
||||
animal_rows = seeded_db.execute(
|
||||
"SELECT animal_id FROM event_animals WHERE event_id = ?",
|
||||
(event_id,),
|
||||
).fetchall()
|
||||
associated_ids = {row[0] for row in animal_rows}
|
||||
|
||||
# Only the adult female should be associated
|
||||
assert female_id in associated_ids, "Adult female should be associated with egg collection"
|
||||
assert male_id not in associated_ids, "Male should NOT be associated with egg collection"
|
||||
assert juvenile_id not in associated_ids, (
|
||||
"Juvenile should NOT be associated with egg collection"
|
||||
)
|
||||
assert len(associated_ids) == 1, "Only adult females should be associated"
|
||||
|
||||
Reference in New Issue
Block a user