feat: implement animal lifecycle events (Step 6.3)

Add 5 animal lifecycle event handlers with TDD:
- HatchRecorded: Creates hatchling animals at brood/event location
- AnimalOutcome: Records death/harvest/sold with yields, status updates
- AnimalPromoted: Sets identified flag, nickname, optionally updates sex/repro_status
- AnimalMerged: Merges animal records, creates aliases, removes merged from live roster
- AnimalStatusCorrected: Admin-only status correction with required reason

All events include:
- Projection handlers in animal_registry.py and intervals.py
- Event-animal linking in event_animals.py
- Service methods with validation in animal.py
- 51 unit tests covering event creation, projections, and validation
- E2E test #7 (harvest with yields) per spec §21.7

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
2025-12-29 19:20:33 +00:00
parent da95e85452
commit 1153f6c5b6
7 changed files with 2728 additions and 9 deletions

View File

@@ -6,7 +6,12 @@ from typing import Any
from animaltrack.events.types import (
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_COHORT_CREATED,
ANIMAL_MERGED,
ANIMAL_MOVED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_STATUS_CORRECTED,
HATCH_RECORDED,
)
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
@@ -30,7 +35,16 @@ class AnimalRegistryProjection(Projection):
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED, ANIMAL_MOVED, ANIMAL_ATTRIBUTES_UPDATED]
return [
ANIMAL_COHORT_CREATED,
ANIMAL_MOVED,
ANIMAL_ATTRIBUTES_UPDATED,
HATCH_RECORDED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_MERGED,
ANIMAL_STATUS_CORRECTED,
]
def apply(self, event: Event) -> None:
"""Apply an event to update registry tables."""
@@ -40,6 +54,16 @@ class AnimalRegistryProjection(Projection):
self._apply_animal_moved(event)
elif event.type == ANIMAL_ATTRIBUTES_UPDATED:
self._apply_attributes_updated(event)
elif event.type == HATCH_RECORDED:
self._apply_hatch_recorded(event)
elif event.type == ANIMAL_OUTCOME:
self._apply_animal_outcome(event)
elif event.type == ANIMAL_PROMOTED:
self._apply_animal_promoted(event)
elif event.type == ANIMAL_MERGED:
self._apply_animal_merged(event)
elif event.type == ANIMAL_STATUS_CORRECTED:
self._apply_status_corrected(event)
def revert(self, event: Event) -> None:
"""Revert an event from registry tables."""
@@ -49,6 +73,16 @@ class AnimalRegistryProjection(Projection):
self._revert_animal_moved(event)
elif event.type == ANIMAL_ATTRIBUTES_UPDATED:
self._revert_attributes_updated(event)
elif event.type == HATCH_RECORDED:
self._revert_hatch_recorded(event)
elif event.type == ANIMAL_OUTCOME:
self._revert_animal_outcome(event)
elif event.type == ANIMAL_PROMOTED:
self._revert_animal_promoted(event)
elif event.type == ANIMAL_MERGED:
self._revert_animal_merged(event)
elif event.type == ANIMAL_STATUS_CORRECTED:
self._revert_status_corrected(event)
def _apply_cohort_created(self, event: Event) -> None:
"""Create animals in registry from cohort event.
@@ -302,3 +336,378 @@ class AnimalRegistryProjection(Projection):
""",
values_live,
)
# =========================================================================
# HatchRecorded handlers
# =========================================================================
def _apply_hatch_recorded(self, event: Event) -> None:
"""Create hatchling animals in registry from hatch event.
Creates new animals with life_stage=hatchling, sex=unknown,
status=alive, origin=hatched at the specified location.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
location_id = event.entity_refs.get("location_id")
species = event.payload["species"]
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Insert into animal_registry
self.db.execute(
"""
INSERT INTO animal_registry (
animal_id, species_code, identified, nickname,
sex, repro_status, life_stage, status,
location_id, origin, born_or_hatched_at, acquired_at,
first_seen_utc, last_event_utc
) VALUES (?, ?, 0, NULL, 'unknown', 'unknown', 'hatchling', 'alive',
?, 'hatched', ?, NULL, ?, ?)
""",
(animal_id, species, location_id, ts_utc, ts_utc, ts_utc),
)
# Insert into live_animals_by_location
self.db.execute(
"""
INSERT INTO live_animals_by_location (
animal_id, location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc, last_move_utc, tags
) VALUES (?, ?, ?, 0, NULL, 'unknown', 'unknown', 'hatchling', ?, NULL, '[]')
""",
(animal_id, location_id, species, ts_utc),
)
def _revert_hatch_recorded(self, event: Event) -> None:
"""Remove animals created by hatch event."""
animal_ids = event.entity_refs.get("animal_ids", [])
for animal_id in animal_ids:
# Delete from live_animals_by_location first
self.db.execute(
"DELETE FROM live_animals_by_location WHERE animal_id = ?",
(animal_id,),
)
# Then delete from animal_registry
self.db.execute(
"DELETE FROM animal_registry WHERE animal_id = ?",
(animal_id,),
)
# =========================================================================
# AnimalOutcome handlers
# =========================================================================
def _apply_animal_outcome(self, event: Event) -> None:
"""Update animal status and remove from live roster.
Maps outcome to status and removes animals from live_animals_by_location
since they are no longer alive.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Update status in animal_registry
self.db.execute(
"""
UPDATE animal_registry
SET status = ?, last_event_utc = ?
WHERE animal_id = ?
""",
(new_status, ts_utc, animal_id),
)
# Remove from live_animals_by_location
self.db.execute(
"DELETE FROM live_animals_by_location WHERE animal_id = ?",
(animal_id,),
)
def _revert_animal_outcome(self, event: Event) -> None:
"""Restore animals to alive status and re-add to live roster."""
animal_ids = event.entity_refs.get("animal_ids", [])
for animal_id in animal_ids:
# Restore status to alive
self.db.execute(
"UPDATE animal_registry SET status = 'alive' WHERE animal_id = ?",
(animal_id,),
)
# Get animal data to re-add to live roster
row = self.db.execute(
"""SELECT location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc
FROM animal_registry WHERE animal_id = ?""",
(animal_id,),
).fetchone()
if row:
self.db.execute(
"""INSERT INTO live_animals_by_location
(animal_id, location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc, last_move_utc, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, '[]')""",
(animal_id,) + row,
)
# =========================================================================
# AnimalPromoted handlers
# =========================================================================
def _apply_animal_promoted(self, event: Event) -> None:
"""Set identified flag and nickname for promoted animal.
Updates both animal_registry and live_animals_by_location with
the promoted state and any changed attributes.
"""
animal_id = event.entity_refs.get("animal_ids", [])[0]
nickname = event.entity_refs.get("nickname")
changed_attrs = event.entity_refs.get("changed_attrs", {}).get(animal_id, {})
ts_utc = event.ts_utc
# Build update for animal_registry
set_clauses = ["identified = 1", "last_event_utc = ?"]
values = [ts_utc]
if nickname:
set_clauses.append("nickname = ?")
values.append(nickname)
for attr, change in changed_attrs.items():
set_clauses.append(f"{attr} = ?")
values.append(change["new"])
values.append(animal_id)
self.db.execute(
f"UPDATE animal_registry SET {', '.join(set_clauses)} WHERE animal_id = ?",
values,
)
# Update live_animals_by_location
set_clauses_live = ["identified = 1"]
values_live = []
if nickname:
set_clauses_live.append("nickname = ?")
values_live.append(nickname)
for attr, change in changed_attrs.items():
set_clauses_live.append(f"{attr} = ?")
values_live.append(change["new"])
values_live.append(animal_id)
self.db.execute(
f"UPDATE live_animals_by_location SET {', '.join(set_clauses_live)} WHERE animal_id = ?",
values_live,
)
def _revert_animal_promoted(self, event: Event) -> None:
"""Revert promotion by clearing identified flag and nickname."""
animal_id = event.entity_refs.get("animal_ids", [])[0]
nickname = event.entity_refs.get("nickname")
changed_attrs = event.entity_refs.get("changed_attrs", {}).get(animal_id, {})
# Restore in animal_registry
set_clauses = ["identified = 0"]
values = []
if nickname:
set_clauses.append("nickname = NULL")
for attr, change in changed_attrs.items():
set_clauses.append(f"{attr} = ?")
values.append(change["old"])
values.append(animal_id)
self.db.execute(
f"UPDATE animal_registry SET {', '.join(set_clauses)} WHERE animal_id = ?",
values,
)
# Restore in live_animals_by_location
set_clauses_live = ["identified = 0"]
values_live = []
if nickname:
set_clauses_live.append("nickname = NULL")
for attr, change in changed_attrs.items():
set_clauses_live.append(f"{attr} = ?")
values_live.append(change["old"])
values_live.append(animal_id)
self.db.execute(
f"UPDATE live_animals_by_location SET {', '.join(set_clauses_live)} WHERE animal_id = ?",
values_live,
)
# =========================================================================
# AnimalMerged handlers
# =========================================================================
def _apply_animal_merged(self, event: Event) -> None:
"""Mark merged animals as merged_into and create alias records.
Merged animals are removed from live roster and added to aliases table.
Survivor animal remains unchanged.
"""
merged_ids = event.entity_refs.get("merged_animal_ids", [])
survivor_id = event.entity_refs.get("survivor_animal_id")
ts_utc = event.ts_utc
for animal_id in merged_ids:
# Update status in animal_registry
self.db.execute(
"""
UPDATE animal_registry
SET status = 'merged_into', last_event_utc = ?
WHERE animal_id = ?
""",
(ts_utc, animal_id),
)
# Remove from live_animals_by_location
self.db.execute(
"DELETE FROM live_animals_by_location WHERE animal_id = ?",
(animal_id,),
)
# Create alias record
self.db.execute(
"""
INSERT INTO animal_aliases
(alias_animal_id, survivor_animal_id, merged_at_utc)
VALUES (?, ?, ?)
""",
(animal_id, survivor_id, ts_utc),
)
def _revert_animal_merged(self, event: Event) -> None:
"""Restore merged animals to alive status."""
merged_ids = event.entity_refs.get("merged_animal_ids", [])
for animal_id in merged_ids:
# Delete alias record
self.db.execute(
"DELETE FROM animal_aliases WHERE alias_animal_id = ?",
(animal_id,),
)
# Restore status
self.db.execute(
"UPDATE animal_registry SET status = 'alive' WHERE animal_id = ?",
(animal_id,),
)
# Re-add to live_animals_by_location
row = self.db.execute(
"""SELECT location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc
FROM animal_registry WHERE animal_id = ?""",
(animal_id,),
).fetchone()
if row:
self.db.execute(
"""INSERT INTO live_animals_by_location
(animal_id, location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc, last_move_utc, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, '[]')""",
(animal_id,) + row,
)
# =========================================================================
# AnimalStatusCorrected handlers
# =========================================================================
def _apply_status_corrected(self, event: Event) -> None:
"""Apply status correction to animals.
Handles transitions to/from alive by managing live_animals_by_location.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
old_status_map = event.entity_refs.get("old_status_map", {})
ts_utc = event.ts_utc
for animal_id in animal_ids:
old_status = old_status_map.get(animal_id)
# Update status in animal_registry
self.db.execute(
"""
UPDATE animal_registry
SET status = ?, last_event_utc = ?
WHERE animal_id = ?
""",
(new_status, ts_utc, animal_id),
)
# Handle live_animals_by_location
if new_status == "alive" and old_status != "alive":
# Add back to roster
row = self.db.execute(
"""SELECT location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc
FROM animal_registry WHERE animal_id = ?""",
(animal_id,),
).fetchone()
if row:
self.db.execute(
"""INSERT INTO live_animals_by_location
(animal_id, location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc, last_move_utc, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, '[]')""",
(animal_id,) + row,
)
elif new_status != "alive" and old_status == "alive":
# Remove from roster
self.db.execute(
"DELETE FROM live_animals_by_location WHERE animal_id = ?",
(animal_id,),
)
def _revert_status_corrected(self, event: Event) -> None:
"""Revert status correction."""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
old_status_map = event.entity_refs.get("old_status_map", {})
for animal_id in animal_ids:
old_status = old_status_map.get(animal_id)
# Restore old status
self.db.execute(
"UPDATE animal_registry SET status = ? WHERE animal_id = ?",
(old_status, animal_id),
)
# Handle live_animals_by_location (opposite of apply)
if old_status == "alive" and new_status != "alive":
# Add back to roster
row = self.db.execute(
"""SELECT location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc
FROM animal_registry WHERE animal_id = ?""",
(animal_id,),
).fetchone()
if row:
self.db.execute(
"""INSERT INTO live_animals_by_location
(animal_id, location_id, species_code, identified, nickname,
sex, repro_status, life_stage, first_seen_utc, last_move_utc, tags)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, NULL, '[]')""",
(animal_id,) + row,
)
elif old_status != "alive" and new_status == "alive":
# Remove from roster
self.db.execute(
"DELETE FROM live_animals_by_location WHERE animal_id = ?",
(animal_id,),
)

View File

@@ -6,9 +6,14 @@ from typing import Any
from animaltrack.events.types import (
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_COHORT_CREATED,
ANIMAL_MERGED,
ANIMAL_MOVED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_STATUS_CORRECTED,
ANIMAL_TAG_ENDED,
ANIMAL_TAGGED,
HATCH_RECORDED,
PRODUCT_COLLECTED,
)
from animaltrack.models.events import Event
@@ -40,6 +45,11 @@ class EventAnimalsProjection(Projection):
ANIMAL_TAGGED,
ANIMAL_TAG_ENDED,
PRODUCT_COLLECTED,
HATCH_RECORDED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_MERGED,
ANIMAL_STATUS_CORRECTED,
]
def apply(self, event: Event) -> None:

View File

@@ -6,7 +6,12 @@ from typing import Any
from animaltrack.events.types import (
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_COHORT_CREATED,
ANIMAL_MERGED,
ANIMAL_MOVED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_STATUS_CORRECTED,
HATCH_RECORDED,
)
from animaltrack.models.events import Event
from animaltrack.projections.base import Projection
@@ -33,7 +38,16 @@ class IntervalProjection(Projection):
def get_event_types(self) -> list[str]:
"""Return the event types this projection handles."""
return [ANIMAL_COHORT_CREATED, ANIMAL_MOVED, ANIMAL_ATTRIBUTES_UPDATED]
return [
ANIMAL_COHORT_CREATED,
ANIMAL_MOVED,
ANIMAL_ATTRIBUTES_UPDATED,
HATCH_RECORDED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_MERGED,
ANIMAL_STATUS_CORRECTED,
]
def apply(self, event: Event) -> None:
"""Create intervals for event."""
@@ -43,6 +57,16 @@ class IntervalProjection(Projection):
self._apply_animal_moved(event)
elif event.type == ANIMAL_ATTRIBUTES_UPDATED:
self._apply_attributes_updated(event)
elif event.type == HATCH_RECORDED:
self._apply_hatch_recorded(event)
elif event.type == ANIMAL_OUTCOME:
self._apply_animal_outcome(event)
elif event.type == ANIMAL_PROMOTED:
self._apply_animal_promoted(event)
elif event.type == ANIMAL_MERGED:
self._apply_animal_merged(event)
elif event.type == ANIMAL_STATUS_CORRECTED:
self._apply_status_corrected(event)
def revert(self, event: Event) -> None:
"""Remove intervals created by event."""
@@ -52,6 +76,16 @@ class IntervalProjection(Projection):
self._revert_animal_moved(event)
elif event.type == ANIMAL_ATTRIBUTES_UPDATED:
self._revert_attributes_updated(event)
elif event.type == HATCH_RECORDED:
self._revert_hatch_recorded(event)
elif event.type == ANIMAL_OUTCOME:
self._revert_animal_outcome(event)
elif event.type == ANIMAL_PROMOTED:
self._revert_animal_promoted(event)
elif event.type == ANIMAL_MERGED:
self._revert_animal_merged(event)
elif event.type == ANIMAL_STATUS_CORRECTED:
self._revert_status_corrected(event)
def _apply_cohort_created(self, event: Event) -> None:
"""Create initial intervals for new animals.
@@ -254,3 +288,392 @@ class IntervalProjection(Projection):
""",
(animal_id, attr, old_value, ts_utc),
)
# =========================================================================
# HatchRecorded handlers
# =========================================================================
def _apply_hatch_recorded(self, event: Event) -> None:
"""Create initial intervals for hatched animals.
For each animal:
- Create an open location interval at the hatch location
- Create open attribute intervals (sex=unknown, life_stage=hatchling,
repro_status=unknown, status=alive)
"""
animal_ids = event.entity_refs.get("animal_ids", [])
location_id = event.entity_refs.get("location_id")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Create location interval (open-ended)
self.db.execute(
"""
INSERT INTO animal_location_intervals
(animal_id, location_id, start_utc, end_utc)
VALUES (?, ?, ?, NULL)
""",
(animal_id, location_id, ts_utc),
)
# Create attribute intervals for hatchlings
attrs = [
("sex", "unknown"),
("life_stage", "hatchling"),
("repro_status", "unknown"),
("status", "alive"),
]
for attr, value in attrs:
self.db.execute(
"""
INSERT INTO animal_attr_intervals
(animal_id, attr, value, start_utc, end_utc)
VALUES (?, ?, ?, ?, NULL)
""",
(animal_id, attr, value, ts_utc),
)
def _revert_hatch_recorded(self, event: Event) -> None:
"""Remove intervals for hatched animals."""
animal_ids = event.entity_refs.get("animal_ids", [])
for animal_id in animal_ids:
# Delete location intervals
self.db.execute(
"DELETE FROM animal_location_intervals WHERE animal_id = ?",
(animal_id,),
)
# Delete attribute intervals
self.db.execute(
"DELETE FROM animal_attr_intervals WHERE animal_id = ?",
(animal_id,),
)
# =========================================================================
# AnimalOutcome handlers
# =========================================================================
def _apply_animal_outcome(self, event: Event) -> None:
"""Close intervals for animals with outcome.
For each animal:
- Close open location interval
- Close open status=alive interval
- Create new status interval (harvested/dead/sold)
"""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Close open location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = ?
WHERE animal_id = ? AND end_utc IS NULL
""",
(ts_utc, animal_id),
)
# Close open status interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND end_utc IS NULL
""",
(ts_utc, animal_id),
)
# Create new status interval
self.db.execute(
"""
INSERT INTO animal_attr_intervals
(animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', ?, ?, NULL)
""",
(animal_id, new_status, ts_utc),
)
def _revert_animal_outcome(self, event: Event) -> None:
"""Revert outcome by reopening intervals."""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
ts_utc = event.ts_utc
for animal_id in animal_ids:
# Delete the terminal status interval
self.db.execute(
"""
DELETE FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = ? AND start_utc = ?
""",
(animal_id, new_status, ts_utc),
)
# Reopen the alive status interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = NULL
WHERE animal_id = ? AND attr = 'status' AND value = 'alive' AND end_utc = ?
""",
(animal_id, ts_utc),
)
# Reopen location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = NULL
WHERE animal_id = ? AND end_utc = ?
""",
(animal_id, ts_utc),
)
# =========================================================================
# AnimalPromoted handlers
# =========================================================================
def _apply_animal_promoted(self, event: Event) -> None:
"""Create intervals for changed attributes in promotion."""
animal_id = event.entity_refs.get("animal_ids", [])[0]
changed_attrs = event.entity_refs.get("changed_attrs", {}).get(animal_id, {})
ts_utc = event.ts_utc
for attr, change in changed_attrs.items():
# Close old interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = ? AND value = ? AND end_utc IS NULL
""",
(ts_utc, animal_id, attr, change["old"]),
)
# Create new interval
self.db.execute(
"""
INSERT INTO animal_attr_intervals
(animal_id, attr, value, start_utc, end_utc)
VALUES (?, ?, ?, ?, NULL)
""",
(animal_id, attr, change["new"], ts_utc),
)
def _revert_animal_promoted(self, event: Event) -> None:
"""Revert attribute intervals from promotion."""
animal_id = event.entity_refs.get("animal_ids", [])[0]
changed_attrs = event.entity_refs.get("changed_attrs", {}).get(animal_id, {})
ts_utc = event.ts_utc
for attr, change in changed_attrs.items():
# Delete new interval
self.db.execute(
"""
DELETE FROM animal_attr_intervals
WHERE animal_id = ? AND attr = ? AND value = ? AND start_utc = ?
""",
(animal_id, attr, change["new"], ts_utc),
)
# Reopen old interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = NULL
WHERE animal_id = ? AND attr = ? AND value = ? AND end_utc = ?
""",
(animal_id, attr, change["old"], ts_utc),
)
# =========================================================================
# AnimalMerged handlers
# =========================================================================
def _apply_animal_merged(self, event: Event) -> None:
"""Close intervals for merged animals."""
merged_ids = event.entity_refs.get("merged_animal_ids", [])
ts_utc = event.ts_utc
for animal_id in merged_ids:
# Close location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = ?
WHERE animal_id = ? AND end_utc IS NULL
""",
(ts_utc, animal_id),
)
# Close status interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND end_utc IS NULL
""",
(ts_utc, animal_id),
)
# Create merged_into status interval
self.db.execute(
"""
INSERT INTO animal_attr_intervals
(animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', 'merged_into', ?, NULL)
""",
(animal_id, ts_utc),
)
def _revert_animal_merged(self, event: Event) -> None:
"""Revert intervals for merged animals."""
merged_ids = event.entity_refs.get("merged_animal_ids", [])
ts_utc = event.ts_utc
for animal_id in merged_ids:
# Delete merged_into status interval
self.db.execute(
"""
DELETE FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = 'merged_into' AND start_utc = ?
""",
(animal_id, ts_utc),
)
# Reopen alive status interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = NULL
WHERE animal_id = ? AND attr = 'status' AND value = 'alive' AND end_utc = ?
""",
(animal_id, ts_utc),
)
# Reopen location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = NULL
WHERE animal_id = ? AND end_utc = ?
""",
(animal_id, ts_utc),
)
# =========================================================================
# AnimalStatusCorrected handlers
# =========================================================================
def _apply_status_corrected(self, event: Event) -> None:
"""Apply interval changes for status correction.
Handles transitions to/from alive by managing location intervals.
"""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
old_status_map = event.entity_refs.get("old_status_map", {})
ts_utc = event.ts_utc
for animal_id in animal_ids:
old_status = old_status_map.get(animal_id)
# Close old status interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = ?
WHERE animal_id = ? AND attr = 'status' AND value = ? AND end_utc IS NULL
""",
(ts_utc, animal_id, old_status),
)
# Create new status interval
self.db.execute(
"""
INSERT INTO animal_attr_intervals
(animal_id, attr, value, start_utc, end_utc)
VALUES (?, 'status', ?, ?, NULL)
""",
(animal_id, new_status, ts_utc),
)
# Handle location interval based on alive transition
if new_status == "alive" and old_status != "alive":
# Reopen the most recently closed location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = NULL
WHERE animal_id = ? AND end_utc = (
SELECT MAX(end_utc) FROM animal_location_intervals WHERE animal_id = ?
)
""",
(animal_id, animal_id),
)
elif new_status != "alive" and old_status == "alive":
# Close the open location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = ?
WHERE animal_id = ? AND end_utc IS NULL
""",
(ts_utc, animal_id),
)
def _revert_status_corrected(self, event: Event) -> None:
"""Revert interval changes from status correction."""
animal_ids = event.entity_refs.get("animal_ids", [])
new_status = event.entity_refs.get("new_status")
old_status_map = event.entity_refs.get("old_status_map", {})
ts_utc = event.ts_utc
for animal_id in animal_ids:
old_status = old_status_map.get(animal_id)
# Delete new status interval
self.db.execute(
"""
DELETE FROM animal_attr_intervals
WHERE animal_id = ? AND attr = 'status' AND value = ? AND start_utc = ?
""",
(animal_id, new_status, ts_utc),
)
# Reopen old status interval
self.db.execute(
"""
UPDATE animal_attr_intervals
SET end_utc = NULL
WHERE animal_id = ? AND attr = 'status' AND value = ? AND end_utc = ?
""",
(animal_id, old_status, ts_utc),
)
# Handle location intervals (opposite of apply)
if old_status == "alive" and new_status != "alive":
# Reopen location interval
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = NULL
WHERE animal_id = ? AND end_utc = ?
""",
(animal_id, ts_utc),
)
elif old_status != "alive" and new_status == "alive":
# Close the location interval that was reopened
self.db.execute(
"""
UPDATE animal_location_intervals
SET end_utc = ?
WHERE animal_id = ? AND end_utc IS NULL
""",
(ts_utc, animal_id),
)

View File

@@ -4,21 +4,32 @@
from typing import Any
from animaltrack.db import transaction
from animaltrack.events.enums import Outcome
from animaltrack.events.payloads import (
AnimalAttributesUpdatedPayload,
AnimalCohortCreatedPayload,
AnimalMergedPayload,
AnimalMovedPayload,
AnimalOutcomePayload,
AnimalPromotedPayload,
AnimalStatusCorrectedPayload,
AnimalTagEndedPayload,
AnimalTaggedPayload,
HatchRecordedPayload,
)
from animaltrack.events.processor import process_event
from animaltrack.events.store import EventStore
from animaltrack.events.types import (
ANIMAL_ATTRIBUTES_UPDATED,
ANIMAL_COHORT_CREATED,
ANIMAL_MERGED,
ANIMAL_MOVED,
ANIMAL_OUTCOME,
ANIMAL_PROMOTED,
ANIMAL_STATUS_CORRECTED,
ANIMAL_TAG_ENDED,
ANIMAL_TAGGED,
HATCH_RECORDED,
)
from animaltrack.id_gen import generate_id
from animaltrack.models.events import Event
@@ -549,3 +560,382 @@ class AnimalService:
result.append(animal_id)
return result
# =========================================================================
# Lifecycle Events
# =========================================================================
def record_hatch(
self,
payload: HatchRecordedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Record a hatch event creating new hatchling animals.
Creates a HatchRecorded event and generates new animal records
for the specified number of hatchlings.
Args:
payload: Validated hatch payload with species, location, count.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user recording the hatch.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate location_id exists and is active
self._validate_location(payload.location_id)
# Determine actual location (brood location if provided)
actual_location_id = payload.location_id
if payload.assigned_brood_location_id:
self._validate_location(payload.assigned_brood_location_id)
actual_location_id = payload.assigned_brood_location_id
# Validate species is active
self._validate_species(payload.species)
# Generate animal IDs for hatchlings
animal_ids = [generate_id() for _ in range(payload.hatched_live)]
# Build entity_refs
entity_refs = {
"location_id": actual_location_id,
"animal_ids": animal_ids,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=HATCH_RECORDED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event
def record_outcome(
self,
payload: AnimalOutcomePayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Record an outcome (death/harvest/sold) for animals.
Creates an AnimalOutcome event and updates animal status.
Args:
payload: Validated outcome payload with resolved_ids and outcome.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user recording the outcome.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate all animals exist and are alive
self._validate_animals_alive(payload.resolved_ids)
# Map outcome to status
outcome_to_status = {
Outcome.DEATH: "dead",
Outcome.HARVEST: "harvested",
Outcome.SOLD: "sold",
Outcome.PREDATOR_LOSS: "dead",
Outcome.UNKNOWN: "dead",
}
new_status = outcome_to_status[payload.outcome]
# Build entity_refs
entity_refs = {
"animal_ids": payload.resolved_ids,
"new_status": new_status,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=ANIMAL_OUTCOME,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event
def _validate_animals_alive(self, animal_ids: list[str]) -> None:
"""Validate all animals exist and are status=alive.
Args:
animal_ids: List of animal IDs to validate.
Raises:
ValidationError: If any animal doesn't exist or is not alive.
"""
for animal_id in animal_ids:
row = self.db.execute(
"SELECT status FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
if row is None:
msg = f"Animal {animal_id} not found"
raise ValidationError(msg)
if row[0] != "alive":
msg = f"Animal {animal_id} is not alive (status: {row[0]})"
raise ValidationError(msg)
def promote_animal(
self,
payload: AnimalPromotedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Promote an animal to identified status with optional nickname.
Creates an AnimalPromoted event and updates animal identity.
Args:
payload: Validated promotion payload with animal_id and options.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the promotion.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate animal exists and is alive
self._validate_animal_alive(payload.animal_id)
# Validate nickname uniqueness if provided
if payload.nickname:
self._validate_nickname_unique(payload.nickname, payload.animal_id)
# Get current attributes for change tracking
changed_attrs: dict[str, dict[str, str]] = {}
row = self.db.execute(
"SELECT sex, repro_status FROM animal_registry WHERE animal_id = ?",
(payload.animal_id,),
).fetchone()
current_sex, current_repro_status = row
if payload.sex and payload.sex.value != current_sex:
changed_attrs["sex"] = {"old": current_sex, "new": payload.sex.value}
if payload.repro_status and payload.repro_status.value != current_repro_status:
changed_attrs["repro_status"] = {
"old": current_repro_status,
"new": payload.repro_status.value,
}
# Build entity_refs
entity_refs = {
"animal_ids": [payload.animal_id],
"nickname": payload.nickname,
"changed_attrs": {payload.animal_id: changed_attrs} if changed_attrs else {},
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=ANIMAL_PROMOTED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event
def _validate_animal_alive(self, animal_id: str) -> None:
"""Validate a single animal exists and is alive.
Args:
animal_id: The animal ID to validate.
Raises:
ValidationError: If animal doesn't exist or is not alive.
"""
row = self.db.execute(
"SELECT status FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
if row is None:
msg = f"Animal {animal_id} not found"
raise ValidationError(msg)
if row[0] != "alive":
msg = f"Animal {animal_id} is not alive (status: {row[0]})"
raise ValidationError(msg)
def _validate_nickname_unique(self, nickname: str, animal_id: str) -> None:
"""Validate nickname is not in use by another active animal.
Args:
nickname: The nickname to validate.
animal_id: The animal receiving this nickname (excluded from check).
Raises:
ValidationError: If nickname is already in use.
"""
row = self.db.execute(
"""SELECT animal_id FROM animal_registry
WHERE nickname = ? AND animal_id != ?
AND status NOT IN ('dead', 'harvested', 'sold', 'merged_into')""",
(nickname, animal_id),
).fetchone()
if row:
msg = f"Nickname '{nickname}' is already in use"
raise ValidationError(msg)
def merge_animals(
self,
payload: AnimalMergedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Merge multiple animal records into a survivor.
Creates an AnimalMerged event and creates alias records.
Args:
payload: Validated merge payload with survivor and merged IDs.
ts_utc: Timestamp in milliseconds since epoch.
actor: The user performing the merge.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate survivor exists and is alive
self._validate_animal_alive(payload.survivor_animal_id)
# Validate survivor is not in merged list
if payload.survivor_animal_id in payload.merged_animal_ids:
msg = "Survivor cannot be in the merged list"
raise ValidationError(msg)
# Validate all merged animals exist and are alive
for animal_id in payload.merged_animal_ids:
self._validate_animal_alive(animal_id)
# Build entity_refs - include both survivor and merged in animal_ids
entity_refs = {
"animal_ids": [payload.survivor_animal_id] + payload.merged_animal_ids,
"survivor_animal_id": payload.survivor_animal_id,
"merged_animal_ids": payload.merged_animal_ids,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=ANIMAL_MERGED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event
def correct_status(
self,
payload: AnimalStatusCorrectedPayload,
ts_utc: int,
actor: str,
nonce: str | None = None,
route: str | None = None,
) -> Event:
"""Correct animal status (admin-only with required reason).
Creates an AnimalStatusCorrected event for audit trail.
Args:
payload: Validated correction payload with resolved_ids, new_status, reason.
ts_utc: Timestamp in milliseconds since epoch.
actor: The admin performing the correction.
nonce: Optional idempotency nonce.
route: Required if nonce provided.
Returns:
The created event.
Raises:
ValidationError: If validation fails.
"""
# Validate all animals exist (note: we don't require alive - this is for corrections)
self._validate_animals_exist(payload.resolved_ids)
# Capture current status for entity_refs
old_status_map = {}
for animal_id in payload.resolved_ids:
row = self.db.execute(
"SELECT status FROM animal_registry WHERE animal_id = ?",
(animal_id,),
).fetchone()
old_status_map[animal_id] = row[0]
# Build entity_refs
entity_refs = {
"animal_ids": payload.resolved_ids,
"new_status": payload.new_status.value,
"old_status_map": old_status_map,
}
with transaction(self.db):
event = self.event_store.append_event(
event_type=ANIMAL_STATUS_CORRECTED,
ts_utc=ts_utc,
actor=actor,
entity_refs=entity_refs,
payload=payload.model_dump(),
nonce=nonce,
route=route,
)
process_event(event, self.registry)
return event