feat: add repositories and reference data seeder
Implements Step 1.5: Creates ULID generation utility, repositories for all reference types (species, locations, products, feed_types, users), and an idempotent seeder that populates initial data. Updates CLI seed command to run migrations and seed data. Seed data: - 3 users (ppetru, ines as admin; guest as recorder) - 8 locations (Strip 1-4, Nursery 1-4) - 3 species (duck, goose active; sheep inactive) - 7 products (egg.duck, meat, offal, fat, bones, feathers, down) - 3 feed types (starter, grower, layer - 20kg bags) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -63,9 +63,28 @@ def main():
|
|||||||
print(f"Created migration: {filepath}")
|
print(f"Created migration: {filepath}")
|
||||||
print(" Edit the file to add your schema changes")
|
print(" Edit the file to add your schema changes")
|
||||||
elif args.command == "seed":
|
elif args.command == "seed":
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
from animaltrack.db import get_db
|
||||||
|
from animaltrack.migrations import run_migrations
|
||||||
|
from animaltrack.seeds import run_seeds
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
migrations_dir = "migrations"
|
||||||
|
|
||||||
|
print("Running migrations...")
|
||||||
|
success = run_migrations(
|
||||||
|
db_path=settings.db_path,
|
||||||
|
migrations_dir=migrations_dir,
|
||||||
|
verbose=False,
|
||||||
|
)
|
||||||
|
if not success:
|
||||||
|
print("Migration failed", file=sys.stderr)
|
||||||
|
sys.exit(1)
|
||||||
|
|
||||||
print("Loading seed data...")
|
print("Loading seed data...")
|
||||||
# TODO: Implement seeder
|
db = get_db(settings.db_path)
|
||||||
print("Seeding not yet implemented")
|
run_seeds(db)
|
||||||
|
print("Seed data loaded successfully")
|
||||||
elif args.command == "serve":
|
elif args.command == "serve":
|
||||||
print(f"Starting server on {args.host}:{args.port}...")
|
print(f"Starting server on {args.host}:{args.port}...")
|
||||||
# TODO: Implement server
|
# TODO: Implement server
|
||||||
|
|||||||
13
src/animaltrack/id_gen.py
Normal file
13
src/animaltrack/id_gen.py
Normal file
@@ -0,0 +1,13 @@
|
|||||||
|
# ABOUTME: ULID generation utility for creating unique identifiers.
|
||||||
|
# ABOUTME: Provides a simple wrapper around the python-ulid library.
|
||||||
|
|
||||||
|
from ulid import ULID
|
||||||
|
|
||||||
|
|
||||||
|
def generate_id() -> str:
|
||||||
|
"""Generate a new ULID as a 26-character string.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
A 26-character uppercase alphanumeric ULID string.
|
||||||
|
"""
|
||||||
|
return str(ULID())
|
||||||
16
src/animaltrack/repositories/__init__.py
Normal file
16
src/animaltrack/repositories/__init__.py
Normal file
@@ -0,0 +1,16 @@
|
|||||||
|
# ABOUTME: Repositories package for AnimalTrack.
|
||||||
|
# ABOUTME: Exports all repository classes for reference data CRUD operations.
|
||||||
|
|
||||||
|
from animaltrack.repositories.feed_types import FeedTypeRepository
|
||||||
|
from animaltrack.repositories.locations import LocationRepository
|
||||||
|
from animaltrack.repositories.products import ProductRepository
|
||||||
|
from animaltrack.repositories.species import SpeciesRepository
|
||||||
|
from animaltrack.repositories.users import UserRepository
|
||||||
|
|
||||||
|
__all__ = [
|
||||||
|
"FeedTypeRepository",
|
||||||
|
"LocationRepository",
|
||||||
|
"ProductRepository",
|
||||||
|
"SpeciesRepository",
|
||||||
|
"UserRepository",
|
||||||
|
]
|
||||||
94
src/animaltrack/repositories/feed_types.py
Normal file
94
src/animaltrack/repositories/feed_types.py
Normal file
@@ -0,0 +1,94 @@
|
|||||||
|
# ABOUTME: Repository for feed type reference data.
|
||||||
|
# ABOUTME: Provides CRUD operations for the feed_types table.
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from animaltrack.models.reference import FeedType
|
||||||
|
|
||||||
|
|
||||||
|
class FeedTypeRepository:
|
||||||
|
"""Repository for managing feed type data."""
|
||||||
|
|
||||||
|
def __init__(self, db: Any) -> None:
|
||||||
|
"""Initialize repository with database connection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: A fastlite database connection.
|
||||||
|
"""
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def upsert(self, feed_type: FeedType) -> None:
|
||||||
|
"""Insert or update a feed type record.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
feed_type: The feed type to upsert.
|
||||||
|
"""
|
||||||
|
self.db.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO feed_types
|
||||||
|
(code, name, default_bag_size_kg, protein_pct, active, created_at_utc, updated_at_utc)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
feed_type.code,
|
||||||
|
feed_type.name,
|
||||||
|
feed_type.default_bag_size_kg,
|
||||||
|
feed_type.protein_pct,
|
||||||
|
int(feed_type.active),
|
||||||
|
feed_type.created_at_utc,
|
||||||
|
feed_type.updated_at_utc,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get(self, code: str) -> FeedType | None:
|
||||||
|
"""Get a feed type by code.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
code: The feed type code.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The feed type if found, None otherwise.
|
||||||
|
"""
|
||||||
|
row = self.db.execute(
|
||||||
|
"""
|
||||||
|
SELECT code, name, default_bag_size_kg, protein_pct, active, created_at_utc, updated_at_utc
|
||||||
|
FROM feed_types WHERE code = ?
|
||||||
|
""",
|
||||||
|
(code,),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return FeedType(
|
||||||
|
code=row[0],
|
||||||
|
name=row[1],
|
||||||
|
default_bag_size_kg=row[2],
|
||||||
|
protein_pct=row[3],
|
||||||
|
active=bool(row[4]),
|
||||||
|
created_at_utc=row[5],
|
||||||
|
updated_at_utc=row[6],
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_all(self) -> list[FeedType]:
|
||||||
|
"""Get all feed types.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all feed types.
|
||||||
|
"""
|
||||||
|
rows = self.db.execute(
|
||||||
|
"""
|
||||||
|
SELECT code, name, default_bag_size_kg, protein_pct, active, created_at_utc, updated_at_utc
|
||||||
|
FROM feed_types
|
||||||
|
"""
|
||||||
|
).fetchall()
|
||||||
|
return [
|
||||||
|
FeedType(
|
||||||
|
code=row[0],
|
||||||
|
name=row[1],
|
||||||
|
default_bag_size_kg=row[2],
|
||||||
|
protein_pct=row[3],
|
||||||
|
active=bool(row[4]),
|
||||||
|
created_at_utc=row[5],
|
||||||
|
updated_at_utc=row[6],
|
||||||
|
)
|
||||||
|
for row in rows
|
||||||
|
]
|
||||||
104
src/animaltrack/repositories/locations.py
Normal file
104
src/animaltrack/repositories/locations.py
Normal file
@@ -0,0 +1,104 @@
|
|||||||
|
# ABOUTME: Repository for location reference data.
|
||||||
|
# ABOUTME: Provides CRUD operations for the locations table.
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from animaltrack.models.reference import Location
|
||||||
|
|
||||||
|
|
||||||
|
class LocationRepository:
|
||||||
|
"""Repository for managing location data."""
|
||||||
|
|
||||||
|
def __init__(self, db: Any) -> None:
|
||||||
|
"""Initialize repository with database connection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: A fastlite database connection.
|
||||||
|
"""
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def upsert(self, location: Location) -> None:
|
||||||
|
"""Insert or update a location record.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
location: The location to upsert.
|
||||||
|
"""
|
||||||
|
self.db.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO locations (id, name, active, created_at_utc, updated_at_utc)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
location.id,
|
||||||
|
location.name,
|
||||||
|
int(location.active),
|
||||||
|
location.created_at_utc,
|
||||||
|
location.updated_at_utc,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get(self, location_id: str) -> Location | None:
|
||||||
|
"""Get a location by ID.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
location_id: The location ID (ULID).
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The location if found, None otherwise.
|
||||||
|
"""
|
||||||
|
row = self.db.execute(
|
||||||
|
"SELECT id, name, active, created_at_utc, updated_at_utc FROM locations WHERE id = ?",
|
||||||
|
(location_id,),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return Location(
|
||||||
|
id=row[0],
|
||||||
|
name=row[1],
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
|
||||||
|
def get_by_name(self, name: str) -> Location | None:
|
||||||
|
"""Get a location by name.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
name: The location name.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The location if found, None otherwise.
|
||||||
|
"""
|
||||||
|
row = self.db.execute(
|
||||||
|
"SELECT id, name, active, created_at_utc, updated_at_utc FROM locations WHERE name = ?",
|
||||||
|
(name,),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return Location(
|
||||||
|
id=row[0],
|
||||||
|
name=row[1],
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_all(self) -> list[Location]:
|
||||||
|
"""Get all locations.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all locations.
|
||||||
|
"""
|
||||||
|
rows = self.db.execute(
|
||||||
|
"SELECT id, name, active, created_at_utc, updated_at_utc FROM locations"
|
||||||
|
).fetchall()
|
||||||
|
return [
|
||||||
|
Location(
|
||||||
|
id=row[0],
|
||||||
|
name=row[1],
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
for row in rows
|
||||||
|
]
|
||||||
97
src/animaltrack/repositories/products.py
Normal file
97
src/animaltrack/repositories/products.py
Normal file
@@ -0,0 +1,97 @@
|
|||||||
|
# ABOUTME: Repository for product reference data.
|
||||||
|
# ABOUTME: Provides CRUD operations for the products table.
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from animaltrack.models.reference import Product, ProductUnit
|
||||||
|
|
||||||
|
|
||||||
|
class ProductRepository:
|
||||||
|
"""Repository for managing product data."""
|
||||||
|
|
||||||
|
def __init__(self, db: Any) -> None:
|
||||||
|
"""Initialize repository with database connection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: A fastlite database connection.
|
||||||
|
"""
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def upsert(self, product: Product) -> None:
|
||||||
|
"""Insert or update a product record.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
product: The product to upsert.
|
||||||
|
"""
|
||||||
|
self.db.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO products
|
||||||
|
(code, name, unit, collectable, sellable, active, created_at_utc, updated_at_utc)
|
||||||
|
VALUES (?, ?, ?, ?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
product.code,
|
||||||
|
product.name,
|
||||||
|
product.unit.value,
|
||||||
|
int(product.collectable),
|
||||||
|
int(product.sellable),
|
||||||
|
int(product.active),
|
||||||
|
product.created_at_utc,
|
||||||
|
product.updated_at_utc,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get(self, code: str) -> Product | None:
|
||||||
|
"""Get a product by code.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
code: The product code.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The product if found, None otherwise.
|
||||||
|
"""
|
||||||
|
row = self.db.execute(
|
||||||
|
"""
|
||||||
|
SELECT code, name, unit, collectable, sellable, active, created_at_utc, updated_at_utc
|
||||||
|
FROM products WHERE code = ?
|
||||||
|
""",
|
||||||
|
(code,),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return Product(
|
||||||
|
code=row[0],
|
||||||
|
name=row[1],
|
||||||
|
unit=ProductUnit(row[2]),
|
||||||
|
collectable=bool(row[3]),
|
||||||
|
sellable=bool(row[4]),
|
||||||
|
active=bool(row[5]),
|
||||||
|
created_at_utc=row[6],
|
||||||
|
updated_at_utc=row[7],
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_all(self) -> list[Product]:
|
||||||
|
"""Get all products.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all products.
|
||||||
|
"""
|
||||||
|
rows = self.db.execute(
|
||||||
|
"""
|
||||||
|
SELECT code, name, unit, collectable, sellable, active, created_at_utc, updated_at_utc
|
||||||
|
FROM products
|
||||||
|
"""
|
||||||
|
).fetchall()
|
||||||
|
return [
|
||||||
|
Product(
|
||||||
|
code=row[0],
|
||||||
|
name=row[1],
|
||||||
|
unit=ProductUnit(row[2]),
|
||||||
|
collectable=bool(row[3]),
|
||||||
|
sellable=bool(row[4]),
|
||||||
|
active=bool(row[5]),
|
||||||
|
created_at_utc=row[6],
|
||||||
|
updated_at_utc=row[7],
|
||||||
|
)
|
||||||
|
for row in rows
|
||||||
|
]
|
||||||
81
src/animaltrack/repositories/species.py
Normal file
81
src/animaltrack/repositories/species.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
# ABOUTME: Repository for species reference data.
|
||||||
|
# ABOUTME: Provides CRUD operations for the species table.
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from animaltrack.models.reference import Species
|
||||||
|
|
||||||
|
|
||||||
|
class SpeciesRepository:
|
||||||
|
"""Repository for managing species data."""
|
||||||
|
|
||||||
|
def __init__(self, db: Any) -> None:
|
||||||
|
"""Initialize repository with database connection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: A fastlite database connection.
|
||||||
|
"""
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def upsert(self, species: Species) -> None:
|
||||||
|
"""Insert or update a species record.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
species: The species to upsert.
|
||||||
|
"""
|
||||||
|
self.db.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO species (code, name, active, created_at_utc, updated_at_utc)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
species.code,
|
||||||
|
species.name,
|
||||||
|
int(species.active),
|
||||||
|
species.created_at_utc,
|
||||||
|
species.updated_at_utc,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get(self, code: str) -> Species | None:
|
||||||
|
"""Get a species by code.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
code: The species code.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The species if found, None otherwise.
|
||||||
|
"""
|
||||||
|
row = self.db.execute(
|
||||||
|
"SELECT code, name, active, created_at_utc, updated_at_utc FROM species WHERE code = ?",
|
||||||
|
(code,),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return Species(
|
||||||
|
code=row[0],
|
||||||
|
name=row[1],
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_all(self) -> list[Species]:
|
||||||
|
"""Get all species.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all species.
|
||||||
|
"""
|
||||||
|
rows = self.db.execute(
|
||||||
|
"SELECT code, name, active, created_at_utc, updated_at_utc FROM species"
|
||||||
|
).fetchall()
|
||||||
|
return [
|
||||||
|
Species(
|
||||||
|
code=row[0],
|
||||||
|
name=row[1],
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
for row in rows
|
||||||
|
]
|
||||||
81
src/animaltrack/repositories/users.py
Normal file
81
src/animaltrack/repositories/users.py
Normal file
@@ -0,0 +1,81 @@
|
|||||||
|
# ABOUTME: Repository for user reference data.
|
||||||
|
# ABOUTME: Provides CRUD operations for the users table.
|
||||||
|
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from animaltrack.models.reference import User, UserRole
|
||||||
|
|
||||||
|
|
||||||
|
class UserRepository:
|
||||||
|
"""Repository for managing user data."""
|
||||||
|
|
||||||
|
def __init__(self, db: Any) -> None:
|
||||||
|
"""Initialize repository with database connection.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: A fastlite database connection.
|
||||||
|
"""
|
||||||
|
self.db = db
|
||||||
|
|
||||||
|
def upsert(self, user: User) -> None:
|
||||||
|
"""Insert or update a user record.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
user: The user to upsert.
|
||||||
|
"""
|
||||||
|
self.db.execute(
|
||||||
|
"""
|
||||||
|
INSERT OR REPLACE INTO users (username, role, active, created_at_utc, updated_at_utc)
|
||||||
|
VALUES (?, ?, ?, ?, ?)
|
||||||
|
""",
|
||||||
|
(
|
||||||
|
user.username,
|
||||||
|
user.role.value,
|
||||||
|
int(user.active),
|
||||||
|
user.created_at_utc,
|
||||||
|
user.updated_at_utc,
|
||||||
|
),
|
||||||
|
)
|
||||||
|
|
||||||
|
def get(self, username: str) -> User | None:
|
||||||
|
"""Get a user by username.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
username: The username.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
The user if found, None otherwise.
|
||||||
|
"""
|
||||||
|
row = self.db.execute(
|
||||||
|
"SELECT username, role, active, created_at_utc, updated_at_utc FROM users WHERE username = ?",
|
||||||
|
(username,),
|
||||||
|
).fetchone()
|
||||||
|
if row is None:
|
||||||
|
return None
|
||||||
|
return User(
|
||||||
|
username=row[0],
|
||||||
|
role=UserRole(row[1]),
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
|
||||||
|
def list_all(self) -> list[User]:
|
||||||
|
"""Get all users.
|
||||||
|
|
||||||
|
Returns:
|
||||||
|
List of all users.
|
||||||
|
"""
|
||||||
|
rows = self.db.execute(
|
||||||
|
"SELECT username, role, active, created_at_utc, updated_at_utc FROM users"
|
||||||
|
).fetchall()
|
||||||
|
return [
|
||||||
|
User(
|
||||||
|
username=row[0],
|
||||||
|
role=UserRole(row[1]),
|
||||||
|
active=bool(row[2]),
|
||||||
|
created_at_utc=row[3],
|
||||||
|
updated_at_utc=row[4],
|
||||||
|
)
|
||||||
|
for row in rows
|
||||||
|
]
|
||||||
255
src/animaltrack/seeds.py
Normal file
255
src/animaltrack/seeds.py
Normal file
@@ -0,0 +1,255 @@
|
|||||||
|
# ABOUTME: Idempotent seeder for reference data.
|
||||||
|
# ABOUTME: Seeds users, locations, species, products, and feed types.
|
||||||
|
|
||||||
|
import time
|
||||||
|
from typing import Any
|
||||||
|
|
||||||
|
from animaltrack.id_gen import generate_id
|
||||||
|
from animaltrack.models.reference import (
|
||||||
|
FeedType,
|
||||||
|
Location,
|
||||||
|
Product,
|
||||||
|
ProductUnit,
|
||||||
|
Species,
|
||||||
|
User,
|
||||||
|
UserRole,
|
||||||
|
)
|
||||||
|
from animaltrack.repositories import (
|
||||||
|
FeedTypeRepository,
|
||||||
|
LocationRepository,
|
||||||
|
ProductRepository,
|
||||||
|
SpeciesRepository,
|
||||||
|
UserRepository,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def run_seeds(db: Any) -> None:
|
||||||
|
"""Seed all reference data.
|
||||||
|
|
||||||
|
This function is idempotent - running it multiple times produces the same result.
|
||||||
|
|
||||||
|
Args:
|
||||||
|
db: A fastlite database connection with migrations applied.
|
||||||
|
"""
|
||||||
|
now_utc = int(time.time() * 1000)
|
||||||
|
|
||||||
|
_seed_users(db, now_utc)
|
||||||
|
_seed_locations(db, now_utc)
|
||||||
|
_seed_species(db, now_utc)
|
||||||
|
_seed_products(db, now_utc)
|
||||||
|
_seed_feed_types(db, now_utc)
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_users(db: Any, now_utc: int) -> None:
|
||||||
|
"""Seed user data."""
|
||||||
|
repo = UserRepository(db)
|
||||||
|
|
||||||
|
users = [
|
||||||
|
User(
|
||||||
|
username="ppetru",
|
||||||
|
role=UserRole.ADMIN,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
User(
|
||||||
|
username="ines",
|
||||||
|
role=UserRole.ADMIN,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
User(
|
||||||
|
username="guest",
|
||||||
|
role=UserRole.RECORDER,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
for user in users:
|
||||||
|
repo.upsert(user)
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_locations(db: Any, now_utc: int) -> None:
|
||||||
|
"""Seed location data.
|
||||||
|
|
||||||
|
Locations are created only if they don't exist (by name).
|
||||||
|
This preserves existing ULIDs on re-seeding.
|
||||||
|
"""
|
||||||
|
repo = LocationRepository(db)
|
||||||
|
|
||||||
|
location_names = [
|
||||||
|
"Strip 1",
|
||||||
|
"Strip 2",
|
||||||
|
"Strip 3",
|
||||||
|
"Strip 4",
|
||||||
|
"Nursery 1",
|
||||||
|
"Nursery 2",
|
||||||
|
"Nursery 3",
|
||||||
|
"Nursery 4",
|
||||||
|
]
|
||||||
|
|
||||||
|
for name in location_names:
|
||||||
|
existing = repo.get_by_name(name)
|
||||||
|
if existing is None:
|
||||||
|
location = Location(
|
||||||
|
id=generate_id(),
|
||||||
|
name=name,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(location)
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_species(db: Any, now_utc: int) -> None:
|
||||||
|
"""Seed species data."""
|
||||||
|
repo = SpeciesRepository(db)
|
||||||
|
|
||||||
|
species_list = [
|
||||||
|
Species(
|
||||||
|
code="duck",
|
||||||
|
name="Duck",
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Species(
|
||||||
|
code="goose",
|
||||||
|
name="Goose",
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Species(
|
||||||
|
code="sheep",
|
||||||
|
name="Sheep",
|
||||||
|
active=False,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
for species in species_list:
|
||||||
|
repo.upsert(species)
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_products(db: Any, now_utc: int) -> None:
|
||||||
|
"""Seed product data."""
|
||||||
|
repo = ProductRepository(db)
|
||||||
|
|
||||||
|
products = [
|
||||||
|
Product(
|
||||||
|
code="egg.duck",
|
||||||
|
name="Duck Egg",
|
||||||
|
unit=ProductUnit.PIECE,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Product(
|
||||||
|
code="meat",
|
||||||
|
name="Meat",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Product(
|
||||||
|
code="offal",
|
||||||
|
name="Offal",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Product(
|
||||||
|
code="fat",
|
||||||
|
name="Fat",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Product(
|
||||||
|
code="bones",
|
||||||
|
name="Bones",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Product(
|
||||||
|
code="feathers",
|
||||||
|
name="Feathers",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
Product(
|
||||||
|
code="down",
|
||||||
|
name="Down",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
for product in products:
|
||||||
|
repo.upsert(product)
|
||||||
|
|
||||||
|
|
||||||
|
def _seed_feed_types(db: Any, now_utc: int) -> None:
|
||||||
|
"""Seed feed type data."""
|
||||||
|
repo = FeedTypeRepository(db)
|
||||||
|
|
||||||
|
feed_types = [
|
||||||
|
FeedType(
|
||||||
|
code="starter",
|
||||||
|
name="Starter Feed",
|
||||||
|
default_bag_size_kg=20,
|
||||||
|
protein_pct=None,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
FeedType(
|
||||||
|
code="grower",
|
||||||
|
name="Grower Feed",
|
||||||
|
default_bag_size_kg=20,
|
||||||
|
protein_pct=None,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
FeedType(
|
||||||
|
code="layer",
|
||||||
|
name="Layer Feed",
|
||||||
|
default_bag_size_kg=20,
|
||||||
|
protein_pct=None,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
),
|
||||||
|
]
|
||||||
|
|
||||||
|
for feed_type in feed_types:
|
||||||
|
repo.upsert(feed_type)
|
||||||
30
tests/test_id_gen.py
Normal file
30
tests/test_id_gen.py
Normal file
@@ -0,0 +1,30 @@
|
|||||||
|
# ABOUTME: Tests for ULID generation utility.
|
||||||
|
# ABOUTME: Verifies that generated IDs are valid 26-character ULIDs and unique.
|
||||||
|
|
||||||
|
from animaltrack.id_gen import generate_id
|
||||||
|
|
||||||
|
|
||||||
|
class TestGenerateId:
|
||||||
|
"""Tests for the generate_id function."""
|
||||||
|
|
||||||
|
def test_returns_string(self):
|
||||||
|
"""generate_id returns a string."""
|
||||||
|
result = generate_id()
|
||||||
|
assert isinstance(result, str)
|
||||||
|
|
||||||
|
def test_returns_26_characters(self):
|
||||||
|
"""generate_id returns exactly 26 characters (ULID format)."""
|
||||||
|
result = generate_id()
|
||||||
|
assert len(result) == 26
|
||||||
|
|
||||||
|
def test_returns_unique_ids(self):
|
||||||
|
"""generate_id returns unique IDs on each call."""
|
||||||
|
ids = [generate_id() for _ in range(100)]
|
||||||
|
assert len(set(ids)) == 100
|
||||||
|
|
||||||
|
def test_ulid_is_uppercase_alphanumeric(self):
|
||||||
|
"""ULID contains only uppercase letters and digits (Crockford base32)."""
|
||||||
|
result = generate_id()
|
||||||
|
# Crockford base32 excludes I, L, O, U
|
||||||
|
valid_chars = set("0123456789ABCDEFGHJKMNPQRSTVWXYZ")
|
||||||
|
assert all(c in valid_chars for c in result)
|
||||||
422
tests/test_repositories.py
Normal file
422
tests/test_repositories.py
Normal file
@@ -0,0 +1,422 @@
|
|||||||
|
# ABOUTME: Tests for reference data repositories.
|
||||||
|
# ABOUTME: Validates CRUD operations for species, locations, products, feed_types, and users.
|
||||||
|
|
||||||
|
import time
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from animaltrack.db import get_db
|
||||||
|
from animaltrack.id_gen import generate_id
|
||||||
|
from animaltrack.migrations import run_migrations
|
||||||
|
from animaltrack.models.reference import (
|
||||||
|
FeedType,
|
||||||
|
Location,
|
||||||
|
Product,
|
||||||
|
ProductUnit,
|
||||||
|
Species,
|
||||||
|
User,
|
||||||
|
UserRole,
|
||||||
|
)
|
||||||
|
from animaltrack.repositories import (
|
||||||
|
FeedTypeRepository,
|
||||||
|
LocationRepository,
|
||||||
|
ProductRepository,
|
||||||
|
SpeciesRepository,
|
||||||
|
UserRepository,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def migrated_db(tmp_path):
|
||||||
|
"""Create a database with migrations applied."""
|
||||||
|
db_path = str(tmp_path / "test.db")
|
||||||
|
run_migrations(db_path, "migrations", verbose=False)
|
||||||
|
return get_db(db_path)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def now_utc():
|
||||||
|
"""Current time in milliseconds since epoch."""
|
||||||
|
return int(time.time() * 1000)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSpeciesRepository:
|
||||||
|
"""Tests for SpeciesRepository."""
|
||||||
|
|
||||||
|
def test_upsert_creates_new_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert creates a new species record."""
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
species = Species(
|
||||||
|
code="duck",
|
||||||
|
name="Duck",
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(species)
|
||||||
|
|
||||||
|
result = repo.get("duck")
|
||||||
|
assert result is not None
|
||||||
|
assert result.code == "duck"
|
||||||
|
assert result.name == "Duck"
|
||||||
|
assert result.active is True
|
||||||
|
|
||||||
|
def test_upsert_updates_existing_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert updates an existing species record."""
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
species = Species(
|
||||||
|
code="duck",
|
||||||
|
name="Duck",
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(species)
|
||||||
|
|
||||||
|
updated = Species(
|
||||||
|
code="duck",
|
||||||
|
name="Domestic Duck",
|
||||||
|
active=False,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc + 1000,
|
||||||
|
)
|
||||||
|
repo.upsert(updated)
|
||||||
|
|
||||||
|
result = repo.get("duck")
|
||||||
|
assert result.name == "Domestic Duck"
|
||||||
|
assert result.active is False
|
||||||
|
|
||||||
|
def test_get_returns_none_for_missing(self, migrated_db):
|
||||||
|
"""get returns None for non-existent species."""
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
assert repo.get("nonexistent") is None
|
||||||
|
|
||||||
|
def test_list_all_returns_all_records(self, migrated_db, now_utc):
|
||||||
|
"""list_all returns all species."""
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
repo.upsert(
|
||||||
|
Species(
|
||||||
|
code="duck",
|
||||||
|
name="Duck",
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
repo.upsert(
|
||||||
|
Species(
|
||||||
|
code="goose",
|
||||||
|
name="Goose",
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = repo.list_all()
|
||||||
|
assert len(results) == 2
|
||||||
|
codes = {s.code for s in results}
|
||||||
|
assert codes == {"duck", "goose"}
|
||||||
|
|
||||||
|
|
||||||
|
class TestLocationRepository:
|
||||||
|
"""Tests for LocationRepository."""
|
||||||
|
|
||||||
|
def test_upsert_creates_new_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert creates a new location record."""
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
location_id = generate_id()
|
||||||
|
location = Location(
|
||||||
|
id=location_id,
|
||||||
|
name="Strip 1",
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(location)
|
||||||
|
|
||||||
|
result = repo.get(location_id)
|
||||||
|
assert result is not None
|
||||||
|
assert result.name == "Strip 1"
|
||||||
|
|
||||||
|
def test_upsert_updates_existing_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert updates an existing location record."""
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
location_id = generate_id()
|
||||||
|
location = Location(
|
||||||
|
id=location_id,
|
||||||
|
name="Strip 1",
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(location)
|
||||||
|
|
||||||
|
updated = Location(
|
||||||
|
id=location_id,
|
||||||
|
name="Strip 1 - Renamed",
|
||||||
|
active=False,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc + 1000,
|
||||||
|
)
|
||||||
|
repo.upsert(updated)
|
||||||
|
|
||||||
|
result = repo.get(location_id)
|
||||||
|
assert result.name == "Strip 1 - Renamed"
|
||||||
|
assert result.active is False
|
||||||
|
|
||||||
|
def test_get_by_name_returns_location(self, migrated_db, now_utc):
|
||||||
|
"""get_by_name returns location by name."""
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
location_id = generate_id()
|
||||||
|
location = Location(
|
||||||
|
id=location_id,
|
||||||
|
name="Nursery 1",
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(location)
|
||||||
|
|
||||||
|
result = repo.get_by_name("Nursery 1")
|
||||||
|
assert result is not None
|
||||||
|
assert result.id == location_id
|
||||||
|
|
||||||
|
def test_get_by_name_returns_none_for_missing(self, migrated_db):
|
||||||
|
"""get_by_name returns None for non-existent location."""
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
assert repo.get_by_name("Nonexistent") is None
|
||||||
|
|
||||||
|
def test_list_all_returns_all_records(self, migrated_db, now_utc):
|
||||||
|
"""list_all returns all locations."""
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
repo.upsert(
|
||||||
|
Location(
|
||||||
|
id=generate_id(),
|
||||||
|
name="Strip 1",
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
repo.upsert(
|
||||||
|
Location(
|
||||||
|
id=generate_id(),
|
||||||
|
name="Strip 2",
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = repo.list_all()
|
||||||
|
assert len(results) == 2
|
||||||
|
|
||||||
|
|
||||||
|
class TestProductRepository:
|
||||||
|
"""Tests for ProductRepository."""
|
||||||
|
|
||||||
|
def test_upsert_creates_new_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert creates a new product record."""
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
product = Product(
|
||||||
|
code="egg.duck",
|
||||||
|
name="Duck Egg",
|
||||||
|
unit=ProductUnit.PIECE,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(product)
|
||||||
|
|
||||||
|
result = repo.get("egg.duck")
|
||||||
|
assert result is not None
|
||||||
|
assert result.name == "Duck Egg"
|
||||||
|
assert result.unit == ProductUnit.PIECE
|
||||||
|
|
||||||
|
def test_upsert_updates_existing_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert updates an existing product record."""
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
product = Product(
|
||||||
|
code="meat",
|
||||||
|
name="Meat",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(product)
|
||||||
|
|
||||||
|
updated = Product(
|
||||||
|
code="meat",
|
||||||
|
name="Fresh Meat",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=False,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc + 1000,
|
||||||
|
)
|
||||||
|
repo.upsert(updated)
|
||||||
|
|
||||||
|
result = repo.get("meat")
|
||||||
|
assert result.name == "Fresh Meat"
|
||||||
|
assert result.sellable is False
|
||||||
|
|
||||||
|
def test_list_all_returns_all_records(self, migrated_db, now_utc):
|
||||||
|
"""list_all returns all products."""
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
repo.upsert(
|
||||||
|
Product(
|
||||||
|
code="egg.duck",
|
||||||
|
name="Duck Egg",
|
||||||
|
unit=ProductUnit.PIECE,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
repo.upsert(
|
||||||
|
Product(
|
||||||
|
code="meat",
|
||||||
|
name="Meat",
|
||||||
|
unit=ProductUnit.KG,
|
||||||
|
collectable=True,
|
||||||
|
sellable=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = repo.list_all()
|
||||||
|
assert len(results) == 2
|
||||||
|
|
||||||
|
|
||||||
|
class TestFeedTypeRepository:
|
||||||
|
"""Tests for FeedTypeRepository."""
|
||||||
|
|
||||||
|
def test_upsert_creates_new_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert creates a new feed type record."""
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
feed_type = FeedType(
|
||||||
|
code="layer",
|
||||||
|
name="Layer Feed",
|
||||||
|
default_bag_size_kg=20,
|
||||||
|
protein_pct=16.5,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(feed_type)
|
||||||
|
|
||||||
|
result = repo.get("layer")
|
||||||
|
assert result is not None
|
||||||
|
assert result.name == "Layer Feed"
|
||||||
|
assert result.default_bag_size_kg == 20
|
||||||
|
assert result.protein_pct == 16.5
|
||||||
|
|
||||||
|
def test_upsert_with_null_protein(self, migrated_db, now_utc):
|
||||||
|
"""upsert handles None protein_pct correctly."""
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
feed_type = FeedType(
|
||||||
|
code="starter",
|
||||||
|
name="Starter Feed",
|
||||||
|
default_bag_size_kg=20,
|
||||||
|
protein_pct=None,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(feed_type)
|
||||||
|
|
||||||
|
result = repo.get("starter")
|
||||||
|
assert result.protein_pct is None
|
||||||
|
|
||||||
|
def test_list_all_returns_all_records(self, migrated_db, now_utc):
|
||||||
|
"""list_all returns all feed types."""
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
repo.upsert(
|
||||||
|
FeedType(
|
||||||
|
code="starter",
|
||||||
|
name="Starter",
|
||||||
|
default_bag_size_kg=20,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
repo.upsert(
|
||||||
|
FeedType(
|
||||||
|
code="grower",
|
||||||
|
name="Grower",
|
||||||
|
default_bag_size_kg=25,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = repo.list_all()
|
||||||
|
assert len(results) == 2
|
||||||
|
|
||||||
|
|
||||||
|
class TestUserRepository:
|
||||||
|
"""Tests for UserRepository."""
|
||||||
|
|
||||||
|
def test_upsert_creates_new_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert creates a new user record."""
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
user = User(
|
||||||
|
username="ppetru",
|
||||||
|
role=UserRole.ADMIN,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(user)
|
||||||
|
|
||||||
|
result = repo.get("ppetru")
|
||||||
|
assert result is not None
|
||||||
|
assert result.username == "ppetru"
|
||||||
|
assert result.role == UserRole.ADMIN
|
||||||
|
|
||||||
|
def test_upsert_updates_existing_record(self, migrated_db, now_utc):
|
||||||
|
"""upsert updates an existing user record."""
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
user = User(
|
||||||
|
username="guest",
|
||||||
|
role=UserRole.RECORDER,
|
||||||
|
active=True,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
repo.upsert(user)
|
||||||
|
|
||||||
|
updated = User(
|
||||||
|
username="guest",
|
||||||
|
role=UserRole.ADMIN,
|
||||||
|
active=False,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc + 1000,
|
||||||
|
)
|
||||||
|
repo.upsert(updated)
|
||||||
|
|
||||||
|
result = repo.get("guest")
|
||||||
|
assert result.role == UserRole.ADMIN
|
||||||
|
assert result.active is False
|
||||||
|
|
||||||
|
def test_list_all_returns_all_records(self, migrated_db, now_utc):
|
||||||
|
"""list_all returns all users."""
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
repo.upsert(
|
||||||
|
User(
|
||||||
|
username="ppetru",
|
||||||
|
role=UserRole.ADMIN,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
repo.upsert(
|
||||||
|
User(
|
||||||
|
username="guest",
|
||||||
|
role=UserRole.RECORDER,
|
||||||
|
created_at_utc=now_utc,
|
||||||
|
updated_at_utc=now_utc,
|
||||||
|
)
|
||||||
|
)
|
||||||
|
|
||||||
|
results = repo.list_all()
|
||||||
|
assert len(results) == 2
|
||||||
242
tests/test_seeds.py
Normal file
242
tests/test_seeds.py
Normal file
@@ -0,0 +1,242 @@
|
|||||||
|
# ABOUTME: Tests for the reference data seeder.
|
||||||
|
# ABOUTME: Validates that seeding populates correct data and is idempotent.
|
||||||
|
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from animaltrack.db import get_db
|
||||||
|
from animaltrack.migrations import run_migrations
|
||||||
|
from animaltrack.models.reference import ProductUnit, UserRole
|
||||||
|
from animaltrack.repositories import (
|
||||||
|
FeedTypeRepository,
|
||||||
|
LocationRepository,
|
||||||
|
ProductRepository,
|
||||||
|
SpeciesRepository,
|
||||||
|
UserRepository,
|
||||||
|
)
|
||||||
|
from animaltrack.seeds import run_seeds
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def migrated_db(tmp_path):
|
||||||
|
"""Create a database with migrations applied."""
|
||||||
|
db_path = str(tmp_path / "test.db")
|
||||||
|
run_migrations(db_path, "migrations", verbose=False)
|
||||||
|
return get_db(db_path)
|
||||||
|
|
||||||
|
|
||||||
|
class TestSeedCounts:
|
||||||
|
"""Tests that seeding creates the correct number of records."""
|
||||||
|
|
||||||
|
def test_seeds_three_users(self, migrated_db):
|
||||||
|
"""Seeding creates 3 users."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 3
|
||||||
|
|
||||||
|
def test_seeds_eight_locations(self, migrated_db):
|
||||||
|
"""Seeding creates 8 locations."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 8
|
||||||
|
|
||||||
|
def test_seeds_three_species(self, migrated_db):
|
||||||
|
"""Seeding creates 3 species."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 3
|
||||||
|
|
||||||
|
def test_seeds_seven_products(self, migrated_db):
|
||||||
|
"""Seeding creates 7 products."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 7
|
||||||
|
|
||||||
|
def test_seeds_three_feed_types(self, migrated_db):
|
||||||
|
"""Seeding creates 3 feed types."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 3
|
||||||
|
|
||||||
|
|
||||||
|
class TestSeedIdempotency:
|
||||||
|
"""Tests that seeding is idempotent."""
|
||||||
|
|
||||||
|
def test_seeding_twice_same_user_count(self, migrated_db):
|
||||||
|
"""Seeding twice produces same number of users."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 3
|
||||||
|
|
||||||
|
def test_seeding_twice_same_location_count(self, migrated_db):
|
||||||
|
"""Seeding twice produces same number of locations."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 8
|
||||||
|
|
||||||
|
def test_seeding_twice_same_species_count(self, migrated_db):
|
||||||
|
"""Seeding twice produces same number of species."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 3
|
||||||
|
|
||||||
|
def test_seeding_twice_same_product_count(self, migrated_db):
|
||||||
|
"""Seeding twice produces same number of products."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 7
|
||||||
|
|
||||||
|
def test_seeding_twice_same_feed_type_count(self, migrated_db):
|
||||||
|
"""Seeding twice produces same number of feed types."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
assert len(repo.list_all()) == 3
|
||||||
|
|
||||||
|
|
||||||
|
class TestUserSeedData:
|
||||||
|
"""Tests for correct user seed data."""
|
||||||
|
|
||||||
|
def test_ppetru_is_admin(self, migrated_db):
|
||||||
|
"""User ppetru has admin role."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
user = repo.get("ppetru")
|
||||||
|
assert user is not None
|
||||||
|
assert user.role == UserRole.ADMIN
|
||||||
|
assert user.active is True
|
||||||
|
|
||||||
|
def test_ines_is_admin(self, migrated_db):
|
||||||
|
"""User ines has admin role."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
user = repo.get("ines")
|
||||||
|
assert user is not None
|
||||||
|
assert user.role == UserRole.ADMIN
|
||||||
|
|
||||||
|
def test_guest_is_recorder(self, migrated_db):
|
||||||
|
"""User guest has recorder role."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = UserRepository(migrated_db)
|
||||||
|
user = repo.get("guest")
|
||||||
|
assert user is not None
|
||||||
|
assert user.role == UserRole.RECORDER
|
||||||
|
|
||||||
|
|
||||||
|
class TestLocationSeedData:
|
||||||
|
"""Tests for correct location seed data."""
|
||||||
|
|
||||||
|
def test_strip_locations_exist(self, migrated_db):
|
||||||
|
"""All 4 strip locations exist."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
for i in range(1, 5):
|
||||||
|
location = repo.get_by_name(f"Strip {i}")
|
||||||
|
assert location is not None, f"Strip {i} not found"
|
||||||
|
assert location.active is True
|
||||||
|
|
||||||
|
def test_nursery_locations_exist(self, migrated_db):
|
||||||
|
"""All 4 nursery locations exist."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
for i in range(1, 5):
|
||||||
|
location = repo.get_by_name(f"Nursery {i}")
|
||||||
|
assert location is not None, f"Nursery {i} not found"
|
||||||
|
assert location.active is True
|
||||||
|
|
||||||
|
def test_location_ids_are_valid_ulids(self, migrated_db):
|
||||||
|
"""All location IDs are 26 characters (ULID format)."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = LocationRepository(migrated_db)
|
||||||
|
for location in repo.list_all():
|
||||||
|
assert len(location.id) == 26
|
||||||
|
|
||||||
|
|
||||||
|
class TestSpeciesSeedData:
|
||||||
|
"""Tests for correct species seed data."""
|
||||||
|
|
||||||
|
def test_duck_is_active(self, migrated_db):
|
||||||
|
"""Species duck is active."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
species = repo.get("duck")
|
||||||
|
assert species is not None
|
||||||
|
assert species.name == "Duck"
|
||||||
|
assert species.active is True
|
||||||
|
|
||||||
|
def test_goose_is_active(self, migrated_db):
|
||||||
|
"""Species goose is active."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
species = repo.get("goose")
|
||||||
|
assert species is not None
|
||||||
|
assert species.name == "Goose"
|
||||||
|
assert species.active is True
|
||||||
|
|
||||||
|
def test_sheep_is_inactive(self, migrated_db):
|
||||||
|
"""Species sheep is inactive."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = SpeciesRepository(migrated_db)
|
||||||
|
species = repo.get("sheep")
|
||||||
|
assert species is not None
|
||||||
|
assert species.name == "Sheep"
|
||||||
|
assert species.active is False
|
||||||
|
|
||||||
|
|
||||||
|
class TestProductSeedData:
|
||||||
|
"""Tests for correct product seed data."""
|
||||||
|
|
||||||
|
def test_egg_duck_product(self, migrated_db):
|
||||||
|
"""Product egg.duck has correct attributes."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
product = repo.get("egg.duck")
|
||||||
|
assert product is not None
|
||||||
|
assert product.name == "Duck Egg"
|
||||||
|
assert product.unit == ProductUnit.PIECE
|
||||||
|
assert product.collectable is True
|
||||||
|
assert product.sellable is True
|
||||||
|
|
||||||
|
def test_meat_products_use_kg(self, migrated_db):
|
||||||
|
"""All meat-related products use kg unit."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = ProductRepository(migrated_db)
|
||||||
|
kg_products = ["meat", "offal", "fat", "bones", "feathers", "down"]
|
||||||
|
for code in kg_products:
|
||||||
|
product = repo.get(code)
|
||||||
|
assert product is not None, f"Product {code} not found"
|
||||||
|
assert product.unit == ProductUnit.KG, f"Product {code} should use kg"
|
||||||
|
|
||||||
|
|
||||||
|
class TestFeedTypeSeedData:
|
||||||
|
"""Tests for correct feed type seed data."""
|
||||||
|
|
||||||
|
def test_starter_feed_type(self, migrated_db):
|
||||||
|
"""Feed type starter has correct attributes."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
feed_type = repo.get("starter")
|
||||||
|
assert feed_type is not None
|
||||||
|
assert feed_type.name == "Starter Feed"
|
||||||
|
assert feed_type.default_bag_size_kg == 20
|
||||||
|
|
||||||
|
def test_grower_feed_type(self, migrated_db):
|
||||||
|
"""Feed type grower has correct attributes."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
feed_type = repo.get("grower")
|
||||||
|
assert feed_type is not None
|
||||||
|
assert feed_type.name == "Grower Feed"
|
||||||
|
assert feed_type.default_bag_size_kg == 20
|
||||||
|
|
||||||
|
def test_layer_feed_type(self, migrated_db):
|
||||||
|
"""Feed type layer has correct attributes."""
|
||||||
|
run_seeds(migrated_db)
|
||||||
|
repo = FeedTypeRepository(migrated_db)
|
||||||
|
feed_type = repo.get("layer")
|
||||||
|
assert feed_type is not None
|
||||||
|
assert feed_type.name == "Layer Feed"
|
||||||
|
assert feed_type.default_bag_size_kg == 20
|
||||||
Reference in New Issue
Block a user