feat: add historical state resolver with point-in-time queries

Implement resolve_filter() to resolve animals matching FilterAST at ts_utc.
Uses interval tables for historical location, sex, life_stage, and tags.
Includes roster hash computation using xxhash64.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 15:31:07 +00:00
parent 6e9fd17327
commit c80d9f7fda
5 changed files with 590 additions and 18 deletions

10
PLAN.md
View File

@@ -198,11 +198,11 @@ Check off items as completed. Each phase builds on the previous.
- [x] **Commit checkpoint**
### Step 5.2: Historical State Resolver
- [ ] Update `selection/resolver.py` for point-in-time resolution
- [ ] Use interval tables for historical state (spec §7 query pattern)
- [ ] Create `selection/hash.py` with roster hash (xxhash64)
- [ ] Write tests: resolver returns correct animals before/after events
- [ ] **Commit checkpoint**
- [x] Update `selection/resolver.py` for point-in-time resolution
- [x] Use interval tables for historical state (spec §7 query pattern)
- [x] Create `selection/hash.py` with roster hash (xxhash64)
- [x] Write tests: resolver returns correct animals before/after events
- [x] **Commit checkpoint**
### Step 5.3: Optimistic Locking
- [ ] Create `selection/validation.py` for selection validation

View File

@@ -1,15 +1,24 @@
# ABOUTME: Selection system for resolving animal sets from filters.
# ABOUTME: Provides parser, AST, and resolver for animal selection contexts.
# ABOUTME: Provides parser, AST, resolver, and hash for animal selection contexts.
from animaltrack.selection.ast import FieldFilter, FilterAST
from animaltrack.selection.hash import compute_roster_hash
from animaltrack.selection.parser import ParseError, parse_filter
from animaltrack.selection.resolver import SelectionResolverError, resolve_selection
from animaltrack.selection.resolver import (
SelectionResolverError,
SelectionResult,
resolve_filter,
resolve_selection,
)
__all__ = [
"FieldFilter",
"FilterAST",
"ParseError",
"SelectionResolverError",
"SelectionResult",
"compute_roster_hash",
"parse_filter",
"resolve_filter",
"resolve_selection",
]

View File

@@ -0,0 +1,25 @@
# ABOUTME: Roster hash computation using xxhash64.
# ABOUTME: Used for optimistic locking in selection context.
import xxhash
def compute_roster_hash(
animal_ids: list[str],
from_location_id: str | None = None,
) -> str:
"""Compute xxhash64 hash of sorted animal IDs.
Args:
animal_ids: List of animal IDs to hash.
from_location_id: Optional location ID to include in hash
(used for move operations).
Returns:
Hex string of xxhash64 hash.
"""
sorted_ids = sorted(animal_ids)
hash_input = "|".join(sorted_ids)
if from_location_id:
hash_input = f"{from_location_id}|{hash_input}"
return xxhash.xxh64(hash_input.encode()).hexdigest()

View File

@@ -1,21 +1,33 @@
# ABOUTME: Basic animal selection resolver for Step 4.3.
# ABOUTME: Validates resolved_ids exist and are alive.
# ABOUTME: Selection resolver for animal filtering and historical resolution.
# ABOUTME: Resolves FilterAST at point-in-time using interval tables.
from dataclasses import dataclass
from typing import Any
from animaltrack.selection.ast import FieldFilter, FilterAST
from animaltrack.selection.hash import compute_roster_hash
class SelectionResolverError(Exception):
"""Base exception for selection resolver errors."""
@dataclass
class SelectionResult:
"""Result of resolving a filter at a point in time."""
animal_ids: list[str] # sorted
roster_hash: str
def resolve_selection(
db: Any,
resolved_ids: list[str],
) -> list[str]:
"""Validate that animal IDs exist and are alive.
This is the basic resolver for Step 4.3. Full filter DSL
parsing and historical resolution are added in Phase 5.
This function validates pre-resolved IDs (backward compatibility).
For filter-based resolution, use resolve_filter().
Args:
db: Database connection.
@@ -48,3 +60,135 @@ def resolve_selection(
raise SelectionResolverError(f"Animal '{animal_id}' is not alive (status: {status})")
return resolved_ids
def resolve_filter(
db: Any,
filter_ast: FilterAST,
ts_utc: int,
) -> SelectionResult:
"""Resolve animals matching filter at historical timestamp.
Uses interval tables to determine animal state at ts_utc.
Returns sorted animal IDs and roster hash.
Args:
db: Database connection.
filter_ast: Parsed filter AST.
ts_utc: Timestamp in ms since Unix epoch.
Returns:
SelectionResult with sorted animal_ids and roster_hash.
"""
# Build base query - all animals with location interval at ts_utc
# and status='alive' at ts_utc
base_query = """
SELECT DISTINCT ali.animal_id
FROM animal_location_intervals ali
WHERE ali.start_utc <= ?
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
AND EXISTS (
SELECT 1 FROM animal_attr_intervals aai
WHERE aai.animal_id = ali.animal_id
AND aai.attr = 'status'
AND aai.value = 'alive'
AND aai.start_utc <= ?
AND (aai.end_utc IS NULL OR aai.end_utc > ?)
)
"""
params: list[Any] = [ts_utc, ts_utc, ts_utc, ts_utc]
# Apply each filter
for field_filter in filter_ast.filters:
clause, filter_params = _build_filter_clause(field_filter, ts_utc)
if field_filter.negated:
base_query += f"\n AND ali.animal_id NOT IN ({clause})"
else:
base_query += f"\n AND ali.animal_id IN ({clause})"
params.extend(filter_params)
base_query += "\n ORDER BY ali.animal_id"
rows = db.execute(base_query, params).fetchall()
animal_ids = [row[0] for row in rows]
roster_hash = compute_roster_hash(animal_ids)
return SelectionResult(animal_ids=animal_ids, roster_hash=roster_hash)
def _build_filter_clause(field_filter: FieldFilter, ts_utc: int) -> tuple[str, list[Any]]:
"""Build SQL subquery for a single field filter.
Args:
field_filter: The field filter to build clause for.
ts_utc: Timestamp for historical queries.
Returns:
Tuple of (SQL subquery string, list of parameters).
"""
field = field_filter.field
values = list(field_filter.values)
if field == "species":
# Species from animal_registry (current state)
placeholders = ",".join("?" * len(values))
query = f"""
SELECT animal_id FROM animal_registry
WHERE species_code IN ({placeholders})
"""
return query, values
elif field == "identified":
# Identified from animal_registry (current state)
# Values are "0" or "1" strings
placeholders = ",".join("?" * len(values))
int_values = [int(v) for v in values]
query = f"""
SELECT animal_id FROM animal_registry
WHERE identified IN ({placeholders})
"""
return query, int_values
elif field == "location":
# Location by name - join with locations table, historical
placeholders = ",".join("?" * len(values))
query = f"""
SELECT ali.animal_id
FROM animal_location_intervals ali
JOIN locations l ON ali.location_id = l.id
WHERE l.name IN ({placeholders})
AND ali.start_utc <= ?
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
"""
params = values + [ts_utc, ts_utc]
return query, params
elif field in ("sex", "life_stage"):
# Historical attribute from animal_attr_intervals
placeholders = ",".join("?" * len(values))
query = f"""
SELECT animal_id FROM animal_attr_intervals
WHERE attr = ?
AND value IN ({placeholders})
AND start_utc <= ?
AND (end_utc IS NULL OR end_utc > ?)
"""
params = [field] + values + [ts_utc, ts_utc]
return query, params
elif field == "tag":
# Historical tag from animal_tag_intervals
placeholders = ",".join("?" * len(values))
query = f"""
SELECT animal_id FROM animal_tag_intervals
WHERE tag IN ({placeholders})
AND start_utc <= ?
AND (end_utc IS NULL OR end_utc > ?)
"""
params = values + [ts_utc, ts_utc]
return query, params
else:
# Unknown field - should not happen if parser validates
msg = f"Unknown filter field: {field}"
raise SelectionResolverError(msg)

View File

@@ -1,17 +1,20 @@
# ABOUTME: Tests for basic selection resolver.
# ABOUTME: Tests animal ID validation for product collection.
# ABOUTME: Tests for selection resolver - historical resolution and validation.
# ABOUTME: Tests filter-based animal resolution at point-in-time.
import time
import pytest
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalMovedPayload
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.selection import SelectionResolverError, resolve_selection
from animaltrack.selection.ast import FieldFilter, FilterAST
from animaltrack.selection.hash import compute_roster_hash
from animaltrack.selection.resolver import SelectionResult, resolve_filter
from animaltrack.services.animal import AnimalService
@@ -39,22 +42,31 @@ def animal_service(seeded_db, event_store, projection_registry):
@pytest.fixture
def valid_location_id(seeded_db):
"""Get a valid location ID from seeds."""
"""Get Strip 1 location ID from seeds."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
@pytest.fixture
def strip2_location_id(seeded_db):
"""Get Strip 2 location ID from seeds."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
return row[0]
def make_cohort_payload(
location_id: str,
count: int = 3,
species: str = "duck",
sex: str = "unknown",
life_stage: str = "adult",
) -> AnimalCohortCreatedPayload:
"""Create a cohort payload for testing."""
return AnimalCohortCreatedPayload(
species=species,
count=count,
life_stage="adult",
sex="unknown",
life_stage=life_stage,
sex=sex,
location_id=location_id,
origin="purchased",
)
@@ -69,8 +81,13 @@ def animal_ids(seeded_db, animal_service, valid_location_id):
return event.entity_refs["animal_ids"]
# ============================================================================
# Tests for validate_animal_ids (backward compatibility)
# ============================================================================
class TestResolveSelectionValid:
"""Tests for resolve_selection with valid inputs."""
"""Tests for resolve_selection (validates pre-resolved IDs)."""
def test_returns_validated_ids_when_all_exist(self, seeded_db, animal_ids):
"""resolve_selection returns the IDs when all are valid and alive."""
@@ -137,3 +154,380 @@ class TestResolveSelectionErrors:
"""Raises SelectionResolverError for empty resolved_ids list."""
with pytest.raises(SelectionResolverError, match="empty"):
resolve_selection(seeded_db, [])
# ============================================================================
# Tests for resolve_filter (new historical resolution)
# ============================================================================
class TestResolveFilterMatchAll:
"""Tests for resolve_filter with empty filter (match all)."""
def test_empty_filter_returns_all_alive_animals(
self, seeded_db, animal_service, valid_location_id
):
"""Empty filter returns all alive animals at ts_utc."""
# Create a cohort of 5 ducks
payload = make_cohort_payload(valid_location_id, count=5, species="duck")
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
expected_ids = sorted(event.entity_refs["animal_ids"])
filter_ast = FilterAST([]) # empty = match all
result = resolve_filter(seeded_db, filter_ast, ts_utc)
assert isinstance(result, SelectionResult)
assert result.animal_ids == expected_ids
assert result.roster_hash == compute_roster_hash(expected_ids)
def test_empty_filter_excludes_dead_animals(self, seeded_db, animal_service, valid_location_id):
"""Empty filter excludes animals with status != 'alive'."""
payload = make_cohort_payload(valid_location_id, count=3)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Mark one as dead via attr interval
dead_id = ids[0]
seeded_db.execute(
"""
UPDATE animal_attr_intervals
SET value = 'dead'
WHERE animal_id = ? AND attr = 'status'
""",
(dead_id,),
)
result = resolve_filter(seeded_db, FilterAST([]), ts_utc + 1)
assert dead_id not in result.animal_ids
assert len(result.animal_ids) == 2
class TestResolveFilterSpecies:
"""Tests for species filter."""
def test_filters_by_species(self, seeded_db, animal_service, valid_location_id):
"""species:duck returns only ducks."""
ts_utc = int(time.time() * 1000)
# Create ducks
duck_payload = make_cohort_payload(valid_location_id, count=3, species="duck")
duck_event = animal_service.create_cohort(duck_payload, ts_utc, "test_user")
duck_ids = duck_event.entity_refs["animal_ids"]
# Create geese
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose")
animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
filter_ast = FilterAST([FieldFilter("species", ["duck"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert sorted(result.animal_ids) == sorted(duck_ids)
def test_species_or_values(self, seeded_db, animal_service, valid_location_id):
"""species:duck|goose returns ducks and geese."""
ts_utc = int(time.time() * 1000)
# Create ducks
duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck")
duck_event = animal_service.create_cohort(duck_payload, ts_utc, "test_user")
duck_ids = duck_event.entity_refs["animal_ids"]
# Create geese
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose")
goose_event = animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
goose_ids = goose_event.entity_refs["animal_ids"]
filter_ast = FilterAST([FieldFilter("species", ["duck", "goose"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
expected = sorted(duck_ids + goose_ids)
assert result.animal_ids == expected
class TestResolveFilterSex:
"""Tests for sex filter (historical)."""
def test_filters_by_sex(self, seeded_db, animal_service, valid_location_id):
"""sex:female returns only females."""
ts_utc = int(time.time() * 1000)
# Create females
female_payload = make_cohort_payload(
valid_location_id, count=3, species="duck", sex="female"
)
female_event = animal_service.create_cohort(female_payload, ts_utc, "test_user")
female_ids = female_event.entity_refs["animal_ids"]
# Create males
male_payload = make_cohort_payload(valid_location_id, count=2, species="duck", sex="male")
animal_service.create_cohort(male_payload, ts_utc + 1, "test_user")
filter_ast = FilterAST([FieldFilter("sex", ["female"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert sorted(result.animal_ids) == sorted(female_ids)
class TestResolveFilterLifeStage:
"""Tests for life_stage filter (historical)."""
def test_filters_by_life_stage(self, seeded_db, animal_service, valid_location_id):
"""life_stage:adult returns only adults."""
ts_utc = int(time.time() * 1000)
# Create adults
adult_payload = make_cohort_payload(
valid_location_id, count=3, species="duck", life_stage="adult"
)
adult_event = animal_service.create_cohort(adult_payload, ts_utc, "test_user")
adult_ids = adult_event.entity_refs["animal_ids"]
# Create juveniles
juvenile_payload = make_cohort_payload(
valid_location_id, count=2, species="duck", life_stage="juvenile"
)
animal_service.create_cohort(juvenile_payload, ts_utc + 1, "test_user")
filter_ast = FilterAST([FieldFilter("life_stage", ["adult"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert sorted(result.animal_ids) == sorted(adult_ids)
class TestResolveFilterLocation:
"""Tests for location filter (historical)."""
def test_filters_by_location_name(
self, seeded_db, animal_service, valid_location_id, strip2_location_id
):
"""location:'Strip 1' returns only animals at Strip 1."""
ts_utc = int(time.time() * 1000)
# Create at Strip 1
strip1_payload = make_cohort_payload(valid_location_id, count=3)
strip1_event = animal_service.create_cohort(strip1_payload, ts_utc, "test_user")
strip1_ids = strip1_event.entity_refs["animal_ids"]
# Create at Strip 2
strip2_payload = make_cohort_payload(strip2_location_id, count=2)
animal_service.create_cohort(strip2_payload, ts_utc + 1, "test_user")
filter_ast = FilterAST([FieldFilter("location", ["Strip 1"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert sorted(result.animal_ids) == sorted(strip1_ids)
class TestResolveFilterIdentified:
"""Tests for identified filter."""
def test_filters_by_identified(self, seeded_db, animal_service, valid_location_id):
"""identified:1 returns only identified animals."""
ts_utc = int(time.time() * 1000)
# Create cohort (not identified by default)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Mark one as identified
identified_id = ids[0]
seeded_db.execute(
"UPDATE animal_registry SET identified = 1 WHERE animal_id = ?",
(identified_id,),
)
filter_ast = FilterAST([FieldFilter("identified", ["1"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 1)
assert result.animal_ids == [identified_id]
def test_identified_zero_returns_unidentified(
self, seeded_db, animal_service, valid_location_id
):
"""identified:0 returns only unidentified animals."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Mark one as identified
identified_id = ids[0]
seeded_db.execute(
"UPDATE animal_registry SET identified = 1 WHERE animal_id = ?",
(identified_id,),
)
filter_ast = FilterAST([FieldFilter("identified", ["0"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 1)
expected = sorted([i for i in ids if i != identified_id])
assert result.animal_ids == expected
class TestResolveFilterNegation:
"""Tests for negated filters."""
def test_negated_species(self, seeded_db, animal_service, valid_location_id):
"""-species:duck excludes ducks."""
ts_utc = int(time.time() * 1000)
# Create ducks
duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck")
animal_service.create_cohort(duck_payload, ts_utc, "test_user")
# Create geese
goose_payload = make_cohort_payload(valid_location_id, count=3, species="goose")
goose_event = animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
goose_ids = goose_event.entity_refs["animal_ids"]
filter_ast = FilterAST([FieldFilter("species", ["duck"], negated=True)])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert sorted(result.animal_ids) == sorted(goose_ids)
class TestResolveFilterCombined:
"""Tests for combined filters (AND)."""
def test_species_and_sex(self, seeded_db, animal_service, valid_location_id):
"""species:duck sex:female returns only female ducks."""
ts_utc = int(time.time() * 1000)
# Female ducks
female_duck_payload = make_cohort_payload(
valid_location_id, count=3, species="duck", sex="female"
)
female_duck_event = animal_service.create_cohort(female_duck_payload, ts_utc, "test_user")
female_duck_ids = female_duck_event.entity_refs["animal_ids"]
# Male ducks
male_duck_payload = make_cohort_payload(
valid_location_id, count=2, species="duck", sex="male"
)
animal_service.create_cohort(male_duck_payload, ts_utc + 1, "test_user")
# Female geese
female_goose_payload = make_cohort_payload(
valid_location_id, count=2, species="goose", sex="female"
)
animal_service.create_cohort(female_goose_payload, ts_utc + 2, "test_user")
filter_ast = FilterAST(
[
FieldFilter("species", ["duck"]),
FieldFilter("sex", ["female"]),
]
)
result = resolve_filter(seeded_db, filter_ast, ts_utc + 3)
assert sorted(result.animal_ids) == sorted(female_duck_ids)
class TestResolveFilterHistorical:
"""Tests for historical resolution at different timestamps."""
def test_historical_location_before_move(
self, seeded_db, animal_service, valid_location_id, strip2_location_id
):
"""Resolve at ts before move returns animals at original location."""
ts_create = int(time.time() * 1000)
ts_before_move = ts_create + 1000
ts_move = ts_create + 2000
ts_after_move = ts_create + 3000
# Create animals at Strip 1
payload = make_cohort_payload(valid_location_id, count=5)
event = animal_service.create_cohort(payload, ts_create, "test_user")
animal_ids = event.entity_refs["animal_ids"]
# Move some animals to Strip 2
moved_ids = animal_ids[:3]
move_payload = AnimalMovedPayload(
to_location_id=strip2_location_id,
resolved_ids=moved_ids,
)
animal_service.move_animals(move_payload, ts_move, "test_user")
# Query at ts_before_move - all 5 should be at Strip 1
filter_strip1 = FilterAST([FieldFilter("location", ["Strip 1"])])
result_before = resolve_filter(seeded_db, filter_strip1, ts_before_move)
assert len(result_before.animal_ids) == 5
# Query at ts_after_move - only 2 should be at Strip 1
result_after = resolve_filter(seeded_db, filter_strip1, ts_after_move)
assert len(result_after.animal_ids) == 2
# Strip 2 should have 3 after move
filter_strip2 = FilterAST([FieldFilter("location", ["Strip 2"])])
result_strip2 = resolve_filter(seeded_db, filter_strip2, ts_after_move)
assert len(result_strip2.animal_ids) == 3
class TestResolveFilterRosterHash:
"""Tests for roster hash computation."""
def test_roster_hash_is_deterministic(self, seeded_db, animal_service, valid_location_id):
"""Same animal IDs produce same hash."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
animal_service.create_cohort(payload, ts_utc, "test_user")
filter_ast = FilterAST([])
result1 = resolve_filter(seeded_db, filter_ast, ts_utc)
result2 = resolve_filter(seeded_db, filter_ast, ts_utc)
assert result1.roster_hash == result2.roster_hash
def test_roster_hash_changes_with_different_animals(
self, seeded_db, animal_service, valid_location_id
):
"""Different animal sets produce different hashes."""
ts_utc = int(time.time() * 1000)
# Create ducks
duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck")
animal_service.create_cohort(duck_payload, ts_utc, "test_user")
# Create geese
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose")
animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
duck_filter = FilterAST([FieldFilter("species", ["duck"])])
goose_filter = FilterAST([FieldFilter("species", ["goose"])])
duck_result = resolve_filter(seeded_db, duck_filter, ts_utc + 2)
goose_result = resolve_filter(seeded_db, goose_filter, ts_utc + 2)
assert duck_result.roster_hash != goose_result.roster_hash
class TestComputeRosterHash:
"""Tests for compute_roster_hash function."""
def test_sorts_animal_ids(self):
"""Hash is computed from sorted IDs."""
ids1 = ["C", "A", "B"]
ids2 = ["A", "B", "C"]
assert compute_roster_hash(ids1) == compute_roster_hash(ids2)
def test_includes_from_location(self):
"""from_location_id changes the hash."""
ids = ["A", "B"]
hash_without = compute_roster_hash(ids)
hash_with = compute_roster_hash(ids, from_location_id="LOC123")
assert hash_without != hash_with
def test_empty_list(self):
"""Empty list produces a hash."""
result = compute_roster_hash([])
assert isinstance(result, str)
assert len(result) > 0