- Enable pytest-xdist with -n auto for parallel test execution - Consolidate migrated_db fixture in conftest.py (was duplicated in 4 files) - Remove local fixture definitions and unused imports from test files Test execution: ~24s -> ~5s (~5x speedup) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
413 lines
12 KiB
Python
413 lines
12 KiB
Python
# ABOUTME: Tests for reference data repositories.
|
|
# ABOUTME: Validates CRUD operations for species, locations, products, feed_types, and users.
|
|
|
|
import time
|
|
|
|
import pytest
|
|
|
|
from animaltrack.id_gen import generate_id
|
|
from animaltrack.models.reference import (
|
|
FeedType,
|
|
Location,
|
|
Product,
|
|
ProductUnit,
|
|
Species,
|
|
User,
|
|
UserRole,
|
|
)
|
|
from animaltrack.repositories import (
|
|
FeedTypeRepository,
|
|
LocationRepository,
|
|
ProductRepository,
|
|
SpeciesRepository,
|
|
UserRepository,
|
|
)
|
|
|
|
|
|
@pytest.fixture
def now_utc():
    """Provide the current time as whole milliseconds since the epoch."""
    epoch_seconds = time.time()
    # Scale seconds to milliseconds, then truncate the fractional part.
    return int(epoch_seconds * 1000)
|
|
|
|
|
|
class TestSpeciesRepository:
    """Exercise SpeciesRepository upsert, get, and list_all via the migrated_db fixture."""

    def test_upsert_creates_new_record(self, migrated_db, now_utc):
        """A species written with upsert can be read back with get."""
        species_repo = SpeciesRepository(migrated_db)
        duck = Species(
            code="duck",
            name="Duck",
            active=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        species_repo.upsert(duck)

        fetched = species_repo.get("duck")
        assert fetched is not None
        assert fetched.code == "duck"
        assert fetched.name == "Duck"
        assert fetched.active is True

    def test_upsert_updates_existing_record(self, migrated_db, now_utc):
        """A second upsert with the same code overwrites name and active."""
        species_repo = SpeciesRepository(migrated_db)
        original = Species(
            code="duck",
            name="Duck",
            active=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        species_repo.upsert(original)

        # Same primary key, different payload and a later update timestamp.
        replacement = Species(
            code="duck",
            name="Domestic Duck",
            active=False,
            created_at_utc=now_utc,
            updated_at_utc=now_utc + 1000,
        )
        species_repo.upsert(replacement)

        fetched = species_repo.get("duck")
        assert fetched.name == "Domestic Duck"
        assert fetched.active is False

    def test_get_returns_none_for_missing(self, migrated_db):
        """get yields None when no species has the requested code."""
        species_repo = SpeciesRepository(migrated_db)
        missing = species_repo.get("nonexistent")
        assert missing is None

    def test_list_all_returns_all_records(self, migrated_db, now_utc):
        """list_all returns every species that was upserted."""
        species_repo = SpeciesRepository(migrated_db)
        for code, name in (("duck", "Duck"), ("goose", "Goose")):
            species_repo.upsert(
                Species(
                    code=code,
                    name=name,
                    created_at_utc=now_utc,
                    updated_at_utc=now_utc,
                )
            )

        listed = species_repo.list_all()
        assert len(listed) == 2
        listed_codes = {entry.code for entry in listed}
        assert listed_codes == {"duck", "goose"}
|
|
|
|
|
|
class TestLocationRepository:
    """Tests for LocationRepository: upsert, get, get_by_name, and list_all."""

    def test_upsert_creates_new_record(self, migrated_db, now_utc):
        """upsert creates a new location record."""
        repo = LocationRepository(migrated_db)
        location_id = generate_id()
        location = Location(
            id=location_id,
            name="Strip 1",
            active=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(location)

        result = repo.get(location_id)
        assert result is not None
        assert result.name == "Strip 1"

    def test_upsert_updates_existing_record(self, migrated_db, now_utc):
        """upsert updates an existing location record."""
        repo = LocationRepository(migrated_db)
        location_id = generate_id()
        location = Location(
            id=location_id,
            name="Strip 1",
            active=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(location)

        updated = Location(
            id=location_id,
            name="Strip 1 - Renamed",
            active=False,
            created_at_utc=now_utc,
            updated_at_utc=now_utc + 1000,
        )
        repo.upsert(updated)

        result = repo.get(location_id)
        # Fail with a clear assertion rather than an AttributeError on None.
        assert result is not None
        assert result.name == "Strip 1 - Renamed"
        assert result.active is False

    def test_get_by_name_returns_location(self, migrated_db, now_utc):
        """get_by_name returns location by name."""
        repo = LocationRepository(migrated_db)
        location_id = generate_id()
        location = Location(
            id=location_id,
            name="Nursery 1",
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(location)

        result = repo.get_by_name("Nursery 1")
        assert result is not None
        assert result.id == location_id

    def test_get_by_name_returns_none_for_missing(self, migrated_db):
        """get_by_name returns None for non-existent location."""
        repo = LocationRepository(migrated_db)
        assert repo.get_by_name("Nonexistent") is None

    def test_list_all_returns_all_records(self, migrated_db, now_utc):
        """list_all returns all locations."""
        repo = LocationRepository(migrated_db)
        repo.upsert(
            Location(
                id=generate_id(),
                name="Strip 1",
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )
        repo.upsert(
            Location(
                id=generate_id(),
                name="Strip 2",
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )

        results = repo.list_all()
        assert len(results) == 2
        # A count alone would accept the same row returned twice; check identity too.
        names = {loc.name for loc in results}
        assert names == {"Strip 1", "Strip 2"}
|
|
|
|
|
|
class TestProductRepository:
    """Tests for ProductRepository: upsert, get, and list_all."""

    def test_upsert_creates_new_record(self, migrated_db, now_utc):
        """upsert creates a new product record."""
        repo = ProductRepository(migrated_db)
        product = Product(
            code="egg.duck",
            name="Duck Egg",
            unit=ProductUnit.PIECE,
            collectable=True,
            sellable=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(product)

        result = repo.get("egg.duck")
        assert result is not None
        assert result.name == "Duck Egg"
        assert result.unit == ProductUnit.PIECE

    def test_upsert_updates_existing_record(self, migrated_db, now_utc):
        """upsert updates an existing product record."""
        repo = ProductRepository(migrated_db)
        product = Product(
            code="meat",
            name="Meat",
            unit=ProductUnit.KG,
            collectable=True,
            sellable=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(product)

        updated = Product(
            code="meat",
            name="Fresh Meat",
            unit=ProductUnit.KG,
            collectable=True,
            sellable=False,
            created_at_utc=now_utc,
            updated_at_utc=now_utc + 1000,
        )
        repo.upsert(updated)

        result = repo.get("meat")
        # Fail with a clear assertion rather than an AttributeError on None.
        assert result is not None
        assert result.name == "Fresh Meat"
        assert result.sellable is False

    def test_list_all_returns_all_records(self, migrated_db, now_utc):
        """list_all returns all products."""
        repo = ProductRepository(migrated_db)
        repo.upsert(
            Product(
                code="egg.duck",
                name="Duck Egg",
                unit=ProductUnit.PIECE,
                collectable=True,
                sellable=True,
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )
        repo.upsert(
            Product(
                code="meat",
                name="Meat",
                unit=ProductUnit.KG,
                collectable=True,
                sellable=True,
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )

        results = repo.list_all()
        assert len(results) == 2
        # A count alone would accept the same row returned twice; check identity too.
        codes = {p.code for p in results}
        assert codes == {"egg.duck", "meat"}
|
|
|
|
|
|
class TestFeedTypeRepository:
    """Tests for FeedTypeRepository: upsert, get, and list_all."""

    def test_upsert_creates_new_record(self, migrated_db, now_utc):
        """upsert creates a new feed type record."""
        repo = FeedTypeRepository(migrated_db)
        feed_type = FeedType(
            code="layer",
            name="Layer Feed",
            default_bag_size_kg=20,
            protein_pct=16.5,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(feed_type)

        result = repo.get("layer")
        assert result is not None
        assert result.name == "Layer Feed"
        assert result.default_bag_size_kg == 20
        assert result.protein_pct == 16.5

    def test_upsert_with_null_protein(self, migrated_db, now_utc):
        """upsert handles None protein_pct correctly."""
        repo = FeedTypeRepository(migrated_db)
        feed_type = FeedType(
            code="starter",
            name="Starter Feed",
            default_bag_size_kg=20,
            protein_pct=None,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(feed_type)

        result = repo.get("starter")
        # Fail with a clear assertion rather than an AttributeError on None.
        assert result is not None
        assert result.protein_pct is None

    def test_list_all_returns_all_records(self, migrated_db, now_utc):
        """list_all returns all feed types."""
        repo = FeedTypeRepository(migrated_db)
        repo.upsert(
            FeedType(
                code="starter",
                name="Starter",
                default_bag_size_kg=20,
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )
        repo.upsert(
            FeedType(
                code="grower",
                name="Grower",
                default_bag_size_kg=25,
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )

        results = repo.list_all()
        assert len(results) == 2
        # A count alone would accept the same row returned twice; check identity too.
        codes = {ft.code for ft in results}
        assert codes == {"starter", "grower"}
|
|
|
|
|
|
class TestUserRepository:
    """Tests for UserRepository: upsert, get, and list_all."""

    def test_upsert_creates_new_record(self, migrated_db, now_utc):
        """upsert creates a new user record."""
        repo = UserRepository(migrated_db)
        user = User(
            username="ppetru",
            role=UserRole.ADMIN,
            active=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(user)

        result = repo.get("ppetru")
        assert result is not None
        assert result.username == "ppetru"
        assert result.role == UserRole.ADMIN

    def test_upsert_updates_existing_record(self, migrated_db, now_utc):
        """upsert updates an existing user record."""
        repo = UserRepository(migrated_db)
        user = User(
            username="guest",
            role=UserRole.RECORDER,
            active=True,
            created_at_utc=now_utc,
            updated_at_utc=now_utc,
        )
        repo.upsert(user)

        updated = User(
            username="guest",
            role=UserRole.ADMIN,
            active=False,
            created_at_utc=now_utc,
            updated_at_utc=now_utc + 1000,
        )
        repo.upsert(updated)

        result = repo.get("guest")
        # Fail with a clear assertion rather than an AttributeError on None.
        assert result is not None
        assert result.role == UserRole.ADMIN
        assert result.active is False

    def test_list_all_returns_all_records(self, migrated_db, now_utc):
        """list_all returns all users."""
        repo = UserRepository(migrated_db)
        repo.upsert(
            User(
                username="ppetru",
                role=UserRole.ADMIN,
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )
        repo.upsert(
            User(
                username="guest",
                role=UserRole.RECORDER,
                created_at_utc=now_utc,
                updated_at_utc=now_utc,
            )
        )

        results = repo.list_all()
        assert len(results) == 2
        # A count alone would accept the same row returned twice; check identity too.
        usernames = {u.username for u in results}
        assert usernames == {"ppetru", "guest"}
|