# ABOUTME: Tests for AnimalTimelineRepository - get_animal, get_timeline, get_merge_info.
# ABOUTME: Covers fetching animal details with joins and building event timelines.

import time

import pytest

from animaltrack.events import types as event_types
from animaltrack.events.payloads import (
    AnimalCohortCreatedPayload,
    AnimalMergedPayload,
    AnimalMovedPayload,
    AnimalTaggedPayload,
)
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.projections.tags import TagProjection
from animaltrack.repositories.animal_timeline import (
    AnimalDetail,
    AnimalTimelineRepository,
    MergeInfo,
    TimelineEvent,
)
from animaltrack.services.animal import AnimalService

# Actor recorded on every event emitted by these tests.
ACTOR = "test_user"

# An ID that no test ever creates, used for the "not found" cases.
UNKNOWN_ANIMAL_ID = "00000000000000000000000000"


def _now_ms() -> int:
    """Return the current wall-clock time as integer milliseconds."""
    return int(time.time() * 1000)


def _create_single_animal(animal_service, location_id, ts_utc):
    """Create a one-animal cohort at ``location_id`` and return that animal's ID."""
    created = animal_service.create_cohort(
        make_cohort_payload(location_id, count=1), ts_utc, ACTOR
    )
    return created.entity_refs["animal_ids"][0]


@pytest.fixture
def event_store(seeded_db):
    """Provide an EventStore bound to the seeded database."""
    return EventStore(seeded_db)


@pytest.fixture
def projection_registry(seeded_db):
    """Provide a ProjectionRegistry with the four animal projections registered."""
    registry = ProjectionRegistry()
    # Registration order matches the order the projections are listed here.
    for projection_cls in (
        AnimalRegistryProjection,
        EventAnimalsProjection,
        IntervalProjection,
        TagProjection,
    ):
        registry.register(projection_cls(seeded_db))
    return registry


@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
    """Provide an AnimalService wired to the test store and projections."""
    return AnimalService(seeded_db, event_store, projection_registry)


@pytest.fixture
def valid_location_id(seeded_db):
    """Look up the ID of the seeded 'Strip 1' location."""
    cursor = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'")
    return cursor.fetchone()[0]


@pytest.fixture
def strip2_location_id(seeded_db):
    """Look up the ID of the seeded 'Strip 2' location."""
    cursor = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'")
    return cursor.fetchone()[0]


def make_cohort_payload(
    location_id: str,
    count: int = 3,
    species: str = "duck",
    sex: str = "unknown",
    life_stage: str = "adult",
) -> AnimalCohortCreatedPayload:
    """Build a cohort-created payload with test defaults and a 'purchased' origin."""
    return AnimalCohortCreatedPayload(
        location_id=location_id,
        origin="purchased",
        species=species,
        sex=sex,
        life_stage=life_stage,
        count=count,
    )


class TestAnimalTimelineRepositoryGetAnimal:
    """Tests for get_animal method."""

    def test_returns_animal_detail_with_joined_data(
        self, seeded_db, animal_service, valid_location_id
    ):
        """A created animal comes back as AnimalDetail with species/location names."""
        animal_id = _create_single_animal(animal_service, valid_location_id, _now_ms())

        animal = AnimalTimelineRepository(seeded_db).get_animal(animal_id)

        assert animal is not None
        assert isinstance(animal, AnimalDetail)
        expected_fields = {
            "animal_id": animal_id,
            "species_code": "duck",
            "species_name": "Duck",  # display name joined from the species table
            "location_id": valid_location_id,
            "location_name": "Strip 1",  # display name joined from the locations table
            "sex": "unknown",
            "life_stage": "adult",
            "status": "alive",
            "origin": "purchased",
        }
        for field_name, expected in expected_fields.items():
            assert getattr(animal, field_name) == expected, field_name

    def test_returns_none_for_invalid_id(self, seeded_db):
        """An ID that was never created yields None."""
        lookup = AnimalTimelineRepository(seeded_db).get_animal(UNKNOWN_ANIMAL_ID)

        assert lookup is None

    def test_includes_active_tags(self, seeded_db, animal_service, valid_location_id):
        """A tag added after creation shows up in the animal's tags."""
        created_at = _now_ms()
        animal_id = _create_single_animal(animal_service, valid_location_id, created_at)

        # Tag the animal one second after it was created.
        animal_service.add_tag(
            AnimalTaggedPayload(tag="layer", resolved_ids=[animal_id]),
            created_at + 1000,
            ACTOR,
        )

        animal = AnimalTimelineRepository(seeded_db).get_animal(animal_id)

        assert animal is not None
        assert "layer" in animal.tags

    def test_includes_nickname_when_identified(self, seeded_db, animal_service, valid_location_id):
        """An identified animal exposes its nickname."""
        animal_id = _create_single_animal(animal_service, valid_location_id, _now_ms())

        # Flip the animal_registry row to identified with a nickname directly in SQL.
        seeded_db.execute(
            "UPDATE animal_registry SET identified = 1, nickname = 'Daisy' WHERE animal_id = ?",
            (animal_id,),
        )

        animal = AnimalTimelineRepository(seeded_db).get_animal(animal_id)

        assert animal is not None
        assert animal.identified is True
        assert animal.nickname == "Daisy"


class TestAnimalTimelineRepositoryGetTimeline:
    """Tests for get_timeline method."""

    def test_returns_events_for_animal(self, seeded_db, animal_service, valid_location_id):
        """The creation event is the sole timeline entry for a fresh animal."""
        animal_id = _create_single_animal(animal_service, valid_location_id, _now_ms())

        timeline = AnimalTimelineRepository(seeded_db).get_timeline(animal_id)

        assert len(timeline) == 1
        (only_entry,) = timeline
        assert isinstance(only_entry, TimelineEvent)
        assert only_entry.event_type == "AnimalCohortCreated"
        assert only_entry.actor == "test_user"

    def test_orders_events_newest_first(
        self, seeded_db, animal_service, valid_location_id, strip2_location_id
    ):
        """Later events precede earlier ones in the returned timeline."""
        created_at = _now_ms()
        animal_id = _create_single_animal(animal_service, valid_location_id, created_at)

        # The move happens one second after creation, so it must be listed first.
        animal_service.move_animals(
            AnimalMovedPayload(to_location_id=strip2_location_id, resolved_ids=[animal_id]),
            created_at + 1000,
            ACTOR,
        )

        timeline = AnimalTimelineRepository(seeded_db).get_timeline(animal_id)

        observed_types = [entry.event_type for entry in timeline]
        assert observed_types == ["AnimalMoved", "AnimalCohortCreated"]

    def test_returns_empty_for_nonexistent_animal(self, seeded_db):
        """An ID that was never created yields an empty timeline."""
        timeline = AnimalTimelineRepository(seeded_db).get_timeline(UNKNOWN_ANIMAL_ID)

        assert timeline == []

    def test_respects_limit_parameter(self, seeded_db, animal_service, valid_location_id):
        """No more than ``limit`` entries are returned."""
        created_at = _now_ms()
        animal_id = _create_single_animal(animal_service, valid_location_id, created_at)

        # Five tag events, one second apart, give six events in total.
        for step in range(1, 6):
            animal_service.add_tag(
                AnimalTaggedPayload(tag=f"tag{step - 1}", resolved_ids=[animal_id]),
                created_at + step * 1000,
                ACTOR,
            )

        timeline = AnimalTimelineRepository(seeded_db).get_timeline(animal_id, limit=3)

        assert len(timeline) == 3

    def test_includes_summary_data(self, seeded_db, animal_service, valid_location_id):
        """A tag event carries the tag name in its summary."""
        created_at = _now_ms()
        animal_id = _create_single_animal(animal_service, valid_location_id, created_at)

        animal_service.add_tag(
            AnimalTaggedPayload(tag="layer", resolved_ids=[animal_id]),
            created_at + 1000,
            ACTOR,
        )

        timeline = AnimalTimelineRepository(seeded_db).get_timeline(animal_id)

        # The tag event is the later of the two, so it is checked at index 0.
        latest = timeline[0]
        assert latest.event_type == event_types.ANIMAL_TAGGED
        assert "tag" in latest.summary
        assert latest.summary["tag"] == "layer"


class TestAnimalTimelineRepositoryGetMergeInfo:
    """Tests for get_merge_info method."""

    def test_returns_merge_info_for_merged_animal(
        self, seeded_db, animal_service, valid_location_id
    ):
        """A merged-away animal reports its survivor's ID, nickname and merge time."""
        base_ts = _now_ms()
        merged_at = base_ts + 2000

        # Two single-animal cohorts: the first survives, the second is merged away.
        survivor_id = _create_single_animal(animal_service, valid_location_id, base_ts)
        alias_id = _create_single_animal(animal_service, valid_location_id, base_ts + 1000)

        # Give the survivor a nickname directly in the animal_registry table.
        seeded_db.execute(
            "UPDATE animal_registry SET identified = 1, nickname = 'Daisy' WHERE animal_id = ?",
            (survivor_id,),
        )

        animal_service.merge_animals(
            AnimalMergedPayload(merged_animal_ids=[alias_id], survivor_animal_id=survivor_id),
            merged_at,
            ACTOR,
        )

        merge_info = AnimalTimelineRepository(seeded_db).get_merge_info(alias_id)

        assert merge_info is not None
        assert isinstance(merge_info, MergeInfo)
        assert merge_info.survivor_animal_id == survivor_id
        assert merge_info.survivor_nickname == "Daisy"
        assert merge_info.merged_at_utc == merged_at

    def test_returns_none_for_unmerged_animal(self, seeded_db, animal_service, valid_location_id):
        """An animal that was never merged has no merge info."""
        animal_id = _create_single_animal(animal_service, valid_location_id, _now_ms())

        merge_info = AnimalTimelineRepository(seeded_db).get_merge_info(animal_id)

        assert merge_info is None

    def test_returns_none_for_invalid_id(self, seeded_db):
        """An ID that was never created has no merge info."""
        merge_info = AnimalTimelineRepository(seeded_db).get_merge_info(UNKNOWN_ANIMAL_ID)

        assert merge_info is None