# ABOUTME: Tests for selection resolver - historical resolution and validation. # ABOUTME: Tests filter-based animal resolution at point-in-time. import time import pytest from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalMovedPayload from animaltrack.events.store import EventStore from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.intervals import IntervalProjection from animaltrack.selection import SelectionResolverError, resolve_selection from animaltrack.selection.ast import FieldFilter, FilterAST from animaltrack.selection.hash import compute_roster_hash from animaltrack.selection.resolver import SelectionResult, resolve_filter from animaltrack.services.animal import AnimalService @pytest.fixture def event_store(seeded_db): """Create an EventStore for testing.""" return EventStore(seeded_db) @pytest.fixture def projection_registry(seeded_db): """Create a ProjectionRegistry with animal projections registered.""" registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) return registry @pytest.fixture def animal_service(seeded_db, event_store, projection_registry): """Create an AnimalService for testing.""" return AnimalService(seeded_db, event_store, projection_registry) @pytest.fixture def valid_location_id(seeded_db): """Get Strip 1 location ID from seeds.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() return row[0] @pytest.fixture def strip2_location_id(seeded_db): """Get Strip 2 location ID from seeds.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone() return row[0] def make_cohort_payload( location_id: str, count: int = 3, species: str = "duck", sex: str = "unknown", life_stage: str = "adult", ) -> AnimalCohortCreatedPayload: """Create a cohort payload for testing.""" return AnimalCohortCreatedPayload( species=species, count=count, life_stage=life_stage, sex=sex, location_id=location_id, origin="purchased", ) @pytest.fixture def animal_ids(seeded_db, animal_service, valid_location_id): """Create a cohort and return the animal IDs.""" payload = make_cohort_payload(valid_location_id, count=5) ts_utc = int(time.time() * 1000) event = animal_service.create_cohort(payload, ts_utc, "test_user") return event.entity_refs["animal_ids"] # ============================================================================ # Tests for validate_animal_ids (backward compatibility) # ============================================================================ class TestResolveSelectionValid: """Tests for resolve_selection (validates pre-resolved IDs).""" def test_returns_validated_ids_when_all_exist(self, seeded_db, animal_ids): """resolve_selection returns the IDs when all are valid and alive.""" result = resolve_selection(seeded_db, animal_ids) assert result == animal_ids def test_handles_single_animal(self, seeded_db, animal_ids): """resolve_selection works with a single animal.""" single_id = [animal_ids[0]] result = resolve_selection(seeded_db, single_id) assert result == single_id def test_handles_subset_of_animals(self, seeded_db, animal_ids): """resolve_selection works with a subset of animals.""" subset = animal_ids[:2] result = resolve_selection(seeded_db, subset) assert result == subset class TestResolveSelectionErrors: """Tests for resolve_selection error cases.""" def test_raises_for_nonexistent_animal(self, seeded_db, animal_ids): """Raises SelectionResolverError for animal not found.""" fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" ids_with_fake = animal_ids[:1] + [fake_id] with pytest.raises(SelectionResolverError, match="not found"): resolve_selection(seeded_db, ids_with_fake) def test_raises_for_dead_animal(self, seeded_db, animal_ids): """Raises SelectionResolverError for animal with status != 'alive'.""" # Mark the first animal as dead dead_id = animal_ids[0] seeded_db.execute( "UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?", (dead_id,), ) with pytest.raises(SelectionResolverError, match="not alive"): resolve_selection(seeded_db, [dead_id]) def test_raises_for_mixed_valid_invalid(self, seeded_db, animal_ids): """Raises SelectionResolverError when mix of valid and invalid animals.""" # Mark one as dead dead_id = animal_ids[0] seeded_db.execute( "UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?", (dead_id,), ) # Mix: one dead, one alive mixed_ids = [dead_id, animal_ids[1]] with pytest.raises(SelectionResolverError): resolve_selection(seeded_db, mixed_ids) def test_raises_for_empty_list(self, seeded_db): """Raises SelectionResolverError for empty resolved_ids list.""" with pytest.raises(SelectionResolverError, match="empty"): resolve_selection(seeded_db, []) # ============================================================================ # Tests for resolve_filter (new historical resolution) # ============================================================================ class TestResolveFilterMatchAll: """Tests for resolve_filter with empty filter (match all).""" def test_empty_filter_returns_all_alive_animals( self, seeded_db, animal_service, valid_location_id ): """Empty filter returns all alive animals at ts_utc.""" # Create a cohort of 5 ducks payload = make_cohort_payload(valid_location_id, count=5, species="duck") ts_utc = int(time.time() * 1000) event = animal_service.create_cohort(payload, ts_utc, "test_user") expected_ids = sorted(event.entity_refs["animal_ids"]) filter_ast = FilterAST([]) # empty = match all result = resolve_filter(seeded_db, filter_ast, ts_utc) assert isinstance(result, SelectionResult) assert result.animal_ids == expected_ids assert result.roster_hash == compute_roster_hash(expected_ids) def test_empty_filter_excludes_dead_animals(self, seeded_db, animal_service, valid_location_id): """Empty filter excludes animals with status != 'alive'.""" payload = make_cohort_payload(valid_location_id, count=3) ts_utc = int(time.time() * 1000) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Mark one as dead via attr interval dead_id = ids[0] seeded_db.execute( """ UPDATE animal_attr_intervals SET value = 'dead' WHERE animal_id = ? AND attr = 'status' """, (dead_id,), ) result = resolve_filter(seeded_db, FilterAST([]), ts_utc + 1) assert dead_id not in result.animal_ids assert len(result.animal_ids) == 2 class TestResolveFilterSpecies: """Tests for species filter.""" def test_filters_by_species(self, seeded_db, animal_service, valid_location_id): """species:duck returns only ducks.""" ts_utc = int(time.time() * 1000) # Create ducks duck_payload = make_cohort_payload(valid_location_id, count=3, species="duck") duck_event = animal_service.create_cohort(duck_payload, ts_utc, "test_user") duck_ids = duck_event.entity_refs["animal_ids"] # Create geese goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose") animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user") filter_ast = FilterAST([FieldFilter("species", ["duck"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert sorted(result.animal_ids) == sorted(duck_ids) def test_species_or_values(self, seeded_db, animal_service, valid_location_id): """species:duck|goose returns ducks and geese.""" ts_utc = int(time.time() * 1000) # Create ducks duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck") duck_event = animal_service.create_cohort(duck_payload, ts_utc, "test_user") duck_ids = duck_event.entity_refs["animal_ids"] # Create geese goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose") goose_event = animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user") goose_ids = goose_event.entity_refs["animal_ids"] filter_ast = FilterAST([FieldFilter("species", ["duck", "goose"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) expected = sorted(duck_ids + goose_ids) assert result.animal_ids == expected class TestResolveFilterSex: """Tests for sex filter (historical).""" def test_filters_by_sex(self, seeded_db, animal_service, valid_location_id): """sex:female returns only females.""" ts_utc = int(time.time() * 1000) # Create females female_payload = make_cohort_payload( valid_location_id, count=3, species="duck", sex="female" ) female_event = animal_service.create_cohort(female_payload, ts_utc, "test_user") female_ids = female_event.entity_refs["animal_ids"] # Create males male_payload = make_cohort_payload(valid_location_id, count=2, species="duck", sex="male") animal_service.create_cohort(male_payload, ts_utc + 1, "test_user") filter_ast = FilterAST([FieldFilter("sex", ["female"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert sorted(result.animal_ids) == sorted(female_ids) class TestResolveFilterLifeStage: """Tests for life_stage filter (historical).""" def test_filters_by_life_stage(self, seeded_db, animal_service, valid_location_id): """life_stage:adult returns only adults.""" ts_utc = int(time.time() * 1000) # Create adults adult_payload = make_cohort_payload( valid_location_id, count=3, species="duck", life_stage="adult" ) adult_event = animal_service.create_cohort(adult_payload, ts_utc, "test_user") adult_ids = adult_event.entity_refs["animal_ids"] # Create juveniles juvenile_payload = make_cohort_payload( valid_location_id, count=2, species="duck", life_stage="juvenile" ) animal_service.create_cohort(juvenile_payload, ts_utc + 1, "test_user") filter_ast = FilterAST([FieldFilter("life_stage", ["adult"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert sorted(result.animal_ids) == sorted(adult_ids) class TestResolveFilterLocation: """Tests for location filter (historical).""" def test_filters_by_location_name( self, seeded_db, animal_service, valid_location_id, strip2_location_id ): """location:'Strip 1' returns only animals at Strip 1.""" ts_utc = int(time.time() * 1000) # Create at Strip 1 strip1_payload = make_cohort_payload(valid_location_id, count=3) strip1_event = animal_service.create_cohort(strip1_payload, ts_utc, "test_user") strip1_ids = strip1_event.entity_refs["animal_ids"] # Create at Strip 2 strip2_payload = make_cohort_payload(strip2_location_id, count=2) animal_service.create_cohort(strip2_payload, ts_utc + 1, "test_user") filter_ast = FilterAST([FieldFilter("location", ["Strip 1"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert sorted(result.animal_ids) == sorted(strip1_ids) class TestResolveFilterIdentified: """Tests for identified filter.""" def test_filters_by_identified(self, seeded_db, animal_service, valid_location_id): """identified:1 returns only identified animals.""" ts_utc = int(time.time() * 1000) # Create cohort (not identified by default) payload = make_cohort_payload(valid_location_id, count=3) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Mark one as identified identified_id = ids[0] seeded_db.execute( "UPDATE animal_registry SET identified = 1 WHERE animal_id = ?", (identified_id,), ) filter_ast = FilterAST([FieldFilter("identified", ["1"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 1) assert result.animal_ids == [identified_id] def test_identified_zero_returns_unidentified( self, seeded_db, animal_service, valid_location_id ): """identified:0 returns only unidentified animals.""" ts_utc = int(time.time() * 1000) payload = make_cohort_payload(valid_location_id, count=3) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Mark one as identified identified_id = ids[0] seeded_db.execute( "UPDATE animal_registry SET identified = 1 WHERE animal_id = ?", (identified_id,), ) filter_ast = FilterAST([FieldFilter("identified", ["0"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 1) expected = sorted([i for i in ids if i != identified_id]) assert result.animal_ids == expected class TestResolveFilterStatus: """Tests for status filter.""" def test_status_alive_is_default(self, seeded_db, animal_service, valid_location_id): """Empty filter returns only alive animals (status:alive is implicit).""" ts_utc = int(time.time() * 1000) payload = make_cohort_payload(valid_location_id, count=3) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Kill one animal by updating status in attr_intervals dead_id = ids[0] seeded_db.execute( """ UPDATE animal_attr_intervals SET end_utc = ? WHERE animal_id = ? AND attr = 'status' AND value = 'alive' """, (ts_utc + 1, dead_id), ) seeded_db.execute( """ INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc) VALUES (?, 'status', 'dead', ?, NULL) """, (dead_id, ts_utc + 1), ) # Default filter excludes dead result = resolve_filter(seeded_db, FilterAST([]), ts_utc + 2) assert dead_id not in result.animal_ids assert len(result.animal_ids) == 2 def test_status_dead_returns_dead_animals(self, seeded_db, animal_service, valid_location_id): """status:dead returns only dead animals.""" ts_utc = int(time.time() * 1000) payload = make_cohort_payload(valid_location_id, count=3) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Kill one animal dead_id = ids[0] seeded_db.execute( """ UPDATE animal_attr_intervals SET end_utc = ? WHERE animal_id = ? AND attr = 'status' AND value = 'alive' """, (ts_utc + 1, dead_id), ) seeded_db.execute( """ INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc) VALUES (?, 'status', 'dead', ?, NULL) """, (dead_id, ts_utc + 1), ) # status:dead filter returns only dead filter_ast = FilterAST([FieldFilter("status", ["dead"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert result.animal_ids == [dead_id] def test_status_alive_or_dead_returns_both(self, seeded_db, animal_service, valid_location_id): """status:alive|dead returns both alive and dead animals.""" ts_utc = int(time.time() * 1000) payload = make_cohort_payload(valid_location_id, count=3) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Kill one animal dead_id = ids[0] seeded_db.execute( """ UPDATE animal_attr_intervals SET end_utc = ? WHERE animal_id = ? AND attr = 'status' AND value = 'alive' """, (ts_utc + 1, dead_id), ) seeded_db.execute( """ INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc) VALUES (?, 'status', 'dead', ?, NULL) """, (dead_id, ts_utc + 1), ) # status:alive|dead filter returns all filter_ast = FilterAST([FieldFilter("status", ["alive", "dead"])]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert sorted(result.animal_ids) == sorted(ids) def test_status_negated_excludes(self, seeded_db, animal_service, valid_location_id): """-status:dead excludes dead animals (same as default).""" ts_utc = int(time.time() * 1000) payload = make_cohort_payload(valid_location_id, count=3) event = animal_service.create_cohort(payload, ts_utc, "test_user") ids = event.entity_refs["animal_ids"] # Kill one animal dead_id = ids[0] seeded_db.execute( """ UPDATE animal_attr_intervals SET end_utc = ? WHERE animal_id = ? AND attr = 'status' AND value = 'alive' """, (ts_utc + 1, dead_id), ) seeded_db.execute( """ INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc) VALUES (?, 'status', 'dead', ?, NULL) """, (dead_id, ts_utc + 1), ) # -status:dead uses alive as base, then excludes dead (effectively same as default) filter_ast = FilterAST([FieldFilter("status", ["dead"], negated=True)]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert dead_id not in result.animal_ids def test_status_combined_with_species(self, seeded_db, animal_service, valid_location_id): """status:dead species:duck returns only dead ducks.""" ts_utc = int(time.time() * 1000) # Create ducks duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck") duck_event = animal_service.create_cohort(duck_payload, ts_utc, "test_user") duck_ids = duck_event.entity_refs["animal_ids"] # Create geese goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose") goose_event = animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user") goose_ids = goose_event.entity_refs["animal_ids"] # Kill one duck and one goose dead_duck_id = duck_ids[0] dead_goose_id = goose_ids[0] for dead_id in [dead_duck_id, dead_goose_id]: seeded_db.execute( """ UPDATE animal_attr_intervals SET end_utc = ? WHERE animal_id = ? AND attr = 'status' AND value = 'alive' """, (ts_utc + 2, dead_id), ) seeded_db.execute( """ INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc) VALUES (?, 'status', 'dead', ?, NULL) """, (dead_id, ts_utc + 2), ) # status:dead species:duck returns only dead duck filter_ast = FilterAST( [ FieldFilter("status", ["dead"]), FieldFilter("species", ["duck"]), ] ) result = resolve_filter(seeded_db, filter_ast, ts_utc + 3) assert result.animal_ids == [dead_duck_id] class TestResolveFilterNegation: """Tests for negated filters.""" def test_negated_species(self, seeded_db, animal_service, valid_location_id): """-species:duck excludes ducks.""" ts_utc = int(time.time() * 1000) # Create ducks duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck") animal_service.create_cohort(duck_payload, ts_utc, "test_user") # Create geese goose_payload = make_cohort_payload(valid_location_id, count=3, species="goose") goose_event = animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user") goose_ids = goose_event.entity_refs["animal_ids"] filter_ast = FilterAST([FieldFilter("species", ["duck"], negated=True)]) result = resolve_filter(seeded_db, filter_ast, ts_utc + 2) assert sorted(result.animal_ids) == sorted(goose_ids) class TestResolveFilterCombined: """Tests for combined filters (AND).""" def test_species_and_sex(self, seeded_db, animal_service, valid_location_id): """species:duck sex:female returns only female ducks.""" ts_utc = int(time.time() * 1000) # Female ducks female_duck_payload = make_cohort_payload( valid_location_id, count=3, species="duck", sex="female" ) female_duck_event = animal_service.create_cohort(female_duck_payload, ts_utc, "test_user") female_duck_ids = female_duck_event.entity_refs["animal_ids"] # Male ducks male_duck_payload = make_cohort_payload( valid_location_id, count=2, species="duck", sex="male" ) animal_service.create_cohort(male_duck_payload, ts_utc + 1, "test_user") # Female geese female_goose_payload = make_cohort_payload( valid_location_id, count=2, species="goose", sex="female" ) animal_service.create_cohort(female_goose_payload, ts_utc + 2, "test_user") filter_ast = FilterAST( [ FieldFilter("species", ["duck"]), FieldFilter("sex", ["female"]), ] ) result = resolve_filter(seeded_db, filter_ast, ts_utc + 3) assert sorted(result.animal_ids) == sorted(female_duck_ids) class TestResolveFilterHistorical: """Tests for historical resolution at different timestamps.""" def test_historical_location_before_move( self, seeded_db, animal_service, valid_location_id, strip2_location_id ): """Resolve at ts before move returns animals at original location.""" ts_create = int(time.time() * 1000) ts_before_move = ts_create + 1000 ts_move = ts_create + 2000 ts_after_move = ts_create + 3000 # Create animals at Strip 1 payload = make_cohort_payload(valid_location_id, count=5) event = animal_service.create_cohort(payload, ts_create, "test_user") animal_ids = event.entity_refs["animal_ids"] # Move some animals to Strip 2 moved_ids = animal_ids[:3] move_payload = AnimalMovedPayload( to_location_id=strip2_location_id, resolved_ids=moved_ids, ) animal_service.move_animals(move_payload, ts_move, "test_user") # Query at ts_before_move - all 5 should be at Strip 1 filter_strip1 = FilterAST([FieldFilter("location", ["Strip 1"])]) result_before = resolve_filter(seeded_db, filter_strip1, ts_before_move) assert len(result_before.animal_ids) == 5 # Query at ts_after_move - only 2 should be at Strip 1 result_after = resolve_filter(seeded_db, filter_strip1, ts_after_move) assert len(result_after.animal_ids) == 2 # Strip 2 should have 3 after move filter_strip2 = FilterAST([FieldFilter("location", ["Strip 2"])]) result_strip2 = resolve_filter(seeded_db, filter_strip2, ts_after_move) assert len(result_strip2.animal_ids) == 3 class TestResolveFilterRosterHash: """Tests for roster hash computation.""" def test_roster_hash_is_deterministic(self, seeded_db, animal_service, valid_location_id): """Same animal IDs produce same hash.""" ts_utc = int(time.time() * 1000) payload = make_cohort_payload(valid_location_id, count=3) animal_service.create_cohort(payload, ts_utc, "test_user") filter_ast = FilterAST([]) result1 = resolve_filter(seeded_db, filter_ast, ts_utc) result2 = resolve_filter(seeded_db, filter_ast, ts_utc) assert result1.roster_hash == result2.roster_hash def test_roster_hash_changes_with_different_animals( self, seeded_db, animal_service, valid_location_id ): """Different animal sets produce different hashes.""" ts_utc = int(time.time() * 1000) # Create ducks duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck") animal_service.create_cohort(duck_payload, ts_utc, "test_user") # Create geese goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose") animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user") duck_filter = FilterAST([FieldFilter("species", ["duck"])]) goose_filter = FilterAST([FieldFilter("species", ["goose"])]) duck_result = resolve_filter(seeded_db, duck_filter, ts_utc + 2) goose_result = resolve_filter(seeded_db, goose_filter, ts_utc + 2) assert duck_result.roster_hash != goose_result.roster_hash class TestComputeRosterHash: """Tests for compute_roster_hash function.""" def test_sorts_animal_ids(self): """Hash is computed from sorted IDs.""" ids1 = ["C", "A", "B"] ids2 = ["A", "B", "C"] assert compute_roster_hash(ids1) == compute_roster_hash(ids2) def test_includes_from_location(self): """from_location_id changes the hash.""" ids = ["A", "B"] hash_without = compute_roster_hash(ids) hash_with = compute_roster_hash(ids, from_location_id="LOC123") assert hash_without != hash_with def test_empty_list(self): """Empty list produces a hash.""" result = compute_roster_hash([]) assert isinstance(result, str) assert len(result) > 0