feat: add product collection event handling

Implements Step 4.3 from the plan:
- Add selection/resolver.py with basic resolve_selection for validating
  animal IDs exist and are alive
- Add ProductsProjection placeholder (stats tables added in Step 4.4)
- Add ProductService with collect_product() function
- Add PRODUCT_COLLECTED to EventAnimalsProjection for linking events
  to affected animals
- Full test coverage for all new components

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 09:08:13 +00:00
parent fa3c99b755
commit d53decdb66
8 changed files with 691 additions and 0 deletions

View File

@@ -6,6 +6,7 @@ from animaltrack.projections.base import Projection, ProjectionRegistry
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.exceptions import ProjectionError
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.projections.products import ProductsProjection
__all__ = [
"AnimalRegistryProjection",
@@ -14,4 +15,5 @@ __all__ = [
"Projection",
"ProjectionError",
"ProjectionRegistry",
"ProductsProjection",
]

View File

@@ -9,6 +9,7 @@ from animaltrack.events.types import (
ANIMAL_MOVED,
ANIMAL_TAG_ENDED,
ANIMAL_TAGGED,
PRODUCT_COLLECTED,
)
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
@@ -38,6 +39,7 @@ class EventAnimalsProjection(Projection):
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_TAGGED,
ANIMAL_TAG_ENDED,
PRODUCT_COLLECTED,
]
def apply(self, event: Event) -> None:

View File

@@ -0,0 +1,43 @@
# ABOUTME: Projection for product collection tracking.
# ABOUTME: Placeholder for ProductCollected events until stats tables exist.
from typing import Any
from animaltrack.events.types import PRODUCT_COLLECTED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
class ProductsProjection(Projection):
"""Handles ProductCollected events.
This is a placeholder projection for Step 4.3.
Product statistics tables are created in Step 4.4.
The event_animals linkage is handled by EventAnimalsProjection.
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection.
"""
super().__init__(db)
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [PRODUCT_COLLECTED]
def apply(self, event: Event) -> None:
"""Apply ProductCollected event.
Currently a no-op. Stats projection added in Step 4.4.
"""
pass
def revert(self, event: Event) -> None:
"""Revert ProductCollected event.
Currently a no-op. Stats projection added in Step 4.4.
"""
pass

View File

@@ -0,0 +1,9 @@
# ABOUTME: Selection system for resolving animal sets from filters.
# ABOUTME: Provides resolver functions for animal selection contexts.
from animaltrack.selection.resolver import SelectionResolverError, resolve_selection
__all__ = [
"SelectionResolverError",
"resolve_selection",
]

View File

@@ -0,0 +1,50 @@
# ABOUTME: Basic animal selection resolver for Step 4.3.
# ABOUTME: Validates resolved_ids exist and are alive.
from typing import Any
class SelectionResolverError(Exception):
"""Base exception for selection resolver errors."""
def resolve_selection(
db: Any,
resolved_ids: list[str],
) -> list[str]:
"""Validate that animal IDs exist and are alive.
This is the basic resolver for Step 4.3. Full filter DSL
parsing and historical resolution are added in Phase 5.
Args:
db: Database connection.
resolved_ids: List of animal IDs to validate.
Returns:
The validated list of animal IDs (same as input if all valid).
Raises:
SelectionResolverError: If list is empty, any animal not found,
or any animal is not alive.
"""
if not resolved_ids:
raise SelectionResolverError("resolved_ids cannot be empty")
for animal_id in resolved_ids:
row = db.execute(
"""
SELECT animal_id, status FROM animal_registry
WHERE animal_id = ?
""",
(animal_id,),
).fetchone()
if row is None:
raise SelectionResolverError(f"Animal '{animal_id}' not found")
status = row[1]
if status != "alive":
raise SelectionResolverError(f"Animal '{animal_id}' is not alive (status: {status})")
return resolved_ids

View File

@@ -0,0 +1,130 @@
# ABOUTME: Service layer for product operations.
# ABOUTME: Coordinates event creation with projection updates for product collection.
from typing import Any
from animaltrack.db import transaction
from animaltrack.events.payloads import ProductCollectedPayload
from animaltrack.events.processor import process_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import PRODUCT_COLLECTED
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
from animaltrack.repositories.locations import LocationRepository
from animaltrack.repositories.products import ProductRepository
from animaltrack.selection import SelectionResolverError, resolve_selection
class ProductServiceError(Exception):
"""Base exception for product service errors."""
class ValidationError(ProductServiceError):
"""Raised when validation fails."""
class ProductService:
"""Service for product-related operations.
Provides methods to collect products from animals.
All operations are atomic and update projections synchronously.
"""
def __init__(
self,
db: Any,
event_store: EventStore,
registry: ProjectionRegistry,
) -> None:
"""Initialize the service.
Args:
db: A fastlite database connection.
event_store: The event store for appending events.
registry: Registry of projections to update.
"""
self.db = db
self.event_store = event_store
self.registry = registry
self.product_repo = ProductRepository(db)
self.location_repo = LocationRepository(db)
def collect_product(
self,
payload: ProductCollectedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Record product collection from animals.
Creates a ProductCollected event and updates projections.
Args:
payload: Validated product collection payload.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the collection.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If product doesn't exist, is inactive,
is not collectable, location doesn't exist,
or animals are invalid.
"""
# Validate product exists and is active
product = self.product_repo.get(payload.product_code)
if product is None:
msg = f"Product '{payload.product_code}' not found"
raise ValidationError(msg)
if not product.active:
msg = f"Product '{payload.product_code}' is inactive"
raise ValidationError(msg)
if not product.collectable:
msg = f"Product '{payload.product_code}' is not collectable"
raise ValidationError(msg)
# Validate location exists and is active
location = self.location_repo.get(payload.location_id)
if location is None:
msg = f"Location '{payload.location_id}' not found"
raise ValidationError(msg)
if not location.active:
msg = f"Location '{payload.location_id}' is archived"
raise ValidationError(msg)
# Validate animals exist and are alive
try:
resolve_selection(self.db, payload.resolved_ids)
except SelectionResolverError as e:
raise ValidationError(str(e)) from e
# Build entity_refs
entity_refs = {
"product_code": payload.product_code,
"location_id": payload.location_id,
"quantity": payload.quantity,
"animal_ids": payload.resolved_ids,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=PRODUCT_COLLECTED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event

View File

@@ -0,0 +1,139 @@
# ABOUTME: Tests for basic selection resolver.
# ABOUTME: Tests animal ID validation for product collection.
import time
import pytest
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.selection import SelectionResolverError, resolve_selection
from animaltrack.services.animal import AnimalService
@pytest.fixture
def event_store(seeded_db):
"""Create an EventStore for testing."""
return EventStore(seeded_db)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with animal projections registered."""
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
return registry
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
"""Create an AnimalService for testing."""
return AnimalService(seeded_db, event_store, projection_registry)
@pytest.fixture
def valid_location_id(seeded_db):
"""Get a valid location ID from seeds."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
def make_cohort_payload(
location_id: str,
count: int = 3,
species: str = "duck",
) -> AnimalCohortCreatedPayload:
"""Create a cohort payload for testing."""
return AnimalCohortCreatedPayload(
species=species,
count=count,
life_stage="adult",
sex="unknown",
location_id=location_id,
origin="purchased",
)
@pytest.fixture
def animal_ids(seeded_db, animal_service, valid_location_id):
"""Create a cohort and return the animal IDs."""
payload = make_cohort_payload(valid_location_id, count=5)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
return event.entity_refs["animal_ids"]
class TestResolveSelectionValid:
"""Tests for resolve_selection with valid inputs."""
def test_returns_validated_ids_when_all_exist(self, seeded_db, animal_ids):
"""resolve_selection returns the IDs when all are valid and alive."""
result = resolve_selection(seeded_db, animal_ids)
assert result == animal_ids
def test_handles_single_animal(self, seeded_db, animal_ids):
"""resolve_selection works with a single animal."""
single_id = [animal_ids[0]]
result = resolve_selection(seeded_db, single_id)
assert result == single_id
def test_handles_subset_of_animals(self, seeded_db, animal_ids):
"""resolve_selection works with a subset of animals."""
subset = animal_ids[:2]
result = resolve_selection(seeded_db, subset)
assert result == subset
class TestResolveSelectionErrors:
"""Tests for resolve_selection error cases."""
def test_raises_for_nonexistent_animal(self, seeded_db, animal_ids):
"""Raises SelectionResolverError for animal not found."""
fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
ids_with_fake = animal_ids[:1] + [fake_id]
with pytest.raises(SelectionResolverError, match="not found"):
resolve_selection(seeded_db, ids_with_fake)
def test_raises_for_dead_animal(self, seeded_db, animal_ids):
"""Raises SelectionResolverError for animal with status != 'alive'."""
# Mark the first animal as dead
dead_id = animal_ids[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
with pytest.raises(SelectionResolverError, match="not alive"):
resolve_selection(seeded_db, [dead_id])
def test_raises_for_mixed_valid_invalid(self, seeded_db, animal_ids):
"""Raises SelectionResolverError when mix of valid and invalid animals."""
# Mark one as dead
dead_id = animal_ids[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
# Mix: one dead, one alive
mixed_ids = [dead_id, animal_ids[1]]
with pytest.raises(SelectionResolverError):
resolve_selection(seeded_db, mixed_ids)
def test_raises_for_empty_list(self, seeded_db):
"""Raises SelectionResolverError for empty resolved_ids list."""
with pytest.raises(SelectionResolverError, match="empty"):
resolve_selection(seeded_db, [])

View File

@@ -0,0 +1,316 @@
# ABOUTME: Tests for ProductService operations.
# ABOUTME: Tests collect_product with event creation and projection updates.
import time
import pytest
from animaltrack.events.payloads import (
AnimalCohortCreatedPayload,
ProductCollectedPayload,
)
from animaltrack.events.store import EventStore
from animaltrack.events.types import PRODUCT_COLLECTED
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.projections.products import ProductsProjection
from animaltrack.services.animal import AnimalService
from animaltrack.services.products import ProductService, ValidationError
@pytest.fixture
def event_store(seeded_db):
"""Create an EventStore for testing."""
return EventStore(seeded_db)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with all needed projections."""
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
registry.register(ProductsProjection(seeded_db))
return registry
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
"""Create an AnimalService for testing."""
return AnimalService(seeded_db, event_store, projection_registry)
@pytest.fixture
def product_service(seeded_db, event_store, projection_registry):
"""Create a ProductService for testing."""
return ProductService(seeded_db, event_store, projection_registry)
@pytest.fixture
def location_id(seeded_db):
"""Get a valid location_id from seeded data."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
@pytest.fixture
def animal_ids(seeded_db, animal_service, location_id):
"""Create a cohort and return the animal IDs."""
payload = AnimalCohortCreatedPayload(
species="duck",
count=5,
life_stage="adult",
sex="unknown",
location_id=location_id,
origin="purchased",
)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
return event.entity_refs["animal_ids"]
def make_collect_payload(
location_id: str,
resolved_ids: list[str],
product_code: str = "egg.duck",
quantity: int = 12,
notes: str | None = None,
) -> ProductCollectedPayload:
"""Create a product collection payload for testing."""
return ProductCollectedPayload(
location_id=location_id,
product_code=product_code,
quantity=quantity,
resolved_ids=resolved_ids,
notes=notes,
)
# =============================================================================
# collect_product Tests
# =============================================================================
class TestProductServiceCollect:
"""Tests for collect_product()."""
def test_creates_product_collected_event(
self, seeded_db, product_service, location_id, animal_ids
):
"""collect_product creates a ProductCollected event."""
payload = make_collect_payload(location_id, animal_ids)
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
assert event.type == PRODUCT_COLLECTED
assert event.actor == "test_user"
assert event.ts_utc == ts_utc
def test_event_has_product_code_in_entity_refs(
self, seeded_db, product_service, location_id, animal_ids
):
"""Event entity_refs contains product_code."""
payload = make_collect_payload(location_id, animal_ids, product_code="egg.duck")
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
assert event.entity_refs["product_code"] == "egg.duck"
def test_event_has_location_id_in_entity_refs(
self, seeded_db, product_service, location_id, animal_ids
):
"""Event entity_refs contains location_id."""
payload = make_collect_payload(location_id, animal_ids)
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
assert event.entity_refs["location_id"] == location_id
def test_event_has_quantity_in_entity_refs(
self, seeded_db, product_service, location_id, animal_ids
):
"""Event entity_refs contains quantity."""
payload = make_collect_payload(location_id, animal_ids, quantity=24)
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
assert event.entity_refs["quantity"] == 24
def test_event_has_animal_ids_in_entity_refs(
self, seeded_db, product_service, location_id, animal_ids
):
"""Event entity_refs contains animal_ids."""
payload = make_collect_payload(location_id, animal_ids[:3])
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
assert event.entity_refs["animal_ids"] == animal_ids[:3]
def test_event_stored_in_events_table(
self, seeded_db, product_service, location_id, animal_ids
):
"""Event is stored in the events table."""
payload = make_collect_payload(location_id, animal_ids)
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
row = seeded_db.execute(
"SELECT id, type FROM events WHERE id = ?",
(event.id,),
).fetchone()
assert row is not None
assert row[0] == event.id
assert row[1] == PRODUCT_COLLECTED
class TestProductServiceCollectEventAnimals:
"""Tests for event_animals integration."""
def test_event_animals_populated(self, seeded_db, product_service, location_id, animal_ids):
"""event_animals links are created for each resolved animal."""
payload = make_collect_payload(location_id, animal_ids)
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
rows = seeded_db.execute(
"SELECT animal_id FROM event_animals WHERE event_id = ? ORDER BY animal_id",
(event.id,),
).fetchall()
assert len(rows) == len(animal_ids)
def test_event_animals_count_matches_resolved_ids(
self, seeded_db, product_service, location_id, animal_ids
):
"""event_animals count matches the number of resolved_ids."""
subset = animal_ids[:3]
payload = make_collect_payload(location_id, subset)
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
count = seeded_db.execute(
"SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
(event.id,),
).fetchone()[0]
assert count == 3
def test_event_animals_has_correct_event_id(
self, seeded_db, product_service, location_id, animal_ids
):
"""Each event_animals row has the correct event_id."""
payload = make_collect_payload(location_id, animal_ids[:1])
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
# Query by event_id to get rows created by this specific event
row = seeded_db.execute(
"SELECT event_id, animal_id FROM event_animals WHERE event_id = ?",
(event.id,),
).fetchone()
assert row[0] == event.id
assert row[1] == animal_ids[0]
def test_event_animals_has_correct_ts_utc(
self, seeded_db, product_service, location_id, animal_ids
):
"""Each event_animals row has the correct ts_utc."""
payload = make_collect_payload(location_id, animal_ids[:1])
ts_utc = int(time.time() * 1000)
event = product_service.collect_product(payload, ts_utc, "test_user")
row = seeded_db.execute(
"SELECT ts_utc FROM event_animals WHERE event_id = ?",
(event.id,),
).fetchone()
assert row[0] == ts_utc
class TestProductServiceCollectValidation:
"""Tests for collect_product() validation."""
def test_rejects_nonexistent_product(self, seeded_db, product_service, location_id, animal_ids):
"""Raises ValidationError for non-existent product_code."""
payload = make_collect_payload(location_id, animal_ids, product_code="nonexistent.product")
with pytest.raises(ValidationError, match="not found"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")
def test_rejects_inactive_product(self, seeded_db, product_service, location_id, animal_ids):
"""Raises ValidationError for inactive product."""
seeded_db.execute("UPDATE products SET active = 0 WHERE code = 'egg.duck'")
payload = make_collect_payload(location_id, animal_ids, product_code="egg.duck")
with pytest.raises(ValidationError, match="inactive"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")
def test_rejects_non_collectable_product(
self, seeded_db, product_service, location_id, animal_ids
):
"""Raises ValidationError for non-collectable product (e.g., meat)."""
# Set meat as non-collectable for this test
seeded_db.execute("UPDATE products SET collectable = 0 WHERE code = 'meat'")
payload = make_collect_payload(location_id, animal_ids, product_code="meat")
with pytest.raises(ValidationError, match="not collectable"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")
def test_rejects_nonexistent_location(self, seeded_db, product_service, animal_ids):
"""Raises ValidationError for non-existent location_id."""
fake_location_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
payload = make_collect_payload(fake_location_id, animal_ids)
with pytest.raises(ValidationError, match="not found"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")
def test_rejects_archived_location(self, seeded_db, product_service, location_id, animal_ids):
"""Raises ValidationError for archived location."""
seeded_db.execute(
"UPDATE locations SET active = 0 WHERE id = ?",
(location_id,),
)
payload = make_collect_payload(location_id, animal_ids)
with pytest.raises(ValidationError, match="archived"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")
def test_rejects_nonexistent_animal(self, seeded_db, product_service, location_id, animal_ids):
"""Raises ValidationError for non-existent animal in resolved_ids."""
fake_animal_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
payload = make_collect_payload(location_id, [fake_animal_id])
with pytest.raises(ValidationError, match="not found"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")
def test_rejects_dead_animal(self, seeded_db, product_service, location_id, animal_ids):
"""Raises ValidationError for dead animal in resolved_ids."""
# Mark the first animal as dead
dead_id = animal_ids[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
payload = make_collect_payload(location_id, [dead_id])
with pytest.raises(ValidationError, match="not alive"):
product_service.collect_product(payload, int(time.time() * 1000), "test_user")