# ABOUTME: Tests for rebuild-projections CLI command. # ABOUTME: Verifies projection tables are truncated and events are replayed correctly. import os import subprocess import sys from pathlib import Path from animaltrack.db import get_db from animaltrack.events.enums import LifeStage, Origin from animaltrack.events.payloads import AnimalCohortCreatedPayload from animaltrack.events.store import EventStore from animaltrack.migrations import run_migrations from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.event_log import EventLogProjection from animaltrack.projections.feed import FeedInventoryProjection from animaltrack.projections.intervals import IntervalProjection from animaltrack.projections.products import ProductsProjection from animaltrack.seeds import run_seeds from animaltrack.services.animal import AnimalService PROJECT_ROOT = Path(__file__).parent.parent class TestRebuildProjectionsCLI: """Tests for rebuild-projections command.""" def test_rebuild_command_success(self, tmp_path): """Should rebuild projections via CLI and exit 0.""" db_path = tmp_path / "test.db" env = os.environ.copy() env["DB_PATH"] = str(db_path) env["CSRF_SECRET"] = "test-secret-for-csrf" env["PYTHONPATH"] = str(PROJECT_ROOT / "src") # First seed the database result = subprocess.run( [sys.executable, "-m", "animaltrack.cli", "seed"], capture_output=True, text=True, env=env, cwd=str(PROJECT_ROOT), ) assert result.returncode == 0, f"Seed failed: {result.stderr}" # Then rebuild projections result = subprocess.run( [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], capture_output=True, text=True, env=env, cwd=str(PROJECT_ROOT), ) assert result.returncode == 0, f"Rebuild failed: {result.stderr}" assert "Truncating projection tables" in result.stdout assert "Rebuild complete" in result.stdout def test_rebuild_with_events(self, tmp_path): """Should correctly replay events and update projections.""" db_path = tmp_path / "test.db" # Set up database with migrations and seeds run_migrations(str(db_path), "migrations", verbose=False) db = get_db(str(db_path)) run_seeds(db) # Create some events via AnimalService event_store = EventStore(db) registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(db)) registry.register(IntervalProjection(db)) registry.register(EventAnimalsProjection(db)) registry.register(ProductsProjection(db)) registry.register(FeedInventoryProjection(db)) registry.register(EventLogProjection(db)) animal_service = AnimalService(db, event_store, registry) # Create a cohort import time ts_utc = int(time.time() * 1000) location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] payload = AnimalCohortCreatedPayload( species="duck", count=5, origin=Origin.PURCHASED, life_stage=LifeStage.ADULT, location_id=location, ) animal_service.create_cohort( payload=payload, ts_utc=ts_utc, actor="test", ) # Verify animal_registry has entries count_before = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] assert count_before == 5 # Clear projections manually (simulating corruption) db.execute("DELETE FROM animal_registry") db.execute("DELETE FROM live_animals_by_location") count_cleared = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] assert count_cleared == 0 # Now run rebuild via CLI env = os.environ.copy() env["DB_PATH"] = str(db_path) env["CSRF_SECRET"] = "test-secret-for-csrf" env["PYTHONPATH"] = str(PROJECT_ROOT / "src") result = subprocess.run( [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], capture_output=True, text=True, env=env, cwd=str(PROJECT_ROOT), ) assert result.returncode == 0, f"Rebuild failed: {result.stderr}" # Verify events were processed (seed data may add additional events) assert "events to replay" in result.stdout assert "Rebuild complete" in result.stdout # Verify projections are restored db2 = get_db(str(db_path)) count_after = db2.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] # Should have at least our 5 animals restored assert count_after >= 5 def test_rebuild_skips_tombstoned_events(self, tmp_path): """Should not replay events that have been tombstoned.""" db_path = tmp_path / "test.db" # Set up database run_migrations(str(db_path), "migrations", verbose=False) db = get_db(str(db_path)) run_seeds(db) # Count animals from seed data seed_animal_count = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] # Create events via AnimalService event_store = EventStore(db) registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(db)) registry.register(IntervalProjection(db)) registry.register(EventAnimalsProjection(db)) registry.register(ProductsProjection(db)) registry.register(FeedInventoryProjection(db)) registry.register(EventLogProjection(db)) animal_service = AnimalService(db, event_store, registry) import time ts_utc = int(time.time() * 1000) location = db.execute("SELECT id FROM locations LIMIT 1").fetchone()[0] # Create two cohorts payload1 = AnimalCohortCreatedPayload( species="duck", count=3, origin=Origin.PURCHASED, life_stage=LifeStage.ADULT, location_id=location, ) animal_service.create_cohort( payload=payload1, ts_utc=ts_utc, actor="test", ) payload2 = AnimalCohortCreatedPayload( species="duck", count=2, origin=Origin.PURCHASED, life_stage=LifeStage.ADULT, location_id=location, ) event2 = animal_service.create_cohort( payload=payload2, ts_utc=ts_utc + 1000, actor="test", ) # Verify we have seed animals + 5 new animals count_before = db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] assert count_before == seed_animal_count + 5 # Tombstone the second event (manually, to simulate what delete_event does) from ulid import ULID tombstone_id = str(ULID()) db.execute( """INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason) VALUES (?, ?, ?, ?, ?)""", (tombstone_id, ts_utc + 2000, "test", event2.id, "test deletion"), ) # Run rebuild via CLI env = os.environ.copy() env["DB_PATH"] = str(db_path) env["CSRF_SECRET"] = "test-secret-for-csrf" env["PYTHONPATH"] = str(PROJECT_ROOT / "src") result = subprocess.run( [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], capture_output=True, text=True, env=env, cwd=str(PROJECT_ROOT), ) assert result.returncode == 0, f"Rebuild failed: {result.stderr}" # Verify rebuild completed assert "events to replay" in result.stdout assert "Rebuild complete" in result.stdout # Verify only seed animals + 3 from first cohort (second cohort tombstoned) db2 = get_db(str(db_path)) count_after = db2.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] # Should have 2 fewer animals (the tombstoned cohort had count=2) assert count_after == seed_animal_count + 3 def test_rebuild_empty_event_log(self, tmp_path): """Should handle empty event log gracefully.""" db_path = tmp_path / "test.db" env = os.environ.copy() env["DB_PATH"] = str(db_path) env["CSRF_SECRET"] = "test-secret-for-csrf" env["PYTHONPATH"] = str(PROJECT_ROOT / "src") # Just run migrations (no seeds, no events) result = subprocess.run( [sys.executable, "-m", "animaltrack.cli", "rebuild-projections"], capture_output=True, text=True, env=env, cwd=str(PROJECT_ROOT), ) assert result.returncode == 0, f"Rebuild failed: {result.stderr}" assert "Found 0 events to replay" in result.stdout assert "Rebuild complete: processed 0 events" in result.stdout