feat: add projection infrastructure for event processing

Add base class, registry, and processor for the projection system:
- Projection ABC with apply/revert methods
- ProjectionRegistry for mapping event types to projections
- process_event/revert_event functions for dispatching

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-28 18:47:11 +00:00
parent e3212a1e87
commit 42cb1ed7cb
6 changed files with 547 additions and 1 deletions

View File

@@ -1,5 +1,5 @@
# ABOUTME: Events package for event sourcing infrastructure. # ABOUTME: Events package for event sourcing infrastructure.
# ABOUTME: Provides event types, store, payloads, and related exceptions. # ABOUTME: Provides event types, store, payloads, processor, and related exceptions.
from animaltrack.events.enums import ( from animaltrack.events.enums import (
AnimalStatus, AnimalStatus,
@@ -31,6 +31,7 @@ from animaltrack.events.payloads import (
ProductSoldPayload, ProductSoldPayload,
YieldItem, YieldItem,
) )
from animaltrack.events.processor import process_event, revert_event
from animaltrack.events.store import EventStore from animaltrack.events.store import EventStore
from animaltrack.events.types import ( from animaltrack.events.types import (
ALL_EVENT_TYPES, ALL_EVENT_TYPES,
@@ -86,6 +87,9 @@ __all__ = [
"DuplicateNonceError", "DuplicateNonceError",
# Store # Store
"EventStore", "EventStore",
# Processor
"process_event",
"revert_event",
# Validation # Validation
"normalize_tag", "normalize_tag",
"validate_ulid", "validate_ulid",

View File

@@ -0,0 +1,45 @@
# ABOUTME: Event processor that dispatches events to registered projections.
# ABOUTME: Must be called within the same transaction as event creation.
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
def process_event(event: Event, registry: ProjectionRegistry) -> None:
"""Apply an event to all registered projections.
Dispatches the event to every projection that handles its type.
Must be called within the same transaction as event creation
to maintain consistency between events and projections.
Args:
event: The event to process.
registry: The projection registry to look up handlers.
Raises:
ProjectionError: If any projection fails to apply.
The error propagates so the transaction can be rolled back.
"""
projections = registry.get_projections(event.type)
for projection in projections:
projection.apply(event)
def revert_event(event: Event, registry: ProjectionRegistry) -> None:
"""Revert an event from all registered projections.
Dispatches the event to every projection that handles its type
so they can undo its effects. Must be called within the same
transaction as event edit/delete.
Args:
event: The event to revert.
registry: The projection registry to look up handlers.
Raises:
ProjectionError: If any projection fails to revert.
The error propagates so the transaction can be rolled back.
"""
projections = registry.get_projections(event.type)
for projection in projections:
projection.revert(event)

View File

@@ -0,0 +1,7 @@
# ABOUTME: Projection system for maintaining read models from events.
# ABOUTME: Exports Projection base class, ProjectionRegistry, and ProjectionError.
from animaltrack.projections.base import Projection, ProjectionRegistry
from animaltrack.projections.exceptions import ProjectionError
__all__ = ["Projection", "ProjectionError", "ProjectionRegistry"]

View File

@@ -0,0 +1,107 @@
# ABOUTME: Base class and registry for the projection system.
# ABOUTME: Projections transform events into queryable read models.
from abc import ABC, abstractmethod
from typing import Any
from animaltrack.models.events import Event
class Projection(ABC):
"""Abstract base class for all projections.
A projection listens to specific event types and maintains
a queryable read model (table or cache). Projections must
implement apply() to process new events and revert() to
undo events (for edits/deletes).
Projections are called synchronously in the same transaction
as event creation to maintain consistency.
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection for updating tables.
"""
self.db = db
@abstractmethod
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles.
Returns:
List of event type strings this projection subscribes to.
"""
...
@abstractmethod
def apply(self, event: Event) -> None:
"""Apply an event to update the projection state.
Called when a new event is created. Must be idempotent
within the same transaction.
Args:
event: The event to apply.
Raises:
ProjectionError: If the projection fails to apply.
"""
...
@abstractmethod
def revert(self, event: Event) -> None:
"""Revert an event from the projection state.
Called when an event is edited or deleted. For fast-revert
projections, this directly reverses the effect. For interval
projections, this may trigger a replay from affected timestamp.
Args:
event: The event to revert.
Raises:
ProjectionError: If the projection fails to revert.
"""
...
class ProjectionRegistry:
"""Registry for projection instances.
Maps event types to the projections that handle them.
Multiple projections can subscribe to the same event type.
"""
def __init__(self) -> None:
"""Initialize an empty registry."""
self._projections: dict[str, list[Projection]] = {}
def register(self, projection: Projection) -> None:
"""Register a projection for its event types.
The projection's get_event_types() is called to determine
which event types it handles, and it's added to the registry
for each type.
Args:
projection: The projection instance to register.
"""
for event_type in projection.get_event_types():
if event_type not in self._projections:
self._projections[event_type] = []
self._projections[event_type].append(projection)
def get_projections(self, event_type: str) -> list[Projection]:
"""Get all projections that handle an event type.
Args:
event_type: The event type to look up.
Returns:
List of projections registered for this event type.
Empty list if no projections handle this type.
"""
return self._projections.get(event_type, [])

View File

@@ -0,0 +1,13 @@
# ABOUTME: Custom exceptions for the projection system.
# ABOUTME: ProjectionError is the base exception for all projection failures.
class ProjectionError(Exception):
"""Base exception for projection errors.
Raised when a projection fails to apply or revert an event.
The error should propagate to the caller so the transaction
can be rolled back.
"""
pass

370
tests/test_projections.py Normal file
View File

@@ -0,0 +1,370 @@
# ABOUTME: Tests for projection infrastructure - base class, registry, and processor.
# ABOUTME: Follows TDD approach for Step 2.4 implementation.
import pytest
from animaltrack.events.processor import process_event, revert_event
from animaltrack.models.events import Event
from animaltrack.projections import Projection, ProjectionError, ProjectionRegistry
# Test fixtures
def make_test_event(event_type: str = "TestEvent") -> Event:
"""Create a test event with minimal required fields."""
return Event(
id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
type=event_type,
ts_utc=1704067200000,
actor="test_user",
entity_refs={},
payload={},
version=1,
)
class ConcreteProjection(Projection):
"""Concrete projection for testing."""
def __init__(self, db, event_types: list[str] | None = None):
super().__init__(db)
self._event_types = event_types or ["TestEvent"]
self.apply_calls: list[Event] = []
self.revert_calls: list[Event] = []
def get_event_types(self) -> list[str]:
return self._event_types
def apply(self, event: Event) -> None:
self.apply_calls.append(event)
def revert(self, event: Event) -> None:
self.revert_calls.append(event)
class IncompleteProjection(Projection):
"""Projection missing required abstract methods - should fail to instantiate."""
pass
# Tests for Projection base class
class TestProjectionBaseClass:
"""Tests for the Projection abstract base class."""
def test_cannot_instantiate_abstract_class(self):
"""Projection ABC cannot be instantiated directly."""
with pytest.raises(TypeError, match="Can't instantiate abstract class"):
Projection(db=None)
def test_concrete_must_implement_get_event_types(self):
"""Concrete projection must implement get_event_types."""
class MissingGetEventTypes(Projection):
def apply(self, event: Event) -> None:
pass
def revert(self, event: Event) -> None:
pass
with pytest.raises(TypeError, match="Can't instantiate abstract class"):
MissingGetEventTypes(db=None)
def test_concrete_must_implement_apply(self):
"""Concrete projection must implement apply."""
class MissingApply(Projection):
def get_event_types(self) -> list[str]:
return []
def revert(self, event: Event) -> None:
pass
with pytest.raises(TypeError, match="Can't instantiate abstract class"):
MissingApply(db=None)
def test_concrete_must_implement_revert(self):
"""Concrete projection must implement revert."""
class MissingRevert(Projection):
def get_event_types(self) -> list[str]:
return []
def apply(self, event: Event) -> None:
pass
with pytest.raises(TypeError, match="Can't instantiate abstract class"):
MissingRevert(db=None)
def test_concrete_projection_can_be_instantiated(self):
"""Concrete projection with all methods can be instantiated."""
projection = ConcreteProjection(db=None)
assert projection is not None
assert projection.db is None
def test_projection_stores_db_reference(self):
"""Projection stores database reference passed to constructor."""
mock_db = object()
projection = ConcreteProjection(db=mock_db)
assert projection.db is mock_db
# Tests for ProjectionRegistry
class TestProjectionRegistry:
"""Tests for the ProjectionRegistry."""
def test_create_empty_registry(self):
"""Registry can be created empty."""
registry = ProjectionRegistry()
assert registry is not None
def test_register_projection(self):
"""Projection can be registered."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["EventA"])
registry.register(projection)
# Should not raise
assert True
def test_get_projections_returns_registered(self):
"""get_projections returns registered projections for event type."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["EventA"])
registry.register(projection)
result = registry.get_projections("EventA")
assert len(result) == 1
assert result[0] is projection
def test_get_projections_empty_for_unregistered_type(self):
"""get_projections returns empty list for unregistered event type."""
registry = ProjectionRegistry()
result = registry.get_projections("UnknownEvent")
assert result == []
def test_register_multiple_projections_same_type(self):
"""Multiple projections can handle same event type."""
registry = ProjectionRegistry()
proj1 = ConcreteProjection(db=None, event_types=["EventA"])
proj2 = ConcreteProjection(db=None, event_types=["EventA"])
registry.register(proj1)
registry.register(proj2)
result = registry.get_projections("EventA")
assert len(result) == 2
assert proj1 in result
assert proj2 in result
def test_projection_registered_for_multiple_types(self):
"""Single projection can handle multiple event types."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["EventA", "EventB"])
registry.register(projection)
result_a = registry.get_projections("EventA")
result_b = registry.get_projections("EventB")
assert len(result_a) == 1
assert len(result_b) == 1
assert result_a[0] is projection
assert result_b[0] is projection
def test_different_projections_for_different_types(self):
"""Different projections can handle different event types."""
registry = ProjectionRegistry()
proj_a = ConcreteProjection(db=None, event_types=["EventA"])
proj_b = ConcreteProjection(db=None, event_types=["EventB"])
registry.register(proj_a)
registry.register(proj_b)
result_a = registry.get_projections("EventA")
result_b = registry.get_projections("EventB")
assert len(result_a) == 1
assert len(result_b) == 1
assert result_a[0] is proj_a
assert result_b[0] is proj_b
# Tests for process_event
class TestProcessEvent:
"""Tests for the process_event function."""
def test_calls_apply_on_registered_projection(self):
"""process_event calls apply on registered projection."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["TestEvent"])
registry.register(projection)
event = make_test_event("TestEvent")
process_event(event, registry)
assert len(projection.apply_calls) == 1
assert projection.apply_calls[0] is event
def test_calls_apply_on_multiple_projections(self):
"""process_event calls apply on all registered projections."""
registry = ProjectionRegistry()
proj1 = ConcreteProjection(db=None, event_types=["TestEvent"])
proj2 = ConcreteProjection(db=None, event_types=["TestEvent"])
registry.register(proj1)
registry.register(proj2)
event = make_test_event("TestEvent")
process_event(event, registry)
assert len(proj1.apply_calls) == 1
assert len(proj2.apply_calls) == 1
assert proj1.apply_calls[0] is event
assert proj2.apply_calls[0] is event
def test_does_nothing_for_unregistered_type(self):
"""process_event does nothing for unregistered event type."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["OtherEvent"])
registry.register(projection)
event = make_test_event("TestEvent")
process_event(event, registry)
assert len(projection.apply_calls) == 0
def test_does_not_call_revert(self):
"""process_event does not call revert."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["TestEvent"])
registry.register(projection)
event = make_test_event("TestEvent")
process_event(event, registry)
assert len(projection.revert_calls) == 0
# Tests for revert_event
class TestRevertEvent:
"""Tests for the revert_event function."""
def test_calls_revert_on_registered_projection(self):
"""revert_event calls revert on registered projection."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["TestEvent"])
registry.register(projection)
event = make_test_event("TestEvent")
revert_event(event, registry)
assert len(projection.revert_calls) == 1
assert projection.revert_calls[0] is event
def test_calls_revert_on_multiple_projections(self):
"""revert_event calls revert on all registered projections."""
registry = ProjectionRegistry()
proj1 = ConcreteProjection(db=None, event_types=["TestEvent"])
proj2 = ConcreteProjection(db=None, event_types=["TestEvent"])
registry.register(proj1)
registry.register(proj2)
event = make_test_event("TestEvent")
revert_event(event, registry)
assert len(proj1.revert_calls) == 1
assert len(proj2.revert_calls) == 1
assert proj1.revert_calls[0] is event
assert proj2.revert_calls[0] is event
def test_does_nothing_for_unregistered_type(self):
"""revert_event does nothing for unregistered event type."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["OtherEvent"])
registry.register(projection)
event = make_test_event("TestEvent")
revert_event(event, registry)
assert len(projection.revert_calls) == 0
def test_does_not_call_apply(self):
"""revert_event does not call apply."""
registry = ProjectionRegistry()
projection = ConcreteProjection(db=None, event_types=["TestEvent"])
registry.register(projection)
event = make_test_event("TestEvent")
revert_event(event, registry)
assert len(projection.apply_calls) == 0
# Tests for ProjectionError
class TestProjectionError:
"""Tests for the ProjectionError exception."""
def test_projection_error_is_exception(self):
"""ProjectionError is an Exception."""
error = ProjectionError("test error")
assert isinstance(error, Exception)
def test_projection_error_message(self):
"""ProjectionError stores message."""
error = ProjectionError("test message")
assert str(error) == "test message"
# Tests for error propagation
class TestErrorPropagation:
"""Tests for error propagation in projections."""
def test_apply_error_propagates(self):
"""Errors in apply() propagate to caller."""
class FailingProjection(Projection):
def get_event_types(self) -> list[str]:
return ["TestEvent"]
def apply(self, event: Event) -> None:
raise ProjectionError("apply failed")
def revert(self, event: Event) -> None:
pass
registry = ProjectionRegistry()
projection = FailingProjection(db=None)
registry.register(projection)
event = make_test_event("TestEvent")
with pytest.raises(ProjectionError, match="apply failed"):
process_event(event, registry)
def test_revert_error_propagates(self):
"""Errors in revert() propagate to caller."""
class FailingProjection(Projection):
def get_event_types(self) -> list[str]:
return ["TestEvent"]
def apply(self, event: Event) -> None:
pass
def revert(self, event: Event) -> None:
raise ProjectionError("revert failed")
registry = ProjectionRegistry()
projection = FailingProjection(db=None)
registry.register(projection)
event = make_test_event("TestEvent")
with pytest.raises(ProjectionError, match="revert failed"):
revert_event(event, registry)