# ABOUTME: Tests for AnimalService.
# ABOUTME: Integration tests for cohort, move and attribute-update flows with full transaction.

import time

import pytest

from animaltrack.events.payloads import (
    AnimalAttributesUpdatedPayload,
    AnimalCohortCreatedPayload,
    AnimalMovedPayload,
    AttributeSet,
)
from animaltrack.events.store import EventStore
from animaltrack.events.types import (
    ANIMAL_ATTRIBUTES_UPDATED,
    ANIMAL_COHORT_CREATED,
    ANIMAL_MOVED,
)
from animaltrack.id_gen import generate_id
from animaltrack.projections import Projection, ProjectionError, ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.services.animal import AnimalService, ValidationError

# Actor recorded on every event created by these tests.
ACTOR = "test_user"

# 26-character ULID-shaped ID that the tests rely on NOT existing in the database.
MISSING_ID = "01ARZ3NDEKTSV4RRFFQ69G5XXX"


# =============================================================================
# Shared helpers
# =============================================================================


def _now_ms() -> int:
    """Return the current wall-clock time as integer milliseconds since the epoch."""
    return int(time.time() * 1000)


def _scalar(db, sql: str, *params):
    """Run ``sql`` and return the first column of the first row.

    ``params`` are only passed to ``db.execute`` when given, so parameterless
    queries are issued exactly as ``db.execute(sql)``.
    """
    cursor = db.execute(sql, params) if params else db.execute(sql)
    return cursor.fetchone()[0]


def _location_id(db, name: str) -> str:
    """Return the ID of the location called ``name``.

    Fails with a readable assertion message (rather than a ``TypeError`` on
    ``None``) when no such location exists.
    """
    row = db.execute("SELECT id FROM locations WHERE name = ?", (name,)).fetchone()
    assert row is not None, f"location {name!r} not found in the test database"
    return row[0]


def _insert_archived_location(db) -> str:
    """Insert an inactive (``active = 0``) location and return its ID."""
    location_id = generate_id()
    ts = _now_ms()
    db.execute(
        """INSERT INTO locations (id, name, active, created_at_utc, updated_at_utc)
           VALUES (?, 'Archived Test', 0, ?, ?)""",
        (location_id, ts, ts),
    )
    return location_id


def _seed_cohort(
    service, location_id: str, *, ts_utc: int | None = None, **payload_kwargs
) -> tuple[list[str], int]:
    """Create a cohort as test setup and return ``(animal_ids, ts_utc)``.

    Args:
        service: The AnimalService under test.
        location_id: Location the cohort is created at.
        ts_utc: Event timestamp in ms; defaults to the current time.
        **payload_kwargs: Forwarded to ``make_payload`` (count, sex, life_stage, ...).
    """
    if ts_utc is None:
        ts_utc = _now_ms()
    event = service.create_cohort(make_payload(location_id, **payload_kwargs), ts_utc, ACTOR)
    return event.entity_refs["animal_ids"], ts_utc


# =============================================================================
# Fixtures
# =============================================================================


@pytest.fixture
def event_store(seeded_db):
    """Create an EventStore for testing."""
    return EventStore(seeded_db)


@pytest.fixture
def projection_registry(seeded_db):
    """Create a ProjectionRegistry with all cohort projections registered."""
    registry = ProjectionRegistry()
    for projection_cls in (AnimalRegistryProjection, EventAnimalsProjection, IntervalProjection):
        registry.register(projection_cls(seeded_db))
    return registry


@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
    """Create an AnimalService for testing."""
    return AnimalService(seeded_db, event_store, projection_registry)


@pytest.fixture
def valid_location_id(seeded_db):
    """Get a valid location ID ('Strip 1') from seeds."""
    return _location_id(seeded_db, "Strip 1")


# =============================================================================
# create_cohort Tests
# =============================================================================


def make_payload(
    location_id: str,
    count: int = 5,
    species: str = "duck",
    life_stage: str = "adult",
    sex: str = "unknown",
    origin: str = "purchased",
) -> AnimalCohortCreatedPayload:
    """Create a cohort payload for testing."""
    return AnimalCohortCreatedPayload(
        species=species,
        count=count,
        life_stage=life_stage,
        sex=sex,
        location_id=location_id,
        origin=origin,
    )


class TestAnimalServiceCreateCohort:
    """Tests for create_cohort()."""

    def test_creates_event(self, seeded_db, animal_service, valid_location_id):
        """create_cohort creates an AnimalCohortCreated event."""
        payload = make_payload(valid_location_id, count=3)
        ts_utc = _now_ms()

        event = animal_service.create_cohort(payload, ts_utc, ACTOR)

        assert event.type == ANIMAL_COHORT_CREATED
        assert event.actor == ACTOR
        assert event.ts_utc == ts_utc

    def test_event_has_animal_ids_in_entity_refs(
        self, seeded_db, animal_service, valid_location_id
    ):
        """Event entity_refs contains generated animal_ids list."""
        payload = make_payload(valid_location_id, count=5)

        event = animal_service.create_cohort(payload, _now_ms(), ACTOR)

        assert "animal_ids" in event.entity_refs
        assert len(event.entity_refs["animal_ids"]) == 5
        # Verify all IDs are ULIDs (26 chars)
        for animal_id in event.entity_refs["animal_ids"]:
            assert len(animal_id) == 26

    def test_event_has_location_in_entity_refs(self, seeded_db, animal_service, valid_location_id):
        """Event entity_refs contains location_id."""
        payload = make_payload(valid_location_id, count=1)

        event = animal_service.create_cohort(payload, _now_ms(), ACTOR)

        assert "location_id" in event.entity_refs
        assert event.entity_refs["location_id"] == valid_location_id

    def test_animals_created_in_registry(self, seeded_db, animal_service, valid_location_id):
        """Animals are created in animal_registry table."""
        payload = make_payload(valid_location_id, count=3)

        event = animal_service.create_cohort(payload, _now_ms(), ACTOR)

        # Check animals exist in registry
        assert _scalar(seeded_db, "SELECT COUNT(*) FROM animal_registry") == 3

        # Check each generated animal_id is in the registry
        for animal_id in event.entity_refs["animal_ids"]:
            row = seeded_db.execute(
                "SELECT animal_id FROM animal_registry WHERE animal_id = ?",
                (animal_id,),
            ).fetchone()
            assert row is not None

    def test_correct_number_of_animals_created(self, seeded_db, animal_service, valid_location_id):
        """Number of animals matches payload.count."""
        payload = make_payload(valid_location_id, count=7)

        animal_service.create_cohort(payload, _now_ms(), ACTOR)

        assert _scalar(seeded_db, "SELECT COUNT(*) FROM animal_registry") == 7

    def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
        """Event-animal links are created."""
        payload = make_payload(valid_location_id, count=4)

        event = animal_service.create_cohort(payload, _now_ms(), ACTOR)

        # Check event_animals has 4 rows
        link_count = _scalar(
            seeded_db,
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            event.id,
        )
        assert link_count == 4

    def test_location_intervals_created(self, seeded_db, animal_service, valid_location_id):
        """Location intervals are created for each animal."""
        payload = make_payload(valid_location_id, count=3)

        animal_service.create_cohort(payload, _now_ms(), ACTOR)

        assert _scalar(seeded_db, "SELECT COUNT(*) FROM animal_location_intervals") == 3

    def test_attr_intervals_created(self, seeded_db, animal_service, valid_location_id):
        """Attribute intervals are created for each animal."""
        payload = make_payload(valid_location_id, count=2)

        animal_service.create_cohort(payload, _now_ms(), ACTOR)

        # 2 animals * 4 attrs = 8 intervals
        assert _scalar(seeded_db, "SELECT COUNT(*) FROM animal_attr_intervals") == 8

    def test_live_animals_created(self, seeded_db, animal_service, valid_location_id):
        """Live animals are created in live_animals_by_location."""
        payload = make_payload(valid_location_id, count=5)

        animal_service.create_cohort(payload, _now_ms(), ACTOR)

        assert _scalar(seeded_db, "SELECT COUNT(*) FROM live_animals_by_location") == 5

    def test_event_stored_in_events_table(self, seeded_db, animal_service, valid_location_id):
        """Event is stored in events table."""
        payload = make_payload(valid_location_id, count=1)

        event = animal_service.create_cohort(payload, _now_ms(), ACTOR)

        # Verify event exists in database
        row = seeded_db.execute(
            "SELECT id FROM events WHERE id = ?",
            (event.id,),
        ).fetchone()
        assert row is not None


class TestAnimalServiceValidation:
    """Tests for create_cohort() validation."""

    def test_rejects_nonexistent_location(self, seeded_db, animal_service):
        """Raises ValidationError for non-existent location_id."""
        # Use a valid ULID format but non-existent location
        payload = make_payload(MISSING_ID, count=1)

        with pytest.raises(ValidationError, match="not found"):
            animal_service.create_cohort(payload, _now_ms(), ACTOR)

    def test_rejects_archived_location(self, seeded_db, animal_service):
        """Raises ValidationError for archived location."""
        # First, create and archive a location
        location_id = _insert_archived_location(seeded_db)
        payload = make_payload(location_id, count=1)

        with pytest.raises(ValidationError, match="archived"):
            animal_service.create_cohort(payload, _now_ms(), ACTOR)

    def test_rejects_inactive_species(self, seeded_db, animal_service, valid_location_id):
        """Raises ValidationError for inactive species."""
        # First, deactivate duck species
        seeded_db.execute("UPDATE species SET active = 0 WHERE code = 'duck'")
        payload = make_payload(valid_location_id, count=1, species="duck")

        with pytest.raises(ValidationError, match="not active"):
            animal_service.create_cohort(payload, _now_ms(), ACTOR)


class TestAnimalServiceTransactionIntegrity:
    """Tests for transaction integrity."""

    def test_no_partial_data_on_projection_error(self, seeded_db, event_store, valid_location_id):
        """If projection fails, event is not persisted."""
        # Count events before the test (seeds create LocationCreated events)
        event_count_before = _scalar(seeded_db, "SELECT COUNT(*) FROM events")

        class FailingProjection(Projection):
            """Projection that always fails when a cohort-created event is applied."""

            def get_event_types(self):
                return [ANIMAL_COHORT_CREATED]

            def apply(self, event):
                raise ProjectionError("Intentional failure")

            def revert(self, event):
                pass

        # Register a working projection first so its writes have to be undone
        # when the failing one raises.
        registry = ProjectionRegistry()
        registry.register(AnimalRegistryProjection(seeded_db))
        registry.register(FailingProjection(seeded_db))
        service = AnimalService(seeded_db, event_store, registry)
        payload = make_payload(valid_location_id, count=2)

        with pytest.raises(ProjectionError):
            service.create_cohort(payload, _now_ms(), ACTOR)

        # Verify no new events were persisted (count unchanged from before)
        assert _scalar(seeded_db, "SELECT COUNT(*) FROM events") == event_count_before
        assert _scalar(seeded_db, "SELECT COUNT(*) FROM animal_registry") == 0


# =============================================================================
# move_animals Tests
# =============================================================================


def make_move_payload(
    to_location_id: str,
    resolved_ids: list[str],
) -> AnimalMovedPayload:
    """Create a move payload for testing."""
    return AnimalMovedPayload(
        to_location_id=to_location_id,
        resolved_ids=resolved_ids,
    )


class TestAnimalServiceMoveAnimals:
    """Tests for move_animals()."""

    def test_creates_animal_moved_event(self, seeded_db, animal_service, valid_location_id):
        """move_animals creates an AnimalMoved event."""
        # First create a cohort
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=3)
        # Get another location
        strip2 = _location_id(seeded_db, "Strip 2")

        # Move the animals
        move_payload = make_move_payload(strip2, animal_ids)
        move_ts = ts_utc + 1000
        move_event = animal_service.move_animals(move_payload, move_ts, ACTOR)

        assert move_event.type == ANIMAL_MOVED
        assert move_event.actor == ACTOR
        assert move_event.ts_utc == move_ts

    def test_event_has_animal_ids_in_entity_refs(
        self, seeded_db, animal_service, valid_location_id
    ):
        """Event entity_refs contains animal_ids list."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=2)
        strip2 = _location_id(seeded_db, "Strip 2")

        move_payload = make_move_payload(strip2, animal_ids)
        move_event = animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

        assert "animal_ids" in move_event.entity_refs
        assert set(move_event.entity_refs["animal_ids"]) == set(animal_ids)

    def test_event_has_from_and_to_location_in_entity_refs(
        self, seeded_db, animal_service, valid_location_id
    ):
        """Event entity_refs contains both from_location_id and to_location_id."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)
        strip2 = _location_id(seeded_db, "Strip 2")

        move_payload = make_move_payload(strip2, animal_ids)
        move_event = animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

        assert move_event.entity_refs["from_location_id"] == valid_location_id
        assert move_event.entity_refs["to_location_id"] == strip2

    def test_updates_location_in_registry(self, seeded_db, animal_service, valid_location_id):
        """Animals are moved in animal_registry table."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=2)
        strip2 = _location_id(seeded_db, "Strip 2")

        move_payload = make_move_payload(strip2, animal_ids)
        animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

        # Check each animal is now at strip2
        for animal_id in animal_ids:
            current_location = _scalar(
                seeded_db,
                "SELECT location_id FROM animal_registry WHERE animal_id = ?",
                animal_id,
            )
            assert current_location == strip2

    def test_creates_location_intervals(self, seeded_db, animal_service, valid_location_id):
        """Move creates new location intervals and closes old ones."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)
        strip2 = _location_id(seeded_db, "Strip 2")

        move_payload = make_move_payload(strip2, animal_ids)
        animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

        # Should have 2 location intervals: one closed (strip1), one open (strip2)
        interval_count = _scalar(
            seeded_db,
            "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
            animal_ids[0],
        )
        assert interval_count == 2

    def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
        """Event-animal links are created for move event."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=3)
        strip2 = _location_id(seeded_db, "Strip 2")

        move_payload = make_move_payload(strip2, animal_ids)
        move_event = animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

        # Check event_animals has 3 rows for the move event
        link_count = _scalar(
            seeded_db,
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            move_event.id,
        )
        assert link_count == 3


class TestAnimalServiceMoveValidation:
    """Tests for move_animals() validation."""

    def test_rejects_nonexistent_to_location(self, seeded_db, animal_service, valid_location_id):
        """Raises ValidationError for non-existent to_location_id."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)
        move_payload = make_move_payload(MISSING_ID, animal_ids)

        with pytest.raises(ValidationError, match="not found"):
            animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

    def test_rejects_archived_to_location(self, seeded_db, animal_service, valid_location_id):
        """Raises ValidationError for archived to_location."""
        # Create an archived location
        archived_id = _insert_archived_location(seeded_db)
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)
        move_payload = make_move_payload(archived_id, animal_ids)

        with pytest.raises(ValidationError, match="archived"):
            animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

    def test_rejects_same_location(self, seeded_db, animal_service, valid_location_id):
        """Raises ValidationError when moving to the same location."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)

        # Try to move to the same location
        move_payload = make_move_payload(valid_location_id, animal_ids)

        with pytest.raises(ValidationError, match="same location"):
            animal_service.move_animals(move_payload, ts_utc + 1000, ACTOR)

    def test_rejects_animals_from_multiple_locations(self, seeded_db, animal_service):
        """Raises ValidationError when animals are from different locations."""
        strip1 = _location_id(seeded_db, "Strip 1")
        strip2 = _location_id(seeded_db, "Strip 2")
        strip3 = _location_id(seeded_db, "Strip 3")
        ts_utc = _now_ms()

        # Create a cohort at strip1
        ids_at_strip1, _ = _seed_cohort(animal_service, strip1, ts_utc=ts_utc, count=1)
        # Create a cohort at strip2
        ids_at_strip2, _ = _seed_cohort(animal_service, strip2, ts_utc=ts_utc + 1000, count=1)

        # Try to move animals from different locations
        move_payload = make_move_payload(strip3, [ids_at_strip1[0], ids_at_strip2[0]])

        with pytest.raises(ValidationError, match="single location"):
            animal_service.move_animals(move_payload, ts_utc + 2000, ACTOR)

    def test_rejects_nonexistent_animal(self, seeded_db, animal_service, valid_location_id):
        """Raises ValidationError for non-existent animal_id."""
        strip2 = _location_id(seeded_db, "Strip 2")
        move_payload = make_move_payload(strip2, [MISSING_ID])

        with pytest.raises(ValidationError, match="not found"):
            animal_service.move_animals(move_payload, _now_ms(), ACTOR)


# =============================================================================
# update_attributes Tests
# =============================================================================


def make_attrs_payload(
    resolved_ids: list[str],
    sex: str | None = None,
    life_stage: str | None = None,
    repro_status: str | None = None,
) -> AnimalAttributesUpdatedPayload:
    """Create an attributes update payload for testing."""
    attr_set = AttributeSet(sex=sex, life_stage=life_stage, repro_status=repro_status)
    return AnimalAttributesUpdatedPayload(
        resolved_ids=resolved_ids,
        set=attr_set,
    )


class TestAnimalServiceUpdateAttributes:
    """Tests for update_attributes()."""

    def test_creates_animal_attributes_updated_event(
        self, seeded_db, animal_service, valid_location_id
    ):
        """update_attributes creates an AnimalAttributesUpdated event."""
        # First create a cohort
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=2)

        # Update attributes
        attrs_payload = make_attrs_payload(animal_ids, sex="female")
        attrs_ts = ts_utc + 1000
        attrs_event = animal_service.update_attributes(attrs_payload, attrs_ts, ACTOR)

        assert attrs_event.type == ANIMAL_ATTRIBUTES_UPDATED
        assert attrs_event.actor == ACTOR
        assert attrs_event.ts_utc == attrs_ts

    def test_event_has_animal_ids_in_entity_refs(
        self, seeded_db, animal_service, valid_location_id
    ):
        """Event entity_refs contains animal_ids list."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=3)

        attrs_payload = make_attrs_payload(animal_ids, life_stage="juvenile")
        attrs_event = animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        assert "animal_ids" in attrs_event.entity_refs
        assert set(attrs_event.entity_refs["animal_ids"]) == set(animal_ids)

    def test_updates_sex_in_registry(self, seeded_db, animal_service, valid_location_id):
        """Animals have updated sex in animal_registry table."""
        animal_ids, ts_utc = _seed_cohort(
            animal_service, valid_location_id, count=2, sex="unknown"
        )

        attrs_payload = make_attrs_payload(animal_ids, sex="male")
        animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        # Check each animal has updated sex
        for animal_id in animal_ids:
            sex = _scalar(
                seeded_db,
                "SELECT sex FROM animal_registry WHERE animal_id = ?",
                animal_id,
            )
            assert sex == "male"

    def test_updates_life_stage_in_registry(self, seeded_db, animal_service, valid_location_id):
        """Animals have updated life_stage in animal_registry table."""
        animal_ids, ts_utc = _seed_cohort(
            animal_service, valid_location_id, count=1, life_stage="juvenile"
        )

        attrs_payload = make_attrs_payload(animal_ids, life_stage="adult")
        animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        life_stage = _scalar(
            seeded_db,
            "SELECT life_stage FROM animal_registry WHERE animal_id = ?",
            animal_ids[0],
        )
        assert life_stage == "adult"

    def test_updates_repro_status_in_registry(self, seeded_db, animal_service, valid_location_id):
        """Animals have updated repro_status in animal_registry table."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)

        attrs_payload = make_attrs_payload(animal_ids, repro_status="intact")
        animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        repro_status = _scalar(
            seeded_db,
            "SELECT repro_status FROM animal_registry WHERE animal_id = ?",
            animal_ids[0],
        )
        assert repro_status == "intact"

    def test_creates_attr_intervals_for_changed_attrs_only(
        self, seeded_db, animal_service, valid_location_id
    ):
        """Only changed attrs create new intervals."""
        interval_count_sql = "SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?"

        # Create cohort with sex=unknown, life_stage=adult
        animal_ids, ts_utc = _seed_cohort(
            animal_service, valid_location_id, count=1, sex="unknown", life_stage="adult"
        )
        animal_id = animal_ids[0]

        # Initial intervals: sex, life_stage, repro_status, status = 4
        assert _scalar(seeded_db, interval_count_sql, animal_id) == 4

        # Update only sex (life_stage stays the same)
        attrs_payload = make_attrs_payload([animal_id], sex="female")
        animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        # Should have 5 intervals: 4 initial + 1 new for sex (old one closed)
        assert _scalar(seeded_db, interval_count_sql, animal_id) == 5

        # Verify old sex interval was closed
        closed_sex_end = _scalar(
            seeded_db,
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'sex' AND value = 'unknown'""",
            animal_id,
        )
        assert closed_sex_end == ts_utc + 1000

        # Verify new sex interval is open
        open_sex_end = _scalar(
            seeded_db,
            """SELECT end_utc FROM animal_attr_intervals
               WHERE animal_id = ? AND attr = 'sex' AND value = 'female'""",
            animal_id,
        )
        assert open_sex_end is None

    def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
        """Event-animal links are created for attrs event."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=4)

        attrs_payload = make_attrs_payload(animal_ids, sex="female")
        attrs_event = animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        # Check event_animals has 4 rows for the attrs event
        link_count = _scalar(
            seeded_db,
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            attrs_event.id,
        )
        assert link_count == 4

    def test_updates_multiple_attrs_at_once(self, seeded_db, animal_service, valid_location_id):
        """Multiple attributes can be updated at once."""
        animal_ids, ts_utc = _seed_cohort(
            animal_service, valid_location_id, count=1, sex="unknown", life_stage="hatchling"
        )
        animal_id = animal_ids[0]

        # Update both sex and life_stage
        attrs_payload = make_attrs_payload([animal_id], sex="female", life_stage="juvenile")
        animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        # Check both were updated in registry
        row = seeded_db.execute(
            "SELECT sex, life_stage FROM animal_registry WHERE animal_id = ?",
            (animal_id,),
        ).fetchone()
        assert row[0] == "female"
        assert row[1] == "juvenile"

        # Should have 6 intervals: 4 initial + 2 new (sex + life_stage)
        interval_count = _scalar(
            seeded_db,
            "SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
            animal_id,
        )
        assert interval_count == 6

    def test_noop_when_value_unchanged(self, seeded_db, animal_service, valid_location_id):
        """No new intervals created when value is already the same."""
        interval_count_sql = "SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?"

        animal_ids, ts_utc = _seed_cohort(
            animal_service, valid_location_id, count=1, sex="female"
        )
        animal_id = animal_ids[0]
        initial_count = _scalar(seeded_db, interval_count_sql, animal_id)

        # Update sex to same value
        attrs_payload = make_attrs_payload([animal_id], sex="female")
        animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)

        # Should have same number of intervals
        assert _scalar(seeded_db, interval_count_sql, animal_id) == initial_count


class TestAnimalServiceUpdateAttributesValidation:
    """Tests for update_attributes() validation."""

    def test_rejects_nonexistent_animal(self, seeded_db, animal_service):
        """Raises ValidationError for non-existent animal_id."""
        attrs_payload = make_attrs_payload([MISSING_ID], sex="female")

        with pytest.raises(ValidationError, match="not found"):
            animal_service.update_attributes(attrs_payload, _now_ms(), ACTOR)

    def test_rejects_empty_attribute_set(self, seeded_db, animal_service, valid_location_id):
        """Raises ValidationError when no attributes are being updated."""
        animal_ids, ts_utc = _seed_cohort(animal_service, valid_location_id, count=1)

        # Create payload with no attrs set
        attrs_payload = make_attrs_payload(animal_ids)

        with pytest.raises(ValidationError, match="at least one attribute"):
            animal_service.update_attributes(attrs_payload, ts_utc + 1000, ACTOR)