Files
animaltrack/tests/test_service_animal_lifecycle.py
Petru Paler 1153f6c5b6 feat: implement animal lifecycle events (Step 6.3)
Add 5 animal lifecycle event handlers with TDD:
- HatchRecorded: Creates hatchling animals at brood/event location
- AnimalOutcome: Records death/harvest/sold with yields, status updates
- AnimalPromoted: Sets identified flag, nickname, optionally updates sex/repro_status
- AnimalMerged: Merges animal records, creates aliases, removes merged from live roster
- AnimalStatusCorrected: Admin-only status correction with required reason

All events include:
- Projection handlers in animal_registry.py and intervals.py
- Event-animal linking in event_animals.py
- Service methods with validation in animal.py
- 51 unit tests covering event creation, projections, and validation
- E2E test #7 (harvest with yields) per spec §21.7

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 19:20:33 +00:00

1163 lines
48 KiB
Python

# ABOUTME: Tests for animal lifecycle events in AnimalService.
# ABOUTME: Covers HatchRecorded, AnimalOutcome, AnimalPromoted, AnimalMerged, AnimalStatusCorrected.
import time
import pytest
from animaltrack.events.payloads import (
AnimalCohortCreatedPayload,
AnimalMergedPayload,
AnimalOutcomePayload,
AnimalPromotedPayload,
AnimalStatusCorrectedPayload,
HatchRecordedPayload,
)
from animaltrack.events.store import EventStore
from animaltrack.events.types import (
ANIMAL_MERGED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_STATUS_CORRECTED,
HATCH_RECORDED,
)
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.services.animal import AnimalService, ValidationError
@pytest.fixture
def event_store(seeded_db):
    """Provide an EventStore constructed over the ``seeded_db`` fixture."""
    store = EventStore(seeded_db)
    return store
@pytest.fixture
def projection_registry(seeded_db):
    """Build a ProjectionRegistry holding the three animal projections.

    The projections are constructed over ``seeded_db`` and registered in this
    order: animal registry, event-animal links, intervals.
    """
    projections = ProjectionRegistry()
    for projection_cls in (
        AnimalRegistryProjection,
        EventAnimalsProjection,
        IntervalProjection,
    ):
        projections.register(projection_cls(seeded_db))
    return projections
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
    """Provide the AnimalService under test, wired to the db, store and registry fixtures."""
    service = AnimalService(seeded_db, event_store, projection_registry)
    return service
@pytest.fixture
def strip1_id(seeded_db):
    """Return the ID of the location named 'Strip 1'.

    Fails with an explicit message when no such row exists, rather than the
    opaque ``TypeError`` that subscripting a missing result would raise.
    """
    row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
    # NOTE(review): assumes DB-API semantics, where fetchone() returns None
    # when the query matches no row.
    assert row is not None, "location 'Strip 1' not found in seeded_db"
    return row[0]
@pytest.fixture
def strip2_id(seeded_db):
    """Return the ID of the location named 'Strip 2'.

    Fails with an explicit message when no such row exists, rather than the
    opaque ``TypeError`` that subscripting a missing result would raise.
    """
    row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
    # NOTE(review): assumes DB-API semantics, where fetchone() returns None
    # when the query matches no row.
    assert row is not None, "location 'Strip 2' not found in seeded_db"
    return row[0]
def make_cohort_payload(
    location_id: str,
    count: int = 5,
    species: str = "duck",
    life_stage: str = "adult",
    sex: str = "female",
    origin: str = "purchased",
) -> AnimalCohortCreatedPayload:
    """Build an AnimalCohortCreatedPayload for tests.

    Only ``location_id`` is required; the defaults describe five purchased
    adult female ducks and can be overridden individually.
    """
    fields = {
        "species": species,
        "count": count,
        "life_stage": life_stage,
        "sex": sex,
        "location_id": location_id,
        "origin": origin,
    }
    return AnimalCohortCreatedPayload(**fields)
# =============================================================================
# HatchRecorded Tests
# =============================================================================
def make_hatch_payload(
    location_id: str,
    hatched_live: int = 5,
    species: str = "duck",
    assigned_brood_location_id: str | None = None,
) -> HatchRecordedPayload:
    """Build a HatchRecordedPayload for tests.

    Defaults to five live duck hatchlings with no assigned brood location.
    """
    fields = {
        "species": species,
        "location_id": location_id,
        "assigned_brood_location_id": assigned_brood_location_id,
        "hatched_live": hatched_live,
    }
    return HatchRecordedPayload(**fields)
class TestHatchRecorded:
    """Behavioural tests for AnimalService.record_hatch()."""

    @staticmethod
    def _hatch(service, payload):
        """Record ``payload`` at the current time and return ``(event, ts_utc)``."""
        stamp_ms = int(time.time() * 1000)
        return service.record_hatch(payload, stamp_ms, "test_user"), stamp_ms

    def test_creates_hatch_recorded_event(self, seeded_db, animal_service, strip1_id):
        """The returned event has type HATCH_RECORDED plus the given actor and timestamp."""
        hatch_event, stamp_ms = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=3)
        )

        assert hatch_event.type == HATCH_RECORDED
        assert hatch_event.actor == "test_user"
        assert hatch_event.ts_utc == stamp_ms

    def test_event_has_animal_ids_in_entity_refs(self, seeded_db, animal_service, strip1_id):
        """entity_refs lists one generated animal ID per live hatchling."""
        hatch_event, _ = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=5)
        )

        assert "animal_ids" in hatch_event.entity_refs
        chick_ids = hatch_event.entity_refs["animal_ids"]
        assert len(chick_ids) == 5
        # Every ID is expected to be a ULID, i.e. 26 characters long.
        for chick_id in chick_ids:
            assert len(chick_id) == 26

    def test_uses_location_id_when_no_brood_location(self, seeded_db, animal_service, strip1_id):
        """Without an assigned brood location the hatchlings land at location_id."""
        hatch_event, _ = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=2)
        )

        assert hatch_event.entity_refs["location_id"] == strip1_id
        # Each registry row must point at Strip 1 as well.
        for chick_id in hatch_event.entity_refs["animal_ids"]:
            stored_location = seeded_db.execute(
                "SELECT location_id FROM animal_registry WHERE animal_id = ?",
                (chick_id,),
            ).fetchone()[0]
            assert stored_location == strip1_id

    def test_uses_assigned_brood_location_when_provided(
        self, seeded_db, animal_service, strip1_id, strip2_id
    ):
        """An assigned brood location takes precedence over location_id."""
        brood_payload = make_hatch_payload(
            strip1_id, hatched_live=3, assigned_brood_location_id=strip2_id
        )
        hatch_event, _ = self._hatch(animal_service, brood_payload)

        assert hatch_event.entity_refs["location_id"] == strip2_id
        # The registry must record Strip 2 (the brood location), not Strip 1.
        for chick_id in hatch_event.entity_refs["animal_ids"]:
            stored_location = seeded_db.execute(
                "SELECT location_id FROM animal_registry WHERE animal_id = ?",
                (chick_id,),
            ).fetchone()[0]
            assert stored_location == strip2_id

    def test_animals_created_as_hatchlings(self, seeded_db, animal_service, strip1_id):
        """New animals get life_stage=hatchling, sex=unknown and status=alive."""
        hatch_event, _ = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=2)
        )

        for chick_id in hatch_event.entity_refs["animal_ids"]:
            record = seeded_db.execute(
                "SELECT life_stage, sex, status FROM animal_registry WHERE animal_id = ?",
                (chick_id,),
            ).fetchone()
            life_stage, sex, status = record[0], record[1], record[2]
            assert life_stage == "hatchling"
            assert sex == "unknown"
            assert status == "alive"

    def test_animals_have_origin_hatched(self, seeded_db, animal_service, strip1_id):
        """New animals are registered with origin=hatched."""
        hatch_event, _ = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=1)
        )
        chick_id = hatch_event.entity_refs["animal_ids"][0]

        stored_origin = seeded_db.execute(
            "SELECT origin FROM animal_registry WHERE animal_id = ?",
            (chick_id,),
        ).fetchone()[0]
        assert stored_origin == "hatched"

    def test_animals_in_live_animals_by_location(self, seeded_db, animal_service, strip1_id):
        """Every hatchling shows up in live_animals_by_location."""
        hatch_event, _ = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=4)
        )

        live_total = seeded_db.execute(
            "SELECT COUNT(*) FROM live_animals_by_location"
        ).fetchone()[0]
        assert live_total == 4

        # Each individual animal must be present in the live roster.
        for chick_id in hatch_event.entity_refs["animal_ids"]:
            roster_row = seeded_db.execute(
                "SELECT animal_id FROM live_animals_by_location WHERE animal_id = ?",
                (chick_id,),
            ).fetchone()
            assert roster_row is not None

    def test_location_intervals_created(self, seeded_db, animal_service, strip1_id):
        """One location interval row exists per hatchling."""
        self._hatch(animal_service, make_hatch_payload(strip1_id, hatched_live=3))

        interval_total = seeded_db.execute(
            "SELECT COUNT(*) FROM animal_location_intervals"
        ).fetchone()[0]
        assert interval_total == 3

    def test_attr_intervals_created(self, seeded_db, animal_service, strip1_id):
        """Attribute intervals (sex, life_stage, repro_status, status) exist per hatchling."""
        self._hatch(animal_service, make_hatch_payload(strip1_id, hatched_live=2))

        # 2 animals x 4 attributes = 8 interval rows.
        interval_total = seeded_db.execute(
            "SELECT COUNT(*) FROM animal_attr_intervals"
        ).fetchone()[0]
        assert interval_total == 8

    def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id):
        """event_animals holds one link per hatchling for the hatch event."""
        hatch_event, _ = self._hatch(
            animal_service, make_hatch_payload(strip1_id, hatched_live=4)
        )

        link_total = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (hatch_event.id,),
        ).fetchone()[0]
        assert link_total == 4

    def test_validates_location_exists(self, seeded_db, animal_service):
        """An unknown location_id is rejected with ValidationError."""
        bogus_payload = make_hatch_payload("01ARZ3NDEKTSV4RRFFQ69G5XXX", hatched_live=1)
        stamp_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            animal_service.record_hatch(bogus_payload, stamp_ms, "test_user")

    def test_validates_brood_location_exists(self, seeded_db, animal_service, strip1_id):
        """An unknown assigned_brood_location_id is rejected with ValidationError."""
        bogus_payload = make_hatch_payload(
            strip1_id,
            hatched_live=1,
            assigned_brood_location_id="01ARZ3NDEKTSV4RRFFQ69G5XXX",
        )
        stamp_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            animal_service.record_hatch(bogus_payload, stamp_ms, "test_user")

    def test_validates_species_active(self, seeded_db, animal_service, strip1_id):
        """A hatch for a deactivated species is rejected with ValidationError."""
        # Flip the duck species to inactive directly in the database.
        seeded_db.execute("UPDATE species SET active = 0 WHERE code = 'duck'")
        duck_payload = make_hatch_payload(strip1_id, hatched_live=1, species="duck")
        stamp_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not active"):
            animal_service.record_hatch(duck_payload, stamp_ms, "test_user")
# =============================================================================
# AnimalOutcome Tests
# =============================================================================
def make_outcome_payload(
    resolved_ids: list[str],
    outcome: str = "harvest",
    reason: str | None = None,
    yield_items: list | None = None,
) -> AnimalOutcomePayload:
    """Build an AnimalOutcomePayload for tests.

    Defaults to a harvest outcome with no reason and no yield items.
    """
    fields = {
        "outcome": outcome,
        "resolved_ids": resolved_ids,
        "reason": reason,
        "yield_items": yield_items,
    }
    return AnimalOutcomePayload(**fields)
class TestAnimalOutcome:
    """Behavioural tests for AnimalService.record_outcome()."""

    @staticmethod
    def _seed_cohort(service, location_id, count):
        """Create ``count`` animals at ``location_id``; return ``(animal_ids, ts_utc)``."""
        payload = make_cohort_payload(location_id, count=count)
        created_ms = int(time.time() * 1000)
        cohort = service.create_cohort(payload, created_ms, "test_user")
        return cohort.entity_refs["animal_ids"], created_ms

    @staticmethod
    def _status_of(db, animal_id):
        """Read the current status column for ``animal_id`` from animal_registry."""
        return db.execute(
            "SELECT status FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()[0]

    @staticmethod
    def _live_total(db):
        """Count all rows in live_animals_by_location."""
        return db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[0]

    def test_creates_animal_outcome_event(self, seeded_db, animal_service, strip1_id):
        """The returned event has type ANIMAL_OUTCOME plus the given actor and timestamp."""
        # Animals must exist before an outcome can be recorded for them.
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 2)

        outcome_ms = created_ms + 1000
        outcome_event = animal_service.record_outcome(
            make_outcome_payload(animal_ids, outcome="harvest"), outcome_ms, "test_user"
        )

        assert outcome_event.type == ANIMAL_OUTCOME
        assert outcome_event.actor == "test_user"
        assert outcome_event.ts_utc == outcome_ms

    def test_updates_status_for_harvest(self, seeded_db, animal_service, strip1_id):
        """A harvest outcome leaves status=harvested in animal_registry."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 2)

        animal_service.record_outcome(
            make_outcome_payload(animal_ids, outcome="harvest"), created_ms + 1000, "test_user"
        )

        for harvested_id in animal_ids:
            assert self._status_of(seeded_db, harvested_id) == "harvested"

    def test_updates_status_for_death(self, seeded_db, animal_service, strip1_id):
        """A death outcome leaves status=dead in animal_registry."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 1)
        victim_id = animal_ids[0]

        animal_service.record_outcome(
            make_outcome_payload([victim_id], outcome="death"), created_ms + 1000, "test_user"
        )

        assert self._status_of(seeded_db, victim_id) == "dead"

    def test_updates_status_for_sold(self, seeded_db, animal_service, strip1_id):
        """A sold outcome leaves status=sold in animal_registry."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 1)
        sold_id = animal_ids[0]

        animal_service.record_outcome(
            make_outcome_payload([sold_id], outcome="sold"), created_ms + 1000, "test_user"
        )

        assert self._status_of(seeded_db, sold_id) == "sold"

    def test_updates_status_for_predator_loss(self, seeded_db, animal_service, strip1_id):
        """A predator_loss outcome leaves status=dead in animal_registry."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 1)
        victim_id = animal_ids[0]

        animal_service.record_outcome(
            make_outcome_payload([victim_id], outcome="predator_loss"),
            created_ms + 1000,
            "test_user",
        )

        assert self._status_of(seeded_db, victim_id) == "dead"

    def test_removes_from_live_animals(self, seeded_db, animal_service, strip1_id):
        """Animals with an outcome disappear from live_animals_by_location."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 3)

        # All three start out in the live roster.
        assert self._live_total(seeded_db) == 3

        # Harvest the first two only.
        animal_service.record_outcome(
            make_outcome_payload(animal_ids[:2], outcome="harvest"),
            created_ms + 1000,
            "test_user",
        )

        # Exactly one animal should remain live.
        assert self._live_total(seeded_db) == 1

    def test_closes_location_interval(self, seeded_db, animal_service, strip1_id):
        """The animal's location interval gets end_utc set to the outcome timestamp."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 1)
        victim_id = animal_ids[0]

        outcome_ms = created_ms + 1000
        animal_service.record_outcome(
            make_outcome_payload([victim_id], outcome="death"), outcome_ms, "test_user"
        )

        closed_at = seeded_db.execute(
            """SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
            (victim_id, strip1_id),
        ).fetchone()[0]
        assert closed_at == outcome_ms

    def test_closes_status_interval_and_creates_new(self, seeded_db, animal_service, strip1_id):
        """The 'alive' status interval is closed and an open 'harvested' one exists."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 1)
        harvested_id = animal_ids[0]

        outcome_ms = created_ms + 1000
        animal_service.record_outcome(
            make_outcome_payload([harvested_id], outcome="harvest"), outcome_ms, "test_user"
        )

        # The alive interval ends at the outcome timestamp.
        alive_end = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'""",
            (harvested_id,),
        ).fetchone()[0]
        assert alive_end == outcome_ms

        # The harvested interval has no end yet.
        harvested_end = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'harvested'""",
            (harvested_id,),
        ).fetchone()[0]
        assert harvested_end is None

    def test_stores_yield_items_in_payload(self, seeded_db, animal_service, strip1_id):
        """yield_items passed in the payload are present on the returned event payload."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 2)

        breast = {
            "product_code": "meat.part.breast.duck",
            "unit": "kg",
            "quantity": 2,
            "weight_kg": 1.4,
        }
        fat = {
            "product_code": "fat.rendered.duck",
            "unit": "kg",
            "quantity": 1,
            "weight_kg": 0.3,
        }
        harvest_payload = make_outcome_payload(
            animal_ids, outcome="harvest", yield_items=[breast, fat]
        )
        outcome_event = animal_service.record_outcome(
            harvest_payload, created_ms + 1000, "test_user"
        )

        assert outcome_event.payload["yield_items"] is not None
        assert len(outcome_event.payload["yield_items"]) == 2
        assert outcome_event.payload["yield_items"][0]["product_code"] == "meat.part.breast.duck"

    def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id):
        """event_animals holds one link per resolved animal for the outcome event."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 3)

        outcome_event = animal_service.record_outcome(
            make_outcome_payload(animal_ids, outcome="harvest"), created_ms + 1000, "test_user"
        )

        link_total = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (outcome_event.id,),
        ).fetchone()[0]
        assert link_total == 3

    def test_validates_animals_exist(self, seeded_db, animal_service):
        """An unknown animal ID is rejected with ValidationError."""
        ghost_payload = make_outcome_payload(["01ARZ3NDEKTSV4RRFFQ69G5XXX"], outcome="death")

        with pytest.raises(ValidationError, match="not found"):
            animal_service.record_outcome(ghost_payload, int(time.time() * 1000), "test_user")

    def test_validates_animals_alive(self, seeded_db, animal_service, strip1_id):
        """A second outcome on an already-dead animal is rejected with ValidationError."""
        animal_ids, created_ms = self._seed_cohort(animal_service, strip1_id, 1)
        victim_id = animal_ids[0]

        # The first outcome succeeds and marks the animal dead.
        animal_service.record_outcome(
            make_outcome_payload([victim_id], outcome="death"), created_ms + 1000, "test_user"
        )

        # A follow-up outcome on the same animal must fail.
        with pytest.raises(ValidationError, match="not alive"):
            animal_service.record_outcome(
                make_outcome_payload([victim_id], outcome="harvest"),
                created_ms + 2000,
                "test_user",
            )
# =============================================================================
# AnimalPromoted Tests
# =============================================================================
def make_promoted_payload(
    animal_id: str,
    nickname: str | None = None,
    sex: str | None = None,
    repro_status: str | None = None,
) -> AnimalPromotedPayload:
    """Build an AnimalPromotedPayload for tests.

    Only ``animal_id`` is required; nickname, sex and repro_status default to None.
    """
    fields = {
        "animal_id": animal_id,
        "nickname": nickname,
        "sex": sex,
        "repro_status": repro_status,
    }
    return AnimalPromotedPayload(**fields)
class TestAnimalPromoted:
    """Tests for promote_animal()."""

    def test_creates_animal_promoted_event(self, seeded_db, animal_service, strip1_id):
        """promote_animal creates an ANIMAL_PROMOTED event with the given actor and timestamp."""
        cohort_payload = make_cohort_payload(strip1_id, count=1)
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_id = cohort_event.entity_refs["animal_ids"][0]

        promote_ts = ts_utc + 1000
        promoted_payload = make_promoted_payload(animal_id, nickname="Daisy")
        promoted_event = animal_service.promote_animal(promoted_payload, promote_ts, "test_user")

        assert promoted_event.type == ANIMAL_PROMOTED
        assert promoted_event.actor == "test_user"
        # Pin the timestamp too, as the hatch and outcome event tests do.
        assert promoted_event.ts_utc == promote_ts

    def test_sets_identified_flag(self, seeded_db, animal_service, strip1_id):
        """promote_animal sets identified=1 in both registry tables."""
        cohort_payload = make_cohort_payload(strip1_id, count=1)
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_id = cohort_event.entity_refs["animal_ids"][0]

        # A freshly created cohort animal starts out unidentified.
        row = seeded_db.execute(
            "SELECT identified FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == 0

        promoted_payload = make_promoted_payload(animal_id, nickname="Daisy")
        animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user")

        # The flag must be set in animal_registry ...
        row = seeded_db.execute(
            "SELECT identified FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == 1

        # ... and in live_animals_by_location.
        row = seeded_db.execute(
            "SELECT identified FROM live_animals_by_location WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == 1

    def test_sets_nickname(self, seeded_db, animal_service, strip1_id):
        """promote_animal sets nickname in both registry tables."""
        cohort_payload = make_cohort_payload(strip1_id, count=1)
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_id = cohort_event.entity_refs["animal_ids"][0]

        promoted_payload = make_promoted_payload(animal_id, nickname="Daisy")
        animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user")

        row = seeded_db.execute(
            "SELECT nickname FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == "Daisy"

        row = seeded_db.execute(
            "SELECT nickname FROM live_animals_by_location WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == "Daisy"

    def test_updates_sex_and_creates_interval(self, seeded_db, animal_service, strip1_id):
        """promote_animal updates sex, closes the old sex interval and opens a new one."""
        # Hatchlings are used here because they start with sex=unknown.
        hatch_payload = make_hatch_payload(strip1_id, hatched_live=1)
        ts_utc = int(time.time() * 1000)
        hatch_event = animal_service.record_hatch(hatch_payload, ts_utc, "test_user")
        animal_id = hatch_event.entity_refs["animal_ids"][0]

        # Precondition: sex is unknown before promotion.
        row = seeded_db.execute(
            "SELECT sex FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == "unknown"

        # Promote with sex=female.
        promoted_payload = make_promoted_payload(animal_id, nickname="Daisy", sex="female")
        promote_ts = ts_utc + 1000
        animal_service.promote_animal(promoted_payload, promote_ts, "test_user")

        # The registry now reports the new sex.
        row = seeded_db.execute(
            "SELECT sex FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == "female"

        # The 'unknown' interval ends at the promotion timestamp.
        row = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'sex' AND value = 'unknown'""",
            (animal_id,),
        ).fetchone()
        assert row[0] == promote_ts

        # The 'female' interval is still open.
        row = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'sex' AND value = 'female'""",
            (animal_id,),
        ).fetchone()
        assert row[0] is None

    def test_updates_repro_status_and_creates_interval(self, seeded_db, animal_service, strip1_id):
        """promote_animal updates repro_status and a matching interval row exists."""
        cohort_payload = make_cohort_payload(strip1_id, count=1, sex="female")
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_id = cohort_event.entity_refs["animal_ids"][0]

        promoted_payload = make_promoted_payload(animal_id, nickname="Daisy", repro_status="intact")
        promote_ts = ts_utc + 1000
        animal_service.promote_animal(promoted_payload, promote_ts, "test_user")

        # The registry reports the new repro_status.
        row = seeded_db.execute(
            "SELECT repro_status FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == "intact"

        # An 'intact' repro_status interval exists (only existence is checked).
        row = seeded_db.execute(
            """SELECT start_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'repro_status' AND value = 'intact'""",
            (animal_id,),
        ).fetchone()
        assert row is not None

    def test_event_animal_link_created(self, seeded_db, animal_service, strip1_id):
        """Event-animal link is created for the promoted animal."""
        cohort_payload = make_cohort_payload(strip1_id, count=1)
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_id = cohort_event.entity_refs["animal_ids"][0]

        promoted_payload = make_promoted_payload(animal_id, nickname="Daisy")
        promoted_event = animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user")

        count = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (promoted_event.id,),
        ).fetchone()[0]
        assert count == 1

    def test_validates_animal_exists(self, seeded_db, animal_service):
        """Raises ValidationError if animal_id doesn't exist."""
        fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        promoted_payload = make_promoted_payload(fake_animal_id, nickname="Ghost")

        with pytest.raises(ValidationError, match="not found"):
            animal_service.promote_animal(promoted_payload, int(time.time() * 1000), "test_user")

    def test_validates_animal_alive(self, seeded_db, animal_service, strip1_id):
        """Raises ValidationError if animal is not alive."""
        cohort_payload = make_cohort_payload(strip1_id, count=1)
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_id = cohort_event.entity_refs["animal_ids"][0]

        # Record a death so the animal is no longer alive.
        outcome_payload = make_outcome_payload([animal_id], outcome="death")
        animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")

        # Promoting the dead animal must be rejected.
        promoted_payload = make_promoted_payload(animal_id, nickname="Ghost")
        with pytest.raises(ValidationError, match="not alive"):
            animal_service.promote_animal(promoted_payload, ts_utc + 2000, "test_user")

    def test_validates_nickname_unique_among_active(self, seeded_db, animal_service, strip1_id):
        """Raises ValidationError if nickname is already used by another active animal."""
        cohort_payload = make_cohort_payload(strip1_id, count=2)
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
        animal_ids = cohort_event.entity_refs["animal_ids"]

        # The first animal claims the nickname.
        promoted_payload = make_promoted_payload(animal_ids[0], nickname="Daisy")
        animal_service.promote_animal(promoted_payload, ts_utc + 1000, "test_user")

        # Reusing it for the second animal must fail; the match is case-insensitive.
        duplicate_payload = make_promoted_payload(animal_ids[1], nickname="Daisy")
        with pytest.raises(ValidationError, match="(?i)nickname.*already in use"):
            animal_service.promote_animal(duplicate_payload, ts_utc + 2000, "test_user")
# =============================================================================
# AnimalMerged Tests
# =============================================================================
def make_merged_payload(
    survivor_animal_id: str,
    merged_animal_ids: list[str],
    notes: str | None = None,
) -> AnimalMergedPayload:
    """Build an AnimalMergedPayload for tests; ``notes`` defaults to None."""
    fields = {
        "survivor_animal_id": survivor_animal_id,
        "merged_animal_ids": merged_animal_ids,
        "notes": notes,
    }
    return AnimalMergedPayload(**fields)
class TestAnimalMerged:
"""Tests for merge_animals()."""
def test_creates_animal_merged_event(self, seeded_db, animal_service, strip1_id):
    """merge_animals creates an ANIMAL_MERGED event with the given actor and timestamp."""
    cohort_payload = make_cohort_payload(strip1_id, count=3)
    ts_utc = int(time.time() * 1000)
    cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
    animal_ids = cohort_event.entity_refs["animal_ids"]

    # The first animal survives; the remaining two are merged into it.
    merge_ts = ts_utc + 1000
    merged_payload = make_merged_payload(
        survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:]
    )
    merged_event = animal_service.merge_animals(merged_payload, merge_ts, "test_user")

    assert merged_event.type == ANIMAL_MERGED
    assert merged_event.actor == "test_user"
    # Pin the timestamp too, as the hatch and outcome event tests do.
    assert merged_event.ts_utc == merge_ts
def test_merged_animals_status_merged_into(self, seeded_db, animal_service, strip1_id):
    """After a merge the survivor stays alive and the others read merged_into."""
    created_ms = int(time.time() * 1000)
    cohort = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=3), created_ms, "test_user"
    )
    all_ids = cohort.entity_refs["animal_ids"]
    survivor_id = all_ids[0]
    absorbed_ids = all_ids[1:]

    animal_service.merge_animals(
        make_merged_payload(survivor_animal_id=survivor_id, merged_animal_ids=absorbed_ids),
        created_ms + 1000,
        "test_user",
    )

    # The survivor keeps its alive status.
    survivor_status = seeded_db.execute(
        "SELECT status FROM animal_registry WHERE animal_id = ?",
        (survivor_id,),
    ).fetchone()[0]
    assert survivor_status == "alive"

    # Every absorbed animal is flagged merged_into.
    for absorbed_id in absorbed_ids:
        absorbed_status = seeded_db.execute(
            "SELECT status FROM animal_registry WHERE animal_id = ?",
            (absorbed_id,),
        ).fetchone()[0]
        assert absorbed_status == "merged_into"
def test_removes_merged_from_live_animals(self, seeded_db, animal_service, strip1_id):
    """Absorbed animals leave live_animals_by_location; only the survivor stays."""
    created_ms = int(time.time() * 1000)
    cohort = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=4), created_ms, "test_user"
    )
    all_ids = cohort.entity_refs["animal_ids"]

    live_before = seeded_db.execute(
        "SELECT COUNT(*) FROM live_animals_by_location"
    ).fetchone()[0]
    assert live_before == 4

    animal_service.merge_animals(
        make_merged_payload(survivor_animal_id=all_ids[0], merged_animal_ids=all_ids[1:]),
        created_ms + 1000,
        "test_user",
    )

    # Three of the four were merged away, leaving a single live row.
    live_after = seeded_db.execute(
        "SELECT COUNT(*) FROM live_animals_by_location"
    ).fetchone()[0]
    assert live_after == 1
def test_creates_alias_records(self, seeded_db, animal_service, strip1_id):
    """animal_aliases maps every absorbed animal ID to the survivor's ID."""
    created_ms = int(time.time() * 1000)
    cohort = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=3), created_ms, "test_user"
    )
    all_ids = cohort.entity_refs["animal_ids"]
    survivor_id = all_ids[0]
    absorbed_ids = all_ids[1:]

    animal_service.merge_animals(
        make_merged_payload(survivor_animal_id=survivor_id, merged_animal_ids=absorbed_ids),
        created_ms + 1000,
        "test_user",
    )

    # Each absorbed ID must resolve to the survivor through an alias row.
    for absorbed_id in absorbed_ids:
        alias_row = seeded_db.execute(
            """SELECT survivor_animal_id FROM animal_aliases
WHERE alias_animal_id = ?""",
            (absorbed_id,),
        ).fetchone()
        assert alias_row is not None
        assert alias_row[0] == survivor_id
def test_closes_location_and_status_intervals(self, seeded_db, animal_service, strip1_id):
    """Merged animals get location and status intervals closed.

    Creates a two-animal cohort, merges the second animal into the first and
    checks three things for the merged animal: a location interval ends at
    the merge timestamp, the 'alive' status interval ends at the merge
    timestamp, and a 'merged_into' status interval exists with no end.
    """
    cohort_payload = make_cohort_payload(strip1_id, count=2)
    ts_utc = int(time.time() * 1000)
    cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
    animal_ids = cohort_event.entity_refs["animal_ids"]
    merge_ts = ts_utc + 1000
    merged_payload = make_merged_payload(
        survivor_animal_id=animal_ids[0], merged_animal_ids=[animal_ids[1]]
    )
    animal_service.merge_animals(merged_payload, merge_ts, "test_user")

    # Location interval closed
    row = seeded_db.execute(
        """SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND end_utc IS NOT NULL""",
        (animal_ids[1],),
    ).fetchone()
    assert row is not None
    assert row[0] == merge_ts

    # Status alive interval closed
    row = seeded_db.execute(
        """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'""",
        (animal_ids[1],),
    ).fetchone()
    # Guard first: a missing row should fail the assertion, not raise
    # TypeError when indexing None.
    assert row is not None, "no 'alive' status interval found for merged animal"
    assert row[0] == merge_ts

    # merged_into status interval created
    row = seeded_db.execute(
        """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'merged_into'""",
        (animal_ids[1],),
    ).fetchone()
    assert row is not None, "no 'merged_into' status interval found for merged animal"
    assert row[0] is None
def test_event_animal_links_include_all(self, seeded_db, animal_service, strip1_id):
    """The merge event is linked to the survivor and to every merged animal."""
    start_ms = int(time.time() * 1000)
    created = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=3), start_ms, "test_user"
    )
    all_ids = created.entity_refs["animal_ids"]

    merge_event = animal_service.merge_animals(
        make_merged_payload(survivor_animal_id=all_ids[0], merged_animal_ids=all_ids[1:]),
        start_ms + 1000,
        "test_user",
    )

    link_row = seeded_db.execute(
        "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
        (merge_event.id,),
    ).fetchone()
    # survivor + 2 merged
    assert link_row[0] == 3
def test_validates_survivor_not_in_merged_list(self, seeded_db, animal_service, strip1_id):
    """merge_animals rejects a payload whose merged list contains the survivor."""
    base_ms = int(time.time() * 1000)
    created = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=2), base_ms, "test_user"
    )
    all_ids = created.entity_refs["animal_ids"]

    # The full ID list is passed as the merged list, so it includes the survivor.
    bad_payload = make_merged_payload(survivor_animal_id=all_ids[0], merged_animal_ids=all_ids)

    with pytest.raises(ValidationError, match="(?i)survivor.*cannot be in.*merged"):
        animal_service.merge_animals(bad_payload, base_ms + 1000, "test_user")
def test_validates_all_animals_exist(self, seeded_db, animal_service, strip1_id):
    """merge_animals raises ValidationError when a merged ID is unknown."""
    unknown_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
    base_ms = int(time.time() * 1000)
    created = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=1), base_ms, "test_user"
    )
    survivor_id = created.entity_refs["animal_ids"][0]

    # Real survivor, but the only merged ID was never created.
    bad_payload = make_merged_payload(
        survivor_animal_id=survivor_id, merged_animal_ids=[unknown_id]
    )

    with pytest.raises(ValidationError, match="not found"):
        animal_service.merge_animals(bad_payload, base_ms + 1000, "test_user")
def test_validates_all_animals_alive(self, seeded_db, animal_service, strip1_id):
    """merge_animals raises ValidationError when a merged animal is not alive."""
    base_ms = int(time.time() * 1000)
    created = animal_service.create_cohort(
        make_cohort_payload(strip1_id, count=3), base_ms, "test_user"
    )
    all_ids = created.entity_refs["animal_ids"]

    # Kill one of the animals to merge
    animal_service.record_outcome(
        make_outcome_payload([all_ids[2]], outcome="death"), base_ms + 1000, "test_user"
    )

    # Try to merge including dead animal
    bad_payload = make_merged_payload(
        survivor_animal_id=all_ids[0], merged_animal_ids=all_ids[1:]
    )
    with pytest.raises(ValidationError, match="not alive"):
        animal_service.merge_animals(bad_payload, base_ms + 2000, "test_user")
# =============================================================================
# AnimalStatusCorrected Tests
# =============================================================================
def make_status_corrected_payload(
    resolved_ids: list[str],
    new_status: str,
    reason: str,
    notes: str | None = None,
) -> AnimalStatusCorrectedPayload:
    """Build an AnimalStatusCorrectedPayload for tests from the given fields."""
    # Collect the fields first, then pass them all as keyword arguments.
    payload_fields = {
        "resolved_ids": resolved_ids,
        "new_status": new_status,
        "reason": reason,
        "notes": notes,
    }
    return AnimalStatusCorrectedPayload(**payload_fields)
class TestAnimalStatusCorrected:
    """Tests for correct_status().

    Most tests create a fresh cohort at ``strip1_id``, optionally record an
    outcome for it, then call ``animal_service.correct_status`` and inspect
    the returned event or the projection tables through ``seeded_db``.
    """

    @staticmethod
    def _create_animals(animal_service, location_id, count=1):
        """Create a cohort of ``count`` animals; return ``(animal_ids, ts_utc)``."""
        ts_utc = int(time.time() * 1000)
        cohort_event = animal_service.create_cohort(
            make_cohort_payload(location_id, count=count), ts_utc, "test_user"
        )
        return cohort_event.entity_refs["animal_ids"], ts_utc

    @staticmethod
    def _live_roster_count(db, animal_id):
        """Return the number of live_animals_by_location rows for one animal."""
        return db.execute(
            "SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()[0]

    def test_creates_animal_status_corrected_event(self, seeded_db, animal_service, strip1_id):
        """correct_status creates an ANIMAL_STATUS_CORRECTED event."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id)
        animal_id = animal_ids[0]
        # First record as dead
        outcome_payload = make_outcome_payload([animal_id], outcome="death")
        animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")
        # Correct to alive
        corrected_payload = make_status_corrected_payload(
            [animal_id], "alive", "Misidentified animal, still alive"
        )
        corrected_event = animal_service.correct_status(
            corrected_payload, ts_utc + 2000, "admin_user"
        )
        assert corrected_event.type == ANIMAL_STATUS_CORRECTED
        assert corrected_event.actor == "admin_user"

    def test_updates_status_in_registry(self, seeded_db, animal_service, strip1_id):
        """correct_status updates status in animal_registry."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id)
        animal_id = animal_ids[0]
        # Mark as sold
        outcome_payload = make_outcome_payload([animal_id], outcome="sold")
        animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")
        # Correct to dead
        corrected_payload = make_status_corrected_payload(
            [animal_id], "dead", "Animal actually died, not sold"
        )
        animal_service.correct_status(corrected_payload, ts_utc + 2000, "admin_user")
        row = seeded_db.execute(
            "SELECT status FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        # Guard first so a missing row fails the assertion, not a TypeError.
        assert row is not None, "animal missing from animal_registry"
        assert row[0] == "dead"

    def test_restores_to_live_animals_when_correcting_to_alive(
        self, seeded_db, animal_service, strip1_id
    ):
        """Correcting to alive restores animal to live_animals_by_location."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id)
        animal_id = animal_ids[0]
        # Mark as dead
        outcome_payload = make_outcome_payload([animal_id], outcome="death")
        animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")
        # Verify removed from live roster
        assert self._live_roster_count(seeded_db, animal_id) == 0
        # Correct to alive
        corrected_payload = make_status_corrected_payload(
            [animal_id], "alive", "Was misidentified, still alive"
        )
        animal_service.correct_status(corrected_payload, ts_utc + 2000, "admin_user")
        # Verify restored to live roster
        assert self._live_roster_count(seeded_db, animal_id) == 1

    def test_removes_from_live_animals_when_correcting_from_alive(
        self, seeded_db, animal_service, strip1_id
    ):
        """Correcting from alive removes animal from live_animals_by_location."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id)
        animal_id = animal_ids[0]
        # Verify in live roster
        assert self._live_roster_count(seeded_db, animal_id) == 1
        # Correct to dead (bypassing normal outcome event)
        corrected_payload = make_status_corrected_payload(
            [animal_id], "dead", "Animal found dead, correcting records"
        )
        animal_service.correct_status(corrected_payload, ts_utc + 1000, "admin_user")
        # Verify removed from live roster
        assert self._live_roster_count(seeded_db, animal_id) == 0

    def test_creates_status_interval_change(self, seeded_db, animal_service, strip1_id):
        """correct_status closes old status interval and creates new one."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id)
        animal_id = animal_ids[0]
        # Mark as dead
        outcome_payload = make_outcome_payload([animal_id], outcome="death")
        animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")
        correct_ts = ts_utc + 2000
        corrected_payload = make_status_corrected_payload([animal_id], "alive", "Misidentified")
        animal_service.correct_status(corrected_payload, correct_ts, "admin_user")
        # dead interval should be closed
        row = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'dead'""",
            (animal_id,),
        ).fetchone()
        # Guard first so a missing interval fails the assertion, not a TypeError.
        assert row is not None, "no 'dead' status interval found"
        assert row[0] == correct_ts
        # New alive interval should be open
        row = seeded_db.execute(
            """SELECT start_utc, end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
AND start_utc = ?""",
            (animal_id, correct_ts),
        ).fetchone()
        assert row is not None
        assert row[1] is None

    def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id):
        """Event-animal links are created for all corrected animals."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id, count=2)
        # Mark both as dead
        outcome_payload = make_outcome_payload(animal_ids, outcome="death")
        animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")
        # Correct both to alive
        corrected_payload = make_status_corrected_payload(
            animal_ids, "alive", "Both were misidentified"
        )
        corrected_event = animal_service.correct_status(
            corrected_payload, ts_utc + 2000, "admin_user"
        )
        count = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (corrected_event.id,),
        ).fetchone()[0]
        assert count == 2

    def test_validates_animals_exist(self, seeded_db, animal_service):
        """Raises ValidationError if any animal doesn't exist."""
        fake_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        corrected_payload = make_status_corrected_payload([fake_id], "alive", "Testing")
        with pytest.raises(ValidationError, match="not found"):
            animal_service.correct_status(corrected_payload, int(time.time() * 1000), "admin_user")

    def test_stores_old_status_in_entity_refs(self, seeded_db, animal_service, strip1_id):
        """Event entity_refs contains old_status_map for revert."""
        animal_ids, ts_utc = self._create_animals(animal_service, strip1_id)
        animal_id = animal_ids[0]
        corrected_payload = make_status_corrected_payload([animal_id], "dead", "Correcting status")
        corrected_event = animal_service.correct_status(
            corrected_payload, ts_utc + 1000, "admin_user"
        )
        assert "old_status_map" in corrected_event.entity_refs
        # The animal was never given an outcome, so its prior status is "alive".
        assert corrected_event.entity_refs["old_status_map"][animal_id] == "alive"