Files
animaltrack/tests/test_projection_intervals.py
Petru Paler b89ea41d63 feat: add animal movement projection and service
Implement AnimalMoved event handling:
- Update AnimalRegistryProjection for move events
- Update IntervalProjection to close/open location intervals
- Update EventAnimalsProjection to link move events to animals
- Add move_animals() to AnimalService with validations

Validations include:
- Destination location must exist and be active
- All animals must be from a single location
- Cannot move to the same location as current

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2025-12-29 07:02:19 +00:00

593 lines
21 KiB
Python

# ABOUTME: Tests for IntervalProjection.
# ABOUTME: Validates interval table updates for location and attributes.
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.models.events import Event
from animaltrack.projections.intervals import IntervalProjection
def make_cohort_event(
event_id: str,
animal_ids: list[str],
location_id: str = "01ARZ3NDEKTSV4RRFFQ69G5FAV",
sex: str = "unknown",
life_stage: str = "adult",
ts_utc: int = 1704067200000,
) -> Event:
"""Create a test AnimalCohortCreated event."""
return Event(
id=event_id,
type=ANIMAL_COHORT_CREATED,
ts_utc=ts_utc,
actor="test_user",
entity_refs={
"location_id": location_id,
"animal_ids": animal_ids,
},
payload={
"species": "duck",
"count": len(animal_ids),
"life_stage": life_stage,
"sex": sex,
"location_id": location_id,
"origin": "purchased",
"notes": None,
},
version=1,
)
class TestIntervalProjectionEventTypes:
"""Tests for get_event_types method."""
def test_handles_animal_cohort_created(self, seeded_db):
"""Projection handles AnimalCohortCreated event type."""
projection = IntervalProjection(seeded_db)
assert ANIMAL_COHORT_CREATED in projection.get_event_types()
class TestIntervalProjectionApply:
"""Tests for apply() on AnimalCohortCreated."""
def test_creates_location_interval_for_each_animal(self, seeded_db):
"""Apply creates one location interval per animal."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = [
"01ARZ3NDEKTSV4RRFFQ69G5A01",
"01ARZ3NDEKTSV4RRFFQ69G5A02",
]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
# Check animal_location_intervals has 2 rows
count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0]
assert count == 2
# Check each animal has a location interval
for animal_id in animal_ids:
row = seeded_db.execute(
"""SELECT location_id FROM animal_location_intervals
WHERE animal_id = ?""",
(animal_id,),
).fetchone()
assert row is not None
assert row[0] == location_id
def test_location_interval_is_open_ended(self, seeded_db):
"""Location interval has end_utc=NULL."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ?""",
(animal_ids[0],),
).fetchone()
assert row[0] is None
def test_location_interval_start_matches_event(self, seeded_db):
"""Location interval start_utc matches event ts_utc."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
ts_utc = 1704067200000
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id, ts_utc=ts_utc)
projection.apply(event)
row = seeded_db.execute(
"""SELECT start_utc FROM animal_location_intervals
WHERE animal_id = ?""",
(animal_ids[0],),
).fetchone()
assert row[0] == ts_utc
def test_creates_sex_attr_interval(self, seeded_db):
"""Apply creates sex attribute interval."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id, sex="female")
projection.apply(event)
row = seeded_db.execute(
"""SELECT value FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'sex'""",
(animal_ids[0],),
).fetchone()
assert row is not None
assert row[0] == "female"
def test_creates_life_stage_attr_interval(self, seeded_db):
"""Apply creates life_stage attribute interval."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(
event_id, animal_ids, location_id=location_id, life_stage="juvenile"
)
projection.apply(event)
row = seeded_db.execute(
"""SELECT value FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'life_stage'""",
(animal_ids[0],),
).fetchone()
assert row is not None
assert row[0] == "juvenile"
def test_creates_repro_status_attr_interval_unknown(self, seeded_db):
"""Apply creates repro_status attribute interval (default unknown)."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
row = seeded_db.execute(
"""SELECT value FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'repro_status'""",
(animal_ids[0],),
).fetchone()
assert row is not None
assert row[0] == "unknown"
def test_creates_status_attr_interval_alive(self, seeded_db):
"""Apply creates status attribute interval (alive)."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
row = seeded_db.execute(
"""SELECT value FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status'""",
(animal_ids[0],),
).fetchone()
assert row is not None
assert row[0] == "alive"
def test_creates_four_attr_intervals_per_animal(self, seeded_db):
"""Each animal gets 4 attribute intervals (sex, life_stage, repro_status, status)."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
count = seeded_db.execute(
"""SELECT COUNT(*) FROM animal_attr_intervals
WHERE animal_id = ?""",
(animal_ids[0],),
).fetchone()[0]
assert count == 4
def test_attr_intervals_are_open_ended(self, seeded_db):
"""All attribute intervals have end_utc=NULL."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
rows = seeded_db.execute(
"""SELECT end_utc FROM animal_attr_intervals
WHERE animal_id = ?""",
(animal_ids[0],),
).fetchall()
assert len(rows) == 4
for row in rows:
assert row[0] is None
class TestIntervalProjectionRevert:
"""Tests for revert() on AnimalCohortCreated."""
def test_removes_location_intervals(self, seeded_db):
"""Revert deletes location intervals for event animals."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = [
"01ARZ3NDEKTSV4RRFFQ69G5A01",
"01ARZ3NDEKTSV4RRFFQ69G5A02",
]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
# Verify rows exist
count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0]
assert count == 2
# Revert
projection.revert(event)
# Verify rows removed
count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0]
assert count == 0
def test_removes_attr_intervals(self, seeded_db):
"""Revert deletes all attribute intervals for event animals."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event = make_cohort_event(event_id, animal_ids, location_id=location_id)
projection.apply(event)
# Verify rows exist
count = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0]
assert count == 4
# Revert
projection.revert(event)
# Verify rows removed
count = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0]
assert count == 0
def test_revert_only_affects_event_animals(self, seeded_db):
"""Revert only removes intervals for animals from specific event."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
# Create first event
animal_ids_1 = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id_1 = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
event1 = make_cohort_event(event_id_1, animal_ids_1, location_id=location_id)
projection.apply(event1)
# Create second event
animal_ids_2 = ["01ARZ3NDEKTSV4RRFFQ69G5A02"]
event_id_2 = "01ARZ3NDEKTSV4RRFFQ69G5002"
event2 = make_cohort_event(
event_id_2, animal_ids_2, location_id=location_id, ts_utc=1704067300000
)
projection.apply(event2)
# Verify both exist: 2 location intervals, 8 attr intervals
count_loc = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[
0
]
assert count_loc == 2
count_attr = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0]
assert count_attr == 8
# Revert only event1
projection.revert(event1)
# Event2's intervals should still exist
count_loc = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[
0
]
assert count_loc == 1
count_attr = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0]
assert count_attr == 4
# Check correct animal remains
row = seeded_db.execute("SELECT animal_id FROM animal_location_intervals").fetchone()
assert row[0] == animal_ids_2[0]
# =============================================================================
# AnimalMoved Tests
# =============================================================================
def make_move_event(
event_id: str,
animal_ids: list[str],
from_location_id: str,
to_location_id: str,
ts_utc: int = 1704067300000,
) -> Event:
"""Create a test AnimalMoved event."""
from animaltrack.events.types import ANIMAL_MOVED
return Event(
id=event_id,
type=ANIMAL_MOVED,
ts_utc=ts_utc,
actor="test_user",
entity_refs={
"from_location_id": from_location_id,
"to_location_id": to_location_id,
"animal_ids": animal_ids,
},
payload={
"to_location_id": to_location_id,
"resolved_ids": animal_ids,
"notes": None,
},
version=1,
)
class TestIntervalProjectionMoveEventTypes:
"""Tests for get_event_types method including move."""
def test_handles_animal_moved(self, seeded_db):
"""Projection handles AnimalMoved event type."""
from animaltrack.events.types import ANIMAL_MOVED
projection = IntervalProjection(seeded_db)
assert ANIMAL_MOVED in projection.get_event_types()
class TestIntervalProjectionApplyMove:
"""Tests for apply() on AnimalMoved."""
def test_closes_old_location_interval(self, seeded_db):
"""Apply move closes the old location interval with end_utc."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
cohort_ts = 1704067200000
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1, ts_utc=cohort_ts)
projection.apply(cohort_event)
# Verify open interval exists
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] is None
# Move to strip2
move_ts = 1704067300000
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=move_ts,
)
projection.apply(move_event)
# Old interval should be closed with move timestamp
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] == move_ts
def test_creates_new_location_interval(self, seeded_db):
"""Apply move creates a new open location interval at destination."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
# Verify only 1 location interval exists initially
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 1
# Move to strip2
move_ts = 1704067300000
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
ts_utc=move_ts,
)
projection.apply(move_event)
# Now there should be 2 intervals
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 2
# New interval should be at strip2, open-ended
row = seeded_db.execute(
"""SELECT start_utc, end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip2),
).fetchone()
assert row[0] == move_ts
assert row[1] is None
def test_move_multiple_animals_creates_intervals(self, seeded_db):
"""Apply move on multiple animals creates correct intervals."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = [
"01ARZ3NDEKTSV4RRFFQ69G5A01",
"01ARZ3NDEKTSV4RRFFQ69G5A02",
]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Each animal should have 2 location intervals (1 closed, 1 open)
for animal_id in animal_ids:
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert count == 2
class TestIntervalProjectionRevertMove:
"""Tests for revert() on AnimalMoved."""
def test_revert_removes_new_location_interval(self, seeded_db):
"""Revert move removes the new location interval."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Verify 2 intervals exist
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 2
# Revert
projection.revert(move_event)
# Now back to 1 interval
count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_ids[0],),
).fetchone()[0]
assert count == 1
# No interval at strip2
row = seeded_db.execute(
"""SELECT * FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip2),
).fetchone()
assert row is None
def test_revert_reopens_old_location_interval(self, seeded_db):
"""Revert move reopens the original location interval (end_utc=NULL)."""
strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"]
event_id = "01ARZ3NDEKTSV4RRFFQ69G5001"
projection = IntervalProjection(seeded_db)
cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1)
projection.apply(cohort_event)
move_event = make_move_event(
"01ARZ3NDEKTSV4RRFFQ69G5002",
animal_ids,
from_location_id=strip1,
to_location_id=strip2,
)
projection.apply(move_event)
# Verify old interval is closed
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] is not None
# Revert
projection.revert(move_event)
# Old interval should be open again
row = seeded_db.execute(
"""SELECT end_utc FROM animal_location_intervals
WHERE animal_id = ? AND location_id = ?""",
(animal_ids[0], strip1),
).fetchone()
assert row[0] is None