feat: add feed inventory schema and purchase service

Implement FeedPurchased event handling:
- Add migration for feed_inventory table
- Create FeedInventoryProjection to track purchases
- Create FeedService with purchase_feed method
- Calculate price_per_kg_cents from bag details

Purchases accumulate in inventory with:
- purchased_kg, given_kg, balance_kg tracking
- Last purchase price stored in cents
- Timestamps for last purchase/given

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 08:02:24 +00:00
parent 7c972f31d7
commit 5c10a750ce
4 changed files with 454 additions and 0 deletions

View File

@@ -0,0 +1,18 @@
-- ABOUTME: Migration to create feed_inventory table.
-- ABOUTME: Tracks feed purchases, amounts given, and current balance.
-- Feed inventory tracks global feed levels per type
-- Stores amounts in kg, prices in cents
CREATE TABLE feed_inventory (
feed_type_code TEXT PRIMARY KEY REFERENCES feed_types(code),
purchased_kg INTEGER NOT NULL DEFAULT 0,
given_kg INTEGER NOT NULL DEFAULT 0,
balance_kg INTEGER NOT NULL DEFAULT 0,
last_purchase_price_per_kg_cents INTEGER,
last_purchase_at_utc INTEGER,
last_given_at_utc INTEGER,
updated_at_utc INTEGER NOT NULL
);
-- Index for finding when last purchase/given occurred
CREATE INDEX idx_feed_inventory_last_on ON feed_inventory(last_purchase_at_utc, last_given_at_utc);

View File

@@ -0,0 +1,88 @@
# ABOUTME: Projection for feed inventory tracking.
# ABOUTME: Handles FeedPurchased and FeedGiven events.
from typing import Any
from animaltrack.events.types import FEED_PURCHASED
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
class FeedInventoryProjection(Projection):
"""Maintains feed inventory levels.
This projection handles feed purchase and given events, maintaining:
- feed_inventory: Current inventory levels per feed type
"""
def __init__(self, db: Any) -> None:
"""Initialize the projection with a database connection.
Args:
db: A fastlite database connection.
"""
super().__init__(db)
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [FEED_PURCHASED]
def apply(self, event: Event) -> None:
"""Apply feed event to update inventory."""
if event.type == FEED_PURCHASED:
self._apply_feed_purchased(event)
def revert(self, event: Event) -> None:
"""Revert feed event from inventory."""
if event.type == FEED_PURCHASED:
self._revert_feed_purchased(event)
def _apply_feed_purchased(self, event: Event) -> None:
"""Apply feed purchase to inventory.
- Upsert feed_inventory row
- Increment purchased_kg and balance_kg
- Update last_purchase_price_per_kg_cents
- Update last_purchase_at_utc
"""
feed_type_code = event.entity_refs.get("feed_type_code")
total_kg = event.entity_refs.get("total_kg")
price_per_kg_cents = event.entity_refs.get("price_per_kg_cents")
ts_utc = event.ts_utc
# Upsert: create if not exists, update if exists
self.db.execute(
"""
INSERT INTO feed_inventory
(feed_type_code, purchased_kg, given_kg, balance_kg,
last_purchase_price_per_kg_cents, last_purchase_at_utc, updated_at_utc)
VALUES (?, ?, 0, ?, ?, ?, ?)
ON CONFLICT(feed_type_code) DO UPDATE SET
purchased_kg = purchased_kg + excluded.purchased_kg,
balance_kg = balance_kg + excluded.purchased_kg,
last_purchase_price_per_kg_cents = excluded.last_purchase_price_per_kg_cents,
last_purchase_at_utc = excluded.last_purchase_at_utc,
updated_at_utc = excluded.updated_at_utc
""",
(feed_type_code, total_kg, total_kg, price_per_kg_cents, ts_utc, ts_utc),
)
def _revert_feed_purchased(self, event: Event) -> None:
"""Revert feed purchase from inventory.
- Decrement purchased_kg and balance_kg
"""
feed_type_code = event.entity_refs.get("feed_type_code")
total_kg = event.entity_refs.get("total_kg")
ts_utc = event.ts_utc
self.db.execute(
"""
UPDATE feed_inventory
SET purchased_kg = purchased_kg - ?,
balance_kg = balance_kg - ?,
updated_at_utc = ?
WHERE feed_type_code = ?
""",
(total_kg, total_kg, ts_utc, feed_type_code),
)

View File

@@ -0,0 +1,110 @@
# ABOUTME: Service layer for feed operations.
# ABOUTME: Coordinates event creation with projection updates for feed inventory.
from typing import Any
from animaltrack.db import transaction
from animaltrack.events.payloads import FeedPurchasedPayload
from animaltrack.events.processor import process_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import FEED_PURCHASED
from animaltrack.models.events import Event
from animaltrack.projections import ProjectionRegistry
from animaltrack.repositories.feed_types import FeedTypeRepository
class FeedServiceError(Exception):
"""Base exception for feed service errors."""
class ValidationError(FeedServiceError):
"""Raised when validation fails."""
class FeedService:
"""Service for feed-related operations.
Provides methods to purchase feed and record feed given events.
All operations are atomic and maintain inventory consistency.
"""
def __init__(
self,
db: Any,
event_store: EventStore,
registry: ProjectionRegistry,
) -> None:
"""Initialize the service.
Args:
db: A fastlite database connection.
event_store: The event store for appending events.
registry: Registry of projections to update.
"""
self.db = db
self.event_store = event_store
self.registry = registry
self.feed_type_repo = FeedTypeRepository(db)
def purchase_feed(
self,
payload: FeedPurchasedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Record a feed purchase.
Creates a FeedPurchased event and updates inventory.
Calculates total_kg and price_per_kg_cents from payload.
Args:
payload: Validated purchase payload.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the purchase.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If feed type doesn't exist or is inactive.
"""
# Validate feed type exists and is active
feed_type = self.feed_type_repo.get(payload.feed_type_code)
if feed_type is None:
msg = f"Feed type '{payload.feed_type_code}' not found"
raise ValidationError(msg)
if not feed_type.active:
msg = f"Feed type '{payload.feed_type_code}' is inactive"
raise ValidationError(msg)
# Calculate derived values
total_kg = payload.bag_size_kg * payload.bags_count
total_price_cents = payload.bag_price_cents * payload.bags_count
price_per_kg_cents = total_price_cents // total_kg # Floor division
# Build entity_refs
entity_refs = {
"feed_type_code": payload.feed_type_code,
"total_kg": total_kg,
"price_per_kg_cents": price_per_kg_cents,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=FEED_PURCHASED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event

238
tests/test_service_feed.py Normal file
View File

@@ -0,0 +1,238 @@
# ABOUTME: Tests for FeedService operations.
# ABOUTME: Tests purchase_feed with inventory tracking and price storage.
import time
import pytest
from animaltrack.events.payloads import FeedPurchasedPayload
from animaltrack.events.store import EventStore
from animaltrack.events.types import FEED_PURCHASED
from animaltrack.projections import ProjectionRegistry
from animaltrack.services.feed import FeedService, ValidationError
@pytest.fixture
def event_store(seeded_db):
"""Create an EventStore for testing."""
return EventStore(seeded_db)
@pytest.fixture
def projection_registry(seeded_db):
"""Create a ProjectionRegistry with feed projections registered."""
from animaltrack.projections.feed import FeedInventoryProjection
registry = ProjectionRegistry()
registry.register(FeedInventoryProjection(seeded_db))
return registry
@pytest.fixture
def feed_service(seeded_db, event_store, projection_registry):
"""Create a FeedService for testing."""
return FeedService(seeded_db, event_store, projection_registry)
def make_purchase_payload(
feed_type_code: str = "layer",
bag_size_kg: int = 20,
bags_count: int = 2,
bag_price_cents: int = 2400,
vendor: str | None = None,
) -> FeedPurchasedPayload:
"""Create a purchase payload for testing."""
return FeedPurchasedPayload(
feed_type_code=feed_type_code,
bag_size_kg=bag_size_kg,
bags_count=bags_count,
bag_price_cents=bag_price_cents,
vendor=vendor,
)
# =============================================================================
# purchase_feed Tests
# =============================================================================
class TestFeedServicePurchase:
"""Tests for purchase_feed()."""
def test_creates_feed_purchased_event(self, seeded_db, feed_service):
"""purchase_feed creates a FeedPurchased event."""
payload = make_purchase_payload()
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
assert event.type == FEED_PURCHASED
assert event.actor == "test_user"
assert event.ts_utc == ts_utc
def test_event_has_feed_type_in_entity_refs(self, seeded_db, feed_service):
"""Event entity_refs contains feed_type_code."""
payload = make_purchase_payload(feed_type_code="starter")
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
assert event.entity_refs["feed_type_code"] == "starter"
def test_event_has_total_kg_in_entity_refs(self, seeded_db, feed_service):
"""Event entity_refs contains calculated total_kg."""
payload = make_purchase_payload(bag_size_kg=20, bags_count=3)
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
assert event.entity_refs["total_kg"] == 60 # 20 * 3
def test_event_has_price_per_kg_cents_in_entity_refs(self, seeded_db, feed_service):
"""Event entity_refs contains calculated price_per_kg_cents."""
# 2 bags of 20kg at €24 each = €48 total / 40kg = €1.20/kg = 120 cents
payload = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=2400)
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
assert event.entity_refs["price_per_kg_cents"] == 120
def test_increments_inventory_purchased_kg(self, seeded_db, feed_service):
"""purchase_feed increments purchased_kg in feed_inventory."""
payload = make_purchase_payload(feed_type_code="layer", bag_size_kg=20, bags_count=2)
ts_utc = int(time.time() * 1000)
feed_service.purchase_feed(payload, ts_utc, "test_user")
row = seeded_db.execute(
"SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = ?",
("layer",),
).fetchone()
assert row[0] == 40 # purchased_kg
assert row[1] == 40 # balance_kg (no given yet)
def test_updates_last_purchase_price(self, seeded_db, feed_service):
"""purchase_feed updates last_purchase_price_per_kg_cents."""
payload = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=2400)
ts_utc = int(time.time() * 1000)
feed_service.purchase_feed(payload, ts_utc, "test_user")
row = seeded_db.execute(
"SELECT last_purchase_price_per_kg_cents FROM feed_inventory WHERE feed_type_code = ?",
("layer",),
).fetchone()
assert row[0] == 120 # €1.20/kg in cents
def test_updates_last_purchase_at_utc(self, seeded_db, feed_service):
"""purchase_feed updates last_purchase_at_utc."""
payload = make_purchase_payload()
ts_utc = int(time.time() * 1000)
feed_service.purchase_feed(payload, ts_utc, "test_user")
row = seeded_db.execute(
"SELECT last_purchase_at_utc FROM feed_inventory WHERE feed_type_code = ?",
("layer",),
).fetchone()
assert row[0] == ts_utc
def test_multiple_purchases_accumulate(self, seeded_db, feed_service):
"""Multiple purchases accumulate in inventory."""
ts_utc = int(time.time() * 1000)
# First purchase: 40kg
payload1 = make_purchase_payload(bag_size_kg=20, bags_count=2)
feed_service.purchase_feed(payload1, ts_utc, "test_user")
# Second purchase: 60kg
payload2 = make_purchase_payload(bag_size_kg=20, bags_count=3)
feed_service.purchase_feed(payload2, ts_utc + 1000, "test_user")
row = seeded_db.execute(
"SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = ?",
("layer",),
).fetchone()
assert row[0] == 100 # purchased_kg: 40 + 60
assert row[1] == 100 # balance_kg
def test_latest_purchase_updates_price(self, seeded_db, feed_service):
"""Latest purchase updates the price per kg."""
ts_utc = int(time.time() * 1000)
# First purchase at €1.20/kg
payload1 = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=2400)
feed_service.purchase_feed(payload1, ts_utc, "test_user")
# Second purchase at €1.50/kg
payload2 = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=3000)
feed_service.purchase_feed(payload2, ts_utc + 1000, "test_user")
row = seeded_db.execute(
"SELECT last_purchase_price_per_kg_cents FROM feed_inventory WHERE feed_type_code = ?",
("layer",),
).fetchone()
assert row[0] == 150 # €1.50/kg in cents
class TestFeedServicePurchaseValidation:
"""Tests for purchase_feed() validation."""
def test_rejects_nonexistent_feed_type(self, seeded_db, feed_service):
"""Raises ValidationError for non-existent feed_type_code."""
payload = make_purchase_payload(feed_type_code="nonexistent")
with pytest.raises(ValidationError, match="not found"):
feed_service.purchase_feed(payload, int(time.time() * 1000), "test_user")
def test_rejects_inactive_feed_type(self, seeded_db, feed_service):
"""Raises ValidationError for inactive feed_type."""
# Deactivate the layer feed type
seeded_db.execute("UPDATE feed_types SET active = 0 WHERE code = 'layer'")
payload = make_purchase_payload(feed_type_code="layer")
with pytest.raises(ValidationError, match="inactive"):
feed_service.purchase_feed(payload, int(time.time() * 1000), "test_user")
class TestFeedServicePurchasePriceCalculation:
"""Tests for price per kg calculation."""
def test_price_per_kg_whole_number(self, seeded_db, feed_service):
"""Price per kg calculated correctly for whole number result."""
# 1 bag of 20kg at €24 = €1.20/kg = 120 cents
payload = make_purchase_payload(bag_size_kg=20, bags_count=1, bag_price_cents=2400)
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
assert event.entity_refs["price_per_kg_cents"] == 120
def test_price_per_kg_rounds_down(self, seeded_db, feed_service):
"""Price per kg rounds down for fractional cents."""
# 1 bag of 15kg at €20 = €1.333.../kg = 133.33 cents, rounds to 133
payload = make_purchase_payload(bag_size_kg=15, bags_count=1, bag_price_cents=2000)
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
# 2000 cents / 15 kg = 133.33... -> 133 (floor)
assert event.entity_refs["price_per_kg_cents"] == 133
def test_price_per_kg_multiple_bags(self, seeded_db, feed_service):
"""Price per kg calculated correctly for multiple bags."""
# 3 bags of 25kg at €30 each = €90 total / 75kg = €1.20/kg = 120 cents
payload = make_purchase_payload(bag_size_kg=25, bags_count=3, bag_price_cents=3000)
ts_utc = int(time.time() * 1000)
event = feed_service.purchase_feed(payload, ts_utc, "test_user")
# (3000 * 3) / (25 * 3) = 9000 / 75 = 120
assert event.entity_refs["price_per_kg_cents"] == 120