# ABOUTME: Tests for the EventStore class.
# ABOUTME: Validates event creation, retrieval, nonce validation, and clock skew guards.

import time

import pytest

from animaltrack.events import (
    FEED_GIVEN,
    PRODUCT_COLLECTED,
    ClockSkewError,
    DuplicateNonceError,
)
from animaltrack.events.store import EventStore
from animaltrack.id_gen import generate_id


@pytest.fixture
def now_utc():
    """Current time in milliseconds since epoch."""
    return int(time.time() * 1000)


@pytest.fixture
def event_store(migrated_db):
    """Create an EventStore instance with a migrated database."""
    return EventStore(migrated_db)


def _insert_tombstone(db, target_event_id, ts_utc):
    """Insert an event_tombstones row that targets ``target_event_id``.

    The row is written directly through the database handle so tests can set
    up tombstoned state. A fresh ID is generated for the tombstone itself;
    actor and reason are fixed test values.

    Args:
        db: Database handle exposing ``execute(sql, params)``.
        target_event_id: ID of the event the tombstone refers to.
        ts_utc: Timestamp to store on the tombstone row.
    """
    db.execute(
        """INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason)
           VALUES (?, ?, ?, ?, ?)""",
        (generate_id(), ts_utc, "admin", target_event_id, "Test deletion"),
    )


class TestEventStore:
    """Tests for basic EventStore operations."""

    def test_append_event_creates_event(self, event_store, now_utc):
        """append_event creates and returns an Event with correct fields."""
        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"location_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV"},
            payload={"product_code": "egg.duck", "quantity": 5},
        )

        assert event.type == PRODUCT_COLLECTED
        assert event.ts_utc == now_utc
        assert event.actor == "ppetru"
        assert event.entity_refs == {"location_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV"}
        assert event.payload == {"product_code": "egg.duck", "quantity": 5}
        assert event.version == 1

    def test_append_event_generates_ulid(self, event_store, now_utc):
        """append_event generates a 26-character ULID for the event ID."""
        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
        )

        assert len(event.id) == 26
        assert event.id.isalnum()
        assert event.id.isupper()

    def test_get_event_returns_event(self, event_store, now_utc):
        """get_event returns the event by ID."""
        created = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={"location_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV"},
            payload={"product_code": "egg.duck", "quantity": 5},
        )

        retrieved = event_store.get_event(created.id)

        assert retrieved is not None
        assert retrieved.id == created.id
        assert retrieved.type == PRODUCT_COLLECTED
        assert retrieved.ts_utc == now_utc
        assert retrieved.actor == "ppetru"
        assert retrieved.entity_refs == {"location_id": "01ARZ3NDEKTSV4RRFFQ69G5FAV"}
        assert retrieved.payload == {"product_code": "egg.duck", "quantity": 5}

    def test_get_event_not_found(self, event_store):
        """get_event returns None for non-existent event ID."""
        result = event_store.get_event("01ARZ3NDEKTSV4RRFFQ69G5FAV")

        assert result is None

    def test_list_events_no_filter(self, event_store, now_utc):
        """list_events returns all events ordered by ts_utc ASC."""
        event1 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={"order": 1},
        )
        event2 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc + 1000,
            actor="ines",
            entity_refs={},
            payload={"order": 2},
        )

        events = event_store.list_events()

        assert len(events) == 2
        assert events[0].id == event1.id
        assert events[1].id == event2.id

    def test_list_events_by_type(self, event_store, now_utc):
        """list_events filters by event type."""
        event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
        )
        feed_event = event_store.append_event(
            event_type=FEED_GIVEN,
            ts_utc=now_utc + 1000,
            actor="ppetru",
            entity_refs={},
            payload={},
        )

        events = event_store.list_events(event_type=FEED_GIVEN)

        assert len(events) == 1
        assert events[0].id == feed_event.id

    def test_list_events_by_time_range(self, event_store, now_utc):
        """list_events filters by since_utc and until_utc."""
        # One event on each side of the window, and one inside it.
        event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc - 10000,
            actor="ppetru",
            entity_refs={},
            payload={"order": 1},
        )
        event2 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={"order": 2},
        )
        event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc + 10000,
            actor="ppetru",
            entity_refs={},
            payload={"order": 3},
        )

        events = event_store.list_events(since_utc=now_utc - 5000, until_utc=now_utc + 5000)

        assert len(events) == 1
        assert events[0].id == event2.id

    def test_list_events_by_actor(self, event_store, now_utc):
        """list_events filters by actor."""
        event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
        )
        ines_event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc + 1000,
            actor="ines",
            entity_refs={},
            payload={},
        )

        events = event_store.list_events(actor="ines")

        assert len(events) == 1
        assert events[0].id == ines_event.id

    def test_list_events_limit(self, event_store, now_utc):
        """list_events respects limit parameter."""
        for i in range(5):
            event_store.append_event(
                event_type=PRODUCT_COLLECTED,
                ts_utc=now_utc + i * 1000,
                actor="ppetru",
                entity_refs={},
                payload={"order": i},
            )

        events = event_store.list_events(limit=3)

        assert len(events) == 3


class TestClockSkewValidation:
    """Tests for clock skew validation."""

    def test_clock_skew_rejected(self, event_store, now_utc):
        """ts_utc more than 5 minutes in the future raises ClockSkewError."""
        future_ts = now_utc + (6 * 60 * 1000)  # 6 minutes in the future

        with pytest.raises(ClockSkewError):
            event_store.append_event(
                event_type=PRODUCT_COLLECTED,
                ts_utc=future_ts,
                actor="ppetru",
                entity_refs={},
                payload={},
            )

    def test_clock_skew_at_boundary_accepted(self, event_store, now_utc):
        """ts_utc exactly at now + 5 minutes is accepted."""
        # NOTE(review): now_utc is sampled by the fixture before append_event
        # runs, so relative to the store's own clock this is presumably at or
        # just inside the limit -- confirm against EventStore.
        boundary_ts = now_utc + (5 * 60 * 1000)  # Exactly 5 minutes in the future

        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=boundary_ts,
            actor="ppetru",
            entity_refs={},
            payload={},
        )

        assert event.ts_utc == boundary_ts

    def test_clock_skew_within_range_accepted(self, event_store, now_utc):
        """ts_utc in the past or near-future is accepted."""
        past_ts = now_utc - (60 * 60 * 1000)  # 1 hour in the past

        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=past_ts,
            actor="ppetru",
            entity_refs={},
            payload={},
        )

        assert event.ts_utc == past_ts


class TestNonceValidation:
    """Tests for idempotency nonce validation."""

    def test_nonce_validation_rejects_duplicate(self, event_store, now_utc):
        """Same nonce raises DuplicateNonceError."""
        nonce = generate_id()

        event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
            nonce=nonce,
            route="/actions/product-collected",
        )

        # Second append reuses the nonce; only the timestamp differs.
        with pytest.raises(DuplicateNonceError):
            event_store.append_event(
                event_type=PRODUCT_COLLECTED,
                ts_utc=now_utc + 1000,
                actor="ppetru",
                entity_refs={},
                payload={},
                nonce=nonce,
                route="/actions/product-collected",
            )

    def test_nonce_recorded_in_table(self, migrated_db, event_store, now_utc):
        """Nonce is stored in idempotency_nonces table."""
        nonce = generate_id()

        event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
            nonce=nonce,
            route="/actions/product-collected",
        )

        row = migrated_db.execute(
            "SELECT actor, route, created_at_utc FROM idempotency_nonces WHERE nonce = ?",
            (nonce,),
        ).fetchone()

        # Columns come back in SELECT order: actor, route, created_at_utc.
        assert row is not None
        assert row[0] == "ppetru"
        assert row[1] == "/actions/product-collected"
        assert row[2] == now_utc

    def test_nonce_optional(self, event_store, now_utc):
        """append_event works without nonce (for internal events)."""
        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="system",
            entity_refs={},
            payload={},
        )

        assert event is not None
        assert event.id is not None

    def test_nonce_requires_route(self, event_store, now_utc):
        """Nonce without route raises ValueError."""
        nonce = generate_id()

        with pytest.raises(ValueError, match="route is required"):
            event_store.append_event(
                event_type=PRODUCT_COLLECTED,
                ts_utc=now_utc,
                actor="ppetru",
                entity_refs={},
                payload={},
                nonce=nonce,
                route=None,
            )


class TestTombstoneChecking:
    """Tests for tombstone checking."""

    def test_is_tombstoned_false(self, event_store, now_utc):
        """is_tombstoned returns False when no tombstone exists."""
        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
        )

        assert event_store.is_tombstoned(event.id) is False

    def test_is_tombstoned_true(self, migrated_db, event_store, now_utc):
        """is_tombstoned returns True when tombstone exists."""
        event = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={},
        )

        # Manually insert a tombstone for testing
        _insert_tombstone(migrated_db, event.id, now_utc + 1000)

        assert event_store.is_tombstoned(event.id) is True

    def test_list_events_excludes_tombstoned_by_default(self, migrated_db, event_store, now_utc):
        """list_events excludes tombstoned events by default."""
        event1 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={"order": 1},
        )
        event2 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc + 1000,
            actor="ppetru",
            entity_refs={},
            payload={"order": 2},
        )

        # Tombstone event1
        _insert_tombstone(migrated_db, event1.id, now_utc + 2000)

        events = event_store.list_events()

        assert len(events) == 1
        assert events[0].id == event2.id

    def test_list_events_includes_tombstoned_when_requested(
        self, migrated_db, event_store, now_utc
    ):
        """list_events includes tombstoned events when include_tombstoned=True."""
        event1 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc,
            actor="ppetru",
            entity_refs={},
            payload={"order": 1},
        )
        event2 = event_store.append_event(
            event_type=PRODUCT_COLLECTED,
            ts_utc=now_utc + 1000,
            actor="ppetru",
            entity_refs={},
            payload={"order": 2},
        )

        # Tombstone event1
        _insert_tombstone(migrated_db, event1.id, now_utc + 2000)

        events = event_store.list_events(include_tombstoned=True)

        assert len(events) == 2
        assert events[0].id == event1.id
        assert events[1].id == event2.id