feat: add animal movement projection and service

Implement AnimalMoved event handling:
- Update AnimalRegistryProjection for move events
- Update IntervalProjection to close/open location intervals
- Update EventAnimalsProjection to link move events to animals
- Add move_animals() to AnimalService with validations

Validations include:
- Destination location must exist and be active
- All animals must be from a single location
- Cannot move to the same location as current

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 07:02:19 +00:00
parent 144d23235e
commit b89ea41d63
7 changed files with 1007 additions and 8 deletions

View File

@@ -3,7 +3,7 @@
from typing import Any
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
@@ -26,17 +26,21 @@ class AnimalRegistryProjection(Projection):
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED]
return [ANIMAL_COHORT_CREATED, ANIMAL_MOVED]
def apply(self, event: Event) -> None:
"""Apply an event to update registry tables."""
if event.type == ANIMAL_COHORT_CREATED:
self._apply_cohort_created(event)
elif event.type == ANIMAL_MOVED:
self._apply_animal_moved(event)
def revert(self, event: Event) -> None:
"""Revert an event from registry tables."""
if event.type == ANIMAL_COHORT_CREATED:
self._revert_cohort_created(event)
elif event.type == ANIMAL_MOVED:
self._revert_animal_moved(event)
def _apply_cohort_created(self, event: Event) -> None:
"""Create animals in registry from cohort event.
@@ -121,3 +125,65 @@ class AnimalRegistryProjection(Projection):
"DELETE FROM animal_registry WHERE animal_id = ?",
(animal_id,),
)
def _apply_animal_moved(self, event: Event) -> None:
"""Update animal locations from move event.
Updates both animal_registry and live_animals_by_location
with the new location_id.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
to_location_id = event.entity_refs.get("to_location_id")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Update animal_registry
self.db.execute(
"""
UPDATE animal_registry
SET location_id = ?, last_event_utc = ?
WHERE animal_id = ?
""",
(to_location_id, ts_utc, animal_id),
)
# Update live_animals_by_location
self.db.execute(
"""
UPDATE live_animals_by_location
SET location_id = ?, last_move_utc = ?
WHERE animal_id = ?
""",
(to_location_id, ts_utc, animal_id),
)
def _revert_animal_moved(self, event: Event) -> None:
"""Revert animal move, restoring original location.
Uses from_location_id from entity_refs to restore
the previous location state.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
from_location_id = event.entity_refs.get("from_location_id")
for animal_id in animal_ids:
# Restore animal_registry location
self.db.execute(
"""
UPDATE animal_registry
SET location_id = ?
WHERE animal_id = ?
""",
(from_location_id, animal_id),
)
# Restore live_animals_by_location
# Set last_move_utc to NULL since we're reverting to before the move
self.db.execute(
"""
UPDATE live_animals_by_location
SET location_id = ?, last_move_utc = NULL
WHERE animal_id = ?
""",
(from_location_id, animal_id),
)

View File

@@ -3,7 +3,7 @@
from typing import Any
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
@@ -26,7 +26,7 @@ class EventAnimalsProjection(Projection):
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED]
return [ANIMAL_COHORT_CREATED, ANIMAL_MOVED]
def apply(self, event: Event) -> None:
"""Link event to affected animals."""

View File

@@ -3,7 +3,7 @@
from typing import Any
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
@@ -29,17 +29,21 @@ class IntervalProjection(Projection):
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED]
return [ANIMAL_COHORT_CREATED, ANIMAL_MOVED]
def apply(self, event: Event) -> None:
"""Create intervals for event."""
if event.type == ANIMAL_COHORT_CREATED:
self._apply_cohort_created(event)
elif event.type == ANIMAL_MOVED:
self._apply_animal_moved(event)
def revert(self, event: Event) -> None:
"""Remove intervals created by event."""
if event.type == ANIMAL_COHORT_CREATED:
self._revert_cohort_created(event)
elif event.type == ANIMAL_MOVED:
self._revert_animal_moved(event)
def _apply_cohort_created(self, event: Event) -> None:
"""Create initial intervals for new animals.
@@ -101,3 +105,68 @@ class IntervalProjection(Projection):
"DELETE FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
)
def _apply_animal_moved(self, event: Event) -> None:
"""Close old location interval and create new one for move.
For each animal:
- Close the current open location interval with end_utc=ts_utc
- Create a new open interval at the destination location
"""
animal_ids = event.entity_refs.get("animal_ids", [])
from_location_id = event.entity_refs.get("from_location_id")
to_location_id = event.entity_refs.get("to_location_id")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Close the old location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = ?
WHERE animal_id = ? AND location_id = ? AND end_utc IS NULL
""",
(ts_utc, animal_id, from_location_id),
)
# Create new location interval at destination
self.db.execute(
"""
INSERT INTO animal_location_intervals
(animal_id, location_id, start_utc, end_utc)
VALUES (?, ?, ?, NULL)
""",
(animal_id, to_location_id, ts_utc),
)
def _revert_animal_moved(self, event: Event) -> None:
"""Revert move by removing new interval and reopening old one.
For each animal:
- Delete the new location interval at destination
- Reopen the old interval by setting end_utc=NULL
"""
animal_ids = event.entity_refs.get("animal_ids", [])
from_location_id = event.entity_refs.get("from_location_id")
to_location_id = event.entity_refs.get("to_location_id")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Delete the new location interval
self.db.execute(
"""
DELETE FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ? AND start_utc = ?
""",
(animal_id, to_location_id, ts_utc),
)
# Reopen the old location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = NULL
WHERE animal_id = ? AND location_id = ? AND end_utc = ?
""",
(animal_id, from_location_id, ts_utc),
)

View File

@@ -4,10 +4,10 @@
from typing import Any
from animaltrack.db import transaction
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalMovedPayload
from animaltrack.events.processor import process_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.events.types import ANIMAL_COHORT_CREATED, ANIMAL_MOVED
from animaltrack.id_gen import generate_id
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
@@ -145,3 +145,104 @@ class AnimalService:
if not species.active:
msg = f"Species {species_code} is not active"
raise ValidationError(msg)
def move_animals(
self,
payload: AnimalMovedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Move animals to a new location.
Creates an AnimalMoved event and processes it through
all registered projections. All operations happen atomically
within a transaction.
Args:
payload: Validated move payload with to_location_id and resolved_ids.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the move.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate destination location exists and is active
self._validate_location(payload.to_location_id)
# Validate all animals exist and get their current location
from_location_id = self._validate_animals_for_move(
payload.resolved_ids, payload.to_location_id
)
# Build entity_refs with from/to locations and animal IDs
entity_refs = {
"from_location_id": from_location_id,
"to_location_id": payload.to_location_id,
"animal_ids": payload.resolved_ids,
}
with transaction(self.db):
# Append event to store
event = self.event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
# Process event through projections
process_event(event, self.registry)
return event
def _validate_animals_for_move(self, animal_ids: list[str], to_location_id: str) -> str:
"""Validate animals exist and are from a single location.
Args:
animal_ids: List of animal IDs to validate.
to_location_id: The destination location ID.
Returns:
The from_location_id (current location of all animals).
Raises:
ValidationError: If animals don't exist, are from multiple
locations, or are already at the destination.
"""
locations: set[str] = set()
for animal_id in animal_ids:
row = self.db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
if row is None:
msg = f"Animal {animal_id} not found"
raise ValidationError(msg)
locations.add(row[0])
# Check all animals are from the same location
if len(locations) > 1:
msg = "All animals must be from a single location"
raise ValidationError(msg)
from_location_id = locations.pop()
# Check not moving to the same location
if from_location_id == to_location_id:
msg = "Cannot move animals to the same location"
raise ValidationError(msg)
return from_location_id

View File

@@ -403,3 +403,290 @@ class TestAnimalRegistryProjectionRevert:
row = seeded_db.execute("SELECT animal_id FROM animal_registry").fetchone()
assert row[0] == animal_ids_2[0]
# =============================================================================
# AnimalMoved Tests
# =============================================================================
def make_move_event(
event_id: str,
animal_ids: list[str],
from_location_id: str,
to_location_id: str,
ts_utc: int = 1704067300000,
) -> Event:
"""Create a test AnimalMoved event."""
from animaltrack.events.types import ANIMAL_MOVED
return Event(
id=event_id,
type=ANIMAL_MOVED,
ts_utc=ts_utc,
actor="test_user",
entity_refs={
"from_location_id": from_location_id,
"to_location_id": to_location_id,
"animal_ids": animal_ids,
},
payload={
"to_location_id": to_location_id,
"resolved_ids": animal_ids,
"notes": None,
},
version=1,
)
class TestAnimalRegistryProjectionMoveEventTypes:
"""Tests for get_event_types method including move."""
def test_handles_animal_moved(self, seeded_db):
"""Projection handles AnimalMoved event type."""
from animaltrack.events.types import ANIMAL_MOVED
projection = AnimalRegistryProjection(seeded_db)
assert ANIMAL_MOVED in projection.get_event_types()
class TestAnimalRegistryProjectionApplyMove:
"""Tests for apply() on AnimalMoved."""
def test_updates_location_in_animal_registry(self, seeded_db):
"""Apply move updates location_id in animal_registry."""
# Get two location IDs
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
# First create a cohort at Strip 1
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1)
projection.apply(cohort_event)
# Verify initial location
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == strip1
# Now move to Strip 2
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Verify new location
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == strip2
def test_updates_last_event_utc_in_registry(self, seeded_db):
"""Apply move updates last_event_utc in animal_registry."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1, ts_utc=1704067200000)
projection.apply(cohort_event)
move_ts = 1704067300000
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=move_ts,
)
projection.apply(move_event)
row = seeded_db.execute(
"SELECT last_event_utc FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == move_ts
def test_updates_location_in_live_animals(self, seeded_db):
"""Apply move updates location_id in live_animals_by_location."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
row = seeded_db.execute(
"SELECT location_id FROM live_animals_by_location WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == strip2
def test_updates_last_move_utc_in_live_animals(self, seeded_db):
"""Apply move updates last_move_utc in live_animals_by_location."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1)
projection.apply(cohort_event)
# Verify last_move_utc is NULL initially
row = seeded_db.execute(
"SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] is None
move_ts = 1704067300000
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=move_ts,
)
projection.apply(move_event)
row = seeded_db.execute(
"SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == move_ts
def test_moves_multiple_animals(self, seeded_db):
"""Apply move updates all animals in resolved_ids."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = [
"01ARZ3NDEKTSV4RRFFQ69G5A01",
"01ARZ3NDEKTSV4RRFFQ69G5A02",
"01ARZ3NDEKTSV4RRFFQ69G5A03",
]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# All animals should now be at strip2
for animal_id in animal_ids:
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
assert row[0] == strip2
class TestAnimalRegistryProjectionRevertMove:
"""Tests for revert() on AnimalMoved."""
def test_revert_restores_original_location_in_registry(self, seeded_db):
"""Revert move restores location_id in animal_registry."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1, ts_utc=1704067200000)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=1704067300000,
)
projection.apply(move_event)
# Verify moved
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == strip2
# Revert the move
projection.revert(move_event)
# Location should be restored
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == strip1
def test_revert_restores_original_location_in_live_animals(self, seeded_db):
"""Revert move restores location_id in live_animals_by_location."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
projection.revert(move_event)
row = seeded_db.execute(
"SELECT location_id FROM live_animals_by_location WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] == strip1
def test_revert_clears_last_move_utc_if_first_move(self, seeded_db):
"""Revert first move clears last_move_utc to NULL."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
projection = AnimalRegistryProjection(seeded_db)
cohort_event = make_cohort_event(animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
projection.revert(move_event)
row = seeded_db.execute(
"SELECT last_move_utc FROM live_animals_by_location WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()
assert row[0] is None

View File

@@ -336,3 +336,257 @@ class TestIntervalProjectionRevert:
# Check correct animal remains
row = seeded_db.execute("SELECT animal_id FROM animal_location_intervals").fetchone()
assert row[0] == animal_ids_2[0]
# =============================================================================
# AnimalMoved Tests
# =============================================================================
def make_move_event(
event_id: str,
animal_ids: list[str],
from_location_id: str,
to_location_id: str,
ts_utc: int = 1704067300000,
) -> Event:
"""Create a test AnimalMoved event."""
from animaltrack.events.types import ANIMAL_MOVED
return Event(
id=event_id,
type=ANIMAL_MOVED,
ts_utc=ts_utc,
actor="test_user",
entity_refs={
"from_location_id": from_location_id,
"to_location_id": to_location_id,
"animal_ids": animal_ids,
},
payload={
"to_location_id": to_location_id,
"resolved_ids": animal_ids,
"notes": None,
},
version=1,
)
class TestIntervalProjectionMoveEventTypes:
"""Tests for get_event_types method including move."""
def test_handles_animal_moved(self, seeded_db):
"""Projection handles AnimalMoved event type."""
from animaltrack.events.types import ANIMAL_MOVED
projection = IntervalProjection(seeded_db)
assert ANIMAL_MOVED in projection.get_event_types()
class TestIntervalProjectionApplyMove:
"""Tests for apply() on AnimalMoved."""
def test_closes_old_location_interval(self, seeded_db):
"""Apply move closes the old location interval with end_utc."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
cohort_ts = 1704067200000
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1, ts_utc=cohort_ts)
projection.apply(cohort_event)
# Verify open interval exists
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] is None
# Move to strip2
move_ts = 1704067300000
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=move_ts,
)
projection.apply(move_event)
# Old interval should be closed with move timestamp
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] == move_ts
def test_creates_new_location_interval(self, seeded_db):
"""Apply move creates a new open location interval at destination."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
# Verify only 1 location interval exists initially
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 1
# Move to strip2
move_ts = 1704067300000
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=move_ts,
)
projection.apply(move_event)
# Now there should be 2 intervals
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 2
# New interval should be at strip2, open-ended
row = seeded_db.execute(
"""SELECT start_utc, end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip2),
).fetchone()
assert row[0] == move_ts
assert row[1] is None
def test_move_multiple_animals_creates_intervals(self, seeded_db):
"""Apply move on multiple animals creates correct intervals."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = [
"01ARZ3NDEKTSV4RRFFQ69G5A01",
"01ARZ3NDEKTSV4RRFFQ69G5A02",
]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Each animal should have 2 location intervals (1 closed, 1 open)
for animal_id in animal_ids:
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert count == 2
class TestIntervalProjectionRevertMove:
"""Tests for revert() on AnimalMoved."""
def test_revert_removes_new_location_interval(self, seeded_db):
"""Revert move removes the new location interval."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Verify 2 intervals exist
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 2
# Revert
projection.revert(move_event)
# Now back to 1 interval
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 1
# No interval at strip2
row = seeded_db.execute(
"""SELECT * FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip2),
).fetchone()
assert row is None
def test_revert_reopens_old_location_interval(self, seeded_db):
"""Revert move reopens the original location interval (end_utc=NULL)."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Verify old interval is closed
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] is not None
# Revert
projection.revert(move_event)
# Old interval should be open again
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] is None

View File

@@ -270,3 +270,225 @@ class TestAnimalServiceTransactionIntegrity:
animal_count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
assert animal_count == 0
# =============================================================================
# move_animals Tests
# =============================================================================
def make_move_payload(
to_location_id: str,
resolved_ids: list[str],
):
"""Create a move payload for testing."""
from animaltrack.events.payloads import AnimalMovedPayload
return AnimalMovedPayload(
to_location_id=to_location_id,
resolved_ids=resolved_ids,
)
class TestAnimalServiceMoveAnimals:
"""Tests for move_animals()."""
def test_creates_animal_moved_event(self, seeded_db, animal_service, valid_location_id):
"""move_animals creates an AnimalMoved event."""
from animaltrack.events.types import ANIMAL_MOVED
# First create a cohort
cohort_payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
# Get another location
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
# Move the animals
move_payload = make_move_payload(strip2, animal_ids)
move_ts = ts_utc + 1000
move_event = animal_service.move_animals(move_payload, move_ts, "test_user")
assert move_event.type == ANIMAL_MOVED
assert move_event.actor == "test_user"
assert move_event.ts_utc == move_ts
def test_event_has_animal_ids_in_entity_refs(
self, seeded_db, animal_service, valid_location_id
):
"""Event entity_refs contains animal_ids list."""
cohort_payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
move_event = animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
assert "animal_ids" in move_event.entity_refs
assert set(move_event.entity_refs["animal_ids"]) == set(animal_ids)
def test_event_has_from_and_to_location_in_entity_refs(
self, seeded_db, animal_service, valid_location_id
):
"""Event entity_refs contains both from_location_id and to_location_id."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
move_event = animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
assert move_event.entity_refs["from_location_id"] == valid_location_id
assert move_event.entity_refs["to_location_id"] == strip2
def test_updates_location_in_registry(self, seeded_db, animal_service, valid_location_id):
"""Animals are moved in animal_registry table."""
cohort_payload = make_payload(valid_location_id, count=2)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
# Check each animal is now at strip2
for animal_id in animal_ids:
row = seeded_db.execute(
"SELECT location_id FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
assert row[0] == strip2
def test_creates_location_intervals(self, seeded_db, animal_service, valid_location_id):
"""Move creates new location intervals and closes old ones."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
# Should have 2 location intervals: one closed (strip1), one open (strip2)
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 2
def test_event_animal_links_created(self, seeded_db, animal_service, valid_location_id):
"""Event-animal links are created for move event."""
cohort_payload = make_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
move_payload = make_move_payload(strip2, animal_ids)
move_event = animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
# Check event_animals has 3 rows for the move event
count = seeded_db.execute(
"SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
(move_event.id,),
).fetchone()[0]
assert count == 3
class TestAnimalServiceMoveValidation:
"""Tests for move_animals() validation."""
def test_rejects_nonexistent_to_location(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for non-existent to_location_id."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
fake_location_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
move_payload = make_move_payload(fake_location_id, animal_ids)
with pytest.raises(ValidationError, match="not found"):
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
def test_rejects_archived_to_location(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for archived to_location."""
from animaltrack.id_gen import generate_id
# Create an archived location
archived_id = generate_id()
ts = int(time.time() * 1000)
seeded_db.execute(
"""INSERT INTO locations (id, name, active, created_at_utc, updated_at_utc)
VALUES (?, 'Archived Test', 0, ?, ?)""",
(archived_id, ts, ts),
)
cohort_payload = make_payload(valid_location_id, count=1)
cohort_event = animal_service.create_cohort(cohort_payload, ts, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
move_payload = make_move_payload(archived_id, animal_ids)
with pytest.raises(ValidationError, match="archived"):
animal_service.move_animals(move_payload, ts + 1000, "test_user")
def test_rejects_same_location(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError when moving to the same location."""
cohort_payload = make_payload(valid_location_id, count=1)
ts_utc = int(time.time() * 1000)
cohort_event = animal_service.create_cohort(cohort_payload, ts_utc, "test_user")
animal_ids = cohort_event.entity_refs["animal_ids"]
# Try to move to the same location
move_payload = make_move_payload(valid_location_id, animal_ids)
with pytest.raises(ValidationError, match="same location"):
animal_service.move_animals(move_payload, ts_utc + 1000, "test_user")
def test_rejects_animals_from_multiple_locations(self, seeded_db, animal_service):
"""Raises ValidationError when animals are from different locations."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
strip3 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 3'").fetchone()[0]
ts_utc = int(time.time() * 1000)
# Create a cohort at strip1
cohort1 = animal_service.create_cohort(make_payload(strip1, count=1), ts_utc, "test_user")
animal1 = cohort1.entity_refs["animal_ids"][0]
# Create a cohort at strip2
cohort2 = animal_service.create_cohort(
make_payload(strip2, count=1), ts_utc + 1000, "test_user"
)
animal2 = cohort2.entity_refs["animal_ids"][0]
# Try to move animals from different locations
move_payload = make_move_payload(strip3, [animal1, animal2])
with pytest.raises(ValidationError, match="single location"):
animal_service.move_animals(move_payload, ts_utc + 2000, "test_user")
def test_rejects_nonexistent_animal(self, seeded_db, animal_service, valid_location_id):
"""Raises ValidationError for non-existent animal_id."""
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5XXX"
move_payload = make_move_payload(strip2, [fake_animal_id])
with pytest.raises(ValidationError, match="not found"):
animal_service.move_animals(move_payload, int(time.time() * 1000), "test_user")