Files
animaltrack/tests/test_service_animal.py
Petru Paler dc7700da20 feat: add animal attributes update projection and service
Implement AnimalAttributesUpdated event handling:
- Update IntervalProjection to close old attr intervals and open new ones
- Update AnimalRegistryProjection to update registry tables
- Update EventAnimalsProjection to track event-animal links
- Add update_attributes() to AnimalService

Only attributes that actually change create new intervals.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 07:18:17 +00:00

743 lines
31 KiB
Python

# ABOUTME: Tests for AnimalService.
# ABOUTME: Integration tests for cohort creation with full transaction.
import time
import pytest
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.store import EventStore
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.services.animal import AnimalService, ValidationError
@pytest.fixture
def event_store(seeded_db):
"""Create an EventStore for testing."""
return EventStore(seeded_db)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with all cohort projections registered."""
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
return registry
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
"""Create an AnimalService for testing."""
return AnimalService(seeded_db, event_store, projection_registry)
@pytest.fixture
def valid_location_id(seeded_db):
"""Get a valid location ID from seeds."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
def make_payload(
location_id: str,
count: int = 5,
species: str = "duck",
life_stage: str = "adult",
sex: str = "unknown",
origin: str = "purchased",
) -> AnimalCohortCreatedPayload:
"""Create a cohort payload for testing."""
return AnimalCohortCreatedPayload(
species=species,
count=count,
life_stage=life_stage,
sex=sex,
location_id=location_id,
origin=origin,
)
class TestAnimalServiceCreateCohort:
"""Tests for create_cohort()."""
def test_creates_event(self, seeded_db, animal_service, valid_location_id):
"""create_cohort creates an AnimalCohortCreated event."""
payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
assert event.type == ANIMAL_COHORT_CREATED
assert event.actor == "test_user"
assert event.ts_utc == ts_utc
def test_event_has_animal_ids_in_entity_refs(
self, seeded_db, animal_service, valid_location_id
):
"""Event entity_refs contains generated animal_ids list."""
payload = make_payload(valid_location_id, count=5)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
assert "animal_ids" in event.entity_refs
assert len(event.entity_refs["animal_ids"]) == 5
# Verify all IDs are ULIDs (26 chars)
for animal_id in event.entity_refs["animal_ids"]:
assert len(animal_id) == 26
def test_event_has_location_in_entity_refs(self, seeded_db, animal_service, valid_location_id):
"""Event entity_refs contains location_id."""
payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
assert "location_id" in event.entity_refs
assert event.entity_refs["location_id"] == valid_location_id
def test_animals_created_in_registry(self, seeded_db, animal_service, valid_location_id):
"""Animals are created in animal_registry table."""
payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
# Check animals exist in registry
count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
assert count == 3
# Check each generated animal_id is in the registry
for animal_id in event.entity_refs["animal_ids"]:
row = seeded_db.execute(
"SELECT animal_id FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
assert row is not None
def test_correct_number_of_animals_created(self, seeded_db, animal_service, valid_location_id):
"""Number of animals matches payload.count."""
payload = make_payload(valid_location_id, count=7)
ts_utc = int(time.time() * 1000)
animal_service.create_cohort(payload, ts_utc, "test_user")
count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
assert count == 7
def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
"""Event-animal links are created."""
payload = make_payload(valid_location_id, count=4)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
# Check event_animals has 4 rows
count = seeded_db.execute(
"SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
(event.id,),
).fetchone()[0]
assert count == 4
def test_location_intervals_created(self, seeded_db, animal_service, valid_location_id):
"""Location intervals are created for each animal."""
payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
animal_service.create_cohort(payload, ts_utc, "test_user")
count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0]
assert count == 3
def test_attr_intervals_created(self, seeded_db, animal_service, valid_location_id):
"""Attribute intervals are created for each animal."""
payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
animal_service.create_cohort(payload, ts_utc, "test_user")
# 2 animals * 4 attrs = 8 intervals
count = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0]
assert count == 8
def test_live_animals_created(self, seeded_db, animal_service, valid_location_id):
"""Live animals are created in live_animals_by_location."""
payload = make_payload(valid_location_id, count=5)
ts_utc = int(time.time() * 1000)
animal_service.create_cohort(payload, ts_utc, "test_user")
count = seeded_db.execute("SELECT COUNT(*) FROM live_animals_by_location").fetchone()[0]
assert count == 5
def test_event_stored_in_events_table(self, seeded_db, animal_service, valid_location_id):
"""Event is stored in events table."""
payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
# Verify event exists in database
row = seeded_db.execute(
"SELECT id FROM events WHERE id = ?",
(event.id,),
).fetchone()
assert row is not None
class TestAnimalServiceValidation:
"""Tests for create_cohort() validation."""
def test_rejects_nonexistent_location(self, seeded_db, animal_service):
"""Raises ValidationError for non-existent location_id."""
# Use a valid ULID format but non-existent location
fake_location_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
payload = make_payload(fake_location_id, count=1)
ts_utc = int(time.time() * 1000)
with pytest.raises(ValidationError, match="not found"):
animal_service.create_cohort(payload, ts_utc, "test_user")
def test_rejects_archived_location(self, seeded_db, animal_service):
"""Raises ValidationError for archived location."""
# First, create and archive a location
from animaltrack.id_gen import generate_id
location_id = generate_id()
ts = int(time.time() * 1000)
seeded_db.execute(
"""INSERT INTO locations (id, name, active, created_at_utc, updated_at_utc)
VALUES (?, 'Archived Test', 0, ?, ?)""",
(location_id, ts, ts),
)
payload = make_payload(location_id, count=1)
ts_utc = int(time.time() * 1000)
with pytest.raises(ValidationError, match="archived"):
animal_service.create_cohort(payload, ts_utc, "test_user")
def test_rejects_inactive_species(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for inactive species."""
# First, deactivate duck species
seeded_db.execute("UPDATE species SET active = 0 WHERE code = 'duck'")
payload = make_payload(valid_location_id, count=1, species="duck")
ts_utc = int(time.time() * 1000)
with pytest.raises(ValidationError, match="not active"):
animal_service.create_cohort(payload, ts_utc, "test_user")
class TestAnimalServiceTransactionIntegrity:
"""Tests for transaction integrity."""
def test_no_partial_data_on_projection_error(self, seeded_db, event_store, valid_location_id):
"""If projection fails, event is not persisted."""
# Create a registry with a failing projection
from animaltrack.projections import Projection, ProjectionError
class FailingProjection(Projection):
def get_event_types(self):
return [ANIMAL_COHORT_CREATED]
def apply(self, event):
raise ProjectionError("Intentional failure")
def revert(self, event):
pass
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(FailingProjection(seeded_db))
service = AnimalService(seeded_db, event_store, registry)
payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
with pytest.raises(ProjectionError):
service.create_cohort(payload, ts_utc, "test_user")
# Verify nothing was persisted
event_count = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0]
assert event_count == 0
animal_count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
assert animal_count == 0
# =============================================================================
# move_animals Tests
# =============================================================================
def make_move_payload(
to_location_id: str,
resolved_ids: list[str],
):
"""Create a move payload for testing."""
from animaltrack.events.payloads import AnimalMovedPayload
return AnimalMovedPayload(
to_location_id=to_location_id,
resolved_ids=resolved_ids,
)
class TestAnimalServiceMoveAnimals:
"""Tests for move_animals()."""
def test_creates_animal_moved_event(self, seeded_db, animal_service, valid_location_id):
"""move_animals creates an AnimalMoved event."""
from animaltrack.events.types import ANIMAL_MOVED
# First create a cohort
cohort_payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
# Get another location
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
# Move the animals
move_payload = make_move_payload(strip2, animal_ids)
move_ts = ts_utc + 1000
move_event = animal_service.move_animals(move_payload, move_ts, "test_user")
assert move_event.type == ANIMAL_MOVED
assert move_event.actor == "test_user"
assert move_event.ts_utc == move_ts
def test_event_has_animal_ids_in_entity_refs(
self, seeded_db, animal_service, valid_location_id
):
"""Event entity_refs contains animal_ids list."""
cohort_payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
move_event = animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
assert "animal_ids" in move_event.entity_refs
assert set(move_event.entity_refs["animal_ids"]) == set(animal_ids)
def test_event_has_from_and_to_location_in_entity_refs(
self, seeded_db, animal_service, valid_location_id
):
"""Event entity_refs contains both from_location_id and to_location_id."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
move_event = animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
assert move_event.entity_refs["from_location_id"] == valid_location_id
assert move_event.entity_refs["to_location_id"] == strip2
def test_updates_location_in_registry(self, seeded_db, animal_service, valid_location_id):
"""Animals are moved in animal_registry table."""
cohort_payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
# Check each animal is now at strip2
for animal_id in animal_ids:
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
assert row[0] == strip2
def test_creates_location_intervals(self, seeded_db, animal_service, valid_location_id):
"""Move creates new location intervals and closes old ones."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
# Should have 2 location intervals: one closed (strip1), one open (strip2)
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 2
def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
"""Event-animal links are created for move event."""
cohort_payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
move_event = animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
# Check event_animals has 3 rows for the move event
count = seeded_db.execute(
"SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
(move_event.id,),
).fetchone()[0]
assert count == 3
class TestAnimalServiceMoveValidation:
"""Tests for move_animals() validation."""
def test_rejects_nonexistent_to_location(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for non-existent to_location_id."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
fake_location_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
move_payload = make_move_payload(fake_location_id, animal_ids)
with pytest.raises(ValidationError, match="not found"):
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
def test_rejects_archived_to_location(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for archived to_location."""
from animaltrack.id_gen import generate_id
# Create an archived location
archived_id = generate_id()
ts = int(time.time() * 1000)
seeded_db.execute(
"""INSERT INTO locations (id, name, active, created_at_utc, updated_at_utc)
VALUES (?, 'Archived Test', 0, ?, ?)""",
(archived_id, ts, ts),
)
cohort_payload = make_payload(valid_location_id, count=1)
cohort_event = animal_service.create_cohort(cohort_payload, ts, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
move_payload = make_move_payload(archived_id, animal_ids)
with pytest.raises(ValidationError, match="archived"):
animal_service.move_animals(move_payload, ts + 1000, "test_user")
def test_rejects_same_location(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError when moving to the same location."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
# Try to move to the same location
move_payload = make_move_payload(valid_location_id, animal_ids)
with pytest.raises(ValidationError, match="same location"):
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
def test_rejects_animals_from_multiple_locations(self, seeded_db, animal_service):
"""Raises ValidationError when animals are from different locations."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
strip3 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 3'").fetchone()[0]
ts_utc = int(time.time() * 1000)
# Create a cohort at strip1
cohort1 = animal_service.create_cohort(make_payload(strip1, count=1), ts_utc, "test_user")
animal1 = cohort1.entity_refs["animal_ids"][0]
# Create a cohort at strip2
cohort2 = animal_service.create_cohort(
make_payload(strip2, count=1), ts_utc + 1000, "test_user"
)
animal2 = cohort2.entity_refs["animal_ids"][0]
# Try to move animals from different locations
move_payload = make_move_payload(strip3, [animal1, animal2])
with pytest.raises(ValidationError, match="single location"):
animal_service.move_animals(move_payload, ts_utc + 2000, "test_user")
def test_rejects_nonexistent_animal(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for non-existent animal_id."""
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
move_payload = make_move_payload(strip2, [fake_animal_id])
with pytest.raises(ValidationError, match="not found"):
animal_service.move_animals(move_payload, int(time.time() * 1000), "test_user")
# =============================================================================
# update_attributes Tests
# =============================================================================
def make_attrs_payload(
resolved_ids: list[str],
sex: str | None = None,
life_stage: str | None = None,
repro_status: str | None = None,
):
"""Create an attributes update payload for testing."""
from animaltrack.events.payloads import AnimalAttributesUpdatedPayload, AttributeSet
attr_set = AttributeSet(sex=sex, life_stage=life_stage, repro_status=repro_status)
return AnimalAttributesUpdatedPayload(
resolved_ids=resolved_ids,
set=attr_set,
)
class TestAnimalServiceUpdateAttributes:
"""Tests for update_attributes()."""
def test_creates_animal_attributes_updated_event(
self, seeded_db, animal_service, valid_location_id
):
"""update_attributes creates an AnimalAttributesUpdated event."""
from animaltrack.events.types import ANIMAL_ATTRIBUTES_UPDATED
# First create a cohort
cohort_payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
# Update attributes
attrs_payload = make_attrs_payload(animal_ids, sex="female")
attrs_ts = ts_utc + 1000
attrs_event = animal_service.update_attributes(attrs_payload, attrs_ts, "test_user")
assert attrs_event.type == ANIMAL_ATTRIBUTES_UPDATED
assert attrs_event.actor == "test_user"
assert attrs_event.ts_utc == attrs_ts
def test_event_has_animal_ids_in_entity_refs(
self, seeded_db, animal_service, valid_location_id
):
"""Event entity_refs contains animal_ids list."""
cohort_payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
attrs_payload = make_attrs_payload(animal_ids, life_stage="juvenile")
attrs_event = animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
assert "animal_ids" in attrs_event.entity_refs
assert set(attrs_event.entity_refs["animal_ids"]) == set(animal_ids)
def test_updates_sex_in_registry(self, seeded_db, animal_service, valid_location_id):
"""Animals have updated sex in animal_registry table."""
cohort_payload = make_payload(valid_location_id, count=2, sex="unknown")
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
attrs_payload = make_attrs_payload(animal_ids, sex="male")
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
# Check each animal has updated sex
for animal_id in animal_ids:
row = seeded_db.execute(
"SELECT sex FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
assert row[0] == "male"
def test_updates_life_stage_in_registry(self, seeded_db, animal_service, valid_location_id):
"""Animals have updated life_stage in animal_registry table."""
cohort_payload = make_payload(valid_location_id, count=1, life_stage="juvenile")
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
attrs_payload = make_attrs_payload(animal_ids, life_stage="adult")
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
row = seeded_db.execute(
"SELECT life_stage FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == "adult"
def test_updates_repro_status_in_registry(self, seeded_db, animal_service, valid_location_id):
"""Animals have updated repro_status in animal_registry table."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
attrs_payload = make_attrs_payload(animal_ids, repro_status="intact")
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
row = seeded_db.execute(
"SELECT repro_status FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == "intact"
def test_creates_attr_intervals_for_changed_attrs_only(
self, seeded_db, animal_service, valid_location_id
):
"""Only changed attrs create new intervals."""
# Create cohort with sex=unknown, life_stage=adult
cohort_payload = make_payload(valid_location_id, count=1, sex="unknown", life_stage="adult")
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_id = cohort_event.entity_refs["animal_ids"][0]
# Initial intervals: sex, life_stage, repro_status, status = 4
initial_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert initial_count == 4
# Update only sex (life_stage stays the same)
attrs_payload = make_attrs_payload([animal_id], sex="female")
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
# Should have 5 intervals: 4 initial + 1 new for sex (old one closed)
new_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert new_count == 5
# Verify old sex interval was closed
closed_sex = seeded_db.execute(
"""SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'sex' AND value = 'unknown'""",
(animal_id,),
).fetchone()
assert closed_sex[0] == ts_utc + 1000
# Verify new sex interval is open
open_sex = seeded_db.execute(
"""SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'sex' AND value = 'female'""",
(animal_id,),
).fetchone()
assert open_sex[0] is None
def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
"""Event-animal links are created for attrs event."""
cohort_payload = make_payload(valid_location_id, count=4)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
attrs_payload = make_attrs_payload(animal_ids, sex="female")
attrs_event = animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
# Check event_animals has 4 rows for the attrs event
count = seeded_db.execute(
"SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
(attrs_event.id,),
).fetchone()[0]
assert count == 4
def test_updates_multiple_attrs_at_once(self, seeded_db, animal_service, valid_location_id):
"""Multiple attributes can be updated at once."""
cohort_payload = make_payload(
valid_location_id, count=1, sex="unknown", life_stage="hatchling"
)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_id = cohort_event.entity_refs["animal_ids"][0]
# Update both sex and life_stage
attrs_payload = make_attrs_payload([animal_id], sex="female", life_stage="juvenile")
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
# Check both were updated in registry
row = seeded_db.execute(
"SELECT sex, life_stage FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
assert row[0] == "female"
assert row[1] == "juvenile"
# Should have 6 intervals: 4 initial + 2 new (sex + life_stage)
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert count == 6
def test_noop_when_value_unchanged(self, seeded_db, animal_service, valid_location_id):
"""No new intervals created when value is already the same."""
cohort_payload = make_payload(valid_location_id, count=1, sex="female")
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_id = cohort_event.entity_refs["animal_ids"][0]
initial_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
# Update sex to same value
attrs_payload = make_attrs_payload([animal_id], sex="female")
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")
# Should have same number of intervals
new_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert new_count == initial_count
class TestAnimalServiceUpdateAttributesValidation:
"""Tests for update_attributes() validation."""
def test_rejects_nonexistent_animal(self, seeded_db, animal_service):
"""Raises ValidationError for non-existent animal_id."""
fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
attrs_payload = make_attrs_payload([fake_animal_id], sex="female")
with pytest.raises(ValidationError, match="not found"):
animal_service.update_attributes(attrs_payload, int(time.time() * 1000), "test_user")
def test_rejects_empty_attribute_set(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError when no attributes are being updated."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
# Create payload with no attrs set
attrs_payload = make_attrs_payload(animal_ids)
with pytest.raises(ValidationError, match="at least one attribute"):
animal_service.update_attributes(attrs_payload, ts_utc + 1000, "test_user")