# ABOUTME: Tests for animal lifecycle events in AnimalService. # ABOUTME: Covers HatchRecorded, AnimalOutcome, AnimalPromoted, AnimalMerged, AnimalStatusCorrected. import time import pytest from animaltrack.events.payloads import ( AnimalCohortCreatedPayload, AnimalMergedPayload, AnimalOutcomePayload, AnimalPromotedPayload, AnimalStatusCorrectedPayload, HatchRecordedPayload, ) from animaltrack.events.store import EventStore from animaltrack.events.types import ( ANIMAL_MERGED, ANIMAL_OUTCOME, ANIMAL_PROMOTED, ANIMAL_STATUS_CORRECTED, HATCH_RECORDED, ) from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.intervals import IntervalProjection from animaltrack.services.animal import AnimalService, ValidationError @pytest.fixture def event_store(seeded_db): """Create an EventStore for testing.""" return EventStore(seeded_db) @pytest.fixture def projection_registry(seeded_db): """Create a ProjectionRegistry with all animal projections registered.""" registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) return registry @pytest.fixture def animal_service(seeded_db, event_store, projection_registry): """Create an AnimalService for testing.""" return AnimalService(seeded_db, event_store, projection_registry) @pytest.fixture def strip1_id(seeded_db): """Get Strip 1 location ID from seeds.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() return row[0] @pytest.fixture def strip2_id(seeded_db): """Get Strip 2 location ID from seeds.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone() return row[0] def make_cohort_payload( location_id: str, count: int = 5, species: str = "duck", life_stage: str = "adult", sex: str = "female", origin: str = "purchased", ) -> AnimalCohortCreatedPayload: """Create a cohort payload for testing.""" return AnimalCohortCreatedPayload( species=species, count=count, life_stage=life_stage, sex=sex, location_id=location_id, origin=origin, ) # ============================================================================= # HatchRecorded Tests # ============================================================================= def make_hatch_payload( location_id: str, hatched_live: int = 5, species: str = "duck", assigned_brood_location_id: str | None = None, ) -> HatchRecordedPayload: """Create a hatch recorded payload for testing.""" return HatchRecordedPayload( species=species, location_id=location_id, assigned_brood_location_id=assigned_brood_location_id, hatched_live=hatched_live, ) class TestHatchRecorded: """Tests for record_hatch().""" def test_creates_hatch_recorded_event(self, seeded_db, animal_service, strip1_id): """record_hatch creates a HATCH_RECORDED event.""" payload = make_hatch_payload(strip1_id, hatched_live=3) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") assert event.type == HATCH_RECORDED assert event.actor == "test_user" assert event.ts_utc == ts_utc def test_event_has_animal_ids_in_entity_refs(self, seeded_db, animal_service, strip1_id): """Event entity_refs contains generated animal_ids matching hatched_live count.""" payload = make_hatch_payload(strip1_id, hatched_live=5) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") assert "animal_ids" in event.entity_refs assert len(event.entity_refs["animal_ids"]) == 5 # Verify all IDs are ULIDs (26 chars) for animal_id in event.entity_refs["animal_ids"]: assert len(animal_id) == 26 def test_uses_location_id_when_no_brood_location(self, seeded_db, animal_service, strip1_id): """Animals are placed at location_id when assigned_brood_location_id is None.""" payload = make_hatch_payload(strip1_id, hatched_live=2) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") assert event.entity_refs["location_id"] == strip1_id # Verify animals are at strip1 for animal_id in event.entity_refs["animal_ids"]: row = seeded_db.execute( "SELECT location_id FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == strip1_id def test_uses_assigned_brood_location_when_provided( self, seeded_db, animal_service, strip1_id, strip2_id ): """Animals are placed at assigned_brood_location_id if provided.""" payload = make_hatch_payload( strip1_id, hatched_live=3, assigned_brood_location_id=strip2_id ) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") assert event.entity_refs["location_id"] == strip2_id # Verify animals are at strip2 (brood location), not strip1 for animal_id in event.entity_refs["animal_ids"]: row = seeded_db.execute( "SELECT location_id FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == strip2_id def test_animals_created_as_hatchlings(self, seeded_db, animal_service, strip1_id): """Animals are created with life_stage=hatchling, sex=unknown, status=alive.""" payload = make_hatch_payload(strip1_id, hatched_live=2) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") for animal_id in event.entity_refs["animal_ids"]: row = seeded_db.execute( "SELECT life_stage, sex, status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "hatchling" assert row[1] == "unknown" assert row[2] == "alive" def test_animals_have_origin_hatched(self, seeded_db, animal_service, strip1_id): """Animals are created with origin=hatched.""" payload = make_hatch_payload(strip1_id, hatched_live=1) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") animal_id = event.entity_refs["animal_ids"][0] row = seeded_db.execute( "SELECT origin FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "hatched" def test_animals_in_live_animals_by_location(self, seeded_db, animal_service, strip1_id): """Animals are inserted into live_animals_by_location.""" payload = make_hatch_payload(strip1_id, hatched_live=4) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") count = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[0] assert count == 4 # Verify each animal is in live roster for animal_id in event.entity_refs["animal_ids"]: row = seeded_db.execute( "SELECT animal_id FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone() assert row is not None def test_location_intervals_created(self, seeded_db, animal_service, strip1_id): """Location intervals are created for each animal.""" payload = make_hatch_payload(strip1_id, hatched_live=3) ts_utc = int(time.time() * 1000) animal_service.record_hatch(payload, ts_utc, "test_user") count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0] assert count == 3 def test_attr_intervals_created(self, seeded_db, animal_service, strip1_id): """Attribute intervals (sex, life_stage, repro_status, status) are created.""" payload = make_hatch_payload(strip1_id, hatched_live=2) ts_utc = int(time.time() * 1000) animal_service.record_hatch(payload, ts_utc, "test_user") # 2 animals * 4 attrs = 8 intervals count = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0] assert count == 8 def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id): """Event-animal links are created in event_animals table.""" payload = make_hatch_payload(strip1_id, hatched_live=4) ts_utc = int(time.time() * 1000) event = animal_service.record_hatch(payload, ts_utc, "test_user") count = seeded_db.execute( "SELECT COUNT(*) FROM event_animals WHERE event_id = ?", (event.id,), ).fetchone()[0] assert count == 4 def test_validates_location_exists(self, seeded_db, animal_service): """Raises ValidationError if location_id doesn't exist.""" fake_location_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX" payload = make_hatch_payload(fake_location_id, hatched_live=1) ts_utc = int(time.time() * 1000) with pytest.raises(ValidationError, match="not found"): animal_service.record_hatch(payload, ts_utc, "test_user") def test_validates_brood_location_exists(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if assigned_brood_location_id doesn't exist.""" fake_brood_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX" payload = make_hatch_payload( strip1_id, hatched_live=1, assigned_brood_location_id=fake_brood_id ) ts_utc = int(time.time() * 1000) with pytest.raises(ValidationError, match="not found"): animal_service.record_hatch(payload, ts_utc, "test_user") def test_validates_species_active(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if species is not active.""" # Deactivate duck species seeded_db.execute("UPDATE species SET active = 0 WHERE code = 'duck'") payload = make_hatch_payload(strip1_id, hatched_live=1, species="duck") ts_utc = int(time.time() * 1000) with pytest.raises(ValidationError, match="not active"): animal_service.record_hatch(payload, ts_utc, "test_user") # ============================================================================= # AnimalOutcome Tests # ============================================================================= def make_outcome_payload( resolved_ids: list[str], outcome: str = "harvest", reason: str | None = None, yield_items: list | None = None, ) -> AnimalOutcomePayload: """Create an outcome payload for testing.""" return AnimalOutcomePayload( outcome=outcome, resolved_ids=resolved_ids, reason=reason, yield_items=yield_items, ) class TestAnimalOutcome: """Tests for record_outcome().""" def test_creates_animal_outcome_event(self, seeded_db, animal_service, strip1_id): """record_outcome creates an ANIMAL_OUTCOME event.""" # First create animals cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] # Record outcome outcome_payload = make_outcome_payload(animal_ids, outcome="harvest") outcome_ts = ts_utc + 1000 outcome_event = animal_service.record_outcome(outcome_payload, outcome_ts, "test_user") assert outcome_event.type == ANIMAL_OUTCOME assert outcome_event.actor == "test_user" assert outcome_event.ts_utc == outcome_ts def test_updates_status_for_harvest(self, seeded_db, animal_service, strip1_id): """Harvest outcome sets status=harvested in animal_registry.""" cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] outcome_payload = make_outcome_payload(animal_ids, outcome="harvest") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") for animal_id in animal_ids: row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "harvested" def test_updates_status_for_death(self, seeded_db, animal_service, strip1_id): """Death outcome sets status=dead in animal_registry.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "dead" def test_updates_status_for_sold(self, seeded_db, animal_service, strip1_id): """Sold outcome sets status=sold in animal_registry.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] outcome_payload = make_outcome_payload([animal_id], outcome="sold") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "sold" def test_updates_status_for_predator_loss(self, seeded_db, animal_service, strip1_id): """Predator loss outcome sets status=dead in animal_registry.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] outcome_payload = make_outcome_payload([animal_id], outcome="predator_loss") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "dead" def test_removes_from_live_animals(self, seeded_db, animal_service, strip1_id): """Animals are removed from live_animals_by_location.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] # Verify animals are in live roster count_before = seeded_db.execute( "SELECT COUNT(*) FROM live_animals_by_location" ).fetchone()[0] assert count_before == 3 # Record outcome for 2 of them outcome_payload = make_outcome_payload(animal_ids[:2], outcome="harvest") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Should have 1 left count_after = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[ 0 ] assert count_after == 1 def test_closes_location_interval(self, seeded_db, animal_service, strip1_id): """Open location interval is closed with end_utc=ts_utc.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] outcome_ts = ts_utc + 1000 outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, outcome_ts, "test_user") # Location interval should be closed row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_id, strip1_id), ).fetchone() assert row[0] == outcome_ts def test_closes_status_interval_and_creates_new(self, seeded_db, animal_service, strip1_id): """Open status='alive' interval is closed, new status interval created.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] outcome_ts = ts_utc + 1000 outcome_payload = make_outcome_payload([animal_id], outcome="harvest") animal_service.record_outcome(outcome_payload, outcome_ts, "test_user") # Alive interval should be closed alive_row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status' AND value = 'alive'""", (animal_id,), ).fetchone() assert alive_row[0] == outcome_ts # Harvested interval should be open harvested_row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status' AND value = 'harvested'""", (animal_id,), ).fetchone() assert harvested_row[0] is None def test_stores_yield_items_in_payload(self, seeded_db, animal_service, strip1_id): """yield_items are stored in the event payload.""" cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] yield_items = [ { "product_code": "meat.part.breast.duck", "unit": "kg", "quantity": 2, "weight_kg": 1.4, }, {"product_code": "fat.rendered.duck", "unit": "kg", "quantity": 1, "weight_kg": 0.3}, ] outcome_payload = make_outcome_payload( animal_ids, outcome="harvest", yield_items=yield_items ) outcome_event = animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") assert outcome_event.payload["yield_items"] is not None assert len(outcome_event.payload["yield_items"]) == 2 assert outcome_event.payload["yield_items"][0]["product_code"] == "meat.part.breast.duck" def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id): """Event-animal links are created for all resolved_ids.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] outcome_payload = make_outcome_payload(animal_ids, outcome="harvest") outcome_event = animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") count = seeded_db.execute( "SELECT COUNT(*) FROM event_animals WHERE event_id = ?", (outcome_event.id,), ).fetchone()[0] assert count == 3 def test_validates_animals_exist(self, seeded_db, animal_service): """Raises ValidationError if any animal_id doesn't exist.""" fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX" outcome_payload = make_outcome_payload([fake_animal_id], outcome="death") with pytest.raises(ValidationError, match="not found"): animal_service.record_outcome(outcome_payload, int(time.time() * 1000), "test_user") def test_validates_animals_alive(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if any animal is not status=alive.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # First outcome outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Try to record another outcome on dead animal with pytest.raises(ValidationError, match="not alive"): animal_service.record_outcome( make_outcome_payload([animal_id], outcome="harvest"), ts_utc + 2000, "test_user", ) # ============================================================================= # AnimalPromoted Tests # ============================================================================= def make_promoted_payload( animal_id: str, nickname: str | None = None, sex: str | None = None, repro_status: str | None = None, ) -> AnimalPromotedPayload: """Create a promoted payload for testing.""" return AnimalPromotedPayload( animal_id=animal_id, nickname=nickname, sex=sex, repro_status=repro_status, ) class TestAnimalPromoted: """Tests for promote_animal().""" def test_creates_animal_promoted_event(self, seeded_db, animal_service, strip1_id): """promote_animal creates an ANIMAL_PROMOTED event.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] promoted_payload = make_promoted_payload(animal_id, nickname="Daisy") promoted_event = animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user") assert promoted_event.type == ANIMAL_PROMOTED assert promoted_event.actor == "test_user" def test_sets_identified_flag(self, seeded_db, animal_service, strip1_id): """promote_animal sets identified=1 in both registry tables.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # Verify initially not identified row = seeded_db.execute( "SELECT identified FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == 0 promoted_payload = make_promoted_payload(animal_id, nickname="Daisy") animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user") # Check both tables row = seeded_db.execute( "SELECT identified FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == 1 row = seeded_db.execute( "SELECT identified FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == 1 def test_sets_nickname(self, seeded_db, animal_service, strip1_id): """promote_animal sets nickname in both registry tables.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] promoted_payload = make_promoted_payload(animal_id, nickname="Daisy") animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user") row = seeded_db.execute( "SELECT nickname FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "Daisy" row = seeded_db.execute( "SELECT nickname FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "Daisy" def test_updates_sex_and_creates_interval(self, seeded_db, animal_service, strip1_id): """promote_animal updates sex and creates new sex interval.""" # Create hatchling with unknown sex hatch_payload = make_hatch_payload(strip1_id, hatched_live=1) ts_utc = int(time.time() * 1000) hatch_event = animal_service.record_hatch(hatch_payload, ts_utc, "test_user") animal_id = hatch_event.entity_refs["animal_ids"][0] # Verify sex is unknown row = seeded_db.execute( "SELECT sex FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "unknown" # Promote with sex=female promoted_payload = make_promoted_payload(animal_id, nickname="Daisy", sex="female") promote_ts = ts_utc + 1000 animal_service.promote_animal(promoted_payload, promote_ts, "test_user") # Verify sex updated row = seeded_db.execute( "SELECT sex FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "female" # Verify old interval closed row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'sex' AND value = 'unknown'""", (animal_id,), ).fetchone() assert row[0] == promote_ts # Verify new interval open row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'sex' AND value = 'female'""", (animal_id,), ).fetchone() assert row[0] is None def test_updates_repro_status_and_creates_interval(self, seeded_db, animal_service, strip1_id): """promote_animal updates repro_status and creates new interval.""" cohort_payload = make_cohort_payload(strip1_id, count=1, sex="female") ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] promoted_payload = make_promoted_payload(animal_id, nickname="Daisy", repro_status="intact") promote_ts = ts_utc + 1000 animal_service.promote_animal(promoted_payload, promote_ts, "test_user") # Verify repro_status updated row = seeded_db.execute( "SELECT repro_status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "intact" # Verify new repro_status interval exists row = seeded_db.execute( """SELECT start_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'repro_status' AND value = 'intact'""", (animal_id,), ).fetchone() assert row is not None def test_event_animal_link_created(self, seeded_db, animal_service, strip1_id): """Event-animal link is created for the promoted animal.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] promoted_payload = make_promoted_payload(animal_id, nickname="Daisy") promoted_event = animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user") count = seeded_db.execute( "SELECT COUNT(*) FROM event_animals WHERE event_id = ?", (promoted_event.id,), ).fetchone()[0] assert count == 1 def test_validates_animal_exists(self, seeded_db, animal_service): """Raises ValidationError if animal_id doesn't exist.""" fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX" promoted_payload = make_promoted_payload(fake_animal_id, nickname="Ghost") with pytest.raises(ValidationError, match="not found"): animal_service.promote_animal(promoted_payload, int(time.time() * 1000), "test_user") def test_validates_animal_alive(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if animal is not alive.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # Kill the animal outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Try to promote dead animal promoted_payload = make_promoted_payload(animal_id, nickname="Ghost") with pytest.raises(ValidationError, match="not alive"): animal_service.promote_animal(promoted_payload, ts_utc + 2000, "test_user") def test_validates_nickname_unique_among_active(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if nickname is already used by another active animal.""" cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] # Promote first animal with nickname promoted_payload = make_promoted_payload(animal_ids[0], nickname="Daisy") animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user") # Try to promote second animal with same nickname duplicate_payload = make_promoted_payload(animal_ids[1], nickname="Daisy") with pytest.raises(ValidationError, match="(?i)nickname.*already in use"): animal_service.promote_animal(duplicate_payload, ts_utc + 2000, "test_user") # ============================================================================= # AnimalMerged Tests # ============================================================================= def make_merged_payload( survivor_animal_id: str, merged_animal_ids: list[str], notes: str | None = None, ) -> AnimalMergedPayload: """Create a merged payload for testing.""" return AnimalMergedPayload( survivor_animal_id=survivor_animal_id, merged_animal_ids=merged_animal_ids, notes=notes, ) class TestAnimalMerged: """Tests for merge_animals().""" def test_creates_animal_merged_event(self, seeded_db, animal_service, strip1_id): """merge_animals creates an ANIMAL_MERGED event.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:] ) merged_event = animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") assert merged_event.type == ANIMAL_MERGED assert merged_event.actor == "test_user" def test_merged_animals_status_merged_into(self, seeded_db, animal_service, strip1_id): """Merged animals get status=merged_into.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:] ) animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") # Survivor should still be alive row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (animal_ids[0],), ).fetchone() assert row[0] == "alive" # Merged animals should be merged_into for merged_id in animal_ids[1:]: row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (merged_id,), ).fetchone() assert row[0] == "merged_into" def test_removes_merged_from_live_animals(self, seeded_db, animal_service, strip1_id): """Merged animals are removed from live_animals_by_location.""" cohort_payload = make_cohort_payload(strip1_id, count=4) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] count_before = seeded_db.execute( "SELECT COUNT(*) FROM live_animals_by_location" ).fetchone()[0] assert count_before == 4 merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:] ) animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") # Only survivor should remain count_after = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[ 0 ] assert count_after == 1 def test_creates_alias_records(self, seeded_db, animal_service, strip1_id): """Alias records are created mapping merged IDs to survivor.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:] ) animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") # Verify alias records for merged_id in animal_ids[1:]: row = seeded_db.execute( """SELECT survivor_animal_id FROM animal_aliases WHERE alias_animal_id = ?""", (merged_id,), ).fetchone() assert row is not None assert row[0] == animal_ids[0] def test_closes_location_and_status_intervals(self, seeded_db, animal_service, strip1_id): """Merged animals get location and status intervals closed.""" cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] merge_ts = ts_utc + 1000 merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=[animal_ids[1]] ) animal_service.merge_animals(merged_payload, merge_ts, "test_user") # Location interval closed row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ? AND end_utc IS NOT NULL""", (animal_ids[1],), ).fetchone() assert row is not None assert row[0] == merge_ts # Status alive interval closed row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status' AND value = 'alive'""", (animal_ids[1],), ).fetchone() assert row[0] == merge_ts # merged_into status interval created row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status' AND value = 'merged_into'""", (animal_ids[1],), ).fetchone() assert row[0] is None def test_event_animal_links_include_all(self, seeded_db, animal_service, strip1_id): """Event-animal links include survivor and all merged animals.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:] ) merged_event = animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") count = seeded_db.execute( "SELECT COUNT(*) FROM event_animals WHERE event_id = ?", (merged_event.id,), ).fetchone()[0] assert count == 3 # survivor + 2 merged def test_validates_survivor_not_in_merged_list(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if survivor is in merged list.""" cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] # Include survivor in merged list merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids ) with pytest.raises(ValidationError, match="(?i)survivor.*cannot be in.*merged"): animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") def test_validates_all_animals_exist(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if any animal doesn't exist.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] fake_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX" merged_payload = make_merged_payload( survivor_animal_id=animal_id, merged_animal_ids=[fake_id] ) with pytest.raises(ValidationError, match="not found"): animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user") def test_validates_all_animals_alive(self, seeded_db, animal_service, strip1_id): """Raises ValidationError if any animal is not alive.""" cohort_payload = make_cohort_payload(strip1_id, count=3) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] # Kill one of the animals to merge outcome_payload = make_outcome_payload([animal_ids[2]], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Try to merge including dead animal merged_payload = make_merged_payload( survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:] ) with pytest.raises(ValidationError, match="not alive"): animal_service.merge_animals(merged_payload, ts_utc + 2000, "test_user") # ============================================================================= # AnimalStatusCorrected Tests # ============================================================================= def make_status_corrected_payload( resolved_ids: list[str], new_status: str, reason: str, notes: str | None = None, ) -> AnimalStatusCorrectedPayload: """Create a status corrected payload for testing.""" return AnimalStatusCorrectedPayload( resolved_ids=resolved_ids, new_status=new_status, reason=reason, notes=notes, ) class TestAnimalStatusCorrected: """Tests for correct_status().""" def test_creates_animal_status_corrected_event(self, seeded_db, animal_service, strip1_id): """correct_status creates an ANIMAL_STATUS_CORRECTED event.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # First record as dead outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Correct to alive corrected_payload = make_status_corrected_payload( [animal_id], "alive", "Misidentified animal, still alive" ) corrected_event = animal_service.correct_status( corrected_payload, ts_utc + 2000, "admin_user" ) assert corrected_event.type == ANIMAL_STATUS_CORRECTED assert corrected_event.actor == "admin_user" def test_updates_status_in_registry(self, seeded_db, animal_service, strip1_id): """correct_status updates status in animal_registry.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # Mark as sold outcome_payload = make_outcome_payload([animal_id], outcome="sold") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Correct to dead corrected_payload = make_status_corrected_payload( [animal_id], "dead", "Animal actually died, not sold" ) animal_service.correct_status(corrected_payload, ts_utc + 2000, "admin_user") row = seeded_db.execute( "SELECT status FROM animal_registry WHERE animal_id = ?", (animal_id,), ).fetchone() assert row[0] == "dead" def test_restores_to_live_animals_when_correcting_to_alive( self, seeded_db, animal_service, strip1_id ): """Correcting to alive restores animal to live_animals_by_location.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # Mark as dead outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Verify removed from live roster count = seeded_db.execute( "SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert count == 0 # Correct to alive corrected_payload = make_status_corrected_payload( [animal_id], "alive", "Was misidentified, still alive" ) animal_service.correct_status(corrected_payload, ts_utc + 2000, "admin_user") # Verify restored to live roster count = seeded_db.execute( "SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert count == 1 def test_removes_from_live_animals_when_correcting_from_alive( self, seeded_db, animal_service, strip1_id ): """Correcting from alive removes animal from live_animals_by_location.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # Verify in live roster count = seeded_db.execute( "SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert count == 1 # Correct to dead (bypassing normal outcome event) corrected_payload = make_status_corrected_payload( [animal_id], "dead", "Animal found dead, correcting records" ) animal_service.correct_status(corrected_payload, ts_utc + 1000, "admin_user") # Verify removed from live roster count = seeded_db.execute( "SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert count == 0 def test_creates_status_interval_change(self, seeded_db, animal_service, strip1_id): """correct_status closes old status interval and creates new one.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] # Mark as dead outcome_payload = make_outcome_payload([animal_id], outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") correct_ts = ts_utc + 2000 corrected_payload = make_status_corrected_payload([animal_id], "alive", "Misidentified") animal_service.correct_status(corrected_payload, correct_ts, "admin_user") # dead interval should be closed row = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status' AND value = 'dead'""", (animal_id,), ).fetchone() assert row[0] == correct_ts # New alive interval should be open row = seeded_db.execute( """SELECT start_utc, end_utc FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status' AND value = 'alive' AND start_utc = ?""", (animal_id, correct_ts), ).fetchone() assert row is not None assert row[1] is None def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id): """Event-animal links are created for all corrected animals.""" cohort_payload = make_cohort_payload(strip1_id, count=2) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_ids = cohort_event.entity_refs["animal_ids"] # Mark both as dead outcome_payload = make_outcome_payload(animal_ids, outcome="death") animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user") # Correct both to alive corrected_payload = make_status_corrected_payload( animal_ids, "alive", "Both were misidentified" ) corrected_event = animal_service.correct_status( corrected_payload, ts_utc + 2000, "admin_user" ) count = seeded_db.execute( "SELECT COUNT(*) FROM event_animals WHERE event_id = ?", (corrected_event.id,), ).fetchone()[0] assert count == 2 def test_validates_animals_exist(self, seeded_db, animal_service): """Raises ValidationError if any animal doesn't exist.""" fake_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX" corrected_payload = make_status_corrected_payload([fake_id], "alive", "Testing") with pytest.raises(ValidationError, match="not found"): animal_service.correct_status(corrected_payload, int(time.time() * 1000), "admin_user") def test_stores_old_status_in_entity_refs(self, seeded_db, animal_service, strip1_id): """Event entity_refs contains old_status_map for revert.""" cohort_payload = make_cohort_payload(strip1_id, count=1) ts_utc = int(time.time() * 1000) cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user") animal_id = cohort_event.entity_refs["animal_ids"][0] corrected_payload = make_status_corrected_payload([animal_id], "dead", "Correcting status") corrected_event = animal_service.correct_status( corrected_payload, ts_utc + 1000, "admin_user" ) assert "old_status_map" in corrected_event.entity_refs assert corrected_event.entity_refs["old_status_map"][animal_id] == "alive"