Add 5 animal lifecycle event handlers with TDD:

- HatchRecorded: Creates hatchling animals at brood/event location
- AnimalOutcome: Records death/harvest/sold with yields, status updates
- AnimalPromoted: Sets identified flag, nickname, optionally updates sex/repro_status
- AnimalMerged: Merges animal records, creates aliases, removes merged from live roster
- AnimalStatusCorrected: Admin-only status correction with required reason

All events include:

- Projection handlers in animal_registry.py and intervals.py
- Event-animal linking in event_animals.py
- Service methods with validation in animal.py
- 51 unit tests covering event creation, projections, and validation
- E2E test #7 (harvest with yields) per spec §21.7

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
1163 lines
48 KiB
Python
1163 lines
48 KiB
Python
# ABOUTME: Tests for animal lifecycle events in AnimalService.
|
|
# ABOUTME: Covers HatchRecorded, AnimalOutcome, AnimalPromoted, AnimalMerged, AnimalStatusCorrected.
|
|
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from animaltrack.events.payloads import (
|
|
AnimalCohortCreatedPayload,
|
|
AnimalMergedPayload,
|
|
AnimalOutcomePayload,
|
|
AnimalPromotedPayload,
|
|
AnimalStatusCorrectedPayload,
|
|
HatchRecordedPayload,
|
|
)
|
|
from animaltrack.events.store import EventStore
|
|
from animaltrack.events.types import (
|
|
ANIMAL_MERGED,
|
|
ANIMAL_OUTCOME,
|
|
ANIMAL_PROMOTED,
|
|
ANIMAL_STATUS_CORRECTED,
|
|
HATCH_RECORDED,
|
|
)
|
|
from animaltrack.projections import ProjectionRegistry
|
|
from animaltrack.projections.animal_registry import AnimalRegistryProjection
|
|
from animaltrack.projections.event_animals import EventAnimalsProjection
|
|
from animaltrack.projections.intervals import IntervalProjection
|
|
from animaltrack.services.animal import AnimalService, ValidationError
|
|
|
|
|
|
@pytest.fixture
def event_store(seeded_db):
    """Provide an EventStore constructed over the seeded test database."""
    store = EventStore(seeded_db)
    return store
|
|
|
|
|
|
@pytest.fixture
def projection_registry(seeded_db):
    """Provide a ProjectionRegistry with the three animal projections registered.

    Projections are registered in this order: animal registry, event-animal
    links, intervals.
    """
    projections = ProjectionRegistry()
    for projection_cls in (
        AnimalRegistryProjection,
        EventAnimalsProjection,
        IntervalProjection,
    ):
        projections.register(projection_cls(seeded_db))
    return projections
|
|
|
|
|
|
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
    """Provide an AnimalService wired to the test database, store and registry."""
    service = AnimalService(seeded_db, event_store, projection_registry)
    return service
|
|
|
|
|
|
@pytest.fixture
def strip1_id(seeded_db):
    """Return the ID of the location named 'Strip 1' in the seeded database."""
    cursor = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'")
    return cursor.fetchone()[0]
|
|
|
|
|
|
@pytest.fixture
def strip2_id(seeded_db):
    """Return the ID of the location named 'Strip 2' in the seeded database."""
    cursor = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'")
    return cursor.fetchone()[0]
|
|
|
|
|
|
def make_cohort_payload(
    location_id: str,
    count: int = 5,
    species: str = "duck",
    life_stage: str = "adult",
    sex: str = "female",
    origin: str = "purchased",
) -> AnimalCohortCreatedPayload:
    """Build an AnimalCohortCreatedPayload for tests.

    Only ``location_id`` is required; every other field has a default
    (5 purchased adult female ducks).
    """
    fields = {
        "species": species,
        "count": count,
        "life_stage": life_stage,
        "sex": sex,
        "location_id": location_id,
        "origin": origin,
    }
    return AnimalCohortCreatedPayload(**fields)
|
|
|
|
|
|
# =============================================================================
|
|
# HatchRecorded Tests
|
|
# =============================================================================
|
|
|
|
|
|
def make_hatch_payload(
    location_id: str,
    hatched_live: int = 5,
    species: str = "duck",
    assigned_brood_location_id: str | None = None,
) -> HatchRecordedPayload:
    """Build a HatchRecordedPayload for tests.

    Defaults to 5 live duck hatchlings with no assigned brood location.
    """
    fields = {
        "species": species,
        "location_id": location_id,
        "assigned_brood_location_id": assigned_brood_location_id,
        "hatched_live": hatched_live,
    }
    return HatchRecordedPayload(**fields)
|
|
|
|
|
|
class TestHatchRecorded:
    """Exercise AnimalService.record_hatch() and the tables it populates."""

    @staticmethod
    def _now_ms():
        """Return the current wall-clock time as integer epoch milliseconds."""
        return int(time.time() * 1000)

    def test_creates_hatch_recorded_event(self, seeded_db, animal_service, strip1_id):
        """The returned event has type HATCH_RECORDED and echoes actor and timestamp."""
        hatch = make_hatch_payload(strip1_id, hatched_live=3)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        assert hatch_event.type == HATCH_RECORDED
        assert hatch_event.actor == "test_user"
        assert hatch_event.ts_utc == recorded_at

    def test_event_has_animal_ids_in_entity_refs(self, seeded_db, animal_service, strip1_id):
        """entity_refs lists one generated animal ID per live hatchling."""
        hatch = make_hatch_payload(strip1_id, hatched_live=5)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        assert "animal_ids" in hatch_event.entity_refs
        generated_ids = hatch_event.entity_refs["animal_ids"]
        assert len(generated_ids) == 5
        # Each ID is expected to have ULID length (26 characters).
        for generated_id in generated_ids:
            assert len(generated_id) == 26

    def test_uses_location_id_when_no_brood_location(self, seeded_db, animal_service, strip1_id):
        """Without an assigned brood location, hatchlings land at location_id."""
        hatch = make_hatch_payload(strip1_id, hatched_live=2)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        assert hatch_event.entity_refs["location_id"] == strip1_id

        # The registry must place every hatchling at Strip 1.
        for hatchling_id in hatch_event.entity_refs["animal_ids"]:
            registry_row = seeded_db.execute(
                "SELECT location_id FROM animal_registry WHERE animal_id = ?",
                (hatchling_id,),
            ).fetchone()
            assert registry_row[0] == strip1_id

    def test_uses_assigned_brood_location_when_provided(
        self, seeded_db, animal_service, strip1_id, strip2_id
    ):
        """An assigned brood location takes precedence over location_id."""
        hatch = make_hatch_payload(
            strip1_id, hatched_live=3, assigned_brood_location_id=strip2_id
        )
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        assert hatch_event.entity_refs["location_id"] == strip2_id

        # The registry must place every hatchling at Strip 2 (brood), not Strip 1.
        for hatchling_id in hatch_event.entity_refs["animal_ids"]:
            registry_row = seeded_db.execute(
                "SELECT location_id FROM animal_registry WHERE animal_id = ?",
                (hatchling_id,),
            ).fetchone()
            assert registry_row[0] == strip2_id

    def test_animals_created_as_hatchlings(self, seeded_db, animal_service, strip1_id):
        """New animals are registered as hatchling / unknown sex / alive."""
        hatch = make_hatch_payload(strip1_id, hatched_live=2)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        for hatchling_id in hatch_event.entity_refs["animal_ids"]:
            registry_row = seeded_db.execute(
                "SELECT life_stage, sex, status FROM animal_registry WHERE animal_id = ?",
                (hatchling_id,),
            ).fetchone()
            assert registry_row[0] == "hatchling"
            assert registry_row[1] == "unknown"
            assert registry_row[2] == "alive"

    def test_animals_have_origin_hatched(self, seeded_db, animal_service, strip1_id):
        """New animals are registered with origin 'hatched'."""
        hatch = make_hatch_payload(strip1_id, hatched_live=1)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        hatchling_id = hatch_event.entity_refs["animal_ids"][0]
        registry_row = seeded_db.execute(
            "SELECT origin FROM animal_registry WHERE animal_id = ?",
            (hatchling_id,),
        ).fetchone()
        assert registry_row[0] == "hatched"

    def test_animals_in_live_animals_by_location(self, seeded_db, animal_service, strip1_id):
        """Every hatchling gets a row in live_animals_by_location."""
        hatch = make_hatch_payload(strip1_id, hatched_live=4)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        live_total = seeded_db.execute(
            "SELECT COUNT(*) FROM live_animals_by_location"
        ).fetchone()[0]
        assert live_total == 4

        # Each individual hatchling must be present in the live roster.
        for hatchling_id in hatch_event.entity_refs["animal_ids"]:
            roster_row = seeded_db.execute(
                "SELECT animal_id FROM live_animals_by_location WHERE animal_id = ?",
                (hatchling_id,),
            ).fetchone()
            assert roster_row is not None

    def test_location_intervals_created(self, seeded_db, animal_service, strip1_id):
        """One location interval row exists per hatchling."""
        hatch = make_hatch_payload(strip1_id, hatched_live=3)
        recorded_at = self._now_ms()

        animal_service.record_hatch(hatch, recorded_at, "test_user")

        interval_total = seeded_db.execute(
            "SELECT COUNT(*) FROM animal_location_intervals"
        ).fetchone()[0]
        assert interval_total == 3

    def test_attr_intervals_created(self, seeded_db, animal_service, strip1_id):
        """Four attribute intervals (sex, life_stage, repro_status, status) per hatchling."""
        hatch = make_hatch_payload(strip1_id, hatched_live=2)
        recorded_at = self._now_ms()

        animal_service.record_hatch(hatch, recorded_at, "test_user")

        # 2 animals * 4 attrs = 8 intervals
        interval_total = seeded_db.execute(
            "SELECT COUNT(*) FROM animal_attr_intervals"
        ).fetchone()[0]
        assert interval_total == 8

    def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id):
        """event_animals holds one link per hatchling for the hatch event."""
        hatch = make_hatch_payload(strip1_id, hatched_live=4)
        recorded_at = self._now_ms()

        hatch_event = animal_service.record_hatch(hatch, recorded_at, "test_user")

        link_total = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (hatch_event.id,),
        ).fetchone()[0]
        assert link_total == 4

    def test_validates_location_exists(self, seeded_db, animal_service):
        """An unknown location_id is rejected with a 'not found' ValidationError."""
        missing_location_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        hatch = make_hatch_payload(missing_location_id, hatched_live=1)
        recorded_at = self._now_ms()

        with pytest.raises(ValidationError, match="not found"):
            animal_service.record_hatch(hatch, recorded_at, "test_user")

    def test_validates_brood_location_exists(self, seeded_db, animal_service, strip1_id):
        """An unknown assigned_brood_location_id is rejected with 'not found'."""
        missing_brood_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        hatch = make_hatch_payload(
            strip1_id, hatched_live=1, assigned_brood_location_id=missing_brood_id
        )
        recorded_at = self._now_ms()

        with pytest.raises(ValidationError, match="not found"):
            animal_service.record_hatch(hatch, recorded_at, "test_user")

    def test_validates_species_active(self, seeded_db, animal_service, strip1_id):
        """A hatch for a deactivated species is rejected with 'not active'."""
        # Flip the seeded duck species to inactive before recording.
        seeded_db.execute("UPDATE species SET active = 0 WHERE code = 'duck'")

        hatch = make_hatch_payload(strip1_id, hatched_live=1, species="duck")
        recorded_at = self._now_ms()

        with pytest.raises(ValidationError, match="not active"):
            animal_service.record_hatch(hatch, recorded_at, "test_user")
|
|
|
|
|
|
# =============================================================================
|
|
# AnimalOutcome Tests
|
|
# =============================================================================
|
|
|
|
|
|
def make_outcome_payload(
    resolved_ids: list[str],
    outcome: str = "harvest",
    reason: str | None = None,
    yield_items: list | None = None,
) -> AnimalOutcomePayload:
    """Build an AnimalOutcomePayload for tests.

    Defaults to a 'harvest' outcome with no reason and no yield items.
    """
    fields = {
        "outcome": outcome,
        "resolved_ids": resolved_ids,
        "reason": reason,
        "yield_items": yield_items,
    }
    return AnimalOutcomePayload(**fields)
|
|
|
|
|
|
class TestAnimalOutcome:
    """Exercise AnimalService.record_outcome() and the tables it updates."""

    @staticmethod
    def _now_ms():
        """Return the current wall-clock time as integer epoch milliseconds."""
        return int(time.time() * 1000)

    @classmethod
    def _seed_cohort(cls, animal_service, location_id, count):
        """Create a cohort via the service; return (animal_ids, creation timestamp)."""
        payload = make_cohort_payload(location_id, count=count)
        created_at = cls._now_ms()
        cohort_event = animal_service.create_cohort(payload, created_at, "test_user")
        return cohort_event.entity_refs["animal_ids"], created_at

    @staticmethod
    def _status_of(db, animal_id):
        """Read one animal's status column from animal_registry."""
        return db.execute(
            "SELECT status FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()[0]

    def test_creates_animal_outcome_event(self, seeded_db, animal_service, strip1_id):
        """The returned event has type ANIMAL_OUTCOME and echoes actor and timestamp."""
        # Animals must exist before an outcome can be recorded for them.
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 2)

        harvest = make_outcome_payload(animal_ids, outcome="harvest")
        recorded_at = created_at + 1000
        outcome_event = animal_service.record_outcome(harvest, recorded_at, "test_user")

        assert outcome_event.type == ANIMAL_OUTCOME
        assert outcome_event.actor == "test_user"
        assert outcome_event.ts_utc == recorded_at

    def test_updates_status_for_harvest(self, seeded_db, animal_service, strip1_id):
        """A 'harvest' outcome leaves each animal with status 'harvested'."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 2)

        harvest = make_outcome_payload(animal_ids, outcome="harvest")
        animal_service.record_outcome(harvest, created_at + 1000, "test_user")

        for harvested_id in animal_ids:
            assert self._status_of(seeded_db, harvested_id) == "harvested"

    def test_updates_status_for_death(self, seeded_db, animal_service, strip1_id):
        """A 'death' outcome leaves the animal with status 'dead'."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        death = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death, created_at + 1000, "test_user")

        assert self._status_of(seeded_db, target_id) == "dead"

    def test_updates_status_for_sold(self, seeded_db, animal_service, strip1_id):
        """A 'sold' outcome leaves the animal with status 'sold'."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        sale = make_outcome_payload([target_id], outcome="sold")
        animal_service.record_outcome(sale, created_at + 1000, "test_user")

        assert self._status_of(seeded_db, target_id) == "sold"

    def test_updates_status_for_predator_loss(self, seeded_db, animal_service, strip1_id):
        """A 'predator_loss' outcome leaves the animal with status 'dead'."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        loss = make_outcome_payload([target_id], outcome="predator_loss")
        animal_service.record_outcome(loss, created_at + 1000, "test_user")

        assert self._status_of(seeded_db, target_id) == "dead"

    def test_removes_from_live_animals(self, seeded_db, animal_service, strip1_id):
        """Animals with an outcome drop out of live_animals_by_location."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 3)

        # All three animals start in the live roster.
        live_before = seeded_db.execute(
            "SELECT COUNT(*) FROM live_animals_by_location"
        ).fetchone()[0]
        assert live_before == 3

        # Harvest only the first two.
        harvest = make_outcome_payload(animal_ids[:2], outcome="harvest")
        animal_service.record_outcome(harvest, created_at + 1000, "test_user")

        # Exactly one animal should remain live.
        live_after = seeded_db.execute(
            "SELECT COUNT(*) FROM live_animals_by_location"
        ).fetchone()[0]
        assert live_after == 1

    def test_closes_location_interval(self, seeded_db, animal_service, strip1_id):
        """The animal's location interval gets end_utc equal to the outcome time."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        recorded_at = created_at + 1000
        death = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death, recorded_at, "test_user")

        # The interval at Strip 1 must now be closed at the outcome timestamp.
        interval_row = seeded_db.execute(
            """SELECT end_utc FROM animal_location_intervals
               WHERE animal_id = ? AND location_id = ?""",
            (target_id, strip1_id),
        ).fetchone()
        assert interval_row[0] == recorded_at

    def test_closes_status_interval_and_creates_new(self, seeded_db, animal_service, strip1_id):
        """The 'alive' status interval closes and an open 'harvested' one appears."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        recorded_at = created_at + 1000
        harvest = make_outcome_payload([target_id], outcome="harvest")
        animal_service.record_outcome(harvest, recorded_at, "test_user")

        # The 'alive' interval ends at the outcome timestamp.
        alive_interval = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'status' AND value = 'alive'""",
            (target_id,),
        ).fetchone()
        assert alive_interval[0] == recorded_at

        # The 'harvested' interval has no end yet.
        harvested_interval = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'status' AND value = 'harvested'""",
            (target_id,),
        ).fetchone()
        assert harvested_interval[0] is None

    def test_stores_yield_items_in_payload(self, seeded_db, animal_service, strip1_id):
        """yield_items passed in are present on the returned event's payload."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 2)

        yield_items = [
            {
                "product_code": "meat.part.breast.duck",
                "unit": "kg",
                "quantity": 2,
                "weight_kg": 1.4,
            },
            {"product_code": "fat.rendered.duck", "unit": "kg", "quantity": 1, "weight_kg": 0.3},
        ]
        harvest = make_outcome_payload(
            animal_ids, outcome="harvest", yield_items=yield_items
        )
        outcome_event = animal_service.record_outcome(harvest, created_at + 1000, "test_user")

        assert outcome_event.payload["yield_items"] is not None
        assert len(outcome_event.payload["yield_items"]) == 2
        assert outcome_event.payload["yield_items"][0]["product_code"] == "meat.part.breast.duck"

    def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id):
        """event_animals holds one link per resolved animal for the outcome event."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 3)

        harvest = make_outcome_payload(animal_ids, outcome="harvest")
        outcome_event = animal_service.record_outcome(harvest, created_at + 1000, "test_user")

        link_total = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (outcome_event.id,),
        ).fetchone()[0]
        assert link_total == 3

    def test_validates_animals_exist(self, seeded_db, animal_service):
        """An unknown animal ID is rejected with a 'not found' ValidationError."""
        missing_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        death = make_outcome_payload([missing_animal_id], outcome="death")

        with pytest.raises(ValidationError, match="not found"):
            animal_service.record_outcome(death, self._now_ms(), "test_user")

    def test_validates_animals_alive(self, seeded_db, animal_service, strip1_id):
        """A second outcome on a non-alive animal is rejected with 'not alive'."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        # The first outcome moves the animal out of the 'alive' status.
        death = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death, created_at + 1000, "test_user")

        # A follow-up outcome on the same animal must fail validation.
        with pytest.raises(ValidationError, match="not alive"):
            animal_service.record_outcome(
                make_outcome_payload([target_id], outcome="harvest"),
                created_at + 2000,
                "test_user",
            )
|
|
|
|
|
|
# =============================================================================
|
|
# AnimalPromoted Tests
|
|
# =============================================================================
|
|
|
|
|
|
def make_promoted_payload(
    animal_id: str,
    nickname: str | None = None,
    sex: str | None = None,
    repro_status: str | None = None,
) -> AnimalPromotedPayload:
    """Build an AnimalPromotedPayload for tests.

    Only ``animal_id`` is required; nickname, sex and repro_status default to None.
    """
    fields = {
        "animal_id": animal_id,
        "nickname": nickname,
        "sex": sex,
        "repro_status": repro_status,
    }
    return AnimalPromotedPayload(**fields)
|
|
|
|
|
|
class TestAnimalPromoted:
    """Exercise AnimalService.promote_animal() and the tables it updates."""

    @staticmethod
    def _now_ms():
        """Return the current wall-clock time as integer epoch milliseconds."""
        return int(time.time() * 1000)

    @classmethod
    def _seed_cohort(cls, animal_service, location_id, count, **payload_overrides):
        """Create a cohort via the service; return (animal_ids, creation timestamp)."""
        payload = make_cohort_payload(location_id, count=count, **payload_overrides)
        created_at = cls._now_ms()
        cohort_event = animal_service.create_cohort(payload, created_at, "test_user")
        return cohort_event.entity_refs["animal_ids"], created_at

    @staticmethod
    def _scalar(db, sql, animal_id):
        """Run a single-parameter query and return the first column of the first row."""
        return db.execute(sql, (animal_id,)).fetchone()[0]

    def test_creates_animal_promoted_event(self, seeded_db, animal_service, strip1_id):
        """The returned event has type ANIMAL_PROMOTED and echoes the actor."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        promotion = make_promoted_payload(target_id, nickname="Daisy")
        promoted_event = animal_service.promote_animal(promotion, created_at + 1000, "test_user")

        assert promoted_event.type == ANIMAL_PROMOTED
        assert promoted_event.actor == "test_user"

    def test_sets_identified_flag(self, seeded_db, animal_service, strip1_id):
        """Promotion flips identified from 0 to 1 in the registry and live roster."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        # A freshly created cohort animal starts out unidentified.
        identified_before = self._scalar(
            seeded_db,
            "SELECT identified FROM animal_registry WHERE animal_id = ?",
            target_id,
        )
        assert identified_before == 0

        promotion = make_promoted_payload(target_id, nickname="Daisy")
        animal_service.promote_animal(promotion, created_at + 1000, "test_user")

        # Both tables must reflect the promotion.
        identified_in_registry = self._scalar(
            seeded_db,
            "SELECT identified FROM animal_registry WHERE animal_id = ?",
            target_id,
        )
        assert identified_in_registry == 1

        identified_in_roster = self._scalar(
            seeded_db,
            "SELECT identified FROM live_animals_by_location WHERE animal_id = ?",
            target_id,
        )
        assert identified_in_roster == 1

    def test_sets_nickname(self, seeded_db, animal_service, strip1_id):
        """Promotion writes the nickname to the registry and the live roster."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        promotion = make_promoted_payload(target_id, nickname="Daisy")
        animal_service.promote_animal(promotion, created_at + 1000, "test_user")

        registry_nickname = self._scalar(
            seeded_db,
            "SELECT nickname FROM animal_registry WHERE animal_id = ?",
            target_id,
        )
        assert registry_nickname == "Daisy"

        roster_nickname = self._scalar(
            seeded_db,
            "SELECT nickname FROM live_animals_by_location WHERE animal_id = ?",
            target_id,
        )
        assert roster_nickname == "Daisy"

    def test_updates_sex_and_creates_interval(self, seeded_db, animal_service, strip1_id):
        """Promotion with a sex updates the registry and rolls the sex interval."""
        # A hatch yields an animal whose sex is 'unknown'.
        hatch = make_hatch_payload(strip1_id, hatched_live=1)
        hatched_at = self._now_ms()
        hatch_event = animal_service.record_hatch(hatch, hatched_at, "test_user")
        target_id = hatch_event.entity_refs["animal_ids"][0]

        sex_before = self._scalar(
            seeded_db,
            "SELECT sex FROM animal_registry WHERE animal_id = ?",
            target_id,
        )
        assert sex_before == "unknown"

        # Promote, supplying sex=female.
        promotion = make_promoted_payload(target_id, nickname="Daisy", sex="female")
        promoted_at = hatched_at + 1000
        animal_service.promote_animal(promotion, promoted_at, "test_user")

        sex_after = self._scalar(
            seeded_db,
            "SELECT sex FROM animal_registry WHERE animal_id = ?",
            target_id,
        )
        assert sex_after == "female"

        # The 'unknown' interval ends at the promotion timestamp.
        unknown_interval_end = self._scalar(
            seeded_db,
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'sex' AND value = 'unknown'""",
            target_id,
        )
        assert unknown_interval_end == promoted_at

        # The 'female' interval has no end yet.
        female_interval_end = self._scalar(
            seeded_db,
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'sex' AND value = 'female'""",
            target_id,
        )
        assert female_interval_end is None

    def test_updates_repro_status_and_creates_interval(self, seeded_db, animal_service, strip1_id):
        """Promotion with a repro_status updates the registry and adds an interval."""
        animal_ids, created_at = self._seed_cohort(
            animal_service, strip1_id, 1, sex="female"
        )
        target_id = animal_ids[0]

        promotion = make_promoted_payload(target_id, nickname="Daisy", repro_status="intact")
        promoted_at = created_at + 1000
        animal_service.promote_animal(promotion, promoted_at, "test_user")

        registry_repro_status = self._scalar(
            seeded_db,
            "SELECT repro_status FROM animal_registry WHERE animal_id = ?",
            target_id,
        )
        assert registry_repro_status == "intact"

        # An interval row for repro_status='intact' must exist.
        intact_interval = seeded_db.execute(
            """SELECT start_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'repro_status' AND value = 'intact'""",
            (target_id,),
        ).fetchone()
        assert intact_interval is not None

    def test_event_animal_link_created(self, seeded_db, animal_service, strip1_id):
        """event_animals holds exactly one link for the promotion event."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        promotion = make_promoted_payload(target_id, nickname="Daisy")
        promoted_event = animal_service.promote_animal(promotion, created_at + 1000, "test_user")

        link_total = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (promoted_event.id,),
        ).fetchone()[0]
        assert link_total == 1

    def test_validates_animal_exists(self, seeded_db, animal_service):
        """An unknown animal ID is rejected with a 'not found' ValidationError."""
        missing_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        promotion = make_promoted_payload(missing_animal_id, nickname="Ghost")

        with pytest.raises(ValidationError, match="not found"):
            animal_service.promote_animal(promotion, self._now_ms(), "test_user")

    def test_validates_animal_alive(self, seeded_db, animal_service, strip1_id):
        """Promoting an animal after a death outcome is rejected with 'not alive'."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = animal_ids[0]

        # Record a death so the animal is no longer alive.
        death = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death, created_at + 1000, "test_user")

        # Promotion must now fail validation.
        promotion = make_promoted_payload(target_id, nickname="Ghost")
        with pytest.raises(ValidationError, match="not alive"):
            animal_service.promote_animal(promotion, created_at + 2000, "test_user")

    def test_validates_nickname_unique_among_active(self, seeded_db, animal_service, strip1_id):
        """Reusing a nickname held by another promoted animal is rejected."""
        animal_ids, created_at = self._seed_cohort(animal_service, strip1_id, 2)

        # The first animal claims the nickname.
        first_promotion = make_promoted_payload(animal_ids[0], nickname="Daisy")
        animal_service.promote_animal(first_promotion, created_at + 1000, "test_user")

        # The second animal asking for the same nickname must fail.
        clashing_promotion = make_promoted_payload(animal_ids[1], nickname="Daisy")
        with pytest.raises(ValidationError, match="(?i)nickname.*already in use"):
            animal_service.promote_animal(clashing_promotion, created_at + 2000, "test_user")
|
|
|
|
|
|
# =============================================================================
|
|
# AnimalMerged Tests
|
|
# =============================================================================
|
|
|
|
|
|
def make_merged_payload(
    survivor_animal_id: str,
    merged_animal_ids: list[str],
    notes: str | None = None,
) -> AnimalMergedPayload:
    """Build an AnimalMergedPayload for tests; notes defaults to None."""
    fields = {
        "survivor_animal_id": survivor_animal_id,
        "merged_animal_ids": merged_animal_ids,
        "notes": notes,
    }
    return AnimalMergedPayload(**fields)
|
|
|
|
|
|
class TestAnimalMerged:
|
|
"""Tests for merge_animals()."""
|
|
|
|
def test_creates_animal_merged_event(self, seeded_db, animal_service, strip1_id):
    """The returned event has type ANIMAL_MERGED and echoes the actor."""
    cohort = make_cohort_payload(strip1_id, count=3)
    created_at = int(time.time() * 1000)
    cohort_event = animal_service.create_cohort(cohort, created_at, "test_user")
    all_ids = cohort_event.entity_refs["animal_ids"]
    survivor_id, absorbed_ids = all_ids[0], all_ids[1:]

    merge = make_merged_payload(
        survivor_animal_id=survivor_id, merged_animal_ids=absorbed_ids
    )
    merged_event = animal_service.merge_animals(merge, created_at + 1000, "test_user")

    assert merged_event.type == ANIMAL_MERGED
    assert merged_event.actor == "test_user"
|
|
|
|
def test_merged_animals_status_merged_into(self, seeded_db, animal_service, strip1_id):
|
|
"""Merged animals get status=merged_into."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=3)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:]
|
|
)
|
|
animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user")
|
|
|
|
# Survivor should still be alive
|
|
row = seeded_db.execute(
|
|
"SELECT status FROM animal_registry WHERE animal_id = ?",
|
|
(animal_ids[0],),
|
|
).fetchone()
|
|
assert row[0] == "alive"
|
|
|
|
# Merged animals should be merged_into
|
|
for merged_id in animal_ids[1:]:
|
|
row = seeded_db.execute(
|
|
"SELECT status FROM animal_registry WHERE animal_id = ?",
|
|
(merged_id,),
|
|
).fetchone()
|
|
assert row[0] == "merged_into"
|
|
|
|
def test_removes_merged_from_live_animals(self, seeded_db, animal_service, strip1_id):
|
|
"""Merged animals are removed from live_animals_by_location."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=4)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
count_before = seeded_db.execute(
|
|
"SELECT COUNT(*) FROM live_animals_by_location"
|
|
).fetchone()[0]
|
|
assert count_before == 4
|
|
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:]
|
|
)
|
|
animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user")
|
|
|
|
# Only survivor should remain
|
|
count_after = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[
|
|
0
|
|
]
|
|
assert count_after == 1
|
|
|
|
def test_creates_alias_records(self, seeded_db, animal_service, strip1_id):
|
|
"""Alias records are created mapping merged IDs to survivor."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=3)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:]
|
|
)
|
|
animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user")
|
|
|
|
# Verify alias records
|
|
for merged_id in animal_ids[1:]:
|
|
row = seeded_db.execute(
|
|
"""SELECT survivor_animal_id FROM animal_aliases
|
|
WHERE alias_animal_id = ?""",
|
|
(merged_id,),
|
|
).fetchone()
|
|
assert row is not None
|
|
assert row[0] == animal_ids[0]
|
|
|
|
def test_closes_location_and_status_intervals(self, seeded_db, animal_service, strip1_id):
|
|
"""Merged animals get location and status intervals closed."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=2)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
merge_ts = ts_utc + 1000
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=[animal_ids[1]]
|
|
)
|
|
animal_service.merge_animals(merged_payload, merge_ts, "test_user")
|
|
|
|
# Location interval closed
|
|
row = seeded_db.execute(
|
|
"""SELECT end_utc FROM animal_location_intervals
|
|
WHERE animal_id = ? AND end_utc IS NOT NULL""",
|
|
(animal_ids[1],),
|
|
).fetchone()
|
|
assert row is not None
|
|
assert row[0] == merge_ts
|
|
|
|
# Status alive interval closed
|
|
row = seeded_db.execute(
|
|
"""SELECT end_utc FROM animal_attr_intervals
|
|
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'""",
|
|
(animal_ids[1],),
|
|
).fetchone()
|
|
assert row[0] == merge_ts
|
|
|
|
# merged_into status interval created
|
|
row = seeded_db.execute(
|
|
"""SELECT end_utc FROM animal_attr_intervals
|
|
WHERE animal_id = ? AND attr = 'status' AND value = 'merged_into'""",
|
|
(animal_ids[1],),
|
|
).fetchone()
|
|
assert row[0] is None
|
|
|
|
def test_event_animal_links_include_all(self, seeded_db, animal_service, strip1_id):
|
|
"""Event-animal links include survivor and all merged animals."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=3)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:]
|
|
)
|
|
merged_event = animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user")
|
|
|
|
count = seeded_db.execute(
|
|
"SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
|
|
(merged_event.id,),
|
|
).fetchone()[0]
|
|
assert count == 3 # survivor + 2 merged
|
|
|
|
def test_validates_survivor_not_in_merged_list(self, seeded_db, animal_service, strip1_id):
|
|
"""Raises ValidationError if survivor is in merged list."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=2)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
# Include survivor in merged list
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids
|
|
)
|
|
with pytest.raises(ValidationError, match="(?i)survivor.*cannot be in.*merged"):
|
|
animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user")
|
|
|
|
def test_validates_all_animals_exist(self, seeded_db, animal_service, strip1_id):
|
|
"""Raises ValidationError if any animal doesn't exist."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=1)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_id = cohort_event.entity_refs["animal_ids"][0]
|
|
|
|
fake_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_id, merged_animal_ids=[fake_id]
|
|
)
|
|
with pytest.raises(ValidationError, match="not found"):
|
|
animal_service.merge_animals(merged_payload, ts_utc + 1000, "test_user")
|
|
|
|
def test_validates_all_animals_alive(self, seeded_db, animal_service, strip1_id):
|
|
"""Raises ValidationError if any animal is not alive."""
|
|
cohort_payload = make_cohort_payload(strip1_id, count=3)
|
|
ts_utc = int(time.time() * 1000)
|
|
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
|
|
animal_ids = cohort_event.entity_refs["animal_ids"]
|
|
|
|
# Kill one of the animals to merge
|
|
outcome_payload = make_outcome_payload([animal_ids[2]], outcome="death")
|
|
animal_service.record_outcome(outcome_payload, ts_utc + 1000, "test_user")
|
|
|
|
# Try to merge including dead animal
|
|
merged_payload = make_merged_payload(
|
|
survivor_animal_id=animal_ids[0], merged_animal_ids=animal_ids[1:]
|
|
)
|
|
with pytest.raises(ValidationError, match="not alive"):
|
|
animal_service.merge_animals(merged_payload, ts_utc + 2000, "test_user")
|
|
|
|
|
|
# =============================================================================
|
|
# AnimalStatusCorrected Tests
|
|
# =============================================================================
|
|
|
|
|
|
def make_status_corrected_payload(
    resolved_ids: list[str],
    new_status: str,
    reason: str,
    notes: str | None = None,
) -> AnimalStatusCorrectedPayload:
    """Build an AnimalStatusCorrectedPayload for tests.

    Args:
        resolved_ids: IDs of the animals whose status is being corrected.
        new_status: Status value to assign.
        reason: Justification text for the correction.
        notes: Optional free-text note; forwarded unchanged.

    Returns:
        The payload constructed from exactly these four fields.
    """
    fields = {
        "resolved_ids": resolved_ids,
        "new_status": new_status,
        "reason": reason,
        "notes": notes,
    }
    return AnimalStatusCorrectedPayload(**fields)
|
|
|
|
|
|
class TestAnimalStatusCorrected:
    """Exercise correct_status() and the projection rows it is expected to touch."""

    @staticmethod
    def _seed_cohort(animal_service, location_id, count):
        """Create a cohort as "test_user"; return (animal_ids, creation timestamp in ms)."""
        payload = make_cohort_payload(location_id, count=count)
        created_at = int(time.time() * 1000)
        event = animal_service.create_cohort(payload, created_at, "test_user")
        return event.entity_refs["animal_ids"], created_at

    @staticmethod
    def _live_roster_rows(db, animal_id):
        """Return how many live_animals_by_location rows exist for `animal_id`."""
        cursor = db.execute(
            "SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?",
            (animal_id,),
        )
        return cursor.fetchone()[0]

    def test_creates_animal_status_corrected_event(self, seeded_db, animal_service, strip1_id):
        """correct_status returns an ANIMAL_STATUS_CORRECTED event carrying the given actor."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = ids[0]

        # Record a death first so there is something to correct.
        death_payload = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death_payload, created_at + 1000, "test_user")

        # Then flip the status back to alive.
        correction = make_status_corrected_payload(
            [target_id], "alive", "Misidentified animal, still alive"
        )
        event = animal_service.correct_status(correction, created_at + 2000, "admin_user")

        assert event.type == ANIMAL_STATUS_CORRECTED
        assert event.actor == "admin_user"

    def test_updates_status_in_registry(self, seeded_db, animal_service, strip1_id):
        """The corrected status is what animal_registry reports afterwards."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = ids[0]

        # Start from a "sold" outcome.
        sold_payload = make_outcome_payload([target_id], outcome="sold")
        animal_service.record_outcome(sold_payload, created_at + 1000, "test_user")

        # Correct it to "dead".
        correction = make_status_corrected_payload(
            [target_id], "dead", "Animal actually died, not sold"
        )
        animal_service.correct_status(correction, created_at + 2000, "admin_user")

        registry_row = seeded_db.execute(
            "SELECT status FROM animal_registry WHERE animal_id = ?",
            (target_id,),
        ).fetchone()
        assert registry_row[0] == "dead"

    def test_restores_to_live_animals_when_correcting_to_alive(
        self, seeded_db, animal_service, strip1_id
    ):
        """A correction to alive puts the animal back in live_animals_by_location."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = ids[0]

        death_payload = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death_payload, created_at + 1000, "test_user")

        # The death took the animal off the live roster.
        assert self._live_roster_rows(seeded_db, target_id) == 0

        correction = make_status_corrected_payload(
            [target_id], "alive", "Was misidentified, still alive"
        )
        animal_service.correct_status(correction, created_at + 2000, "admin_user")

        # The correction brings it back.
        assert self._live_roster_rows(seeded_db, target_id) == 1

    def test_removes_from_live_animals_when_correcting_from_alive(
        self, seeded_db, animal_service, strip1_id
    ):
        """A correction away from alive drops the animal from live_animals_by_location."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = ids[0]

        # A freshly created animal is on the live roster.
        assert self._live_roster_rows(seeded_db, target_id) == 1

        # Go straight to "dead" via correction; no outcome event is recorded.
        correction = make_status_corrected_payload(
            [target_id], "dead", "Animal found dead, correcting records"
        )
        animal_service.correct_status(correction, created_at + 1000, "admin_user")

        assert self._live_roster_rows(seeded_db, target_id) == 0

    def test_creates_status_interval_change(self, seeded_db, animal_service, strip1_id):
        """The old status interval ends, and a new open one starts, at the correction time."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = ids[0]

        death_payload = make_outcome_payload([target_id], outcome="death")
        animal_service.record_outcome(death_payload, created_at + 1000, "test_user")

        correct_ts = created_at + 2000
        correction = make_status_corrected_payload([target_id], "alive", "Misidentified")
        animal_service.correct_status(correction, correct_ts, "admin_user")

        # The status=dead interval ends at the correction timestamp.
        dead_interval = seeded_db.execute(
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'status' AND value = 'dead'""",
            (target_id,),
        ).fetchone()
        assert dead_interval[0] == correct_ts

        # A status=alive interval starting at that timestamp exists and is open.
        alive_interval = seeded_db.execute(
            """SELECT start_utc, end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
               AND start_utc = ?""",
            (target_id, correct_ts),
        ).fetchone()
        assert alive_interval is not None
        assert alive_interval[1] is None

    def test_event_animal_links_created(self, seeded_db, animal_service, strip1_id):
        """event_animals gets one row per corrected animal."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 2)

        # Both animals are recorded dead in a single outcome event.
        death_payload = make_outcome_payload(ids, outcome="death")
        animal_service.record_outcome(death_payload, created_at + 1000, "test_user")

        # Both are then corrected back to alive in a single correction.
        correction = make_status_corrected_payload(ids, "alive", "Both were misidentified")
        event = animal_service.correct_status(correction, created_at + 2000, "admin_user")

        link_count = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (event.id,),
        ).fetchone()[0]
        assert link_count == 2

    def test_validates_animals_exist(self, seeded_db, animal_service):
        """An ID that was never created raises ValidationError."""
        missing_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
        correction = make_status_corrected_payload([missing_id], "alive", "Testing")

        with pytest.raises(ValidationError, match="not found"):
            animal_service.correct_status(correction, int(time.time() * 1000), "admin_user")

    def test_stores_old_status_in_entity_refs(self, seeded_db, animal_service, strip1_id):
        """entity_refs["old_status_map"] records the status held before the correction."""
        ids, created_at = self._seed_cohort(animal_service, strip1_id, 1)
        target_id = ids[0]

        correction = make_status_corrected_payload([target_id], "dead", "Correcting status")
        event = animal_service.correct_status(correction, created_at + 1000, "admin_user")

        assert "old_status_map" in event.entity_refs
        assert event.entity_refs["old_status_map"][target_id] == "alive"
|