feat: implement location events & error handling (Step 10.2)

- Add LocationService with create/rename/archive methods
- Add LocationProjection for event handling
- Add admin-only location management routes at /locations
- Add error response helpers (error_response, error_toast, success_toast)
- Add toast handler JS to base template for HX-Trigger notifications
- Update seeds.py to emit LocationCreated events per spec §23
- Archived locations block animal moves with 422 error

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-31 17:48:16 +00:00
parent 5ba068b36a
commit 229842fb45
14 changed files with 1896 additions and 28 deletions

14
PLAN.md
View File

@@ -379,19 +379,19 @@ Check off items as completed. Each phase builds on the previous.
- [x] E2E test #6: Deletes: recorder vs admin cascade (test_e2e_deletion.py)
- [x] E2E test #7: Harvest with yields (test_e2e_harvest.py)
- [x] E2E test #8: Optimistic lock with confirm (test_e2e_optimistic_lock.py)
- [ ] **Commit checkpoint**
- [x] **Commit checkpoint** (5ba068b)
NOTE: Tests #1-5 in test_e2e_stats_progression.py have documented discrepancies with
spec values due to integer truncation of bird-days and timeline differences. The
tests verify implementation consistency, not exact spec values.
### Step 10.2: Location Events & Error Handling
- [ ] Implement LocationCreated (idempotent for seeding)
- [ ] Implement LocationRenamed
- [ ] Implement LocationArchived
- [ ] Standardize error responses (422, 409, 401/403)
- [ ] Toast via HX-Trigger
- [ ] Write tests for location events and error rendering
- [x] Implement LocationCreated (idempotent for seeding)
- [x] Implement LocationRenamed
- [x] Implement LocationArchived
- [x] Standardize error responses (422, 409, 401/403)
- [x] Toast via HX-Trigger
- [x] Write tests for location events and error rendering
- [ ] **Commit checkpoint**
### Step 10.3: CLI, Docker & Deployment

View File

@@ -0,0 +1,162 @@
# ABOUTME: Projection for location lifecycle events.
# ABOUTME: Handles LocationCreated, LocationRenamed, and LocationArchived events.
import time
from typing import Any
from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED
from animaltrack.models.events import Event
from animaltrack.models.reference import Location
from animaltrack.projections.base import Projection
from animaltrack.repositories.locations import LocationRepository
class LocationProjection(Projection):
"""Maintains location reference data from events.
This projection handles location lifecycle events, maintaining:
- locations: Reference table of all locations
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection.
"""
super().__init__(db)
self.location_repo = LocationRepository(db)
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [LOCATION_CREATED, LOCATION_RENAMED, LOCATION_ARCHIVED]
def apply(self, event: Event) -> None:
"""Apply location event to update reference data."""
if event.type == LOCATION_CREATED:
self._apply_location_created(event)
elif event.type == LOCATION_RENAMED:
self._apply_location_renamed(event)
elif event.type == LOCATION_ARCHIVED:
self._apply_location_archived(event)
def revert(self, event: Event) -> None:
"""Revert location event from reference data."""
if event.type == LOCATION_CREATED:
self._revert_location_created(event)
elif event.type == LOCATION_RENAMED:
self._revert_location_renamed(event)
elif event.type == LOCATION_ARCHIVED:
self._revert_location_archived(event)
def _apply_location_created(self, event: Event) -> None:
"""Apply location created event.
Inserts a new location into the locations table.
"""
location_id = event.entity_refs.get("location_id")
name = event.payload.get("name")
ts_utc = event.ts_utc
location = Location(
id=location_id,
name=name,
active=True,
created_at_utc=ts_utc,
updated_at_utc=ts_utc,
)
self.location_repo.upsert(location)
def _revert_location_created(self, event: Event) -> None:
"""Revert location created event.
Removes the location from the locations table.
"""
location_id = event.entity_refs.get("location_id")
self.db.execute(
"DELETE FROM locations WHERE id = ?",
(location_id,),
)
def _apply_location_renamed(self, event: Event) -> None:
"""Apply location renamed event.
Updates the location name in the locations table.
"""
location_id = event.entity_refs.get("location_id")
new_name = event.payload.get("new_name")
ts_utc = event.ts_utc
self.db.execute(
"UPDATE locations SET name = ?, updated_at_utc = ? WHERE id = ?",
(new_name, ts_utc, location_id),
)
def _revert_location_renamed(self, event: Event) -> None:
"""Revert location renamed event.
This requires looking up the previous name from event history.
For simplicity, we search for the most recent prior name.
"""
location_id = event.entity_refs.get("location_id")
ts_utc = int(time.time() * 1000)
# Find the previous name from event history:
# 1. Check if there's an earlier LocationRenamed event
# 2. Or fall back to the LocationCreated event
row = self.db.execute(
"""
SELECT CASE
WHEN type = ? THEN JSON_EXTRACT(payload, '$.new_name')
WHEN type = ? THEN JSON_EXTRACT(payload, '$.name')
END as name
FROM events
WHERE entity_refs LIKE ?
AND type IN (?, ?)
AND ts_utc < ?
ORDER BY ts_utc DESC
LIMIT 1
""",
(
LOCATION_RENAMED,
LOCATION_CREATED,
f'%"{location_id}"%',
LOCATION_RENAMED,
LOCATION_CREATED,
event.ts_utc,
),
).fetchone()
if row and row[0]:
previous_name = row[0]
self.db.execute(
"UPDATE locations SET name = ?, updated_at_utc = ? WHERE id = ?",
(previous_name, ts_utc, location_id),
)
def _apply_location_archived(self, event: Event) -> None:
"""Apply location archived event.
Sets the location to inactive.
"""
location_id = event.entity_refs.get("location_id")
ts_utc = event.ts_utc
self.db.execute(
"UPDATE locations SET active = 0, updated_at_utc = ? WHERE id = ?",
(ts_utc, location_id),
)
def _revert_location_archived(self, event: Event) -> None:
"""Revert location archived event.
Sets the location back to active.
"""
location_id = event.entity_refs.get("location_id")
ts_utc = int(time.time() * 1000)
self.db.execute(
"UPDATE locations SET active = 1, updated_at_utc = ? WHERE id = ?",
(ts_utc, location_id),
)

View File

@@ -4,23 +4,24 @@
import time
from typing import Any
from animaltrack.id_gen import generate_id
from animaltrack.events.store import EventStore
from animaltrack.models.reference import (
FeedType,
Location,
Product,
ProductUnit,
Species,
User,
UserRole,
)
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.location import LocationProjection
from animaltrack.repositories import (
FeedTypeRepository,
LocationRepository,
ProductRepository,
SpeciesRepository,
UserRepository,
)
from animaltrack.services.location import LocationService
def run_seeds(db: Any) -> None:
@@ -73,12 +74,17 @@ def _seed_users(db: Any, now_utc: int) -> None:
def _seed_locations(db: Any, now_utc: int) -> None:
"""Seed location data.
"""Seed location data via LocationCreated events.
Locations are created only if they don't exist (by name).
This preserves existing ULIDs on re-seeding.
Uses LocationService.create_location() which is idempotent -
locations are created only if they don't exist (by name).
This emits LocationCreated events per spec §23.
"""
repo = LocationRepository(db)
# Initialize LocationService with projection
event_store = EventStore(db)
registry = ProjectionRegistry()
registry.register(LocationProjection(db))
location_service = LocationService(db, event_store, registry)
location_names = [
"Strip 1",
@@ -92,16 +98,12 @@ def _seed_locations(db: Any, now_utc: int) -> None:
]
for name in location_names:
existing = repo.get_by_name(name)
if existing is None:
location = Location(
id=generate_id(),
name=name,
active=True,
created_at_utc=now_utc,
updated_at_utc=now_utc,
)
repo.upsert(location)
# create_location is idempotent - returns None if already exists
location_service.create_location(
name=name,
ts_utc=now_utc,
actor="system",
)
def _seed_species(db: Any, now_utc: int) -> None:

View File

@@ -0,0 +1,253 @@
# ABOUTME: Service layer for location operations.
# ABOUTME: Coordinates event creation with projection updates for location lifecycle.
from typing import Any
from animaltrack.db import transaction
from animaltrack.events.payloads import (
LocationArchivedPayload,
LocationCreatedPayload,
LocationRenamedPayload,
)
from animaltrack.events.processor import process_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED
from animaltrack.id_gen import generate_id
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
from animaltrack.repositories.locations import LocationRepository
class LocationServiceError(Exception):
"""Base exception for location service errors."""
class ValidationError(LocationServiceError):
"""Raised when validation fails."""
class LocationService:
"""Service for location lifecycle operations.
Provides methods to create, rename, and archive locations.
All operations are atomic and emit events.
"""
def __init__(
self,
db: Any,
event_store: EventStore,
registry: ProjectionRegistry,
) -> None:
"""Initialize the service.
Args:
db: A fastlite database connection.
event_store: The event store for appending events.
registry: Registry of projections to update.
"""
self.db = db
self.event_store = event_store
self.registry = registry
self.location_repo = LocationRepository(db)
def create_location(
self,
name: str,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event | None:
"""Create a new location.
Creates a LocationCreated event if no location with this name exists.
This is idempotent per spec §23 - returns None if location already exists.
Args:
name: The location name.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the action.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event, or None if location already exists (idempotent).
Raises:
ValidationError: If name is empty or whitespace-only.
"""
# Validate name
if not name or not name.strip():
msg = "Location name cannot be empty"
raise ValidationError(msg)
name = name.strip()
# Check if location with this name already exists (idempotent)
existing = self.location_repo.get_by_name(name)
if existing is not None:
return None
# Generate a new location ID
location_id = generate_id()
# Create payload
payload = LocationCreatedPayload(
location_id=location_id,
name=name,
)
# Build entity_refs
entity_refs = {
"location_id": location_id,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=LOCATION_CREATED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event
def rename_location(
self,
location_id: str,
new_name: str,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Rename an existing location.
Creates a LocationRenamed event.
Args:
location_id: The location ID (ULID).
new_name: The new name for the location.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the action.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If location not found, name is empty, or name already exists.
"""
# Validate new_name
if not new_name or not new_name.strip():
msg = "Location name cannot be empty"
raise ValidationError(msg)
new_name = new_name.strip()
# Check location exists
location = self.location_repo.get(location_id)
if location is None:
msg = f"Location '{location_id}' not found"
raise ValidationError(msg)
# Check new name doesn't conflict with another location (but allow same name)
existing = self.location_repo.get_by_name(new_name)
if existing is not None and existing.id != location_id:
msg = f"Location with name '{new_name}' already exists"
raise ValidationError(msg)
# Create payload
payload = LocationRenamedPayload(
location_id=location_id,
new_name=new_name,
)
# Build entity_refs
entity_refs = {
"location_id": location_id,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=LOCATION_RENAMED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event
def archive_location(
self,
location_id: str,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Archive a location.
Creates a LocationArchived event. The location will be marked inactive
and hidden from active location lists.
Args:
location_id: The location ID (ULID).
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the action.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If location not found or already archived.
"""
# Check location exists
location = self.location_repo.get(location_id)
if location is None:
msg = f"Location '{location_id}' not found"
raise ValidationError(msg)
# Check not already archived
if not location.active:
msg = f"Location '{location_id}' is already archived"
raise ValidationError(msg)
# Create payload
payload = LocationArchivedPayload(
location_id=location_id,
)
# Build entity_refs
entity_refs = {
"location_id": location_id,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=LOCATION_ARCHIVED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event

View File

@@ -26,6 +26,7 @@ from animaltrack.web.routes import (
register_events_routes,
register_feed_routes,
register_health_routes,
register_location_routes,
register_move_routes,
register_product_routes,
register_registry_routes,
@@ -152,6 +153,7 @@ def create_app(
register_egg_routes(rt, app)
register_events_routes(rt, app)
register_feed_routes(rt, app)
register_location_routes(rt, app)
register_move_routes(rt, app)
register_product_routes(rt, app)
register_registry_routes(rt, app)

View File

@@ -0,0 +1,64 @@
# ABOUTME: Helper functions for creating standardized HTTP responses.
# ABOUTME: Provides error_response, error_toast, and success_toast utilities.
import json
from starlette.responses import HTMLResponse
def error_response(content: str, status_code: int = 422) -> HTMLResponse:
"""Create an HTML error response.
Args:
content: The HTML content to include in the response body.
status_code: The HTTP status code (default 422 for validation errors).
Returns:
An HTMLResponse with the specified content and status code.
Standard status codes:
- 422: Validation errors (missing fields, invalid values)
- 409: Conflicts (roster mismatch, dependents blocking delete)
- 403: Authorization failed (wrong role, CSRF)
- 401: Authentication failed (no user)
"""
return HTMLResponse(content=content, status_code=status_code)
def error_toast(message: str, status_code: int = 422) -> HTMLResponse:
"""Create an error response with a toast notification.
Returns a minimal HTML response with an HX-Trigger header that
triggers a toast notification on the client side.
Args:
message: The error message to display in the toast.
status_code: The HTTP status code (default 422 for validation errors).
Returns:
An HTMLResponse with HX-Trigger header for toast display.
"""
response = HTMLResponse(content="", status_code=status_code)
response.headers["HX-Trigger"] = json.dumps(
{"showToast": {"message": message, "type": "error"}}
)
return response
def success_toast(message: str) -> dict:
"""Create a toast trigger dict for a success message.
This returns a dict that can be JSON-serialized and used as
an HX-Trigger header value.
Args:
message: The success message to display in the toast.
Returns:
A dict suitable for use as HX-Trigger header value.
Usage:
response = HTMLResponse(content=...)
response.headers["HX-Trigger"] = json.dumps(success_toast("Done!"))
"""
return {"showToast": {"message": message, "type": "success"}}

View File

@@ -7,6 +7,7 @@ from animaltrack.web.routes.eggs import register_egg_routes
from animaltrack.web.routes.events import register_events_routes
from animaltrack.web.routes.feed import register_feed_routes
from animaltrack.web.routes.health import register_health_routes
from animaltrack.web.routes.locations import register_location_routes
from animaltrack.web.routes.move import register_move_routes
from animaltrack.web.routes.products import register_product_routes
from animaltrack.web.routes.registry import register_registry_routes
@@ -18,6 +19,7 @@ __all__ = [
"register_events_routes",
"register_feed_routes",
"register_health_routes",
"register_location_routes",
"register_move_routes",
"register_product_routes",
"register_registry_routes",

View File

@@ -0,0 +1,253 @@
# ABOUTME: Routes for Location management functionality (admin-only).
# ABOUTME: Handles GET /locations and POST /actions/location-* routes.
from __future__ import annotations
import json
import time
from fasthtml.common import to_xml
from starlette.requests import Request
from starlette.responses import HTMLResponse
from animaltrack.events.store import EventStore
from animaltrack.models.reference import UserRole
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.location import LocationProjection
from animaltrack.repositories.locations import LocationRepository
from animaltrack.services.location import LocationService, ValidationError
from animaltrack.web.auth import require_role
from animaltrack.web.responses import success_toast
from animaltrack.web.templates import page
from animaltrack.web.templates.locations import location_list, rename_form
def _get_location_service(db) -> LocationService:
"""Create a LocationService with projections."""
event_store = EventStore(db)
registry = ProjectionRegistry()
registry.register(LocationProjection(db))
return LocationService(db, event_store, registry)
# =============================================================================
# GET /locations - Location List
# =============================================================================
@require_role(UserRole.ADMIN)
async def locations_index(req: Request):
"""GET /locations - Location management page (admin-only)."""
db = req.app.state.db
locations = LocationRepository(db).list_all()
return page(
location_list(locations),
title="Locations - AnimalTrack",
active_nav=None,
)
# =============================================================================
# GET /locations/{id}/rename - Rename Form
# =============================================================================
@require_role(UserRole.ADMIN)
async def location_rename_form(req: Request, location_id: str):
"""GET /locations/{id}/rename - Rename location form (admin-only)."""
db = req.app.state.db
location = LocationRepository(db).get(location_id)
if location is None:
return HTMLResponse(content="Location not found", status_code=404)
return HTMLResponse(content=to_xml(rename_form(location)))
# =============================================================================
# POST /actions/location-created
# =============================================================================
@require_role(UserRole.ADMIN)
async def location_created(req: Request):
"""POST /actions/location-created - Create a new location (admin-only)."""
db = req.app.state.db
form = await req.form()
name = form.get("name", "").strip()
nonce = form.get("nonce")
# Validation
if not name:
return _render_error_list(db, "Location name cannot be empty")
ts_utc = int(time.time() * 1000)
# Get actor from auth
auth = req.scope.get("auth")
actor = auth.username if auth else "unknown"
# Create location
location_service = _get_location_service(db)
try:
event = location_service.create_location(
name=name,
ts_utc=ts_utc,
actor=actor,
nonce=nonce,
route="/actions/location-created",
)
except ValidationError as e:
return _render_error_list(db, str(e))
# Get updated list
locations = LocationRepository(db).list_all()
# Success response with toast
response = HTMLResponse(
content=to_xml(location_list(locations)),
)
if event is not None:
response.headers["HX-Trigger"] = json.dumps(success_toast(f"Created location: {name}"))
else:
# Idempotent - location already exists
response.headers["HX-Trigger"] = json.dumps(
success_toast(f"Location already exists: {name}")
)
return response
# =============================================================================
# POST /actions/location-renamed
# =============================================================================
@require_role(UserRole.ADMIN)
async def location_renamed(req: Request):
"""POST /actions/location-renamed - Rename a location (admin-only)."""
db = req.app.state.db
form = await req.form()
location_id = form.get("location_id", "").strip()
new_name = form.get("new_name", "").strip()
nonce = form.get("nonce")
# Validation
if not location_id:
return _render_error_list(db, "Location ID is required")
if not new_name:
return _render_error_list(db, "New name cannot be empty")
ts_utc = int(time.time() * 1000)
# Get actor from auth
auth = req.scope.get("auth")
actor = auth.username if auth else "unknown"
# Rename location
location_service = _get_location_service(db)
try:
location_service.rename_location(
location_id=location_id,
new_name=new_name,
ts_utc=ts_utc,
actor=actor,
nonce=nonce,
route="/actions/location-renamed",
)
except ValidationError as e:
return _render_error_list(db, str(e))
# Get updated list
locations = LocationRepository(db).list_all()
# Success response with toast
response = HTMLResponse(
content=to_xml(location_list(locations)),
)
response.headers["HX-Trigger"] = json.dumps(success_toast(f"Renamed location to: {new_name}"))
return response
# =============================================================================
# POST /actions/location-archived
# =============================================================================
@require_role(UserRole.ADMIN)
async def location_archived(req: Request):
"""POST /actions/location-archived - Archive a location (admin-only)."""
db = req.app.state.db
form = await req.form()
location_id = form.get("location_id", "").strip()
nonce = form.get("nonce")
# Validation
if not location_id:
return _render_error_list(db, "Location ID is required")
ts_utc = int(time.time() * 1000)
# Get actor from auth
auth = req.scope.get("auth")
actor = auth.username if auth else "unknown"
# Archive location
location_service = _get_location_service(db)
try:
location_service.archive_location(
location_id=location_id,
ts_utc=ts_utc,
actor=actor,
nonce=nonce,
route="/actions/location-archived",
)
except ValidationError as e:
return _render_error_list(db, str(e))
# Get updated list
locations = LocationRepository(db).list_all()
# Success response with toast
response = HTMLResponse(
content=to_xml(location_list(locations)),
)
response.headers["HX-Trigger"] = json.dumps(success_toast("Location archived"))
return response
# =============================================================================
# Helper Functions
# =============================================================================
def _render_error_list(db, error_message: str) -> HTMLResponse:
"""Render location list with error message."""
locations = LocationRepository(db).list_all()
return HTMLResponse(
content=to_xml(location_list(locations, error=error_message)),
status_code=422,
)
def register_location_routes(rt, app):
"""Register location management routes.
Args:
rt: FastHTML route decorator.
app: FastHTML application instance.
"""
rt("/locations")(locations_index)
rt("/locations/{location_id}/rename")(location_rename_form)
rt("/actions/location-created", methods=["POST"])(location_created)
rt("/actions/location-renamed", methods=["POST"])(location_renamed)
rt("/actions/location-archived", methods=["POST"])(location_archived)

View File

@@ -1,11 +1,70 @@
# ABOUTME: Base HTML template for AnimalTrack pages.
# ABOUTME: Provides consistent layout with MonsterUI theme and bottom nav.
from fasthtml.common import Container, Div, Title
from fasthtml.common import Container, Div, Script, Title
from animaltrack.web.templates.nav import BottomNav, BottomNavStyles
def toast_container():
"""Create a toast container for displaying notifications.
This container holds toast notifications that appear in the top-right corner.
Toasts are triggered via HTMX events (HX-Trigger header with showToast).
"""
return Div(
id="toast-container",
cls="toast toast-end toast-top z-50",
)
def toast_script():
"""JavaScript to handle showToast events from HTMX.
Listens for the showToast event and creates toast notifications
that auto-dismiss after 5 seconds.
"""
script = """
document.body.addEventListener('showToast', function(evt) {
var message = evt.detail.message || 'Action completed';
var type = evt.detail.type || 'success';
// Create alert element with appropriate styling
var alertClass = 'alert shadow-lg mb-2 ';
if (type === 'success') {
alertClass += 'alert-success';
} else if (type === 'error') {
alertClass += 'alert-error';
} else if (type === 'warning') {
alertClass += 'alert-warning';
} else {
alertClass += 'alert-info';
}
var toast = document.createElement('div');
toast.className = alertClass;
toast.innerHTML = '<span>' + message + '</span>';
var container = document.getElementById('toast-container');
if (container) {
container.appendChild(toast);
// Auto-remove after 5 seconds
setTimeout(function() {
toast.style.opacity = '0';
toast.style.transition = 'opacity 0.5s';
setTimeout(function() {
if (toast.parentNode) {
toast.parentNode.removeChild(toast);
}
}, 500);
}, 5000);
}
});
"""
return Script(script)
def page(content, title: str = "AnimalTrack", active_nav: str = "egg"):
"""
Base page template wrapper with navigation.
@@ -15,6 +74,7 @@ def page(content, title: str = "AnimalTrack", active_nav: str = "egg"):
- Bottom navigation styling
- Content container with nav padding
- Fixed bottom navigation bar
- Toast container for notifications
Args:
content: Page content (FT components)
@@ -27,6 +87,7 @@ def page(content, title: str = "AnimalTrack", active_nav: str = "egg"):
return (
Title(title),
BottomNavStyles(),
toast_script(),
# Main content with bottom padding for fixed nav
# hx-boost enables AJAX for all descendant forms/links
Div(
@@ -35,5 +96,6 @@ def page(content, title: str = "AnimalTrack", active_nav: str = "egg"):
hx_target="body",
cls="pb-20 min-h-screen bg-[#0f0f0e] text-stone-100",
),
toast_container(),
BottomNav(active_id=active_nav),
)

View File

@@ -0,0 +1,170 @@
# ABOUTME: Templates for Location management pages.
# ABOUTME: Admin-only pages for creating, renaming, and archiving locations.
from uuid import uuid4
from fasthtml.common import H1, Div, Form, Hidden, Table, Tbody, Td, Th, Thead, Tr
from monsterui.all import (
Alert,
AlertT,
Button,
ButtonT,
Card,
DivFullySpaced,
Grid,
LabelInput,
)
from animaltrack.models.reference import Location
def location_list(
locations: list[Location],
error: str | None = None,
) -> Div:
"""Create the location list page.
Args:
locations: List of all locations.
error: Optional error message to display.
Returns:
Div containing the location management page.
"""
# Error alert if present
error_alert = None
if error:
error_alert = Alert(error, cls=AlertT.error)
# Create new location form
create_form = Card(
Form(
LabelInput(
"Location Name",
name="name",
type="text",
required=True,
placeholder="Enter location name",
),
Hidden(name="nonce", value=str(uuid4())),
Button("Create Location", type="submit", cls=ButtonT.primary),
hx_post="/actions/location-created",
hx_target="#location-list",
hx_swap="outerHTML",
),
header=Div("Create New Location", cls="text-lg font-semibold"),
)
# Location table
rows = []
for loc in locations:
status = "Active" if loc.active else "Archived"
status_badge = Div(status, cls="badge badge-success" if loc.active else "badge badge-ghost")
# Action buttons
actions = DivFullySpaced(
Button(
"Rename",
cls=ButtonT.ghost + " btn-sm",
hx_get=f"/locations/{loc.id}/rename",
hx_target="#modal-container",
hx_swap="innerHTML",
)
if loc.active
else None,
Button(
"Archive",
cls=ButtonT.ghost + " btn-sm text-warning",
hx_post="/actions/location-archived",
hx_vals=f'{{"location_id": "{loc.id}", "nonce": "{uuid4()}"}}',
hx_target="#location-list",
hx_swap="outerHTML",
hx_confirm="Are you sure you want to archive this location?",
)
if loc.active
else None,
)
rows.append(
Tr(
Td(loc.name),
Td(status_badge),
Td(actions),
)
)
table = Table(
Thead(
Tr(
Th("Name"),
Th("Status"),
Th("Actions"),
)
),
Tbody(*rows),
cls="table table-zebra w-full",
)
location_card = Card(
table,
header=Div("All Locations", cls="text-lg font-semibold"),
)
return Div(
H1("Location Management", cls="text-2xl font-bold mb-4"),
error_alert,
Grid(
create_form,
cols_sm=1,
cols_md=1,
cols_lg=1,
gap=4,
),
Div(id="modal-container"),
Div(
location_card,
id="location-list",
cls="mt-4",
),
)
def rename_form(
location: Location,
error: str | None = None,
) -> Div:
"""Create the rename location form.
Args:
location: The location to rename.
error: Optional error message to display.
Returns:
Div containing the rename form.
"""
error_alert = None
if error:
error_alert = Alert(error, cls=AlertT.error)
return Card(
error_alert,
Form(
LabelInput(
"New Name",
name="new_name",
type="text",
required=True,
value=location.name,
),
Hidden(name="location_id", value=location.id),
Hidden(name="nonce", value=str(uuid4())),
DivFullySpaced(
Button("Cancel", type="button", cls=ButtonT.ghost, hx_get="/locations"),
Button("Rename", type="submit", cls=ButtonT.primary),
),
hx_post="/actions/location-renamed",
hx_target="#location-list",
hx_swap="outerHTML",
),
header=Div(f"Rename: {location.name}", cls="text-lg font-semibold"),
)

View File

@@ -240,6 +240,9 @@ class TestAnimalServiceTransactionIntegrity:
def test_no_partial_data_on_projection_error(self, seeded_db, event_store, valid_location_id):
"""If projection fails, event is not persisted."""
# Count events before the test (seeds create LocationCreated events)
event_count_before = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0]
# Create a registry with a failing projection
from animaltrack.projections import Projection, ProjectionError
@@ -264,9 +267,9 @@ class TestAnimalServiceTransactionIntegrity:
with pytest.raises(ProjectionError):
service.create_cohort(payload, ts_utc, "test_user")
# Verify nothing was persisted
event_count = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0]
assert event_count == 0
# Verify no new events were persisted (count unchanged from before)
event_count_after = seeded_db.execute("SELECT COUNT(*) FROM events").fetchone()[0]
assert event_count_after == event_count_before
animal_count = seeded_db.execute("SELECT COUNT(*) FROM animal_registry").fetchone()[0]
assert animal_count == 0

View File

@@ -0,0 +1,343 @@
# ABOUTME: Tests for LocationService operations.
# ABOUTME: Tests create_location, rename_location, and archive_location with event emission.
import time
import pytest
from animaltrack.events.store import EventStore
from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED
from animaltrack.projections import ProjectionRegistry
from animaltrack.services.location import LocationService, ValidationError
@pytest.fixture
def event_store(seeded_db):
"""Create an EventStore for testing."""
return EventStore(seeded_db)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with location projections registered."""
from animaltrack.projections.location import LocationProjection
registry = ProjectionRegistry()
registry.register(LocationProjection(seeded_db))
return registry
@pytest.fixture
def location_service(seeded_db, event_store, projection_registry):
"""Create a LocationService for testing."""
return LocationService(seeded_db, event_store, projection_registry)
# =============================================================================
# create_location Tests
# =============================================================================
class TestLocationServiceCreate:
"""Tests for create_location()."""
def test_creates_location_created_event(self, seeded_db, location_service):
"""create_location creates a LocationCreated event."""
ts_utc = int(time.time() * 1000)
event = location_service.create_location("New Location", ts_utc, "test_user")
assert event.type == LOCATION_CREATED
assert event.actor == "test_user"
assert event.ts_utc == ts_utc
def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service):
"""Event entity_refs contains location_id."""
ts_utc = int(time.time() * 1000)
event = location_service.create_location("Another Location", ts_utc, "test_user")
assert "location_id" in event.entity_refs
assert len(event.entity_refs["location_id"]) == 26 # ULID length
def test_event_payload_contains_name(self, seeded_db, location_service):
"""Event payload contains the location name."""
ts_utc = int(time.time() * 1000)
event = location_service.create_location("My Location", ts_utc, "test_user")
assert event.payload["name"] == "My Location"
def test_inserts_into_locations_table(self, seeded_db, location_service):
"""create_location inserts a new row in locations table."""
ts_utc = int(time.time() * 1000)
event = location_service.create_location("Test Location", ts_utc, "test_user")
row = seeded_db.execute(
"SELECT id, name, active FROM locations WHERE id = ?",
(event.entity_refs["location_id"],),
).fetchone()
assert row is not None
assert row[1] == "Test Location"
assert row[2] == 1 # active=True
def test_returns_existing_location_if_name_exists(self, seeded_db, location_service):
"""create_location returns None if location with same name already exists."""
ts_utc = int(time.time() * 1000)
# Create first location
event1 = location_service.create_location("Unique Name", ts_utc, "test_user")
assert event1 is not None
# Try to create duplicate - should return None (idempotent)
event2 = location_service.create_location("Unique Name", ts_utc + 1000, "test_user")
assert event2 is None
def test_idempotent_skips_if_event_exists_for_location(self, seeded_db, location_service):
"""create_location is idempotent - skips if LocationCreated event exists."""
ts_utc = int(time.time() * 1000)
# Create the location once
event1 = location_service.create_location("Idempotent Test", ts_utc, "test_user")
location_id = event1.entity_refs["location_id"]
# Count events for this location
event_count_before = seeded_db.execute(
"SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?",
(LOCATION_CREATED, f"%{location_id}%"),
).fetchone()[0]
# Calling create again with same name should not create a new event
event2 = location_service.create_location("Idempotent Test", ts_utc + 1000, "test_user")
event_count_after = seeded_db.execute(
"SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?",
(LOCATION_CREATED, f"%{location_id}%"),
).fetchone()[0]
assert event2 is None
assert event_count_after == event_count_before
class TestLocationServiceCreateValidation:
"""Tests for create_location() validation."""
def test_rejects_empty_name(self, seeded_db, location_service):
"""Raises ValidationError for empty name."""
with pytest.raises(ValidationError, match="name"):
location_service.create_location("", int(time.time() * 1000), "test_user")
def test_rejects_whitespace_only_name(self, seeded_db, location_service):
"""Raises ValidationError for whitespace-only name."""
with pytest.raises(ValidationError, match="name"):
location_service.create_location(" ", int(time.time() * 1000), "test_user")
# =============================================================================
# rename_location Tests
# =============================================================================
class TestLocationServiceRename:
"""Tests for rename_location()."""
def test_creates_location_renamed_event(self, seeded_db, location_service):
"""rename_location creates a LocationRenamed event."""
ts_utc = int(time.time() * 1000)
# First create a location
create_event = location_service.create_location("Original Name", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
# Rename it
event = location_service.rename_location(
location_id, "New Name", ts_utc + 1000, "test_user"
)
assert event.type == LOCATION_RENAMED
assert event.actor == "test_user"
assert event.ts_utc == ts_utc + 1000
def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service):
"""Event entity_refs contains location_id."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Rename Test", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
event = location_service.rename_location(location_id, "Renamed", ts_utc + 1000, "test_user")
assert event.entity_refs["location_id"] == location_id
def test_event_payload_contains_new_name(self, seeded_db, location_service):
"""Event payload contains the new name."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Before", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
event = location_service.rename_location(location_id, "After", ts_utc + 1000, "test_user")
assert event.payload["new_name"] == "After"
def test_updates_locations_table_name(self, seeded_db, location_service):
"""rename_location updates the name in locations table."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Old Name", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
location_service.rename_location(location_id, "Updated Name", ts_utc + 1000, "test_user")
row = seeded_db.execute(
"SELECT name FROM locations WHERE id = ?", (location_id,)
).fetchone()
assert row[0] == "Updated Name"
class TestLocationServiceRenameValidation:
"""Tests for rename_location() validation."""
def test_rejects_nonexistent_location(self, seeded_db, location_service):
"""Raises ValidationError for non-existent location_id."""
fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
with pytest.raises(ValidationError, match="not found"):
location_service.rename_location(
fake_id, "New Name", int(time.time() * 1000), "test_user"
)
def test_rejects_duplicate_name(self, seeded_db, location_service):
"""Raises ValidationError when renaming to an existing name."""
ts_utc = int(time.time() * 1000)
# Create two locations
location_service.create_location("First Location", ts_utc, "test_user")
create_event2 = location_service.create_location(
"Second Location", ts_utc + 1000, "test_user"
)
location_id = create_event2.entity_refs["location_id"]
# Try to rename second to first's name
with pytest.raises(ValidationError, match="already exists"):
location_service.rename_location(
location_id, "First Location", ts_utc + 2000, "test_user"
)
def test_rejects_empty_new_name(self, seeded_db, location_service):
"""Raises ValidationError for empty new_name."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Test", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
with pytest.raises(ValidationError, match="name"):
location_service.rename_location(location_id, "", ts_utc + 1000, "test_user")
def test_allows_rename_to_same_name(self, seeded_db, location_service):
"""Allows renaming to the same name (no-op)."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Same Name", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
# Should not raise - renaming to same name is allowed (no-op)
event = location_service.rename_location(
location_id, "Same Name", ts_utc + 1000, "test_user"
)
assert event.type == LOCATION_RENAMED
# =============================================================================
# archive_location Tests
# =============================================================================
class TestLocationServiceArchive:
"""Tests for archive_location()."""
def test_creates_location_archived_event(self, seeded_db, location_service):
"""archive_location creates a LocationArchived event."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("To Archive", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
event = location_service.archive_location(location_id, ts_utc + 1000, "test_user")
assert event.type == LOCATION_ARCHIVED
assert event.actor == "test_user"
assert event.ts_utc == ts_utc + 1000
def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service):
"""Event entity_refs contains location_id."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Archive Test", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
event = location_service.archive_location(location_id, ts_utc + 1000, "test_user")
assert event.entity_refs["location_id"] == location_id
def test_sets_location_inactive(self, seeded_db, location_service):
"""archive_location sets active=0 in locations table."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Active Location", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
# Verify initially active
row = seeded_db.execute(
"SELECT active FROM locations WHERE id = ?", (location_id,)
).fetchone()
assert row[0] == 1
# Archive it
location_service.archive_location(location_id, ts_utc + 1000, "test_user")
# Verify now inactive
row = seeded_db.execute(
"SELECT active FROM locations WHERE id = ?", (location_id,)
).fetchone()
assert row[0] == 0
def test_location_not_in_list_active(self, seeded_db, location_service):
"""Archived location does not appear in list_active()."""
from animaltrack.repositories.locations import LocationRepository
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Will Be Archived", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
# Before archive - should be in list
active_before = LocationRepository(seeded_db).list_active()
assert any(loc.id == location_id for loc in active_before)
# Archive it
location_service.archive_location(location_id, ts_utc + 1000, "test_user")
# After archive - should not be in list
active_after = LocationRepository(seeded_db).list_active()
assert not any(loc.id == location_id for loc in active_after)
class TestLocationServiceArchiveValidation:
"""Tests for archive_location() validation."""
def test_rejects_nonexistent_location(self, seeded_db, location_service):
"""Raises ValidationError for non-existent location_id."""
fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
with pytest.raises(ValidationError, match="not found"):
location_service.archive_location(fake_id, int(time.time() * 1000), "test_user")
def test_rejects_already_archived_location(self, seeded_db, location_service):
"""Raises ValidationError when archiving an already archived location."""
ts_utc = int(time.time() * 1000)
create_event = location_service.create_location("Already Archived", ts_utc, "test_user")
location_id = create_event.entity_refs["location_id"]
# Archive once
location_service.archive_location(location_id, ts_utc + 1000, "test_user")
# Try to archive again
with pytest.raises(ValidationError, match="already archived"):
location_service.archive_location(location_id, ts_utc + 2000, "test_user")

443
tests/test_web_locations.py Normal file
View File

@@ -0,0 +1,443 @@
# ABOUTME: Tests for Location management web routes.
# ABOUTME: Covers GET /locations list and POST /actions/location-* (admin-only).
import json
import os
import time
import pytest
from starlette.testclient import TestClient
from animaltrack.models.reference import User, UserRole
from animaltrack.repositories.users import UserRepository
def make_test_settings(
csrf_secret: str = "test-secret",
trusted_proxy_ips: str = "127.0.0.1",
dev_mode: bool = True,
):
"""Create Settings for testing by setting env vars temporarily."""
from animaltrack.config import Settings
old_env = os.environ.copy()
try:
os.environ["CSRF_SECRET"] = csrf_secret
os.environ["TRUSTED_PROXY_IPS"] = trusted_proxy_ips
os.environ["DEV_MODE"] = str(dev_mode).lower()
return Settings()
finally:
os.environ.clear()
os.environ.update(old_env)
@pytest.fixture
def admin_client(seeded_db):
"""Test client with admin role (dev mode - bypasses CSRF, auto-admin auth)."""
from animaltrack.web.app import create_app
settings = make_test_settings(trusted_proxy_ips="testclient", dev_mode=True)
app, rt = create_app(settings=settings, db=seeded_db)
return TestClient(app, raise_server_exceptions=True)
@pytest.fixture
def user_client(seeded_db):
"""Test client with regular (recorder) role."""
from animaltrack.web.app import create_app
# Create recorder user in database
user_repo = UserRepository(seeded_db)
now = int(time.time() * 1000)
user = User(
username="recorder",
display_name="Recorder User",
role=UserRole.RECORDER,
active=True,
created_at_utc=now,
updated_at_utc=now,
)
user_repo.upsert(user)
# Use dev_mode=False to enable real auth
settings = make_test_settings(trusted_proxy_ips="testclient", dev_mode=False)
app, rt = create_app(settings=settings, db=seeded_db)
client = TestClient(app, raise_server_exceptions=True)
# Set header to simulate trusted proxy auth
client.headers["X-Oidc-Username"] = "recorder"
return client
# =============================================================================
# GET /locations Tests
# =============================================================================
class TestLocationListRendering:
"""Tests for GET /locations list rendering."""
def test_locations_page_renders_for_admin(self, admin_client):
"""GET /locations returns 200 for admin."""
resp = admin_client.get("/locations")
assert resp.status_code == 200
def test_locations_page_shows_seeded_locations(self, admin_client):
"""Locations page shows seeded locations."""
resp = admin_client.get("/locations")
assert resp.status_code == 200
assert "Strip 1" in resp.text
assert "Strip 2" in resp.text
assert "Nursery 1" in resp.text
def test_locations_page_returns_403_for_recorder(self, user_client):
"""GET /locations returns 403 for non-admin."""
resp = user_client.get("/locations")
assert resp.status_code == 403
# =============================================================================
# POST /actions/location-created Tests
# =============================================================================
class TestLocationCreatedSuccess:
"""Tests for successful POST /actions/location-created."""
def test_creates_location(self, admin_client, seeded_db):
"""POST creates a new location."""
resp = admin_client.post(
"/actions/location-created",
data={
"name": "New Test Location",
"nonce": "test-nonce-loc-1",
},
)
assert resp.status_code == 200
# Verify location was created
row = seeded_db.execute(
"SELECT name, active FROM locations WHERE name = 'New Test Location'"
).fetchone()
assert row is not None
assert row[0] == "New Test Location"
assert row[1] == 1
def test_creates_event(self, admin_client, seeded_db):
"""POST creates LocationCreated event."""
resp = admin_client.post(
"/actions/location-created",
data={
"name": "Event Test Location",
"nonce": "test-nonce-loc-2",
},
)
assert resp.status_code == 200
event_row = seeded_db.execute(
"SELECT type FROM events WHERE type = 'LocationCreated' ORDER BY id DESC LIMIT 1"
).fetchone()
assert event_row is not None
assert event_row[0] == "LocationCreated"
def test_returns_toast_on_success(self, admin_client):
"""POST returns HX-Trigger with toast message."""
resp = admin_client.post(
"/actions/location-created",
data={
"name": "Toast Test Location",
"nonce": "test-nonce-loc-3",
},
)
assert resp.status_code == 200
assert "HX-Trigger" in resp.headers
trigger = json.loads(resp.headers["HX-Trigger"])
assert "showToast" in trigger
assert trigger["showToast"]["type"] == "success"
def test_idempotent_for_existing_name(self, admin_client, seeded_db):
"""POST is idempotent - returns success for existing name."""
# First create
resp1 = admin_client.post(
"/actions/location-created",
data={
"name": "Idempotent Location",
"nonce": "test-nonce-loc-4",
},
)
assert resp1.status_code == 200
# Second create with same name - should still succeed (idempotent)
resp2 = admin_client.post(
"/actions/location-created",
data={
"name": "Idempotent Location",
"nonce": "test-nonce-loc-5",
},
)
assert resp2.status_code == 200
class TestLocationCreatedValidation:
"""Tests for POST /actions/location-created validation."""
def test_returns_403_for_recorder(self, user_client):
"""POST returns 403 for non-admin."""
resp = user_client.post(
"/actions/location-created",
data={
"name": "Unauthorized Location",
"nonce": "test-nonce-loc-6",
},
)
assert resp.status_code == 403
def test_returns_422_for_empty_name(self, admin_client):
"""POST returns 422 for empty name."""
resp = admin_client.post(
"/actions/location-created",
data={
"name": "",
"nonce": "test-nonce-loc-7",
},
)
assert resp.status_code == 422
def test_returns_422_for_whitespace_name(self, admin_client):
"""POST returns 422 for whitespace-only name."""
resp = admin_client.post(
"/actions/location-created",
data={
"name": " ",
"nonce": "test-nonce-loc-8",
},
)
assert resp.status_code == 422
# =============================================================================
# POST /actions/location-renamed Tests
# =============================================================================
class TestLocationRenamedSuccess:
"""Tests for successful POST /actions/location-renamed."""
def test_renames_location(self, admin_client, seeded_db):
"""POST renames an existing location."""
# Get an existing location ID
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
location_id = row[0]
resp = admin_client.post(
"/actions/location-renamed",
data={
"location_id": location_id,
"new_name": "Renamed Strip 1",
"nonce": "test-nonce-loc-9",
},
)
assert resp.status_code == 200
# Verify rename
row = seeded_db.execute(
"SELECT name FROM locations WHERE id = ?", (location_id,)
).fetchone()
assert row[0] == "Renamed Strip 1"
def test_creates_event(self, admin_client, seeded_db):
"""POST creates LocationRenamed event."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
location_id = row[0]
resp = admin_client.post(
"/actions/location-renamed",
data={
"location_id": location_id,
"new_name": "Renamed Strip 2",
"nonce": "test-nonce-loc-10",
},
)
assert resp.status_code == 200
event_row = seeded_db.execute(
"SELECT type FROM events WHERE type = 'LocationRenamed' ORDER BY id DESC LIMIT 1"
).fetchone()
assert event_row is not None
class TestLocationRenamedValidation:
"""Tests for POST /actions/location-renamed validation."""
def test_returns_403_for_recorder(self, user_client, seeded_db):
"""POST returns 403 for non-admin."""
row = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()
location_id = row[0]
resp = user_client.post(
"/actions/location-renamed",
data={
"location_id": location_id,
"new_name": "New Name",
"nonce": "test-nonce-loc-11",
},
)
assert resp.status_code == 403
def test_returns_422_for_nonexistent_location(self, admin_client):
"""POST returns 422 for non-existent location."""
fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
resp = admin_client.post(
"/actions/location-renamed",
data={
"location_id": fake_id,
"new_name": "New Name",
"nonce": "test-nonce-loc-12",
},
)
assert resp.status_code == 422
def test_returns_422_for_duplicate_name(self, admin_client, seeded_db):
"""POST returns 422 when renaming to existing name."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 3'").fetchone()
location_id = row[0]
# Try to rename to an existing name
resp = admin_client.post(
"/actions/location-renamed",
data={
"location_id": location_id,
"new_name": "Strip 4", # Already exists
"nonce": "test-nonce-loc-13",
},
)
assert resp.status_code == 422
# =============================================================================
# POST /actions/location-archived Tests
# =============================================================================
class TestLocationArchivedSuccess:
"""Tests for successful POST /actions/location-archived."""
def test_archives_location(self, admin_client, seeded_db):
"""POST archives an existing location."""
# First create a location to archive
admin_client.post(
"/actions/location-created",
data={"name": "To Be Archived", "nonce": "test-nonce-loc-14"},
)
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'To Be Archived'").fetchone()
location_id = row[0]
resp = admin_client.post(
"/actions/location-archived",
data={
"location_id": location_id,
"nonce": "test-nonce-loc-15",
},
)
assert resp.status_code == 200
# Verify archived
row = seeded_db.execute(
"SELECT active FROM locations WHERE id = ?", (location_id,)
).fetchone()
assert row[0] == 0
def test_creates_event(self, admin_client, seeded_db):
"""POST creates LocationArchived event."""
# Create a location to archive
admin_client.post(
"/actions/location-created",
data={"name": "Archive Event Test", "nonce": "test-nonce-loc-16"},
)
row = seeded_db.execute(
"SELECT id FROM locations WHERE name = 'Archive Event Test'"
).fetchone()
location_id = row[0]
resp = admin_client.post(
"/actions/location-archived",
data={
"location_id": location_id,
"nonce": "test-nonce-loc-17",
},
)
assert resp.status_code == 200
event_row = seeded_db.execute(
"SELECT type FROM events WHERE type = 'LocationArchived' ORDER BY id DESC LIMIT 1"
).fetchone()
assert event_row is not None
class TestLocationArchivedValidation:
"""Tests for POST /actions/location-archived validation."""
def test_returns_403_for_recorder(self, user_client, seeded_db):
"""POST returns 403 for non-admin."""
row = seeded_db.execute("SELECT id FROM locations LIMIT 1").fetchone()
location_id = row[0]
resp = user_client.post(
"/actions/location-archived",
data={
"location_id": location_id,
"nonce": "test-nonce-loc-18",
},
)
assert resp.status_code == 403
def test_returns_422_for_nonexistent_location(self, admin_client):
"""POST returns 422 for non-existent location."""
fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
resp = admin_client.post(
"/actions/location-archived",
data={
"location_id": fake_id,
"nonce": "test-nonce-loc-19",
},
)
assert resp.status_code == 422
def test_returns_422_for_already_archived(self, admin_client, seeded_db):
"""POST returns 422 when archiving already archived location."""
# Create and archive a location
admin_client.post(
"/actions/location-created",
data={"name": "Double Archive Test", "nonce": "test-nonce-loc-20"},
)
row = seeded_db.execute(
"SELECT id FROM locations WHERE name = 'Double Archive Test'"
).fetchone()
location_id = row[0]
# Archive once
admin_client.post(
"/actions/location-archived",
data={"location_id": location_id, "nonce": "test-nonce-loc-21"},
)
# Try to archive again
resp = admin_client.post(
"/actions/location-archived",
data={
"location_id": location_id,
"nonce": "test-nonce-loc-22",
},
)
assert resp.status_code == 422

109
tests/test_web_responses.py Normal file
View File

@@ -0,0 +1,109 @@
# ABOUTME: Tests for web response helper functions.
# ABOUTME: Tests error_response, error_toast, and success_toast helpers.
import json
from starlette.responses import HTMLResponse
from animaltrack.web.responses import error_response, error_toast, success_toast
class TestErrorResponse:
"""Tests for error_response()."""
def test_returns_html_response(self):
"""error_response returns an HTMLResponse."""
response = error_response("<div>Error</div>")
assert isinstance(response, HTMLResponse)
def test_default_status_422(self):
"""error_response defaults to 422 status code."""
response = error_response("<div>Validation error</div>")
assert response.status_code == 422
def test_custom_status_code(self):
"""error_response accepts custom status code."""
response = error_response("<div>Conflict</div>", status_code=409)
assert response.status_code == 409
def test_403_status_code(self):
"""error_response works with 403 status code."""
response = error_response("<div>Forbidden</div>", status_code=403)
assert response.status_code == 403
def test_401_status_code(self):
"""error_response works with 401 status code."""
response = error_response("<div>Unauthorized</div>", status_code=401)
assert response.status_code == 401
def test_content_in_body(self):
"""error_response includes content in body."""
response = error_response("<p>Error message</p>")
assert b"<p>Error message</p>" in response.body
class TestErrorToast:
"""Tests for error_toast()."""
def test_returns_html_response(self):
"""error_toast returns an HTMLResponse."""
response = error_toast("Something went wrong")
assert isinstance(response, HTMLResponse)
def test_default_status_422(self):
"""error_toast defaults to 422 status code."""
response = error_toast("Validation failed")
assert response.status_code == 422
def test_custom_status_code(self):
"""error_toast accepts custom status code."""
response = error_toast("Not allowed", status_code=403)
assert response.status_code == 403
def test_has_hx_trigger_header(self):
"""error_toast sets HX-Trigger header."""
response = error_toast("Error message")
assert "HX-Trigger" in response.headers
def test_toast_message_in_header(self):
"""error_toast includes message in HX-Trigger header."""
response = error_toast("Error message")
trigger = json.loads(response.headers["HX-Trigger"])
assert trigger["showToast"]["message"] == "Error message"
def test_toast_type_error(self):
"""error_toast sets toast type to error."""
response = error_toast("Error message")
trigger = json.loads(response.headers["HX-Trigger"])
assert trigger["showToast"]["type"] == "error"
class TestSuccessToast:
"""Tests for success_toast()."""
def test_returns_dict(self):
"""success_toast returns a dict (for HX-Trigger header)."""
result = success_toast("Action completed")
assert isinstance(result, dict)
def test_has_show_toast_key(self):
"""success_toast dict has showToast key."""
result = success_toast("Action completed")
assert "showToast" in result
def test_message_in_toast(self):
"""success_toast includes message."""
result = success_toast("Created successfully")
assert result["showToast"]["message"] == "Created successfully"
def test_toast_type_success(self):
"""success_toast sets toast type to success."""
result = success_toast("Done")
assert result["showToast"]["type"] == "success"
def test_can_json_dumps(self):
"""success_toast result can be JSON serialized."""
result = success_toast("Test message")
serialized = json.dumps(result)
assert '"showToast"' in serialized
assert '"Test message"' in serialized