From 42cb1ed7cb68bc5b2d7315ec2ae05da874de6939 Mon Sep 17 00:00:00 2001 From: Petru Paler Date: Sun, 28 Dec 2025 18:47:11 +0000 Subject: [PATCH] feat: add projection infrastructure for event processing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add base class, registry, and processor for the projection system: - Projection ABC with apply/revert methods - ProjectionRegistry for mapping event types to projections - process_event/revert_event functions for dispatching 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/animaltrack/events/__init__.py | 6 +- src/animaltrack/events/processor.py | 45 +++ src/animaltrack/projections/__init__.py | 7 + src/animaltrack/projections/base.py | 107 +++++++ src/animaltrack/projections/exceptions.py | 13 + tests/test_projections.py | 370 ++++++++++++++++++++++ 6 files changed, 547 insertions(+), 1 deletion(-) create mode 100644 src/animaltrack/events/processor.py create mode 100644 src/animaltrack/projections/__init__.py create mode 100644 src/animaltrack/projections/base.py create mode 100644 src/animaltrack/projections/exceptions.py create mode 100644 tests/test_projections.py diff --git a/src/animaltrack/events/__init__.py b/src/animaltrack/events/__init__.py index b551df5..54bc184 100644 --- a/src/animaltrack/events/__init__.py +++ b/src/animaltrack/events/__init__.py @@ -1,5 +1,5 @@ # ABOUTME: Events package for event sourcing infrastructure. -# ABOUTME: Provides event types, store, payloads, and related exceptions. +# ABOUTME: Provides event types, store, payloads, processor, and related exceptions. from animaltrack.events.enums import ( AnimalStatus, @@ -31,6 +31,7 @@ from animaltrack.events.payloads import ( ProductSoldPayload, YieldItem, ) +from animaltrack.events.processor import process_event, revert_event from animaltrack.events.store import EventStore from animaltrack.events.types import ( ALL_EVENT_TYPES, @@ -86,6 +87,9 @@ __all__ = [ "DuplicateNonceError", # Store "EventStore", + # Processor + "process_event", + "revert_event", # Validation "normalize_tag", "validate_ulid", diff --git a/src/animaltrack/events/processor.py b/src/animaltrack/events/processor.py new file mode 100644 index 0000000..fbf9e13 --- /dev/null +++ b/src/animaltrack/events/processor.py @@ -0,0 +1,45 @@ +# ABOUTME: Event processor that dispatches events to registered projections. +# ABOUTME: Must be called within the same transaction as event creation. + +from animaltrack.models.events import Event +from animaltrack.projections import ProjectionRegistry + + +def process_event(event: Event, registry: ProjectionRegistry) -> None: + """Apply an event to all registered projections. + + Dispatches the event to every projection that handles its type. + Must be called within the same transaction as event creation + to maintain consistency between events and projections. + + Args: + event: The event to process. + registry: The projection registry to look up handlers. + + Raises: + ProjectionError: If any projection fails to apply. + The error propagates so the transaction can be rolled back. + """ + projections = registry.get_projections(event.type) + for projection in projections: + projection.apply(event) + + +def revert_event(event: Event, registry: ProjectionRegistry) -> None: + """Revert an event from all registered projections. + + Dispatches the event to every projection that handles its type + so they can undo its effects. Must be called within the same + transaction as event edit/delete. + + Args: + event: The event to revert. + registry: The projection registry to look up handlers. + + Raises: + ProjectionError: If any projection fails to revert. + The error propagates so the transaction can be rolled back. + """ + projections = registry.get_projections(event.type) + for projection in projections: + projection.revert(event) diff --git a/src/animaltrack/projections/__init__.py b/src/animaltrack/projections/__init__.py new file mode 100644 index 0000000..fe26fe3 --- /dev/null +++ b/src/animaltrack/projections/__init__.py @@ -0,0 +1,7 @@ +# ABOUTME: Projection system for maintaining read models from events. +# ABOUTME: Exports Projection base class, ProjectionRegistry, and ProjectionError. + +from animaltrack.projections.base import Projection, ProjectionRegistry +from animaltrack.projections.exceptions import ProjectionError + +__all__ = ["Projection", "ProjectionError", "ProjectionRegistry"] diff --git a/src/animaltrack/projections/base.py b/src/animaltrack/projections/base.py new file mode 100644 index 0000000..9771624 --- /dev/null +++ b/src/animaltrack/projections/base.py @@ -0,0 +1,107 @@ +# ABOUTME: Base class and registry for the projection system. +# ABOUTME: Projections transform events into queryable read models. + +from abc import ABC, abstractmethod +from typing import Any + +from animaltrack.models.events import Event + + +class Projection(ABC): + """Abstract base class for all projections. + + A projection listens to specific event types and maintains + a queryable read model (table or cache). Projections must + implement apply() to process new events and revert() to + undo events (for edits/deletes). + + Projections are called synchronously in the same transaction + as event creation to maintain consistency. + """ + + def __init__(self, db: Any) -> None: + """Initialize the projection with a database connection. + + Args: + db: A fastlite database connection for updating tables. + """ + self.db = db + + @abstractmethod + def get_event_types(self) -> list[str]: + """Return the event types this projection handles. + + Returns: + List of event type strings this projection subscribes to. + """ + ... + + @abstractmethod + def apply(self, event: Event) -> None: + """Apply an event to update the projection state. + + Called when a new event is created. Must be idempotent + within the same transaction. + + Args: + event: The event to apply. + + Raises: + ProjectionError: If the projection fails to apply. + """ + ... + + @abstractmethod + def revert(self, event: Event) -> None: + """Revert an event from the projection state. + + Called when an event is edited or deleted. For fast-revert + projections, this directly reverses the effect. For interval + projections, this may trigger a replay from affected timestamp. + + Args: + event: The event to revert. + + Raises: + ProjectionError: If the projection fails to revert. + """ + ... + + +class ProjectionRegistry: + """Registry for projection instances. + + Maps event types to the projections that handle them. + Multiple projections can subscribe to the same event type. + """ + + def __init__(self) -> None: + """Initialize an empty registry.""" + self._projections: dict[str, list[Projection]] = {} + + def register(self, projection: Projection) -> None: + """Register a projection for its event types. + + The projection's get_event_types() is called to determine + which event types it handles, and it's added to the registry + for each type. + + Args: + projection: The projection instance to register. + """ + for event_type in projection.get_event_types(): + if event_type not in self._projections: + self._projections[event_type] = [] + self._projections[event_type].append(projection) + + def get_projections(self, event_type: str) -> list[Projection]: + """Get all projections that handle an event type. + + Args: + event_type: The event type to look up. + + Returns: + List of projections registered for this event type. + Empty list if no projections handle this type. + """ + return self._projections.get(event_type, []) diff --git a/src/animaltrack/projections/exceptions.py b/src/animaltrack/projections/exceptions.py new file mode 100644 index 0000000..e39daae --- /dev/null +++ b/src/animaltrack/projections/exceptions.py @@ -0,0 +1,13 @@ +# ABOUTME: Custom exceptions for the projection system. +# ABOUTME: ProjectionError is the base exception for all projection failures. + + +class ProjectionError(Exception): + """Base exception for projection errors. + + Raised when a projection fails to apply or revert an event. + The error should propagate to the caller so the transaction + can be rolled back. + """ + + pass diff --git a/tests/test_projections.py b/tests/test_projections.py new file mode 100644 index 0000000..b4891ac --- /dev/null +++ b/tests/test_projections.py @@ -0,0 +1,370 @@ +# ABOUTME: Tests for projection infrastructure - base class, registry, and processor. +# ABOUTME: Follows TDD approach for Step 2.4 implementation. + +import pytest + +from animaltrack.events.processor import process_event, revert_event +from animaltrack.models.events import Event +from animaltrack.projections import Projection, ProjectionError, ProjectionRegistry + +# Test fixtures + + +def make_test_event(event_type: str = "TestEvent") -> Event: + """Create a test event with minimal required fields.""" + return Event( + id="01ARZ3NDEKTSV4RRFFQ69G5FAV", + type=event_type, + ts_utc=1704067200000, + actor="test_user", + entity_refs={}, + payload={}, + version=1, + ) + + +class ConcreteProjection(Projection): + """Concrete projection for testing.""" + + def __init__(self, db, event_types: list[str] | None = None): + super().__init__(db) + self._event_types = event_types or ["TestEvent"] + self.apply_calls: list[Event] = [] + self.revert_calls: list[Event] = [] + + def get_event_types(self) -> list[str]: + return self._event_types + + def apply(self, event: Event) -> None: + self.apply_calls.append(event) + + def revert(self, event: Event) -> None: + self.revert_calls.append(event) + + +class IncompleteProjection(Projection): + """Projection missing required abstract methods - should fail to instantiate.""" + + pass + + +# Tests for Projection base class + + +class TestProjectionBaseClass: + """Tests for the Projection abstract base class.""" + + def test_cannot_instantiate_abstract_class(self): + """Projection ABC cannot be instantiated directly.""" + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + Projection(db=None) + + def test_concrete_must_implement_get_event_types(self): + """Concrete projection must implement get_event_types.""" + + class MissingGetEventTypes(Projection): + def apply(self, event: Event) -> None: + pass + + def revert(self, event: Event) -> None: + pass + + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + MissingGetEventTypes(db=None) + + def test_concrete_must_implement_apply(self): + """Concrete projection must implement apply.""" + + class MissingApply(Projection): + def get_event_types(self) -> list[str]: + return [] + + def revert(self, event: Event) -> None: + pass + + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + MissingApply(db=None) + + def test_concrete_must_implement_revert(self): + """Concrete projection must implement revert.""" + + class MissingRevert(Projection): + def get_event_types(self) -> list[str]: + return [] + + def apply(self, event: Event) -> None: + pass + + with pytest.raises(TypeError, match="Can't instantiate abstract class"): + MissingRevert(db=None) + + def test_concrete_projection_can_be_instantiated(self): + """Concrete projection with all methods can be instantiated.""" + projection = ConcreteProjection(db=None) + assert projection is not None + assert projection.db is None + + def test_projection_stores_db_reference(self): + """Projection stores database reference passed to constructor.""" + mock_db = object() + projection = ConcreteProjection(db=mock_db) + assert projection.db is mock_db + + +# Tests for ProjectionRegistry + + +class TestProjectionRegistry: + """Tests for the ProjectionRegistry.""" + + def test_create_empty_registry(self): + """Registry can be created empty.""" + registry = ProjectionRegistry() + assert registry is not None + + def test_register_projection(self): + """Projection can be registered.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["EventA"]) + registry.register(projection) + # Should not raise + assert True + + def test_get_projections_returns_registered(self): + """get_projections returns registered projections for event type.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["EventA"]) + registry.register(projection) + + result = registry.get_projections("EventA") + assert len(result) == 1 + assert result[0] is projection + + def test_get_projections_empty_for_unregistered_type(self): + """get_projections returns empty list for unregistered event type.""" + registry = ProjectionRegistry() + result = registry.get_projections("UnknownEvent") + assert result == [] + + def test_register_multiple_projections_same_type(self): + """Multiple projections can handle same event type.""" + registry = ProjectionRegistry() + proj1 = ConcreteProjection(db=None, event_types=["EventA"]) + proj2 = ConcreteProjection(db=None, event_types=["EventA"]) + registry.register(proj1) + registry.register(proj2) + + result = registry.get_projections("EventA") + assert len(result) == 2 + assert proj1 in result + assert proj2 in result + + def test_projection_registered_for_multiple_types(self): + """Single projection can handle multiple event types.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["EventA", "EventB"]) + registry.register(projection) + + result_a = registry.get_projections("EventA") + result_b = registry.get_projections("EventB") + + assert len(result_a) == 1 + assert len(result_b) == 1 + assert result_a[0] is projection + assert result_b[0] is projection + + def test_different_projections_for_different_types(self): + """Different projections can handle different event types.""" + registry = ProjectionRegistry() + proj_a = ConcreteProjection(db=None, event_types=["EventA"]) + proj_b = ConcreteProjection(db=None, event_types=["EventB"]) + registry.register(proj_a) + registry.register(proj_b) + + result_a = registry.get_projections("EventA") + result_b = registry.get_projections("EventB") + + assert len(result_a) == 1 + assert len(result_b) == 1 + assert result_a[0] is proj_a + assert result_b[0] is proj_b + + +# Tests for process_event + + +class TestProcessEvent: + """Tests for the process_event function.""" + + def test_calls_apply_on_registered_projection(self): + """process_event calls apply on registered projection.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["TestEvent"]) + registry.register(projection) + + event = make_test_event("TestEvent") + process_event(event, registry) + + assert len(projection.apply_calls) == 1 + assert projection.apply_calls[0] is event + + def test_calls_apply_on_multiple_projections(self): + """process_event calls apply on all registered projections.""" + registry = ProjectionRegistry() + proj1 = ConcreteProjection(db=None, event_types=["TestEvent"]) + proj2 = ConcreteProjection(db=None, event_types=["TestEvent"]) + registry.register(proj1) + registry.register(proj2) + + event = make_test_event("TestEvent") + process_event(event, registry) + + assert len(proj1.apply_calls) == 1 + assert len(proj2.apply_calls) == 1 + assert proj1.apply_calls[0] is event + assert proj2.apply_calls[0] is event + + def test_does_nothing_for_unregistered_type(self): + """process_event does nothing for unregistered event type.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["OtherEvent"]) + registry.register(projection) + + event = make_test_event("TestEvent") + process_event(event, registry) + + assert len(projection.apply_calls) == 0 + + def test_does_not_call_revert(self): + """process_event does not call revert.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["TestEvent"]) + registry.register(projection) + + event = make_test_event("TestEvent") + process_event(event, registry) + + assert len(projection.revert_calls) == 0 + + +# Tests for revert_event + + +class TestRevertEvent: + """Tests for the revert_event function.""" + + def test_calls_revert_on_registered_projection(self): + """revert_event calls revert on registered projection.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["TestEvent"]) + registry.register(projection) + + event = make_test_event("TestEvent") + revert_event(event, registry) + + assert len(projection.revert_calls) == 1 + assert projection.revert_calls[0] is event + + def test_calls_revert_on_multiple_projections(self): + """revert_event calls revert on all registered projections.""" + registry = ProjectionRegistry() + proj1 = ConcreteProjection(db=None, event_types=["TestEvent"]) + proj2 = ConcreteProjection(db=None, event_types=["TestEvent"]) + registry.register(proj1) + registry.register(proj2) + + event = make_test_event("TestEvent") + revert_event(event, registry) + + assert len(proj1.revert_calls) == 1 + assert len(proj2.revert_calls) == 1 + assert proj1.revert_calls[0] is event + assert proj2.revert_calls[0] is event + + def test_does_nothing_for_unregistered_type(self): + """revert_event does nothing for unregistered event type.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["OtherEvent"]) + registry.register(projection) + + event = make_test_event("TestEvent") + revert_event(event, registry) + + assert len(projection.revert_calls) == 0 + + def test_does_not_call_apply(self): + """revert_event does not call apply.""" + registry = ProjectionRegistry() + projection = ConcreteProjection(db=None, event_types=["TestEvent"]) + registry.register(projection) + + event = make_test_event("TestEvent") + revert_event(event, registry) + + assert len(projection.apply_calls) == 0 + + +# Tests for ProjectionError + + +class TestProjectionError: + """Tests for the ProjectionError exception.""" + + def test_projection_error_is_exception(self): + """ProjectionError is an Exception.""" + error = ProjectionError("test error") + assert isinstance(error, Exception) + + def test_projection_error_message(self): + """ProjectionError stores message.""" + error = ProjectionError("test message") + assert str(error) == "test message" + + +# Tests for error propagation + + +class TestErrorPropagation: + """Tests for error propagation in projections.""" + + def test_apply_error_propagates(self): + """Errors in apply() propagate to caller.""" + + class FailingProjection(Projection): + def get_event_types(self) -> list[str]: + return ["TestEvent"] + + def apply(self, event: Event) -> None: + raise ProjectionError("apply failed") + + def revert(self, event: Event) -> None: + pass + + registry = ProjectionRegistry() + projection = FailingProjection(db=None) + registry.register(projection) + + event = make_test_event("TestEvent") + with pytest.raises(ProjectionError, match="apply failed"): + process_event(event, registry) + + def test_revert_error_propagates(self): + """Errors in revert() propagate to caller.""" + + class FailingProjection(Projection): + def get_event_types(self) -> list[str]: + return ["TestEvent"] + + def apply(self, event: Event) -> None: + pass + + def revert(self, event: Event) -> None: + raise ProjectionError("revert failed") + + registry = ProjectionRegistry() + projection = FailingProjection(db=None) + registry.register(projection) + + event = make_test_event("TestEvent") + with pytest.raises(ProjectionError, match="revert failed"): + revert_event(event, registry)