feat: add rebuild-projections CLI and fix event delete projection revert
- Add 'rebuild-projections' CLI command that truncates all projection tables and replays non-tombstoned events to rebuild state - Fix event delete route to register all projections before calling delete_event, ensuring projections are properly reverted - Add comprehensive tests for both rebuild CLI and delete with projections The rebuild-projections command is useful for recovering from corrupted projection state, while the delete fix ensures future deletes properly revert animal status (e.g., sold -> alive). 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -1,5 +1,5 @@
|
||||
# ABOUTME: Command-line interface for AnimalTrack.
|
||||
# ABOUTME: Provides migrate, seed, and serve commands.
|
||||
# ABOUTME: Provides migrate, seed, serve, and rebuild-projections commands.
|
||||
|
||||
import argparse
|
||||
import sys
|
||||
@@ -29,6 +29,12 @@ def main():
|
||||
serve_parser.add_argument("--port", type=int, default=3366, help="Port to listen on")
|
||||
serve_parser.add_argument("--host", type=str, default="0.0.0.0", help="Host to bind to")
|
||||
|
||||
# rebuild-projections command
|
||||
subparsers.add_parser(
|
||||
"rebuild-projections",
|
||||
help="Rebuild all projections by replaying the event log",
|
||||
)
|
||||
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.command is None:
|
||||
@@ -113,6 +119,91 @@ def main():
|
||||
# Start server
|
||||
print(f"Starting server on {args.host}:{args.port}...")
|
||||
uvicorn.run(app, host=args.host, port=args.port)
|
||||
elif args.command == "rebuild-projections":
|
||||
import json
|
||||
|
||||
from animaltrack.config import Settings
|
||||
from animaltrack.db import get_db
|
||||
from animaltrack.events.processor import process_event
|
||||
from animaltrack.migrations import run_migrations
|
||||
from animaltrack.models.events import Event
|
||||
from animaltrack.projections import ProjectionRegistry
|
||||
from animaltrack.projections.animal_registry import AnimalRegistryProjection
|
||||
from animaltrack.projections.event_animals import EventAnimalsProjection
|
||||
from animaltrack.projections.event_log import EventLogProjection
|
||||
from animaltrack.projections.feed import FeedInventoryProjection
|
||||
from animaltrack.projections.intervals import IntervalProjection
|
||||
from animaltrack.projections.products import ProductsProjection
|
||||
|
||||
settings = Settings()
|
||||
|
||||
# Run migrations first
|
||||
print("Running migrations...")
|
||||
success = run_migrations(
|
||||
db_path=settings.db_path,
|
||||
migrations_dir="migrations",
|
||||
verbose=False,
|
||||
)
|
||||
if not success:
|
||||
print("Migration failed", file=sys.stderr)
|
||||
sys.exit(1)
|
||||
|
||||
db = get_db(settings.db_path)
|
||||
|
||||
# Projection tables to truncate
|
||||
projection_tables = [
|
||||
"animal_registry",
|
||||
"live_animals_by_location",
|
||||
"animal_location_intervals",
|
||||
"animal_attr_intervals",
|
||||
"event_animals",
|
||||
"event_log_by_location",
|
||||
"feed_inventory",
|
||||
]
|
||||
|
||||
print("Truncating projection tables...")
|
||||
for table in projection_tables:
|
||||
db.execute(f"DELETE FROM {table}")
|
||||
print(f" Truncated {len(projection_tables)} tables")
|
||||
|
||||
# Register all projections
|
||||
registry = ProjectionRegistry()
|
||||
registry.register(AnimalRegistryProjection(db))
|
||||
registry.register(IntervalProjection(db))
|
||||
registry.register(EventAnimalsProjection(db))
|
||||
registry.register(ProductsProjection(db))
|
||||
registry.register(FeedInventoryProjection(db))
|
||||
registry.register(EventLogProjection(db))
|
||||
|
||||
# Get all non-tombstoned events in order
|
||||
print("Fetching events...")
|
||||
rows = list(
|
||||
db.execute(
|
||||
"""SELECT id, type, ts_utc, actor, version, payload, entity_refs
|
||||
FROM events
|
||||
WHERE id NOT IN (SELECT target_event_id FROM event_tombstones)
|
||||
ORDER BY ts_utc, id"""
|
||||
).fetchall()
|
||||
)
|
||||
print(f" Found {len(rows)} events to replay")
|
||||
|
||||
# Replay events through projections
|
||||
print("Replaying events...")
|
||||
for i, row in enumerate(rows):
|
||||
event = Event(
|
||||
id=row[0],
|
||||
type=row[1],
|
||||
ts_utc=row[2],
|
||||
actor=row[3],
|
||||
version=row[4],
|
||||
payload=json.loads(row[5]),
|
||||
entity_refs=json.loads(row[6]),
|
||||
)
|
||||
process_event(event, registry)
|
||||
if (i + 1) % 100 == 0:
|
||||
print(f" Processed {i + 1}/{len(rows)} events...")
|
||||
|
||||
print(f"Rebuild complete: processed {len(rows)} events")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
|
||||
@@ -20,6 +20,12 @@ from animaltrack.events.exceptions import (
|
||||
from animaltrack.events.store import EventStore
|
||||
from animaltrack.models.reference import UserRole
|
||||
from animaltrack.projections import ProjectionRegistry
|
||||
from animaltrack.projections.animal_registry import AnimalRegistryProjection
|
||||
from animaltrack.projections.event_animals import EventAnimalsProjection
|
||||
from animaltrack.projections.event_log import EventLogProjection
|
||||
from animaltrack.projections.feed import FeedInventoryProjection
|
||||
from animaltrack.projections.intervals import IntervalProjection
|
||||
from animaltrack.projections.products import ProductsProjection
|
||||
from animaltrack.repositories.locations import LocationRepository
|
||||
from animaltrack.repositories.user_defaults import UserDefaultsRepository
|
||||
from animaltrack.web.templates import render_page
|
||||
@@ -344,9 +350,15 @@ async def event_delete(request: Request, event_id: str):
|
||||
reason = form.get("reason", "")
|
||||
cascade = form.get("cascade", "false") == "true"
|
||||
|
||||
# Get event store and registry
|
||||
# Get event store and registry with all projections
|
||||
event_store = EventStore(db)
|
||||
registry = ProjectionRegistry()
|
||||
registry.register(AnimalRegistryProjection(db))
|
||||
registry.register(IntervalProjection(db))
|
||||
registry.register(EventAnimalsProjection(db))
|
||||
registry.register(ProductsProjection(db))
|
||||
registry.register(FeedInventoryProjection(db))
|
||||
registry.register(EventLogProjection(db))
|
||||
|
||||
try:
|
||||
# Check for dependent events first
|
||||
|
||||
Reference in New Issue
Block a user