feat: implement Move Animals UI with optimistic locking (Step 7.5)

Add Move Animals form with selection context validation and concurrent
change handling via optimistic locking. When selection changes between
client resolution and submit, the user is shown a diff panel and can
confirm to proceed with the current server resolution.

Key changes:
- Add move template with form and diff panel components
- Add move routes (GET /move, POST /actions/animal-move)
- Register move routes in app
- Fix to_xml() usage for HTMLResponse (was using str())
- Use max(current_time, form_ts) for confirmed re-resolution

Tests:
- 15 route tests covering form rendering, success, validation, mismatch
- 7 E2E tests for optimistic lock flow (spec §21.8)

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-30 14:31:03 +00:00
parent b1bfdfb05c
commit ff4fa86beb
7 changed files with 1530 additions and 4 deletions

View File

@@ -17,7 +17,12 @@ from animaltrack.web.middleware import (
csrf_before,
request_id_before,
)
from animaltrack.web.routes import register_egg_routes, register_feed_routes, register_health_routes
from animaltrack.web.routes import (
register_egg_routes,
register_feed_routes,
register_health_routes,
register_move_routes,
)
# Default static directory relative to this module
DEFAULT_STATIC_DIR = Path(__file__).parent.parent / "static"
@@ -127,5 +132,6 @@ def create_app(
register_health_routes(rt, app)
register_egg_routes(rt, app)
register_feed_routes(rt, app)
register_move_routes(rt, app)
return app, rt

View File

@@ -4,5 +4,11 @@
from animaltrack.web.routes.eggs import register_egg_routes
from animaltrack.web.routes.feed import register_feed_routes
from animaltrack.web.routes.health import register_health_routes
from animaltrack.web.routes.move import register_move_routes
__all__ = ["register_egg_routes", "register_feed_routes", "register_health_routes"]
__all__ = [
"register_egg_routes",
"register_feed_routes",
"register_health_routes",
"register_move_routes",
]

View File

@@ -7,6 +7,7 @@ import json
import time
from typing import Any
from fasthtml.common import to_xml
from starlette.requests import Request
from starlette.responses import HTMLResponse
@@ -137,7 +138,7 @@ async def product_collected(request: Request):
# Success: re-render form with location sticking, qty cleared
response = HTMLResponse(
content=str(
content=to_xml(
page(
egg_form(locations, selected_location_id=location_id, action=product_collected),
title="Egg - AnimalTrack",
@@ -177,7 +178,7 @@ def _render_error_form(locations, selected_location_id, error_message):
HTMLResponse with 422 status.
"""
return HTMLResponse(
content=str(
content=to_xml(
page(
egg_form(
locations,

View File

@@ -0,0 +1,326 @@
# ABOUTME: Routes for Move Animals functionality.
# ABOUTME: Handles GET /move form and POST /actions/animal-move with optimistic locking.
from __future__ import annotations
import json
import time
from typing import Any
from fasthtml.common import to_xml
from starlette.requests import Request
from starlette.responses import HTMLResponse
from animaltrack.events.payloads import AnimalMovedPayload
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.repositories.locations import LocationRepository
from animaltrack.selection import compute_roster_hash, parse_filter, resolve_filter
from animaltrack.selection.validation import SelectionContext, validate_selection
from animaltrack.services.animal import AnimalService, ValidationError
from animaltrack.web.templates import page
from animaltrack.web.templates.move import diff_panel, move_form
def _get_from_location(
db: Any, animal_ids: list[str], ts_utc: int
) -> tuple[str | None, str | None]:
"""Get the common from_location_id for all animals at given timestamp.
Args:
db: Database connection.
animal_ids: List of animal IDs to check.
ts_utc: Timestamp for location lookup.
Returns:
Tuple of (from_location_id, from_location_name) or (None, None) if
animals are from multiple locations.
"""
if not animal_ids:
return None, None
# Get current location for each animal using location intervals
query = """
SELECT DISTINCT ali.location_id, l.name
FROM animal_location_intervals ali
JOIN locations l ON ali.location_id = l.id
WHERE ali.animal_id IN ({})
AND ali.start_utc <= ?
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
""".format(",".join("?" * len(animal_ids)))
params = list(animal_ids) + [ts_utc, ts_utc]
rows = db.execute(query, params).fetchall()
if len(rows) != 1:
# Animals are from multiple locations or no location found
return None, None
return rows[0][0], rows[0][1]
def move_index(request: Request):
"""GET /move - Move Animals form."""
db = request.app.state.db
locations = LocationRepository(db).list_active()
# Get filter from query params
filter_str = request.query_params.get("filter", "")
# Resolve selection if filter provided
ts_utc = int(time.time() * 1000)
resolved_ids: list[str] = []
roster_hash = ""
from_location_id = None
from_location_name = None
if filter_str or not request.query_params:
# If no filter, default to empty (show all alive animals)
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(db, filter_ast, ts_utc)
resolved_ids = resolution.animal_ids
if resolved_ids:
from_location_id, from_location_name = _get_from_location(db, resolved_ids, ts_utc)
roster_hash = compute_roster_hash(resolved_ids, from_location_id)
return page(
move_form(
locations,
filter_str=filter_str,
resolved_ids=resolved_ids,
roster_hash=roster_hash,
from_location_id=from_location_id,
ts_utc=ts_utc,
resolved_count=len(resolved_ids),
from_location_name=from_location_name,
action=animal_move,
),
title="Move - AnimalTrack",
active_nav="move",
)
async def animal_move(request: Request):
"""POST /actions/animal-move - Move animals to new location."""
db = request.app.state.db
form = await request.form()
# Extract form data
filter_str = form.get("filter", "")
to_location_id = form.get("to_location_id", "")
from_location_id = form.get("from_location_id", "") or None
roster_hash = form.get("roster_hash", "")
confirmed = form.get("confirmed", "") == "true"
nonce = form.get("nonce")
# Get timestamp - use provided or current
ts_utc_str = form.get("ts_utc", "0")
try:
ts_utc = int(ts_utc_str)
if ts_utc == 0:
ts_utc = int(time.time() * 1000)
except ValueError:
ts_utc = int(time.time() * 1000)
# resolved_ids can be multiple values
resolved_ids = form.getlist("resolved_ids")
# Get locations for potential re-render
locations = LocationRepository(db).list_active()
# Validation: destination required
if not to_location_id:
return _render_error_form(db, locations, filter_str, "Please select a destination")
# Validation: must have animals
if not resolved_ids:
return _render_error_form(db, locations, filter_str, "No animals selected to move")
# Validation: destination must be different from source
if to_location_id == from_location_id:
return _render_error_form(
db, locations, filter_str, "Destination must be different from source"
)
# Validate destination exists and is active
dest_location = None
for loc in locations:
if loc.id == to_location_id:
dest_location = loc
break
if not dest_location:
return _render_error_form(db, locations, filter_str, "Invalid destination location")
# Build selection context for validation
context = SelectionContext(
filter=filter_str,
resolved_ids=list(resolved_ids),
roster_hash=roster_hash,
ts_utc=ts_utc,
from_location_id=from_location_id,
confirmed=confirmed,
)
# Validate selection (check for concurrent changes)
result = validate_selection(db, context)
if not result.valid:
# Mismatch detected - return 409 with diff panel
return HTMLResponse(
content=to_xml(
page(
diff_panel(
diff=result.diff,
filter_str=filter_str,
resolved_ids=result.resolved_ids,
roster_hash=result.roster_hash,
from_location_id=from_location_id,
to_location_id=to_location_id,
ts_utc=ts_utc,
locations=locations,
action=animal_move,
),
title="Move - AnimalTrack",
active_nav="move",
)
),
status_code=409,
)
# When confirmed, re-resolve to get current server IDs (per spec: "server re-resolves")
if confirmed:
# Re-resolve the filter at current timestamp to get animals still matching
# Use max of current time and form's ts_utc to ensure we resolve at least
# as late as the submission - important when moves happened after client's resolution
current_ts = max(int(time.time() * 1000), ts_utc)
filter_ast = parse_filter(filter_str)
current_resolution = resolve_filter(db, filter_ast, current_ts)
ids_to_move = current_resolution.animal_ids
# Update from_location_id based on current resolution
from_location_id, _ = _get_from_location(db, ids_to_move, current_ts)
else:
ids_to_move = resolved_ids
# Check we still have animals to move after validation
if not ids_to_move:
return _render_error_form(db, locations, filter_str, "No animals remaining to move")
# Create animal service
event_store = EventStore(db)
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(db))
registry.register(EventAnimalsProjection(db))
registry.register(IntervalProjection(db))
animal_service = AnimalService(db, event_store, registry)
# Create payload
payload = AnimalMovedPayload(
resolved_ids=list(ids_to_move),
to_location_id=to_location_id,
)
# Get actor from auth
auth = request.scope.get("auth")
actor = auth.username if auth else "unknown"
# Move animals
try:
animal_service.move_animals(
payload, ts_utc, actor, nonce=nonce, route="/actions/animal-move"
)
except ValidationError as e:
return _render_error_form(db, locations, filter_str, str(e))
# Success: re-render fresh form (nothing sticks per spec)
response = HTMLResponse(
content=to_xml(
page(
move_form(
locations,
action=animal_move,
),
title="Move - AnimalTrack",
active_nav="move",
)
),
)
# Add toast trigger header
response.headers["HX-Trigger"] = json.dumps(
{
"showToast": {
"message": f"Moved {len(ids_to_move)} animals to {dest_location.name}",
"type": "success",
}
}
)
return response
def register_move_routes(rt, app):
"""Register move routes.
Args:
rt: FastHTML route decorator.
app: FastHTML application instance.
"""
rt("/move")(move_index)
rt("/actions/animal-move", methods=["POST"])(animal_move)
def _render_error_form(db, locations, filter_str, error_message):
"""Render form with error message.
Args:
db: Database connection.
locations: List of active locations.
filter_str: Current filter string.
error_message: Error message to display.
Returns:
HTMLResponse with 422 status.
"""
# Re-resolve to show current selection info
ts_utc = int(time.time() * 1000)
resolved_ids: list[str] = []
roster_hash = ""
from_location_id = None
from_location_name = None
if filter_str:
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(db, filter_ast, ts_utc)
resolved_ids = resolution.animal_ids
if resolved_ids:
from_location_id, from_location_name = _get_from_location(db, resolved_ids, ts_utc)
roster_hash = compute_roster_hash(resolved_ids, from_location_id)
return HTMLResponse(
content=to_xml(
page(
move_form(
locations,
filter_str=filter_str,
resolved_ids=resolved_ids,
roster_hash=roster_hash,
from_location_id=from_location_id,
ts_utc=ts_utc,
resolved_count=len(resolved_ids),
from_location_name=from_location_name,
error=error_message,
action=animal_move,
),
title="Move - AnimalTrack",
active_nav="move",
)
),
status_code=422,
)

View File

@@ -0,0 +1,214 @@
# ABOUTME: Templates for Move Animals form.
# ABOUTME: Provides form components for moving animals with selection context and mismatch handling.
from collections.abc import Callable
from typing import Any
from fasthtml.common import H2, Div, Form, Hidden, Option, P, Span
from monsterui.all import Alert, AlertT, Button, ButtonT, LabelInput, LabelSelect
from ulid import ULID
from animaltrack.models.reference import Location
from animaltrack.selection.validation import SelectionDiff
def move_form(
locations: list[Location],
filter_str: str = "",
resolved_ids: list[str] | None = None,
roster_hash: str = "",
from_location_id: str | None = None,
ts_utc: int | None = None,
resolved_count: int = 0,
from_location_name: str | None = None,
error: str | None = None,
action: Callable[..., Any] | str = "/actions/animal-move",
) -> Form:
"""Create the Move Animals form.
Args:
locations: List of active locations for the dropdown.
filter_str: Current filter string (DSL).
resolved_ids: Resolved animal IDs from filter.
roster_hash: Hash of resolved selection.
from_location_id: Common source location ID (if all animals from same location).
ts_utc: Timestamp of resolution.
resolved_count: Number of resolved animals.
from_location_name: Name of source location for display.
error: Optional error message to display.
action: Route function or URL string for form submission.
Returns:
Form component for moving animals.
"""
if resolved_ids is None:
resolved_ids = []
# Build destination location options (exclude from_location if set)
location_options = [Option("Select destination...", value="", disabled=True, selected=True)]
for loc in locations:
if loc.id != from_location_id:
location_options.append(Option(loc.name, value=loc.id))
# Error display component
error_component = None
if error:
error_component = Alert(
error,
cls=AlertT.warning,
)
# Selection preview component
selection_preview = None
if resolved_count > 0:
location_info = f" from {from_location_name}" if from_location_name else ""
selection_preview = Div(
P(
Span(f"{resolved_count}", cls="font-bold text-lg"),
f" animals selected{location_info}",
cls="text-sm",
),
cls="p-3 bg-slate-100 dark:bg-slate-800 rounded-md mb-4",
)
elif filter_str:
selection_preview = Div(
P("No animals match this filter", cls="text-sm text-amber-600"),
cls="p-3 bg-amber-50 dark:bg-amber-900/20 rounded-md mb-4",
)
# Hidden fields for resolved_ids (as multiple values)
resolved_id_fields = [
Hidden(name="resolved_ids", value=animal_id) for animal_id in resolved_ids
]
return Form(
H2("Move Animals", cls="text-xl font-bold mb-4"),
# Error message if present
error_component,
# Filter input
LabelInput(
"Filter",
id="filter",
name="filter",
value=filter_str,
placeholder='e.g., location:"Strip 1" species:duck',
),
# Selection preview
selection_preview,
# Destination dropdown
LabelSelect(
*location_options,
label="Destination",
id="to_location_id",
name="to_location_id",
),
# Hidden fields for selection context
*resolved_id_fields,
Hidden(name="roster_hash", value=roster_hash),
Hidden(name="from_location_id", value=from_location_id or ""),
Hidden(name="ts_utc", value=str(ts_utc or 0)),
Hidden(name="resolver_version", value="v1"),
Hidden(name="confirmed", value=""),
Hidden(name="nonce", value=str(ULID())),
# Submit button
Button("Move Animals", type="submit", cls=ButtonT.primary),
# Form submission via standard action/method (hx-boost handles AJAX)
action=action,
method="post",
cls="space-y-4",
)
def diff_panel(
diff: SelectionDiff,
filter_str: str,
resolved_ids: list[str],
roster_hash: str,
from_location_id: str,
to_location_id: str,
ts_utc: int,
locations: list[Location],
action: Callable[..., Any] | str = "/actions/animal-move",
) -> Div:
"""Create the mismatch confirmation panel.
Shows diff information and allows user to confirm or cancel.
Args:
diff: SelectionDiff with added/removed counts.
filter_str: Original filter string.
resolved_ids: Server's resolved IDs (current).
roster_hash: Server's roster hash (current).
from_location_id: Source location ID.
to_location_id: Destination location ID.
ts_utc: Timestamp for resolution.
locations: List of locations for display.
action: Route function or URL for confirmation submit.
Returns:
Div containing the diff panel with confirm button.
"""
# Find destination location name
to_location_name = "Unknown"
for loc in locations:
if loc.id == to_location_id:
to_location_name = loc.name
break
# Build description of changes
changes = []
if diff.removed:
changes.append(f"{len(diff.removed)} animals were moved since you loaded this page")
if diff.added:
changes.append(f"{len(diff.added)} animals were added")
changes_text = ". ".join(changes) + "." if changes else "The selection has changed."
# Build confirmation form with hidden fields
resolved_id_fields = [
Hidden(name="resolved_ids", value=animal_id) for animal_id in resolved_ids
]
confirm_form = Form(
*resolved_id_fields,
Hidden(name="filter", value=filter_str),
Hidden(name="roster_hash", value=roster_hash),
Hidden(name="from_location_id", value=from_location_id),
Hidden(name="to_location_id", value=to_location_id),
Hidden(name="ts_utc", value=str(ts_utc)),
Hidden(name="resolver_version", value="v1"),
Hidden(name="confirmed", value="true"),
Hidden(name="nonce", value=str(ULID())),
Div(
Button(
"Cancel",
type="button",
cls=ButtonT.default,
onclick="window.location.href='/move'",
),
Button(
f"Confirm Move ({diff.server_count} animals)",
type="submit",
cls=ButtonT.primary,
),
cls="flex gap-3 mt-4",
),
action=action,
method="post",
)
return Div(
Alert(
Div(
P("Selection Changed", cls="font-bold text-lg mb-2"),
P(changes_text, cls="mb-2"),
P(
f"Would you like to proceed with the remaining {diff.server_count} animals to {to_location_name}?",
cls="text-sm",
),
),
cls=AlertT.warning,
),
confirm_form,
cls="space-y-4",
)

View File

@@ -0,0 +1,499 @@
# ABOUTME: E2E test #8 from spec section 21.8: Optimistic locking with confirm.
# ABOUTME: Tests concurrent move handling with selection validation and confirmation flow.
import time
import pytest
from animaltrack.events.payloads import (
AnimalCohortCreatedPayload,
AnimalMovedPayload,
)
from animaltrack.events.store import EventStore
from animaltrack.events.types import ANIMAL_MOVED
from animaltrack.selection import compute_roster_hash, parse_filter, resolve_filter
from animaltrack.selection.validation import SelectionContext, validate_selection
@pytest.fixture
def now_utc():
"""Current time in milliseconds since epoch."""
return int(time.time() * 1000)
@pytest.fixture
def full_projection_registry(seeded_db):
"""Create a ProjectionRegistry with all projections."""
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.feed import FeedInventoryProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.projections.products import ProductsProjection
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(ProductsProjection(seeded_db))
registry.register(FeedInventoryProjection(seeded_db))
return registry
@pytest.fixture
def services(seeded_db, full_projection_registry):
"""Create all services needed for E2E test."""
from animaltrack.services.animal import AnimalService
event_store = EventStore(seeded_db)
return {
"db": seeded_db,
"event_store": event_store,
"registry": full_projection_registry,
"animal_service": AnimalService(seeded_db, event_store, full_projection_registry),
}
@pytest.fixture
def strip1_id(seeded_db):
"""Get Strip 1 location ID from seeds."""
return seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]
@pytest.fixture
def strip2_id(seeded_db):
"""Get Strip 2 location ID from seeds."""
return seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]
@pytest.fixture
def nursery1_id(seeded_db):
"""Get Nursery 1 location ID from seeds."""
return seeded_db.execute("SELECT id FROM locations WHERE name = 'Nursery 1'").fetchone()[0]
@pytest.fixture
def optimistic_lock_scenario(seeded_db, services, now_utc, strip1_id, strip2_id):
"""Set up optimistic lock test scenario.
Creates:
- 5 adult female ducks at Strip 1
- 3 adult female ducks at Strip 2
Returns dict with animal IDs and references.
"""
one_day_ms = 24 * 60 * 60 * 1000
animal_creation_ts = now_utc - one_day_ms
# Create 5 adult female ducks at Strip 1
cohort1_payload = AnimalCohortCreatedPayload(
species="duck",
count=5,
life_stage="adult",
sex="female",
location_id=strip1_id,
origin="purchased",
)
cohort1_event = services["animal_service"].create_cohort(
cohort1_payload, animal_creation_ts, "test_user"
)
strip1_animal_ids = cohort1_event.entity_refs["animal_ids"]
# Create 3 adult female ducks at Strip 2
cohort2_payload = AnimalCohortCreatedPayload(
species="duck",
count=3,
life_stage="adult",
sex="female",
location_id=strip2_id,
origin="purchased",
)
cohort2_event = services["animal_service"].create_cohort(
cohort2_payload, animal_creation_ts, "test_user"
)
strip2_animal_ids = cohort2_event.entity_refs["animal_ids"]
return {
"strip1_id": strip1_id,
"strip2_id": strip2_id,
"strip1_animal_ids": strip1_animal_ids,
"strip2_animal_ids": strip2_animal_ids,
"animal_creation_ts": animal_creation_ts,
}
class TestE2E8OptimisticLockWithConfirm:
"""E2E test #8: Optimistic locking with confirm from spec section 21.8.
Scenario:
- Setup: Strip 1 = 5 female ducks, Strip 2 = 3 female ducks
- Client A resolves filter location:"Strip 1" → 5 ids, hash H1
- Client B moves 2 from Strip 1 → Strip 2 (commits)
- Client A submits move to Nursery 1 with roster_hash=H1 → 409, diff shows removed:2
- Client A resubmits with confirmed=true → moves remaining 3 to Nursery 1
Final state:
- Strip 1 females = 0
- Strip 2 females = 5 (original 3 + 2 moved from Strip 1)
- Nursery 1 females = 3
- Two AnimalMoved events logged
"""
def test_client_a_initial_resolution(
self, seeded_db, services, now_utc, strip1_id, optimistic_lock_scenario
):
"""Client A resolves filter and gets 5 animals with hash."""
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_utc = now_utc
resolution = resolve_filter(seeded_db, filter_ast, ts_utc)
assert len(resolution.animal_ids) == 5
assert resolution.roster_hash != ""
def test_concurrent_move_causes_mismatch(
self,
seeded_db,
services,
now_utc,
strip1_id,
strip2_id,
nursery1_id,
optimistic_lock_scenario,
):
"""After client B moves 2 animals, client A's hash becomes invalid."""
strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]
# Client A resolves filter at T1
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_t1 = now_utc
client_a_resolution = resolve_filter(seeded_db, filter_ast, ts_t1)
client_a_hash = compute_roster_hash(client_a_resolution.animal_ids, strip1_id)
assert len(client_a_resolution.animal_ids) == 5
# Client B moves 2 animals from Strip 1 to Strip 2 at T2
ts_t2 = ts_t1 + 1000
move_payload = AnimalMovedPayload(
resolved_ids=strip1_animal_ids[:2],
to_location_id=strip2_id,
)
services["animal_service"].move_animals(move_payload, ts_t2, "client_b")
# Client A submits with old hash at T3 (should detect mismatch)
ts_t3 = ts_t2 + 1000
context = SelectionContext(
filter=filter_str,
resolved_ids=list(client_a_resolution.animal_ids),
roster_hash=client_a_hash,
ts_utc=ts_t3,
from_location_id=strip1_id,
confirmed=False,
)
result = validate_selection(seeded_db, context)
# Validation should fail with mismatch
assert not result.valid
assert result.diff is not None
assert len(result.diff.removed) == 2
assert len(result.diff.added) == 0
def test_confirmed_proceeds_with_remaining_animals(
self,
seeded_db,
services,
now_utc,
strip1_id,
strip2_id,
nursery1_id,
optimistic_lock_scenario,
):
"""With confirmed=true, move proceeds with current server resolution."""
strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]
# Client A resolves filter at T1
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_t1 = now_utc
client_a_resolution = resolve_filter(seeded_db, filter_ast, ts_t1)
client_a_hash = compute_roster_hash(client_a_resolution.animal_ids, strip1_id)
# Client B moves 2 animals from Strip 1 to Strip 2 at T2
ts_t2 = ts_t1 + 1000
move_payload = AnimalMovedPayload(
resolved_ids=strip1_animal_ids[:2],
to_location_id=strip2_id,
)
services["animal_service"].move_animals(move_payload, ts_t2, "client_b")
# Client A resubmits with confirmed=true at T3
ts_t3 = ts_t2 + 1000
context = SelectionContext(
filter=filter_str,
resolved_ids=list(client_a_resolution.animal_ids),
roster_hash=client_a_hash,
ts_utc=ts_t3,
from_location_id=strip1_id,
confirmed=True,
)
result = validate_selection(seeded_db, context)
# Validation should pass with confirmed=true
assert result.valid
# Per route behavior: re-resolve at ts_t3 to get current IDs
# (validate_selection returns client's IDs, route re-resolves)
current_resolution = resolve_filter(seeded_db, filter_ast, ts_t3)
ids_to_move = current_resolution.animal_ids
assert len(ids_to_move) == 3
# Now move those 3 to Nursery 1
move_payload = AnimalMovedPayload(
resolved_ids=ids_to_move,
to_location_id=nursery1_id,
)
services["animal_service"].move_animals(move_payload, ts_t3, "client_a")
# Verify final state
# Strip 1 should have 0 females
strip1_count = seeded_db.execute(
"""SELECT COUNT(*) FROM live_animals_by_location
WHERE location_id = ? AND sex = 'female'""",
(strip1_id,),
).fetchone()[0]
assert strip1_count == 0
def test_final_state_after_both_moves(
self,
seeded_db,
services,
now_utc,
strip1_id,
strip2_id,
nursery1_id,
optimistic_lock_scenario,
):
"""Full scenario verifies all final location counts."""
strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]
# Client A resolves filter at T1
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_t1 = now_utc
client_a_resolution = resolve_filter(seeded_db, filter_ast, ts_t1)
client_a_hash = compute_roster_hash(client_a_resolution.animal_ids, strip1_id)
# Client B moves 2 animals from Strip 1 to Strip 2 at T2
ts_t2 = ts_t1 + 1000
move_payload_b = AnimalMovedPayload(
resolved_ids=strip1_animal_ids[:2],
to_location_id=strip2_id,
)
services["animal_service"].move_animals(move_payload_b, ts_t2, "client_b")
# Client A confirms and moves remaining 3 to Nursery 1 at T3
ts_t3 = ts_t2 + 1000
context = SelectionContext(
filter=filter_str,
resolved_ids=list(client_a_resolution.animal_ids),
roster_hash=client_a_hash,
ts_utc=ts_t3,
from_location_id=strip1_id,
confirmed=True,
)
result = validate_selection(seeded_db, context)
assert result.valid
# Per route behavior: re-resolve to get current IDs
current_resolution = resolve_filter(seeded_db, filter_ast, ts_t3)
ids_to_move = current_resolution.animal_ids
move_payload_a = AnimalMovedPayload(
resolved_ids=ids_to_move,
to_location_id=nursery1_id,
)
services["animal_service"].move_animals(move_payload_a, ts_t3, "client_a")
# Verify final state:
# Strip 1 females = 0 (all moved out)
strip1_count = seeded_db.execute(
"""SELECT COUNT(*) FROM live_animals_by_location
WHERE location_id = ? AND sex = 'female'""",
(strip1_id,),
).fetchone()[0]
assert strip1_count == 0
# Strip 2 females = 5 (original 3 + 2 moved from Strip 1)
strip2_count = seeded_db.execute(
"""SELECT COUNT(*) FROM live_animals_by_location
WHERE location_id = ? AND sex = 'female'""",
(strip2_id,),
).fetchone()[0]
assert strip2_count == 5
# Nursery 1 females = 3 (moved from Strip 1 by client A)
nursery1_count = seeded_db.execute(
"""SELECT COUNT(*) FROM live_animals_by_location
WHERE location_id = ? AND sex = 'female'""",
(nursery1_id,),
).fetchone()[0]
assert nursery1_count == 3
def test_two_move_events_logged(
self,
seeded_db,
services,
now_utc,
strip1_id,
strip2_id,
nursery1_id,
optimistic_lock_scenario,
):
"""Both moves should create AnimalMoved events."""
strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]
# Count events before
events_before = seeded_db.execute(
"SELECT COUNT(*) FROM events WHERE type = ?",
(ANIMAL_MOVED,),
).fetchone()[0]
# Client A resolves filter at T1
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_t1 = now_utc
client_a_resolution = resolve_filter(seeded_db, filter_ast, ts_t1)
client_a_hash = compute_roster_hash(client_a_resolution.animal_ids, strip1_id)
# Client B moves 2 animals from Strip 1 to Strip 2 at T2
ts_t2 = ts_t1 + 1000
move_payload_b = AnimalMovedPayload(
resolved_ids=strip1_animal_ids[:2],
to_location_id=strip2_id,
)
services["animal_service"].move_animals(move_payload_b, ts_t2, "client_b")
# Client A confirms and moves remaining 3 to Nursery 1 at T3
ts_t3 = ts_t2 + 1000
context = SelectionContext(
filter=filter_str,
resolved_ids=list(client_a_resolution.animal_ids),
roster_hash=client_a_hash,
ts_utc=ts_t3,
from_location_id=strip1_id,
confirmed=True,
)
result = validate_selection(seeded_db, context)
assert result.valid
# Per route behavior: re-resolve to get current IDs
current_resolution = resolve_filter(seeded_db, filter_ast, ts_t3)
ids_to_move = current_resolution.animal_ids
move_payload_a = AnimalMovedPayload(
resolved_ids=ids_to_move,
to_location_id=nursery1_id,
)
services["animal_service"].move_animals(move_payload_a, ts_t3, "client_a")
# Count events after
events_after = seeded_db.execute(
"SELECT COUNT(*) FROM events WHERE type = ?",
(ANIMAL_MOVED,),
).fetchone()[0]
# Two new move events should be logged
assert events_after - events_before == 2
def test_move_without_confirm_fails_on_mismatch(
self,
seeded_db,
services,
now_utc,
strip1_id,
strip2_id,
nursery1_id,
optimistic_lock_scenario,
):
"""Without confirmed=true, mismatch should block the move."""
strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]
# Client A resolves filter at T1
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_t1 = now_utc
client_a_resolution = resolve_filter(seeded_db, filter_ast, ts_t1)
client_a_hash = compute_roster_hash(client_a_resolution.animal_ids, strip1_id)
# Client B moves 2 animals from Strip 1 to Strip 2 at T2
ts_t2 = ts_t1 + 1000
move_payload = AnimalMovedPayload(
resolved_ids=strip1_animal_ids[:2],
to_location_id=strip2_id,
)
services["animal_service"].move_animals(move_payload, ts_t2, "client_b")
# Client A tries to move without confirmed=true at T3
ts_t3 = ts_t2 + 1000
context = SelectionContext(
filter=filter_str,
resolved_ids=list(client_a_resolution.animal_ids),
roster_hash=client_a_hash,
ts_utc=ts_t3,
from_location_id=strip1_id,
confirmed=False, # Not confirmed
)
result = validate_selection(seeded_db, context)
# Should be invalid
assert not result.valid
# Diff should show 2 removed
assert len(result.diff.removed) == 2
def test_diff_contains_updated_hash_and_ids(
self,
seeded_db,
services,
now_utc,
strip1_id,
strip2_id,
nursery1_id,
optimistic_lock_scenario,
):
"""Mismatch response should contain updated hash and resolved IDs."""
strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]
# Client A resolves filter at T1
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
ts_t1 = now_utc
client_a_resolution = resolve_filter(seeded_db, filter_ast, ts_t1)
client_a_hash = compute_roster_hash(client_a_resolution.animal_ids, strip1_id)
# Client B moves 2 animals from Strip 1 to Strip 2 at T2
ts_t2 = ts_t1 + 1000
move_payload = AnimalMovedPayload(
resolved_ids=strip1_animal_ids[:2],
to_location_id=strip2_id,
)
services["animal_service"].move_animals(move_payload, ts_t2, "client_b")
# Client A submits without confirmed at T3
ts_t3 = ts_t2 + 1000
context = SelectionContext(
filter=filter_str,
resolved_ids=list(client_a_resolution.animal_ids),
roster_hash=client_a_hash,
ts_utc=ts_t3,
from_location_id=strip1_id,
confirmed=False,
)
result = validate_selection(seeded_db, context)
# Result should contain updated info
assert not result.valid
assert len(result.resolved_ids) == 3 # Current server resolution
assert result.roster_hash != client_a_hash # Hash changed
assert result.roster_hash != "" # New hash computed

474
tests/test_web_move.py Normal file
View File

@@ -0,0 +1,474 @@
# ABOUTME: Tests for Move Animals web routes.
# ABOUTME: Covers GET /move form rendering and POST /actions/animal-move with optimistic locking.
import os
import time
import pytest
from starlette.testclient import TestClient
from animaltrack.events.payloads import AnimalCohortCreatedPayload, AnimalMovedPayload
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.selection import compute_roster_hash, parse_filter, resolve_filter
from animaltrack.services.animal import AnimalService
def make_test_settings(
csrf_secret: str = "test-secret",
trusted_proxy_ips: str = "127.0.0.1",
dev_mode: bool = True,
):
"""Create Settings for testing by setting env vars temporarily."""
from animaltrack.config import Settings
old_env = os.environ.copy()
try:
os.environ["CSRF_SECRET"] = csrf_secret
os.environ["TRUSTED_PROXY_IPS"] = trusted_proxy_ips
os.environ["DEV_MODE"] = str(dev_mode).lower()
return Settings()
finally:
os.environ.clear()
os.environ.update(old_env)
@pytest.fixture
def client(seeded_db):
"""Create a test client for the app."""
from animaltrack.web.app import create_app
settings = make_test_settings(trusted_proxy_ips="testclient")
app, rt = create_app(settings=settings, db=seeded_db)
return TestClient(app, raise_server_exceptions=True)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with animal projections registered."""
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
return registry
@pytest.fixture
def animal_service(seeded_db, projection_registry):
"""Create an AnimalService for testing."""
event_store = EventStore(seeded_db)
return AnimalService(seeded_db, event_store, projection_registry)
@pytest.fixture
def location_strip1_id(seeded_db):
"""Get Strip 1 location ID from seeded data."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
@pytest.fixture
def location_strip2_id(seeded_db):
"""Get Strip 2 location ID from seeded data."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
return row[0]
@pytest.fixture
def location_nursery1_id(seeded_db):
"""Get Nursery 1 location ID from seeded data."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Nursery 1'").fetchone()
return row[0]
@pytest.fixture
def ducks_at_strip1(seeded_db, animal_service, location_strip1_id):
"""Create 5 female ducks at Strip 1 for testing move operations."""
payload = AnimalCohortCreatedPayload(
species="duck",
count=5,
life_stage="adult",
sex="female",
location_id=location_strip1_id,
origin="purchased",
)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
return event.entity_refs["animal_ids"]
class TestMoveFormRendering:
"""Tests for GET /move form rendering."""
def test_move_form_renders(self, client):
"""GET /move returns 200 with form elements."""
resp = client.get("/move")
assert resp.status_code == 200
assert "Move" in resp.text
def test_move_form_shows_locations(self, client):
"""Form has location dropdown with seeded locations."""
resp = client.get("/move")
assert resp.status_code == 200
assert "Strip 1" in resp.text
assert "Strip 2" in resp.text
def test_move_form_has_filter_field(self, client):
"""Form has filter input field."""
resp = client.get("/move")
assert resp.status_code == 200
assert 'name="filter"' in resp.text or 'id="filter"' in resp.text
def test_move_form_has_destination_dropdown(self, client):
"""Form has destination location dropdown."""
resp = client.get("/move")
assert resp.status_code == 200
assert 'name="to_location_id"' in resp.text or 'id="to_location_id"' in resp.text
def test_move_form_with_filter_param(self, client, ducks_at_strip1):
"""GET /move?filter=... pre-fills filter and shows animal count."""
resp = client.get('/move?filter=location:"Strip 1"')
assert resp.status_code == 200
# Filter should be pre-filled
assert "Strip 1" in resp.text
# Should show animal count (5 ducks)
assert "5" in resp.text
def test_move_form_has_hidden_fields(self, client, ducks_at_strip1):
"""Form has hidden fields for selection context."""
resp = client.get('/move?filter=location:"Strip 1"')
assert resp.status_code == 200
# Hidden fields for selection context
assert 'name="roster_hash"' in resp.text
assert 'name="ts_utc"' in resp.text
assert 'name="nonce"' in resp.text
class TestMoveAnimalSuccess:
"""Tests for successful POST /actions/animal-move."""
def test_move_creates_event(
self,
client,
seeded_db,
animal_service,
location_strip1_id,
location_strip2_id,
ducks_at_strip1,
):
"""POST creates AnimalMoved event when valid."""
# Get selection context by resolving filter
ts_utc = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(seeded_db, filter_ast, ts_utc)
roster_hash = compute_roster_hash(resolution.animal_ids, location_strip1_id)
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_strip2_id,
"resolved_ids": resolution.animal_ids,
"roster_hash": roster_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_utc),
"nonce": "test-nonce-move-1",
},
)
assert resp.status_code in [200, 302, 303]
# Verify event was created
event_row = seeded_db.execute(
"SELECT type FROM events WHERE type = 'AnimalMoved' ORDER BY id DESC LIMIT 1"
).fetchone()
assert event_row is not None
assert event_row[0] == "AnimalMoved"
def test_move_success_returns_toast(
self,
client,
seeded_db,
animal_service,
location_strip1_id,
location_strip2_id,
ducks_at_strip1,
):
"""Successful move returns HX-Trigger with toast."""
ts_utc = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(seeded_db, filter_ast, ts_utc)
roster_hash = compute_roster_hash(resolution.animal_ids, location_strip1_id)
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_strip2_id,
"resolved_ids": resolution.animal_ids,
"roster_hash": roster_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_utc),
"nonce": "test-nonce-move-2",
},
)
assert resp.status_code == 200
assert "HX-Trigger" in resp.headers
assert "showToast" in resp.headers["HX-Trigger"]
def test_move_success_resets_form(
self,
client,
seeded_db,
animal_service,
location_strip1_id,
location_strip2_id,
ducks_at_strip1,
):
"""After successful move, form is reset (nothing sticks)."""
ts_utc = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(seeded_db, filter_ast, ts_utc)
roster_hash = compute_roster_hash(resolution.animal_ids, location_strip1_id)
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_strip2_id,
"resolved_ids": resolution.animal_ids,
"roster_hash": roster_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_utc),
"nonce": "test-nonce-move-3",
},
)
assert resp.status_code == 200
# Form should be reset - filter input should be empty (no value attribute or empty value)
# The old filter value should not be pre-filled
assert 'value="location:' not in resp.text
# The filter field should exist but be empty (or have no value)
assert 'name="filter"' in resp.text
class TestMoveAnimalValidation:
"""Tests for validation errors in POST /actions/animal-move."""
def test_move_no_animals_returns_422(self, client, location_strip1_id, location_strip2_id):
"""Moving with no animals selected returns 422."""
ts_utc = int(time.time() * 1000)
roster_hash = compute_roster_hash([])
resp = client.post(
"/actions/animal-move",
data={
"filter": "species:nonexistent",
"to_location_id": location_strip2_id,
"resolved_ids": [],
"roster_hash": roster_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_utc),
"nonce": "test-nonce-move-4",
},
)
assert resp.status_code == 422
def test_move_same_location_returns_422(
self, client, seeded_db, location_strip1_id, ducks_at_strip1
):
"""Moving to same location returns 422."""
ts_utc = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(seeded_db, filter_ast, ts_utc)
roster_hash = compute_roster_hash(resolution.animal_ids, location_strip1_id)
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_strip1_id, # Same as from
"resolved_ids": resolution.animal_ids,
"roster_hash": roster_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_utc),
"nonce": "test-nonce-move-5",
},
)
assert resp.status_code == 422
def test_move_missing_destination_returns_422(
self, client, seeded_db, location_strip1_id, ducks_at_strip1
):
"""Missing to_location_id returns 422."""
ts_utc = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
resolution = resolve_filter(seeded_db, filter_ast, ts_utc)
roster_hash = compute_roster_hash(resolution.animal_ids, location_strip1_id)
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
# Missing to_location_id
"resolved_ids": resolution.animal_ids,
"roster_hash": roster_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_utc),
"nonce": "test-nonce-move-6",
},
)
assert resp.status_code == 422
class TestMoveAnimalMismatch:
"""Tests for optimistic locking mismatch handling."""
def test_mismatch_returns_409(
self,
client,
seeded_db,
animal_service,
location_strip1_id,
location_strip2_id,
ducks_at_strip1,
):
"""Hash mismatch (concurrent change) returns 409."""
# Client A resolves at ts_before
ts_before = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
client_resolution = resolve_filter(seeded_db, filter_ast, ts_before)
client_hash = compute_roster_hash(client_resolution.animal_ids, location_strip1_id)
# Client B moves 2 animals away
ts_move = ts_before + 1000
move_payload = AnimalMovedPayload(
resolved_ids=ducks_at_strip1[:2],
to_location_id=location_strip2_id,
)
animal_service.move_animals(move_payload, ts_move, "client_b")
# Client A submits with old hash at new timestamp
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_strip2_id,
"resolved_ids": client_resolution.animal_ids,
"roster_hash": client_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_move), # Using ts_move so server will see different state
"nonce": "test-nonce-move-7",
},
)
assert resp.status_code == 409
def test_mismatch_shows_diff(
self,
client,
seeded_db,
animal_service,
location_strip1_id,
location_strip2_id,
ducks_at_strip1,
):
"""409 response shows diff panel with removed count."""
ts_before = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
client_resolution = resolve_filter(seeded_db, filter_ast, ts_before)
client_hash = compute_roster_hash(client_resolution.animal_ids, location_strip1_id)
# Move 2 animals away
ts_move = ts_before + 1000
move_payload = AnimalMovedPayload(
resolved_ids=ducks_at_strip1[:2],
to_location_id=location_strip2_id,
)
animal_service.move_animals(move_payload, ts_move, "client_b")
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_strip2_id,
"resolved_ids": client_resolution.animal_ids,
"roster_hash": client_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_move),
"nonce": "test-nonce-move-8",
},
)
assert resp.status_code == 409
# Response should show diff info
assert "2" in resp.text # 2 removed
def test_confirmed_proceeds_despite_mismatch(
self,
client,
seeded_db,
animal_service,
location_strip1_id,
location_strip2_id,
location_nursery1_id,
ducks_at_strip1,
):
"""confirmed=true bypasses mismatch and proceeds with server's resolution."""
ts_before = int(time.time() * 1000)
filter_str = 'location:"Strip 1"'
filter_ast = parse_filter(filter_str)
client_resolution = resolve_filter(seeded_db, filter_ast, ts_before)
client_hash = compute_roster_hash(client_resolution.animal_ids, location_strip1_id)
# Move 2 animals away
ts_move = ts_before + 1000
move_payload = AnimalMovedPayload(
resolved_ids=ducks_at_strip1[:2],
to_location_id=location_strip2_id,
)
animal_service.move_animals(move_payload, ts_move, "client_b")
# Client A resubmits with confirmed=true
resp = client.post(
"/actions/animal-move",
data={
"filter": filter_str,
"to_location_id": location_nursery1_id,
"resolved_ids": client_resolution.animal_ids,
"roster_hash": client_hash,
"from_location_id": location_strip1_id,
"ts_utc": str(ts_move),
"confirmed": "true",
"nonce": "test-nonce-move-9",
},
)
# Should succeed
print("RESPONSE STATUS:", resp.status_code)
print("RESPONSE TEXT:", resp.text[:2000])
assert resp.status_code == 200
# Verify only 3 animals were moved (the ones still at Strip 1)
event_row = seeded_db.execute(
"SELECT payload FROM events WHERE type = 'AnimalMoved' ORDER BY id DESC LIMIT 1"
).fetchone()
import json
payload = json.loads(event_row[0])
# Should have moved 3 animals (5 original - 2 moved by client B)
assert len(payload["resolved_ids"]) == 3