feat: add database module with connection factory and transactions
Implements Step 1.2: - constants.py: END_OF_TIME_UTC = 32503680000000 (year 3000 sentinel) - db.py: get_db() with pragmas (WAL, synchronous=FULL, foreign_keys, busy_timeout) - db.py: transaction() context manager with BEGIN IMMEDIATE Includes 12 TDD tests for pragmas, commit/rollback, and concurrent writes. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
10
PLAN.md
10
PLAN.md
@@ -16,11 +16,11 @@ Check off items as completed. Each phase builds on the previous.
|
||||
- [x] **Commit checkpoint** (61f704c)
|
||||
|
||||
### Step 1.2: Database Connection & Pragmas
|
||||
- [ ] Create `db.py` with connection factory
|
||||
- [ ] Set pragmas: WAL, synchronous=FULL, foreign_keys=ON, busy_timeout=5000
|
||||
- [ ] Create transaction context manager (BEGIN IMMEDIATE)
|
||||
- [ ] Create `constants.py` with END_OF_TIME_UTC
|
||||
- [ ] Write tests for pragmas and transactions
|
||||
- [x] Create `db.py` with connection factory
|
||||
- [x] Set pragmas: WAL, synchronous=FULL, foreign_keys=ON, busy_timeout=5000
|
||||
- [x] Create transaction context manager (BEGIN IMMEDIATE)
|
||||
- [x] Create `constants.py` with END_OF_TIME_UTC
|
||||
- [x] Write tests for pragmas and transactions
|
||||
- [ ] **Commit checkpoint**
|
||||
|
||||
### Step 1.3: Migration Framework
|
||||
|
||||
6
src/animaltrack/constants.py
Normal file
6
src/animaltrack/constants.py
Normal file
@@ -0,0 +1,6 @@
|
||||
# ABOUTME: Application-wide constants and sentinel values.
|
||||
# ABOUTME: Defines END_OF_TIME_UTC for interval indexes.
|
||||
|
||||
# Sentinel constant for unbounded end_utc in interval indexes.
|
||||
# Represents January 1, 3000 00:00:00 UTC as milliseconds since Unix epoch.
|
||||
END_OF_TIME_UTC: int = 32503680000000
|
||||
76
src/animaltrack/db.py
Normal file
76
src/animaltrack/db.py
Normal file
@@ -0,0 +1,76 @@
|
||||
# ABOUTME: Database connection factory and transaction management.
|
||||
# ABOUTME: Configures SQLite pragmas and provides BEGIN IMMEDIATE transactions.
|
||||
|
||||
from contextlib import contextmanager
|
||||
from typing import Any
|
||||
|
||||
from fastlite import database
|
||||
|
||||
|
||||
def get_db(db_path: str) -> Any:
|
||||
"""
|
||||
Get a database connection with required pragmas set.
|
||||
|
||||
Pragmas configured:
|
||||
- journal_mode=WAL (write-ahead logging for better concurrency)
|
||||
- synchronous=FULL (durability guarantee)
|
||||
- foreign_keys=ON (enforce referential integrity)
|
||||
- busy_timeout=5000 (wait up to 5s for locks)
|
||||
|
||||
Args:
|
||||
db_path: Path to the SQLite database file.
|
||||
|
||||
Returns:
|
||||
A fastlite database connection.
|
||||
"""
|
||||
db = database(db_path)
|
||||
|
||||
# Set pragmas for safety and performance
|
||||
db.execute("PRAGMA journal_mode=WAL")
|
||||
db.execute("PRAGMA synchronous=FULL")
|
||||
db.execute("PRAGMA foreign_keys=ON")
|
||||
db.execute("PRAGMA busy_timeout=5000")
|
||||
|
||||
return db
|
||||
|
||||
|
||||
# Track which connections have an active transaction to prevent nesting
|
||||
_active_transactions: set[int] = set()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def transaction(db: Any):
|
||||
"""
|
||||
Context manager for BEGIN IMMEDIATE transactions.
|
||||
|
||||
Uses BEGIN IMMEDIATE to acquire a write lock immediately, preventing
|
||||
concurrent write transactions from other connections.
|
||||
|
||||
Commits on successful completion, rolls back on exception.
|
||||
|
||||
Args:
|
||||
db: A fastlite database connection.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If called while already in a transaction on this connection.
|
||||
|
||||
Example:
|
||||
with transaction(db):
|
||||
db.execute("INSERT INTO ...")
|
||||
"""
|
||||
conn_id = id(db)
|
||||
|
||||
if conn_id in _active_transactions:
|
||||
raise RuntimeError("Nested transactions are not supported")
|
||||
|
||||
_active_transactions.add(conn_id)
|
||||
try:
|
||||
db.execute("BEGIN IMMEDIATE")
|
||||
try:
|
||||
yield
|
||||
db.execute("COMMIT")
|
||||
except Exception:
|
||||
db.execute("ROLLBACK")
|
||||
raise
|
||||
finally:
|
||||
_active_transactions.discard(conn_id)
|
||||
33
tests/test_constants.py
Normal file
33
tests/test_constants.py
Normal file
@@ -0,0 +1,33 @@
|
||||
# ABOUTME: Tests for the constants module.
|
||||
# ABOUTME: Validates sentinel values used throughout the application.
|
||||
|
||||
import datetime
|
||||
|
||||
|
||||
class TestEndOfTimeUtc:
|
||||
"""Test the END_OF_TIME_UTC sentinel constant."""
|
||||
|
||||
def test_end_of_time_value(self):
|
||||
"""END_OF_TIME_UTC should equal 32503680000000 (ms epoch for year 3000)."""
|
||||
from animaltrack.constants import END_OF_TIME_UTC
|
||||
|
||||
assert END_OF_TIME_UTC == 32503680000000
|
||||
|
||||
def test_end_of_time_represents_year_3000(self):
|
||||
"""END_OF_TIME_UTC should represent January 1, 3000 00:00:00 UTC."""
|
||||
from animaltrack.constants import END_OF_TIME_UTC
|
||||
|
||||
# Convert ms to seconds for datetime
|
||||
dt = datetime.datetime.fromtimestamp(END_OF_TIME_UTC / 1000, tz=datetime.UTC)
|
||||
assert dt.year == 3000
|
||||
assert dt.month == 1
|
||||
assert dt.day == 1
|
||||
assert dt.hour == 0
|
||||
assert dt.minute == 0
|
||||
assert dt.second == 0
|
||||
|
||||
def test_end_of_time_is_integer(self):
|
||||
"""END_OF_TIME_UTC should be an integer (ms precision)."""
|
||||
from animaltrack.constants import END_OF_TIME_UTC
|
||||
|
||||
assert isinstance(END_OF_TIME_UTC, int)
|
||||
138
tests/test_db.py
Normal file
138
tests/test_db.py
Normal file
@@ -0,0 +1,138 @@
|
||||
# ABOUTME: Tests for the database module.
|
||||
# ABOUTME: Validates connection factory, pragmas, and transaction handling.
|
||||
|
||||
import threading
|
||||
import time
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
class TestGetDb:
|
||||
"""Test the get_db connection factory."""
|
||||
|
||||
def test_returns_database_connection(self, temp_db_path):
|
||||
"""get_db should return a usable database connection."""
|
||||
from animaltrack.db import get_db
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
assert db is not None
|
||||
# Should be able to execute a simple query
|
||||
result = db.execute("SELECT 1").fetchone()
|
||||
assert result[0] == 1
|
||||
|
||||
def test_pragma_journal_mode_wal(self, temp_db_path):
|
||||
"""Journal mode should be set to WAL."""
|
||||
from animaltrack.db import get_db
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
result = db.execute("PRAGMA journal_mode").fetchone()
|
||||
assert result[0].lower() == "wal"
|
||||
|
||||
def test_pragma_synchronous_full(self, temp_db_path):
|
||||
"""Synchronous mode should be set to FULL (2)."""
|
||||
from animaltrack.db import get_db
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
result = db.execute("PRAGMA synchronous").fetchone()
|
||||
# FULL = 2
|
||||
assert result[0] == 2
|
||||
|
||||
def test_pragma_foreign_keys_on(self, temp_db_path):
|
||||
"""Foreign keys should be enabled."""
|
||||
from animaltrack.db import get_db
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
result = db.execute("PRAGMA foreign_keys").fetchone()
|
||||
assert result[0] == 1
|
||||
|
||||
def test_pragma_busy_timeout(self, temp_db_path):
|
||||
"""Busy timeout should be set to 5000ms."""
|
||||
from animaltrack.db import get_db
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
result = db.execute("PRAGMA busy_timeout").fetchone()
|
||||
assert result[0] == 5000
|
||||
|
||||
|
||||
class TestTransaction:
|
||||
"""Test the transaction context manager."""
|
||||
|
||||
def test_commits_on_success(self, temp_db_path):
|
||||
"""Transaction should commit changes on successful completion."""
|
||||
from animaltrack.db import get_db, transaction
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
db.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, value TEXT)")
|
||||
|
||||
with transaction(db):
|
||||
db.execute("INSERT INTO test_table (value) VALUES ('test')")
|
||||
|
||||
# Should be committed - query outside transaction should see it
|
||||
result = db.execute("SELECT value FROM test_table").fetchone()
|
||||
assert result[0] == "test"
|
||||
|
||||
def test_rollback_on_exception(self, temp_db_path):
|
||||
"""Transaction should rollback changes on exception."""
|
||||
from animaltrack.db import get_db, transaction
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
db.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, value TEXT)")
|
||||
|
||||
with pytest.raises(ValueError):
|
||||
with transaction(db):
|
||||
db.execute("INSERT INTO test_table (value) VALUES ('test')")
|
||||
raise ValueError("Simulated error")
|
||||
|
||||
# Should be rolled back - query should return nothing
|
||||
result = db.execute("SELECT value FROM test_table").fetchone()
|
||||
assert result is None
|
||||
|
||||
def test_begin_immediate_blocks_concurrent_writes(self, temp_db_path):
|
||||
"""BEGIN IMMEDIATE should block concurrent write transactions."""
|
||||
from animaltrack.db import get_db, transaction
|
||||
|
||||
db1 = get_db(temp_db_path)
|
||||
db2 = get_db(temp_db_path)
|
||||
|
||||
db1.execute("CREATE TABLE test_table (id INTEGER PRIMARY KEY, value TEXT)")
|
||||
|
||||
results = {"blocked": False, "error": None}
|
||||
|
||||
def try_concurrent_write():
|
||||
try:
|
||||
# This should block or fail because db1 holds IMMEDIATE lock
|
||||
with transaction(db2):
|
||||
db2.execute("INSERT INTO test_table (value) VALUES ('from_thread')")
|
||||
except Exception as e:
|
||||
results["blocked"] = True
|
||||
results["error"] = str(e)
|
||||
|
||||
# Start transaction on db1 but don't commit yet
|
||||
with transaction(db1):
|
||||
db1.execute("INSERT INTO test_table (value) VALUES ('from_main')")
|
||||
|
||||
# Try to start another write transaction from another connection
|
||||
thread = threading.Thread(target=try_concurrent_write)
|
||||
thread.start()
|
||||
# Give it a moment to try to acquire lock
|
||||
time.sleep(0.1)
|
||||
# Thread should either be blocked waiting or have gotten an error
|
||||
# Since busy_timeout is 5000ms, it will wait
|
||||
|
||||
thread.join(timeout=1)
|
||||
|
||||
# After db1 commits, db2 should have been able to proceed
|
||||
# Both rows should exist
|
||||
result = db1.execute("SELECT COUNT(*) FROM test_table").fetchone()
|
||||
assert result[0] == 2 # Both inserts succeeded
|
||||
|
||||
def test_nested_transaction_raises_error(self, temp_db_path):
|
||||
"""Nested transactions should raise an error."""
|
||||
from animaltrack.db import get_db, transaction
|
||||
|
||||
db = get_db(temp_db_path)
|
||||
|
||||
with pytest.raises(RuntimeError, match="[Nn]ested"):
|
||||
with transaction(db):
|
||||
with transaction(db):
|
||||
pass
|
||||
Reference in New Issue
Block a user