# ABOUTME: Tests for event editing functionality. # ABOUTME: Validates revision storage, version incrementing, and projection updates. import time import pytest from animaltrack.events import PRODUCT_COLLECTED from animaltrack.events.store import EventStore @pytest.fixture def now_utc(): """Current time in milliseconds since epoch.""" return int(time.time() * 1000) @pytest.fixture def event_store(migrated_db): """Create an EventStore instance with a migrated database.""" return EventStore(migrated_db) class TestEventRevisionStorage: """Tests for storing revisions when editing events.""" def test_edit_stores_revision(self, migrated_db, event_store, now_utc): """Editing an event stores the original version in event_revisions.""" from animaltrack.events.edit import edit_event # Create original event original_event = event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="ppetru", entity_refs={"location_id": "LOC1", "quantity": 8}, payload={"product_code": "egg.duck"}, ) # Edit the event edited_at = now_utc + 1000 edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={"location_id": "LOC1", "quantity": 6}, new_payload={"product_code": "egg.duck"}, edited_by="admin", edited_at_utc=edited_at, ) # Verify revision was stored row = migrated_db.execute( """SELECT event_id, version, ts_utc, actor, entity_refs, payload, edited_at_utc, edited_by FROM event_revisions WHERE event_id = ?""", (original_event.id,), ).fetchone() assert row is not None assert row[0] == original_event.id # event_id assert row[1] == 1 # version (the old version) assert row[2] == now_utc # ts_utc assert row[3] == "ppetru" # actor assert '"quantity": 8' in row[4] # entity_refs contains old quantity assert row[5] == '{"product_code": "egg.duck"}' # payload assert row[6] == edited_at # edited_at_utc assert row[7] == "admin" # edited_by def test_edit_increments_version(self, migrated_db, event_store, now_utc): """Editing an event increments its version number.""" from animaltrack.events.edit import edit_event # Create original event (version=1) original_event = event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="ppetru", entity_refs={"location_id": "LOC1", "quantity": 8}, payload={"product_code": "egg.duck"}, ) assert original_event.version == 1 # Edit the event edited_event = edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={"location_id": "LOC1", "quantity": 6}, new_payload={"product_code": "egg.duck"}, edited_by="admin", edited_at_utc=now_utc + 1000, ) # Verify version incremented assert edited_event.version == 2 # Verify persisted in database retrieved = event_store.get_event(original_event.id) assert retrieved.version == 2 def test_edit_updates_entity_refs(self, migrated_db, event_store, now_utc): """Editing an event updates its entity_refs.""" from animaltrack.events.edit import edit_event original_event = event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="ppetru", entity_refs={"location_id": "LOC1", "quantity": 8}, payload={"product_code": "egg.duck"}, ) edited_event = edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={"location_id": "LOC1", "quantity": 6}, new_payload={"product_code": "egg.duck"}, edited_by="admin", edited_at_utc=now_utc + 1000, ) assert edited_event.entity_refs["quantity"] == 6 # Verify persisted retrieved = event_store.get_event(original_event.id) assert retrieved.entity_refs["quantity"] == 6 def test_edit_updates_payload(self, migrated_db, event_store, now_utc): """Editing an event updates its payload.""" from animaltrack.events.edit import edit_event original_event = event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="ppetru", entity_refs={"location_id": "LOC1"}, payload={"product_code": "egg.duck", "notes": "morning"}, ) edited_event = edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={"location_id": "LOC1"}, new_payload={"product_code": "egg.duck", "notes": "evening"}, edited_by="admin", edited_at_utc=now_utc + 1000, ) assert edited_event.payload["notes"] == "evening" def test_edit_nonexistent_event_raises(self, migrated_db, event_store, now_utc): """Editing a non-existent event raises EventNotFoundError.""" from animaltrack.events.edit import edit_event from animaltrack.events.exceptions import EventNotFoundError with pytest.raises(EventNotFoundError): edit_event( db=migrated_db, event_store=event_store, event_id="01ARZ3NDEKTSV4RRFFQ69G5FAV", new_entity_refs={}, new_payload={}, edited_by="admin", edited_at_utc=now_utc, ) def test_edit_tombstoned_event_raises(self, migrated_db, event_store, now_utc): """Editing a tombstoned event raises EventTombstonedError.""" from animaltrack.events.edit import edit_event from animaltrack.events.exceptions import EventTombstonedError from animaltrack.id_gen import generate_id # Create and then tombstone an event original_event = event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="ppetru", entity_refs={}, payload={}, ) tombstone_id = generate_id() migrated_db.execute( """INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason) VALUES (?, ?, ?, ?, ?)""", (tombstone_id, now_utc + 1000, "admin", original_event.id, "Test deletion"), ) with pytest.raises(EventTombstonedError): edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={}, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 2000, ) def test_multiple_edits_store_multiple_revisions(self, migrated_db, event_store, now_utc): """Each edit stores a new revision with the previous version.""" from animaltrack.events.edit import edit_event original_event = event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="ppetru", entity_refs={"quantity": 10}, payload={}, ) # First edit: 10 -> 8 edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={"quantity": 8}, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 1000, ) # Second edit: 8 -> 6 edit_event( db=migrated_db, event_store=event_store, event_id=original_event.id, new_entity_refs={"quantity": 6}, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 2000, ) # Verify current version is 3 current = event_store.get_event(original_event.id) assert current.version == 3 assert current.entity_refs["quantity"] == 6 # Verify two revisions exist rows = migrated_db.execute( """SELECT version, entity_refs FROM event_revisions WHERE event_id = ? ORDER BY version""", (original_event.id,), ).fetchall() assert len(rows) == 2 assert rows[0][0] == 1 # First revision (original) assert '"quantity": 10' in rows[0][1] assert rows[1][0] == 2 # Second revision (after first edit) assert '"quantity": 8' in rows[1][1] class TestFastRevertStrategy: """Tests for fast-revert projection updates when editing events.""" @pytest.fixture def projection_registry(self, seeded_db): """Create a ProjectionRegistry with feed projections registered.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.feed import FeedInventoryProjection registry = ProjectionRegistry() registry.register(FeedInventoryProjection(seeded_db)) return registry @pytest.fixture def feed_service(self, seeded_db, projection_registry): """Create a FeedService for testing.""" from animaltrack.services.feed import FeedService event_store = EventStore(seeded_db) return FeedService(seeded_db, event_store, projection_registry) @pytest.fixture def seeded_event_store(self, seeded_db): """Create an EventStore with seeded database.""" return EventStore(seeded_db) def test_edit_feed_given_updates_inventory( self, seeded_db, seeded_event_store, projection_registry, feed_service, now_utc ): """Editing a FeedGiven event updates inventory via fast-revert.""" from animaltrack.events.edit import edit_event from animaltrack.events.payloads import FeedGivenPayload, FeedPurchasedPayload # Get a valid location_id location_id = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] # Purchase 40kg of feed purchase_payload = FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=2, bag_price_cents=2400, ) feed_service.purchase_feed(purchase_payload, now_utc, "test_user") # Give 6kg of feed give_payload = FeedGivenPayload( location_id=location_id, feed_type_code="layer", amount_kg=6, ) give_event = feed_service.give_feed(give_payload, now_utc + 1000, "test_user") # Verify initial state: 40 purchased, 6 given, 34 balance row = seeded_db.execute( "SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory " "WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 40 # purchased_kg assert row[1] == 6 # given_kg assert row[2] == 34 # balance_kg # Edit the give event: 6kg -> 4kg edit_event( db=seeded_db, event_store=seeded_event_store, registry=projection_registry, event_id=give_event.id, new_entity_refs={ "feed_type_code": "layer", "location_id": location_id, "amount_kg": 4, }, new_payload={"location_id": location_id, "feed_type_code": "layer", "amount_kg": 4}, edited_by="admin", edited_at_utc=now_utc + 2000, ) # Verify updated state: 40 purchased, 4 given, 36 balance row = seeded_db.execute( "SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory " "WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 40 # purchased_kg unchanged assert row[1] == 4 # given_kg reduced from 6 to 4 assert row[2] == 36 # balance_kg increased from 34 to 36 def test_edit_feed_purchased_updates_inventory( self, seeded_db, seeded_event_store, projection_registry, feed_service, now_utc ): """Editing a FeedPurchased event updates inventory via fast-revert.""" from animaltrack.events.edit import edit_event from animaltrack.events.payloads import FeedPurchasedPayload # Purchase 40kg of feed purchase_payload = FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=2, bag_price_cents=2400, ) purchase_event = feed_service.purchase_feed(purchase_payload, now_utc, "test_user") # Verify initial state row = seeded_db.execute( "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 40 # purchased_kg assert row[1] == 40 # balance_kg # Edit the purchase: 40kg -> 60kg edit_event( db=seeded_db, event_store=seeded_event_store, registry=projection_registry, event_id=purchase_event.id, new_entity_refs={ "feed_type_code": "layer", "total_kg": 60, "price_per_kg_cents": 120, }, new_payload={ "feed_type_code": "layer", "bag_size_kg": 20, "bags_count": 3, "bag_price_cents": 2400, }, edited_by="admin", edited_at_utc=now_utc + 1000, ) # Verify updated state row = seeded_db.execute( "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 60 # purchased_kg increased from 40 to 60 assert row[1] == 60 # balance_kg increased from 40 to 60 class TestUnboundedReplayStrategy: """Tests for unbounded replay when editing interval/snapshot events.""" @pytest.fixture def full_projection_registry(self, seeded_db): """Create a ProjectionRegistry with all projections registered.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.intervals import IntervalProjection registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) return registry @pytest.fixture def seeded_event_store(self, seeded_db): """Create an EventStore with seeded database.""" return EventStore(seeded_db) def test_edit_product_collected_updates_event( self, seeded_db, seeded_event_store, full_projection_registry, now_utc ): """Editing a ProductCollected event updates the event in the database. Stats are computed on-read from the events table, so editing the event will automatically update stats without needing projection updates. """ from animaltrack.events.edit import edit_event from animaltrack.events.types import PRODUCT_COLLECTED # Get a valid location_id location_id = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] # Create a ProductCollected event directly event = seeded_event_store.append_event( event_type=PRODUCT_COLLECTED, ts_utc=now_utc, actor="test_user", entity_refs={ "location_id": location_id, "product_code": "egg.duck", "quantity": 8, "animal_ids": [], }, payload={}, ) # Edit the event: quantity 8 -> 6 edited_event = edit_event( db=seeded_db, event_store=seeded_event_store, registry=full_projection_registry, event_id=event.id, new_entity_refs={ "location_id": location_id, "product_code": "egg.duck", "quantity": 6, "animal_ids": [], }, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 1000, ) # Verify event was updated assert edited_event.version == 2 assert edited_event.entity_refs["quantity"] == 6 # Verify persisted in database retrieved = seeded_event_store.get_event(event.id) assert retrieved.entity_refs["quantity"] == 6 # Verify revision was stored row = seeded_db.execute( "SELECT entity_refs FROM event_revisions WHERE event_id = ?", (event.id,), ).fetchone() assert row is not None assert '"quantity": 8' in row[0] def test_edit_animal_moved_triggers_replay( self, seeded_db, seeded_event_store, full_projection_registry, now_utc ): """Editing an AnimalMoved event triggers unbounded replay of projections.""" from animaltrack.events.edit import edit_event from animaltrack.events.processor import process_event from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED from animaltrack.id_gen import generate_id # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 3").fetchall() loc1, loc2, loc3 = locations[0][0], locations[1][0], locations[2][0] # Create a cohort at loc1 animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_projection_registry) # Move from loc1 to loc2 move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_projection_registry) # Verify animal is at loc2 row = seeded_db.execute( """SELECT location_id FROM animal_location_intervals WHERE animal_id = ? AND end_utc IS NULL""", (animal_id,), ).fetchone() assert row[0] == loc2 # Edit the move: change destination to loc3 edit_event( db=seeded_db, event_store=seeded_event_store, registry=full_projection_registry, event_id=move_event.id, new_entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc3, }, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 2000, ) # Verify animal is now at loc3 (not loc2) row = seeded_db.execute( """SELECT location_id FROM animal_location_intervals WHERE animal_id = ? AND end_utc IS NULL""", (animal_id,), ).fetchone() assert row[0] == loc3 # Verify the old loc2 interval was removed count = seeded_db.execute( """SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_id, loc2), ).fetchone()[0] assert count == 0 class TestE2EEditEggEvent: """E2E Test #5: Edit egg event. From spec section 21.5: Edit the backdated 8→6. Expect Strip 1: eggs=33; cost_all=24/33=0.727±0.001; cost_layers=(12089g×€0.0012/g)/33=0.366±0.001; events.version++ and one row in event_revisions. """ @pytest.fixture def full_projection_registry(self, seeded_db): """Create a ProjectionRegistry with all projections.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.feed import FeedInventoryProjection from animaltrack.projections.intervals import IntervalProjection from animaltrack.projections.products import ProductsProjection registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) registry.register(ProductsProjection(seeded_db)) registry.register(FeedInventoryProjection(seeded_db)) return registry @pytest.fixture def services(self, seeded_db, full_projection_registry): """Create all services needed for E2E test.""" from animaltrack.services.animal import AnimalService from animaltrack.services.feed import FeedService from animaltrack.services.products import ProductService event_store = EventStore(seeded_db) return { "db": seeded_db, "event_store": event_store, "registry": full_projection_registry, "animal_service": AnimalService(seeded_db, event_store, full_projection_registry), "feed_service": FeedService(seeded_db, event_store, full_projection_registry), "product_service": ProductService(seeded_db, event_store, full_projection_registry), } @pytest.fixture def e2e_setup(self, seeded_db, services, now_utc): """Set up scenario for E2E test #5. Creates: - 10 adult female ducks at Strip 1 - Feed purchase (20kg @ EUR 1.20/kg) - Two egg collection events: 27 eggs first, then 8 eggs backdated - Total: 35 eggs before edit """ from animaltrack.events.payloads import ( AnimalCohortCreatedPayload, FeedGivenPayload, FeedPurchasedPayload, ProductCollectedPayload, ) # Get Strip 1 location location_id = seeded_db.execute( "SELECT id FROM locations WHERE name = 'Strip 1'" ).fetchone()[0] one_day_ms = 24 * 60 * 60 * 1000 animal_creation_ts = now_utc - one_day_ms # Create 10 adult female ducks cohort_payload = AnimalCohortCreatedPayload( species="duck", count=10, life_stage="adult", sex="female", location_id=location_id, origin="purchased", ) cohort_event = services["animal_service"].create_cohort( cohort_payload, animal_creation_ts, "test_user" ) animal_ids = cohort_event.entity_refs["animal_ids"] # Purchase feed: 20kg @ EUR 1.20/kg purchase_payload = FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=1, bag_price_cents=2400, # EUR 24 per 20kg = EUR 1.20/kg ) services["feed_service"].purchase_feed(purchase_payload, now_utc + 1000, "test_user") # Give feed: 20kg total give_payload = FeedGivenPayload( location_id=location_id, feed_type_code="layer", amount_kg=20, ) services["feed_service"].give_feed(give_payload, now_utc + 2000, "test_user") # Collect 27 eggs (first collection) collect1_payload = ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=27, resolved_ids=animal_ids, ) services["product_service"].collect_product(collect1_payload, now_utc + 3000, "test_user") # Collect 8 eggs (backdated - this is the event we'll edit) collect2_payload = ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=8, resolved_ids=animal_ids, ) backdated_event = services["product_service"].collect_product( collect2_payload, now_utc + 2500, "test_user", # Backdated before first collection ) return { "location_id": location_id, "backdated_event_id": backdated_event.id, "animal_ids": animal_ids, "ts_utc": now_utc + 3500, # After all events } def test_e2e_edit_egg_event_version_increments(self, seeded_db, services, e2e_setup, now_utc): """Editing the egg event increments version to 2.""" from animaltrack.events.edit import edit_event # Get the original event original = services["event_store"].get_event(e2e_setup["backdated_event_id"]) assert original.version == 1 # Edit: quantity 8 -> 6 edited = edit_event( db=seeded_db, event_store=services["event_store"], registry=services["registry"], event_id=e2e_setup["backdated_event_id"], new_entity_refs={ "location_id": e2e_setup["location_id"], "product_code": "egg.duck", "quantity": 6, "animal_ids": e2e_setup["animal_ids"], }, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 10000, ) assert edited.version == 2 def test_e2e_edit_egg_event_revision_stored(self, seeded_db, services, e2e_setup, now_utc): """Editing the egg event stores exactly one revision.""" from animaltrack.events.edit import edit_event # Edit: quantity 8 -> 6 edit_event( db=seeded_db, event_store=services["event_store"], registry=services["registry"], event_id=e2e_setup["backdated_event_id"], new_entity_refs={ "location_id": e2e_setup["location_id"], "product_code": "egg.duck", "quantity": 6, "animal_ids": e2e_setup["animal_ids"], }, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 10000, ) # Verify exactly one revision rows = seeded_db.execute( "SELECT version, entity_refs FROM event_revisions WHERE event_id = ?", (e2e_setup["backdated_event_id"],), ).fetchall() assert len(rows) == 1 assert rows[0][0] == 1 # Old version was 1 assert '"quantity": 8' in rows[0][1] # Old quantity was 8 def test_e2e_edit_egg_event_stats_updated(self, seeded_db, services, e2e_setup, now_utc): """Editing egg 8→6 updates stats from 35 eggs to 33 eggs. Before edit: 27 + 8 = 35 eggs After edit: 27 + 6 = 33 eggs cost_all = EUR 24 / 33 = 0.727 ± 0.001 """ from animaltrack.events.edit import edit_event from animaltrack.services.stats import get_egg_stats # Verify initial state: 35 eggs stats_before = get_egg_stats(seeded_db, e2e_setup["location_id"], e2e_setup["ts_utc"]) assert stats_before.eggs_total_pcs == 35 # Edit: quantity 8 -> 6 edit_event( db=seeded_db, event_store=services["event_store"], registry=services["registry"], event_id=e2e_setup["backdated_event_id"], new_entity_refs={ "location_id": e2e_setup["location_id"], "product_code": "egg.duck", "quantity": 6, "animal_ids": e2e_setup["animal_ids"], }, new_payload={}, edited_by="admin", edited_at_utc=now_utc + 10000, ) # Verify updated stats: 33 eggs stats_after = get_egg_stats(seeded_db, e2e_setup["location_id"], e2e_setup["ts_utc"]) assert stats_after.eggs_total_pcs == 33 # cost_all = EUR 24 / 33 = 0.727 # Feed: 20kg @ EUR 1.20/kg = EUR 24 assert abs(stats_after.cost_per_egg_all_eur - 0.727) < 0.001