# ABOUTME: Tests for LocationService operations. # ABOUTME: Tests create_location, rename_location, and archive_location with event emission. import time import pytest from animaltrack.events.store import EventStore from animaltrack.events.types import LOCATION_ARCHIVED, LOCATION_CREATED, LOCATION_RENAMED from animaltrack.projections import ProjectionRegistry from animaltrack.services.location import LocationService, ValidationError @pytest.fixture def event_store(seeded_db): """Create an EventStore for testing.""" return EventStore(seeded_db) @pytest.fixture def projection_registry(seeded_db): """Create a ProjectionRegistry with location projections registered.""" from animaltrack.projections.location import LocationProjection registry = ProjectionRegistry() registry.register(LocationProjection(seeded_db)) return registry @pytest.fixture def location_service(seeded_db, event_store, projection_registry): """Create a LocationService for testing.""" return LocationService(seeded_db, event_store, projection_registry) # ============================================================================= # create_location Tests # ============================================================================= class TestLocationServiceCreate: """Tests for create_location().""" def test_creates_location_created_event(self, seeded_db, location_service): """create_location creates a LocationCreated event.""" ts_utc = int(time.time() * 1000) event = location_service.create_location("New Location", ts_utc, "test_user") assert event.type == LOCATION_CREATED assert event.actor == "test_user" assert event.ts_utc == ts_utc def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service): """Event entity_refs contains location_id.""" ts_utc = int(time.time() * 1000) event = location_service.create_location("Another Location", ts_utc, "test_user") assert "location_id" in event.entity_refs assert len(event.entity_refs["location_id"]) == 26 # ULID length def test_event_payload_contains_name(self, seeded_db, location_service): """Event payload contains the location name.""" ts_utc = int(time.time() * 1000) event = location_service.create_location("My Location", ts_utc, "test_user") assert event.payload["name"] == "My Location" def test_inserts_into_locations_table(self, seeded_db, location_service): """create_location inserts a new row in locations table.""" ts_utc = int(time.time() * 1000) event = location_service.create_location("Test Location", ts_utc, "test_user") row = seeded_db.execute( "SELECT id, name, active FROM locations WHERE id = ?", (event.entity_refs["location_id"],), ).fetchone() assert row is not None assert row[1] == "Test Location" assert row[2] == 1 # active=True def test_returns_existing_location_if_name_exists(self, seeded_db, location_service): """create_location returns None if location with same name already exists.""" ts_utc = int(time.time() * 1000) # Create first location event1 = location_service.create_location("Unique Name", ts_utc, "test_user") assert event1 is not None # Try to create duplicate - should return None (idempotent) event2 = location_service.create_location("Unique Name", ts_utc + 1000, "test_user") assert event2 is None def test_idempotent_skips_if_event_exists_for_location(self, seeded_db, location_service): """create_location is idempotent - skips if LocationCreated event exists.""" ts_utc = int(time.time() * 1000) # Create the location once event1 = location_service.create_location("Idempotent Test", ts_utc, "test_user") location_id = event1.entity_refs["location_id"] # Count events for this location event_count_before = seeded_db.execute( "SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?", (LOCATION_CREATED, f"%{location_id}%"), ).fetchone()[0] # Calling create again with same name should not create a new event event2 = location_service.create_location("Idempotent Test", ts_utc + 1000, "test_user") event_count_after = seeded_db.execute( "SELECT COUNT(*) FROM events WHERE type = ? AND entity_refs LIKE ?", (LOCATION_CREATED, f"%{location_id}%"), ).fetchone()[0] assert event2 is None assert event_count_after == event_count_before class TestLocationServiceCreateValidation: """Tests for create_location() validation.""" def test_rejects_empty_name(self, seeded_db, location_service): """Raises ValidationError for empty name.""" with pytest.raises(ValidationError, match="name"): location_service.create_location("", int(time.time() * 1000), "test_user") def test_rejects_whitespace_only_name(self, seeded_db, location_service): """Raises ValidationError for whitespace-only name.""" with pytest.raises(ValidationError, match="name"): location_service.create_location(" ", int(time.time() * 1000), "test_user") # ============================================================================= # rename_location Tests # ============================================================================= class TestLocationServiceRename: """Tests for rename_location().""" def test_creates_location_renamed_event(self, seeded_db, location_service): """rename_location creates a LocationRenamed event.""" ts_utc = int(time.time() * 1000) # First create a location create_event = location_service.create_location("Original Name", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] # Rename it event = location_service.rename_location( location_id, "New Name", ts_utc + 1000, "test_user" ) assert event.type == LOCATION_RENAMED assert event.actor == "test_user" assert event.ts_utc == ts_utc + 1000 def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service): """Event entity_refs contains location_id.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Rename Test", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] event = location_service.rename_location(location_id, "Renamed", ts_utc + 1000, "test_user") assert event.entity_refs["location_id"] == location_id def test_event_payload_contains_new_name(self, seeded_db, location_service): """Event payload contains the new name.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Before", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] event = location_service.rename_location(location_id, "After", ts_utc + 1000, "test_user") assert event.payload["new_name"] == "After" def test_updates_locations_table_name(self, seeded_db, location_service): """rename_location updates the name in locations table.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Old Name", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] location_service.rename_location(location_id, "Updated Name", ts_utc + 1000, "test_user") row = seeded_db.execute( "SELECT name FROM locations WHERE id = ?", (location_id,) ).fetchone() assert row[0] == "Updated Name" class TestLocationServiceRenameValidation: """Tests for rename_location() validation.""" def test_rejects_nonexistent_location(self, seeded_db, location_service): """Raises ValidationError for non-existent location_id.""" fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" with pytest.raises(ValidationError, match="not found"): location_service.rename_location( fake_id, "New Name", int(time.time() * 1000), "test_user" ) def test_rejects_duplicate_name(self, seeded_db, location_service): """Raises ValidationError when renaming to an existing name.""" ts_utc = int(time.time() * 1000) # Create two locations location_service.create_location("First Location", ts_utc, "test_user") create_event2 = location_service.create_location( "Second Location", ts_utc + 1000, "test_user" ) location_id = create_event2.entity_refs["location_id"] # Try to rename second to first's name with pytest.raises(ValidationError, match="already exists"): location_service.rename_location( location_id, "First Location", ts_utc + 2000, "test_user" ) def test_rejects_empty_new_name(self, seeded_db, location_service): """Raises ValidationError for empty new_name.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Test", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] with pytest.raises(ValidationError, match="name"): location_service.rename_location(location_id, "", ts_utc + 1000, "test_user") def test_allows_rename_to_same_name(self, seeded_db, location_service): """Allows renaming to the same name (no-op).""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Same Name", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] # Should not raise - renaming to same name is allowed (no-op) event = location_service.rename_location( location_id, "Same Name", ts_utc + 1000, "test_user" ) assert event.type == LOCATION_RENAMED # ============================================================================= # archive_location Tests # ============================================================================= class TestLocationServiceArchive: """Tests for archive_location().""" def test_creates_location_archived_event(self, seeded_db, location_service): """archive_location creates a LocationArchived event.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("To Archive", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] event = location_service.archive_location(location_id, ts_utc + 1000, "test_user") assert event.type == LOCATION_ARCHIVED assert event.actor == "test_user" assert event.ts_utc == ts_utc + 1000 def test_event_has_location_id_in_entity_refs(self, seeded_db, location_service): """Event entity_refs contains location_id.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Archive Test", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] event = location_service.archive_location(location_id, ts_utc + 1000, "test_user") assert event.entity_refs["location_id"] == location_id def test_sets_location_inactive(self, seeded_db, location_service): """archive_location sets active=0 in locations table.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Active Location", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] # Verify initially active row = seeded_db.execute( "SELECT active FROM locations WHERE id = ?", (location_id,) ).fetchone() assert row[0] == 1 # Archive it location_service.archive_location(location_id, ts_utc + 1000, "test_user") # Verify now inactive row = seeded_db.execute( "SELECT active FROM locations WHERE id = ?", (location_id,) ).fetchone() assert row[0] == 0 def test_location_not_in_list_active(self, seeded_db, location_service): """Archived location does not appear in list_active().""" from animaltrack.repositories.locations import LocationRepository ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Will Be Archived", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] # Before archive - should be in list active_before = LocationRepository(seeded_db).list_active() assert any(loc.id == location_id for loc in active_before) # Archive it location_service.archive_location(location_id, ts_utc + 1000, "test_user") # After archive - should not be in list active_after = LocationRepository(seeded_db).list_active() assert not any(loc.id == location_id for loc in active_after) class TestLocationServiceArchiveValidation: """Tests for archive_location() validation.""" def test_rejects_nonexistent_location(self, seeded_db, location_service): """Raises ValidationError for non-existent location_id.""" fake_id = "01ARZ3NDEKTSV4RRFFQ69G5FAV" with pytest.raises(ValidationError, match="not found"): location_service.archive_location(fake_id, int(time.time() * 1000), "test_user") def test_rejects_already_archived_location(self, seeded_db, location_service): """Raises ValidationError when archiving an already archived location.""" ts_utc = int(time.time() * 1000) create_event = location_service.create_location("Already Archived", ts_utc, "test_user") location_id = create_event.entity_refs["location_id"] # Archive once location_service.archive_location(location_id, ts_utc + 1000, "test_user") # Try to archive again with pytest.raises(ValidationError, match="already archived"): location_service.archive_location(location_id, ts_utc + 2000, "test_user")