# ABOUTME: Tests for stats service operations. # ABOUTME: Tests get_egg_stats with E2E baseline verification per spec section 21. import time import pytest from animaltrack.events.payloads import ( AnimalCohortCreatedPayload, FeedGivenPayload, FeedPurchasedPayload, ProductCollectedPayload, ) from animaltrack.events.store import EventStore from animaltrack.projections import ProjectionRegistry from animaltrack.projections.animal_registry import AnimalRegistryProjection from animaltrack.projections.event_animals import EventAnimalsProjection from animaltrack.projections.feed import FeedInventoryProjection from animaltrack.projections.intervals import IntervalProjection from animaltrack.projections.products import ProductsProjection from animaltrack.services.animal import AnimalService from animaltrack.services.feed import FeedService from animaltrack.services.products import ProductService from animaltrack.services.stats import EggStats, get_egg_stats @pytest.fixture def event_store(seeded_db): """Create an EventStore for testing.""" return EventStore(seeded_db) @pytest.fixture def projection_registry(seeded_db): """Create a ProjectionRegistry with all needed projections.""" registry = ProjectionRegistry() registry.register(AnimalRegistryProjection(seeded_db)) registry.register(EventAnimalsProjection(seeded_db)) registry.register(IntervalProjection(seeded_db)) registry.register(ProductsProjection(seeded_db)) registry.register(FeedInventoryProjection(seeded_db)) return registry @pytest.fixture def animal_service(seeded_db, event_store, projection_registry): """Create an AnimalService for testing.""" return AnimalService(seeded_db, event_store, projection_registry) @pytest.fixture def feed_service(seeded_db, event_store, projection_registry): """Create a FeedService for testing.""" return FeedService(seeded_db, event_store, projection_registry) @pytest.fixture def product_service(seeded_db, event_store, projection_registry): """Create a ProductService for testing.""" return ProductService(seeded_db, event_store, projection_registry) @pytest.fixture def location_id(seeded_db): """Get Strip 1 location_id from seeded data.""" row = seeded_db.execute("SELECT id FROM locations WHERE name = 'Strip 1'").fetchone() return row[0] @pytest.fixture def e2e_test1_setup(seeded_db, animal_service, feed_service, product_service, location_id): """Set up E2E test #1: 13 ducks at Strip 1. Setup from spec section 21: - 10 adult female ducks + 3 adult male ducks at Strip 1 - 40kg feed purchased @ EUR 1.20/kg (2x20kg bags @ EUR 24) - 6kg feed given - 12 eggs collected Animals are created 1 day before feed/eggs events so bird-days calculation shows 1 day of presence (13 animal-days total, 10 layer-eligible). """ one_day_ms = 24 * 60 * 60 * 1000 base_ts = int(time.time() * 1000) animal_creation_ts = base_ts - one_day_ms # Animals created 1 day ago # Create 10 adult female ducks (1 day ago) females_payload = AnimalCohortCreatedPayload( species="duck", count=10, life_stage="adult", sex="female", location_id=location_id, origin="purchased", ) females_event = animal_service.create_cohort(females_payload, animal_creation_ts, "test_user") female_ids = females_event.entity_refs["animal_ids"] # Create 3 adult male ducks (1 day ago) males_payload = AnimalCohortCreatedPayload( species="duck", count=3, life_stage="adult", sex="male", location_id=location_id, origin="purchased", ) males_event = animal_service.create_cohort(males_payload, animal_creation_ts, "test_user") male_ids = males_event.entity_refs["animal_ids"] # Purchase 40kg feed @ EUR 1.20/kg (2 bags of 20kg @ EUR 24 each) purchase_payload = FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=2, bag_price_cents=2400, ) feed_service.purchase_feed(purchase_payload, base_ts + 1000, "test_user") # Give 6kg feed give_payload = FeedGivenPayload( location_id=location_id, feed_type_code="layer", amount_kg=6, ) feed_service.give_feed(give_payload, base_ts + 2000, "test_user") # Collect 12 eggs all_animal_ids = female_ids + male_ids collect_payload = ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=12, resolved_ids=all_animal_ids, ) product_service.collect_product(collect_payload, base_ts + 3000, "test_user") return { "location_id": location_id, "ts_utc": base_ts + 3000, "female_ids": female_ids, "male_ids": male_ids, "base_ts": base_ts, } # ============================================================================= # Migration Tests # ============================================================================= class TestEggStatsMigration: """Tests for egg_stats_30d_by_location table schema.""" def test_table_exists(self, seeded_db): """The egg_stats_30d_by_location table exists after migration.""" row = seeded_db.execute( """ SELECT name FROM sqlite_master WHERE type='table' AND name='egg_stats_30d_by_location' """ ).fetchone() assert row is not None def test_table_has_all_columns(self, seeded_db): """Table has all required columns.""" rows = seeded_db.execute("PRAGMA table_info(egg_stats_30d_by_location)").fetchall() column_names = {row[1] for row in rows} expected_columns = { "location_id", "window_start_utc", "window_end_utc", "eggs_total_pcs", "feed_total_g", "feed_layers_g", "cost_per_egg_all_eur", "cost_per_egg_layers_eur", "layer_eligible_bird_days", "layer_eligible_count_now", "updated_at_utc", } assert expected_columns == column_names # ============================================================================= # E2E Test #1: Baseline Eggs+Feed+Costs # ============================================================================= class TestEggStatsE2EBaseline: """Tests for E2E test #1 baseline values from spec section 21.""" def test_returns_egg_stats_dataclass(self, seeded_db, e2e_test1_setup): """get_egg_stats returns an EggStats instance.""" stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert isinstance(stats, EggStats) def test_eggs_total_pcs_is_12(self, seeded_db, e2e_test1_setup): """eggs_total_pcs should be 12.""" stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert stats.eggs_total_pcs == 12 def test_feed_total_g_is_6000(self, seeded_db, e2e_test1_setup): """feed_total_g should be 6000 (6kg in grams).""" stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert stats.feed_total_g == 6000 def test_feed_layers_g_is_4615(self, seeded_db, e2e_test1_setup): """feed_layers_g should be 4615 (6000 * 10/13 = 4615.38 -> 4615). This uses INTEGER truncation, not rounding. """ stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert stats.feed_layers_g == 4615 def test_cost_per_egg_all_is_0_600(self, seeded_db, e2e_test1_setup): """cost_per_egg_all should be 0.600 +/- 0.001. 6kg @ EUR 1.20/kg = EUR 7.20 / 12 eggs = EUR 0.60/egg """ stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert abs(stats.cost_per_egg_all_eur - 0.600) < 0.001 def test_cost_per_egg_layers_is_0_462(self, seeded_db, e2e_test1_setup): """cost_per_egg_layers should be 0.462 +/- 0.001. share = 10/13 = 0.769230769 layer_cost = EUR 7.20 * 0.769230769 = EUR 5.538461538 cost_per_egg_layers = EUR 5.538 / 12 = EUR 0.46153... """ stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert abs(stats.cost_per_egg_layers_eur - 0.462) < 0.001 def test_layer_eligible_bird_days_is_10(self, seeded_db, e2e_test1_setup): """layer_eligible_bird_days should be 10. 10 adult female ducks, all present for same duration = 10 bird-days. """ stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert stats.layer_eligible_bird_days == 10 def test_layer_eligible_count_now_is_10(self, seeded_db, e2e_test1_setup): """layer_eligible_count_now should be 10. 10 adult female ducks currently at the location. """ stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) assert stats.layer_eligible_count_now == 10 # ============================================================================= # Edge Case Tests # ============================================================================= class TestEggStatsEdgeCases: """Tests for edge cases in stats computation.""" def test_no_eggs_returns_zero_costs(self, seeded_db, location_id): """When eggs_total_pcs=0, costs should be 0.0.""" ts_utc = int(time.time() * 1000) stats = get_egg_stats(seeded_db, location_id, ts_utc) assert stats.eggs_total_pcs == 0 assert stats.cost_per_egg_all_eur == 0.0 assert stats.cost_per_egg_layers_eur == 0.0 def test_no_feed_returns_zero_costs( self, seeded_db, animal_service, product_service, location_id ): """When no feed given, costs should be 0.0.""" base_ts = int(time.time() * 1000) # Create animals payload = AnimalCohortCreatedPayload( species="duck", count=5, life_stage="adult", sex="female", location_id=location_id, origin="purchased", ) event = animal_service.create_cohort(payload, base_ts, "test_user") animal_ids = event.entity_refs["animal_ids"] # Collect eggs without any feed collect_payload = ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=10, resolved_ids=animal_ids, ) product_service.collect_product(collect_payload, base_ts + 1000, "test_user") stats = get_egg_stats(seeded_db, location_id, base_ts + 1000) assert stats.eggs_total_pcs == 10 assert stats.feed_total_g == 0 assert stats.cost_per_egg_all_eur == 0.0 assert stats.cost_per_egg_layers_eur == 0.0 def test_no_animals_returns_zero_bird_days(self, seeded_db, location_id): """When no animals at location, bird-days should be 0.""" ts_utc = int(time.time() * 1000) stats = get_egg_stats(seeded_db, location_id, ts_utc) assert stats.layer_eligible_bird_days == 0 assert stats.layer_eligible_count_now == 0 # ============================================================================= # Filtering Tests # ============================================================================= class TestEggStatsFiltering: """Tests for layer-eligible filtering criteria.""" def test_males_not_layer_eligible( self, seeded_db, animal_service, feed_service, product_service, location_id ): """Male animals should not be counted in layer-eligible bird-days.""" base_ts = int(time.time() * 1000) # Create only male ducks payload = AnimalCohortCreatedPayload( species="duck", count=5, life_stage="adult", sex="male", location_id=location_id, origin="purchased", ) event = animal_service.create_cohort(payload, base_ts, "test_user") animal_ids = event.entity_refs["animal_ids"] # Purchase and give feed feed_service.purchase_feed( FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=1, bag_price_cents=2400 ), base_ts + 1000, "test_user", ) feed_service.give_feed( FeedGivenPayload(location_id=location_id, feed_type_code="layer", amount_kg=5), base_ts + 2000, "test_user", ) # Collect eggs product_service.collect_product( ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=6, resolved_ids=animal_ids, ), base_ts + 3000, "test_user", ) stats = get_egg_stats(seeded_db, location_id, base_ts + 3000) assert stats.layer_eligible_bird_days == 0 assert stats.layer_eligible_count_now == 0 def test_juveniles_not_layer_eligible( self, seeded_db, animal_service, feed_service, product_service, location_id ): """Juvenile animals should not be counted in layer-eligible bird-days.""" base_ts = int(time.time() * 1000) # Create juvenile females payload = AnimalCohortCreatedPayload( species="duck", count=5, life_stage="juvenile", sex="female", location_id=location_id, origin="hatched", ) event = animal_service.create_cohort(payload, base_ts, "test_user") animal_ids = event.entity_refs["animal_ids"] # Purchase and give feed feed_service.purchase_feed( FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=1, bag_price_cents=2400 ), base_ts + 1000, "test_user", ) feed_service.give_feed( FeedGivenPayload(location_id=location_id, feed_type_code="layer", amount_kg=5), base_ts + 2000, "test_user", ) # Collect eggs product_service.collect_product( ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=6, resolved_ids=animal_ids, ), base_ts + 3000, "test_user", ) stats = get_egg_stats(seeded_db, location_id, base_ts + 3000) assert stats.layer_eligible_bird_days == 0 assert stats.layer_eligible_count_now == 0 def test_wrong_species_not_layer_eligible( self, seeded_db, animal_service, feed_service, product_service, location_id ): """Goose animals should not be counted for egg.duck product.""" base_ts = int(time.time() * 1000) # Create adult female geese payload = AnimalCohortCreatedPayload( species="goose", count=5, life_stage="adult", sex="female", location_id=location_id, origin="purchased", ) event = animal_service.create_cohort(payload, base_ts, "test_user") animal_ids = event.entity_refs["animal_ids"] # Purchase and give feed feed_service.purchase_feed( FeedPurchasedPayload( feed_type_code="layer", bag_size_kg=20, bags_count=1, bag_price_cents=2400 ), base_ts + 1000, "test_user", ) feed_service.give_feed( FeedGivenPayload(location_id=location_id, feed_type_code="layer", amount_kg=5), base_ts + 2000, "test_user", ) # Collect duck eggs (geese producing duck eggs is unusual but tests filtering) product_service.collect_product( ProductCollectedPayload( location_id=location_id, product_code="egg.duck", quantity=6, resolved_ids=animal_ids, ), base_ts + 3000, "test_user", ) stats = get_egg_stats(seeded_db, location_id, base_ts + 3000) # Geese don't count as layer-eligible for duck eggs assert stats.layer_eligible_bird_days == 0 assert stats.layer_eligible_count_now == 0 # ============================================================================= # Caching Tests # ============================================================================= class TestEggStatsCaching: """Tests for stats caching behavior.""" def test_stats_upserted_to_table(self, seeded_db, e2e_test1_setup): """Stats are cached in egg_stats_30d_by_location table.""" get_egg_stats(seeded_db, e2e_test1_setup["location_id"], e2e_test1_setup["ts_utc"]) row = seeded_db.execute( "SELECT eggs_total_pcs FROM egg_stats_30d_by_location WHERE location_id = ?", (e2e_test1_setup["location_id"],), ).fetchone() assert row is not None assert row[0] == 12 def test_cached_stats_have_window_bounds(self, seeded_db, e2e_test1_setup): """Cached stats include window_start_utc and window_end_utc.""" ts_utc = e2e_test1_setup["ts_utc"] stats = get_egg_stats(seeded_db, e2e_test1_setup["location_id"], ts_utc) row = seeded_db.execute( """ SELECT window_start_utc, window_end_utc FROM egg_stats_30d_by_location WHERE location_id = ? """, (e2e_test1_setup["location_id"],), ).fetchone() assert row is not None # Cached bounds should match what get_egg_stats returned assert row[0] == stats.window_start_utc assert row[1] == stats.window_end_utc