Files
animaltrack/tests/test_event_edit.py
Petru Paler f733a067e2 feat: add event editing with revision storage
Implements Step 6.1 of the plan:
- Add edit_event() function in events/edit.py
- Store old version in event_revisions before editing
- Increment version on edit
- Update projections via revert/apply pattern
- Add EventNotFoundError and EventTombstonedError exceptions

Tested with:
- Unit tests for revision storage and version increment
- Fast-revert tests for FeedGiven/FeedPurchased events
- Unbounded replay tests for AnimalMoved events
- E2E test #5: Edit egg event 8→6 with stats verification

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 16:03:17 +00:00

792 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# ABOUTME: Tests for event editing functionality.
# ABOUTME: Validates revision storage, version incrementing, and projection updates.
import time
import pytest
from animaltrack.events import PRODUCT_COLLECTED
from animaltrack.events.store import EventStore
@pytest.fixture
def now_utc():
    """Provide the current wall-clock time as whole milliseconds since the epoch."""
    seconds_since_epoch = time.time()
    return int(seconds_since_epoch * 1000)
@pytest.fixture
def event_store(migrated_db):
    """Provide an EventStore bound to the migrated test database."""
    store = EventStore(migrated_db)
    return store
class TestEventRevisionStorage:
    """Tests for storing revisions when editing events.

    Each test appends a PRODUCT_COLLECTED event through ``event_store``, edits it
    with ``edit_event`` and then inspects the returned event, the stored event,
    or the ``event_revisions`` table.
    """

    def test_edit_stores_revision(self, migrated_db, event_store, now_utc):
        """Editing an event stores the original version in event_revisions."""
        import json

        from animaltrack.events.edit import edit_event

        # Create original event
        original_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"location_id": "LOC1", "quantity": 8},
            payload={"product_code": "egg.duck"},
        )

        # Edit the event
        edited_at = now_utc + 1000
        edit_event(
            db=migrated_db,
            event_store=event_store,
            event_id=original_event.id,
            new_entity_refs={"location_id": "LOC1", "quantity": 6},
            new_payload={"product_code": "egg.duck"},
            edited_by="admin",
            edited_at_utc=edited_at,
        )

        # Verify revision was stored
        row = migrated_db.execute(
            """SELECT event_id, version, ts_utc, actor, entity_refs, payload,
            edited_at_utc, edited_by
            FROM event_revisions WHERE event_id = ?""",
            (original_event.id,),
        ).fetchone()
        assert row is not None
        assert row[0] == original_event.id  # event_id
        assert row[1] == 1  # version (the old version)
        assert row[2] == now_utc  # ts_utc
        assert row[3] == "ppetru"  # actor
        # Compare parsed JSON rather than raw text so the checks do not depend
        # on serializer whitespace and cannot match e.g. "quantity": 80.
        assert json.loads(row[4])["quantity"] == 8  # entity_refs holds old quantity
        assert json.loads(row[5]) == {"product_code": "egg.duck"}  # payload
        assert row[6] == edited_at  # edited_at_utc
        assert row[7] == "admin"  # edited_by

    def test_edit_increments_version(self, migrated_db, event_store, now_utc):
        """Editing an event increments its version number."""
        from animaltrack.events.edit import edit_event

        # Create original event (version=1)
        original_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"location_id": "LOC1", "quantity": 8},
            payload={"product_code": "egg.duck"},
        )
        assert original_event.version == 1

        # Edit the event
        edited_event = edit_event(
            db=migrated_db,
            event_store=event_store,
            event_id=original_event.id,
            new_entity_refs={"location_id": "LOC1", "quantity": 6},
            new_payload={"product_code": "egg.duck"},
            edited_by="admin",
            edited_at_utc=now_utc + 1000,
        )

        # Verify version incremented
        assert edited_event.version == 2

        # Verify persisted in database
        retrieved = event_store.get_event(original_event.id)
        assert retrieved.version == 2

    def test_edit_updates_entity_refs(self, migrated_db, event_store, now_utc):
        """Editing an event updates its entity_refs."""
        from animaltrack.events.edit import edit_event

        original_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"location_id": "LOC1", "quantity": 8},
            payload={"product_code": "egg.duck"},
        )

        edited_event = edit_event(
            db=migrated_db,
            event_store=event_store,
            event_id=original_event.id,
            new_entity_refs={"location_id": "LOC1", "quantity": 6},
            new_payload={"product_code": "egg.duck"},
            edited_by="admin",
            edited_at_utc=now_utc + 1000,
        )
        assert edited_event.entity_refs["quantity"] == 6

        # Verify persisted
        retrieved = event_store.get_event(original_event.id)
        assert retrieved.entity_refs["quantity"] == 6

    def test_edit_updates_payload(self, migrated_db, event_store, now_utc):
        """Editing an event updates its payload."""
        from animaltrack.events.edit import edit_event

        original_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"location_id": "LOC1"},
            payload={"product_code": "egg.duck", "notes": "morning"},
        )

        edited_event = edit_event(
            db=migrated_db,
            event_store=event_store,
            event_id=original_event.id,
            new_entity_refs={"location_id": "LOC1"},
            new_payload={"product_code": "egg.duck", "notes": "evening"},
            edited_by="admin",
            edited_at_utc=now_utc + 1000,
        )
        assert edited_event.payload["notes"] == "evening"

        # Verify persisted, mirroring the entity_refs test above
        retrieved = event_store.get_event(original_event.id)
        assert retrieved.payload["notes"] == "evening"

    def test_edit_nonexistent_event_raises(self, migrated_db, event_store, now_utc):
        """Editing a non-existent event raises EventNotFoundError."""
        from animaltrack.events.edit import edit_event
        from animaltrack.events.exceptions import EventNotFoundError

        # No event is appended in this test, so this id is not in the store.
        with pytest.raises(EventNotFoundError):
            edit_event(
                db=migrated_db,
                event_store=event_store,
                event_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
                new_entity_refs={},
                new_payload={},
                edited_by="admin",
                edited_at_utc=now_utc,
            )

    def test_edit_tombstoned_event_raises(self, migrated_db, event_store, now_utc):
        """Editing a tombstoned event raises EventTombstonedError."""
        from animaltrack.events.edit import edit_event
        from animaltrack.events.exceptions import EventTombstonedError
        from animaltrack.id_gen import generate_id

        # Create and then tombstone an event
        original_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
        )
        tombstone_id = generate_id()
        # The tombstone row is inserted directly into the table.
        migrated_db.execute(
            """INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason)
            VALUES (?, ?, ?, ?, ?)""",
            (tombstone_id, now_utc + 1000, "admin", original_event.id, "Test deletion"),
        )

        with pytest.raises(EventTombstonedError):
            edit_event(
                db=migrated_db,
                event_store=event_store,
                event_id=original_event.id,
                new_entity_refs={},
                new_payload={},
                edited_by="admin",
                edited_at_utc=now_utc + 2000,
            )

    def test_multiple_edits_store_multiple_revisions(self, migrated_db, event_store, now_utc):
        """Each edit stores a new revision with the previous version."""
        import json

        from animaltrack.events.edit import edit_event

        original_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"quantity": 10},
            payload={},
        )

        # First edit: 10 -> 8
        edit_event(
            db=migrated_db,
            event_store=event_store,
            event_id=original_event.id,
            new_entity_refs={"quantity": 8},
            new_payload={},
            edited_by="admin",
            edited_at_utc=now_utc + 1000,
        )

        # Second edit: 8 -> 6
        edit_event(
            db=migrated_db,
            event_store=event_store,
            event_id=original_event.id,
            new_entity_refs={"quantity": 6},
            new_payload={},
            edited_by="admin",
            edited_at_utc=now_utc + 2000,
        )

        # Verify current version is 3
        current = event_store.get_event(original_event.id)
        assert current.version == 3
        assert current.entity_refs["quantity"] == 6

        # Verify two revisions exist
        rows = migrated_db.execute(
            """SELECT version, entity_refs FROM event_revisions
            WHERE event_id = ? ORDER BY version""",
            (original_event.id,),
        ).fetchall()
        assert len(rows) == 2
        # Parsed comparison: independent of JSON formatting.
        assert rows[0][0] == 1  # First revision (original)
        assert json.loads(rows[0][1])["quantity"] == 10
        assert rows[1][0] == 2  # Second revision (after first edit)
        assert json.loads(rows[1][1])["quantity"] == 8
class TestFastRevertStrategy:
    """Checks that editing feed events is reflected in the feed_inventory table."""

    @pytest.fixture
    def projection_registry(self, seeded_db):
        """Registry holding only the feed inventory projection."""
        from animaltrack.projections import ProjectionRegistry
        from animaltrack.projections.feed import FeedInventoryProjection

        feed_registry = ProjectionRegistry()
        feed_registry.register(FeedInventoryProjection(seeded_db))
        return feed_registry

    @pytest.fixture
    def feed_service(self, seeded_db, projection_registry):
        """FeedService wired to the seeded database, a fresh EventStore and the registry."""
        from animaltrack.services.feed import FeedService

        return FeedService(seeded_db, EventStore(seeded_db), projection_registry)

    @pytest.fixture
    def seeded_event_store(self, seeded_db):
        """EventStore backed by the seeded database."""
        store = EventStore(seeded_db)
        return store

    def test_edit_feed_given_updates_inventory(
        self, seeded_db, seeded_event_store, projection_registry, feed_service, now_utc
    ):
        """Editing a FeedGiven event from 6kg to 4kg adjusts given_kg and balance_kg."""
        from animaltrack.events.edit import edit_event
        from animaltrack.events.payloads import FeedGivenPayload, FeedPurchasedPayload

        inventory_sql = (
            "SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory "
            "WHERE feed_type_code = 'layer'"
        )

        # Any seeded location will do as the feeding target.
        first_location = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()
        location_id = first_location[0]

        # Stock up: two 20kg bags -> 40kg.
        feed_service.purchase_feed(
            FeedPurchasedPayload(
                feed_type_code="layer",
                bag_size_kg=20,
                bags_count=2,
                bag_price_cents=2400,
            ),
            now_utc,
            "test_user",
        )

        # Hand out 6kg one second later; this is the event that gets edited.
        given = feed_service.give_feed(
            FeedGivenPayload(
                location_id=location_id,
                feed_type_code="layer",
                amount_kg=6,
            ),
            now_utc + 1000,
            "test_user",
        )

        # Before the edit: 40 purchased, 6 given, 34 left.
        purchased_kg, given_kg, balance_kg = seeded_db.execute(inventory_sql).fetchone()
        assert purchased_kg == 40
        assert given_kg == 6
        assert balance_kg == 34

        # Change the amount given from 6kg to 4kg.
        corrected_refs = {
            "feed_type_code": "layer",
            "location_id": location_id,
            "amount_kg": 4,
        }
        corrected_payload = {
            "location_id": location_id,
            "feed_type_code": "layer",
            "amount_kg": 4,
        }
        edit_event(
            db=seeded_db,
            event_store=seeded_event_store,
            registry=projection_registry,
            event_id=given.id,
            new_entity_refs=corrected_refs,
            new_payload=corrected_payload,
            edited_by="admin",
            edited_at_utc=now_utc + 2000,
        )

        # After the edit: purchases untouched, 4 given, 36 left.
        purchased_kg, given_kg, balance_kg = seeded_db.execute(inventory_sql).fetchone()
        assert purchased_kg == 40
        assert given_kg == 4
        assert balance_kg == 36

    def test_edit_feed_purchased_updates_inventory(
        self, seeded_db, seeded_event_store, projection_registry, feed_service, now_utc
    ):
        """Editing a FeedPurchased event from 40kg to 60kg adjusts purchased_kg and balance_kg."""
        from animaltrack.events.edit import edit_event
        from animaltrack.events.payloads import FeedPurchasedPayload

        inventory_sql = (
            "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'"
        )

        # Initial purchase: two 20kg bags -> 40kg.
        bought = feed_service.purchase_feed(
            FeedPurchasedPayload(
                feed_type_code="layer",
                bag_size_kg=20,
                bags_count=2,
                bag_price_cents=2400,
            ),
            now_utc,
            "test_user",
        )

        # Nothing has been given, so the balance equals the purchase.
        purchased_kg, balance_kg = seeded_db.execute(inventory_sql).fetchone()
        assert purchased_kg == 40
        assert balance_kg == 40

        # Correct the purchase to three bags -> 60kg.
        corrected_refs = {
            "feed_type_code": "layer",
            "total_kg": 60,
            "price_per_kg_cents": 120,
        }
        corrected_payload = {
            "feed_type_code": "layer",
            "bag_size_kg": 20,
            "bags_count": 3,
            "bag_price_cents": 2400,
        }
        edit_event(
            db=seeded_db,
            event_store=seeded_event_store,
            registry=projection_registry,
            event_id=bought.id,
            new_entity_refs=corrected_refs,
            new_payload=corrected_payload,
            edited_by="admin",
            edited_at_utc=now_utc + 1000,
        )

        # Both columns follow the corrected purchase.
        purchased_kg, balance_kg = seeded_db.execute(inventory_sql).fetchone()
        assert purchased_kg == 60
        assert balance_kg == 60
class TestUnboundedReplayStrategy:
    """Tests for unbounded replay when editing interval/snapshot events."""

    @pytest.fixture
    def full_projection_registry(self, seeded_db):
        """Create a ProjectionRegistry with the animal registry, interval and
        event-animals projections registered."""
        from animaltrack.projections import ProjectionRegistry
        from animaltrack.projections.animal_registry import AnimalRegistryProjection
        from animaltrack.projections.event_animals import EventAnimalsProjection
        from animaltrack.projections.intervals import IntervalProjection

        registry = ProjectionRegistry()
        registry.register(AnimalRegistryProjection(seeded_db))
        registry.register(IntervalProjection(seeded_db))
        registry.register(EventAnimalsProjection(seeded_db))
        return registry

    @pytest.fixture
    def seeded_event_store(self, seeded_db):
        """Create an EventStore with seeded database."""
        return EventStore(seeded_db)

    def test_edit_product_collected_updates_event(
        self, seeded_db, seeded_event_store, full_projection_registry, now_utc
    ):
        """Editing a ProductCollected event updates the event in the database.

        Stats are computed on-read from the events table, so editing the event
        will automatically update stats without needing projection updates.
        """
        import json

        from animaltrack.events.edit import edit_event
        from animaltrack.events.types import PRODUCT_COLLECTED

        # Get a valid location_id
        location_id = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0]

        # Create a ProductCollected event directly
        event = seeded_event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="test_user",
            entity_refs={
                "location_id": location_id,
                "product_code": "egg.duck",
                "quantity": 8,
                "animal_ids": [],
            },
            payload={},
        )

        # Edit the event: quantity 8 -> 6
        edited_event = edit_event(
            db=seeded_db,
            event_store=seeded_event_store,
            registry=full_projection_registry,
            event_id=event.id,
            new_entity_refs={
                "location_id": location_id,
                "product_code": "egg.duck",
                "quantity": 6,
                "animal_ids": [],
            },
            new_payload={},
            edited_by="admin",
            edited_at_utc=now_utc + 1000,
        )

        # Verify event was updated
        assert edited_event.version == 2
        assert edited_event.entity_refs["quantity"] == 6

        # Verify persisted in database
        retrieved = seeded_event_store.get_event(event.id)
        assert retrieved.entity_refs["quantity"] == 6

        # Verify revision was stored
        row = seeded_db.execute(
            "SELECT entity_refs FROM event_revisions WHERE event_id = ?",
            (event.id,),
        ).fetchone()
        assert row is not None
        # Parse the stored JSON instead of substring-matching its text, so the
        # check is independent of serializer spacing and cannot match 80, 81, ...
        assert json.loads(row[0])["quantity"] == 8

    def test_edit_animal_moved_triggers_replay(
        self, seeded_db, seeded_event_store, full_projection_registry, now_utc
    ):
        """Editing an AnimalMoved event triggers unbounded replay of projections."""
        from animaltrack.events.edit import edit_event
        from animaltrack.events.processor import process_event
        from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED
        from animaltrack.id_gen import generate_id

        # Get three locations: origin, original destination, edited destination
        locations = seeded_db.execute("SELECT id FROM locations LIMIT 3").fetchall()
        # Fail with a clear message instead of an IndexError if seed data is short.
        assert len(locations) == 3, "seed data must provide at least three locations"
        loc1, loc2, loc3 = locations[0][0], locations[1][0], locations[2][0]

        # Create a cohort at loc1
        animal_id = generate_id()
        cohort_event = seeded_event_store.append_event(
            event_type=ANIMAL_COHORT_CREATED,
            ts_utc=now_utc,
            actor="test_user",
            entity_refs={"animal_ids": [animal_id]},
            payload={
                "location_id": loc1,
                "species": "duck",
                "count": 1,
                "sex": "female",
                "life_stage": "adult",
                "origin": "hatched",
            },
        )
        # The event was appended directly to the store, so it is passed to the
        # projections explicitly here.
        process_event(cohort_event, full_projection_registry)

        # Move from loc1 to loc2
        move_event = seeded_event_store.append_event(
            event_type=ANIMAL_MOVED,
            ts_utc=now_utc + 1000,
            actor="test_user",
            entity_refs={
                "animal_ids": [animal_id],
                "from_location_id": loc1,
                "to_location_id": loc2,
            },
            payload={},
        )
        process_event(move_event, full_projection_registry)

        # Verify animal is at loc2
        row = seeded_db.execute(
            """SELECT location_id FROM animal_location_intervals
            WHERE animal_id = ? AND end_utc IS NULL""",
            (animal_id,),
        ).fetchone()
        assert row[0] == loc2

        # Edit the move: change destination to loc3
        edit_event(
            db=seeded_db,
            event_store=seeded_event_store,
            registry=full_projection_registry,
            event_id=move_event.id,
            new_entity_refs={
                "animal_ids": [animal_id],
                "from_location_id": loc1,
                "to_location_id": loc3,
            },
            new_payload={},
            edited_by="admin",
            edited_at_utc=now_utc + 2000,
        )

        # Verify animal is now at loc3 (not loc2)
        row = seeded_db.execute(
            """SELECT location_id FROM animal_location_intervals
            WHERE animal_id = ? AND end_utc IS NULL""",
            (animal_id,),
        ).fetchone()
        assert row[0] == loc3

        # Verify the old loc2 interval was removed
        count = seeded_db.execute(
            """SELECT COUNT(*) FROM animal_location_intervals
            WHERE animal_id = ? AND location_id = ?""",
            (animal_id, loc2),
        ).fetchone()[0]
        assert count == 0
class TestE2EEditEggEvent:
    """E2E Test #5: Edit egg event.

    From spec section 21.5:
    Edit the backdated 8→6.
    Expect Strip 1: eggs=33;
    cost_all=24/33=0.727±0.001;
    cost_layers=(12089g×€0.0012/g)/33=0.366±0.001;
    events.version++ and one row in event_revisions.
    """

    @pytest.fixture
    def full_projection_registry(self, seeded_db):
        """Create a ProjectionRegistry with all projections."""
        from animaltrack.projections import ProjectionRegistry
        from animaltrack.projections.animal_registry import AnimalRegistryProjection
        from animaltrack.projections.event_animals import EventAnimalsProjection
        from animaltrack.projections.feed import FeedInventoryProjection
        from animaltrack.projections.intervals import IntervalProjection
        from animaltrack.projections.products import ProductsProjection

        registry = ProjectionRegistry()
        registry.register(AnimalRegistryProjection(seeded_db))
        registry.register(IntervalProjection(seeded_db))
        registry.register(EventAnimalsProjection(seeded_db))
        registry.register(ProductsProjection(seeded_db))
        registry.register(FeedInventoryProjection(seeded_db))
        return registry

    @pytest.fixture
    def services(self, seeded_db, full_projection_registry):
        """Create all services needed for E2E test.

        Returns a dict with the database, one shared EventStore, the projection
        registry and the animal, feed and product services built on them.
        """
        from animaltrack.services.animal import AnimalService
        from animaltrack.services.feed import FeedService
        from animaltrack.services.products import ProductService

        event_store = EventStore(seeded_db)
        return {
            "db": seeded_db,
            "event_store": event_store,
            "registry": full_projection_registry,
            "animal_service": AnimalService(seeded_db, event_store, full_projection_registry),
            "feed_service": FeedService(seeded_db, event_store, full_projection_registry),
            "product_service": ProductService(seeded_db, event_store, full_projection_registry),
        }

    @pytest.fixture
    def e2e_setup(self, seeded_db, services, now_utc):
        """Set up scenario for E2E test #5.

        Creates:
        - 10 adult female ducks at Strip 1
        - Feed purchase (20kg @ EUR 1.20/kg) and 20kg of feed given
        - Two egg collection events: 27 eggs first, then 8 eggs backdated
        - Total: 35 eggs before edit

        Returns a dict with the location id, the id of the backdated event,
        the cohort's animal ids and a timestamp later than every event.
        """
        from animaltrack.events.payloads import (
            AnimalCohortCreatedPayload,
            FeedGivenPayload,
            FeedPurchasedPayload,
            ProductCollectedPayload,
        )

        # Get Strip 1 location
        location_id = seeded_db.execute(
            "SELECT id FROM locations WHERE name = 'Strip 1'"
        ).fetchone()[0]

        one_day_ms = 24 * 60 * 60 * 1000
        animal_creation_ts = now_utc - one_day_ms

        # Create 10 adult female ducks
        cohort_payload = AnimalCohortCreatedPayload(
            species="duck",
            count=10,
            life_stage="adult",
            sex="female",
            location_id=location_id,
            origin="purchased",
        )
        cohort_event = services["animal_service"].create_cohort(
            cohort_payload, animal_creation_ts, "test_user"
        )
        animal_ids = cohort_event.entity_refs["animal_ids"]

        # Purchase feed: 20kg @ EUR 1.20/kg
        purchase_payload = FeedPurchasedPayload(
            feed_type_code="layer",
            bag_size_kg=20,
            bags_count=1,
            bag_price_cents=2400,  # EUR 24 per 20kg = EUR 1.20/kg
        )
        services["feed_service"].purchase_feed(purchase_payload, now_utc + 1000, "test_user")

        # Give feed: 20kg total
        give_payload = FeedGivenPayload(
            location_id=location_id,
            feed_type_code="layer",
            amount_kg=20,
        )
        services["feed_service"].give_feed(give_payload, now_utc + 2000, "test_user")

        # Collect 27 eggs (first collection)
        collect1_payload = ProductCollectedPayload(
            location_id=location_id,
            product_code="egg.duck",
            quantity=27,
            resolved_ids=animal_ids,
        )
        services["product_service"].collect_product(collect1_payload, now_utc + 3000, "test_user")

        # Collect 8 eggs (backdated - this is the event we'll edit)
        collect2_payload = ProductCollectedPayload(
            location_id=location_id,
            product_code="egg.duck",
            quantity=8,
            resolved_ids=animal_ids,
        )
        backdated_event = services["product_service"].collect_product(
            collect2_payload,
            now_utc + 2500,  # Backdated before first collection
            "test_user",
        )

        return {
            "location_id": location_id,
            "backdated_event_id": backdated_event.id,
            "animal_ids": animal_ids,
            "ts_utc": now_utc + 3500,  # After all events
        }

    def _edit_backdated_egg_event(self, seeded_db, services, e2e_setup, now_utc):
        """Edit the backdated egg collection from 8 to 6 eggs and return the result.

        Shared by the tests below so that all of them perform the same edit.
        """
        from animaltrack.events.edit import edit_event

        return edit_event(
            db=seeded_db,
            event_store=services["event_store"],
            registry=services["registry"],
            event_id=e2e_setup["backdated_event_id"],
            new_entity_refs={
                "location_id": e2e_setup["location_id"],
                "product_code": "egg.duck",
                "quantity": 6,
                "animal_ids": e2e_setup["animal_ids"],
            },
            new_payload={},
            edited_by="admin",
            edited_at_utc=now_utc + 10000,
        )

    def test_e2e_edit_egg_event_version_increments(self, seeded_db, services, e2e_setup, now_utc):
        """Editing the egg event increments version to 2."""
        # Get the original event
        original = services["event_store"].get_event(e2e_setup["backdated_event_id"])
        assert original.version == 1

        # Edit: quantity 8 -> 6
        edited = self._edit_backdated_egg_event(seeded_db, services, e2e_setup, now_utc)
        assert edited.version == 2

    def test_e2e_edit_egg_event_revision_stored(self, seeded_db, services, e2e_setup, now_utc):
        """Editing the egg event stores exactly one revision."""
        import json

        # Edit: quantity 8 -> 6
        self._edit_backdated_egg_event(seeded_db, services, e2e_setup, now_utc)

        # Verify exactly one revision
        rows = seeded_db.execute(
            "SELECT version, entity_refs FROM event_revisions WHERE event_id = ?",
            (e2e_setup["backdated_event_id"],),
        ).fetchall()
        assert len(rows) == 1
        assert rows[0][0] == 1  # Old version was 1
        # Parse the stored JSON so the check does not depend on serializer
        # formatting and cannot be satisfied by e.g. "quantity": 80.
        assert json.loads(rows[0][1])["quantity"] == 8  # Old quantity was 8

    def test_e2e_edit_egg_event_stats_updated(self, seeded_db, services, e2e_setup, now_utc):
        """Editing egg 8→6 updates stats from 35 eggs to 33 eggs.

        Before edit: 27 + 8 = 35 eggs
        After edit: 27 + 6 = 33 eggs
        cost_all = EUR 24 / 33 = 0.727 ± 0.001
        """
        from animaltrack.services.stats import get_egg_stats

        # Verify initial state: 35 eggs
        stats_before = get_egg_stats(seeded_db, e2e_setup["location_id"], e2e_setup["ts_utc"])
        assert stats_before.eggs_total_pcs == 35

        # Edit: quantity 8 -> 6
        self._edit_backdated_egg_event(seeded_db, services, e2e_setup, now_utc)

        # Verify updated stats: 33 eggs
        stats_after = get_egg_stats(seeded_db, e2e_setup["location_id"], e2e_setup["ts_utc"])
        assert stats_after.eggs_total_pcs == 33

        # cost_all = EUR 24 / 33 = 0.727
        # Feed: 20kg @ EUR 1.20/kg = EUR 24
        assert stats_after.cost_per_egg_all_eur == pytest.approx(0.727, abs=0.001)