# ABOUTME: E2E test #8 from spec section 21.8: Optimistic locking with confirm.
# ABOUTME: Tests concurrent move handling with selection validation and confirmation flow.

import time

import pytest

from animaltrack.events.payloads import (
    AnimalCohortCreatedPayload,
    AnimalMovedPayload,
)
from animaltrack.events.store import EventStore
from animaltrack.events.types import ANIMAL_MOVED
from animaltrack.selection import compute_roster_hash, parse_filter, resolve_filter
from animaltrack.selection.validation import SelectionContext, validate_selection

# Filter string Client A submits in every scenario of this module.
STRIP1_FILTER = 'location:"Strip 1"'

# Spacing between the simulated T1 / T2 / T3 timestamps, in milliseconds.
STEP_MS = 1000

# Cohorts are created this long before ``now_utc`` so they exist at T1.
ONE_DAY_MS = 24 * 60 * 60 * 1000


@pytest.fixture
def now_utc():
    """Current time in milliseconds since epoch."""
    return int(time.time() * 1000)


@pytest.fixture
def full_projection_registry(seeded_db):
    """Create a ProjectionRegistry with all projections."""
    from animaltrack.projections import ProjectionRegistry
    from animaltrack.projections.animal_registry import AnimalRegistryProjection
    from animaltrack.projections.event_animals import EventAnimalsProjection
    from animaltrack.projections.feed import FeedInventoryProjection
    from animaltrack.projections.intervals import IntervalProjection
    from animaltrack.projections.products import ProductsProjection

    registry = ProjectionRegistry()
    registry.register(AnimalRegistryProjection(seeded_db))
    registry.register(IntervalProjection(seeded_db))
    registry.register(EventAnimalsProjection(seeded_db))
    registry.register(ProductsProjection(seeded_db))
    registry.register(FeedInventoryProjection(seeded_db))
    return registry


@pytest.fixture
def services(seeded_db, full_projection_registry):
    """Create all services needed for E2E test.

    Returns a dict with the keys ``db``, ``event_store``, ``registry`` and
    ``animal_service``.
    """
    from animaltrack.services.animal import AnimalService

    event_store = EventStore(seeded_db)
    return {
        "db": seeded_db,
        "event_store": event_store,
        "registry": full_projection_registry,
        "animal_service": AnimalService(seeded_db, event_store, full_projection_registry),
    }


@pytest.fixture
def strip1_id(seeded_db):
    """Get Strip 1 location ID from seeds."""
    return seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone()[0]


@pytest.fixture
def strip2_id(seeded_db):
    """Get Strip 2 location ID from seeds."""
    return seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 2'").fetchone()[0]


@pytest.fixture
def nursery1_id(seeded_db):
    """Get Nursery 1 location ID from seeds."""
    return seeded_db.execute("SELECT id FROM locations WHERE name = 'Nursery 1'").fetchone()[0]


@pytest.fixture
def optimistic_lock_scenario(seeded_db, services, now_utc, strip1_id, strip2_id):
    """Set up optimistic lock test scenario.

    Creates:
    - 5 adult female ducks at Strip 1
    - 3 adult female ducks at Strip 2

    Returns dict with animal IDs and references.
    """
    animal_creation_ts = now_utc - ONE_DAY_MS

    # Create 5 adult female ducks at Strip 1
    cohort1_payload = AnimalCohortCreatedPayload(
        species="duck",
        count=5,
        life_stage="adult",
        sex="female",
        location_id=strip1_id,
        origin="purchased",
    )
    cohort1_event = services["animal_service"].create_cohort(
        cohort1_payload, animal_creation_ts, "test_user"
    )
    strip1_animal_ids = cohort1_event.entity_refs["animal_ids"]

    # Create 3 adult female ducks at Strip 2
    cohort2_payload = AnimalCohortCreatedPayload(
        species="duck",
        count=3,
        life_stage="adult",
        sex="female",
        location_id=strip2_id,
        origin="purchased",
    )
    cohort2_event = services["animal_service"].create_cohort(
        cohort2_payload, animal_creation_ts, "test_user"
    )
    strip2_animal_ids = cohort2_event.entity_refs["animal_ids"]

    return {
        "strip1_id": strip1_id,
        "strip2_id": strip2_id,
        "strip1_animal_ids": strip1_animal_ids,
        "strip2_animal_ids": strip2_animal_ids,
        "animal_creation_ts": animal_creation_ts,
    }


def _resolve_as_client_a(db, strip1_id, ts_utc):
    """Resolve the Strip 1 filter at ``ts_utc`` and hash the resulting roster.

    Returns a ``(filter_ast, resolution, roster_hash)`` tuple. The hash is
    computed from the resolved animal IDs together with ``strip1_id``.
    """
    filter_ast = parse_filter(STRIP1_FILTER)
    resolution = resolve_filter(db, filter_ast, ts_utc)
    roster_hash = compute_roster_hash(resolution.animal_ids, strip1_id)
    return filter_ast, resolution, roster_hash


def _move_animals(services, animal_ids, to_location_id, ts_utc, actor):
    """Submit a move of ``animal_ids`` to ``to_location_id`` via the animal service."""
    payload = AnimalMovedPayload(
        resolved_ids=animal_ids,
        to_location_id=to_location_id,
    )
    services["animal_service"].move_animals(payload, ts_utc, actor)


def _client_a_context(resolution, roster_hash, strip1_id, ts_utc, *, confirmed):
    """Build the SelectionContext Client A submits with its (possibly stale) roster."""
    return SelectionContext(
        filter=STRIP1_FILTER,
        resolved_ids=list(resolution.animal_ids),
        roster_hash=roster_hash,
        ts_utc=ts_utc,
        from_location_id=strip1_id,
        confirmed=confirmed,
    )


def _count_females(db, location_id):
    """Count rows in live_animals_by_location for ``location_id`` with sex 'female'."""
    return db.execute(
        """SELECT COUNT(*) FROM live_animals_by_location
           WHERE location_id = ? AND sex = 'female'""",
        (location_id,),
    ).fetchone()[0]


def _count_move_events(db):
    """Count rows in the events table whose type is ANIMAL_MOVED."""
    return db.execute(
        "SELECT COUNT(*) FROM events WHERE type = ?",
        (ANIMAL_MOVED,),
    ).fetchone()[0]


class TestE2E8OptimisticLockWithConfirm:
    """E2E test #8: Optimistic locking with confirm from spec section 21.8.

    Scenario:
    - Setup: Strip 1 = 5 female ducks, Strip 2 = 3 female ducks
    - Client A resolves filter location:"Strip 1" → 5 ids, hash H1
    - Client B moves 2 from Strip 1 → Strip 2 (commits)
    - Client A submits move to Nursery 1 with roster_hash=H1 → 409, diff shows removed:2
    - Client A resubmits with confirmed=true → moves remaining 3 to Nursery 1

    Final state:
    - Strip 1 females = 0
    - Strip 2 females = 5 (original 3 + 2 moved from Strip 1)
    - Nursery 1 females = 3
    - Two AnimalMoved events logged
    """

    def test_client_a_initial_resolution(
        self, seeded_db, services, now_utc, strip1_id, optimistic_lock_scenario
    ):
        """Client A resolves filter and gets 5 animals with hash."""
        # optimistic_lock_scenario is requested only for its setup (the cohorts
        # it creates); its return value is not read here.
        filter_ast = parse_filter(STRIP1_FILTER)

        resolution = resolve_filter(seeded_db, filter_ast, now_utc)

        assert len(resolution.animal_ids) == 5
        assert resolution.roster_hash != ""

    def test_concurrent_move_causes_mismatch(
        self,
        seeded_db,
        services,
        now_utc,
        strip1_id,
        strip2_id,
        nursery1_id,
        optimistic_lock_scenario,
    ):
        """After client B moves 2 animals, client A's hash becomes invalid."""
        strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]

        # Client A resolves filter at T1
        ts_t1 = now_utc
        _, client_a_resolution, client_a_hash = _resolve_as_client_a(
            seeded_db, strip1_id, ts_t1
        )
        assert len(client_a_resolution.animal_ids) == 5

        # Client B moves 2 animals from Strip 1 to Strip 2 at T2
        ts_t2 = ts_t1 + STEP_MS
        _move_animals(services, strip1_animal_ids[:2], strip2_id, ts_t2, "client_b")

        # Client A submits with old hash at T3 (should detect mismatch)
        ts_t3 = ts_t2 + STEP_MS
        context = _client_a_context(
            client_a_resolution, client_a_hash, strip1_id, ts_t3, confirmed=False
        )
        result = validate_selection(seeded_db, context)

        # Validation should fail with mismatch
        assert not result.valid
        assert result.diff is not None
        assert len(result.diff.removed) == 2
        assert len(result.diff.added) == 0

    def test_confirmed_proceeds_with_remaining_animals(
        self,
        seeded_db,
        services,
        now_utc,
        strip1_id,
        strip2_id,
        nursery1_id,
        optimistic_lock_scenario,
    ):
        """With confirmed=true, move proceeds with current server resolution."""
        strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]

        # Client A resolves filter at T1
        ts_t1 = now_utc
        filter_ast, client_a_resolution, client_a_hash = _resolve_as_client_a(
            seeded_db, strip1_id, ts_t1
        )

        # Client B moves 2 animals from Strip 1 to Strip 2 at T2
        ts_t2 = ts_t1 + STEP_MS
        _move_animals(services, strip1_animal_ids[:2], strip2_id, ts_t2, "client_b")

        # Client A resubmits with confirmed=true at T3
        ts_t3 = ts_t2 + STEP_MS
        context = _client_a_context(
            client_a_resolution, client_a_hash, strip1_id, ts_t3, confirmed=True
        )
        result = validate_selection(seeded_db, context)

        # Validation should pass with confirmed=true
        assert result.valid

        # Per route behavior: re-resolve at ts_t3 to get current IDs
        # (validate_selection returns client's IDs, route re-resolves)
        current_resolution = resolve_filter(seeded_db, filter_ast, ts_t3)
        ids_to_move = current_resolution.animal_ids
        assert len(ids_to_move) == 3

        # Now move those 3 to Nursery 1
        _move_animals(services, ids_to_move, nursery1_id, ts_t3, "client_a")

        # Verify final state: Strip 1 should have 0 females
        assert _count_females(seeded_db, strip1_id) == 0

    def test_final_state_after_both_moves(
        self,
        seeded_db,
        services,
        now_utc,
        strip1_id,
        strip2_id,
        nursery1_id,
        optimistic_lock_scenario,
    ):
        """Full scenario verifies all final location counts."""
        strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]

        # Client A resolves filter at T1
        ts_t1 = now_utc
        filter_ast, client_a_resolution, client_a_hash = _resolve_as_client_a(
            seeded_db, strip1_id, ts_t1
        )

        # Client B moves 2 animals from Strip 1 to Strip 2 at T2
        ts_t2 = ts_t1 + STEP_MS
        _move_animals(services, strip1_animal_ids[:2], strip2_id, ts_t2, "client_b")

        # Client A confirms and moves remaining 3 to Nursery 1 at T3
        ts_t3 = ts_t2 + STEP_MS
        context = _client_a_context(
            client_a_resolution, client_a_hash, strip1_id, ts_t3, confirmed=True
        )
        result = validate_selection(seeded_db, context)
        assert result.valid

        # Per route behavior: re-resolve to get current IDs
        current_resolution = resolve_filter(seeded_db, filter_ast, ts_t3)
        _move_animals(
            services, current_resolution.animal_ids, nursery1_id, ts_t3, "client_a"
        )

        # Verify final state:
        # Strip 1 females = 0 (all moved out)
        assert _count_females(seeded_db, strip1_id) == 0

        # Strip 2 females = 5 (original 3 + 2 moved from Strip 1)
        assert _count_females(seeded_db, strip2_id) == 5

        # Nursery 1 females = 3 (moved from Strip 1 by client A)
        assert _count_females(seeded_db, nursery1_id) == 3

    def test_two_move_events_logged(
        self,
        seeded_db,
        services,
        now_utc,
        strip1_id,
        strip2_id,
        nursery1_id,
        optimistic_lock_scenario,
    ):
        """Both moves should create AnimalMoved events."""
        strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]

        # Count events before
        events_before = _count_move_events(seeded_db)

        # Client A resolves filter at T1
        ts_t1 = now_utc
        filter_ast, client_a_resolution, client_a_hash = _resolve_as_client_a(
            seeded_db, strip1_id, ts_t1
        )

        # Client B moves 2 animals from Strip 1 to Strip 2 at T2
        ts_t2 = ts_t1 + STEP_MS
        _move_animals(services, strip1_animal_ids[:2], strip2_id, ts_t2, "client_b")

        # Client A confirms and moves remaining 3 to Nursery 1 at T3
        ts_t3 = ts_t2 + STEP_MS
        context = _client_a_context(
            client_a_resolution, client_a_hash, strip1_id, ts_t3, confirmed=True
        )
        result = validate_selection(seeded_db, context)
        assert result.valid

        # Per route behavior: re-resolve to get current IDs
        current_resolution = resolve_filter(seeded_db, filter_ast, ts_t3)
        _move_animals(
            services, current_resolution.animal_ids, nursery1_id, ts_t3, "client_a"
        )

        # Count events after
        events_after = _count_move_events(seeded_db)

        # Two new move events should be logged
        assert events_after - events_before == 2

    def test_move_without_confirm_fails_on_mismatch(
        self,
        seeded_db,
        services,
        now_utc,
        strip1_id,
        strip2_id,
        nursery1_id,
        optimistic_lock_scenario,
    ):
        """Without confirmed=true, mismatch should block the move."""
        strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]

        # Client A resolves filter at T1
        ts_t1 = now_utc
        _, client_a_resolution, client_a_hash = _resolve_as_client_a(
            seeded_db, strip1_id, ts_t1
        )

        # Client B moves 2 animals from Strip 1 to Strip 2 at T2
        ts_t2 = ts_t1 + STEP_MS
        _move_animals(services, strip1_animal_ids[:2], strip2_id, ts_t2, "client_b")

        # Client A tries to move without confirmed=true at T3
        ts_t3 = ts_t2 + STEP_MS
        context = _client_a_context(
            client_a_resolution, client_a_hash, strip1_id, ts_t3, confirmed=False
        )
        result = validate_selection(seeded_db, context)

        # Should be invalid
        assert not result.valid
        # Guard first so a missing diff fails as an assertion, not an AttributeError
        assert result.diff is not None
        # Diff should show 2 removed
        assert len(result.diff.removed) == 2

    def test_diff_contains_updated_hash_and_ids(
        self,
        seeded_db,
        services,
        now_utc,
        strip1_id,
        strip2_id,
        nursery1_id,
        optimistic_lock_scenario,
    ):
        """Mismatch response should contain updated hash and resolved IDs."""
        strip1_animal_ids = optimistic_lock_scenario["strip1_animal_ids"]

        # Client A resolves filter at T1
        ts_t1 = now_utc
        _, client_a_resolution, client_a_hash = _resolve_as_client_a(
            seeded_db, strip1_id, ts_t1
        )

        # Client B moves 2 animals from Strip 1 to Strip 2 at T2
        ts_t2 = ts_t1 + STEP_MS
        _move_animals(services, strip1_animal_ids[:2], strip2_id, ts_t2, "client_b")

        # Client A submits without confirmed at T3
        ts_t3 = ts_t2 + STEP_MS
        context = _client_a_context(
            client_a_resolution, client_a_hash, strip1_id, ts_t3, confirmed=False
        )
        result = validate_selection(seeded_db, context)

        # Result should contain updated info
        assert not result.valid
        assert len(result.resolved_ids) == 3  # Current server resolution
        assert result.roster_hash != client_a_hash  # Hash changed
        assert result.roster_hash != ""  # New hash computed