From 5c10a750cebff13f04ada9eac7464c8de6286e5a Mon Sep 17 00:00:00 2001 From: Petru Paler Date: Mon, 29 Dec 2025 08:02:24 +0000 Subject: [PATCH] feat: add feed inventory schema and purchase service MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Implement FeedPurchased event handling: - Add migration for feed_inventory table - Create FeedInventoryProjection to track purchases - Create FeedService with purchase_feed method - Calculate price_per_kg_cents from bag details Purchases accumulate in inventory with: - purchased_kg, given_kg, balance_kg tracking - Last purchase price stored in cents - Timestamps for last purchase/given 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 --- migrations/0006-feed-inventory.sql | 18 +++ src/animaltrack/projections/feed.py | 88 ++++++++++ src/animaltrack/services/feed.py | 110 +++++++++++++ tests/test_service_feed.py | 238 ++++++++++++++++++++++++++++ 4 files changed, 454 insertions(+) create mode 100644 migrations/0006-feed-inventory.sql create mode 100644 src/animaltrack/projections/feed.py create mode 100644 src/animaltrack/services/feed.py create mode 100644 tests/test_service_feed.py diff --git a/migrations/0006-feed-inventory.sql b/migrations/0006-feed-inventory.sql new file mode 100644 index 0000000..60e5a49 --- /dev/null +++ b/migrations/0006-feed-inventory.sql @@ -0,0 +1,18 @@ +-- ABOUTME: Migration to create feed_inventory table. +-- ABOUTME: Tracks feed purchases, amounts given, and current balance. + +-- Feed inventory tracks global feed levels per type +-- Stores amounts in kg, prices in cents +CREATE TABLE feed_inventory ( + feed_type_code TEXT PRIMARY KEY REFERENCES feed_types(code), + purchased_kg INTEGER NOT NULL DEFAULT 0, + given_kg INTEGER NOT NULL DEFAULT 0, + balance_kg INTEGER NOT NULL DEFAULT 0, + last_purchase_price_per_kg_cents INTEGER, + last_purchase_at_utc INTEGER, + last_given_at_utc INTEGER, + updated_at_utc INTEGER NOT NULL +); + +-- Index for finding when last purchase/given occurred +CREATE INDEX idx_feed_inventory_last_on ON feed_inventory(last_purchase_at_utc, last_given_at_utc); diff --git a/src/animaltrack/projections/feed.py b/src/animaltrack/projections/feed.py new file mode 100644 index 0000000..af1d7c7 --- /dev/null +++ b/src/animaltrack/projections/feed.py @@ -0,0 +1,88 @@ +# ABOUTME: Projection for feed inventory tracking. +# ABOUTME: Handles FeedPurchased and FeedGiven events. + +from typing import Any + +from animaltrack.events.types import FEED_PURCHASED +from animaltrack.models.events import Event +from animaltrack.projections.base import Projection + + +class FeedInventoryProjection(Projection): + """Maintains feed inventory levels. + + This projection handles feed purchase and given events, maintaining: + - feed_inventory: Current inventory levels per feed type + """ + + def __init__(self, db: Any) -> None: + """Initialize the projection with a database connection. + + Args: + db: A fastlite database connection. + """ + super().__init__(db) + + def get_event_types(self) -> list[str]: + """Return the event types this projection handles.""" + return [FEED_PURCHASED] + + def apply(self, event: Event) -> None: + """Apply feed event to update inventory.""" + if event.type == FEED_PURCHASED: + self._apply_feed_purchased(event) + + def revert(self, event: Event) -> None: + """Revert feed event from inventory.""" + if event.type == FEED_PURCHASED: + self._revert_feed_purchased(event) + + def _apply_feed_purchased(self, event: Event) -> None: + """Apply feed purchase to inventory. + + - Upsert feed_inventory row + - Increment purchased_kg and balance_kg + - Update last_purchase_price_per_kg_cents + - Update last_purchase_at_utc + """ + feed_type_code = event.entity_refs.get("feed_type_code") + total_kg = event.entity_refs.get("total_kg") + price_per_kg_cents = event.entity_refs.get("price_per_kg_cents") + ts_utc = event.ts_utc + + # Upsert: create if not exists, update if exists + self.db.execute( + """ + INSERT INTO feed_inventory + (feed_type_code, purchased_kg, given_kg, balance_kg, + last_purchase_price_per_kg_cents, last_purchase_at_utc, updated_at_utc) + VALUES (?, ?, 0, ?, ?, ?, ?) + ON CONFLICT(feed_type_code) DO UPDATE SET + purchased_kg = purchased_kg + excluded.purchased_kg, + balance_kg = balance_kg + excluded.purchased_kg, + last_purchase_price_per_kg_cents = excluded.last_purchase_price_per_kg_cents, + last_purchase_at_utc = excluded.last_purchase_at_utc, + updated_at_utc = excluded.updated_at_utc + """, + (feed_type_code, total_kg, total_kg, price_per_kg_cents, ts_utc, ts_utc), + ) + + def _revert_feed_purchased(self, event: Event) -> None: + """Revert feed purchase from inventory. + + - Decrement purchased_kg and balance_kg + """ + feed_type_code = event.entity_refs.get("feed_type_code") + total_kg = event.entity_refs.get("total_kg") + ts_utc = event.ts_utc + + self.db.execute( + """ + UPDATE feed_inventory + SET purchased_kg = purchased_kg - ?, + balance_kg = balance_kg - ?, + updated_at_utc = ? + WHERE feed_type_code = ? + """, + (total_kg, total_kg, ts_utc, feed_type_code), + ) diff --git a/src/animaltrack/services/feed.py b/src/animaltrack/services/feed.py new file mode 100644 index 0000000..25c7e4e --- /dev/null +++ b/src/animaltrack/services/feed.py @@ -0,0 +1,110 @@ +# ABOUTME: Service layer for feed operations. +# ABOUTME: Coordinates event creation with projection updates for feed inventory. + +from typing import Any + +from animaltrack.db import transaction +from animaltrack.events.payloads import FeedPurchasedPayload +from animaltrack.events.processor import process_event +from animaltrack.events.store import EventStore +from animaltrack.events.types import FEED_PURCHASED +from animaltrack.models.events import Event +from animaltrack.projections import ProjectionRegistry +from animaltrack.repositories.feed_types import FeedTypeRepository + + +class FeedServiceError(Exception): + """Base exception for feed service errors.""" + + +class ValidationError(FeedServiceError): + """Raised when validation fails.""" + + +class FeedService: + """Service for feed-related operations. + + Provides methods to purchase feed and record feed given events. + All operations are atomic and maintain inventory consistency. + """ + + def __init__( + self, + db: Any, + event_store: EventStore, + registry: ProjectionRegistry, + ) -> None: + """Initialize the service. + + Args: + db: A fastlite database connection. + event_store: The event store for appending events. + registry: Registry of projections to update. + """ + self.db = db + self.event_store = event_store + self.registry = registry + self.feed_type_repo = FeedTypeRepository(db) + + def purchase_feed( + self, + payload: FeedPurchasedPayload, + ts_utc: int, + actor: str, + nonce: str | None = None, + route: str | None = None, + ) -> Event: + """Record a feed purchase. + + Creates a FeedPurchased event and updates inventory. + Calculates total_kg and price_per_kg_cents from payload. + + Args: + payload: Validated purchase payload. + ts_utc: Timestamp in milliseconds since epoch. + actor: The user performing the purchase. + nonce: Optional idempotency nonce. + route: Required if nonce provided. + + Returns: + The created event. + + Raises: + ValidationError: If feed type doesn't exist or is inactive. + """ + # Validate feed type exists and is active + feed_type = self.feed_type_repo.get(payload.feed_type_code) + if feed_type is None: + msg = f"Feed type '{payload.feed_type_code}' not found" + raise ValidationError(msg) + + if not feed_type.active: + msg = f"Feed type '{payload.feed_type_code}' is inactive" + raise ValidationError(msg) + + # Calculate derived values + total_kg = payload.bag_size_kg * payload.bags_count + total_price_cents = payload.bag_price_cents * payload.bags_count + price_per_kg_cents = total_price_cents // total_kg # Floor division + + # Build entity_refs + entity_refs = { + "feed_type_code": payload.feed_type_code, + "total_kg": total_kg, + "price_per_kg_cents": price_per_kg_cents, + } + + with transaction(self.db): + event = self.event_store.append_event( + event_type=FEED_PURCHASED, + ts_utc=ts_utc, + actor=actor, + entity_refs=entity_refs, + payload=payload.model_dump(), + nonce=nonce, + route=route, + ) + + process_event(event, self.registry) + + return event diff --git a/tests/test_service_feed.py b/tests/test_service_feed.py new file mode 100644 index 0000000..5a6580e --- /dev/null +++ b/tests/test_service_feed.py @@ -0,0 +1,238 @@ +# ABOUTME: Tests for FeedService operations. +# ABOUTME: Tests purchase_feed with inventory tracking and price storage. + +import time + +import pytest + +from animaltrack.events.payloads import FeedPurchasedPayload +from animaltrack.events.store import EventStore +from animaltrack.events.types import FEED_PURCHASED +from animaltrack.projections import ProjectionRegistry +from animaltrack.services.feed import FeedService, ValidationError + + +@pytest.fixture +def event_store(seeded_db): + """Create an EventStore for testing.""" + return EventStore(seeded_db) + + +@pytest.fixture +def projection_registry(seeded_db): + """Create a ProjectionRegistry with feed projections registered.""" + from animaltrack.projections.feed import FeedInventoryProjection + + registry = ProjectionRegistry() + registry.register(FeedInventoryProjection(seeded_db)) + return registry + + +@pytest.fixture +def feed_service(seeded_db, event_store, projection_registry): + """Create a FeedService for testing.""" + return FeedService(seeded_db, event_store, projection_registry) + + +def make_purchase_payload( + feed_type_code: str = "layer", + bag_size_kg: int = 20, + bags_count: int = 2, + bag_price_cents: int = 2400, + vendor: str | None = None, +) -> FeedPurchasedPayload: + """Create a purchase payload for testing.""" + return FeedPurchasedPayload( + feed_type_code=feed_type_code, + bag_size_kg=bag_size_kg, + bags_count=bags_count, + bag_price_cents=bag_price_cents, + vendor=vendor, + ) + + +# ============================================================================= +# purchase_feed Tests +# ============================================================================= + + +class TestFeedServicePurchase: + """Tests for purchase_feed().""" + + def test_creates_feed_purchased_event(self, seeded_db, feed_service): + """purchase_feed creates a FeedPurchased event.""" + payload = make_purchase_payload() + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + assert event.type == FEED_PURCHASED + assert event.actor == "test_user" + assert event.ts_utc == ts_utc + + def test_event_has_feed_type_in_entity_refs(self, seeded_db, feed_service): + """Event entity_refs contains feed_type_code.""" + payload = make_purchase_payload(feed_type_code="starter") + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + assert event.entity_refs["feed_type_code"] == "starter" + + def test_event_has_total_kg_in_entity_refs(self, seeded_db, feed_service): + """Event entity_refs contains calculated total_kg.""" + payload = make_purchase_payload(bag_size_kg=20, bags_count=3) + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + assert event.entity_refs["total_kg"] == 60 # 20 * 3 + + def test_event_has_price_per_kg_cents_in_entity_refs(self, seeded_db, feed_service): + """Event entity_refs contains calculated price_per_kg_cents.""" + # 2 bags of 20kg at €24 each = €48 total / 40kg = €1.20/kg = 120 cents + payload = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=2400) + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + assert event.entity_refs["price_per_kg_cents"] == 120 + + def test_increments_inventory_purchased_kg(self, seeded_db, feed_service): + """purchase_feed increments purchased_kg in feed_inventory.""" + payload = make_purchase_payload(feed_type_code="layer", bag_size_kg=20, bags_count=2) + ts_utc = int(time.time() * 1000) + + feed_service.purchase_feed(payload, ts_utc, "test_user") + + row = seeded_db.execute( + "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = ?", + ("layer",), + ).fetchone() + + assert row[0] == 40 # purchased_kg + assert row[1] == 40 # balance_kg (no given yet) + + def test_updates_last_purchase_price(self, seeded_db, feed_service): + """purchase_feed updates last_purchase_price_per_kg_cents.""" + payload = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=2400) + ts_utc = int(time.time() * 1000) + + feed_service.purchase_feed(payload, ts_utc, "test_user") + + row = seeded_db.execute( + "SELECT last_purchase_price_per_kg_cents FROM feed_inventory WHERE feed_type_code = ?", + ("layer",), + ).fetchone() + + assert row[0] == 120 # €1.20/kg in cents + + def test_updates_last_purchase_at_utc(self, seeded_db, feed_service): + """purchase_feed updates last_purchase_at_utc.""" + payload = make_purchase_payload() + ts_utc = int(time.time() * 1000) + + feed_service.purchase_feed(payload, ts_utc, "test_user") + + row = seeded_db.execute( + "SELECT last_purchase_at_utc FROM feed_inventory WHERE feed_type_code = ?", + ("layer",), + ).fetchone() + + assert row[0] == ts_utc + + def test_multiple_purchases_accumulate(self, seeded_db, feed_service): + """Multiple purchases accumulate in inventory.""" + ts_utc = int(time.time() * 1000) + + # First purchase: 40kg + payload1 = make_purchase_payload(bag_size_kg=20, bags_count=2) + feed_service.purchase_feed(payload1, ts_utc, "test_user") + + # Second purchase: 60kg + payload2 = make_purchase_payload(bag_size_kg=20, bags_count=3) + feed_service.purchase_feed(payload2, ts_utc + 1000, "test_user") + + row = seeded_db.execute( + "SELECT purchased_kg, balance_kg FROM feed_inventory WHERE feed_type_code = ?", + ("layer",), + ).fetchone() + + assert row[0] == 100 # purchased_kg: 40 + 60 + assert row[1] == 100 # balance_kg + + def test_latest_purchase_updates_price(self, seeded_db, feed_service): + """Latest purchase updates the price per kg.""" + ts_utc = int(time.time() * 1000) + + # First purchase at €1.20/kg + payload1 = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=2400) + feed_service.purchase_feed(payload1, ts_utc, "test_user") + + # Second purchase at €1.50/kg + payload2 = make_purchase_payload(bag_size_kg=20, bags_count=2, bag_price_cents=3000) + feed_service.purchase_feed(payload2, ts_utc + 1000, "test_user") + + row = seeded_db.execute( + "SELECT last_purchase_price_per_kg_cents FROM feed_inventory WHERE feed_type_code = ?", + ("layer",), + ).fetchone() + + assert row[0] == 150 # €1.50/kg in cents + + +class TestFeedServicePurchaseValidation: + """Tests for purchase_feed() validation.""" + + def test_rejects_nonexistent_feed_type(self, seeded_db, feed_service): + """Raises ValidationError for non-existent feed_type_code.""" + payload = make_purchase_payload(feed_type_code="nonexistent") + + with pytest.raises(ValidationError, match="not found"): + feed_service.purchase_feed(payload, int(time.time() * 1000), "test_user") + + def test_rejects_inactive_feed_type(self, seeded_db, feed_service): + """Raises ValidationError for inactive feed_type.""" + # Deactivate the layer feed type + seeded_db.execute("UPDATE feed_types SET active = 0 WHERE code = 'layer'") + + payload = make_purchase_payload(feed_type_code="layer") + + with pytest.raises(ValidationError, match="inactive"): + feed_service.purchase_feed(payload, int(time.time() * 1000), "test_user") + + +class TestFeedServicePurchasePriceCalculation: + """Tests for price per kg calculation.""" + + def test_price_per_kg_whole_number(self, seeded_db, feed_service): + """Price per kg calculated correctly for whole number result.""" + # 1 bag of 20kg at €24 = €1.20/kg = 120 cents + payload = make_purchase_payload(bag_size_kg=20, bags_count=1, bag_price_cents=2400) + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + assert event.entity_refs["price_per_kg_cents"] == 120 + + def test_price_per_kg_rounds_down(self, seeded_db, feed_service): + """Price per kg rounds down for fractional cents.""" + # 1 bag of 15kg at €20 = €1.333.../kg = 133.33 cents, rounds to 133 + payload = make_purchase_payload(bag_size_kg=15, bags_count=1, bag_price_cents=2000) + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + # 2000 cents / 15 kg = 133.33... -> 133 (floor) + assert event.entity_refs["price_per_kg_cents"] == 133 + + def test_price_per_kg_multiple_bags(self, seeded_db, feed_service): + """Price per kg calculated correctly for multiple bags.""" + # 3 bags of 25kg at €30 each = €90 total / 75kg = €1.20/kg = 120 cents + payload = make_purchase_payload(bag_size_kg=25, bags_count=3, bag_price_cents=3000) + ts_utc = int(time.time() * 1000) + + event = feed_service.purchase_feed(payload, ts_utc, "test_user") + + # (3000 * 3) / (25 * 3) = 9000 / 75 = 120 + assert event.entity_refs["price_per_kg_cents"] == 120