feat: add projection infrastructure for event processing

Add base class, registry, and processor for the projection system:
- Projection ABC with apply/revert methods
- ProjectionRegistry for mapping event types to projections
- process_event/revert_event functions for dispatching

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-28 18:47:11 +00:00
parent e3212a1e87
commit 42cb1ed7cb
6 changed files with 547 additions and 1 deletions

View File

@@ -1,5 +1,5 @@
# ABOUTME: Events package for event sourcing infrastructure.
# ABOUTME: Provides event types, store, payloads, and related exceptions.
# ABOUTME: Provides event types, store, payloads, processor, and related exceptions.
from animaltrack.events.enums import (
AnimalStatus,
@@ -31,6 +31,7 @@ from animaltrack.events.payloads import (
ProductSoldPayload,
YieldItem,
)
from animaltrack.events.processor import process_event, revert_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import (
ALL_EVENT_TYPES,
@@ -86,6 +87,9 @@ __all__ = [
"DuplicateNonceError",
# Store
"EventStore",
# Processor
"process_event",
"revert_event",
# Validation
"normalize_tag",
"validate_ulid",

View File

@@ -0,0 +1,45 @@
# ABOUTME: Event processor that dispatches events to registered projections.
# ABOUTME: Must be called within the same transaction as event creation.
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
def process_event(event: Event, registry: ProjectionRegistry) -> None:
"""Apply an event to all registered projections.
Dispatches the event to every projection that handles its type.
Must be called within the same transaction as event creation
to maintain consistency between events and projections.
Args:
event: The event to process.
registry: The projection registry to look up handlers.
Raises:
ProjectionError: If any projection fails to apply.
The error propagates so the transaction can be rolled back.
"""
projections = registry.get_projections(event.type)
for projection in projections:
projection.apply(event)
def revert_event(event: Event, registry: ProjectionRegistry) -> None:
"""Revert an event from all registered projections.
Dispatches the event to every projection that handles its type
so they can undo its effects. Must be called within the same
transaction as event edit/delete.
Args:
event: The event to revert.
registry: The projection registry to look up handlers.
Raises:
ProjectionError: If any projection fails to revert.
The error propagates so the transaction can be rolled back.
"""
projections = registry.get_projections(event.type)
for projection in projections:
projection.revert(event)

View File

@@ -0,0 +1,7 @@
# ABOUTME: Projection system for maintaining read models from events.
# ABOUTME: Exports Projection base class, ProjectionRegistry, and ProjectionError.
from animaltrack.projections.base import Projection, ProjectionRegistry
from animaltrack.projections.exceptions import ProjectionError
__all__ = ["Projection", "ProjectionError", "ProjectionRegistry"]

View File

@@ -0,0 +1,107 @@
# ABOUTME: Base class and registry for the projection system.
# ABOUTME: Projections transform events into queryable read models.
from abc import ABC, abstractmethod
from typing import Any
from animaltrack.models.events import Event
class Projection(ABC):
"""Abstract base class for all projections.
A projection listens to specific event types and maintains
a queryable read model (table or cache). Projections must
implement apply() to process new events and revert() to
undo events (for edits/deletes).
Projections are called synchronously in the same transaction
as event creation to maintain consistency.
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection for updating tables.
"""
self.db = db
@abstractmethod
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles.
Returns:
List of event type strings this projection subscribes to.
"""
...
@abstractmethod
def apply(self, event: Event) -> None:
"""Apply an event to update the projection state.
Called when a new event is created. Must be idempotent
within the same transaction.
Args:
event: The event to apply.
Raises:
ProjectionError: If the projection fails to apply.
"""
...
@abstractmethod
def revert(self, event: Event) -> None:
"""Revert an event from the projection state.
Called when an event is edited or deleted. For fast-revert
projections, this directly reverses the effect. For interval
projections, this may trigger a replay from affected timestamp.
Args:
event: The event to revert.
Raises:
ProjectionError: If the projection fails to revert.
"""
...
class ProjectionRegistry:
"""Registry for projection instances.
Maps event types to the projections that handle them.
Multiple projections can subscribe to the same event type.
"""
def __init__(self) -> None:
"""Initialize an empty registry."""
self._projections: dict[str, list[Projection]] = {}
def register(self, projection: Projection) -> None:
"""Register a projection for its event types.
The projection's get_event_types() is called to determine
which event types it handles, and it's added to the registry
for each type.
Args:
projection: The projection instance to register.
"""
for event_type in projection.get_event_types():
if event_type not in self._projections:
self._projections[event_type] = []
self._projections[event_type].append(projection)
def get_projections(self, event_type: str) -> list[Projection]:
"""Get all projections that handle an event type.
Args:
event_type: The event type to look up.
Returns:
List of projections registered for this event type.
Empty list if no projections handle this type.
"""
return self._projections.get(event_type, [])

View File

@@ -0,0 +1,13 @@
# ABOUTME: Custom exceptions for the projection system.
# ABOUTME: ProjectionError is the base exception for all projection failures.
class ProjectionError(Exception):
"""Base exception for projection errors.
Raised when a projection fails to apply or revert an event.
The error should propagate to the caller so the transaction
can be rolled back.
"""
pass