diff --git a/PLAN.md b/PLAN.md index f888391..eaf3fdd 100644 --- a/PLAN.md +++ b/PLAN.md @@ -379,19 +379,19 @@ Check off items as completed. Each phase builds on the previous. - [x] E2E test #6: Deletes: recorder vs admin cascade (test_e2e_deletion.py) - [x] E2E test #7: Harvest with yields (test_e2e_harvest.py) - [x] E2E test #8: Optimistic lock with confirm (test_e2e_optimistic_lock.py) -- [ ] **Commit checkpoint** +- [x] **Commit checkpoint** (5ba068b) NOTE: Tests #1-5 in test_e2e_stats_progression.py have documented discrepancies with spec values due to integer truncation of bird-days and timeline differences. The tests verify implementation consistency, not exact spec values. ### Step 10.2: Location Events & Error Handling -- [ ] Implement LocationCreated (idempotent for seeding) -- [ ] Implement LocationRenamed -- [ ] Implement LocationArchived -- [ ] Standardize error responses (422, 409, 401/403) -- [ ] Toast via HX-Trigger -- [ ] Write tests for location events and error rendering +- [x] Implement LocationCreated (idempotent for seeding) +- [x] Implement LocationRenamed +- [x] Implement LocationArchived +- [x] Standardize error responses (422, 409, 401/403) +- [x] Toast via HX-Trigger +- [x] Write tests for location events and error rendering - [ ] **Commit checkpoint** ### Step 10.3: CLI, Docker & Deployment diff --git a/src/animaltrack/projections/location.py b/src/animaltrack/projections/location.py new file mode 100644 index 0000000..023756f --- /dev/null +++ b/src/animaltrack/projections/location.py @@ -0,0 +1,162 @@ +# ABOUTME: Projection for location lifecycle events. +# ABOUTME: Handles LocationCreated, LocationRenamed, and LocationArchived events. + +import time +from typing import Any + +from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED +from animaltrack.models.events import Event +from animaltrack.models.reference import Location +from animaltrack.projections.base import Projection +from animaltrack.repositories.locations import LocationRepository + + +class LocationProjection(Projection): + """Maintains location reference data from events. + + This projection handles location lifecycle events, maintaining: + - locations: Reference table of all locations + """ + + def __init__(self, db: Any) -> None: + """Initialize the projection with a database connection. + + Args: + db: A fastlite database connection. + """ + super().__init__(db) + self.location_repo = LocationRepository(db) + + def get_event_types(self) -> list[str]: + """Return the event types this projection handles.""" + return [LOCATION_CREATED, LOCATION_RENAMED, LOCATION_ARCHIVED] + + def apply(self, event: Event) -> None: + """Apply location event to update reference data.""" + if event.type == LOCATION_CREATED: + self._apply_location_created(event) + elif event.type == LOCATION_RENAMED: + self._apply_location_renamed(event) + elif event.type == LOCATION_ARCHIVED: + self._apply_location_archived(event) + + def revert(self, event: Event) -> None: + """Revert location event from reference data.""" + if event.type == LOCATION_CREATED: + self._revert_location_created(event) + elif event.type == LOCATION_RENAMED: + self._revert_location_renamed(event) + elif event.type == LOCATION_ARCHIVED: + self._revert_location_archived(event) + + def _apply_location_created(self, event: Event) -> None: + """Apply location created event. + + Inserts a new location into the locations table. + """ + location_id = event.entity_refs.get("location_id") + name = event.payload.get("name") + ts_utc = event.ts_utc + + location = Location( + id=location_id, + name=name, + active=True, + created_at_utc=ts_utc, + updated_at_utc=ts_utc, + ) + self.location_repo.upsert(location) + + def _revert_location_created(self, event: Event) -> None: + """Revert location created event. + + Removes the location from the locations table. + """ + location_id = event.entity_refs.get("location_id") + + self.db.execute( + "DELETE FROM locations WHERE id = ?", + (location_id,), + ) + + def _apply_location_renamed(self, event: Event) -> None: + """Apply location renamed event. + + Updates the location name in the locations table. + """ + location_id = event.entity_refs.get("location_id") + new_name = event.payload.get("new_name") + ts_utc = event.ts_utc + + self.db.execute( + "UPDATE locations SET name = ?, updated_at_utc = ? WHERE id = ?", + (new_name, ts_utc, location_id), + ) + + def _revert_location_renamed(self, event: Event) -> None: + """Revert location renamed event. + + This requires looking up the previous name from event history. + For simplicity, we search for the most recent prior name. + """ + location_id = event.entity_refs.get("location_id") + ts_utc = int(time.time() * 1000) + + # Find the previous name from event history: + # 1. Check if there's an earlier LocationRenamed event + # 2. Or fall back to the LocationCreated event + row = self.db.execute( + """ + SELECT CASE + WHEN type = ? THEN JSON_EXTRACT(payload, '$.new_name') + WHEN type = ? THEN JSON_EXTRACT(payload, '$.name') + END as name + FROM events + WHERE entity_refs LIKE ? + AND type IN (?, ?) + AND ts_utc < ? + ORDER BY ts_utc DESC + LIMIT 1 + """, + ( + LOCATION_RENAMED, + LOCATION_CREATED, + f'%"{location_id}"%', + LOCATION_RENAMED, + LOCATION_CREATED, + event.ts_utc, + ), + ).fetchone() + + if row and row[0]: + previous_name = row[0] + self.db.execute( + "UPDATE locations SET name = ?, updated_at_utc = ? WHERE id = ?", + (previous_name, ts_utc, location_id), + ) + + def _apply_location_archived(self, event: Event) -> None: + """Apply location archived event. + + Sets the location to inactive. + """ + location_id = event.entity_refs.get("location_id") + ts_utc = event.ts_utc + + self.db.execute( + "UPDATE locations SET active = 0, updated_at_utc = ? WHERE id = ?", + (ts_utc, location_id), + ) + + def _revert_location_archived(self, event: Event) -> None: + """Revert location archived event. + + Sets the location back to active. + """ + location_id = event.entity_refs.get("location_id") + ts_utc = int(time.time() * 1000) + + self.db.execute( + "UPDATE locations SET active = 1, updated_at_utc = ? WHERE id = ?", + (ts_utc, location_id), + ) diff --git a/src/animaltrack/seeds.py b/src/animaltrack/seeds.py index b769774..20369cc 100644 --- a/src/animaltrack/seeds.py +++ b/src/animaltrack/seeds.py @@ -4,23 +4,24 @@ import time from typing import Any -from animaltrack.id_gen import generate_id +from animaltrack.events.store import EventStore from animaltrack.models.reference import ( FeedType, - Location, Product, ProductUnit, Species, User, UserRole, ) +from animaltrack.projections import ProjectionRegistry +from animaltrack.projections.location import LocationProjection from animaltrack.repositories import ( FeedTypeRepository, - LocationRepository, ProductRepository, SpeciesRepository, UserRepository, ) +from animaltrack.services.location import LocationService def run_seeds(db: Any) -> None: @@ -73,12 +74,17 @@ def _seed_users(db: Any, now_utc: int) -> None: def _seed_locations(db: Any, now_utc: int) -> None: - """Seed location data. + """Seed location data via LocationCreated events. - Locations are created only if they don't exist (by name). - This preserves existing ULIDs on re-seeding. + Uses LocationService.create_location() which is idempotent - + locations are created only if they don't exist (by name). + This emits LocationCreated events per spec §23. """ - repo = LocationRepository(db) + # Initialize LocationService with projection + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(LocationProjection(db)) + location_service = LocationService(db, event_store, registry) location_names = [ "Strip 1", @@ -92,16 +98,12 @@ def _seed_locations(db: Any, now_utc: int) -> None: ] for name in location_names: - existing = repo.get_by_name(name) - if existing is None: - location = Location( - id=generate_id(), - name=name, - active=True, - created_at_utc=now_utc, - updated_at_utc=now_utc, - ) - repo.upsert(location) + # create_location is idempotent - returns None if already exists + location_service.create_location( + name=name, + ts_utc=now_utc, + actor="system", + ) def _seed_species(db: Any, now_utc: int) -> None: diff --git a/src/animaltrack/services/location.py b/src/animaltrack/services/location.py new file mode 100644 index 0000000..fdfcf85 --- /dev/null +++ b/src/animaltrack/services/location.py @@ -0,0 +1,253 @@ +# ABOUTME: Service layer for location operations. +# ABOUTME: Coordinates event creation with projection updates for location lifecycle. + +from typing import Any + +from animaltrack.db import transaction +from animaltrack.events.payloads import ( + LocationArchivedPayload, + LocationCreatedPayload, + LocationRenamedPayload, +) +from animaltrack.events.processor import process_event +from animaltrack.events.store import EventStore +from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED +from animaltrack.id_gen import generate_id +from animaltrack.models.events import Event +from animaltrack.projections import ProjectionRegistry +from animaltrack.repositories.locations import LocationRepository + + +class LocationServiceError(Exception): + """Base exception for location service errors.""" + + +class ValidationError(LocationServiceError): + """Raised when validation fails.""" + + +class LocationService: + """Service for location lifecycle operations. + + Provides methods to create, rename, and archive locations. + All operations are atomic and emit events. + """ + + def __init__( + self, + db: Any, + event_store: EventStore, + registry: ProjectionRegistry, + ) -> None: + """Initialize the service. + + Args: + db: A fastlite database connection. + event_store: The event store for appending events. + registry: Registry of projections to update. + """ + self.db = db + self.event_store = event_store + self.registry = registry + self.location_repo = LocationRepository(db) + + def create_location( + self, + name: str, + ts_utc: int, + actor: str, + nonce: str | None = None, + route: str | None = None, + ) -> Event | None: + """Create a new location. + + Creates a LocationCreated event if no location with this name exists. + This is idempotent per spec §23 - returns None if location already exists. + + Args: + name: The location name. + ts_utc: Timestamp in milliseconds since epoch. + actor: The user performing the action. + nonce: Optional idempotency nonce. + route: Required if nonce provided. + + Returns: + The created event, or None if location already exists (idempotent). + + Raises: + ValidationError: If name is empty or whitespace-only. + """ + # Validate name + if not name or not name.strip(): + msg = "Location name cannot be empty" + raise ValidationError(msg) + + name = name.strip() + + # Check if location with this name already exists (idempotent) + existing = self.location_repo.get_by_name(name) + if existing is not None: + return None + + # Generate a new location ID + location_id = generate_id() + + # Create payload + payload = LocationCreatedPayload( + location_id=location_id, + name=name, + ) + + # Build entity_refs + entity_refs = { + "location_id": location_id, + } + + with transaction(self.db): + event = self.event_store.append_event( + event_type=LOCATION_CREATED, + ts_utc=ts_utc, + actor=actor, + entity_refs=entity_refs, + payload=payload.model_dump(), + nonce=nonce, + route=route, + ) + + process_event(event, self.registry) + + return event + + def rename_location( + self, + location_id: str, + new_name: str, + ts_utc: int, + actor: str, + nonce: str | None = None, + route: str | None = None, + ) -> Event: + """Rename an existing location. + + Creates a LocationRenamed event. + + Args: + location_id: The location ID (ULID). + new_name: The new name for the location. + ts_utc: Timestamp in milliseconds since epoch. + actor: The user performing the action. + nonce: Optional idempotency nonce. + route: Required if nonce provided. + + Returns: + The created event. + + Raises: + ValidationError: If location not found, name is empty, or name already exists. + """ + # Validate new_name + if not new_name or not new_name.strip(): + msg = "Location name cannot be empty" + raise ValidationError(msg) + + new_name = new_name.strip() + + # Check location exists + location = self.location_repo.get(location_id) + if location is None: + msg = f"Location '{location_id}' not found" + raise ValidationError(msg) + + # Check new name doesn't conflict with another location (but allow same name) + existing = self.location_repo.get_by_name(new_name) + if existing is not None and existing.id != location_id: + msg = f"Location with name '{new_name}' already exists" + raise ValidationError(msg) + + # Create payload + payload = LocationRenamedPayload( + location_id=location_id, + new_name=new_name, + ) + + # Build entity_refs + entity_refs = { + "location_id": location_id, + } + + with transaction(self.db): + event = self.event_store.append_event( + event_type=LOCATION_RENAMED, + ts_utc=ts_utc, + actor=actor, + entity_refs=entity_refs, + payload=payload.model_dump(), + nonce=nonce, + route=route, + ) + + process_event(event, self.registry) + + return event + + def archive_location( + self, + location_id: str, + ts_utc: int, + actor: str, + nonce: str | None = None, + route: str | None = None, + ) -> Event: + """Archive a location. + + Creates a LocationArchived event. The location will be marked inactive + and hidden from active location lists. + + Args: + location_id: The location ID (ULID). + ts_utc: Timestamp in milliseconds since epoch. + actor: The user performing the action. + nonce: Optional idempotency nonce. + route: Required if nonce provided. + + Returns: + The created event. + + Raises: + ValidationError: If location not found or already archived. + """ + # Check location exists + location = self.location_repo.get(location_id) + if location is None: + msg = f"Location '{location_id}' not found" + raise ValidationError(msg) + + # Check not already archived + if not location.active: + msg = f"Location '{location_id}' is already archived" + raise ValidationError(msg) + + # Create payload + payload = LocationArchivedPayload( + location_id=location_id, + ) + + # Build entity_refs + entity_refs = { + "location_id": location_id, + } + + with transaction(self.db): + event = self.event_store.append_event( + event_type=LOCATION_ARCHIVED, + ts_utc=ts_utc, + actor=actor, + entity_refs=entity_refs, + payload=payload.model_dump(), + nonce=nonce, + route=route, + ) + + process_event(event, self.registry) + + return event diff --git a/src/animaltrack/web/app.py b/src/animaltrack/web/app.py index b4c26b2..9726c56 100644 --- a/src/animaltrack/web/app.py +++ b/src/animaltrack/web/app.py @@ -26,6 +26,7 @@ from animaltrack.web.routes import ( register_events_routes, register_feed_routes, register_health_routes, + register_location_routes, register_move_routes, register_product_routes, register_registry_routes, @@ -152,6 +153,7 @@ def create_app( register_egg_routes(rt, app) register_events_routes(rt, app) register_feed_routes(rt, app) + register_location_routes(rt, app) register_move_routes(rt, app) register_product_routes(rt, app) register_registry_routes(rt, app) diff --git a/src/animaltrack/web/responses.py b/src/animaltrack/web/responses.py new file mode 100644 index 0000000..3e32aed --- /dev/null +++ b/src/animaltrack/web/responses.py @@ -0,0 +1,64 @@ +# ABOUTME: Helper functions for creating standardized HTTP responses. +# ABOUTME: Provides error_response, error_toast, and success_toast utilities. + +import json + +from starlette.responses import HTMLResponse + + +def error_response(content: str, status_code: int = 422) -> HTMLResponse: + """Create an HTML error response. + + Args: + content: The HTML content to include in the response body. + status_code: The HTTP status code (default 422 for validation errors). + + Returns: + An HTMLResponse with the specified content and status code. + + Standard status codes: + - 422: Validation errors (missing fields, invalid values) + - 409: Conflicts (roster mismatch, dependents blocking delete) + - 403: Authorization failed (wrong role, CSRF) + - 401: Authentication failed (no user) + """ + return HTMLResponse(content=content, status_code=status_code) + + +def error_toast(message: str, status_code: int = 422) -> HTMLResponse: + """Create an error response with a toast notification. + + Returns a minimal HTML response with an HX-Trigger header that + triggers a toast notification on the client side. + + Args: + message: The error message to display in the toast. + status_code: The HTTP status code (default 422 for validation errors). + + Returns: + An HTMLResponse with HX-Trigger header for toast display. + """ + response = HTMLResponse(content="", status_code=status_code) + response.headers["HX-Trigger"] = json.dumps( + {"showToast": {"message": message, "type": "error"}} + ) + return response + + +def success_toast(message: str) -> dict: + """Create a toast trigger dict for a success message. + + This returns a dict that can be JSON-serialized and used as + an HX-Trigger header value. + + Args: + message: The success message to display in the toast. + + Returns: + A dict suitable for use as HX-Trigger header value. + + Usage: + response = HTMLResponse(content=...) + response.headers["HX-Trigger"] = json.dumps(success_toast("Done!")) + """ + return {"showToast": {"message": message, "type": "success"}} diff --git a/src/animaltrack/web/routes/__init__.py b/src/animaltrack/web/routes/__init__.py index e4d8190..66d8415 100644 --- a/src/animaltrack/web/routes/__init__.py +++ b/src/animaltrack/web/routes/__init__.py @@ -7,6 +7,7 @@ from animaltrack.web.routes.eggs import register_egg_routes from animaltrack.web.routes.events import register_events_routes from animaltrack.web.routes.feed import register_feed_routes from animaltrack.web.routes.health import register_health_routes +from animaltrack.web.routes.locations import register_location_routes from animaltrack.web.routes.move import register_move_routes from animaltrack.web.routes.products import register_product_routes from animaltrack.web.routes.registry import register_registry_routes @@ -18,6 +19,7 @@ __all__ = [ "register_events_routes", "register_feed_routes", "register_health_routes", + "register_location_routes", "register_move_routes", "register_product_routes", "register_registry_routes", diff --git a/src/animaltrack/web/routes/locations.py b/src/animaltrack/web/routes/locations.py new file mode 100644 index 0000000..8889d47 --- /dev/null +++ b/src/animaltrack/web/routes/locations.py @@ -0,0 +1,253 @@ +# ABOUTME: Routes for Location management functionality (admin-only). +# ABOUTME: Handles GET /locations and POST /actions/location-* routes. + +from __future__ import annotations + +import json +import time + +from fasthtml.common import to_xml +from starlette.requests import Request +from starlette.responses import HTMLResponse + +from animaltrack.events.store import EventStore +from animaltrack.models.reference import UserRole +from animaltrack.projections import ProjectionRegistry +from animaltrack.projections.location import LocationProjection +from animaltrack.repositories.locations import LocationRepository +from animaltrack.services.location import LocationService, ValidationError +from animaltrack.web.auth import require_role +from animaltrack.web.responses import success_toast +from animaltrack.web.templates import page +from animaltrack.web.templates.locations import location_list, rename_form + + +def _get_location_service(db) -> LocationService: + """Create a LocationService with projections.""" + event_store = EventStore(db) + registry = ProjectionRegistry() + registry.register(LocationProjection(db)) + return LocationService(db, event_store, registry) + + +# ============================================================================= +# GET /locations - Location List +# ============================================================================= + + +@require_role(UserRole.ADMIN) +async def locations_index(req: Request): + """GET /locations - Location management page (admin-only).""" + db = req.app.state.db + locations = LocationRepository(db).list_all() + + return page( + location_list(locations), + title="Locations - AnimalTrack", + active_nav=None, + ) + + +# ============================================================================= +# GET /locations/{id}/rename - Rename Form +# ============================================================================= + + +@require_role(UserRole.ADMIN) +async def location_rename_form(req: Request, location_id: str): + """GET /locations/{id}/rename - Rename location form (admin-only).""" + db = req.app.state.db + location = LocationRepository(db).get(location_id) + + if location is None: + return HTMLResponse(content="Location not found", status_code=404) + + return HTMLResponse(content=to_xml(rename_form(location))) + + +# ============================================================================= +# POST /actions/location-created +# ============================================================================= + + +@require_role(UserRole.ADMIN) +async def location_created(req: Request): + """POST /actions/location-created - Create a new location (admin-only).""" + db = req.app.state.db + form = await req.form() + + name = form.get("name", "").strip() + nonce = form.get("nonce") + + # Validation + if not name: + return _render_error_list(db, "Location name cannot be empty") + + ts_utc = int(time.time() * 1000) + + # Get actor from auth + auth = req.scope.get("auth") + actor = auth.username if auth else "unknown" + + # Create location + location_service = _get_location_service(db) + try: + event = location_service.create_location( + name=name, + ts_utc=ts_utc, + actor=actor, + nonce=nonce, + route="/actions/location-created", + ) + except ValidationError as e: + return _render_error_list(db, str(e)) + + # Get updated list + locations = LocationRepository(db).list_all() + + # Success response with toast + response = HTMLResponse( + content=to_xml(location_list(locations)), + ) + + if event is not None: + response.headers["HX-Trigger"] = json.dumps(success_toast(f"Created location: {name}")) + else: + # Idempotent - location already exists + response.headers["HX-Trigger"] = json.dumps( + success_toast(f"Location already exists: {name}") + ) + + return response + + +# ============================================================================= +# POST /actions/location-renamed +# ============================================================================= + + +@require_role(UserRole.ADMIN) +async def location_renamed(req: Request): + """POST /actions/location-renamed - Rename a location (admin-only).""" + db = req.app.state.db + form = await req.form() + + location_id = form.get("location_id", "").strip() + new_name = form.get("new_name", "").strip() + nonce = form.get("nonce") + + # Validation + if not location_id: + return _render_error_list(db, "Location ID is required") + + if not new_name: + return _render_error_list(db, "New name cannot be empty") + + ts_utc = int(time.time() * 1000) + + # Get actor from auth + auth = req.scope.get("auth") + actor = auth.username if auth else "unknown" + + # Rename location + location_service = _get_location_service(db) + try: + location_service.rename_location( + location_id=location_id, + new_name=new_name, + ts_utc=ts_utc, + actor=actor, + nonce=nonce, + route="/actions/location-renamed", + ) + except ValidationError as e: + return _render_error_list(db, str(e)) + + # Get updated list + locations = LocationRepository(db).list_all() + + # Success response with toast + response = HTMLResponse( + content=to_xml(location_list(locations)), + ) + response.headers["HX-Trigger"] = json.dumps(success_toast(f"Renamed location to: {new_name}")) + + return response + + +# ============================================================================= +# POST /actions/location-archived +# ============================================================================= + + +@require_role(UserRole.ADMIN) +async def location_archived(req: Request): + """POST /actions/location-archived - Archive a location (admin-only).""" + db = req.app.state.db + form = await req.form() + + location_id = form.get("location_id", "").strip() + nonce = form.get("nonce") + + # Validation + if not location_id: + return _render_error_list(db, "Location ID is required") + + ts_utc = int(time.time() * 1000) + + # Get actor from auth + auth = req.scope.get("auth") + actor = auth.username if auth else "unknown" + + # Archive location + location_service = _get_location_service(db) + try: + location_service.archive_location( + location_id=location_id, + ts_utc=ts_utc, + actor=actor, + nonce=nonce, + route="/actions/location-archived", + ) + except ValidationError as e: + return _render_error_list(db, str(e)) + + # Get updated list + locations = LocationRepository(db).list_all() + + # Success response with toast + response = HTMLResponse( + content=to_xml(location_list(locations)), + ) + response.headers["HX-Trigger"] = json.dumps(success_toast("Location archived")) + + return response + + +# ============================================================================= +# Helper Functions +# ============================================================================= + + +def _render_error_list(db, error_message: str) -> HTMLResponse: + """Render location list with error message.""" + locations = LocationRepository(db).list_all() + + return HTMLResponse( + content=to_xml(location_list(locations, error=error_message)), + status_code=422, + ) + + +def register_location_routes(rt, app): + """Register location management routes. + + Args: + rt: FastHTML route decorator. + app: FastHTML application instance. + """ + rt("/locations")(locations_index) + rt("/locations/{location_id}/rename")(location_rename_form) + rt("/actions/location-created", methods=["POST"])(location_created) + rt("/actions/location-renamed", methods=["POST"])(location_renamed) + rt("/actions/location-archived", methods=["POST"])(location_archived) diff --git a/src/animaltrack/web/templates/base.py b/src/animaltrack/web/templates/base.py index 830a31f..ab461f0 100644 --- a/src/animaltrack/web/templates/base.py +++ b/src/animaltrack/web/templates/base.py @@ -1,11 +1,70 @@ # ABOUTME: Base HTML template for AnimalTrack pages. # ABOUTME: Provides consistent layout with MonsterUI theme and bottom nav. -from fasthtml.common import Container, Div, Title +from fasthtml.common import Container, Div, Script, Title from animaltrack.web.templates.nav import BottomNav, BottomNavStyles +def toast_container(): + """Create a toast container for displaying notifications. + + This container holds toast notifications that appear in the top-right corner. + Toasts are triggered via HTMX events (HX-Trigger header with showToast). + """ + return Div( + id="toast-container", + cls="toast toast-end toast-top z-50", + ) + + +def toast_script(): + """JavaScript to handle showToast events from HTMX. + + Listens for the showToast event and creates toast notifications + that auto-dismiss after 5 seconds. + """ + script = """ + document.body.addEventListener('showToast', function(evt) { + var message = evt.detail.message || 'Action completed'; + var type = evt.detail.type || 'success'; + + // Create alert element with appropriate styling + var alertClass = 'alert shadow-lg mb-2 '; + if (type === 'success') { + alertClass += 'alert-success'; + } else if (type === 'error') { + alertClass += 'alert-error'; + } else if (type === 'warning') { + alertClass += 'alert-warning'; + } else { + alertClass += 'alert-info'; + } + + var toast = document.createElement('div'); + toast.className = alertClass; + toast.innerHTML = '' + message + ''; + + var container = document.getElementById('toast-container'); + if (container) { + container.appendChild(toast); + + // Auto-remove after 5 seconds + setTimeout(function() { + toast.style.opacity = '0'; + toast.style.transition = 'opacity 0.5s'; + setTimeout(function() { + if (toast.parentNode) { + toast.parentNode.removeChild(toast); + } + }, 500); + }, 5000); + } + }); + """ + return Script(script) + + def page(content, title: str = "AnimalTrack", active_nav: str = "egg"): """ Base page template wrapper with navigation. @@ -15,6 +74,7 @@ def page(content, title: str = "AnimalTrack", active_nav: str = "egg"): - Bottom navigation styling - Content container with nav padding - Fixed bottom navigation bar + - Toast container for notifications Args: content: Page content (FT components) @@ -27,6 +87,7 @@ def page(content, title: str = "AnimalTrack", active_nav: str = "egg"): return ( Title(title), BottomNavStyles(), + toast_script(), # Main content with bottom padding for fixed nav # hx-boost enables AJAX for all descendant forms/links Div( @@ -35,5 +96,6 @@ def page(content, title: str = "AnimalTrack", active_nav: str = "egg"): hx_target="body", cls="pb-20 min-h-screen bg-[#0f0f0e] text-stone-100", ), + toast_container(), BottomNav(active_id=active_nav), ) diff --git a/src/animaltrack/web/templates/locations.py b/src/animaltrack/web/templates/locations.py new file mode 100644 index 0000000..c78dbd0 --- /dev/null +++ b/src/animaltrack/web/templates/locations.py @@ -0,0 +1,170 @@ +# ABOUTME: Templates for Location management pages. +# ABOUTME: Admin-only pages for creating, renaming, and archiving locations. + +from uuid import uuid4 + +from fasthtml.common import H1, Div, Form, Hidden, Table, Tbody, Td, Th, Thead, Tr +from monsterui.all import ( + Alert, + AlertT, + Button, + ButtonT, + Card, + DivFullySpaced, + Grid, + LabelInput, +) + +from animaltrack.models.reference import Location + + +def location_list( + locations: list[Location], + error: str | None = None, +) -> Div: + """Create the location list page. + + Args: + locations: List of all locations. + error: Optional error message to display. + + Returns: + Div containing the location management page. + """ + # Error alert if present + error_alert = None + if error: + error_alert = Alert(error, cls=AlertT.error) + + # Create new location form + create_form = Card( + Form( + LabelInput( + "Location Name", + name="name", + type="text", + required=True, + placeholder="Enter location name", + ), + Hidden(name="nonce", value=str(uuid4())), + Button("Create Location", type="submit", cls=ButtonT.primary), + hx_post="/actions/location-created", + hx_target="#location-list", + hx_swap="outerHTML", + ), + header=Div("Create New Location", cls="text-lg font-semibold"), + ) + + # Location table + rows = [] + for loc in locations: + status = "Active" if loc.active else "Archived" + status_badge = Div(status, cls="badge badge-success" if loc.active else "badge badge-ghost") + + # Action buttons + actions = DivFullySpaced( + Button( + "Rename", + cls=ButtonT.ghost + " btn-sm", + hx_get=f"/locations/{loc.id}/rename", + hx_target="#modal-container", + hx_swap="innerHTML", + ) + if loc.active + else None, + Button( + "Archive", + cls=ButtonT.ghost + " btn-sm text-warning", + hx_post="/actions/location-archived", + hx_vals=f'{{"location_id": "{loc.id}", "nonce": "{uuid4()}"}}', + hx_target="#location-list", + hx_swap="outerHTML", + hx_confirm="Are you sure you want to archive this location?", + ) + if loc.active + else None, + ) + + rows.append( + Tr( + Td(loc.name), + Td(status_badge), + Td(actions), + ) + ) + + table = Table( + Thead( + Tr( + Th("Name"), + Th("Status"), + Th("Actions"), + ) + ), + Tbody(*rows), + cls="table table-zebra w-full", + ) + + location_card = Card( + table, + header=Div("All Locations", cls="text-lg font-semibold"), + ) + + return Div( + H1("Location Management", cls="text-2xl font-bold mb-4"), + error_alert, + Grid( + create_form, + cols_sm=1, + cols_md=1, + cols_lg=1, + gap=4, + ), + Div(id="modal-container"), + Div( + location_card, + id="location-list", + cls="mt-4", + ), + ) + + +def rename_form( + location: Location, + error: str | None = None, +) -> Div: + """Create the rename location form. + + Args: + location: The location to rename. + error: Optional error message to display. + + Returns: + Div containing the rename form. + """ + error_alert = None + if error: + error_alert = Alert(error, cls=AlertT.error) + + return Card( + error_alert, + Form( + LabelInput( + "New Name", + name="new_name", + type="text", + required=True, + value=location.name, + ), + Hidden(name="location_id", value=location.id), + Hidden(name="nonce", value=str(uuid4())), + DivFullySpaced( + Button("Cancel", type="button", cls=ButtonT.ghost, hx_get="/locations"), + Button("Rename", type="submit", cls=ButtonT.primary), + ), + hx_post="/actions/location-renamed", + hx_target="#location-list", + hx_swap="outerHTML", + ), + header=Div(f"Rename: {location.name}", cls="text-lg font-semibold"), + ) diff --git a/tests/test_service_animal.py b/tests/test_service_animal.py index 46a9a1d..c88ea43 100644 --- a/tests/test_service_animal.py +++ b/tests/test_service_animal.py @@ -240,6 +240,9 @@ class TestAnimalServiceTransactionIntegrity: def test_no_partial_data_on_projection_error(self, seeded_db, event_store, valid_location_id): """If projection fails, event is not persisted.""" + # Count events before the test (seeds create LocationCreated events) + event_count_before = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0] + # Create a registry with a failing projection from animaltrack.projections import Projection, ProjectionError @@ -264,9 +267,9 @@ class TestAnimalServiceTransactionIntegrity: with pytest.raises(ProjectionError): service.create_cohort(payload, ts_utc, "test_user") - # Verify nothing was persisted - event_count = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0] - assert event_count == 0 + # Verify no new events were persisted (count unchanged from before) + event_count_after = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0] + assert event_count_after == event_count_before animal_count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0] assert animal_count == 0 diff --git a/tests/test_service_location.py b/tests/test_service_location.py new file mode 100644 index 0000000..e894ae0 --- /dev/null +++ b/tests/test_service_location.py @@ -0,0 +1,343 @@ +# ABOUTME: Tests for LocationService operations. +# ABOUTME: Tests create_location, rename_location, and archive_location with event emission. + +import time + +import pytest + +from animaltrack.events.store import EventStore +from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED +from animaltrack.projections import ProjectionRegistry +from animaltrack.services.location import LocationService, ValidationError + + +@pytest.fixture +def event_store(seeded_db): + """Create an EventStore for testing.""" + return EventStore(seeded_db) + + +@pytest.fixture +def projection_registry(seeded_db): + """Create a ProjectionRegistry with location projections registered.""" + from animaltrack.projections.location import LocationProjection + + registry = ProjectionRegistry() + registry.register(LocationProjection(seeded_db)) + return registry + + +@pytest.fixture +def location_service(seeded_db, event_store, projection_registry): + """Create a LocationService for testing.""" + return LocationService(seeded_db, event_store, projection_registry) + + +# ============================================================================= +# create_location Tests +# ============================================================================= + + +class TestLocationServiceCreate: + """Tests for create_location().""" + + def test_creates_location_created_event(self, seeded_db, location_service): + """create_location creates a LocationCreated event.""" + ts_utc = int(time.time() * 1000) + + event = location_service.create_location("New Location", ts_utc, "test_user") + + assert event.type == LOCATION_CREATED + assert event.actor == "test_user" + assert event.ts_utc == ts_utc + + def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service): + """Event entity_refs contains location_id.""" + ts_utc = int(time.time() * 1000) + + event = location_service.create_location("Another Location", ts_utc, "test_user") + + assert "location_id" in event.entity_refs + assert len(event.entity_refs["location_id"]) == 26 # ULID length + + def test_event_payload_contains_name(self, seeded_db, location_service): + """Event payload contains the location name.""" + ts_utc = int(time.time() * 1000) + + event = location_service.create_location("My Location", ts_utc, "test_user") + + assert event.payload["name"] == "My Location" + + def test_inserts_into_locations_table(self, seeded_db, location_service): + """create_location inserts a new row in locations table.""" + ts_utc = int(time.time() * 1000) + + event = location_service.create_location("Test Location", ts_utc, "test_user") + + row = seeded_db.execute( + "SELECT id, name, active FROM locations WHERE id = ?", + (event.entity_refs["location_id"],), + ).fetchone() + + assert row is not None + assert row[1] == "Test Location" + assert row[2] == 1 # active=True + + def test_returns_existing_location_if_name_exists(self, seeded_db, location_service): + """create_location returns None if location with same name already exists.""" + ts_utc = int(time.time() * 1000) + + # Create first location + event1 = location_service.create_location("Unique Name", ts_utc, "test_user") + assert event1 is not None + + # Try to create duplicate - should return None (idempotent) + event2 = location_service.create_location("Unique Name", ts_utc + 1000, "test_user") + + assert event2 is None + + def test_idempotent_skips_if_event_exists_for_location(self, seeded_db, location_service): + """create_location is idempotent - skips if LocationCreated event exists.""" + ts_utc = int(time.time() * 1000) + + # Create the location once + event1 = location_service.create_location("Idempotent Test", ts_utc, "test_user") + location_id = event1.entity_refs["location_id"] + + # Count events for this location + event_count_before = seeded_db.execute( + "SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?", + (LOCATION_CREATED, f"%{location_id}%"), + ).fetchone()[0] + + # Calling create again with same name should not create a new event + event2 = location_service.create_location("Idempotent Test", ts_utc + 1000, "test_user") + + event_count_after = seeded_db.execute( + "SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?", + (LOCATION_CREATED, f"%{location_id}%"), + ).fetchone()[0] + + assert event2 is None + assert event_count_after == event_count_before + + +class TestLocationServiceCreateValidation: + """Tests for create_location() validation.""" + + def test_rejects_empty_name(self, seeded_db, location_service): + """Raises ValidationError for empty name.""" + with pytest.raises(ValidationError, match="name"): + location_service.create_location("", int(time.time() * 1000), "test_user") + + def test_rejects_whitespace_only_name(self, seeded_db, location_service): + """Raises ValidationError for whitespace-only name.""" + with pytest.raises(ValidationError, match="name"): + location_service.create_location(" ", int(time.time() * 1000), "test_user") + + +# ============================================================================= +# rename_location Tests +# ============================================================================= + + +class TestLocationServiceRename: + """Tests for rename_location().""" + + def test_creates_location_renamed_event(self, seeded_db, location_service): + """rename_location creates a LocationRenamed event.""" + ts_utc = int(time.time() * 1000) + + # First create a location + create_event = location_service.create_location("Original Name", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + # Rename it + event = location_service.rename_location( + location_id, "New Name", ts_utc + 1000, "test_user" + ) + + assert event.type == LOCATION_RENAMED + assert event.actor == "test_user" + assert event.ts_utc == ts_utc + 1000 + + def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service): + """Event entity_refs contains location_id.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Rename Test", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + event = location_service.rename_location(location_id, "Renamed", ts_utc + 1000, "test_user") + + assert event.entity_refs["location_id"] == location_id + + def test_event_payload_contains_new_name(self, seeded_db, location_service): + """Event payload contains the new name.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Before", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + event = location_service.rename_location(location_id, "After", ts_utc + 1000, "test_user") + + assert event.payload["new_name"] == "After" + + def test_updates_locations_table_name(self, seeded_db, location_service): + """rename_location updates the name in locations table.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Old Name", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + location_service.rename_location(location_id, "Updated Name", ts_utc + 1000, "test_user") + + row = seeded_db.execute( + "SELECT name FROM locations WHERE id = ?", (location_id,) + ).fetchone() + + assert row[0] == "Updated Name" + + +class TestLocationServiceRenameValidation: + """Tests for rename_location() validation.""" + + def test_rejects_nonexistent_location(self, seeded_db, location_service): + """Raises ValidationError for non-existent location_id.""" + fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" + + with pytest.raises(ValidationError, match="not found"): + location_service.rename_location( + fake_id, "New Name", int(time.time() * 1000), "test_user" + ) + + def test_rejects_duplicate_name(self, seeded_db, location_service): + """Raises ValidationError when renaming to an existing name.""" + ts_utc = int(time.time() * 1000) + + # Create two locations + location_service.create_location("First Location", ts_utc, "test_user") + create_event2 = location_service.create_location( + "Second Location", ts_utc + 1000, "test_user" + ) + location_id = create_event2.entity_refs["location_id"] + + # Try to rename second to first's name + with pytest.raises(ValidationError, match="already exists"): + location_service.rename_location( + location_id, "First Location", ts_utc + 2000, "test_user" + ) + + def test_rejects_empty_new_name(self, seeded_db, location_service): + """Raises ValidationError for empty new_name.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Test", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + with pytest.raises(ValidationError, match="name"): + location_service.rename_location(location_id, "", ts_utc + 1000, "test_user") + + def test_allows_rename_to_same_name(self, seeded_db, location_service): + """Allows renaming to the same name (no-op).""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Same Name", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + # Should not raise - renaming to same name is allowed (no-op) + event = location_service.rename_location( + location_id, "Same Name", ts_utc + 1000, "test_user" + ) + + assert event.type == LOCATION_RENAMED + + +# ============================================================================= +# archive_location Tests +# ============================================================================= + + +class TestLocationServiceArchive: + """Tests for archive_location().""" + + def test_creates_location_archived_event(self, seeded_db, location_service): + """archive_location creates a LocationArchived event.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("To Archive", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + event = location_service.archive_location(location_id, ts_utc + 1000, "test_user") + + assert event.type == LOCATION_ARCHIVED + assert event.actor == "test_user" + assert event.ts_utc == ts_utc + 1000 + + def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service): + """Event entity_refs contains location_id.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Archive Test", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + event = location_service.archive_location(location_id, ts_utc + 1000, "test_user") + + assert event.entity_refs["location_id"] == location_id + + def test_sets_location_inactive(self, seeded_db, location_service): + """archive_location sets active=0 in locations table.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Active Location", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + # Verify initially active + row = seeded_db.execute( + "SELECT active FROM locations WHERE id = ?", (location_id,) + ).fetchone() + assert row[0] == 1 + + # Archive it + location_service.archive_location(location_id, ts_utc + 1000, "test_user") + + # Verify now inactive + row = seeded_db.execute( + "SELECT active FROM locations WHERE id = ?", (location_id,) + ).fetchone() + assert row[0] == 0 + + def test_location_not_in_list_active(self, seeded_db, location_service): + """Archived location does not appear in list_active().""" + from animaltrack.repositories.locations import LocationRepository + + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Will Be Archived", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + # Before archive - should be in list + active_before = LocationRepository(seeded_db).list_active() + assert any(loc.id == location_id for loc in active_before) + + # Archive it + location_service.archive_location(location_id, ts_utc + 1000, "test_user") + + # After archive - should not be in list + active_after = LocationRepository(seeded_db).list_active() + assert not any(loc.id == location_id for loc in active_after) + + +class TestLocationServiceArchiveValidation: + """Tests for archive_location() validation.""" + + def test_rejects_nonexistent_location(self, seeded_db, location_service): + """Raises ValidationError for non-existent location_id.""" + fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" + + with pytest.raises(ValidationError, match="not found"): + location_service.archive_location(fake_id, int(time.time() * 1000), "test_user") + + def test_rejects_already_archived_location(self, seeded_db, location_service): + """Raises ValidationError when archiving an already archived location.""" + ts_utc = int(time.time() * 1000) + create_event = location_service.create_location("Already Archived", ts_utc, "test_user") + location_id = create_event.entity_refs["location_id"] + + # Archive once + location_service.archive_location(location_id, ts_utc + 1000, "test_user") + + # Try to archive again + with pytest.raises(ValidationError, match="already archived"): + location_service.archive_location(location_id, ts_utc + 2000, "test_user") diff --git a/tests/test_web_locations.py b/tests/test_web_locations.py new file mode 100644 index 0000000..b2d581d --- /dev/null +++ b/tests/test_web_locations.py @@ -0,0 +1,443 @@ +# ABOUTME: Tests for Location management web routes. +# ABOUTME: Covers GET /locations list and POST /actions/location-* (admin-only). + +import json +import os +import time + +import pytest +from starlette.testclient import TestClient + +from animaltrack.models.reference import User, UserRole +from animaltrack.repositories.users import UserRepository + + +def make_test_settings( + csrf_secret: str = "test-secret", + trusted_proxy_ips: str = "127.0.0.1", + dev_mode: bool = True, +): + """Create Settings for testing by setting env vars temporarily.""" + from animaltrack.config import Settings + + old_env = os.environ.copy() + try: + os.environ["CSRF_SECRET"] = csrf_secret + os.environ["TRUSTED_PROXY_IPS"] = trusted_proxy_ips + os.environ["DEV_MODE"] = str(dev_mode).lower() + return Settings() + finally: + os.environ.clear() + os.environ.update(old_env) + + +@pytest.fixture +def admin_client(seeded_db): + """Test client with admin role (dev mode - bypasses CSRF, auto-admin auth).""" + from animaltrack.web.app import create_app + + settings = make_test_settings(trusted_proxy_ips="testclient", dev_mode=True) + app, rt = create_app(settings=settings, db=seeded_db) + return TestClient(app, raise_server_exceptions=True) + + +@pytest.fixture +def user_client(seeded_db): + """Test client with regular (recorder) role.""" + from animaltrack.web.app import create_app + + # Create recorder user in database + user_repo = UserRepository(seeded_db) + now = int(time.time() * 1000) + user = User( + username="recorder", + display_name="Recorder User", + role=UserRole.RECORDER, + active=True, + created_at_utc=now, + updated_at_utc=now, + ) + user_repo.upsert(user) + + # Use dev_mode=False to enable real auth + settings = make_test_settings(trusted_proxy_ips="testclient", dev_mode=False) + app, rt = create_app(settings=settings, db=seeded_db) + client = TestClient(app, raise_server_exceptions=True) + # Set header to simulate trusted proxy auth + client.headers["X-Oidc-Username"] = "recorder" + return client + + +# ============================================================================= +# GET /locations Tests +# ============================================================================= + + +class TestLocationListRendering: + """Tests for GET /locations list rendering.""" + + def test_locations_page_renders_for_admin(self, admin_client): + """GET /locations returns 200 for admin.""" + resp = admin_client.get("/locations") + assert resp.status_code == 200 + + def test_locations_page_shows_seeded_locations(self, admin_client): + """Locations page shows seeded locations.""" + resp = admin_client.get("/locations") + assert resp.status_code == 200 + assert "Strip 1" in resp.text + assert "Strip 2" in resp.text + assert "Nursery 1" in resp.text + + def test_locations_page_returns_403_for_recorder(self, user_client): + """GET /locations returns 403 for non-admin.""" + resp = user_client.get("/locations") + assert resp.status_code == 403 + + +# ============================================================================= +# POST /actions/location-created Tests +# ============================================================================= + + +class TestLocationCreatedSuccess: + """Tests for successful POST /actions/location-created.""" + + def test_creates_location(self, admin_client, seeded_db): + """POST creates a new location.""" + resp = admin_client.post( + "/actions/location-created", + data={ + "name": "New Test Location", + "nonce": "test-nonce-loc-1", + }, + ) + + assert resp.status_code == 200 + + # Verify location was created + row = seeded_db.execute( + "SELECT name, active FROM locations WHERE name = 'New Test Location'" + ).fetchone() + assert row is not None + assert row[0] == "New Test Location" + assert row[1] == 1 + + def test_creates_event(self, admin_client, seeded_db): + """POST creates LocationCreated event.""" + resp = admin_client.post( + "/actions/location-created", + data={ + "name": "Event Test Location", + "nonce": "test-nonce-loc-2", + }, + ) + + assert resp.status_code == 200 + + event_row = seeded_db.execute( + "SELECT type FROM events WHERE type = 'LocationCreated' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert event_row is not None + assert event_row[0] == "LocationCreated" + + def test_returns_toast_on_success(self, admin_client): + """POST returns HX-Trigger with toast message.""" + resp = admin_client.post( + "/actions/location-created", + data={ + "name": "Toast Test Location", + "nonce": "test-nonce-loc-3", + }, + ) + + assert resp.status_code == 200 + assert "HX-Trigger" in resp.headers + trigger = json.loads(resp.headers["HX-Trigger"]) + assert "showToast" in trigger + assert trigger["showToast"]["type"] == "success" + + def test_idempotent_for_existing_name(self, admin_client, seeded_db): + """POST is idempotent - returns success for existing name.""" + # First create + resp1 = admin_client.post( + "/actions/location-created", + data={ + "name": "Idempotent Location", + "nonce": "test-nonce-loc-4", + }, + ) + assert resp1.status_code == 200 + + # Second create with same name - should still succeed (idempotent) + resp2 = admin_client.post( + "/actions/location-created", + data={ + "name": "Idempotent Location", + "nonce": "test-nonce-loc-5", + }, + ) + assert resp2.status_code == 200 + + +class TestLocationCreatedValidation: + """Tests for POST /actions/location-created validation.""" + + def test_returns_403_for_recorder(self, user_client): + """POST returns 403 for non-admin.""" + resp = user_client.post( + "/actions/location-created", + data={ + "name": "Unauthorized Location", + "nonce": "test-nonce-loc-6", + }, + ) + assert resp.status_code == 403 + + def test_returns_422_for_empty_name(self, admin_client): + """POST returns 422 for empty name.""" + resp = admin_client.post( + "/actions/location-created", + data={ + "name": "", + "nonce": "test-nonce-loc-7", + }, + ) + assert resp.status_code == 422 + + def test_returns_422_for_whitespace_name(self, admin_client): + """POST returns 422 for whitespace-only name.""" + resp = admin_client.post( + "/actions/location-created", + data={ + "name": " ", + "nonce": "test-nonce-loc-8", + }, + ) + assert resp.status_code == 422 + + +# ============================================================================= +# POST /actions/location-renamed Tests +# ============================================================================= + + +class TestLocationRenamedSuccess: + """Tests for successful POST /actions/location-renamed.""" + + def test_renames_location(self, admin_client, seeded_db): + """POST renames an existing location.""" + # Get an existing location ID + row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() + location_id = row[0] + + resp = admin_client.post( + "/actions/location-renamed", + data={ + "location_id": location_id, + "new_name": "Renamed Strip 1", + "nonce": "test-nonce-loc-9", + }, + ) + + assert resp.status_code == 200 + + # Verify rename + row = seeded_db.execute( + "SELECT name FROM locations WHERE id = ?", (location_id,) + ).fetchone() + assert row[0] == "Renamed Strip 1" + + def test_creates_event(self, admin_client, seeded_db): + """POST creates LocationRenamed event.""" + row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone() + location_id = row[0] + + resp = admin_client.post( + "/actions/location-renamed", + data={ + "location_id": location_id, + "new_name": "Renamed Strip 2", + "nonce": "test-nonce-loc-10", + }, + ) + + assert resp.status_code == 200 + + event_row = seeded_db.execute( + "SELECT type FROM events WHERE type = 'LocationRenamed' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert event_row is not None + + +class TestLocationRenamedValidation: + """Tests for POST /actions/location-renamed validation.""" + + def test_returns_403_for_recorder(self, user_client, seeded_db): + """POST returns 403 for non-admin.""" + row = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone() + location_id = row[0] + + resp = user_client.post( + "/actions/location-renamed", + data={ + "location_id": location_id, + "new_name": "New Name", + "nonce": "test-nonce-loc-11", + }, + ) + assert resp.status_code == 403 + + def test_returns_422_for_nonexistent_location(self, admin_client): + """POST returns 422 for non-existent location.""" + fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" + + resp = admin_client.post( + "/actions/location-renamed", + data={ + "location_id": fake_id, + "new_name": "New Name", + "nonce": "test-nonce-loc-12", + }, + ) + assert resp.status_code == 422 + + def test_returns_422_for_duplicate_name(self, admin_client, seeded_db): + """POST returns 422 when renaming to existing name.""" + row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 3'").fetchone() + location_id = row[0] + + # Try to rename to an existing name + resp = admin_client.post( + "/actions/location-renamed", + data={ + "location_id": location_id, + "new_name": "Strip 4", # Already exists + "nonce": "test-nonce-loc-13", + }, + ) + assert resp.status_code == 422 + + +# ============================================================================= +# POST /actions/location-archived Tests +# ============================================================================= + + +class TestLocationArchivedSuccess: + """Tests for successful POST /actions/location-archived.""" + + def test_archives_location(self, admin_client, seeded_db): + """POST archives an existing location.""" + # First create a location to archive + admin_client.post( + "/actions/location-created", + data={"name": "To Be Archived", "nonce": "test-nonce-loc-14"}, + ) + + row = seeded_db.execute("SELECT id FROM locations WHERE name = 'To Be Archived'").fetchone() + location_id = row[0] + + resp = admin_client.post( + "/actions/location-archived", + data={ + "location_id": location_id, + "nonce": "test-nonce-loc-15", + }, + ) + + assert resp.status_code == 200 + + # Verify archived + row = seeded_db.execute( + "SELECT active FROM locations WHERE id = ?", (location_id,) + ).fetchone() + assert row[0] == 0 + + def test_creates_event(self, admin_client, seeded_db): + """POST creates LocationArchived event.""" + # Create a location to archive + admin_client.post( + "/actions/location-created", + data={"name": "Archive Event Test", "nonce": "test-nonce-loc-16"}, + ) + + row = seeded_db.execute( + "SELECT id FROM locations WHERE name = 'Archive Event Test'" + ).fetchone() + location_id = row[0] + + resp = admin_client.post( + "/actions/location-archived", + data={ + "location_id": location_id, + "nonce": "test-nonce-loc-17", + }, + ) + + assert resp.status_code == 200 + + event_row = seeded_db.execute( + "SELECT type FROM events WHERE type = 'LocationArchived' ORDER BY id DESC LIMIT 1" + ).fetchone() + assert event_row is not None + + +class TestLocationArchivedValidation: + """Tests for POST /actions/location-archived validation.""" + + def test_returns_403_for_recorder(self, user_client, seeded_db): + """POST returns 403 for non-admin.""" + row = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone() + location_id = row[0] + + resp = user_client.post( + "/actions/location-archived", + data={ + "location_id": location_id, + "nonce": "test-nonce-loc-18", + }, + ) + assert resp.status_code == 403 + + def test_returns_422_for_nonexistent_location(self, admin_client): + """POST returns 422 for non-existent location.""" + fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" + + resp = admin_client.post( + "/actions/location-archived", + data={ + "location_id": fake_id, + "nonce": "test-nonce-loc-19", + }, + ) + assert resp.status_code == 422 + + def test_returns_422_for_already_archived(self, admin_client, seeded_db): + """POST returns 422 when archiving already archived location.""" + # Create and archive a location + admin_client.post( + "/actions/location-created", + data={"name": "Double Archive Test", "nonce": "test-nonce-loc-20"}, + ) + + row = seeded_db.execute( + "SELECT id FROM locations WHERE name = 'Double Archive Test'" + ).fetchone() + location_id = row[0] + + # Archive once + admin_client.post( + "/actions/location-archived", + data={"location_id": location_id, "nonce": "test-nonce-loc-21"}, + ) + + # Try to archive again + resp = admin_client.post( + "/actions/location-archived", + data={ + "location_id": location_id, + "nonce": "test-nonce-loc-22", + }, + ) + assert resp.status_code == 422 diff --git a/tests/test_web_responses.py b/tests/test_web_responses.py new file mode 100644 index 0000000..2f72531 --- /dev/null +++ b/tests/test_web_responses.py @@ -0,0 +1,109 @@ +# ABOUTME: Tests for web response helper functions. +# ABOUTME: Tests error_response, error_toast, and success_toast helpers. + +import json + +from starlette.responses import HTMLResponse + +from animaltrack.web.responses import error_response, error_toast, success_toast + + +class TestErrorResponse: + """Tests for error_response().""" + + def test_returns_html_response(self): + """error_response returns an HTMLResponse.""" + response = error_response("
Error
") + assert isinstance(response, HTMLResponse) + + def test_default_status_422(self): + """error_response defaults to 422 status code.""" + response = error_response("
Validation error
") + assert response.status_code == 422 + + def test_custom_status_code(self): + """error_response accepts custom status code.""" + response = error_response("
Conflict
", status_code=409) + assert response.status_code == 409 + + def test_403_status_code(self): + """error_response works with 403 status code.""" + response = error_response("
Forbidden
", status_code=403) + assert response.status_code == 403 + + def test_401_status_code(self): + """error_response works with 401 status code.""" + response = error_response("
Unauthorized
", status_code=401) + assert response.status_code == 401 + + def test_content_in_body(self): + """error_response includes content in body.""" + response = error_response("

Error message

") + assert b"

Error message

" in response.body + + +class TestErrorToast: + """Tests for error_toast().""" + + def test_returns_html_response(self): + """error_toast returns an HTMLResponse.""" + response = error_toast("Something went wrong") + assert isinstance(response, HTMLResponse) + + def test_default_status_422(self): + """error_toast defaults to 422 status code.""" + response = error_toast("Validation failed") + assert response.status_code == 422 + + def test_custom_status_code(self): + """error_toast accepts custom status code.""" + response = error_toast("Not allowed", status_code=403) + assert response.status_code == 403 + + def test_has_hx_trigger_header(self): + """error_toast sets HX-Trigger header.""" + response = error_toast("Error message") + assert "HX-Trigger" in response.headers + + def test_toast_message_in_header(self): + """error_toast includes message in HX-Trigger header.""" + response = error_toast("Error message") + trigger = json.loads(response.headers["HX-Trigger"]) + assert trigger["showToast"]["message"] == "Error message" + + def test_toast_type_error(self): + """error_toast sets toast type to error.""" + response = error_toast("Error message") + trigger = json.loads(response.headers["HX-Trigger"]) + assert trigger["showToast"]["type"] == "error" + + +class TestSuccessToast: + """Tests for success_toast().""" + + def test_returns_dict(self): + """success_toast returns a dict (for HX-Trigger header).""" + result = success_toast("Action completed") + assert isinstance(result, dict) + + def test_has_show_toast_key(self): + """success_toast dict has showToast key.""" + result = success_toast("Action completed") + assert "showToast" in result + + def test_message_in_toast(self): + """success_toast includes message.""" + result = success_toast("Created successfully") + assert result["showToast"]["message"] == "Created successfully" + + def test_toast_type_success(self): + """success_toast sets toast type to success.""" + result = success_toast("Done") + assert result["showToast"]["type"] == "success" + + def test_can_json_dumps(self): + """success_toast result can be JSON serialized.""" + result = success_toast("Test message") + serialized = json.dumps(result) + assert '"showToast"' in serialized + assert '"Test message"' in serialized