feat: add animal cohort creation projection and service

Implements Step 3.3: Animal Cohort Creation

- Add AnimalRegistryProjection for animal_registry and live_animals_by_location
- Add EventAnimalsProjection for event_animals link table
- Add IntervalProjection for location and attribute intervals
- Add AnimalService with create_cohort() for coordinating event + projections
- Add seeded_db fixture to conftest.py
- Update projections/__init__.py with new exports

All operations atomic within single transaction. Includes validation for
location (exists, active) and species (exists, active).

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 06:52:23 +00:00
parent bd09c99366
commit 876e8174ee
11 changed files with 1637 additions and 1 deletions

View File

@@ -1,7 +1,17 @@
# ABOUTME: Projection system for maintaining read models from events.
# ABOUTME: Exports Projection base class, ProjectionRegistry, and ProjectionError.
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.base import Projection, ProjectionRegistry
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.exceptions import ProjectionError
from animaltrack.projections.intervals import IntervalProjection
__all__ = ["Projection", "ProjectionError", "ProjectionRegistry"]
__all__ = [
"AnimalRegistryProjection",
"EventAnimalsProjection",
"IntervalProjection",
"Projection",
"ProjectionError",
"ProjectionRegistry",
]

View File

@@ -0,0 +1,123 @@
# ABOUTME: Projection for animal_registry and live_animals_by_location tables.
# ABOUTME: Handles AnimalCohortCreated and other animal lifecycle events.
from typing import Any
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
class AnimalRegistryProjection(Projection):
"""Maintains animal_registry and live_animals_by_location tables.
This projection handles events that create, update, or terminate animals.
It maintains both the full animal_registry (all animals) and the
live_animals_by_location denormalized view (only alive animals).
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection.
"""
super().__init__(db)
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED]
def apply(self, event: Event) -> None:
"""Apply an event to update registry tables."""
if event.type == ANIMAL_COHORT_CREATED:
self._apply_cohort_created(event)
def revert(self, event: Event) -> None:
"""Revert an event from registry tables."""
if event.type == ANIMAL_COHORT_CREATED:
self._revert_cohort_created(event)
def _apply_cohort_created(self, event: Event) -> None:
"""Create animals in registry from cohort event.
For each animal_id in entity_refs:
- Insert into animal_registry with attributes from payload
- Insert into live_animals_by_location for roster queries
"""
animal_ids = event.entity_refs.get("animal_ids", [])
payload = event.payload
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Insert into animal_registry
self.db.execute(
"""
INSERT INTO animal_registry (
animal_id, species_code, identified, nickname,
sex, repro_status, life_stage, status,
location_id, origin, born_or_hatched_at, acquired_at,
first_seen_utc, last_event_utc
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
animal_id,
payload["species"],
0, # identified = false
None, # nickname = NULL
payload["sex"],
"unknown", # repro_status
payload["life_stage"],
"alive", # status
payload["location_id"],
payload["origin"],
None, # born_or_hatched_at (could set if origin == 'hatched')
None, # acquired_at (could set if origin == 'purchased')
ts_utc, # first_seen_utc
ts_utc, # last_event_utc
),
)
# Insert into live_animals_by_location
self.db.execute(
"""
INSERT INTO live_animals_by_location (
animal_id, location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc, last_move_utc, tags
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
animal_id,
payload["location_id"],
payload["species"],
0, # identified = false
None, # nickname = NULL
payload["sex"],
"unknown", # repro_status
payload["life_stage"],
ts_utc, # first_seen_utc
None, # last_move_utc = NULL for new cohort
"[]", # tags = empty JSON array
),
)
def _revert_cohort_created(self, event: Event) -> None:
"""Remove animals created by cohort event.
Deletes rows from both tables for all animal_ids in the event.
Order matters: delete from live_animals first to avoid FK issues.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
for animal_id in animal_ids:
# Delete from live_animals_by_location first
self.db.execute(
"DELETE FROM live_animals_by_location WHERE animal_id = ?",
(animal_id,),
)
# Then delete from animal_registry
self.db.execute(
"DELETE FROM animal_registry WHERE animal_id = ?",
(animal_id,),
)

View File

@@ -0,0 +1,49 @@
# ABOUTME: Projection for event_animals link table.
# ABOUTME: Links events to the animals they affect for efficient querying.
from typing import Any
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
class EventAnimalsProjection(Projection):
"""Maintains event_animals link table.
This projection tracks which animals are affected by each event,
enabling efficient queries like "show all events for animal X"
or "show all animals affected by event Y".
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection.
"""
super().__init__(db)
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED]
def apply(self, event: Event) -> None:
"""Link event to affected animals."""
animal_ids = event.entity_refs.get("animal_ids", [])
for animal_id in animal_ids:
self.db.execute(
"""
INSERT INTO event_animals (event_id, animal_id, ts_utc)
VALUES (?, ?, ?)
""",
(event.id, animal_id, event.ts_utc),
)
def revert(self, event: Event) -> None:
"""Remove event-animal links."""
self.db.execute(
"DELETE FROM event_animals WHERE event_id = ?",
(event.id,),
)

View File

@@ -0,0 +1,103 @@
# ABOUTME: Projection for time-series interval tables.
# ABOUTME: Tracks animal location and attribute history over time.
from typing import Any
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
class IntervalProjection(Projection):
"""Maintains interval tables for historical queries.
This projection manages animal_location_intervals and animal_attr_intervals
tables, which track the history of where animals were and what their
attributes were at any point in time.
Intervals have a start_utc and optional end_utc. An open interval
(end_utc=NULL) means the value is current.
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection.
"""
super().__init__(db)
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED]
def apply(self, event: Event) -> None:
"""Create intervals for event."""
if event.type == ANIMAL_COHORT_CREATED:
self._apply_cohort_created(event)
def revert(self, event: Event) -> None:
"""Remove intervals created by event."""
if event.type == ANIMAL_COHORT_CREATED:
self._revert_cohort_created(event)
def _apply_cohort_created(self, event: Event) -> None:
"""Create initial intervals for new animals.
For each animal in the cohort:
- Create an open location interval at the initial location
- Create open attribute intervals for sex, life_stage, repro_status, status
"""
animal_ids = event.entity_refs.get("animal_ids", [])
payload = event.payload
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Create location interval (open-ended)
self.db.execute(
"""
INSERT INTO animal_location_intervals
(animal_id, location_id, start_utc, end_utc)
VALUES (?, ?, ?, NULL)
""",
(animal_id, payload["location_id"], ts_utc),
)
# Create attribute intervals (sex, life_stage, repro_status, status)
attrs = [
("sex", payload["sex"]),
("life_stage", payload["life_stage"]),
("repro_status", "unknown"), # Default for new cohort
("status", "alive"), # Default for new cohort
]
for attr, value in attrs:
self.db.execute(
"""
INSERT INTO animal_attr_intervals
(animal_id, attr, value, start_utc, end_utc)
VALUES (?, ?, ?, ?, NULL)
""",
(animal_id, attr, value, ts_utc),
)
def _revert_cohort_created(self, event: Event) -> None:
"""Remove intervals for animals from cohort event.
Deletes all location and attribute intervals for the animals
created by this event.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
for animal_id in animal_ids:
# Delete location intervals
self.db.execute(
"DELETE FROM animal_location_intervals WHERE animal_id = ?",
(animal_id,),
)
# Delete attribute intervals
self.db.execute(
"DELETE FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
)

View File

@@ -0,0 +1,2 @@
# ABOUTME: Service layer for AnimalTrack business logic.
# ABOUTME: Coordinates event creation with projection updates.

View File

@@ -0,0 +1,147 @@
# ABOUTME: Service layer for animal operations.
# ABOUTME: Coordinates event creation with projection updates.
from typing import Any
from animaltrack.db import transaction
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.processor import process_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import ANIMAL_COHORT_CREATED
from animaltrack.id_gen import generate_id
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
from animaltrack.repositories.locations import LocationRepository
from animaltrack.repositories.species import SpeciesRepository
class AnimalServiceError(Exception):
"""Base exception for animal service errors."""
pass
class ValidationError(AnimalServiceError):
"""Raised when validation fails."""
pass
class AnimalService:
"""Service for animal operations.
Coordinates event store operations with projection updates,
ensuring all operations happen atomically within a transaction.
"""
def __init__(
self,
db: Any,
event_store: EventStore,
registry: ProjectionRegistry,
) -> None:
"""Initialize the service with dependencies.
Args:
db: A fastlite database connection.
event_store: The event store for event creation.
registry: The projection registry for processing events.
"""
self.db = db
self.event_store = event_store
self.registry = registry
self.location_repo = LocationRepository(db)
self.species_repo = SpeciesRepository(db)
def create_cohort(
self,
payload: AnimalCohortCreatedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Create a cohort of animals.
Creates an AnimalCohortCreated event and processes it through
all registered projections. All operations happen atomically
within a transaction.
Args:
payload: Validated cohort creation payload.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user creating the cohort.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate location exists and is active
self._validate_location(payload.location_id)
# Validate species exists and is active
self._validate_species(payload.species)
# Generate animal IDs for the cohort
animal_ids = [generate_id() for _ in range(payload.count)]
# Build entity_refs with location and animal IDs
entity_refs = {
"location_id": payload.location_id,
"animal_ids": animal_ids,
}
with transaction(self.db):
# Append event to store
event = self.event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
# Process event through projections
process_event(event, self.registry)
return event
def _validate_location(self, location_id: str) -> None:
"""Validate that location exists and is active.
Args:
location_id: The location ID to validate.
Raises:
ValidationError: If location doesn't exist or is archived.
"""
location = self.location_repo.get(location_id)
if location is None:
msg = f"Location {location_id} not found"
raise ValidationError(msg)
if not location.active:
msg = f"Location {location_id} is archived"
raise ValidationError(msg)
def _validate_species(self, species_code: str) -> None:
"""Validate that species exists and is active.
Args:
species_code: The species code to validate.
Raises:
ValidationError: If species doesn't exist or is not active.
"""
species = self.species_repo.get(species_code)
if species is None:
msg = f"Species {species_code} not found"
raise ValidationError(msg)
if not species.active:
msg = f"Species {species_code} is not active"
raise ValidationError(msg)