feat: add event deletion with tombstone creation and cascade rules

Implement event deletion per spec §10 with role-based rules:
- Recorder can delete own events if no dependents exist
- Admin can cascade delete (tombstone target + all dependents)
- All deletions create immutable tombstone records

New modules:
- events/dependencies.py: find events depending on a target via shared animals
- events/delete.py: delete_event() with projection reversal

DependentEventsError exception added for 409 Conflict responses.
E2E test #6 implemented per spec §21.6.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 18:44:13 +00:00
parent f733a067e2
commit 282d3d0f5a
5 changed files with 1405 additions and 0 deletions

View File

@@ -0,0 +1,115 @@
# ABOUTME: Event deletion functionality with tombstone creation and cascade rules.
# ABOUTME: Implements recorder vs admin deletion rules per spec section 10.
import time
from typing import Any
from animaltrack.events.dependencies import find_dependent_events
from animaltrack.events.exceptions import (
DependentEventsError,
EventNotFoundError,
EventTombstonedError,
)
from animaltrack.events.processor import revert_event
from animaltrack.events.store import EventStore
from animaltrack.id_gen import generate_id
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
def delete_event(
db: Any,
event_store: EventStore,
event_id: str,
actor: str,
role: str,
cascade: bool = False,
reason: str | None = None,
registry: ProjectionRegistry | None = None,
) -> list[str]:
"""Delete an event by creating a tombstone.
Steps:
1. Retrieve the event (raise if not found)
2. Check if already tombstoned (raise if so)
3. Find dependent events
4. Apply role-based rules:
- Recorder: Cannot delete if dependents exist
- Admin: Can delete with cascade to delete dependents
5. Revert projections for all events to be deleted
6. Create tombstone(s)
Args:
db: Database connection.
event_store: EventStore instance.
event_id: The ULID of the event to delete.
actor: Username of who is making the deletion.
role: Either "recorder" or "admin".
cascade: If True and role is admin, delete dependents too.
reason: Optional reason for the deletion.
registry: Optional projection registry for updating projections.
Returns:
List of event IDs that were tombstoned.
Raises:
EventNotFoundError: If the event doesn't exist.
EventTombstonedError: If the event has already been deleted.
DependentEventsError: If the event has dependents and cascade is not allowed.
"""
# Get the target event
target_event = event_store.get_event(event_id)
if target_event is None:
raise EventNotFoundError(f"Event {event_id} not found")
# Check if already tombstoned
if event_store.is_tombstoned(event_id):
raise EventTombstonedError(f"Event {event_id} has already been deleted")
# Find dependent events
dependents = find_dependent_events(db, event_store, event_id)
# Apply role-based rules
if dependents:
if role == "recorder":
raise DependentEventsError(
f"Cannot delete event {event_id}: it has {len(dependents)} dependent event(s)",
dependents,
)
elif role == "admin" and not cascade:
raise DependentEventsError(
f"Cannot delete event {event_id}: it has {len(dependents)} dependent event(s). "
"Use cascade=True to delete all dependents.",
dependents,
)
# Build the list of events to delete
# Delete in reverse chronological order (newest first) to properly revert projections
events_to_delete: list[Event] = [target_event]
if cascade and dependents:
# Sort dependents by ts_utc DESC (newest first) for proper revert order
events_to_delete = sorted(dependents, key=lambda e: e.ts_utc, reverse=True) + [target_event]
# Revert projections for all events (newest first)
if registry is not None:
for event in events_to_delete:
revert_event(event, registry)
# Create tombstones
deleted_at_utc = int(time.time() * 1000)
tombstoned_ids = []
for event in events_to_delete:
tombstone_id = generate_id()
tombstone_reason = reason
if event.id != event_id:
tombstone_reason = f"cascade from {event_id}" + (f": {reason}" if reason else "")
db.execute(
"""INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason)
VALUES (?, ?, ?, ?, ?)""",
(tombstone_id, deleted_at_utc, actor, event.id, tombstone_reason),
)
tombstoned_ids.append(event.id)
return tombstoned_ids

View File

@@ -0,0 +1,78 @@
# ABOUTME: Dependency detection for event deletion.
# ABOUTME: Finds events that depend on a target event via shared animals.
import json
from typing import Any
from animaltrack.events.store import EventStore
from animaltrack.models.events import Event
def find_dependent_events(db: Any, event_store: EventStore, event_id: str) -> list[Event]:
"""Find events that depend on the given event.
An event B depends on event A if:
- B references at least one animal that A also references
- B's ts_utc > A's ts_utc (B happened after A)
- B is not tombstoned
This is used to determine if an event can be safely deleted.
If an event has dependents, a recorder cannot delete it (409 Conflict).
An admin can delete it with cascade=True to delete all dependents.
Args:
db: Database connection.
event_store: EventStore instance for fetching events.
event_id: The ULID of the target event.
Returns:
List of Event objects that depend on the target event,
ordered by ts_utc ASC (oldest first).
"""
# Get the target event
target_event = event_store.get_event(event_id)
if target_event is None:
return []
# Get animal_ids linked to the target event
target_animals = db.execute(
"SELECT animal_id FROM event_animals WHERE event_id = ?",
(event_id,),
).fetchall()
if not target_animals:
return []
target_animal_ids = [row[0] for row in target_animals]
# Find other events that reference any of these animals and are:
# - After the target event (ts_utc > target.ts_utc)
# - Not the target event itself
# - Not tombstoned
placeholders = ",".join("?" for _ in target_animal_ids)
query = f"""
SELECT DISTINCT e.id, e.type, e.ts_utc, e.actor, e.entity_refs, e.payload, e.version
FROM events e
JOIN event_animals ea ON ea.event_id = e.id
WHERE ea.animal_id IN ({placeholders})
AND e.ts_utc > ?
AND e.id != ?
AND e.id NOT IN (SELECT target_event_id FROM event_tombstones)
ORDER BY e.ts_utc ASC, e.id ASC
"""
params = (*target_animal_ids, target_event.ts_utc, event_id)
rows = db.execute(query, params).fetchall()
return [
Event(
id=row[0],
type=row[1],
ts_utc=row[2],
actor=row[3],
entity_refs=json.loads(row[4]),
payload=json.loads(row[5]),
version=row[6],
)
for row in rows
]

View File

@@ -24,3 +24,21 @@ class EventNotFoundError(Exception):
class EventTombstonedError(Exception): class EventTombstonedError(Exception):
"""Raised when attempting to edit or access a tombstoned (deleted) event.""" """Raised when attempting to edit or access a tombstoned (deleted) event."""
class DependentEventsError(Exception):
"""Raised when attempting to delete an event that has dependent events.
Contains the list of dependent events so the caller can inform the user
or request cascade deletion with admin privileges.
"""
def __init__(self, message: str, dependent_events: list) -> None:
"""Initialize with message and list of dependent events.
Args:
message: Error message.
dependent_events: List of Event objects that depend on the target.
"""
super().__init__(message)
self.dependent_events = dependent_events

369
tests/test_e2e_deletion.py Normal file
View File

@@ -0,0 +1,369 @@
# ABOUTME: E2E test #6 from spec section 21.6: Deletes - recorder vs admin cascade.
# ABOUTME: Tests FeedGiven deletion by recorder and cascade deletion by admin.
import time
import pytest
from animaltrack.events.store import EventStore
@pytest.fixture
def now_utc():
"""Current time in milliseconds since epoch."""
return int(time.time() * 1000)
@pytest.fixture
def full_projection_registry(seeded_db):
"""Create a ProjectionRegistry with all projections."""
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.feed import FeedInventoryProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.projections.products import ProductsProjection
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(ProductsProjection(seeded_db))
registry.register(FeedInventoryProjection(seeded_db))
return registry
@pytest.fixture
def services(seeded_db, full_projection_registry):
"""Create all services needed for E2E test."""
from animaltrack.services.animal import AnimalService
from animaltrack.services.feed import FeedService
from animaltrack.services.products import ProductService
event_store = EventStore(seeded_db)
return {
"db": seeded_db,
"event_store": event_store,
"registry": full_projection_registry,
"animal_service": AnimalService(seeded_db, event_store, full_projection_registry),
"feed_service": FeedService(seeded_db, event_store, full_projection_registry),
"product_service": ProductService(seeded_db, event_store, full_projection_registry),
}
@pytest.fixture
def baseline_setup(seeded_db, services, now_utc):
"""Set up baseline scenario matching spec E2E tests 1-5.
Creates:
- 10 adult female ducks at Strip 1
- Feed: 40kg purchased @ EUR 1.20/kg
- Feed given: 20kg to Strip 1, then 3kg more
- Eggs: 27 + 6 = 33 eggs collected (after edit from 8 to 6)
Returns dict with location IDs and event references.
"""
from animaltrack.events.payloads import (
AnimalCohortCreatedPayload,
FeedGivenPayload,
FeedPurchasedPayload,
ProductCollectedPayload,
)
# Get location IDs
strip1_id = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
nursery4_id = seeded_db.execute("SELECT id FROM locations WHERE name = 'Nursery 4'").fetchone()[
0
]
one_day_ms = 24 * 60 * 60 * 1000
animal_creation_ts = now_utc - one_day_ms
# Create 10 adult female ducks at Strip 1
cohort_payload = AnimalCohortCreatedPayload(
species="duck",
count=10,
life_stage="adult",
sex="female",
location_id=strip1_id,
origin="purchased",
)
cohort_event = services["animal_service"].create_cohort(
cohort_payload, animal_creation_ts, "test_user"
)
animal_ids = cohort_event.entity_refs["animal_ids"]
# Purchase feed: 40kg @ EUR 1.20/kg (2 bags of 20kg @ EUR 24 each)
purchase_payload = FeedPurchasedPayload(
feed_type_code="layer",
bag_size_kg=20,
bags_count=2,
bag_price_cents=2400, # EUR 24 per 20kg = EUR 1.20/kg
)
services["feed_service"].purchase_feed(purchase_payload, now_utc + 1000, "test_user")
# Give first batch: 20kg to Strip 1
give1_payload = FeedGivenPayload(
location_id=strip1_id,
feed_type_code="layer",
amount_kg=20,
)
services["feed_service"].give_feed(give1_payload, now_utc + 2000, "test_user")
# Give second batch: 3kg to Strip 1 (this is the event we'll test deleting)
give2_payload = FeedGivenPayload(
location_id=strip1_id,
feed_type_code="layer",
amount_kg=3,
)
services["feed_service"].give_feed(give2_payload, now_utc + 2500, "test_user")
# Give third batch: 4kg to Strip 1 (this is the event recorder will delete)
give3_payload = FeedGivenPayload(
location_id=strip1_id,
feed_type_code="layer",
amount_kg=4,
)
give4kg_event = services["feed_service"].give_feed(give3_payload, now_utc + 2600, "test_user")
# Collect eggs: 27 first, then 6 (simulating edit from 8 to 6)
collect1_payload = ProductCollectedPayload(
location_id=strip1_id,
product_code="egg.duck",
quantity=27,
resolved_ids=animal_ids,
)
services["product_service"].collect_product(collect1_payload, now_utc + 3000, "test_user")
collect2_payload = ProductCollectedPayload(
location_id=strip1_id,
product_code="egg.duck",
quantity=6,
resolved_ids=animal_ids,
)
services["product_service"].collect_product(collect2_payload, now_utc + 3500, "test_user")
return {
"strip1_id": strip1_id,
"nursery4_id": nursery4_id,
"animal_ids": animal_ids,
"give4kg_event_id": give4kg_event.id,
"ts_utc": now_utc + 4000, # After all baseline events
}
class TestE2EDeletion:
"""E2E Test #6: Deletes - recorder vs admin cascade.
From spec section 21.6:
1. Recorder deletes 4 kg FeedGiven from @Strip 1
Expect: FeedInventory: given_kg=19; balance_kg=21
2. Create cohort 1 juvenile @Nursery 4; move to Strip 1
Recorder tries delete cohort → 409 (dependents)
Admin deletes cohort with cascade → both tombstoned; animal removed
"""
def test_recorder_deletes_feed_given(self, seeded_db, services, baseline_setup, now_utc):
"""Part 1: Recorder successfully deletes FeedGiven event with no dependents.
After deleting the 4kg FeedGiven:
- given_kg decreases from 27 to 23
- balance_kg increases from 13 to 17
"""
from animaltrack.events.delete import delete_event
# Verify initial state: 27kg given (20 + 3 + 4), 13 balance
row = seeded_db.execute(
"SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory "
"WHERE feed_type_code = 'layer'"
).fetchone()
assert row[0] == 40 # purchased_kg
assert row[1] == 27 # given_kg (20 + 3 + 4)
assert row[2] == 13 # balance_kg (40 - 27)
# Recorder deletes the 4kg FeedGiven event
deleted_ids = delete_event(
db=seeded_db,
event_store=services["event_store"],
event_id=baseline_setup["give4kg_event_id"],
actor="test_user",
role="recorder",
registry=services["registry"],
)
assert len(deleted_ids) == 1
assert deleted_ids[0] == baseline_setup["give4kg_event_id"]
# Verify updated state: 23kg given (20 + 3), 17 balance
row = seeded_db.execute(
"SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory "
"WHERE feed_type_code = 'layer'"
).fetchone()
assert row[0] == 40 # purchased_kg unchanged
assert row[1] == 23 # given_kg reduced by 4
assert row[2] == 17 # balance_kg increased by 4
def test_recorder_blocked_from_deleting_cohort_with_dependents(
self, seeded_db, services, baseline_setup, now_utc
):
"""Part 2a: Recorder cannot delete cohort that has dependent move event."""
from animaltrack.events.delete import delete_event
from animaltrack.events.exceptions import DependentEventsError
from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalMovedPayload
# Create 1 juvenile at Nursery 4
cohort_payload = AnimalCohortCreatedPayload(
species="duck",
count=1,
life_stage="juvenile",
sex="male",
location_id=baseline_setup["nursery4_id"],
origin="hatched",
)
cohort_event = services["animal_service"].create_cohort(
cohort_payload, baseline_setup["ts_utc"] + 1000, "test_user"
)
new_animal_ids = cohort_event.entity_refs["animal_ids"]
# Move the juvenile from Nursery 4 to Strip 1
move_payload = AnimalMovedPayload(
to_location_id=baseline_setup["strip1_id"],
resolved_ids=new_animal_ids,
)
move_event = services["animal_service"].move_animals(
move_payload, baseline_setup["ts_utc"] + 2000, "test_user"
)
# Verify animal exists and is at Strip 1
animal_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_registry WHERE animal_id = ? AND location_id = ?",
(new_animal_ids[0], baseline_setup["strip1_id"]),
).fetchone()[0]
assert animal_count == 1
# Recorder tries to delete the cohort - should be blocked
with pytest.raises(DependentEventsError) as exc_info:
delete_event(
db=seeded_db,
event_store=services["event_store"],
event_id=cohort_event.id,
actor="test_user",
role="recorder",
registry=services["registry"],
)
# Verify the error contains the move event as a dependent
assert len(exc_info.value.dependent_events) == 1
assert exc_info.value.dependent_events[0].id == move_event.id
# Verify cohort is NOT tombstoned
assert not services["event_store"].is_tombstoned(cohort_event.id)
def test_admin_cascade_deletes_cohort_and_move(
self, seeded_db, services, baseline_setup, now_utc
):
"""Part 2b: Admin can cascade delete cohort + dependent move."""
from animaltrack.events.delete import delete_event
from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalMovedPayload
# Create 1 juvenile at Nursery 4
cohort_payload = AnimalCohortCreatedPayload(
species="duck",
count=1,
life_stage="juvenile",
sex="male",
location_id=baseline_setup["nursery4_id"],
origin="hatched",
)
cohort_event = services["animal_service"].create_cohort(
cohort_payload, baseline_setup["ts_utc"] + 1000, "test_user"
)
new_animal_ids = cohort_event.entity_refs["animal_ids"]
new_animal_id = new_animal_ids[0]
# Move the juvenile from Nursery 4 to Strip 1
move_payload = AnimalMovedPayload(
to_location_id=baseline_setup["strip1_id"],
resolved_ids=new_animal_ids,
)
move_event = services["animal_service"].move_animals(
move_payload, baseline_setup["ts_utc"] + 2000, "test_user"
)
# Verify animal exists before deletion
animal_in_registry = seeded_db.execute(
"SELECT COUNT(*) FROM animal_registry WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert animal_in_registry == 1
animal_in_roster = seeded_db.execute(
"SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert animal_in_roster == 1
location_intervals = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert location_intervals == 2 # Initial at Nursery 4, then moved to Strip 1
attr_intervals = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert attr_intervals == 4 # sex, life_stage, repro_status, status
# Admin cascade deletes the cohort
deleted_ids = delete_event(
db=seeded_db,
event_store=services["event_store"],
event_id=cohort_event.id,
actor="admin",
role="admin",
cascade=True,
registry=services["registry"],
)
# Both events should be tombstoned
assert len(deleted_ids) == 2
assert cohort_event.id in deleted_ids
assert move_event.id in deleted_ids
assert services["event_store"].is_tombstoned(cohort_event.id)
assert services["event_store"].is_tombstoned(move_event.id)
# Animal should be removed from registry
animal_in_registry = seeded_db.execute(
"SELECT COUNT(*) FROM animal_registry WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert animal_in_registry == 0
# Animal should be removed from roster
animal_in_roster = seeded_db.execute(
"SELECT COUNT(*) FROM live_animals_by_location WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert animal_in_roster == 0
# All intervals should be removed
location_intervals = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert location_intervals == 0
attr_intervals = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(new_animal_id,),
).fetchone()[0]
assert attr_intervals == 0
# Original baseline animals should still exist
original_animal_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_registry WHERE animal_id IN "
f"({','.join('?' for _ in baseline_setup['animal_ids'])})",
baseline_setup["animal_ids"],
).fetchone()[0]
assert original_animal_count == 10

View File

@@ -0,0 +1,825 @@
# ABOUTME: Tests for event deletion functionality.
# ABOUTME: Validates tombstone creation, projection reversal, and cascade rules.
import time
import pytest
from animaltrack.events.store import EventStore
from animaltrack.events.types import (
ANIMAL_COHORT_CREATED,
ANIMAL_MOVED,
FEED_PURCHASED,
)
from animaltrack.id_gen import generate_id
@pytest.fixture
def now_utc():
"""Current time in milliseconds since epoch."""
return int(time.time() * 1000)
@pytest.fixture
def event_store(migrated_db):
"""Create an EventStore instance with a migrated database."""
return EventStore(migrated_db)
@pytest.fixture
def seeded_event_store(seeded_db):
"""Create an EventStore with seeded database."""
return EventStore(seeded_db)
class TestFindDependentEvents:
"""Tests for finding events that depend on a target event."""
@pytest.fixture
def full_registry(self, seeded_db):
"""Create a ProjectionRegistry with all projections."""
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
return registry
def test_find_no_dependents_returns_empty(self, seeded_db, seeded_event_store, now_utc):
"""Event with no dependents returns empty list."""
from animaltrack.events.dependencies import find_dependent_events
# Create a standalone event (FeedPurchased has no animal links)
event = seeded_event_store.append_event(
event_type=FEED_PURCHASED,
ts_utc=now_utc,
actor="test_user",
entity_refs={
"feed_type_code": "layer",
"total_kg": 20,
"price_per_kg_cents": 120,
},
payload={
"feed_type_code": "layer",
"bag_size_kg": 20,
"bags_count": 1,
"bag_price_cents": 2400,
},
)
dependents = find_dependent_events(seeded_db, seeded_event_store, event.id)
assert dependents == []
def test_find_dependents_for_cohort_with_move(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""AnimalMoved event depends on the AnimalCohortCreated event."""
from animaltrack.events.dependencies import find_dependent_events
from animaltrack.events.processor import process_event
# Get two locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall()
loc1, loc2 = locations[0][0], locations[1][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Move the animal
move_event = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event, full_registry)
# Find dependents of the cohort event
dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id)
assert len(dependents) == 1
assert dependents[0].id == move_event.id
def test_find_dependents_excludes_tombstoned_events(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""Tombstoned events should not be returned as dependents."""
from animaltrack.events.dependencies import find_dependent_events
from animaltrack.events.processor import process_event
# Get two locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall()
loc1, loc2 = locations[0][0], locations[1][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Move the animal
move_event = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event, full_registry)
# Tombstone the move event
tombstone_id = generate_id()
seeded_db.execute(
"""INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason)
VALUES (?, ?, ?, ?, ?)""",
(tombstone_id, now_utc + 2000, "admin", move_event.id, "Test deletion"),
)
# Find dependents of the cohort event - should be empty now
dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id)
assert dependents == []
def test_find_dependents_does_not_include_self(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""The target event itself is not included in dependents."""
from animaltrack.events.dependencies import find_dependent_events
from animaltrack.events.processor import process_event
# Get a location
loc1 = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Find dependents of the cohort event (no moves, so only the cohort itself)
dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id)
# The cohort event should NOT be in the list
assert cohort_event.id not in [d.id for d in dependents]
def test_find_dependents_multiple_dependents(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""Multiple dependent events are all returned."""
from animaltrack.events.dependencies import find_dependent_events
from animaltrack.events.processor import process_event
# Get three locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 3").fetchall()
loc1, loc2, loc3 = locations[0][0], locations[1][0], locations[2][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# First move: loc1 -> loc2
move_event1 = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event1, full_registry)
# Second move: loc2 -> loc3
move_event2 = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 2000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc2,
"to_location_id": loc3,
},
payload={},
)
process_event(move_event2, full_registry)
# Find dependents of the cohort event
dependents = find_dependent_events(seeded_db, seeded_event_store, cohort_event.id)
assert len(dependents) == 2
dependent_ids = {d.id for d in dependents}
assert move_event1.id in dependent_ids
assert move_event2.id in dependent_ids
class TestDeleteEventTombstone:
"""Tests for tombstone creation when deleting events."""
@pytest.fixture
def feed_registry(self, seeded_db):
"""Create a ProjectionRegistry with feed projections."""
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.feed import FeedInventoryProjection
registry = ProjectionRegistry()
registry.register(FeedInventoryProjection(seeded_db))
return registry
def test_delete_event_creates_tombstone(
self, seeded_db, seeded_event_store, feed_registry, now_utc
):
"""Deleting an event creates a tombstone record."""
from animaltrack.events.delete import delete_event
# Create a feed purchase event
event = seeded_event_store.append_event(
event_type=FEED_PURCHASED,
ts_utc=now_utc,
actor="test_user",
entity_refs={
"feed_type_code": "layer",
"total_kg": 20,
"price_per_kg_cents": 120,
},
payload={
"feed_type_code": "layer",
"bag_size_kg": 20,
"bags_count": 1,
"bag_price_cents": 2400,
},
)
# Delete the event
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=event.id,
actor="admin",
role="admin",
registry=feed_registry,
)
# Verify tombstone exists
row = seeded_db.execute(
"SELECT actor, target_event_id FROM event_tombstones WHERE target_event_id = ?",
(event.id,),
).fetchone()
assert row is not None
assert row[0] == "admin"
assert row[1] == event.id
def test_delete_event_makes_event_tombstoned(
self, seeded_db, seeded_event_store, feed_registry, now_utc
):
"""After deletion, is_tombstoned() returns True."""
from animaltrack.events.delete import delete_event
event = seeded_event_store.append_event(
event_type=FEED_PURCHASED,
ts_utc=now_utc,
actor="test_user",
entity_refs={
"feed_type_code": "layer",
"total_kg": 20,
"price_per_kg_cents": 120,
},
payload={
"feed_type_code": "layer",
"bag_size_kg": 20,
"bags_count": 1,
"bag_price_cents": 2400,
},
)
assert not seeded_event_store.is_tombstoned(event.id)
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=event.id,
actor="admin",
role="admin",
registry=feed_registry,
)
assert seeded_event_store.is_tombstoned(event.id)
def test_delete_already_tombstoned_raises(
self, seeded_db, seeded_event_store, feed_registry, now_utc
):
"""Deleting an already tombstoned event raises EventTombstonedError."""
from animaltrack.events.delete import delete_event
from animaltrack.events.exceptions import EventTombstonedError
event = seeded_event_store.append_event(
event_type=FEED_PURCHASED,
ts_utc=now_utc,
actor="test_user",
entity_refs={
"feed_type_code": "layer",
"total_kg": 20,
"price_per_kg_cents": 120,
},
payload={
"feed_type_code": "layer",
"bag_size_kg": 20,
"bags_count": 1,
"bag_price_cents": 2400,
},
)
# First deletion succeeds
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=event.id,
actor="admin",
role="admin",
registry=feed_registry,
)
# Second deletion raises
with pytest.raises(EventTombstonedError):
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=event.id,
actor="admin",
role="admin",
registry=feed_registry,
)
def test_delete_nonexistent_event_raises(
self, seeded_db, seeded_event_store, feed_registry, now_utc
):
"""Deleting a non-existent event raises EventNotFoundError."""
from animaltrack.events.delete import delete_event
from animaltrack.events.exceptions import EventNotFoundError
with pytest.raises(EventNotFoundError):
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
actor="admin",
role="admin",
registry=feed_registry,
)
class TestDeleteEventProjectionRevert:
"""Tests for projection reversal when deleting events."""
@pytest.fixture
def feed_registry(self, seeded_db):
"""Create a ProjectionRegistry with feed projections."""
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.feed import FeedInventoryProjection
registry = ProjectionRegistry()
registry.register(FeedInventoryProjection(seeded_db))
return registry
@pytest.fixture
def feed_service(self, seeded_db, feed_registry):
"""Create a FeedService for testing."""
from animaltrack.services.feed import FeedService
event_store = EventStore(seeded_db)
return FeedService(seeded_db, event_store, feed_registry)
def test_delete_feed_given_reverts_inventory(
self, seeded_db, seeded_event_store, feed_registry, feed_service, now_utc
):
"""Deleting a FeedGiven event reverts the inventory changes."""
from animaltrack.events.delete import delete_event
from animaltrack.events.payloads import FeedGivenPayload, FeedPurchasedPayload
# Get a valid location_id
location_id = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0]
# Purchase 40kg of feed
purchase_payload = FeedPurchasedPayload(
feed_type_code="layer",
bag_size_kg=20,
bags_count=2,
bag_price_cents=2400,
)
feed_service.purchase_feed(purchase_payload, now_utc, "test_user")
# Give 6kg of feed
give_payload = FeedGivenPayload(
location_id=location_id,
feed_type_code="layer",
amount_kg=6,
)
give_event = feed_service.give_feed(give_payload, now_utc + 1000, "test_user")
# Verify initial state: 40 purchased, 6 given, 34 balance
row = seeded_db.execute(
"SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory "
"WHERE feed_type_code = 'layer'"
).fetchone()
assert row[0] == 40 # purchased_kg
assert row[1] == 6 # given_kg
assert row[2] == 34 # balance_kg
# Delete the give event
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=give_event.id,
actor="test_user",
role="recorder",
registry=feed_registry,
)
# Verify reverted state: 40 purchased, 0 given, 40 balance
row = seeded_db.execute(
"SELECT purchased_kg, given_kg, balance_kg FROM feed_inventory "
"WHERE feed_type_code = 'layer'"
).fetchone()
assert row[0] == 40 # purchased_kg unchanged
assert row[1] == 0 # given_kg reverted
assert row[2] == 40 # balance_kg restored
def test_delete_feed_purchased_reverts_inventory(
self, seeded_db, seeded_event_store, feed_registry, feed_service, now_utc
):
"""Deleting a FeedPurchased event reverts the inventory changes."""
from animaltrack.events.delete import delete_event
from animaltrack.events.payloads import FeedPurchasedPayload
# Purchase 40kg of feed
purchase_payload = FeedPurchasedPayload(
feed_type_code="layer",
bag_size_kg=20,
bags_count=2,
bag_price_cents=2400,
)
purchase_event = feed_service.purchase_feed(purchase_payload, now_utc, "test_user")
# Verify initial state
row = seeded_db.execute(
"SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'"
).fetchone()
assert row[0] == 40 # purchased_kg
assert row[1] == 40 # balance_kg
# Delete the purchase event
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=purchase_event.id,
actor="admin",
role="admin",
registry=feed_registry,
)
# Verify reverted state: 0 purchased, 0 balance
row = seeded_db.execute(
"SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = 'layer'"
).fetchone()
assert row[0] == 0 # purchased_kg reverted
assert row[1] == 0 # balance_kg reverted
class TestDeleteEventRoleRules:
"""Tests for recorder vs admin deletion rules."""
@pytest.fixture
def full_registry(self, seeded_db):
"""Create a ProjectionRegistry with all projections."""
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
return registry
def test_recorder_blocked_with_dependents(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""Recorder cannot delete event that has dependents."""
from animaltrack.events.delete import delete_event
from animaltrack.events.exceptions import DependentEventsError
from animaltrack.events.processor import process_event
# Get two locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall()
loc1, loc2 = locations[0][0], locations[1][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Move the animal
move_event = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event, full_registry)
# Recorder tries to delete cohort - should fail
with pytest.raises(DependentEventsError) as exc_info:
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=cohort_event.id,
actor="test_user",
role="recorder",
registry=full_registry,
)
# Verify the error contains the dependent event
assert len(exc_info.value.dependent_events) == 1
assert exc_info.value.dependent_events[0].id == move_event.id
def test_admin_blocked_without_cascade(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""Admin without cascade flag cannot delete event with dependents."""
from animaltrack.events.delete import delete_event
from animaltrack.events.exceptions import DependentEventsError
from animaltrack.events.processor import process_event
# Get two locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall()
loc1, loc2 = locations[0][0], locations[1][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Move the animal
move_event = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event, full_registry)
# Admin without cascade tries to delete cohort - should fail
with pytest.raises(DependentEventsError):
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=cohort_event.id,
actor="admin",
role="admin",
cascade=False,
registry=full_registry,
)
def test_admin_cascade_deletes_dependents(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""Admin with cascade flag deletes target and all dependents."""
from animaltrack.events.delete import delete_event
from animaltrack.events.processor import process_event
# Get two locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall()
loc1, loc2 = locations[0][0], locations[1][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Move the animal
move_event = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event, full_registry)
# Admin with cascade deletes cohort
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=cohort_event.id,
actor="admin",
role="admin",
cascade=True,
registry=full_registry,
)
# Both events should be tombstoned
assert seeded_event_store.is_tombstoned(cohort_event.id)
assert seeded_event_store.is_tombstoned(move_event.id)
def test_cascade_reverts_all_projections(
self, seeded_db, seeded_event_store, full_registry, now_utc
):
"""Cascade deletion reverts projections for all deleted events."""
from animaltrack.events.delete import delete_event
from animaltrack.events.processor import process_event
# Get two locations
locations = seeded_db.execute("SELECT id FROM locations LIMIT 2").fetchall()
loc1, loc2 = locations[0][0], locations[1][0]
# Create cohort
animal_id = generate_id()
cohort_event = seeded_event_store.append_event(
event_type=ANIMAL_COHORT_CREATED,
ts_utc=now_utc,
actor="test_user",
entity_refs={"animal_ids": [animal_id]},
payload={
"location_id": loc1,
"species": "duck",
"count": 1,
"sex": "female",
"life_stage": "adult",
"origin": "hatched",
},
)
process_event(cohort_event, full_registry)
# Move the animal
move_event = seeded_event_store.append_event(
event_type=ANIMAL_MOVED,
ts_utc=now_utc + 1000,
actor="test_user",
entity_refs={
"animal_ids": [animal_id],
"from_location_id": loc1,
"to_location_id": loc2,
},
payload={},
)
process_event(move_event, full_registry)
# Verify animal exists before deletion
animal_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert animal_count == 1
# Admin cascade deletes cohort
delete_event(
db=seeded_db,
event_store=seeded_event_store,
event_id=cohort_event.id,
actor="admin",
role="admin",
cascade=True,
registry=full_registry,
)
# Animal should be removed from registry
animal_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert animal_count == 0
# Location intervals should be removed
interval_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_location_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert interval_count == 0
# Attribute intervals should be removed
attr_count = seeded_db.execute(
"SELECT COUNT(*) FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
).fetchone()[0]
assert attr_count == 0