feat: add event store with nonce and clock skew validation

Implements Step 2.2 of the plan:
- EventStore class with append_event, get_event, list_events, is_tombstoned
- Event type constants for all 17 event types from spec
- ClockSkewError for rejecting timestamps >5 min in future
- DuplicateNonceError for idempotency nonce validation
- 25 tests covering all event store functionality

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-28 07:35:05 +00:00
parent cc32370d49
commit 29d5d3f8ff
6 changed files with 820 additions and 0 deletions

View File

@@ -0,0 +1,52 @@
# ABOUTME: Events package for event sourcing infrastructure.
# ABOUTME: Provides event types, store, and related exceptions.
from animaltrack.events.exceptions import ClockSkewError, DuplicateNonceError
from animaltrack.events.store import EventStore
from animaltrack.events.types import (
ALL_EVENT_TYPES,
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_COHORT_CREATED,
ANIMAL_MERGED,
ANIMAL_MOVED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_STATUS_CORRECTED,
ANIMAL_TAG_ENDED,
ANIMAL_TAGGED,
FEED_GIVEN,
FEED_PURCHASED,
HATCH_RECORDED,
LOCATION_ARCHIVED,
LOCATION_CREATED,
LOCATION_RENAMED,
PRODUCT_COLLECTED,
PRODUCT_SOLD,
)
__all__ = [
# Event types
"LOCATION_CREATED",
"LOCATION_RENAMED",
"LOCATION_ARCHIVED",
"ANIMAL_COHORT_CREATED",
"ANIMAL_PROMOTED",
"ANIMAL_MOVED",
"ANIMAL_ATTRIBUTES_UPDATED",
"ANIMAL_TAGGED",
"ANIMAL_TAG_ENDED",
"HATCH_RECORDED",
"ANIMAL_OUTCOME",
"ANIMAL_MERGED",
"ANIMAL_STATUS_CORRECTED",
"PRODUCT_COLLECTED",
"PRODUCT_SOLD",
"FEED_PURCHASED",
"FEED_GIVEN",
"ALL_EVENT_TYPES",
# Exceptions
"ClockSkewError",
"DuplicateNonceError",
# Store
"EventStore",
]

View File

@@ -0,0 +1,18 @@
# ABOUTME: Custom exceptions for the event store.
# ABOUTME: Includes ClockSkewError and DuplicateNonceError.
class ClockSkewError(Exception):
"""Raised when ts_utc is more than 5 minutes in the future.
This guards against events with timestamps too far in the future,
which could cause issues with time-based queries and projections.
"""
class DuplicateNonceError(Exception):
"""Raised when an idempotency nonce has already been used.
Each POST form submission should have a unique nonce to prevent
duplicate event creation from double-submissions.
"""

View File

@@ -0,0 +1,247 @@
# ABOUTME: Core event store for appending, retrieving, and listing events.
# ABOUTME: Implements nonce validation and clock skew guards per spec.
import json
import time
from typing import Any
from animaltrack.events.exceptions import ClockSkewError, DuplicateNonceError
from animaltrack.id_gen import generate_id
from animaltrack.models.events import Event
# Maximum allowed clock skew: 5 minutes in milliseconds
MAX_CLOCK_SKEW_MS = 5 * 60 * 1000
class EventStore:
"""Repository for event sourcing operations.
Provides methods to append new events, retrieve existing events,
and list events with various filters. Includes validation for
clock skew and idempotency nonces.
"""
def __init__(self, db: Any) -> None:
"""Initialize the event store with a database connection.
Args:
db: A fastlite database connection.
"""
self.db = db
def append_event(
self,
event_type: str,
ts_utc: int,
actor: str,
entity_refs: dict,
payload: dict,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Append a new event to the store.
Creates a new event with all validations:
1. Clock skew check (ts_utc <= now + 5min)
2. Nonce validation (if provided, reject duplicates)
3. Generate ULID for event
4. Insert into events table
5. Record nonce (if provided) in idempotency_nonces
Args:
event_type: The type of event (e.g., ProductCollected).
ts_utc: Timestamp in milliseconds since epoch.
actor: The user or system creating the event.
entity_refs: JSON-serializable dict of entity references.
payload: JSON-serializable dict of event payload.
nonce: Optional idempotency nonce (ULID).
route: Required if nonce is provided; the endpoint route.
Returns:
The created Event model.
Raises:
ClockSkewError: If ts_utc is more than 5 minutes in the future.
DuplicateNonceError: If nonce has already been used.
ValueError: If nonce is provided without route.
"""
# Validate clock skew
self._check_clock_skew(ts_utc)
# Validate nonce requirements
if nonce is not None and route is None:
msg = "route is required when nonce is provided"
raise ValueError(msg)
# Check for duplicate nonce
if nonce is not None:
self._check_nonce(nonce)
# Generate event ID
event_id = generate_id()
# Serialize JSON fields
entity_refs_json = json.dumps(entity_refs)
payload_json = json.dumps(payload)
# Insert event
self.db.execute(
"""INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(event_id, event_type, ts_utc, actor, entity_refs_json, payload_json, 1),
)
# Record nonce if provided
if nonce is not None:
self.db.execute(
"""INSERT INTO idempotency_nonces (nonce, actor, route, created_at_utc)
VALUES (?, ?, ?, ?)""",
(nonce, actor, route, ts_utc),
)
return Event(
id=event_id,
type=event_type,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload,
version=1,
)
def get_event(self, event_id: str) -> Event | None:
"""Retrieve an event by ID.
Args:
event_id: The ULID of the event.
Returns:
The Event if found, None otherwise.
"""
row = self.db.execute(
"""SELECT id, type, ts_utc, actor, entity_refs, payload, version
FROM events WHERE id = ?""",
(event_id,),
).fetchone()
if row is None:
return None
return Event(
id=row[0],
type=row[1],
ts_utc=row[2],
actor=row[3],
entity_refs=json.loads(row[4]),
payload=json.loads(row[5]),
version=row[6],
)
def list_events(
self,
event_type: str | None = None,
since_utc: int | None = None,
until_utc: int | None = None,
actor: str | None = None,
limit: int = 100,
) -> list[Event]:
"""List events with optional filters.
Args:
event_type: Filter by event type.
since_utc: Include events with ts_utc >= since_utc.
until_utc: Include events with ts_utc <= until_utc.
actor: Filter by actor.
limit: Maximum number of events to return.
Returns:
List of events ordered by ts_utc ASC.
"""
query = "SELECT id, type, ts_utc, actor, entity_refs, payload, version FROM events"
conditions = []
params: list = []
if event_type is not None:
conditions.append("type = ?")
params.append(event_type)
if since_utc is not None:
conditions.append("ts_utc >= ?")
params.append(since_utc)
if until_utc is not None:
conditions.append("ts_utc <= ?")
params.append(until_utc)
if actor is not None:
conditions.append("actor = ?")
params.append(actor)
if conditions:
query += " WHERE " + " AND ".join(conditions)
query += " ORDER BY ts_utc ASC"
query += f" LIMIT {limit}"
rows = self.db.execute(query, tuple(params)).fetchall()
return [
Event(
id=row[0],
type=row[1],
ts_utc=row[2],
actor=row[3],
entity_refs=json.loads(row[4]),
payload=json.loads(row[5]),
version=row[6],
)
for row in rows
]
def is_tombstoned(self, event_id: str) -> bool:
"""Check if an event has been tombstoned (soft deleted).
Args:
event_id: The ULID of the event to check.
Returns:
True if a tombstone exists for the event, False otherwise.
"""
row = self.db.execute(
"SELECT 1 FROM event_tombstones WHERE target_event_id = ?",
(event_id,),
).fetchone()
return row is not None
def _check_clock_skew(self, ts_utc: int) -> None:
"""Validate that ts_utc is not too far in the future.
Args:
ts_utc: Timestamp to validate.
Raises:
ClockSkewError: If ts_utc is more than 5 minutes in the future.
"""
now_ms = int(time.time() * 1000)
if ts_utc > now_ms + MAX_CLOCK_SKEW_MS:
msg = f"ts_utc {ts_utc} is more than 5 minutes in the future"
raise ClockSkewError(msg)
def _check_nonce(self, nonce: str) -> None:
"""Check if a nonce has already been used.
Args:
nonce: The idempotency nonce to check.
Raises:
DuplicateNonceError: If the nonce already exists.
"""
existing = self.db.execute(
"SELECT 1 FROM idempotency_nonces WHERE nonce = ?",
(nonce,),
).fetchone()
if existing is not None:
msg = f"Nonce {nonce} has already been used"
raise DuplicateNonceError(msg)

View File

@@ -0,0 +1,48 @@
# ABOUTME: Event type constants for the event sourcing system.
# ABOUTME: Defines all event types as specified in spec section 6.
# Location events
LOCATION_CREATED = "LocationCreated"
LOCATION_RENAMED = "LocationRenamed"
LOCATION_ARCHIVED = "LocationArchived"
# Animal events
ANIMAL_COHORT_CREATED = "AnimalCohortCreated"
ANIMAL_PROMOTED = "AnimalPromoted"
ANIMAL_MOVED = "AnimalMoved"
ANIMAL_ATTRIBUTES_UPDATED = "AnimalAttributesUpdated"
ANIMAL_TAGGED = "AnimalTagged"
ANIMAL_TAG_ENDED = "AnimalTagEnded"
HATCH_RECORDED = "HatchRecorded"
ANIMAL_OUTCOME = "AnimalOutcome"
ANIMAL_MERGED = "AnimalMerged"
ANIMAL_STATUS_CORRECTED = "AnimalStatusCorrected"
# Product events
PRODUCT_COLLECTED = "ProductCollected"
PRODUCT_SOLD = "ProductSold"
# Feed events
FEED_PURCHASED = "FeedPurchased"
FEED_GIVEN = "FeedGiven"
# List of all event types for validation
ALL_EVENT_TYPES = [
LOCATION_CREATED,
LOCATION_RENAMED,
LOCATION_ARCHIVED,
ANIMAL_COHORT_CREATED,
ANIMAL_PROMOTED,
ANIMAL_MOVED,
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_TAGGED,
ANIMAL_TAG_ENDED,
HATCH_RECORDED,
ANIMAL_OUTCOME,
ANIMAL_MERGED,
ANIMAL_STATUS_CORRECTED,
PRODUCT_COLLECTED,
PRODUCT_SOLD,
FEED_PURCHASED,
FEED_GIVEN,
]