From 85a4c6bc7b28d0b6238f90be5c0b84f621c97911 Mon Sep 17 00:00:00 2001 From: Petru Paler Date: Fri, 2 Jan 2026 10:35:39 +0000 Subject: [PATCH] feat: add rebuild-projections CLI and fix event delete projection revert MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Add 'rebuild-projections' CLI command that truncates all projection tables and replays non-tombstoned events to rebuild state - Fix event delete route to register all projections before calling delete_event, ensuring projections are properly reverted - Add comprehensive tests for both rebuild CLI and delete with projections The rebuild-projections command is useful for recovering from corrupted projection state, while the delete fix ensures future deletes properly revert animal status (e.g., sold -> alive). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- src/animaltrack/cli.py | 93 ++++++++- src/animaltrack/web/routes/events.py | 14 +- tests/test_cli_rebuild.py | 251 ++++++++++++++++++++++++ tests/test_web_events_delete.py | 279 +++++++++++++++++++++++++++ 4 files changed, 635 insertions(+), 2 deletions(-) create mode 100644 tests/test_cli_rebuild.py create mode 100644 tests/test_web_events_delete.py diff --git a/src/animaltrack/cli.py b/src/animaltrack/cli.py index 4757acf..6c9cbc7 100644 --- a/src/animaltrack/cli.py +++ b/src/animaltrack/cli.py @@ -1,5 +1,5 @@ # ABOUTME: Command-line interface for AnimalTrack. -# ABOUTME: Provides migrate, seed, and serve commands. +# ABOUTME: Provides migrate, seed, serve, and rebuild-projections commands. import argparse import sys @@ -29,6 +29,12 @@ def main(): serve_parser.add_argument("--port", type=int, default=3366, help="Port to listen on") serve_parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to") + # rebuild-projections command + subparsers.add_parser( + "rebuild-projections", + help="Rebuild all projections by replaying the event log", + ) + args = parser.parse_args() if args.command is None: @@ -113,6 +119,91 @@ def main(): # Start server print(f"Starting server on {args.host}:{args.port}...") uvicorn.run(app, host=args.host, port=args.port) + elif args.command == "rebuild-projections": + import json + + from animaltrack.config import Settings + from animaltrack.db import get_db + from animaltrack.events.processor import process_event + from animaltrack.migrations import run_migrations + from animaltrack.models.events import Event + from animaltrack.projections import ProjectionRegistry + from animaltrack.projections.animal_registry import AnimalRegistryProjection + from animaltrack.projections.event_animals import EventAnimalsProjection + from animaltrack.projections.event_log import EventLogProjection + from animaltrack.projections.feed import FeedInventoryProjection + from animaltrack.projections.intervals import IntervalProjection + from animaltrack.projections.products import ProductsProjection + + settings = Settings() + + # Run migrations first + print("Running migrations...") + success = run_migrations( + db_path=settings.db_path, + migrations_dir="migrations", + verbose=False, + ) + if not success: + print("Migration failed", file=sys.stderr) + sys.exit(1) + + db = get_db(settings.db_path) + + # Projection tables to truncate + projection_tables = [ + "animal_registry", + "live_animals_by_location", + "animal_location_intervals", + "animal_attr_intervals", + "event_animals", + "event_log_by_location", + "feed_inventory", + ] + + print("Truncating projection tables...") + for table in projection_tables: + db.execute(f"DELETE FROM {table}") + print(f" Truncated {len(projection_tables)} tables") + + # Register all projections + registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) + + # Get all non-tombstoned events in order + print("Fetching events...") + rows = list( + db.execute( + """SELECT id, type, ts_utc, actor, version, payload, entity_refs + FROM events + WHERE id NOT IN (SELECT target_event_id FROM event_tombstones) + ORDER BY ts_utc, id""" + ).fetchall() + ) + print(f" Found {len(rows)} events to replay") + + # Replay events through projections + print("Replaying events...") + for i, row in enumerate(rows): + event = Event( + id=row[0], + type=row[1], + ts_utc=row[2], + actor=row[3], + version=row[4], + payload=json.loads(row[5]), + entity_refs=json.loads(row[6]), + ) + process_event(event, registry) + if (i + 1) % 100 == 0: + print(f" Processed {i + 1}/{len(rows)} events...") + + print(f"Rebuild complete: processed {len(rows)} events") if __name__ == "__main__": diff --git a/src/animaltrack/web/routes/events.py b/src/animaltrack/web/routes/events.py index 5c0b4d8..aa2352b 100644 --- a/src/animaltrack/web/routes/events.py +++ b/src/animaltrack/web/routes/events.py @@ -20,6 +20,12 @@ from animaltrack.events.exceptions import ( from animaltrack.events.store import EventStore from animaltrack.models.reference import UserRole from animaltrack.projections import ProjectionRegistry +from animaltrack.projections.animal_registry import AnimalRegistryProjection +from animaltrack.projections.event_animals import EventAnimalsProjection +from animaltrack.projections.event_log import EventLogProjection +from animaltrack.projections.feed import FeedInventoryProjection +from animaltrack.projections.intervals import IntervalProjection +from animaltrack.projections.products import ProductsProjection from animaltrack.repositories.locations import LocationRepository from animaltrack.repositories.user_defaults import UserDefaultsRepository from animaltrack.web.templates import render_page @@ -344,9 +350,15 @@ async def event_delete(request: Request, event_id: str): reason = form.get("reason", "") cascade = form.get("cascade", "false") == "true" - # Get event store and registry + # Get event store and registry with all projections event_store = EventStore(db) registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) try: # Check for dependent events first diff --git a/tests/test_cli_rebuild.py b/tests/test_cli_rebuild.py new file mode 100644 index 0000000..5562c98 --- /dev/null +++ b/tests/test_cli_rebuild.py @@ -0,0 +1,251 @@ +# ABOUTME: Tests for rebuild-projections CLI command. +# ABOUTME: Verifies projection tables are truncated and events are replayed correctly. + +import os +import subprocess +import sys +from pathlib import Path + +from animaltrack.db import get_db +from animaltrack.events.enums import LifeStage, Origin +from animaltrack.events.payloads import AnimalCohortCreatedPayload +from animaltrack.events.store import EventStore +from animaltrack.migrations import run_migrations +from animaltrack.projections import ProjectionRegistry +from animaltrack.projections.animal_registry import AnimalRegistryProjection +from animaltrack.projections.event_animals import EventAnimalsProjection +from animaltrack.projections.event_log import EventLogProjection +from animaltrack.projections.feed import FeedInventoryProjection +from animaltrack.projections.intervals import IntervalProjection +from animaltrack.projections.products import ProductsProjection +from animaltrack.seeds import run_seeds +from animaltrack.services.animal import AnimalService + +PROJECT_ROOT = Path(__file__).parent.parent + + +class TestRebuildProjectionsCLI: + """Tests for rebuild-projections command.""" + + def test_rebuild_command_success(self, tmp_path): + """Should rebuild projections via CLI and exit 0.""" + db_path = tmp_path / "test.db" + + env = os.environ.copy() + env["DB_PATH"] = str(db_path) + env["CSRF_SECRET"] = "test-secret-for-csrf" + env["PYTHONPATH"] = str(PROJECT_ROOT / "src") + + # First seed the database + result = subprocess.run( + [sys.executable, "-m", "animaltrack.cli", "seed"], + capture_output=True, + text=True, + env=env, + cwd=str(PROJECT_ROOT), + ) + assert result.returncode == 0, f"Seed failed: {result.stderr}" + + # Then rebuild projections + result = subprocess.run( + [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], + capture_output=True, + text=True, + env=env, + cwd=str(PROJECT_ROOT), + ) + + assert result.returncode == 0, f"Rebuild failed: {result.stderr}" + assert "Truncating projection tables" in result.stdout + assert "Rebuild complete" in result.stdout + + def test_rebuild_with_events(self, tmp_path): + """Should correctly replay events and update projections.""" + db_path = tmp_path / "test.db" + + # Set up database with migrations and seeds + run_migrations(str(db_path), "migrations", verbose=False) + db = get_db(str(db_path)) + run_seeds(db) + + # Create some events via AnimalService + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) + + animal_service = AnimalService(db, event_store, registry) + + # Create a cohort + import time + + ts_utc = int(time.time() * 1000) + location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] + + payload = AnimalCohortCreatedPayload( + species="duck", + count=5, + origin=Origin.PURCHASED, + life_stage=LifeStage.ADULT, + location_id=location, + ) + animal_service.create_cohort( + payload=payload, + ts_utc=ts_utc, + actor="test", + ) + + # Verify animal_registry has entries + count_before = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] + assert count_before == 5 + + # Clear projections manually (simulating corruption) + db.execute("DELETE FROM animal_registry") + db.execute("DELETE FROM live_animals_by_location") + count_cleared = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] + assert count_cleared == 0 + + # Now run rebuild via CLI + env = os.environ.copy() + env["DB_PATH"] = str(db_path) + env["CSRF_SECRET"] = "test-secret-for-csrf" + env["PYTHONPATH"] = str(PROJECT_ROOT / "src") + + result = subprocess.run( + [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], + capture_output=True, + text=True, + env=env, + cwd=str(PROJECT_ROOT), + ) + + assert result.returncode == 0, f"Rebuild failed: {result.stderr}" + # Verify events were processed (seed data may add additional events) + assert "events to replay" in result.stdout + assert "Rebuild complete" in result.stdout + + # Verify projections are restored + db2 = get_db(str(db_path)) + count_after = db2.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] + # Should have at least our 5 animals restored + assert count_after >= 5 + + def test_rebuild_skips_tombstoned_events(self, tmp_path): + """Should not replay events that have been tombstoned.""" + db_path = tmp_path / "test.db" + + # Set up database + run_migrations(str(db_path), "migrations", verbose=False) + db = get_db(str(db_path)) + run_seeds(db) + + # Count animals from seed data + seed_animal_count = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] + + # Create events via AnimalService + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) + + animal_service = AnimalService(db, event_store, registry) + + import time + + ts_utc = int(time.time() * 1000) + location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] + + # Create two cohorts + payload1 = AnimalCohortCreatedPayload( + species="duck", + count=3, + origin=Origin.PURCHASED, + life_stage=LifeStage.ADULT, + location_id=location, + ) + animal_service.create_cohort( + payload=payload1, + ts_utc=ts_utc, + actor="test", + ) + payload2 = AnimalCohortCreatedPayload( + species="duck", + count=2, + origin=Origin.PURCHASED, + life_stage=LifeStage.ADULT, + location_id=location, + ) + event2 = animal_service.create_cohort( + payload=payload2, + ts_utc=ts_utc + 1000, + actor="test", + ) + + # Verify we have seed animals + 5 new animals + count_before = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] + assert count_before == seed_animal_count + 5 + + # Tombstone the second event (manually, to simulate what delete_event does) + from ulid import ULID + + tombstone_id = str(ULID()) + db.execute( + """INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason) + VALUES (?, ?, ?, ?, ?)""", + (tombstone_id, ts_utc + 2000, "test", event2.id, "test deletion"), + ) + + # Run rebuild via CLI + env = os.environ.copy() + env["DB_PATH"] = str(db_path) + env["CSRF_SECRET"] = "test-secret-for-csrf" + env["PYTHONPATH"] = str(PROJECT_ROOT / "src") + + result = subprocess.run( + [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], + capture_output=True, + text=True, + env=env, + cwd=str(PROJECT_ROOT), + ) + + assert result.returncode == 0, f"Rebuild failed: {result.stderr}" + # Verify rebuild completed + assert "events to replay" in result.stdout + assert "Rebuild complete" in result.stdout + + # Verify only seed animals + 3 from first cohort (second cohort tombstoned) + db2 = get_db(str(db_path)) + count_after = db2.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] + # Should have 2 fewer animals (the tombstoned cohort had count=2) + assert count_after == seed_animal_count + 3 + + def test_rebuild_empty_event_log(self, tmp_path): + """Should handle empty event log gracefully.""" + db_path = tmp_path / "test.db" + + env = os.environ.copy() + env["DB_PATH"] = str(db_path) + env["CSRF_SECRET"] = "test-secret-for-csrf" + env["PYTHONPATH"] = str(PROJECT_ROOT / "src") + + # Just run migrations (no seeds, no events) + result = subprocess.run( + [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], + capture_output=True, + text=True, + env=env, + cwd=str(PROJECT_ROOT), + ) + + assert result.returncode == 0, f"Rebuild failed: {result.stderr}" + assert "Found 0 events to replay" in result.stdout + assert "Rebuild complete: processed 0 events" in result.stdout diff --git a/tests/test_web_events_delete.py b/tests/test_web_events_delete.py new file mode 100644 index 0000000..79c59cf --- /dev/null +++ b/tests/test_web_events_delete.py @@ -0,0 +1,279 @@ +# ABOUTME: Tests for event delete with projection verification. +# ABOUTME: Verifies that deleting events properly reverts projections. + +import time + +from animaltrack.db import get_db +from animaltrack.events.delete import delete_event +from animaltrack.events.enums import LifeStage, Origin, Outcome +from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalOutcomePayload +from animaltrack.events.store import EventStore +from animaltrack.migrations import run_migrations +from animaltrack.projections import ProjectionRegistry +from animaltrack.projections.animal_registry import AnimalRegistryProjection +from animaltrack.projections.event_animals import EventAnimalsProjection +from animaltrack.projections.event_log import EventLogProjection +from animaltrack.projections.feed import FeedInventoryProjection +from animaltrack.projections.intervals import IntervalProjection +from animaltrack.projections.products import ProductsProjection +from animaltrack.seeds import run_seeds +from animaltrack.services.animal import AnimalService + + +class TestEventDeleteProjections: + """Tests for delete_event with projection updates.""" + + def test_delete_animal_outcome_reverts_status(self, tmp_path): + """Deleting AnimalOutcome should revert animals to alive status.""" + db_path = tmp_path / "test.db" + + # Set up database + run_migrations(str(db_path), "migrations", verbose=False) + db = get_db(str(db_path)) + run_seeds(db) + + # Create projections and services + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) + + animal_service = AnimalService(db, event_store, registry) + + ts_utc = int(time.time() * 1000) + location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] + + # Create a cohort + cohort_payload = AnimalCohortCreatedPayload( + species="duck", + count=3, + origin=Origin.PURCHASED, + life_stage=LifeStage.ADULT, + location_id=location, + ) + cohort_event = animal_service.create_cohort( + payload=cohort_payload, + ts_utc=ts_utc, + actor="test", + ) + + # Get animal IDs + animal_ids = cohort_event.entity_refs["animal_ids"] + + # Verify all animals are alive + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + assert row[0] == "alive" + + # Record outcome (sold) + outcome_payload = AnimalOutcomePayload( + outcome=Outcome.SOLD, + resolved_ids=animal_ids, + ) + outcome_event = animal_service.record_outcome( + payload=outcome_payload, + ts_utc=ts_utc + 1000, + actor="test", + ) + + # Verify animals are now "sold" + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + assert row[0] == "sold", f"Animal {aid} should be sold, got {row[0]}" + + # Delete the outcome event + deleted_ids = delete_event( + db=db, + event_store=event_store, + event_id=outcome_event.id, + actor="test", + role="admin", + cascade=False, + reason="test deletion", + registry=registry, + ) + + assert len(deleted_ids) == 1 + assert outcome_event.id in deleted_ids + + # Verify animals are back to "alive" + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + assert row[0] == "alive", f"Animal {aid} should be alive after delete, got {row[0]}" + + def test_delete_without_registry_does_not_revert(self, tmp_path): + """Without registry projections, delete won't revert status (bug demo).""" + db_path = tmp_path / "test.db" + + # Set up database + run_migrations(str(db_path), "migrations", verbose=False) + db = get_db(str(db_path)) + run_seeds(db) + + # Create projections and services + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) + + animal_service = AnimalService(db, event_store, registry) + + ts_utc = int(time.time() * 1000) + location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] + + # Create a cohort + cohort_payload = AnimalCohortCreatedPayload( + species="duck", + count=2, + origin=Origin.PURCHASED, + life_stage=LifeStage.ADULT, + location_id=location, + ) + cohort_event = animal_service.create_cohort( + payload=cohort_payload, + ts_utc=ts_utc, + actor="test", + ) + + animal_ids = cohort_event.entity_refs["animal_ids"] + + # Record outcome (sold) + outcome_payload = AnimalOutcomePayload( + outcome=Outcome.SOLD, + resolved_ids=animal_ids, + ) + outcome_event = animal_service.record_outcome( + payload=outcome_payload, + ts_utc=ts_utc + 1000, + actor="test", + ) + + # Verify animals are "sold" + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + assert row[0] == "sold" + + # Delete with EMPTY registry (simulating the bug) + empty_registry = ProjectionRegistry() # No projections registered! + deleted_ids = delete_event( + db=db, + event_store=event_store, + event_id=outcome_event.id, + actor="test", + role="admin", + cascade=False, + reason="test deletion", + registry=empty_registry, + ) + + assert len(deleted_ids) == 1 + + # Bug: Animals are still "sold" because projections weren't reverted + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + # This demonstrates the bug - with empty registry, status is not reverted + assert row[0] == "sold", "Without projections, animal should stay sold" + + def test_delete_death_outcome_reverts_to_alive(self, tmp_path): + """Deleting death outcome should revert animals to alive status.""" + db_path = tmp_path / "test.db" + + # Set up database + run_migrations(str(db_path), "migrations", verbose=False) + db = get_db(str(db_path)) + run_seeds(db) + + # Create projections and services + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(AnimalRegistryProjection(db)) + registry.register(IntervalProjection(db)) + registry.register(EventAnimalsProjection(db)) + registry.register(ProductsProjection(db)) + registry.register(FeedInventoryProjection(db)) + registry.register(EventLogProjection(db)) + + animal_service = AnimalService(db, event_store, registry) + + ts_utc = int(time.time() * 1000) + location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] + + # Create a cohort + cohort_payload = AnimalCohortCreatedPayload( + species="duck", + count=2, + origin=Origin.PURCHASED, + life_stage=LifeStage.ADULT, + location_id=location, + ) + cohort_event = animal_service.create_cohort( + payload=cohort_payload, + ts_utc=ts_utc, + actor="test", + ) + + animal_ids = cohort_event.entity_refs["animal_ids"] + + # Record death + outcome_payload = AnimalOutcomePayload( + outcome=Outcome.DEATH, + resolved_ids=animal_ids, + ) + outcome_event = animal_service.record_outcome( + payload=outcome_payload, + ts_utc=ts_utc + 1000, + actor="test", + ) + + # Verify animals are "dead" + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + assert row[0] == "dead" + + # Delete the outcome event with proper registry + deleted_ids = delete_event( + db=db, + event_store=event_store, + event_id=outcome_event.id, + actor="test", + role="admin", + cascade=False, + reason="test deletion", + registry=registry, + ) + + assert len(deleted_ids) == 1 + + # Verify animals are back to alive + for aid in animal_ids: + row = db.execute( + "SELECT status FROM animal_registry WHERE animal_id = ?", + (aid,), + ).fetchone() + assert row[0] == "alive", f"Animal {aid} should be alive, got {row[0]}"