# ABOUTME: Tests for AnimalRegistryProjection.
# ABOUTME: Validates animal_registry and live_animals_by_location updates for cohorts and moves.

import json

from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED
from animaltrack.models.events import Event
from animaltrack.projections.animal_registry import AnimalRegistryProjection


def _location_id(db, name: str) -> str:
    """Return the id of the seeded location called ``name``.

    Args:
        db: The ``seeded_db`` fixture; only ``execute(...).fetchone()`` is used.
        name: Value of the ``name`` column in the ``locations`` table.

    Returns:
        The ``id`` column of the matching row.

    Raises:
        AssertionError: If no location with that name exists, so a missing
            seed row is reported clearly instead of as a ``TypeError`` from
            subscripting ``None``.
    """
    row = db.execute("SELECT id FROM locations WHERE name = ?", (name,)).fetchone()
    assert row is not None, f"seed location {name!r} not found"
    return row[0]


def make_cohort_event(
    animal_ids: list[str],
    location_id: str = "01ARZ3NDEKTSV4RRFFQ69G5FAV",
    species: str = "duck",
    sex: str = "unknown",
    life_stage: str = "adult",
    origin: str = "purchased",
    ts_utc: int = 1704067200000,
    *,
    event_id: str = "01ARZ3NDEKTSV4RRFFQ69G5001",
) -> Event:
    """Create a test AnimalCohortCreated event.

    Args:
        animal_ids: IDs placed in ``entity_refs["animal_ids"]``; their number
            also becomes ``payload["count"]``.
        location_id: Written to both ``entity_refs`` and ``payload``.
        species: Payload ``species`` value.
        sex: Payload ``sex`` value.
        life_stage: Payload ``life_stage`` value.
        origin: Payload ``origin`` value.
        ts_utc: Event timestamp.
        event_id: ID of the event itself. Keyword-only; override it when a
            test needs more than one cohort event.

    Returns:
        An ``Event`` of type ``ANIMAL_COHORT_CREATED`` with ``version=1`` and
        actor ``"test_user"``.
    """
    return Event(
        id=event_id,
        type=ANIMAL_COHORT_CREATED,
        ts_utc=ts_utc,
        actor="test_user",
        entity_refs={
            "location_id": location_id,
            "animal_ids": animal_ids,
        },
        payload={
            "species": species,
            "count": len(animal_ids),
            "life_stage": life_stage,
            "sex": sex,
            "location_id": location_id,
            "origin": origin,
            "notes": None,
        },
        version=1,
    )


class TestAnimalRegistryProjectionEventTypes:
    """Tests for get_event_types method."""

    def test_handles_animal_cohort_created(self, seeded_db):
        """Projection handles AnimalCohortCreated event type."""
        projection = AnimalRegistryProjection(seeded_db)
        assert ANIMAL_COHORT_CREATED in projection.get_event_types()


class TestAnimalRegistryProjectionApply:
    """Tests for apply() on AnimalCohortCreated."""

    def test_creates_animal_registry_row_for_each_animal(self, seeded_db):
        """Apply creates one row in animal_registry per animal_id."""
        # Get a valid location_id from seeds
        location_id = _location_id(seeded_db, "Strip 1")

        animal_ids = [
            "01ARZ3NDEKTSV4RRFFQ69G5A01",
            "01ARZ3NDEKTSV4RRFFQ69G5A02",
            "01ARZ3NDEKTSV4RRFFQ69G5A03",
        ]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        # Check animal_registry has 3 rows
        count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
        assert count == 3

        # Check each animal_id exists
        for animal_id in animal_ids:
            row = seeded_db.execute(
                "SELECT animal_id FROM animal_registry WHERE animal_id = ?",
                (animal_id,),
            ).fetchone()
            assert row is not None

    def test_animal_has_correct_species(self, seeded_db):
        """Registry row has correct species_code from payload."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id, species="goose")
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT species_code FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == "goose"

    def test_animal_has_correct_sex(self, seeded_db):
        """Registry row has correct sex from payload."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id, sex="female")
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT sex FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == "female"

    def test_animal_has_correct_life_stage(self, seeded_db):
        """Registry row has correct life_stage from payload."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id, life_stage="juvenile")
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT life_stage FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == "juvenile"

    def test_animal_has_correct_location(self, seeded_db):
        """Registry row has correct location_id from payload."""
        location_id = _location_id(seeded_db, "Strip 2")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT location_id FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == location_id

    def test_animal_has_correct_origin(self, seeded_db):
        """Registry row has correct origin from payload."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id, origin="rescued")
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT origin FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == "rescued"

    def test_status_is_alive(self, seeded_db):
        """Registry row has status='alive' for new cohort."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT status FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == "alive"

    def test_identified_is_false(self, seeded_db):
        """Registry row has identified=false for new cohort."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT identified FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == 0  # SQLite stores booleans as 0/1

    def test_repro_status_is_unknown(self, seeded_db):
        """Registry row has repro_status='unknown' for new cohort."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT repro_status FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == "unknown"

    def test_first_seen_utc_matches_event(self, seeded_db):
        """Registry row first_seen_utc matches event ts_utc."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        ts_utc = 1704067200000

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id, ts_utc=ts_utc)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT first_seen_utc FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == ts_utc

    def test_last_event_utc_matches_event(self, seeded_db):
        """Registry row last_event_utc matches event ts_utc."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        ts_utc = 1704067200000

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id, ts_utc=ts_utc)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT last_event_utc FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == ts_utc

    def test_creates_live_animal_row_for_each_animal(self, seeded_db):
        """Apply creates one row in live_animals_by_location per animal_id."""
        location_id = _location_id(seeded_db, "Strip 1")

        animal_ids = [
            "01ARZ3NDEKTSV4RRFFQ69G5A01",
            "01ARZ3NDEKTSV4RRFFQ69G5A02",
        ]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        # Check live_animals_by_location has 2 rows
        count = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[0]
        assert count == 2

        # Check each animal_id exists
        for animal_id in animal_ids:
            row = seeded_db.execute(
                "SELECT animal_id FROM live_animals_by_location WHERE animal_id = ?",
                (animal_id,),
            ).fetchone()
            assert row is not None

    def test_live_animal_tags_empty_json_array(self, seeded_db):
        """Live animal row has tags='[]' for new cohort."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT tags FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        # Pin both the exact stored text and that it parses as an empty list.
        assert row[0] == "[]"
        assert json.loads(row[0]) == []

    def test_live_animal_last_move_utc_is_null(self, seeded_db):
        """Live animal row has last_move_utc=NULL for new cohort."""
        location_id = _location_id(seeded_db, "Strip 1")
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        row = seeded_db.execute(
            "SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] is None


class TestAnimalRegistryProjectionRevert:
    """Tests for revert() on AnimalCohortCreated."""

    def test_removes_animal_registry_rows(self, seeded_db):
        """Revert deletes rows from animal_registry."""
        location_id = _location_id(seeded_db, "Strip 1")

        animal_ids = [
            "01ARZ3NDEKTSV4RRFFQ69G5A01",
            "01ARZ3NDEKTSV4RRFFQ69G5A02",
        ]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        # Verify rows exist
        count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
        assert count == 2

        # Revert
        projection.revert(event)

        # Verify rows removed
        count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
        assert count == 0

    def test_removes_live_animal_rows(self, seeded_db):
        """Revert deletes rows from live_animals_by_location."""
        location_id = _location_id(seeded_db, "Strip 1")

        animal_ids = [
            "01ARZ3NDEKTSV4RRFFQ69G5A01",
            "01ARZ3NDEKTSV4RRFFQ69G5A02",
        ]

        projection = AnimalRegistryProjection(seeded_db)
        event = make_cohort_event(animal_ids, location_id=location_id)
        projection.apply(event)

        # Verify rows exist
        count = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[0]
        assert count == 2

        # Revert
        projection.revert(event)

        # Verify rows removed
        count = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[0]
        assert count == 0

    def test_revert_only_affects_event_animals(self, seeded_db):
        """Revert only removes animals from the specific event."""
        location_id = _location_id(seeded_db, "Strip 1")

        # Create first cohort
        animal_ids_1 = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        event1 = make_cohort_event(animal_ids_1, location_id=location_id)
        projection.apply(event1)

        # Create second cohort with different event
        animal_ids_2 = ["01ARZ3NDEKTSV4RRFFQ69G5A02"]
        event2 = make_cohort_event(
            animal_ids_2,
            location_id=location_id,
            ts_utc=1704067300000,
            event_id="01ARZ3NDEKTSV4RRFFQ69G5002",  # Different event ID
        )
        projection.apply(event2)

        # Verify both exist
        count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
        assert count == 2

        # Revert only event1
        projection.revert(event1)

        # Event2's animal should still exist
        count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
        assert count == 1

        row = seeded_db.execute("SELECT animal_id FROM animal_registry").fetchone()
        assert row[0] == animal_ids_2[0]


# =============================================================================
# AnimalMoved Tests
# =============================================================================


def make_move_event(
    event_id: str,
    animal_ids: list[str],
    from_location_id: str,
    to_location_id: str,
    ts_utc: int = 1704067300000,
) -> Event:
    """Create a test AnimalMoved event.

    Args:
        event_id: ID of the event itself.
        animal_ids: IDs written to ``entity_refs["animal_ids"]`` and to
            ``payload["resolved_ids"]``.
        from_location_id: Source location, recorded in ``entity_refs`` only.
        to_location_id: Destination, recorded in ``entity_refs`` and ``payload``.
        ts_utc: Event timestamp.

    Returns:
        An ``Event`` of type ``ANIMAL_MOVED`` with ``version=1`` and actor
        ``"test_user"``.
    """
    return Event(
        id=event_id,
        type=ANIMAL_MOVED,
        ts_utc=ts_utc,
        actor="test_user",
        entity_refs={
            "from_location_id": from_location_id,
            "to_location_id": to_location_id,
            "animal_ids": animal_ids,
        },
        payload={
            "to_location_id": to_location_id,
            "resolved_ids": animal_ids,
            "notes": None,
        },
        version=1,
    )


class TestAnimalRegistryProjectionMoveEventTypes:
    """Tests for get_event_types method including move."""

    def test_handles_animal_moved(self, seeded_db):
        """Projection handles AnimalMoved event type."""
        projection = AnimalRegistryProjection(seeded_db)
        assert ANIMAL_MOVED in projection.get_event_types()


class TestAnimalRegistryProjectionApplyMove:
    """Tests for apply() on AnimalMoved."""

    def test_updates_location_in_animal_registry(self, seeded_db):
        """Apply move updates location_id in animal_registry."""
        # Get two location IDs
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        # First create a cohort at Strip 1
        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1)
        projection.apply(cohort_event)

        # Verify initial location
        row = seeded_db.execute(
            "SELECT location_id FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == strip1

        # Now move to Strip 2
        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
        )
        projection.apply(move_event)

        # Verify new location
        row = seeded_db.execute(
            "SELECT location_id FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == strip2

    def test_updates_last_event_utc_in_registry(self, seeded_db):
        """Apply move updates last_event_utc in animal_registry."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1, ts_utc=1704067200000)
        projection.apply(cohort_event)

        move_ts = 1704067300000
        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
            ts_utc=move_ts,
        )
        projection.apply(move_event)

        row = seeded_db.execute(
            "SELECT last_event_utc FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == move_ts

    def test_updates_location_in_live_animals(self, seeded_db):
        """Apply move updates location_id in live_animals_by_location."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1)
        projection.apply(cohort_event)

        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
        )
        projection.apply(move_event)

        row = seeded_db.execute(
            "SELECT location_id FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == strip2

    def test_updates_last_move_utc_in_live_animals(self, seeded_db):
        """Apply move updates last_move_utc in live_animals_by_location."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1)
        projection.apply(cohort_event)

        # Verify last_move_utc is NULL initially
        row = seeded_db.execute(
            "SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] is None

        move_ts = 1704067300000
        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
            ts_utc=move_ts,
        )
        projection.apply(move_event)

        row = seeded_db.execute(
            "SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == move_ts

    def test_moves_multiple_animals(self, seeded_db):
        """Apply move updates all animals in resolved_ids."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = [
            "01ARZ3NDEKTSV4RRFFQ69G5A01",
            "01ARZ3NDEKTSV4RRFFQ69G5A02",
            "01ARZ3NDEKTSV4RRFFQ69G5A03",
        ]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1)
        projection.apply(cohort_event)

        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
        )
        projection.apply(move_event)

        # All animals should now be at strip2
        for animal_id in animal_ids:
            row = seeded_db.execute(
                "SELECT location_id FROM animal_registry WHERE animal_id = ?",
                (animal_id,),
            ).fetchone()
            assert row[0] == strip2


class TestAnimalRegistryProjectionRevertMove:
    """Tests for revert() on AnimalMoved."""

    def test_revert_restores_original_location_in_registry(self, seeded_db):
        """Revert move restores location_id in animal_registry."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1, ts_utc=1704067200000)
        projection.apply(cohort_event)

        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
            ts_utc=1704067300000,
        )
        projection.apply(move_event)

        # Verify moved
        row = seeded_db.execute(
            "SELECT location_id FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == strip2

        # Revert the move
        projection.revert(move_event)

        # Location should be restored
        row = seeded_db.execute(
            "SELECT location_id FROM animal_registry WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == strip1

    def test_revert_restores_original_location_in_live_animals(self, seeded_db):
        """Revert move restores location_id in live_animals_by_location."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1)
        projection.apply(cohort_event)

        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
        )
        projection.apply(move_event)
        projection.revert(move_event)

        row = seeded_db.execute(
            "SELECT location_id FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] == strip1

    def test_revert_clears_last_move_utc_if_first_move(self, seeded_db):
        """Revert first move clears last_move_utc to NULL."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")

        animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
        projection = AnimalRegistryProjection(seeded_db)
        cohort_event = make_cohort_event(animal_ids, location_id=strip1)
        projection.apply(cohort_event)

        move_event = make_move_event(
            "01ARZ3NDEKTSV4RRFFQ69G5002",
            animal_ids,
            from_location_id=strip1,
            to_location_id=strip2,
        )
        projection.apply(move_event)
        projection.revert(move_event)

        row = seeded_db.execute(
            "SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
            (animal_ids[0],),
        ).fetchone()
        assert row[0] is None