# ABOUTME: Tests for event deletion functionality. # ABOUTME: Validates tombstone creation, projection reversal, and cascade rules. import time import pytest from animaltrack.events.store import EventStore from animaltrack.events.types import ( ANIMAL_COHORT_CREATED, ANIMAL_MOVED, FEED_PURCHASED, ) from animaltrack.id_gen import generate_id @pytest.fixture def now_utc(): """Current time in milliseconds since epoch.""" return int(time.time() * 1000) @pytest.fixture def event_store(migrated_db): """Create an EventStore instance with a migrated database.""" return EventStore(migrated_db) @pytest.fixture def seeded_event_store(seeded_db): """Create an EventStore with seeded database.""" return EventStore(seeded_db) class TestFindDependentEvents: """Tests for finding events that depend on a target event.""" @pytest.fixture def full_registry(self, seeded_db): """Create a ProjectionRegistry with all projections.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.intervals import IntervalProjection registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) return registry def test_find_no_dependents_returns_empty(self, seeded_db, seeded_event_store, now_utc): """Event with no dependents returns empty list.""" from animaltrack.events.dependencies import find_dependent_events # Create a standalone event (FeedPurchased has no animal links) event = seeded_event_store.append_event( event_type=FEED_PURCHASED, ts_utc=now_utc, actor="test_user", entity_refs={ "feed_type_code": "layer", "total_kg": 20, "price_per_kg_cents": 120, }, payload={ "feed_type_code": "layer", "bag_size_kg": 20, "bags_count": 1, "bag_price_cents": 2400, }, ) dependents = find_dependent_events(seeded_db, seeded_event_store, event.id) assert dependents == [] def test_find_dependents_for_cohort_with_move( self, seeded_db, seeded_event_store, full_registry, now_utc ): """AnimalMoved event depends on the AnimalCohortCreated event.""" from animaltrack.events.dependencies import find_dependent_events from animaltrack.events.processor import process_event # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall() loc1, loc2 = locations[0][0], locations[1][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Move the animal move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_registry) # Find dependents of the cohort event dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id) assert len(dependents) == 1 assert dependents[0].id == move_event.id def test_find_dependents_excludes_tombstoned_events( self, seeded_db, seeded_event_store, full_registry, now_utc ): """Tombstoned events should not be returned as dependents.""" from animaltrack.events.dependencies import find_dependent_events from animaltrack.events.processor import process_event # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall() loc1, loc2 = locations[0][0], locations[1][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Move the animal move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_registry) # Tombstone the move event tombstone_id = generate_id() seeded_db.execute( """INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason) VALUES (?, ?, ?, ?, ?)""", (tombstone_id, now_utc + 2000, "admin", move_event.id, "Test deletion"), ) # Find dependents of the cohort event - should be empty now dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id) assert dependents == [] def test_find_dependents_does_not_include_self( self, seeded_db, seeded_event_store, full_registry, now_utc ): """The target event itself is not included in dependents.""" from animaltrack.events.dependencies import find_dependent_events from animaltrack.events.processor import process_event # Get a location loc1 = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Find dependents of the cohort event (no moves, so only the cohort itself) dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id) # The cohort event should NOT be in the list assert cohort_event.id not in [d.id for d in dependents] def test_find_dependents_multiple_dependents( self, seeded_db, seeded_event_store, full_registry, now_utc ): """Multiple dependent events are all returned.""" from animaltrack.events.dependencies import find_dependent_events from animaltrack.events.processor import process_event # Get three locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 3").fetchall() loc1, loc2, loc3 = locations[0][0], locations[1][0], locations[2][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # First move: loc1 -> loc2 move_event1 = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event1, full_registry) # Second move: loc2 -> loc3 move_event2 = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 2000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc2, "to_location_id": loc3, }, payload={}, ) process_event(move_event2, full_registry) # Find dependents of the cohort event dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id) assert len(dependents) == 2 dependent_ids = {d.id for d in dependents} assert move_event1.id in dependent_ids assert move_event2.id in dependent_ids class TestDeleteEventTombstone: """Tests for tombstone creation when deleting events.""" @pytest.fixture def feed_registry(self, seeded_db): """Create a ProjectionRegistry with feed projections.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.feed import FeedInventoryProjection registry = ProjectionRegistry() registry.register(FeedInventoryProjection(seeded_db)) return registry def test_delete_event_creates_tombstone( self, seeded_db, seeded_event_store, feed_registry, now_utc ): """Deleting an event creates a tombstone record.""" from animaltrack.events.delete import delete_event # Create a feed purchase event event = seeded_event_store.append_event( event_type=FEED_PURCHASED, ts_utc=now_utc, actor="test_user", entity_refs={ "feed_type_code": "layer", "total_kg": 20, "price_per_kg_cents": 120, }, payload={ "feed_type_code": "layer", "bag_size_kg": 20, "bags_count": 1, "bag_price_cents": 2400, }, ) # Delete the event delete_event( db=seeded_db, event_store=seeded_event_store, event_id=event.id, actor="admin", role="admin", registry=feed_registry, ) # Verify tombstone exists row = seeded_db.execute( "SELECT actor, target_event_id FROM event_tombstones WHERE target_event_id = ?", (event.id,), ).fetchone() assert row is not None assert row[0] == "admin" assert row[1] == event.id def test_delete_event_makes_event_tombstoned( self, seeded_db, seeded_event_store, feed_registry, now_utc ): """After deletion, is_tombstoned() returns True.""" from animaltrack.events.delete import delete_event event = seeded_event_store.append_event( event_type=FEED_PURCHASED, ts_utc=now_utc, actor="test_user", entity_refs={ "feed_type_code": "layer", "total_kg": 20, "price_per_kg_cents": 120, }, payload={ "feed_type_code": "layer", "bag_size_kg": 20, "bags_count": 1, "bag_price_cents": 2400, }, ) assert not seeded_event_store.is_tombstoned(event.id) delete_event( db=seeded_db, event_store=seeded_event_store, event_id=event.id, actor="admin", role="admin", registry=feed_registry, ) assert seeded_event_store.is_tombstoned(event.id) def test_delete_already_tombstoned_raises( self, seeded_db, seeded_event_store, feed_registry, now_utc ): """Deleting an already tombstoned event raises EventTombstonedError.""" from animaltrack.events.delete import delete_event from animaltrack.events.exceptions import EventTombstonedError event = seeded_event_store.append_event( event_type=FEED_PURCHASED, ts_utc=now_utc, actor="test_user", entity_refs={ "feed_type_code": "layer", "total_kg": 20, "price_per_kg_cents": 120, }, payload={ "feed_type_code": "layer", "bag_size_kg": 20, "bags_count": 1, "bag_price_cents": 2400, }, ) # First deletion succeeds delete_event( db=seeded_db, event_store=seeded_event_store, event_id=event.id, actor="admin", role="admin", registry=feed_registry, ) # Second deletion raises with pytest.raises(EventTombstonedError): delete_event( db=seeded_db, event_store=seeded_event_store, event_id=event.id, actor="admin", role="admin", registry=feed_registry, ) def test_delete_nonexistent_event_raises( self, seeded_db, seeded_event_store, feed_registry, now_utc ): """Deleting a non-existent event raises EventNotFoundError.""" from animaltrack.events.delete import delete_event from animaltrack.events.exceptions import EventNotFoundError with pytest.raises(EventNotFoundError): delete_event( db=seeded_db, event_store=seeded_event_store, event_id="01ARZ3NDEKTSV4RRFFQ69G5FAV", actor="admin", role="admin", registry=feed_registry, ) class TestDeleteEventProjectionRevert: """Tests for projection reversal when deleting events.""" @pytest.fixture def feed_registry(self, seeded_db): """Create a ProjectionRegistry with feed projections.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.feed import FeedInventoryProjection registry = ProjectionRegistry() registry.register(FeedInventoryProjection(seeded_db)) return registry @pytest.fixture def feed_service(self, seeded_db, feed_registry): """Create a FeedService for testing.""" from animaltrack.services.feed import FeedService event_store = EventStore(seeded_db) return FeedService(seeded_db, event_store, feed_registry) def test_delete_feed_given_reverts_inventory( self, seeded_db, seeded_event_store, feed_registry, feed_service, now_utc ): """Deleting a FeedGiven event reverts the inventory changes.""" from animaltrack.events.delete import delete_event from animaltrack.events.payloads import FeedGivenPayload, FeedPurchasedPayload # Get a valid location_id location_id = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] # Purchase 40kg of feed purchase_payload = FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=2, bag_price_cents=2400, ) feed_service.purchase_feed(purchase_payload, now_utc, "test_user") # Give 6kg of feed give_payload = FeedGivenPayload( location_id=location_id, feed_type_code="layer", amount_kg=6, ) give_event = feed_service.give_feed(give_payload, now_utc + 1000, "test_user") # Verify initial state: 40 purchased, 6 given, 34 balance row = seeded_db.execute( "SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory " "WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 40 # purchased_kg assert row[1] == 6 # given_kg assert row[2] == 34 # balance_kg # Delete the give event delete_event( db=seeded_db, event_store=seeded_event_store, event_id=give_event.id, actor="test_user", role="recorder", registry=feed_registry, ) # Verify reverted state: 40 purchased, 0 given, 40 balance row = seeded_db.execute( "SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory " "WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 40 # purchased_kg unchanged assert row[1] == 0 # given_kg reverted assert row[2] == 40 # balance_kg restored def test_delete_feed_purchased_reverts_inventory( self, seeded_db, seeded_event_store, feed_registry, feed_service, now_utc ): """Deleting a FeedPurchased event reverts the inventory changes.""" from animaltrack.events.delete import delete_event from animaltrack.events.payloads import FeedPurchasedPayload # Purchase 40kg of feed purchase_payload = FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=2, bag_price_cents=2400, ) purchase_event = feed_service.purchase_feed(purchase_payload, now_utc, "test_user") # Verify initial state row = seeded_db.execute( "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 40 # purchased_kg assert row[1] == 40 # balance_kg # Delete the purchase event delete_event( db=seeded_db, event_store=seeded_event_store, event_id=purchase_event.id, actor="admin", role="admin", registry=feed_registry, ) # Verify reverted state: 0 purchased, 0 balance row = seeded_db.execute( "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'" ).fetchone() assert row[0] == 0 # purchased_kg reverted assert row[1] == 0 # balance_kg reverted class TestDeleteEventRoleRules: """Tests for recorder vs admin deletion rules.""" @pytest.fixture def full_registry(self, seeded_db): """Create a ProjectionRegistry with all projections.""" from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.intervals import IntervalProjection registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) return registry def test_recorder_blocked_with_dependents( self, seeded_db, seeded_event_store, full_registry, now_utc ): """Recorder cannot delete event that has dependents.""" from animaltrack.events.delete import delete_event from animaltrack.events.exceptions import DependentEventsError from animaltrack.events.processor import process_event # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall() loc1, loc2 = locations[0][0], locations[1][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Move the animal move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_registry) # Recorder tries to delete cohort - should fail with pytest.raises(DependentEventsError) as exc_info: delete_event( db=seeded_db, event_store=seeded_event_store, event_id=cohort_event.id, actor="test_user", role="recorder", registry=full_registry, ) # Verify the error contains the dependent event assert len(exc_info.value.dependent_events) == 1 assert exc_info.value.dependent_events[0].id == move_event.id def test_admin_blocked_without_cascade( self, seeded_db, seeded_event_store, full_registry, now_utc ): """Admin without cascade flag cannot delete event with dependents.""" from animaltrack.events.delete import delete_event from animaltrack.events.exceptions import DependentEventsError from animaltrack.events.processor import process_event # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall() loc1, loc2 = locations[0][0], locations[1][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Move the animal move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_registry) # Admin without cascade tries to delete cohort - should fail with pytest.raises(DependentEventsError): delete_event( db=seeded_db, event_store=seeded_event_store, event_id=cohort_event.id, actor="admin", role="admin", cascade=False, registry=full_registry, ) def test_admin_cascade_deletes_dependents( self, seeded_db, seeded_event_store, full_registry, now_utc ): """Admin with cascade flag deletes target and all dependents.""" from animaltrack.events.delete import delete_event from animaltrack.events.processor import process_event # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall() loc1, loc2 = locations[0][0], locations[1][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Move the animal move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_registry) # Admin with cascade deletes cohort delete_event( db=seeded_db, event_store=seeded_event_store, event_id=cohort_event.id, actor="admin", role="admin", cascade=True, registry=full_registry, ) # Both events should be tombstoned assert seeded_event_store.is_tombstoned(cohort_event.id) assert seeded_event_store.is_tombstoned(move_event.id) def test_cascade_reverts_all_projections( self, seeded_db, seeded_event_store, full_registry, now_utc ): """Cascade deletion reverts projections for all deleted events.""" from animaltrack.events.delete import delete_event from animaltrack.events.processor import process_event # Get two locations locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall() loc1, loc2 = locations[0][0], locations[1][0] # Create cohort animal_id = generate_id() cohort_event = seeded_event_store.append_event( event_type=ANIMAL_COHORT_CREATED, ts_utc=now_utc, actor="test_user", entity_refs={"animal_ids": [animal_id]}, payload={ "location_id": loc1, "species": "duck", "count": 1, "sex": "female", "life_stage": "adult", "origin": "hatched", }, ) process_event(cohort_event, full_registry) # Move the animal move_event = seeded_event_store.append_event( event_type=ANIMAL_MOVED, ts_utc=now_utc + 1000, actor="test_user", entity_refs={ "animal_ids": [animal_id], "from_location_id": loc1, "to_location_id": loc2, }, payload={}, ) process_event(move_event, full_registry) # Verify animal exists before deletion animal_count = seeded_db.execute( "SELECT COUNT(*) FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert animal_count == 1 # Admin cascade deletes cohort delete_event( db=seeded_db, event_store=seeded_event_store, event_id=cohort_event.id, actor="admin", role="admin", cascade=True, registry=full_registry, ) # Animal should be removed from registry animal_count = seeded_db.execute( "SELECT COUNT(*) FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert animal_count == 0 # Location intervals should be removed interval_count = seeded_db.execute( "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert interval_count == 0 # Attribute intervals should be removed attr_count = seeded_db.execute( "SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert attr_count == 0