Files
animaltrack/tests/test_repository_animal_timeline.py
Petru Paler 301b925be3 feat: implement Animal Detail page with timeline (Step 8.3)
Add GET /animals/{animal_id} route to display individual animal details:
- Header summary with species, location, status, tags
- Event timeline showing all events affecting the animal (newest first)
- Quick actions card (Move functional, others disabled for now)
- Merge info alert for animals that have been merged

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-30 20:14:12 +00:00

304 lines
11 KiB
Python

# ABOUTME: Tests for AnimalTimelineRepository - get_animal, get_timeline, get_merge_info.
# ABOUTME: Covers fetching animal details with joins and building event timelines.
import time
import pytest
from animaltrack.events import types as event_types
from animaltrack.events.payloads import (
AnimalCohortCreatedPayload,
AnimalMergedPayload,
AnimalMovedPayload,
AnimalTaggedPayload,
)
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.projections.tags import TagProjection
from animaltrack.repositories.animal_timeline import (
AnimalDetail,
AnimalTimelineRepository,
MergeInfo,
TimelineEvent,
)
from animaltrack.services.animal import AnimalService
@pytest.fixture
def event_store(seeded_db):
    """Provide an EventStore built on the seeded test database."""
    store = EventStore(seeded_db)
    return store
@pytest.fixture
def projection_registry(seeded_db):
    """Provide a ProjectionRegistry with the animal projections registered.

    Projections are constructed against ``seeded_db`` and registered in this
    order: animal registry, event-animals, intervals, tags.
    """
    projection_classes = (
        AnimalRegistryProjection,
        EventAnimalsProjection,
        IntervalProjection,
        TagProjection,
    )
    registry = ProjectionRegistry()
    for projection_cls in projection_classes:
        registry.register(projection_cls(seeded_db))
    return registry
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
    """Provide an AnimalService wired to the test database, event store and projections."""
    service = AnimalService(seeded_db, event_store, projection_registry)
    return service
@pytest.fixture
def valid_location_id(seeded_db):
    """Return the ID of the 'Strip 1' location looked up in the seeded database.

    Fails with an explicit message when the lookup yields no row, instead of
    letting ``row[0]`` blow up on a missing result.
    """
    row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
    # NOTE(review): presumably fetchone() returns None when nothing matches
    # (DB-API convention) - confirm for the connection type behind seeded_db.
    assert row is not None, "seed data is missing the 'Strip 1' location"
    return row[0]
@pytest.fixture
def strip2_location_id(seeded_db):
    """Return the ID of the 'Strip 2' location looked up in the seeded database.

    Fails with an explicit message when the lookup yields no row, instead of
    letting ``row[0]`` blow up on a missing result.
    """
    row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
    # NOTE(review): presumably fetchone() returns None when nothing matches
    # (DB-API convention) - confirm for the connection type behind seeded_db.
    assert row is not None, "seed data is missing the 'Strip 2' location"
    return row[0]
def make_cohort_payload(
    location_id: str,
    count: int = 3,
    species: str = "duck",
    sex: str = "unknown",
    life_stage: str = "adult",
) -> AnimalCohortCreatedPayload:
    """Build an AnimalCohortCreatedPayload for use in tests.

    Only ``location_id`` is required; the remaining arguments default to a
    cohort of three adult ducks of unknown sex. ``origin`` is always
    "purchased".
    """
    fields = {
        "species": species,
        "count": count,
        "life_stage": life_stage,
        "sex": sex,
        "location_id": location_id,
        "origin": "purchased",
    }
    return AnimalCohortCreatedPayload(**fields)
class TestAnimalTimelineRepositoryGetAnimal:
    """Exercises AnimalTimelineRepository.get_animal."""

    def test_returns_animal_detail_with_joined_data(
        self, seeded_db, animal_service, valid_location_id
    ):
        """A newly created animal comes back as AnimalDetail with species/location names."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        detail = AnimalTimelineRepository(seeded_db).get_animal(animal_id)

        assert detail is not None
        assert isinstance(detail, AnimalDetail)
        assert detail.animal_id == animal_id
        # Species code plus its display name (from the species table).
        assert detail.species_code == "duck"
        assert detail.species_name == "Duck"
        # Location ID plus its display name (from the locations table).
        assert detail.location_id == valid_location_id
        assert detail.location_name == "Strip 1"
        # Remaining attributes mirror the cohort payload defaults.
        assert detail.sex == "unknown"
        assert detail.life_stage == "adult"
        assert detail.status == "alive"
        assert detail.origin == "purchased"

    def test_returns_none_for_invalid_id(self, seeded_db):
        """An ID that matches no animal yields None."""
        missing = AnimalTimelineRepository(seeded_db).get_animal("00000000000000000000000000")
        assert missing is None

    def test_includes_active_tags(self, seeded_db, animal_service, valid_location_id):
        """A tag added after creation shows up in AnimalDetail.tags."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        # Tag the animal one second after it was created.
        animal_service.add_tag(
            AnimalTaggedPayload(tag="layer", resolved_ids=[animal_id]),
            now_ms + 1000,
            "test_user",
        )

        detail = AnimalTimelineRepository(seeded_db).get_animal(animal_id)

        assert detail is not None
        assert "layer" in detail.tags

    def test_includes_nickname_when_identified(self, seeded_db, animal_service, valid_location_id):
        """An identified animal exposes its nickname."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        # Mark the animal as identified directly in the registry table.
        promote_sql = (
            "UPDATE animal_registry SET identified = 1, nickname = 'Daisy' WHERE animal_id = ?"
        )
        seeded_db.execute(promote_sql, (animal_id,))

        detail = AnimalTimelineRepository(seeded_db).get_animal(animal_id)

        assert detail is not None
        assert detail.identified is True
        assert detail.nickname == "Daisy"
class TestAnimalTimelineRepositoryGetTimeline:
    """Exercises AnimalTimelineRepository.get_timeline."""

    def test_returns_events_for_animal(self, seeded_db, animal_service, valid_location_id):
        """The creation event appears as the single timeline entry."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        events = AnimalTimelineRepository(seeded_db).get_timeline(animal_id)

        assert len(events) == 1
        only_event = events[0]
        assert isinstance(only_event, TimelineEvent)
        assert only_event.event_type == "AnimalCohortCreated"
        assert only_event.actor == "test_user"

    def test_orders_events_newest_first(
        self, seeded_db, animal_service, valid_location_id, strip2_location_id
    ):
        """Entries are ordered by descending timestamp."""
        now_ms = int(time.time() * 1000)

        # Creation happens first...
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        # ...and the move one second later.
        relocation = AnimalMovedPayload(
            resolved_ids=[animal_id],
            to_location_id=strip2_location_id,
        )
        animal_service.move_animals(relocation, now_ms + 1000, "test_user")

        events = AnimalTimelineRepository(seeded_db).get_timeline(animal_id)

        assert len(events) == 2
        newest, oldest = events[0], events[1]
        assert newest.event_type == "AnimalMoved"
        assert oldest.event_type == "AnimalCohortCreated"

    def test_returns_empty_for_nonexistent_animal(self, seeded_db):
        """An ID that matches no animal yields an empty list."""
        events = AnimalTimelineRepository(seeded_db).get_timeline("00000000000000000000000000")
        assert events == []

    def test_respects_limit_parameter(self, seeded_db, animal_service, valid_location_id):
        """No more than ``limit`` entries are returned."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        # Five tag events (tag0..tag4), one second apart, after the creation event.
        for step in range(1, 6):
            tagging = AnimalTaggedPayload(tag=f"tag{step - 1}", resolved_ids=[animal_id])
            animal_service.add_tag(tagging, now_ms + step * 1000, "test_user")

        events = AnimalTimelineRepository(seeded_db).get_timeline(animal_id, limit=3)

        assert len(events) == 3

    def test_includes_summary_data(self, seeded_db, animal_service, valid_location_id):
        """Each timeline entry carries summary data; the tag event exposes its tag."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        # Tag the animal one second after creation so the tag event is the newest.
        animal_service.add_tag(
            AnimalTaggedPayload(tag="layer", resolved_ids=[animal_id]),
            now_ms + 1000,
            "test_user",
        )

        events = AnimalTimelineRepository(seeded_db).get_timeline(animal_id)

        latest = events[0]
        assert latest.event_type == event_types.ANIMAL_TAGGED
        assert "tag" in latest.summary
        assert latest.summary["tag"] == "layer"
class TestAnimalTimelineRepositoryGetMergeInfo:
    """Exercises AnimalTimelineRepository.get_merge_info."""

    def test_returns_merge_info_for_merged_animal(
        self, seeded_db, animal_service, valid_location_id
    ):
        """A merged-away animal reports its survivor, the survivor's nickname and merge time."""
        base_ms = int(time.time() * 1000)

        # Two single-animal cohorts: the first survives, the second gets merged away.
        survivor_event = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), base_ms, "test_user"
        )
        survivor_id = survivor_event.entity_refs["animal_ids"][0]
        alias_event = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), base_ms + 1000, "test_user"
        )
        alias_id = alias_event.entity_refs["animal_ids"][0]

        # Give the survivor a nickname directly in the registry table.
        promote_sql = (
            "UPDATE animal_registry SET identified = 1, nickname = 'Daisy' WHERE animal_id = ?"
        )
        seeded_db.execute(promote_sql, (survivor_id,))

        # Merge the alias into the survivor two seconds after the base timestamp.
        merge_request = AnimalMergedPayload(
            survivor_animal_id=survivor_id,
            merged_animal_ids=[alias_id],
        )
        animal_service.merge_animals(merge_request, base_ms + 2000, "test_user")

        info = AnimalTimelineRepository(seeded_db).get_merge_info(alias_id)

        assert info is not None
        assert isinstance(info, MergeInfo)
        assert info.survivor_animal_id == survivor_id
        assert info.survivor_nickname == "Daisy"
        assert info.merged_at_utc == base_ms + 2000

    def test_returns_none_for_unmerged_animal(self, seeded_db, animal_service, valid_location_id):
        """An animal that was never merged has no merge info."""
        now_ms = int(time.time() * 1000)
        created = animal_service.create_cohort(
            make_cohort_payload(valid_location_id, count=1), now_ms, "test_user"
        )
        animal_id = created.entity_refs["animal_ids"][0]

        info = AnimalTimelineRepository(seeded_db).get_merge_info(animal_id)

        assert info is None

    def test_returns_none_for_invalid_id(self, seeded_db):
        """An ID that matches no animal has no merge info."""
        info = AnimalTimelineRepository(seeded_db).get_merge_info("00000000000000000000000000")
        assert info is None