# Commit context: migration for events, event_revisions, event_tombstones,
# idempotency_nonces, and event_animals tables with ULID checks and JSON
# validation; adds Pydantic models with field validators.
# (Original file: 453 lines, 16 KiB, Python.)
# ABOUTME: Tests for the event tables migration (0002-event-tables.sql).
# ABOUTME: Validates that tables are created with correct schema and constraints.

import json

import apsw
import pytest

from animaltrack.db import get_db
from animaltrack.migrations import run_migrations
|
@pytest.fixture
def migrated_db(tmp_path):
    """Return a connection to a fresh database with all migrations applied."""
    database_path = str(tmp_path / "test.db")
    # NOTE(review): relative path — assumes pytest runs from the repo root.
    run_migrations(database_path, "migrations", verbose=False)
    return get_db(database_path)
|
|
|
|
|
|
class TestMigrationCreatesAllTables:
    """Tests that migration creates all event tables."""

    def _assert_table_exists(self, db, table_name):
        """Assert that sqlite_master lists *table_name* as a table.

        Shared by all tests below; uses a bound parameter instead of
        repeating the interpolated query five times.
        """
        row = db.execute(
            "SELECT name FROM sqlite_master WHERE type='table' AND name=?",
            (table_name,),
        ).fetchone()
        assert row is not None, f"table {table_name!r} was not created"
        assert row[0] == table_name

    def test_events_table_exists(self, migrated_db):
        """Migration creates events table."""
        self._assert_table_exists(migrated_db, "events")

    def test_event_revisions_table_exists(self, migrated_db):
        """Migration creates event_revisions table."""
        self._assert_table_exists(migrated_db, "event_revisions")

    def test_event_tombstones_table_exists(self, migrated_db):
        """Migration creates event_tombstones table."""
        self._assert_table_exists(migrated_db, "event_tombstones")

    def test_idempotency_nonces_table_exists(self, migrated_db):
        """Migration creates idempotency_nonces table."""
        self._assert_table_exists(migrated_db, "idempotency_nonces")

    def test_event_animals_table_exists(self, migrated_db):
        """Migration creates event_animals table."""
        self._assert_table_exists(migrated_db, "event_animals")
|
|
|
|
|
|
class TestEventsTable:
    """Tests for events table schema and constraints."""

    # A syntactically valid 26-character ULID used as the default row id.
    VALID_ID = "01ARZ3NDEKTSV4RRFFQ69G5FAV"

    def _insert_event(
        self,
        db,
        *,
        event_id=VALID_ID,
        event_type="FeedGiven",
        ts_utc=1704067200000,
        actor="ppetru",
        entity_refs="{}",
        payload="{}",
        version=None,
    ):
        """Insert one events row, defaulting every column to a valid value.

        When *version* is None the column is omitted so the table's
        DEFAULT applies (exercised by test_version_defaults_to_1).
        """
        if version is None:
            db.execute(
                """
                INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload)
                VALUES (?, ?, ?, ?, ?, ?)
                """,
                (event_id, event_type, ts_utc, actor, entity_refs, payload),
            )
        else:
            db.execute(
                """
                INSERT INTO events (id, type, ts_utc, actor, entity_refs, payload, version)
                VALUES (?, ?, ?, ?, ?, ?, ?)
                """,
                (event_id, event_type, ts_utc, actor, entity_refs, payload, version),
            )

    def test_insert_valid_event(self, migrated_db):
        """Can insert valid event data."""
        self._insert_event(
            migrated_db,
            event_type="ProductCollected",
            entity_refs=json.dumps({"location_id": "01ARZ3NDEKTSV4RRFFQ69G5FAW"}),
            payload=json.dumps({"quantity": 10}),
            version=1,
        )
        row = migrated_db.execute(
            "SELECT id, type, version FROM events WHERE id=?", (self.VALID_ID,)
        ).fetchone()
        assert row == (self.VALID_ID, "ProductCollected", 1)

    def test_id_length_check_constraint(self, migrated_db):
        """Event ID must be exactly 26 characters."""
        with pytest.raises(apsw.ConstraintError):
            self._insert_event(migrated_db, event_id="short")

    def test_entity_refs_must_be_valid_json(self, migrated_db):
        """Event entity_refs must be valid JSON."""
        with pytest.raises(apsw.ConstraintError):
            self._insert_event(migrated_db, entity_refs="not valid json")

    def test_payload_must_be_valid_json(self, migrated_db):
        """Event payload must be valid JSON."""
        with pytest.raises(apsw.ConstraintError):
            self._insert_event(migrated_db, payload="not valid json")

    def test_version_defaults_to_1(self, migrated_db):
        """Event version defaults to 1."""
        self._insert_event(migrated_db)  # version column omitted on purpose
        row = migrated_db.execute(
            "SELECT version FROM events WHERE id=?",
            (self.VALID_ID,),
        ).fetchone()
        assert row[0] == 1
|
|
|
|
|
|
class TestEventRevisionsTable:
    """Tests for event_revisions table schema and constraints."""

    def _insert_revision(
        self,
        db,
        *,
        event_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
        version=1,
        ts_utc=1704067200000,
        actor="ppetru",
        entity_refs="{}",
        payload="{}",
        edited_at_utc=1704153600000,
        edited_by="ines",
    ):
        """Insert one event_revisions row, defaulting every column to a valid value."""
        db.execute(
            """
            INSERT INTO event_revisions
            (event_id, version, ts_utc, actor, entity_refs, payload, edited_at_utc, edited_by)
            VALUES (?, ?, ?, ?, ?, ?, ?, ?)
            """,
            (event_id, version, ts_utc, actor, entity_refs, payload, edited_at_utc, edited_by),
        )

    def test_insert_valid_event_revision(self, migrated_db):
        """Can insert valid event revision data."""
        self._insert_revision(migrated_db, payload=json.dumps({"quantity": 10}))
        row = migrated_db.execute(
            "SELECT event_id, version, edited_by FROM event_revisions"
        ).fetchone()
        assert row == ("01ARZ3NDEKTSV4RRFFQ69G5FAV", 1, "ines")

    def test_composite_primary_key(self, migrated_db):
        """event_revisions has composite primary key (event_id, version)."""
        self._insert_revision(migrated_db, version=1)
        # Same event_id with different version should work.
        self._insert_revision(migrated_db, version=2, edited_at_utc=1704240000000)
        # Duplicate (event_id, version) should fail.
        with pytest.raises(apsw.ConstraintError):
            self._insert_revision(migrated_db, version=1)
|
|
|
|
|
|
class TestEventTombstonesTable:
    """Tests for event_tombstones table schema and constraints."""

    def _insert_tombstone(
        self,
        db,
        *,
        tombstone_id="01ARZ3NDEKTSV4RRFFQ69G5FAV",
        ts_utc=1704067200000,
        actor="ppetru",
        target_event_id="01ARZ3NDEKTSV4RRFFQ69G5FAW",
        reason=None,
    ):
        """Insert one event_tombstones row; reason defaults to NULL."""
        db.execute(
            """
            INSERT INTO event_tombstones (id, ts_utc, actor, target_event_id, reason)
            VALUES (?, ?, ?, ?, ?)
            """,
            (tombstone_id, ts_utc, actor, target_event_id, reason),
        )

    def test_insert_valid_event_tombstone(self, migrated_db):
        """Can insert valid event tombstone data."""
        self._insert_tombstone(migrated_db, reason="Duplicate entry")
        row = migrated_db.execute(
            "SELECT id, target_event_id, reason FROM event_tombstones"
        ).fetchone()
        assert row == (
            "01ARZ3NDEKTSV4RRFFQ69G5FAV",
            "01ARZ3NDEKTSV4RRFFQ69G5FAW",
            "Duplicate entry",
        )

    def test_reason_nullable(self, migrated_db):
        """Tombstone reason can be NULL."""
        self._insert_tombstone(migrated_db)  # reason left as NULL
        row = migrated_db.execute(
            "SELECT reason FROM event_tombstones WHERE id=?",
            ("01ARZ3NDEKTSV4RRFFQ69G5FAV",),
        ).fetchone()
        assert row[0] is None

    def test_id_length_check_constraint(self, migrated_db):
        """Tombstone ID must be exactly 26 characters."""
        with pytest.raises(apsw.ConstraintError):
            self._insert_tombstone(migrated_db, tombstone_id="short")

    def test_target_event_id_length_check_constraint(self, migrated_db):
        """Tombstone target_event_id must be exactly 26 characters."""
        with pytest.raises(apsw.ConstraintError):
            self._insert_tombstone(migrated_db, target_event_id="short")
|
|
|
|
|
|
class TestIdempotencyNoncesTable:
    """Tests for idempotency_nonces table schema."""

    def _add_nonce(self, db, row):
        """Insert a single (nonce, actor, route, created_at_utc) row."""
        db.execute(
            """
            INSERT INTO idempotency_nonces (nonce, actor, route, created_at_utc)
            VALUES (?, ?, ?, ?)
            """,
            row,
        )

    def test_insert_valid_nonce(self, migrated_db):
        """Can insert valid idempotency nonce data."""
        self._add_nonce(
            migrated_db,
            ("01ARZ3NDEKTSV4RRFFQ69G5FAV", "ppetru", "/actions/product-collected", 1704067200000),
        )
        stored = migrated_db.execute("SELECT nonce, route FROM idempotency_nonces").fetchone()
        assert stored == ("01ARZ3NDEKTSV4RRFFQ69G5FAV", "/actions/product-collected")

    def test_nonce_is_primary_key(self, migrated_db):
        """Nonce is primary key (duplicate rejected)."""
        self._add_nonce(
            migrated_db,
            ("01ARZ3NDEKTSV4RRFFQ69G5FAV", "ppetru", "/actions/product-collected", 1704067200000),
        )
        # Re-inserting the same nonce, even with different metadata, must fail.
        with pytest.raises(apsw.ConstraintError):
            self._add_nonce(
                migrated_db,
                ("01ARZ3NDEKTSV4RRFFQ69G5FAV", "ines", "/actions/feed-given", 1704153600000),
            )
|
|
|
|
|
|
class TestEventAnimalsTable:
    """Tests for event_animals table schema and constraints."""

    # Default ULIDs for the association's two sides.
    EVENT_ID = "01ARZ3NDEKTSV4RRFFQ69G5FAV"
    ANIMAL_ID = "01ARZ3NDEKTSV4RRFFQ69G5FAW"

    def _link(self, db, *, event_id=EVENT_ID, animal_id=ANIMAL_ID, ts_utc=1704067200000):
        """Insert one (event_id, animal_id, ts_utc) association row."""
        db.execute(
            """
            INSERT INTO event_animals (event_id, animal_id, ts_utc)
            VALUES (?, ?, ?)
            """,
            (event_id, animal_id, ts_utc),
        )

    def test_insert_valid_event_animal(self, migrated_db):
        """Can insert valid event_animal data."""
        self._link(migrated_db)
        row = migrated_db.execute("SELECT event_id, animal_id FROM event_animals").fetchone()
        assert row == (self.EVENT_ID, self.ANIMAL_ID)

    def test_composite_primary_key(self, migrated_db):
        """event_animals has composite primary key (event_id, animal_id)."""
        self._link(migrated_db)
        # Same event_id with different animal_id should work.
        self._link(migrated_db, animal_id="01ARZ3NDEKTSV4RRFFQ69G5FAX")
        # Duplicate (event_id, animal_id) should fail.
        with pytest.raises(apsw.ConstraintError):
            self._link(migrated_db)

    def test_unique_index_animal_ts(self, migrated_db):
        """Unique constraint on (animal_id, ts_utc) prevents same-animal same-timestamp."""
        self._link(migrated_db)
        # Same animal_id at same ts_utc with different event_id should fail.
        with pytest.raises(apsw.ConstraintError):
            self._link(migrated_db, event_id="01ARZ3NDEKTSV4RRFFQ69G5FAX")

    def test_event_id_length_check_constraint(self, migrated_db):
        """event_animals event_id must be exactly 26 characters."""
        with pytest.raises(apsw.ConstraintError):
            self._link(migrated_db, event_id="short")

    def test_animal_id_length_check_constraint(self, migrated_db):
        """event_animals animal_id must be exactly 26 characters."""
        with pytest.raises(apsw.ConstraintError):
            self._link(migrated_db, animal_id="short")
|
|
|
|
|
|
class TestIndexes:
    """Tests that indexes are created correctly."""

    def _index_names(self, db, table_name):
        """Return the set of index names sqlite_master lists for *table_name*."""
        rows = db.execute(
            "SELECT name FROM sqlite_master WHERE type='index' AND tbl_name=?",
            (table_name,),
        ).fetchall()
        return {row[0] for row in rows}

    def test_events_indexes_exist(self, migrated_db):
        """Events table has required indexes."""
        names = self._index_names(migrated_db, "events")
        # All three query-path indexes from the migration must be present.
        assert {"idx_events_ts", "idx_events_type_ts", "idx_events_actor_ts"} <= names

    def test_event_tombstones_index_exists(self, migrated_db):
        """event_tombstones table has target index."""
        names = self._index_names(migrated_db, "event_tombstones")
        assert "idx_event_tombstones_target" in names

    def test_event_animals_unique_index_exists(self, migrated_db):
        """event_animals table has unique index on (animal_id, ts_utc)."""
        names = self._index_names(migrated_db, "event_animals")
        assert "ux_event_animals_animal_ts" in names