# ABOUTME: Tests for IntervalProjection. # ABOUTME: Validates interval table updates for location and attributes. from animaltrack.events.types import ANIMAL_COHORT_CREATED from animaltrack.models.events import Event from animaltrack.projections.intervals import IntervalProjection def make_cohort_event( event_id: str, animal_ids: list[str], location_id: str = "01ARZ3NDEKTSV4RRFFQ69G5FAV", sex: str = "unknown", life_stage: str = "adult", ts_utc: int = 1704067200000, ) -> Event: """Create a test AnimalCohortCreated event.""" return Event( id=event_id, type=ANIMAL_COHORT_CREATED, ts_utc=ts_utc, actor="test_user", entity_refs={ "location_id": location_id, "animal_ids": animal_ids, }, payload={ "species": "duck", "count": len(animal_ids), "life_stage": life_stage, "sex": sex, "location_id": location_id, "origin": "purchased", "notes": None, }, version=1, ) class TestIntervalProjectionEventTypes: """Tests for get_event_types method.""" def test_handles_animal_cohort_created(self, seeded_db): """Projection handles AnimalCohortCreated event type.""" projection = IntervalProjection(seeded_db) assert ANIMAL_COHORT_CREATED in projection.get_event_types() class TestIntervalProjectionApply: """Tests for apply() on AnimalCohortCreated.""" def test_creates_location_interval_for_each_animal(self, seeded_db): """Apply creates one location interval per animal.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = [ "01ARZ3NDEKTSV4RRFFQ69G5A01", "01ARZ3NDEKTSV4RRFFQ69G5A02", ] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) # Check animal_location_intervals has 2 rows count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0] assert count == 2 # Check each animal has a location interval for animal_id in animal_ids: row = seeded_db.execute( """SELECT location_id FROM animal_location_intervals WHERE animal_id = ?""", (animal_id,), ).fetchone() assert row is not None assert row[0] == location_id def test_location_interval_is_open_ended(self, seeded_db): """Location interval has end_utc=NULL.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ?""", (animal_ids[0],), ).fetchone() assert row[0] is None def test_location_interval_start_matches_event(self, seeded_db): """Location interval start_utc matches event ts_utc.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" ts_utc = 1704067200000 projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id, ts_utc=ts_utc) projection.apply(event) row = seeded_db.execute( """SELECT start_utc FROM animal_location_intervals WHERE animal_id = ?""", (animal_ids[0],), ).fetchone() assert row[0] == ts_utc def test_creates_sex_attr_interval(self, seeded_db): """Apply creates sex attribute interval.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id, sex="female") projection.apply(event) row = seeded_db.execute( """SELECT value FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'sex'""", (animal_ids[0],), ).fetchone() assert row is not None assert row[0] == "female" def test_creates_life_stage_attr_interval(self, seeded_db): """Apply creates life_stage attribute interval.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event( event_id, animal_ids, location_id=location_id, life_stage="juvenile" ) projection.apply(event) row = seeded_db.execute( """SELECT value FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'life_stage'""", (animal_ids[0],), ).fetchone() assert row is not None assert row[0] == "juvenile" def test_creates_repro_status_attr_interval_unknown(self, seeded_db): """Apply creates repro_status attribute interval (default unknown).""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) row = seeded_db.execute( """SELECT value FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'repro_status'""", (animal_ids[0],), ).fetchone() assert row is not None assert row[0] == "unknown" def test_creates_status_attr_interval_alive(self, seeded_db): """Apply creates status attribute interval (alive).""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) row = seeded_db.execute( """SELECT value FROM animal_attr_intervals WHERE animal_id = ? AND attr = 'status'""", (animal_ids[0],), ).fetchone() assert row is not None assert row[0] == "alive" def test_creates_four_attr_intervals_per_animal(self, seeded_db): """Each animal gets 4 attribute intervals (sex, life_stage, repro_status, status).""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) count = seeded_db.execute( """SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?""", (animal_ids[0],), ).fetchone()[0] assert count == 4 def test_attr_intervals_are_open_ended(self, seeded_db): """All attribute intervals have end_utc=NULL.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) rows = seeded_db.execute( """SELECT end_utc FROM animal_attr_intervals WHERE animal_id = ?""", (animal_ids[0],), ).fetchall() assert len(rows) == 4 for row in rows: assert row[0] is None class TestIntervalProjectionRevert: """Tests for revert() on AnimalCohortCreated.""" def test_removes_location_intervals(self, seeded_db): """Revert deletes location intervals for event animals.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = [ "01ARZ3NDEKTSV4RRFFQ69G5A01", "01ARZ3NDEKTSV4RRFFQ69G5A02", ] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) # Verify rows exist count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0] assert count == 2 # Revert projection.revert(event) # Verify rows removed count = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[0] assert count == 0 def test_removes_attr_intervals(self, seeded_db): """Revert deletes all attribute intervals for event animals.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event = make_cohort_event(event_id, animal_ids, location_id=location_id) projection.apply(event) # Verify rows exist count = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0] assert count == 4 # Revert projection.revert(event) # Verify rows removed count = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0] assert count == 0 def test_revert_only_affects_event_animals(self, seeded_db): """Revert only removes intervals for animals from specific event.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() location_id = row[0] # Create first event animal_ids_1 = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id_1 = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) event1 = make_cohort_event(event_id_1, animal_ids_1, location_id=location_id) projection.apply(event1) # Create second event animal_ids_2 = ["01ARZ3NDEKTSV4RRFFQ69G5A02"] event_id_2 = "01ARZ3NDEKTSV4RRFFQ69G5002" event2 = make_cohort_event( event_id_2, animal_ids_2, location_id=location_id, ts_utc=1704067300000 ) projection.apply(event2) # Verify both exist: 2 location intervals, 8 attr intervals count_loc = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[ 0 ] assert count_loc == 2 count_attr = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0] assert count_attr == 8 # Revert only event1 projection.revert(event1) # Event2's intervals should still exist count_loc = seeded_db.execute("SELECT COUNT(*) FROM animal_location_intervals").fetchone()[ 0 ] assert count_loc == 1 count_attr = seeded_db.execute("SELECT COUNT(*) FROM animal_attr_intervals").fetchone()[0] assert count_attr == 4 # Check correct animal remains row = seeded_db.execute("SELECT animal_id FROM animal_location_intervals").fetchone() assert row[0] == animal_ids_2[0] # ============================================================================= # AnimalMoved Tests # ============================================================================= def make_move_event( event_id: str, animal_ids: list[str], from_location_id: str, to_location_id: str, ts_utc: int = 1704067300000, ) -> Event: """Create a test AnimalMoved event.""" from animaltrack.events.types import ANIMAL_MOVED return Event( id=event_id, type=ANIMAL_MOVED, ts_utc=ts_utc, actor="test_user", entity_refs={ "from_location_id": from_location_id, "to_location_id": to_location_id, "animal_ids": animal_ids, }, payload={ "to_location_id": to_location_id, "resolved_ids": animal_ids, "notes": None, }, version=1, ) class TestIntervalProjectionMoveEventTypes: """Tests for get_event_types method including move.""" def test_handles_animal_moved(self, seeded_db): """Projection handles AnimalMoved event type.""" from animaltrack.events.types import ANIMAL_MOVED projection = IntervalProjection(seeded_db) assert ANIMAL_MOVED in projection.get_event_types() class TestIntervalProjectionApplyMove: """Tests for apply() on AnimalMoved.""" def test_closes_old_location_interval(self, seeded_db): """Apply move closes the old location interval with end_utc.""" strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0] strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" cohort_ts = 1704067200000 projection = IntervalProjection(seeded_db) cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1, ts_utc=cohort_ts) projection.apply(cohort_event) # Verify open interval exists row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_ids[0], strip1), ).fetchone() assert row[0] is None # Move to strip2 move_ts = 1704067300000 move_event = make_move_event( "01ARZ3NDEKTSV4RRFFQ69G5002", animal_ids, from_location_id=strip1, to_location_id=strip2, ts_utc=move_ts, ) projection.apply(move_event) # Old interval should be closed with move timestamp row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_ids[0], strip1), ).fetchone() assert row[0] == move_ts def test_creates_new_location_interval(self, seeded_db): """Apply move creates a new open location interval at destination.""" strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0] strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1) projection.apply(cohort_event) # Verify only 1 location interval exists initially count = seeded_db.execute( "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?", (animal_ids[0],), ).fetchone()[0] assert count == 1 # Move to strip2 move_ts = 1704067300000 move_event = make_move_event( "01ARZ3NDEKTSV4RRFFQ69G5002", animal_ids, from_location_id=strip1, to_location_id=strip2, ts_utc=move_ts, ) projection.apply(move_event) # Now there should be 2 intervals count = seeded_db.execute( "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?", (animal_ids[0],), ).fetchone()[0] assert count == 2 # New interval should be at strip2, open-ended row = seeded_db.execute( """SELECT start_utc, end_utc FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_ids[0], strip2), ).fetchone() assert row[0] == move_ts assert row[1] is None def test_move_multiple_animals_creates_intervals(self, seeded_db): """Apply move on multiple animals creates correct intervals.""" strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0] strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0] animal_ids = [ "01ARZ3NDEKTSV4RRFFQ69G5A01", "01ARZ3NDEKTSV4RRFFQ69G5A02", ] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1) projection.apply(cohort_event) move_event = make_move_event( "01ARZ3NDEKTSV4RRFFQ69G5002", animal_ids, from_location_id=strip1, to_location_id=strip2, ) projection.apply(move_event) # Each animal should have 2 location intervals (1 closed, 1 open) for animal_id in animal_ids: count = seeded_db.execute( "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?", (animal_id,), ).fetchone()[0] assert count == 2 class TestIntervalProjectionRevertMove: """Tests for revert() on AnimalMoved.""" def test_revert_removes_new_location_interval(self, seeded_db): """Revert move removes the new location interval.""" strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0] strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1) projection.apply(cohort_event) move_event = make_move_event( "01ARZ3NDEKTSV4RRFFQ69G5002", animal_ids, from_location_id=strip1, to_location_id=strip2, ) projection.apply(move_event) # Verify 2 intervals exist count = seeded_db.execute( "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?", (animal_ids[0],), ).fetchone()[0] assert count == 2 # Revert projection.revert(move_event) # Now back to 1 interval count = seeded_db.execute( "SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?", (animal_ids[0],), ).fetchone()[0] assert count == 1 # No interval at strip2 row = seeded_db.execute( """SELECT * FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_ids[0], strip2), ).fetchone() assert row is None def test_revert_reopens_old_location_interval(self, seeded_db): """Revert move reopens the original location interval (end_utc=NULL).""" strip1 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0] strip2 = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0] animal_ids = ["01ARZ3NDEKTSV4RRFFQ69G5A01"] event_id = "01ARZ3NDEKTSV4RRFFQ69G5001" projection = IntervalProjection(seeded_db) cohort_event = make_cohort_event(event_id, animal_ids, location_id=strip1) projection.apply(cohort_event) move_event = make_move_event( "01ARZ3NDEKTSV4RRFFQ69G5002", animal_ids, from_location_id=strip1, to_location_id=strip2, ) projection.apply(move_event) # Verify old interval is closed row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_ids[0], strip1), ).fetchone() assert row[0] is not None # Revert projection.revert(move_event) # Old interval should be open again row = seeded_db.execute( """SELECT end_utc FROM animal_location_intervals WHERE animal_id = ? AND location_id = ?""", (animal_ids[0], strip1), ).fetchone() assert row[0] is None