- Add LocationService with create/rename/archive methods - Add LocationProjection for event handling - Add admin-only location management routes at /locations - Add error response helpers (error_response, error_toast, success_toast) - Add toast handler JS to base template for HX-Trigger notifications - Update seeds.py to emit LocationCreated events per spec §23 - Archived locations block animal moves with 422 error 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
344 lines
14 KiB
Python
344 lines
14 KiB
Python
# ABOUTME: Tests for LocationService operations.
|
|
# ABOUTME: Tests create_location, rename_location, and archive_location with event emission.
|
|
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from animaltrack.events.store import EventStore
|
|
from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED
|
|
from animaltrack.projections import ProjectionRegistry
|
|
from animaltrack.services.location import LocationService, ValidationError
|
|
|
|
|
|
@pytest.fixture
def event_store(seeded_db):
    """Provide an EventStore built on top of the ``seeded_db`` fixture."""
    store = EventStore(seeded_db)
    return store
|
|
|
|
|
|
@pytest.fixture
def projection_registry(seeded_db):
    """Provide a ProjectionRegistry that has a LocationProjection registered.

    The projection is constructed from the same ``seeded_db`` fixture value
    that the other fixtures in this module receive.
    """
    # Function-scope import, kept local to the fixture as in the original.
    from animaltrack.projections.location import LocationProjection

    projections = ProjectionRegistry()
    location_projection = LocationProjection(seeded_db)
    projections.register(location_projection)
    return projections
|
|
|
|
|
|
@pytest.fixture
def location_service(seeded_db, event_store, projection_registry):
    """Provide the LocationService under test, wired to the sibling fixtures."""
    service = LocationService(seeded_db, event_store, projection_registry)
    return service
|
|
|
|
|
|
# =============================================================================
|
|
# create_location Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestLocationServiceCreate:
    """Behaviour of create_location(): returned event, stored row, duplicate names."""

    def test_creates_location_created_event(self, seeded_db, location_service):
        """The returned event has type LOCATION_CREATED plus the given actor and timestamp."""
        now_ms = int(time.time() * 1000)

        created = location_service.create_location("New Location", now_ms, "test_user")

        assert created.type == LOCATION_CREATED
        assert created.actor == "test_user"
        assert created.ts_utc == now_ms

    def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service):
        """The returned event's entity_refs holds a 26-character location_id."""
        now_ms = int(time.time() * 1000)

        created = location_service.create_location("Another Location", now_ms, "test_user")
        refs = created.entity_refs

        assert "location_id" in refs
        assert len(refs["location_id"]) == 26  # ULID length

    def test_event_payload_contains_name(self, seeded_db, location_service):
        """The returned event's payload carries the requested name under "name"."""
        now_ms = int(time.time() * 1000)

        created = location_service.create_location("My Location", now_ms, "test_user")

        assert created.payload["name"] == "My Location"

    def test_inserts_into_locations_table(self, seeded_db, location_service):
        """A row for the new location exists in ``locations`` with the name and active=1."""
        now_ms = int(time.time() * 1000)

        created = location_service.create_location("Test Location", now_ms, "test_user")
        location_id = created.entity_refs["location_id"]

        cursor = seeded_db.execute(
            "SELECT id, name, active FROM locations WHERE id = ?",
            (location_id,),
        )
        row = cursor.fetchone()

        assert row is not None
        assert row[1] == "Test Location"
        assert row[2] == 1  # active=True

    def test_returns_existing_location_if_name_exists(self, seeded_db, location_service):
        """A second create_location call with an already-used name returns None."""
        now_ms = int(time.time() * 1000)

        # The first call creates the location and yields an event.
        first = location_service.create_location("Unique Name", now_ms, "test_user")
        assert first is not None

        # The second call reuses the name; the service is expected to return None.
        second = location_service.create_location("Unique Name", now_ms + 1000, "test_user")

        assert second is None

    def test_idempotent_skips_if_event_exists_for_location(self, seeded_db, location_service):
        """Re-creating an existing name returns None and adds no LocationCreated event."""
        now_ms = int(time.time() * 1000)

        # Create the location once and remember its id.
        first = location_service.create_location("Idempotent Test", now_ms, "test_user")
        location_id = first.entity_refs["location_id"]

        def count_created_events():
            # Counts LocationCreated events whose entity_refs text mentions this id.
            cursor = seeded_db.execute(
                "SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?",
                (LOCATION_CREATED, f"%{location_id}%"),
            )
            return cursor.fetchone()[0]

        count_before = count_created_events()

        # Same name again: no new event should be stored.
        second = location_service.create_location("Idempotent Test", now_ms + 1000, "test_user")

        count_after = count_created_events()

        assert second is None
        assert count_after == count_before
|
|
|
|
|
|
class TestLocationServiceCreateValidation:
    """Input validation performed by create_location()."""

    def test_rejects_empty_name(self, seeded_db, location_service):
        """An empty name raises ValidationError whose message matches "name"."""
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="name"):
            location_service.create_location("", now_ms, "test_user")

    def test_rejects_whitespace_only_name(self, seeded_db, location_service):
        """A whitespace-only name raises ValidationError whose message matches "name"."""
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="name"):
            location_service.create_location(" ", now_ms, "test_user")
|
|
|
|
|
|
# =============================================================================
|
|
# rename_location Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestLocationServiceRename:
    """Behaviour of rename_location(): returned event and updated ``locations`` row."""

    @staticmethod
    def _new_location_id(service, name, ts_utc):
        """Create a location through ``service`` and return its location_id."""
        created = service.create_location(name, ts_utc, "test_user")
        return created.entity_refs["location_id"]

    def test_creates_location_renamed_event(self, seeded_db, location_service):
        """The returned event has type LOCATION_RENAMED plus the given actor and timestamp."""
        now_ms = int(time.time() * 1000)
        renamed_at = now_ms + 1000

        # A location has to exist before it can be renamed.
        location_id = self._new_location_id(location_service, "Original Name", now_ms)

        renamed = location_service.rename_location(
            location_id, "New Name", renamed_at, "test_user"
        )

        assert renamed.type == LOCATION_RENAMED
        assert renamed.actor == "test_user"
        assert renamed.ts_utc == renamed_at

    def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service):
        """The rename event references the id of the renamed location."""
        now_ms = int(time.time() * 1000)
        location_id = self._new_location_id(location_service, "Rename Test", now_ms)

        renamed = location_service.rename_location(
            location_id, "Renamed", now_ms + 1000, "test_user"
        )

        assert renamed.entity_refs["location_id"] == location_id

    def test_event_payload_contains_new_name(self, seeded_db, location_service):
        """The rename event's payload carries the new name under "new_name"."""
        now_ms = int(time.time() * 1000)
        location_id = self._new_location_id(location_service, "Before", now_ms)

        renamed = location_service.rename_location(
            location_id, "After", now_ms + 1000, "test_user"
        )

        assert renamed.payload["new_name"] == "After"

    def test_updates_locations_table_name(self, seeded_db, location_service):
        """After a rename, the ``locations`` row for the id holds the new name."""
        now_ms = int(time.time() * 1000)
        location_id = self._new_location_id(location_service, "Old Name", now_ms)

        location_service.rename_location(location_id, "Updated Name", now_ms + 1000, "test_user")

        cursor = seeded_db.execute("SELECT name FROM locations WHERE id = ?", (location_id,))
        row = cursor.fetchone()

        assert row[0] == "Updated Name"
|
|
|
|
|
|
class TestLocationServiceRenameValidation:
    """Input validation performed by rename_location()."""

    def test_rejects_nonexistent_location(self, seeded_db, location_service):
        """An id that was never created raises ValidationError matching "not found"."""
        unknown_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            location_service.rename_location(unknown_id, "New Name", now_ms, "test_user")

    def test_rejects_duplicate_name(self, seeded_db, location_service):
        """Renaming to another location's name raises ValidationError ("already exists")."""
        now_ms = int(time.time() * 1000)

        # Two distinct locations; only the second one's id is needed.
        location_service.create_location("First Location", now_ms, "test_user")
        second = location_service.create_location("Second Location", now_ms + 1000, "test_user")
        second_id = second.entity_refs["location_id"]

        # Giving the second location the first one's name must be refused.
        with pytest.raises(ValidationError, match="already exists"):
            location_service.rename_location(
                second_id, "First Location", now_ms + 2000, "test_user"
            )

    def test_rejects_empty_new_name(self, seeded_db, location_service):
        """An empty new name raises ValidationError whose message matches "name"."""
        now_ms = int(time.time() * 1000)
        created = location_service.create_location("Test", now_ms, "test_user")
        target_id = created.entity_refs["location_id"]

        with pytest.raises(ValidationError, match="name"):
            location_service.rename_location(target_id, "", now_ms + 1000, "test_user")

    def test_allows_rename_to_same_name(self, seeded_db, location_service):
        """Renaming a location to its current name succeeds and returns a rename event."""
        now_ms = int(time.time() * 1000)
        created = location_service.create_location("Same Name", now_ms, "test_user")
        target_id = created.entity_refs["location_id"]

        # No exception is expected here; a LOCATION_RENAMED event is still returned.
        renamed = location_service.rename_location(
            target_id, "Same Name", now_ms + 1000, "test_user"
        )

        assert renamed.type == LOCATION_RENAMED
|
|
|
|
|
|
# =============================================================================
|
|
# archive_location Tests
|
|
# =============================================================================
|
|
|
|
|
|
class TestLocationServiceArchive:
    """Behaviour of archive_location(): returned event and the location's active state."""

    @staticmethod
    def _new_location_id(service, name, ts_utc):
        """Create a location through ``service`` and return its location_id."""
        created = service.create_location(name, ts_utc, "test_user")
        return created.entity_refs["location_id"]

    def test_creates_location_archived_event(self, seeded_db, location_service):
        """The returned event has type LOCATION_ARCHIVED plus the given actor and timestamp."""
        now_ms = int(time.time() * 1000)
        archived_at = now_ms + 1000
        location_id = self._new_location_id(location_service, "To Archive", now_ms)

        archived = location_service.archive_location(location_id, archived_at, "test_user")

        assert archived.type == LOCATION_ARCHIVED
        assert archived.actor == "test_user"
        assert archived.ts_utc == archived_at

    def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service):
        """The archive event references the id of the archived location."""
        now_ms = int(time.time() * 1000)
        location_id = self._new_location_id(location_service, "Archive Test", now_ms)

        archived = location_service.archive_location(location_id, now_ms + 1000, "test_user")

        assert archived.entity_refs["location_id"] == location_id

    def test_sets_location_inactive(self, seeded_db, location_service):
        """The ``active`` column goes from 1 to 0 once the location is archived."""
        now_ms = int(time.time() * 1000)
        location_id = self._new_location_id(location_service, "Active Location", now_ms)

        def read_active_flag():
            # Reads the current ``active`` value of this location's row.
            cursor = seeded_db.execute(
                "SELECT active FROM locations WHERE id = ?", (location_id,)
            )
            return cursor.fetchone()[0]

        # Freshly created locations start out active.
        assert read_active_flag() == 1

        location_service.archive_location(location_id, now_ms + 1000, "test_user")

        # Archiving flips the flag.
        assert read_active_flag() == 0

    def test_location_not_in_list_active(self, seeded_db, location_service):
        """list_active() includes the location before archiving and omits it afterwards."""
        from animaltrack.repositories.locations import LocationRepository

        now_ms = int(time.time() * 1000)
        location_id = self._new_location_id(location_service, "Will Be Archived", now_ms)

        def is_listed_as_active():
            # A new repository is built per check, as the original test did.
            active_locations = LocationRepository(seeded_db).list_active()
            return any(loc.id == location_id for loc in active_locations)

        # Present while still active.
        assert is_listed_as_active()

        location_service.archive_location(location_id, now_ms + 1000, "test_user")

        # Gone once archived.
        assert not is_listed_as_active()
|
|
|
|
|
|
class TestLocationServiceArchiveValidation:
    """Input validation performed by archive_location()."""

    def test_rejects_nonexistent_location(self, seeded_db, location_service):
        """An id that was never created raises ValidationError matching "not found"."""
        unknown_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
        now_ms = int(time.time() * 1000)

        with pytest.raises(ValidationError, match="not found"):
            location_service.archive_location(unknown_id, now_ms, "test_user")

    def test_rejects_already_archived_location(self, seeded_db, location_service):
        """Archiving the same location twice raises ValidationError ("already archived")."""
        now_ms = int(time.time() * 1000)
        created = location_service.create_location("Already Archived", now_ms, "test_user")
        target_id = created.entity_refs["location_id"]

        # The first archive call is expected to succeed.
        location_service.archive_location(target_id, now_ms + 1000, "test_user")

        # The second archive call on the same id must be refused.
        with pytest.raises(ValidationError, match="already archived"):
            location_service.archive_location(target_id, now_ms + 2000, "test_user")
|