Implements Step 4.3 from the plan:

- Add selection/resolver.py with a basic resolve_selection that validates that animal IDs exist and are alive
- Add ProductsProjection placeholder (stats tables added in Step 4.4)
- Add ProductService with collect_product() function
- Add PRODUCT_COLLECTED to EventAnimalsProjection for linking events to affected animals
- Full test coverage for all new components

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
317 lines
12 KiB
Python
317 lines
12 KiB
Python
# ABOUTME: Tests for ProductService operations.
|
|
# ABOUTME: Tests collect_product with event creation and projection updates.
|
|
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from animaltrack.events.payloads import (
|
|
AnimalCohortCreatedPayload,
|
|
ProductCollectedPayload,
|
|
)
|
|
from animaltrack.events.store import EventStore
|
|
from animaltrack.events.types import PRODUCT_COLLECTED
|
|
from animaltrack.projections import ProjectionRegistry
|
|
from animaltrack.projections.animal_registry import AnimalRegistryProjection
|
|
from animaltrack.projections.event_animals import EventAnimalsProjection
|
|
from animaltrack.projections.intervals import IntervalProjection
|
|
from animaltrack.projections.products import ProductsProjection
|
|
from animaltrack.services.animal import AnimalService
|
|
from animaltrack.services.products import ProductService, ValidationError
|
|
|
|
|
|
@pytest.fixture
def event_store(seeded_db):
    """Build an EventStore from the ``seeded_db`` fixture."""
    store = EventStore(seeded_db)
    return store
|
|
|
|
|
|
@pytest.fixture
def projection_registry(seeded_db):
    """Build a ProjectionRegistry holding every projection these tests rely on.

    Each projection is constructed with ``seeded_db`` and registered in the
    order listed below.
    """
    projection_classes = (
        AnimalRegistryProjection,
        EventAnimalsProjection,
        IntervalProjection,
        ProductsProjection,
    )
    registry = ProjectionRegistry()
    for projection_cls in projection_classes:
        registry.register(projection_cls(seeded_db))
    return registry
|
|
|
|
|
|
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
    """Build an AnimalService from the db, event store and projection registry fixtures."""
    service = AnimalService(seeded_db, event_store, projection_registry)
    return service
|
|
|
|
|
|
@pytest.fixture
def product_service(seeded_db, event_store, projection_registry):
    """Build a ProductService from the db, event store and projection registry fixtures."""
    service = ProductService(seeded_db, event_store, projection_registry)
    return service
|
|
|
|
|
|
@pytest.fixture
def location_id(seeded_db):
    """Return the id of the location named 'Strip 1' from ``seeded_db``.

    Fails with an explicit message when no such row exists, instead of the
    opaque ``TypeError`` that subscripting ``None`` would produce.
    """
    row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
    # fetchone() yields None when the query matches nothing.
    assert row is not None, "seed data is missing the 'Strip 1' location"
    return row[0]
|
|
|
|
|
|
@pytest.fixture
def animal_ids(seeded_db, animal_service, location_id):
    """Create a five-duck cohort at ``location_id`` and return its animal IDs.

    The IDs are read from the ``animal_ids`` entry of the creation event's
    ``entity_refs``.
    """
    cohort_payload = AnimalCohortCreatedPayload(
        species="duck",
        count=5,
        life_stage="adult",
        sex="unknown",
        location_id=location_id,
        origin="purchased",
    )
    # Timestamp is the current wall-clock time in milliseconds.
    now_ms = int(time.time() * 1000)
    cohort_event = animal_service.create_cohort(cohort_payload, now_ms, "test_user")
    return cohort_event.entity_refs["animal_ids"]
|
|
|
|
|
|
def make_collect_payload(
    location_id: str,
    resolved_ids: list[str],
    product_code: str = "egg.duck",
    quantity: int = 12,
    notes: str | None = None,
) -> ProductCollectedPayload:
    """Build a ProductCollectedPayload for tests.

    Only the location and the resolved animal IDs are required; the product
    code, quantity and notes fall back to the defaults in the signature.
    """
    collect_payload = ProductCollectedPayload(
        location_id=location_id,
        product_code=product_code,
        quantity=quantity,
        resolved_ids=resolved_ids,
        notes=notes,
    )
    return collect_payload
|
|
|
|
|
|
# =============================================================================
|
|
# collect_product Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestProductServiceCollect:
    """Happy-path checks on the event returned and stored by collect_product()."""

    def test_creates_product_collected_event(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """The returned event carries the expected type, actor and timestamp."""
        now_ms = int(time.time() * 1000)
        collect_payload = make_collect_payload(location_id, animal_ids)

        result = product_service.collect_product(collect_payload, now_ms, "test_user")

        assert result.type == PRODUCT_COLLECTED
        assert result.actor == "test_user"
        assert result.ts_utc == now_ms

    def test_event_has_product_code_in_entity_refs(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """entity_refs exposes the payload's product_code."""
        now_ms = int(time.time() * 1000)
        collect_payload = make_collect_payload(location_id, animal_ids, product_code="egg.duck")

        result = product_service.collect_product(collect_payload, now_ms, "test_user")

        assert result.entity_refs["product_code"] == "egg.duck"

    def test_event_has_location_id_in_entity_refs(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """entity_refs exposes the payload's location_id."""
        now_ms = int(time.time() * 1000)
        collect_payload = make_collect_payload(location_id, animal_ids)

        result = product_service.collect_product(collect_payload, now_ms, "test_user")

        assert result.entity_refs["location_id"] == location_id

    def test_event_has_quantity_in_entity_refs(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """entity_refs exposes the payload's quantity."""
        now_ms = int(time.time() * 1000)
        collect_payload = make_collect_payload(location_id, animal_ids, quantity=24)

        result = product_service.collect_product(collect_payload, now_ms, "test_user")

        assert result.entity_refs["quantity"] == 24

    def test_event_has_animal_ids_in_entity_refs(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """entity_refs exposes exactly the resolved animal IDs that were passed in."""
        now_ms = int(time.time() * 1000)
        collect_payload = make_collect_payload(location_id, animal_ids[:3])

        result = product_service.collect_product(collect_payload, now_ms, "test_user")

        assert result.entity_refs["animal_ids"] == animal_ids[:3]

    def test_event_stored_in_events_table(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """A row for the returned event exists in the events table."""
        now_ms = int(time.time() * 1000)
        collect_payload = make_collect_payload(location_id, animal_ids)

        result = product_service.collect_product(collect_payload, now_ms, "test_user")

        stored = seeded_db.execute(
            "SELECT id, type FROM events WHERE id = ?",
            (result.id,),
        ).fetchone()

        assert stored is not None
        stored_id, stored_type = stored
        assert stored_id == result.id
        assert stored_type == PRODUCT_COLLECTED
|
|
|
|
|
|
class TestProductServiceCollectEventAnimals:
    """Tests that collect_product() writes the expected event_animals rows."""

    def test_event_animals_populated(self, seeded_db, product_service, location_id, animal_ids):
        """event_animals links are created for each resolved animal."""
        payload = make_collect_payload(location_id, animal_ids)
        ts_utc = int(time.time() * 1000)

        event = product_service.collect_product(payload, ts_utc, "test_user")

        rows = seeded_db.execute(
            "SELECT animal_id FROM event_animals WHERE event_id = ? ORDER BY animal_id",
            (event.id,),
        ).fetchall()

        # Compare the linked IDs themselves, not just how many there are, so
        # a row pointing at the wrong animal (or a duplicate) is caught too.
        # Both sides are sorted in Python to avoid relying on DB collation.
        linked_ids = sorted(row[0] for row in rows)
        assert linked_ids == sorted(animal_ids)

    def test_event_animals_count_matches_resolved_ids(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """event_animals count matches the number of resolved_ids."""
        subset = animal_ids[:3]
        payload = make_collect_payload(location_id, subset)
        ts_utc = int(time.time() * 1000)

        event = product_service.collect_product(payload, ts_utc, "test_user")

        count = seeded_db.execute(
            "SELECT COUNT(*) FROM event_animals WHERE event_id = ?",
            (event.id,),
        ).fetchone()[0]

        # Tie the expectation to the subset actually submitted.
        assert count == len(subset)

    def test_event_animals_has_correct_event_id(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """The event_animals row for a single-animal collection has the right IDs."""
        payload = make_collect_payload(location_id, animal_ids[:1])
        ts_utc = int(time.time() * 1000)

        event = product_service.collect_product(payload, ts_utc, "test_user")

        # Query by event_id to get rows created by this specific event
        row = seeded_db.execute(
            "SELECT event_id, animal_id FROM event_animals WHERE event_id = ?",
            (event.id,),
        ).fetchone()

        # Fail clearly if no link row was written at all.
        assert row is not None
        assert row[0] == event.id
        assert row[1] == animal_ids[0]

    def test_event_animals_has_correct_ts_utc(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """The event_animals row for a single-animal collection has the event's ts_utc."""
        payload = make_collect_payload(location_id, animal_ids[:1])
        ts_utc = int(time.time() * 1000)

        event = product_service.collect_product(payload, ts_utc, "test_user")

        row = seeded_db.execute(
            "SELECT ts_utc FROM event_animals WHERE event_id = ?",
            (event.id,),
        ).fetchone()

        # Fail clearly if no link row was written at all.
        assert row is not None
        assert row[0] == ts_utc
|
|
|
|
|
|
class TestProductServiceCollectValidation:
    """Rejection paths of collect_product(): every case expects a ValidationError."""

    def test_rejects_nonexistent_product(self, seeded_db, product_service, location_id, animal_ids):
        """An unknown product_code is expected to fail with a 'not found' message."""
        bad_payload = make_collect_payload(
            location_id, animal_ids, product_code="nonexistent.product"
        )
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            product_service.collect_product(bad_payload, now_ms, "test_user")

    def test_rejects_inactive_product(self, seeded_db, product_service, location_id, animal_ids):
        """A product flagged inactive is expected to fail with an 'inactive' message."""
        # Deactivate the product before building the payload.
        seeded_db.execute("UPDATE products SET active = 0 WHERE code = 'egg.duck'")

        bad_payload = make_collect_payload(location_id, animal_ids, product_code="egg.duck")
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="inactive"):
            product_service.collect_product(bad_payload, now_ms, "test_user")

    def test_rejects_non_collectable_product(
        self, seeded_db, product_service, location_id, animal_ids
    ):
        """A product flagged non-collectable is expected to fail with 'not collectable'."""
        # Force the 'meat' product to be non-collectable for this test only.
        seeded_db.execute("UPDATE products SET collectable = 0 WHERE code = 'meat'")

        bad_payload = make_collect_payload(location_id, animal_ids, product_code="meat")
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not collectable"):
            product_service.collect_product(bad_payload, now_ms, "test_user")

    def test_rejects_nonexistent_location(self, seeded_db, product_service, animal_ids):
        """A location_id with no matching row is expected to fail with 'not found'."""
        missing_location = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
        bad_payload = make_collect_payload(missing_location, animal_ids)
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            product_service.collect_product(bad_payload, now_ms, "test_user")

    def test_rejects_archived_location(self, seeded_db, product_service, location_id, animal_ids):
        """A location with active = 0 is expected to fail with an 'archived' message."""
        seeded_db.execute(
            "UPDATE locations SET active = 0 WHERE id = ?",
            (location_id,),
        )

        bad_payload = make_collect_payload(location_id, animal_ids)
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="archived"):
            product_service.collect_product(bad_payload, now_ms, "test_user")

    def test_rejects_nonexistent_animal(self, seeded_db, product_service, location_id, animal_ids):
        """An unknown animal ID in resolved_ids is expected to fail with 'not found'."""
        missing_animal = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
        bad_payload = make_collect_payload(location_id, [missing_animal])
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            product_service.collect_product(bad_payload, now_ms, "test_user")

    def test_rejects_dead_animal(self, seeded_db, product_service, location_id, animal_ids):
        """An animal whose registry status is 'dead' is expected to fail with 'not alive'."""
        # Flip the first cohort animal to dead directly in the registry table.
        deceased_id = animal_ids[0]
        seeded_db.execute(
            "UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
            (deceased_id,),
        )

        bad_payload = make_collect_payload(location_id, [deceased_id])
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not alive"):
            product_service.collect_product(bad_payload, now_ms, "test_user")
|