feat: implement Animal Registry view with filtering and pagination (Step 8.1)

- Add status field to filter DSL parser and resolver
- Create AnimalRepository with list_animals and get_facet_counts
- Implement registry templates with table, facet sidebar, infinite scroll
- Create registry route handler with HTMX partial support
- Default shows only alive animals; status filter overrides

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-30 14:59:13 +00:00
parent 254466827c
commit 8e155080e4
12 changed files with 1630 additions and 28 deletions

19
PLAN.md
View File

@@ -305,15 +305,16 @@ Check off items as completed. Each phase builds on the previous.
## Phase 8: Registry & Event Log
### Step 8.1: Animal Registry View
- [ ] Create `web/routes/registry.py`:
- [ ] GET /registry with filters and pagination
- [ ] Create `web/templates/registry.py`:
- [ ] Table: ID, species, sex, life_stage, location, tags, last event, status
- [ ] Facet sidebar with counts
- [ ] Create `repositories/animals.py` with list_animals, get_facet_counts
- [ ] Implement infinite scroll (50/page, cursor=base64)
- [ ] Write tests: renders with animals, pagination works, filters apply
- [ ] **Commit checkpoint**
- [x] Create `web/routes/registry.py`:
- [x] GET /registry with filters and pagination
- [x] Create `web/templates/registry.py`:
- [x] Table: ID, species, sex, life_stage, location, tags, last event, status
- [x] Facet sidebar with counts
- [x] Create `repositories/animals.py` with list_animals, get_facet_counts
- [x] Implement infinite scroll (50/page, cursor=base64)
- [x] Write tests: renders with animals, pagination works, filters apply
- [x] Add `status` filter field to selection DSL (parser + resolver)
- [x] **Commit checkpoint**
### Step 8.2: Event Log Projection & View
- [ ] Create migration for event_log_by_location table with cap trigger

View File

@@ -0,0 +1,355 @@
# ABOUTME: Repository for animal registry queries.
# ABOUTME: Provides list_animals with filtering/pagination and facet counts.
import base64
import json
from dataclasses import dataclass, field
from typing import Any
from animaltrack.selection.parser import parse_filter
@dataclass
class AnimalListItem:
"""Animal data for registry list view."""
animal_id: str
species_code: str
sex: str
life_stage: str
status: str
location_id: str
location_name: str
nickname: str | None
identified: bool
tags: list[str]
last_event_utc: int
@dataclass
class PaginatedResult:
"""Paginated list result with cursor."""
items: list[AnimalListItem]
next_cursor: str | None
total_count: int
@dataclass
class FacetCounts:
"""Facet counts for sidebar."""
by_status: dict[str, int] = field(default_factory=dict)
by_species: dict[str, int] = field(default_factory=dict)
by_sex: dict[str, int] = field(default_factory=dict)
by_life_stage: dict[str, int] = field(default_factory=dict)
by_location: dict[str, int] = field(default_factory=dict)
class AnimalRepository:
"""Repository for animal registry operations."""
PAGE_SIZE = 50
def __init__(self, db: Any) -> None:
"""Initialize repository with database connection.
Args:
db: A fastlite database connection.
"""
self.db = db
def list_animals(
self,
filter_str: str = "",
cursor: str | None = None,
) -> PaginatedResult:
"""List animals with filtering and cursor pagination.
Args:
filter_str: DSL filter string (reuses selection parser).
cursor: Base64-encoded cursor for pagination.
Returns:
PaginatedResult with items and next_cursor.
"""
# Parse filter
filter_ast = parse_filter(filter_str)
# Check if filter has explicit status
has_status_filter = any(f.field == "status" for f in filter_ast.filters)
# Build WHERE clauses from filter
where_clauses = []
params: list[Any] = []
# Default to alive if no status filter
if not has_status_filter:
where_clauses.append("ar.status = 'alive'")
# Apply filter clauses
for field_filter in filter_ast.filters:
clause, filter_params = self._build_where_clause(field_filter)
where_clauses.append(clause)
params.extend(filter_params)
# Decode cursor for pagination
cursor_last_event: int | None = None
cursor_animal_id: str | None = None
if cursor:
try:
decoded = json.loads(base64.b64decode(cursor).decode())
cursor_last_event = decoded["last_event_utc"]
cursor_animal_id = decoded["animal_id"]
except (ValueError, KeyError):
pass # Invalid cursor, ignore
# Build WHERE string
where_str = " AND ".join(where_clauses) if where_clauses else "1=1"
# Get total count (before pagination)
count_query = f"""
SELECT COUNT(*)
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
"""
total_count = self.db.execute(count_query, params).fetchone()[0]
# Add cursor pagination
pagination_params = list(params)
if cursor_last_event is not None and cursor_animal_id is not None:
where_str += (
" AND (ar.last_event_utc < ? OR (ar.last_event_utc = ? AND ar.animal_id < ?))"
)
pagination_params.extend([cursor_last_event, cursor_last_event, cursor_animal_id])
# Build main query
query = f"""
SELECT
ar.animal_id,
ar.species_code,
ar.sex,
ar.life_stage,
ar.status,
ar.location_id,
l.name as location_name,
ar.nickname,
ar.identified,
ar.last_event_utc,
COALESCE(
(SELECT json_group_array(tag)
FROM animal_tag_intervals ati
WHERE ati.animal_id = ar.animal_id
AND ati.end_utc IS NULL),
'[]'
) as tags
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
ORDER BY ar.last_event_utc DESC, ar.animal_id DESC
LIMIT ?
"""
pagination_params.append(self.PAGE_SIZE + 1) # Fetch one extra to detect next page
rows = self.db.execute(query, pagination_params).fetchall()
# Build items
items = []
for row in rows[: self.PAGE_SIZE]:
tags_json = row[10]
tags = json.loads(tags_json) if tags_json else []
items.append(
AnimalListItem(
animal_id=row[0],
species_code=row[1],
sex=row[2],
life_stage=row[3],
status=row[4],
location_id=row[5],
location_name=row[6],
nickname=row[7],
identified=bool(row[8]),
last_event_utc=row[9],
tags=tags,
)
)
# Determine next cursor
next_cursor = None
if len(rows) > self.PAGE_SIZE:
last_item = items[-1]
cursor_data = {
"last_event_utc": last_item.last_event_utc,
"animal_id": last_item.animal_id,
}
next_cursor = base64.b64encode(json.dumps(cursor_data).encode()).decode()
return PaginatedResult(
items=items,
next_cursor=next_cursor,
total_count=total_count,
)
def get_facet_counts(
self,
filter_str: str = "",
) -> FacetCounts:
"""Get facet counts for current filter.
Counts are computed for all facets within the filtered set.
Unlike list_animals, facets do NOT default to alive-only so users
can see the full status breakdown.
Args:
filter_str: DSL filter string.
Returns:
FacetCounts with breakdowns by status, species, sex, life_stage, location.
"""
# Parse filter
filter_ast = parse_filter(filter_str)
# Build WHERE clauses from filter
# Note: facets do NOT apply the default alive filter - users should see
# the full breakdown of all statuses
where_clauses = []
params: list[Any] = []
# Apply filter clauses
for field_filter in filter_ast.filters:
clause, filter_params = self._build_where_clause(field_filter)
where_clauses.append(clause)
params.extend(filter_params)
where_str = " AND ".join(where_clauses) if where_clauses else "1=1"
facets = FacetCounts()
# Status counts
status_query = f"""
SELECT status, COUNT(*)
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
GROUP BY status
"""
for row in self.db.execute(status_query, params).fetchall():
facets.by_status[row[0]] = row[1]
# Species counts
species_query = f"""
SELECT species_code, COUNT(*)
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
GROUP BY species_code
"""
for row in self.db.execute(species_query, params).fetchall():
facets.by_species[row[0]] = row[1]
# Sex counts
sex_query = f"""
SELECT sex, COUNT(*)
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
GROUP BY sex
"""
for row in self.db.execute(sex_query, params).fetchall():
facets.by_sex[row[0]] = row[1]
# Life stage counts
life_stage_query = f"""
SELECT life_stage, COUNT(*)
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
GROUP BY life_stage
"""
for row in self.db.execute(life_stage_query, params).fetchall():
facets.by_life_stage[row[0]] = row[1]
# Location counts (by location_id, not name)
location_query = f"""
SELECT ar.location_id, COUNT(*)
FROM animal_registry ar
JOIN locations l ON ar.location_id = l.id
WHERE {where_str}
GROUP BY ar.location_id
"""
for row in self.db.execute(location_query, params).fetchall():
facets.by_location[row[0]] = row[1]
return facets
def _build_where_clause(self, field_filter) -> tuple[str, list[Any]]:
"""Build SQL WHERE clause for a single field filter.
Args:
field_filter: The field filter to build clause for.
Returns:
Tuple of (SQL clause string, list of parameters).
"""
field = field_filter.field
values = list(field_filter.values)
negated = field_filter.negated
placeholders = ",".join("?" * len(values))
if field == "species":
op = "NOT IN" if negated else "IN"
return f"ar.species_code {op} ({placeholders})", values
elif field == "status":
op = "NOT IN" if negated else "IN"
return f"ar.status {op} ({placeholders})", values
elif field == "sex":
op = "NOT IN" if negated else "IN"
return f"ar.sex {op} ({placeholders})", values
elif field == "life_stage":
op = "NOT IN" if negated else "IN"
return f"ar.life_stage {op} ({placeholders})", values
elif field == "identified":
int_values = [int(v) for v in values]
op = "NOT IN" if negated else "IN"
placeholders = ",".join("?" * len(int_values))
return f"ar.identified {op} ({placeholders})", int_values
elif field == "location":
# Location by name
op = "NOT IN" if negated else "IN"
return f"l.name {op} ({placeholders})", values
elif field == "tag":
# Tag filter - check animal_tag_intervals
if negated:
return (
f"""
ar.animal_id NOT IN (
SELECT animal_id FROM animal_tag_intervals
WHERE tag IN ({placeholders})
AND end_utc IS NULL
)
""",
values,
)
else:
return (
f"""
ar.animal_id IN (
SELECT animal_id FROM animal_tag_intervals
WHERE tag IN ({placeholders})
AND end_utc IS NULL
)
""",
values,
)
else:
# Unknown field - should not happen if parser validates
return "1=1", []

View File

@@ -6,7 +6,9 @@ from collections.abc import Iterator
from animaltrack.selection.ast import FieldFilter, FilterAST
# Supported filter fields
VALID_FIELDS = frozenset({"location", "species", "sex", "life_stage", "identified", "tag"})
VALID_FIELDS = frozenset(
{"location", "species", "sex", "life_stage", "identified", "tag", "status"}
)
# Fields that can be used as flags (without :value)
FLAG_FIELDS = frozenset({"identified"})

View File

@@ -62,6 +62,11 @@ def resolve_selection(
return resolved_ids
def _has_status_filter(filter_ast: FilterAST) -> bool:
"""Check if the filter AST contains an explicit status filter."""
return any(f.field == "status" for f in filter_ast.filters)
def resolve_filter(
db: Any,
filter_ast: FilterAST,
@@ -80,23 +85,37 @@ def resolve_filter(
Returns:
SelectionResult with sorted animal_ids and roster_hash.
"""
# Check if explicit status filter is provided
has_status = _has_status_filter(filter_ast)
# Build base query - all animals with location interval at ts_utc
# and status='alive' at ts_utc
base_query = """
SELECT DISTINCT ali.animal_id
FROM animal_location_intervals ali
WHERE ali.start_utc <= ?
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
AND EXISTS (
SELECT 1 FROM animal_attr_intervals aai
WHERE aai.animal_id = ali.animal_id
AND aai.attr = 'status'
AND aai.value = 'alive'
AND aai.start_utc <= ?
AND (aai.end_utc IS NULL OR aai.end_utc > ?)
)
"""
params: list[Any] = [ts_utc, ts_utc, ts_utc, ts_utc]
# If no explicit status filter, default to status='alive'
if has_status:
# When status filter is explicit, don't apply default alive filter
base_query = """
SELECT DISTINCT ali.animal_id
FROM animal_location_intervals ali
WHERE ali.start_utc <= ?
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
"""
params: list[Any] = [ts_utc, ts_utc]
else:
# Default: only alive animals
base_query = """
SELECT DISTINCT ali.animal_id
FROM animal_location_intervals ali
WHERE ali.start_utc <= ?
AND (ali.end_utc IS NULL OR ali.end_utc > ?)
AND EXISTS (
SELECT 1 FROM animal_attr_intervals aai
WHERE aai.animal_id = ali.animal_id
AND aai.attr = 'status'
AND aai.value = 'alive'
AND aai.start_utc <= ?
AND (aai.end_utc IS NULL OR aai.end_utc > ?)
)
"""
params = [ts_utc, ts_utc, ts_utc, ts_utc]
# Apply each filter
for field_filter in filter_ast.filters:
@@ -188,6 +207,19 @@ def _build_filter_clause(field_filter: FieldFilter, ts_utc: int) -> tuple[str, l
params = values + [ts_utc, ts_utc]
return query, params
elif field == "status":
# Historical status from animal_attr_intervals
placeholders = ",".join("?" * len(values))
query = f"""
SELECT animal_id FROM animal_attr_intervals
WHERE attr = 'status'
AND value IN ({placeholders})
AND start_utc <= ?
AND (end_utc IS NULL OR end_utc > ?)
"""
params = values + [ts_utc, ts_utc]
return query, params
else:
# Unknown field - should not happen if parser validates
msg = f"Unknown filter field: {field}"

View File

@@ -22,6 +22,7 @@ from animaltrack.web.routes import (
register_feed_routes,
register_health_routes,
register_move_routes,
register_registry_routes,
)
# Default static directory relative to this module
@@ -133,5 +134,6 @@ def create_app(
register_egg_routes(rt, app)
register_feed_routes(rt, app)
register_move_routes(rt, app)
register_registry_routes(rt, app)
return app, rt

View File

@@ -5,10 +5,12 @@ from animaltrack.web.routes.eggs import register_egg_routes
from animaltrack.web.routes.feed import register_feed_routes
from animaltrack.web.routes.health import register_health_routes
from animaltrack.web.routes.move import register_move_routes
from animaltrack.web.routes.registry import register_registry_routes
__all__ = [
"register_egg_routes",
"register_feed_routes",
"register_health_routes",
"register_move_routes",
"register_registry_routes",
]

View File

@@ -0,0 +1,76 @@
# ABOUTME: Routes for Animal Registry view.
# ABOUTME: Handles GET /registry with filters, pagination, and facets.
from starlette.requests import Request
from starlette.responses import HTMLResponse
from animaltrack.repositories.animals import AnimalRepository
from animaltrack.repositories.locations import LocationRepository
from animaltrack.repositories.species import SpeciesRepository
from animaltrack.web.templates.base import page
from animaltrack.web.templates.registry import (
animal_table_rows,
registry_page,
)
def registry_index(request: Request):
"""GET /registry - Animal Registry with filtering and pagination.
Query params:
filter: DSL filter string (e.g., "species:duck status:alive")
cursor: Base64-encoded pagination cursor
Returns:
Full page on initial load, or just table rows for HTMX requests with cursor.
"""
db = request.app.state.db
# Extract query params
filter_str = request.query_params.get("filter", "")
cursor = request.query_params.get("cursor")
# Get animal data
animal_repo = AnimalRepository(db)
result = animal_repo.list_animals(filter_str=filter_str, cursor=cursor)
facets = animal_repo.get_facet_counts(filter_str=filter_str)
# Get reference data for facet labels
locations = LocationRepository(db).list_active()
species_list = SpeciesRepository(db).list_all()
# If this is an HTMX request for more rows (has cursor), return just the rows
if request.headers.get("HX-Request") and cursor:
from fasthtml.common import to_xml
rows = animal_table_rows(
result.items,
next_cursor=result.next_cursor,
filter_str=filter_str,
)
return HTMLResponse(content=to_xml(tuple(rows)))
# Full page render
return page(
registry_page(
animals=result.items,
facets=facets,
filter_str=filter_str,
next_cursor=result.next_cursor,
total_count=result.total_count,
locations=locations,
species_list=species_list,
),
title="Registry - AnimalTrack",
active_nav="registry",
)
def register_registry_routes(rt, app):
"""Register registry routes.
Args:
rt: FastHTML route decorator.
app: FastHTML application instance.
"""
rt("/registry")(registry_index)

View File

@@ -0,0 +1,312 @@
# ABOUTME: Templates for Animal Registry view.
# ABOUTME: Table with infinite scroll, facet sidebar, and filter input.
from datetime import UTC, datetime
from typing import Any
from urllib.parse import urlencode
from fasthtml.common import H2, A, Div, Form, P, Span, Table, Tbody, Td, Th, Thead, Tr
from monsterui.all import Button, ButtonT, Card, Grid, LabelInput
from animaltrack.models.reference import Location, Species
from animaltrack.repositories.animals import AnimalListItem, FacetCounts
def registry_page(
animals: list[AnimalListItem],
facets: FacetCounts,
filter_str: str = "",
next_cursor: str | None = None,
total_count: int = 0,
locations: list[Location] | None = None,
species_list: list[Species] | None = None,
) -> Grid:
"""Full registry page with sidebar and table.
Args:
animals: List of animals for the current page.
facets: Facet counts for the sidebar.
filter_str: Current filter string.
next_cursor: Cursor for the next page (if more exist).
total_count: Total number of animals matching the filter.
locations: List of locations for facet labels.
species_list: List of species for facet labels.
Returns:
Grid component with sidebar and main content.
"""
return Grid(
# Sidebar with facets
facet_sidebar(facets, filter_str, locations, species_list),
# Main content
Div(
# Header with filter
registry_header(filter_str, total_count),
# Animal table
animal_table(animals, next_cursor, filter_str),
cls="col-span-3",
),
cols_sm=1,
cols_md=4,
cls="gap-4 p-4",
)
def registry_header(filter_str: str, total_count: int) -> Div:
"""Header with title and filter input.
Args:
filter_str: Current filter string.
total_count: Total number of matching animals.
Returns:
Div with header and filter form.
"""
return Div(
Div(
H2("Animal Registry", cls="text-xl font-bold"),
Span(f"{total_count} animals", cls="text-sm text-stone-400"),
cls="flex items-center justify-between",
),
# Filter form
Form(
Div(
LabelInput(
"Filter",
id="filter",
name="filter",
value=filter_str,
placeholder='e.g., species:duck status:alive location:"Strip 1"',
cls="flex-1",
),
Button("Apply", type="submit", cls=ButtonT.primary),
cls="flex gap-2 items-end",
),
action="/registry",
method="get",
cls="mt-4",
),
cls="mb-4",
)
def facet_sidebar(
facets: FacetCounts,
filter_str: str,
locations: list[Location] | None,
species_list: list[Species] | None,
) -> Div:
"""Sidebar with clickable facet counts.
Args:
facets: Facet counts for display.
filter_str: Current filter string.
locations: List of locations for name lookup.
species_list: List of species for name lookup.
Returns:
Div containing facet sections.
"""
location_map = {loc.id: loc.name for loc in (locations or [])}
species_map = {s.code: s.name for s in (species_list or [])}
return Div(
facet_section("Status", facets.by_status, filter_str, "status"),
facet_section("Species", facets.by_species, filter_str, "species", species_map),
facet_section("Sex", facets.by_sex, filter_str, "sex"),
facet_section("Life Stage", facets.by_life_stage, filter_str, "life_stage"),
facet_section("Location", facets.by_location, filter_str, "location", location_map),
cls="space-y-4",
)
def facet_section(
title: str,
counts: dict[str, int],
filter_str: str,
field: str,
label_map: dict[str, str] | None = None,
) -> Any:
"""Single facet section with clickable items.
Args:
title: Section title.
counts: Dictionary of value -> count.
filter_str: Current filter string.
field: Field name for building filter links.
label_map: Optional mapping from value to display label.
Returns:
Card component with facet items, or None if no counts.
"""
if not counts:
return None
items = []
for value, count in sorted(counts.items(), key=lambda x: -x[1]):
label = label_map.get(value, value) if label_map else value.replace("_", " ").title()
# Build filter link - append to current filter
if filter_str:
new_filter = f"{filter_str} {field}:{value}"
else:
new_filter = f"{field}:{value}"
href = f"/registry?{urlencode({'filter': new_filter})}"
items.append(
A(
Div(
Span(label, cls="text-sm"),
Span(str(count), cls="text-xs text-stone-400 ml-auto"),
cls="flex justify-between items-center",
),
href=href,
cls="block p-2 hover:bg-slate-800 rounded",
)
)
return Card(
P(title, cls="font-bold text-sm mb-2"),
*items,
)
def animal_table(
animals: list[AnimalListItem],
next_cursor: str | None,
filter_str: str,
) -> Table:
"""Animal table with infinite scroll.
Args:
animals: List of animals for the current page.
next_cursor: Cursor for the next page (if more exist).
filter_str: Current filter string.
Returns:
Table component with header and body.
"""
return Table(
Thead(
Tr(
Th("ID", shrink=True),
Th("Species"),
Th("Sex"),
Th("Life Stage"),
Th("Location"),
Th("Tags"),
Th("Last Event"),
Th("Status"),
)
),
Tbody(
*[animal_row(a) for a in animals],
# Sentinel for infinite scroll
load_more_sentinel(next_cursor, filter_str) if next_cursor else None,
id="animal-tbody",
),
cls="uk-table uk-table-sm uk-table-divider uk-table-hover",
)
def animal_row(animal: AnimalListItem) -> Tr:
"""Single table row for an animal.
Args:
animal: Animal data for the row.
Returns:
Tr component for the animal.
"""
# Format last event timestamp
last_event_dt = datetime.fromtimestamp(animal.last_event_utc / 1000, tz=UTC)
last_event_str = last_event_dt.strftime("%Y-%m-%d %H:%M")
# Display ID (truncated or nickname)
display_id = animal.nickname or animal.animal_id[:8] + "..."
# Status badge styling
status_cls = {
"alive": "bg-green-900/50 text-green-300",
"dead": "bg-red-900/50 text-red-300",
"harvested": "bg-amber-900/50 text-amber-300",
"sold": "bg-blue-900/50 text-blue-300",
"merged_into": "bg-slate-700 text-slate-300",
}.get(animal.status, "bg-slate-700 text-slate-300")
# Format tags (show first 3)
tags_str = ", ".join(animal.tags[:3])
if len(animal.tags) > 3:
tags_str += "..."
return Tr(
Td(
A(
display_id,
href=f"/animals/{animal.animal_id}",
cls="text-amber-500 hover:underline",
),
title=animal.animal_id,
),
Td(animal.species_code),
Td(animal.sex),
Td(animal.life_stage.replace("_", " ")),
Td(animal.location_name),
Td(tags_str, cls="text-xs text-stone-400"),
Td(last_event_str, cls="text-xs text-stone-400"),
Td(
Span(
animal.status,
cls=f"text-xs px-2 py-0.5 rounded {status_cls}",
)
),
)
def animal_table_rows(
animals: list[AnimalListItem],
next_cursor: str | None,
filter_str: str,
) -> list:
"""Just table rows for HTMX append (no wrapper).
Args:
animals: List of animals for the current page.
next_cursor: Cursor for the next page (if more exist).
filter_str: Current filter string.
Returns:
List of Tr components.
"""
rows = [animal_row(a) for a in animals]
if next_cursor:
rows.append(load_more_sentinel(next_cursor, filter_str))
return rows
def load_more_sentinel(cursor: str, filter_str: str) -> Tr:
"""Sentinel row that triggers infinite scroll.
Args:
cursor: Cursor for the next page.
filter_str: Current filter string.
Returns:
Tr component with HTMX attributes for lazy loading.
"""
params = {"cursor": cursor}
if filter_str:
params["filter"] = filter_str
url = f"/registry?{urlencode(params)}"
return Tr(
Td(
Div(
"Loading more...",
cls="text-center text-stone-400 py-4",
),
colspan="8",
),
hx_get=url,
hx_trigger="revealed",
hx_swap="outerHTML",
id="load-more-sentinel",
)

View File

@@ -0,0 +1,308 @@
# ABOUTME: Tests for AnimalRepository - list_animals and get_facet_counts.
# ABOUTME: Covers filtering, pagination, and facet aggregation.
import time
import pytest
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.repositories.animals import AnimalListItem, AnimalRepository, PaginatedResult
from animaltrack.services.animal import AnimalService
@pytest.fixture
def event_store(seeded_db):
"""Create an EventStore for testing."""
return EventStore(seeded_db)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with animal projections registered."""
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
return registry
@pytest.fixture
def animal_service(seeded_db, event_store, projection_registry):
"""Create an AnimalService for testing."""
return AnimalService(seeded_db, event_store, projection_registry)
@pytest.fixture
def valid_location_id(seeded_db):
"""Get Strip 1 location ID from seeds."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
@pytest.fixture
def strip2_location_id(seeded_db):
"""Get Strip 2 location ID from seeds."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
return row[0]
def make_cohort_payload(
location_id: str,
count: int = 3,
species: str = "duck",
sex: str = "unknown",
life_stage: str = "adult",
) -> AnimalCohortCreatedPayload:
"""Create a cohort payload for testing."""
return AnimalCohortCreatedPayload(
species=species,
count=count,
life_stage=life_stage,
sex=sex,
location_id=location_id,
origin="purchased",
)
class TestAnimalRepositoryListAnimals:
"""Tests for list_animals method."""
def test_returns_empty_when_no_animals(self, seeded_db):
"""list_animals returns empty list when no animals exist."""
repo = AnimalRepository(seeded_db)
result = repo.list_animals()
assert isinstance(result, PaginatedResult)
assert result.items == []
assert result.next_cursor is None
assert result.total_count == 0
def test_returns_animals_as_list_items(self, seeded_db, animal_service, valid_location_id):
"""list_animals returns AnimalListItem objects."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
animal_service.create_cohort(payload, ts_utc, "test_user")
repo = AnimalRepository(seeded_db)
result = repo.list_animals()
assert len(result.items) == 3
for item in result.items:
assert isinstance(item, AnimalListItem)
assert item.species_code == "duck"
assert item.status == "alive"
assert item.location_name == "Strip 1"
def test_default_filters_to_alive_only(self, seeded_db, animal_service, valid_location_id):
"""list_animals defaults to showing only alive animals."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one animal
dead_id = ids[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
repo = AnimalRepository(seeded_db)
result = repo.list_animals()
assert len(result.items) == 2
assert all(item.status == "alive" for item in result.items)
def test_filters_by_species(self, seeded_db, animal_service, valid_location_id):
"""list_animals with species filter returns only matching species."""
ts_utc = int(time.time() * 1000)
# Create ducks
duck_payload = make_cohort_payload(valid_location_id, count=3, species="duck")
animal_service.create_cohort(duck_payload, ts_utc, "test_user")
# Create geese
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose")
animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
repo = AnimalRepository(seeded_db)
result = repo.list_animals(filter_str="species:duck")
assert len(result.items) == 3
assert all(item.species_code == "duck" for item in result.items)
def test_filters_by_status_dead(self, seeded_db, animal_service, valid_location_id):
"""list_animals with status:dead returns only dead animals."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one animal
dead_id = ids[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
repo = AnimalRepository(seeded_db)
result = repo.list_animals(filter_str="status:dead")
assert len(result.items) == 1
assert result.items[0].animal_id == dead_id
def test_orders_by_last_event_descending(self, seeded_db, animal_service, valid_location_id):
"""list_animals returns animals ordered by last_event_utc DESC."""
ts_utc = int(time.time() * 1000)
# Create first cohort
payload1 = make_cohort_payload(valid_location_id, count=1)
event1 = animal_service.create_cohort(payload1, ts_utc, "test_user")
# Create second cohort later
payload2 = make_cohort_payload(valid_location_id, count=1)
event2 = animal_service.create_cohort(payload2, ts_utc + 1000, "test_user")
repo = AnimalRepository(seeded_db)
result = repo.list_animals()
assert len(result.items) == 2
# Second animal (later event) should come first
assert result.items[0].animal_id == event2.entity_refs["animal_ids"][0]
assert result.items[1].animal_id == event1.entity_refs["animal_ids"][0]
class TestAnimalRepositoryPagination:
"""Tests for cursor-based pagination."""
def test_limits_results_to_page_size(self, seeded_db, animal_service, valid_location_id):
"""list_animals returns at most PAGE_SIZE items."""
ts_utc = int(time.time() * 1000)
# Create more animals than page size
payload = make_cohort_payload(valid_location_id, count=60)
animal_service.create_cohort(payload, ts_utc, "test_user")
repo = AnimalRepository(seeded_db)
result = repo.list_animals()
assert len(result.items) == 50 # PAGE_SIZE
assert result.next_cursor is not None
def test_cursor_returns_next_page(self, seeded_db, animal_service, valid_location_id):
"""Using cursor returns the next page of results."""
ts_utc = int(time.time() * 1000)
# Create 60 animals
payload = make_cohort_payload(valid_location_id, count=60)
animal_service.create_cohort(payload, ts_utc, "test_user")
repo = AnimalRepository(seeded_db)
first_page = repo.list_animals()
second_page = repo.list_animals(cursor=first_page.next_cursor)
assert len(first_page.items) == 50
assert len(second_page.items) == 10
assert second_page.next_cursor is None
# No overlap between pages
first_ids = {item.animal_id for item in first_page.items}
second_ids = {item.animal_id for item in second_page.items}
assert first_ids.isdisjoint(second_ids)
def test_total_count_is_independent_of_pagination(
self, seeded_db, animal_service, valid_location_id
):
"""total_count reflects all matching animals, not just current page."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=60)
animal_service.create_cohort(payload, ts_utc, "test_user")
repo = AnimalRepository(seeded_db)
result = repo.list_animals()
assert result.total_count == 60
assert len(result.items) == 50
class TestAnimalRepositoryFacetCounts:
"""Tests for get_facet_counts method."""
def test_returns_counts_by_status(self, seeded_db, animal_service, valid_location_id):
"""get_facet_counts returns status breakdown."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(ids[0],),
)
repo = AnimalRepository(seeded_db)
facets = repo.get_facet_counts()
assert facets.by_status["alive"] == 2
assert facets.by_status["dead"] == 1
def test_returns_counts_by_species(self, seeded_db, animal_service, valid_location_id):
"""get_facet_counts returns species breakdown."""
ts_utc = int(time.time() * 1000)
duck_payload = make_cohort_payload(valid_location_id, count=3, species="duck")
animal_service.create_cohort(duck_payload, ts_utc, "test_user")
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose")
animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
repo = AnimalRepository(seeded_db)
facets = repo.get_facet_counts()
assert facets.by_species["duck"] == 3
assert facets.by_species["goose"] == 2
def test_returns_counts_by_location(
self, seeded_db, animal_service, valid_location_id, strip2_location_id
):
"""get_facet_counts returns location breakdown."""
ts_utc = int(time.time() * 1000)
# Create at Strip 1
payload1 = make_cohort_payload(valid_location_id, count=3)
animal_service.create_cohort(payload1, ts_utc, "test_user")
# Create at Strip 2
payload2 = make_cohort_payload(strip2_location_id, count=2)
animal_service.create_cohort(payload2, ts_utc + 1, "test_user")
repo = AnimalRepository(seeded_db)
facets = repo.get_facet_counts()
assert facets.by_location[valid_location_id] == 3
assert facets.by_location[strip2_location_id] == 2
def test_facets_respect_filter(self, seeded_db, animal_service, valid_location_id):
"""get_facet_counts respects the filter parameter."""
ts_utc = int(time.time() * 1000)
duck_payload = make_cohort_payload(valid_location_id, count=3, species="duck", sex="female")
animal_service.create_cohort(duck_payload, ts_utc, "test_user")
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose", sex="male")
animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
repo = AnimalRepository(seeded_db)
# Get facets for ducks only
facets = repo.get_facet_counts(filter_str="species:duck")
# Only ducks in the counts
assert facets.by_species == {"duck": 3}
assert facets.by_sex == {"female": 3}

View File

@@ -27,13 +27,52 @@ class TestSimpleFilters:
def test_all_supported_fields(self) -> None:
"""All supported fields should parse."""
result = parse_filter("location:strip1 species:duck sex:male life_stage:adult tag:healthy")
assert len(result.filters) == 5
result = parse_filter(
"location:strip1 species:duck sex:male life_stage:adult tag:healthy status:alive"
)
assert len(result.filters) == 6
assert result.filters[0].field == "location"
assert result.filters[1].field == "species"
assert result.filters[2].field == "sex"
assert result.filters[3].field == "life_stage"
assert result.filters[4].field == "tag"
assert result.filters[5].field == "status"
class TestStatusField:
"""Test the status field for filtering by animal status."""
def test_status_alive(self) -> None:
"""status:alive -> filter by alive status."""
result = parse_filter("status:alive")
assert result == FilterAST([FieldFilter("status", ["alive"])])
def test_status_multiple_values(self) -> None:
"""status:alive|dead -> OR of statuses."""
result = parse_filter("status:alive|dead")
assert result == FilterAST([FieldFilter("status", ["alive", "dead"])])
def test_status_all_values(self) -> None:
"""All status values should parse."""
result = parse_filter("status:alive|dead|harvested|sold|merged_into")
assert result == FilterAST(
[FieldFilter("status", ["alive", "dead", "harvested", "sold", "merged_into"])]
)
def test_status_negated(self) -> None:
"""-status:dead -> exclude dead animals."""
result = parse_filter("-status:dead")
assert result == FilterAST([FieldFilter("status", ["dead"], negated=True)])
def test_status_combined_with_other_fields(self) -> None:
"""status:alive species:duck -> combine status with other filters."""
result = parse_filter("status:alive species:duck")
assert result == FilterAST(
[
FieldFilter("status", ["alive"]),
FieldFilter("species", ["duck"]),
]
)
class TestOrSyntax:

View File

@@ -370,6 +370,179 @@ class TestResolveFilterIdentified:
assert result.animal_ids == expected
class TestResolveFilterStatus:
"""Tests for status filter."""
def test_status_alive_is_default(self, seeded_db, animal_service, valid_location_id):
"""Empty filter returns only alive animals (status:alive is implicit)."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one animal by updating status in attr_intervals
dead_id = ids[0]
seeded_db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
""",
(ts_utc + 1, dead_id),
)
seeded_db.execute(
"""
INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', 'dead', ?, NULL)
""",
(dead_id, ts_utc + 1),
)
# Default filter excludes dead
result = resolve_filter(seeded_db, FilterAST([]), ts_utc + 2)
assert dead_id not in result.animal_ids
assert len(result.animal_ids) == 2
def test_status_dead_returns_dead_animals(self, seeded_db, animal_service, valid_location_id):
"""status:dead returns only dead animals."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one animal
dead_id = ids[0]
seeded_db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
""",
(ts_utc + 1, dead_id),
)
seeded_db.execute(
"""
INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', 'dead', ?, NULL)
""",
(dead_id, ts_utc + 1),
)
# status:dead filter returns only dead
filter_ast = FilterAST([FieldFilter("status", ["dead"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert result.animal_ids == [dead_id]
def test_status_alive_or_dead_returns_both(self, seeded_db, animal_service, valid_location_id):
"""status:alive|dead returns both alive and dead animals."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one animal
dead_id = ids[0]
seeded_db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
""",
(ts_utc + 1, dead_id),
)
seeded_db.execute(
"""
INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', 'dead', ?, NULL)
""",
(dead_id, ts_utc + 1),
)
# status:alive|dead filter returns all
filter_ast = FilterAST([FieldFilter("status", ["alive", "dead"])])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert sorted(result.animal_ids) == sorted(ids)
def test_status_negated_excludes(self, seeded_db, animal_service, valid_location_id):
"""-status:dead excludes dead animals (same as default)."""
ts_utc = int(time.time() * 1000)
payload = make_cohort_payload(valid_location_id, count=3)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
ids = event.entity_refs["animal_ids"]
# Kill one animal
dead_id = ids[0]
seeded_db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
""",
(ts_utc + 1, dead_id),
)
seeded_db.execute(
"""
INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', 'dead', ?, NULL)
""",
(dead_id, ts_utc + 1),
)
# -status:dead uses alive as base, then excludes dead (effectively same as default)
filter_ast = FilterAST([FieldFilter("status", ["dead"], negated=True)])
result = resolve_filter(seeded_db, filter_ast, ts_utc + 2)
assert dead_id not in result.animal_ids
def test_status_combined_with_species(self, seeded_db, animal_service, valid_location_id):
"""status:dead species:duck returns only dead ducks."""
ts_utc = int(time.time() * 1000)
# Create ducks
duck_payload = make_cohort_payload(valid_location_id, count=2, species="duck")
duck_event = animal_service.create_cohort(duck_payload, ts_utc, "test_user")
duck_ids = duck_event.entity_refs["animal_ids"]
# Create geese
goose_payload = make_cohort_payload(valid_location_id, count=2, species="goose")
goose_event = animal_service.create_cohort(goose_payload, ts_utc + 1, "test_user")
goose_ids = goose_event.entity_refs["animal_ids"]
# Kill one duck and one goose
dead_duck_id = duck_ids[0]
dead_goose_id = goose_ids[0]
for dead_id in [dead_duck_id, dead_goose_id]:
seeded_db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND value = 'alive'
""",
(ts_utc + 2, dead_id),
)
seeded_db.execute(
"""
INSERT INTO animal_attr_intervals (animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', 'dead', ?, NULL)
""",
(dead_id, ts_utc + 2),
)
# status:dead species:duck returns only dead duck
filter_ast = FilterAST(
[
FieldFilter("status", ["dead"]),
FieldFilter("species", ["duck"]),
]
)
result = resolve_filter(seeded_db, filter_ast, ts_utc + 3)
assert result.animal_ids == [dead_duck_id]
class TestResolveFilterNegation:
"""Tests for negated filters."""

300
tests/test_web_registry.py Normal file
View File

@@ -0,0 +1,300 @@
# ABOUTME: Tests for Animal Registry web routes.
# ABOUTME: Covers GET /registry rendering, filtering, pagination, and HTMX infinite scroll.
import os
import time
import pytest
from starlette.testclient import TestClient
from animaltrack.events.payloads import AnimalCohortCreatedPayload
from animaltrack.events.store import EventStore
from animaltrack.projections import ProjectionRegistry
from animaltrack.projections.animal_registry import AnimalRegistryProjection
from animaltrack.projections.event_animals import EventAnimalsProjection
from animaltrack.projections.intervals import IntervalProjection
from animaltrack.services.animal import AnimalService
def make_test_settings(
csrf_secret: str = "test-secret",
trusted_proxy_ips: str = "127.0.0.1",
dev_mode: bool = True,
):
"""Create Settings for testing by setting env vars temporarily."""
from animaltrack.config import Settings
old_env = os.environ.copy()
try:
os.environ["CSRF_SECRET"] = csrf_secret
os.environ["TRUSTED_PROXY_IPS"] = trusted_proxy_ips
os.environ["DEV_MODE"] = str(dev_mode).lower()
return Settings()
finally:
os.environ.clear()
os.environ.update(old_env)
@pytest.fixture
def client(seeded_db):
"""Create a test client for the app."""
from animaltrack.web.app import create_app
settings = make_test_settings(trusted_proxy_ips="testclient")
app, rt = create_app(settings=settings, db=seeded_db)
return TestClient(app, raise_server_exceptions=True)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with animal projections registered."""
registry = ProjectionRegistry()
registry.register(AnimalRegistryProjection(seeded_db))
registry.register(EventAnimalsProjection(seeded_db))
registry.register(IntervalProjection(seeded_db))
return registry
@pytest.fixture
def animal_service(seeded_db, projection_registry):
"""Create an AnimalService for testing."""
event_store = EventStore(seeded_db)
return AnimalService(seeded_db, event_store, projection_registry)
@pytest.fixture
def location_strip1_id(seeded_db):
"""Get Strip 1 location ID from seeded data."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()
return row[0]
@pytest.fixture
def location_strip2_id(seeded_db):
"""Get Strip 2 location ID from seeded data."""
row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()
return row[0]
@pytest.fixture
def ducks_at_strip1(seeded_db, animal_service, location_strip1_id):
"""Create 5 female ducks at Strip 1."""
payload = AnimalCohortCreatedPayload(
species="duck",
count=5,
life_stage="adult",
sex="female",
location_id=location_strip1_id,
origin="purchased",
)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
return event.entity_refs["animal_ids"]
@pytest.fixture
def geese_at_strip2(seeded_db, animal_service, location_strip2_id):
"""Create 3 male geese at Strip 2."""
payload = AnimalCohortCreatedPayload(
species="goose",
count=3,
life_stage="adult",
sex="male",
location_id=location_strip2_id,
origin="purchased",
)
ts_utc = int(time.time() * 1000)
event = animal_service.create_cohort(payload, ts_utc, "test_user")
return event.entity_refs["animal_ids"]
class TestRegistryRendering:
"""Tests for GET /registry page rendering."""
def test_registry_renders_empty_state(self, client):
"""GET /registry returns 200 with no animals."""
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "Animal Registry" in response.text
assert "0 animals" in response.text
def test_registry_renders_with_animals(self, client, ducks_at_strip1):
"""GET /registry shows animals in table."""
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "5 animals" in response.text
assert "duck" in response.text.lower()
assert "Strip 1" in response.text
def test_registry_shows_facet_sidebar(self, client, ducks_at_strip1, geese_at_strip2):
"""Facet sidebar shows species/status counts."""
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
# Check facet sections exist
assert "Status" in response.text
assert "Species" in response.text
assert "Location" in response.text
def test_registry_has_filter_input(self, client):
"""Page has filter input field."""
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert 'name="filter"' in response.text
def test_registry_is_in_nav(self, client):
"""Registry link is in navigation."""
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert 'href="/registry"' in response.text
class TestRegistryFiltering:
"""Tests for filter application."""
def test_filter_by_species(self, client, ducks_at_strip1, geese_at_strip2):
"""Filter species:duck shows only ducks."""
response = client.get(
"/registry?filter=species:duck",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "5 animals" in response.text
# Should not show geese count in the main content area
def test_filter_by_location(self, client, ducks_at_strip1, geese_at_strip2):
"""Filter location:'Strip 1' shows only Strip 1 animals."""
response = client.get(
'/registry?filter=location:"Strip 1"',
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "5 animals" in response.text
def test_filter_by_status_all(self, client, seeded_db, ducks_at_strip1):
"""Filter status:alive|dead shows both statuses."""
# Kill one duck
dead_id = ducks_at_strip1[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
response = client.get(
"/registry?filter=status:alive|dead",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "5 animals" in response.text # All 5 shown
def test_default_shows_only_alive(self, client, seeded_db, ducks_at_strip1):
"""Default view excludes dead animals."""
# Kill one duck
dead_id = ducks_at_strip1[0]
seeded_db.execute(
"UPDATE animal_registry SET status = 'dead' WHERE animal_id = ?",
(dead_id,),
)
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "4 animals" in response.text # Only alive shown
class TestRegistryPagination:
"""Tests for infinite scroll pagination."""
def test_first_page_has_sentinel_when_more_exist(
self, client, seeded_db, animal_service, location_strip1_id
):
"""First page includes load-more sentinel when more exist."""
# Create 60 animals (more than page size of 50)
payload = AnimalCohortCreatedPayload(
species="duck",
count=60,
life_stage="adult",
sex="female",
location_id=location_strip1_id,
origin="purchased",
)
ts_utc = int(time.time() * 1000)
animal_service.create_cohort(payload, ts_utc, "test_user")
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "60 animals" in response.text
assert "load-more-sentinel" in response.text
assert "hx-get" in response.text
def test_htmx_request_returns_rows_only(
self, client, seeded_db, animal_service, location_strip1_id
):
"""HTMX requests with cursor return just table rows."""
# Create 60 animals
payload = AnimalCohortCreatedPayload(
species="duck",
count=60,
life_stage="adult",
sex="female",
location_id=location_strip1_id,
origin="purchased",
)
ts_utc = int(time.time() * 1000)
animal_service.create_cohort(payload, ts_utc, "test_user")
# Get first page to get cursor
first_response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert first_response.status_code == 200
# Extract cursor from response (it's in the hx-get attribute)
import re
cursor_match = re.search(r'cursor=([^&"]+)', first_response.text)
assert cursor_match, "Cursor not found in response"
cursor = cursor_match.group(1)
# Make HTMX request with cursor
htmx_response = client.get(
f"/registry?cursor={cursor}",
headers={
"X-Oidc-Username": "test_user",
"HX-Request": "true",
},
)
assert htmx_response.status_code == 200
# Should be just table rows, not full page
assert "Animal Registry" not in htmx_response.text
assert "<tr" in htmx_response.text
def test_last_page_has_no_sentinel(self, client, ducks_at_strip1):
"""Last page has no load-more sentinel when all items fit."""
response = client.get(
"/registry",
headers={"X-Oidc-Username": "test_user"},
)
assert response.status_code == 200
assert "5 animals" in response.text
assert "load-more-sentinel" not in response.text