feat: add animal tagging projection and service
Implement AnimalTagged and AnimalTagEnded event handling: - Add migration for tag_suggestions table - Create TagProjection for tag intervals and suggestions - Update EventAnimalsProjection to handle tag events - Add add_tag() and end_tag() to AnimalService Key behaviors: - No-op idempotence (adding active tag or ending inactive tag) - Updates live_animals_by_location.tags JSON array - Tracks tag usage statistics in tag_suggestions 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,8 @@ from animaltrack.events.types import (
|
||||
ANIMAL_ATTRIBUTES_UPDATED,
|
||||
ANIMAL_COHORT_CREATED,
|
||||
ANIMAL_MOVED,
|
||||
ANIMAL_TAG_ENDED,
|
||||
ANIMAL_TAGGED,
|
||||
)
|
||||
from animaltrack.models.events import Event
|
||||
from animaltrack.projections.base import Projection
|
||||
@@ -30,7 +32,13 @@ class EventAnimalsProjection(Projection):
|
||||
|
||||
def get_event_types(self) -> list[str]:
|
||||
"""Return the event types this projection handles."""
|
||||
return [ANIMAL_COHORT_CREATED, ANIMAL_MOVED, ANIMAL_ATTRIBUTES_UPDATED]
|
||||
return [
|
||||
ANIMAL_COHORT_CREATED,
|
||||
ANIMAL_MOVED,
|
||||
ANIMAL_ATTRIBUTES_UPDATED,
|
||||
ANIMAL_TAGGED,
|
||||
ANIMAL_TAG_ENDED,
|
||||
]
|
||||
|
||||
def apply(self, event: Event) -> None:
|
||||
"""Link event to affected animals."""
|
||||
|
||||
235
src/animaltrack/projections/tags.py
Normal file
235
src/animaltrack/projections/tags.py
Normal file
@@ -0,0 +1,235 @@
|
||||
# ABOUTME: Projection for animal tag intervals and tag suggestions.
|
||||
# ABOUTME: Handles AnimalTagged and AnimalTagEnded events.
|
||||
|
||||
import json
|
||||
from typing import Any
|
||||
|
||||
from animaltrack.events.types import ANIMAL_TAG_ENDED, ANIMAL_TAGGED
|
||||
from animaltrack.models.events import Event
|
||||
from animaltrack.projections.base import Projection
|
||||
|
||||
|
||||
class TagProjection(Projection):
|
||||
"""Maintains tag intervals and tag suggestions.
|
||||
|
||||
This projection handles tag add/end events, maintaining:
|
||||
- animal_tag_intervals: Historical record of when animals had tags
|
||||
- live_animals_by_location.tags: Current tags JSON array
|
||||
- tag_suggestions: Usage statistics for autocomplete
|
||||
"""
|
||||
|
||||
def __init__(self, db: Any) -> None:
|
||||
"""Initialize the projection with a database connection.
|
||||
|
||||
Args:
|
||||
db: A fastlite database connection.
|
||||
"""
|
||||
super().__init__(db)
|
||||
|
||||
def get_event_types(self) -> list[str]:
|
||||
"""Return the event types this projection handles."""
|
||||
return [ANIMAL_TAGGED, ANIMAL_TAG_ENDED]
|
||||
|
||||
def apply(self, event: Event) -> None:
|
||||
"""Apply tag event to update projections."""
|
||||
if event.type == ANIMAL_TAGGED:
|
||||
self._apply_animal_tagged(event)
|
||||
elif event.type == ANIMAL_TAG_ENDED:
|
||||
self._apply_animal_tag_ended(event)
|
||||
|
||||
def revert(self, event: Event) -> None:
|
||||
"""Revert tag event from projections."""
|
||||
if event.type == ANIMAL_TAGGED:
|
||||
self._revert_animal_tagged(event)
|
||||
elif event.type == ANIMAL_TAG_ENDED:
|
||||
self._revert_animal_tag_ended(event)
|
||||
|
||||
def _apply_animal_tagged(self, event: Event) -> None:
|
||||
"""Add tag to animals.
|
||||
|
||||
For each animal that doesn't already have this tag active:
|
||||
- Create open interval in animal_tag_intervals
|
||||
- Add tag to live_animals_by_location.tags JSON
|
||||
- Update tag_suggestions counters
|
||||
"""
|
||||
tag = event.entity_refs.get("tag")
|
||||
ts_utc = event.ts_utc
|
||||
actually_tagged = event.entity_refs.get("actually_tagged", [])
|
||||
|
||||
for animal_id in actually_tagged:
|
||||
# Create new open interval
|
||||
self.db.execute(
|
||||
"""
|
||||
INSERT INTO animal_tag_intervals (animal_id, tag, start_utc, end_utc)
|
||||
VALUES (?, ?, ?, NULL)
|
||||
""",
|
||||
(animal_id, tag, ts_utc),
|
||||
)
|
||||
|
||||
# Update tags JSON in live_animals_by_location
|
||||
self._add_tag_to_live_animals(animal_id, tag)
|
||||
|
||||
# Update tag suggestions
|
||||
if actually_tagged:
|
||||
self._update_tag_suggestions_on_add(tag, len(actually_tagged), ts_utc)
|
||||
|
||||
def _revert_animal_tagged(self, event: Event) -> None:
|
||||
"""Revert tag add by removing intervals and updating counts."""
|
||||
tag = event.entity_refs.get("tag")
|
||||
ts_utc = event.ts_utc
|
||||
actually_tagged = event.entity_refs.get("actually_tagged", [])
|
||||
|
||||
for animal_id in actually_tagged:
|
||||
# Delete the interval created by this event
|
||||
self.db.execute(
|
||||
"""
|
||||
DELETE FROM animal_tag_intervals
|
||||
WHERE animal_id = ? AND tag = ? AND start_utc = ?
|
||||
""",
|
||||
(animal_id, tag, ts_utc),
|
||||
)
|
||||
|
||||
# Remove tag from live_animals_by_location
|
||||
self._remove_tag_from_live_animals(animal_id, tag)
|
||||
|
||||
# Revert tag suggestions
|
||||
if actually_tagged:
|
||||
self._revert_tag_suggestions_on_add(tag, len(actually_tagged))
|
||||
|
||||
def _apply_animal_tag_ended(self, event: Event) -> None:
|
||||
"""End tag for animals.
|
||||
|
||||
For each animal that has this tag active:
|
||||
- Close the interval by setting end_utc
|
||||
- Remove tag from live_animals_by_location.tags JSON
|
||||
- Update tag_suggestions active_animals count
|
||||
"""
|
||||
tag = event.entity_refs.get("tag")
|
||||
ts_utc = event.ts_utc
|
||||
actually_ended = event.entity_refs.get("actually_ended", [])
|
||||
|
||||
for animal_id in actually_ended:
|
||||
# Close the open interval
|
||||
self.db.execute(
|
||||
"""
|
||||
UPDATE animal_tag_intervals
|
||||
SET end_utc = ?
|
||||
WHERE animal_id = ? AND tag = ? AND end_utc IS NULL
|
||||
""",
|
||||
(ts_utc, animal_id, tag),
|
||||
)
|
||||
|
||||
# Remove tag from live_animals_by_location
|
||||
self._remove_tag_from_live_animals(animal_id, tag)
|
||||
|
||||
# Update tag suggestions
|
||||
if actually_ended:
|
||||
self._update_tag_suggestions_on_end(tag, len(actually_ended), ts_utc)
|
||||
|
||||
def _revert_animal_tag_ended(self, event: Event) -> None:
|
||||
"""Revert tag end by reopening intervals."""
|
||||
tag = event.entity_refs.get("tag")
|
||||
ts_utc = event.ts_utc
|
||||
actually_ended = event.entity_refs.get("actually_ended", [])
|
||||
|
||||
for animal_id in actually_ended:
|
||||
# Reopen the interval
|
||||
self.db.execute(
|
||||
"""
|
||||
UPDATE animal_tag_intervals
|
||||
SET end_utc = NULL
|
||||
WHERE animal_id = ? AND tag = ? AND end_utc = ?
|
||||
""",
|
||||
(animal_id, tag, ts_utc),
|
||||
)
|
||||
|
||||
# Add tag back to live_animals_by_location
|
||||
self._add_tag_to_live_animals(animal_id, tag)
|
||||
|
||||
# Revert tag suggestions
|
||||
if actually_ended:
|
||||
self._revert_tag_suggestions_on_end(tag, len(actually_ended))
|
||||
|
||||
def _add_tag_to_live_animals(self, animal_id: str, tag: str) -> None:
|
||||
"""Add tag to the tags JSON array in live_animals_by_location."""
|
||||
# Get current tags
|
||||
row = self.db.execute(
|
||||
"SELECT tags FROM live_animals_by_location WHERE animal_id = ?",
|
||||
(animal_id,),
|
||||
).fetchone()
|
||||
|
||||
if row:
|
||||
tags = json.loads(row[0])
|
||||
if tag not in tags:
|
||||
tags.append(tag)
|
||||
self.db.execute(
|
||||
"UPDATE live_animals_by_location SET tags = ? WHERE animal_id = ?",
|
||||
(json.dumps(tags), animal_id),
|
||||
)
|
||||
|
||||
def _remove_tag_from_live_animals(self, animal_id: str, tag: str) -> None:
|
||||
"""Remove tag from the tags JSON array in live_animals_by_location."""
|
||||
row = self.db.execute(
|
||||
"SELECT tags FROM live_animals_by_location WHERE animal_id = ?",
|
||||
(animal_id,),
|
||||
).fetchone()
|
||||
|
||||
if row:
|
||||
tags = json.loads(row[0])
|
||||
if tag in tags:
|
||||
tags.remove(tag)
|
||||
self.db.execute(
|
||||
"UPDATE live_animals_by_location SET tags = ? WHERE animal_id = ?",
|
||||
(json.dumps(tags), animal_id),
|
||||
)
|
||||
|
||||
def _update_tag_suggestions_on_add(self, tag: str, count: int, ts_utc: int) -> None:
|
||||
"""Update tag_suggestions when tags are added."""
|
||||
self.db.execute(
|
||||
"""
|
||||
INSERT INTO tag_suggestions (tag, total_assignments, active_animals, last_used_utc, updated_at_utc)
|
||||
VALUES (?, ?, ?, ?, ?)
|
||||
ON CONFLICT(tag) DO UPDATE SET
|
||||
total_assignments = total_assignments + excluded.total_assignments,
|
||||
active_animals = active_animals + excluded.active_animals,
|
||||
last_used_utc = excluded.last_used_utc,
|
||||
updated_at_utc = excluded.updated_at_utc
|
||||
""",
|
||||
(tag, count, count, ts_utc, ts_utc),
|
||||
)
|
||||
|
||||
def _revert_tag_suggestions_on_add(self, tag: str, count: int) -> None:
|
||||
"""Revert tag_suggestions when add is reverted."""
|
||||
self.db.execute(
|
||||
"""
|
||||
UPDATE tag_suggestions
|
||||
SET total_assignments = total_assignments - ?,
|
||||
active_animals = active_animals - ?
|
||||
WHERE tag = ?
|
||||
""",
|
||||
(count, count, tag),
|
||||
)
|
||||
|
||||
def _update_tag_suggestions_on_end(self, tag: str, count: int, ts_utc: int) -> None:
|
||||
"""Update tag_suggestions when tags are ended."""
|
||||
self.db.execute(
|
||||
"""
|
||||
UPDATE tag_suggestions
|
||||
SET active_animals = active_animals - ?,
|
||||
last_used_utc = ?,
|
||||
updated_at_utc = ?
|
||||
WHERE tag = ?
|
||||
""",
|
||||
(count, ts_utc, ts_utc, tag),
|
||||
)
|
||||
|
||||
def _revert_tag_suggestions_on_end(self, tag: str, count: int) -> None:
|
||||
"""Revert tag_suggestions when end is reverted."""
|
||||
self.db.execute(
|
||||
"""
|
||||
UPDATE tag_suggestions
|
||||
SET active_animals = active_animals + ?
|
||||
WHERE tag = ?
|
||||
""",
|
||||
(count, tag),
|
||||
)
|
||||
@@ -8,6 +8,8 @@ from animaltrack.events.payloads import (
|
||||
AnimalAttributesUpdatedPayload,
|
||||
AnimalCohortCreatedPayload,
|
||||
AnimalMovedPayload,
|
||||
AnimalTagEndedPayload,
|
||||
AnimalTaggedPayload,
|
||||
)
|
||||
from animaltrack.events.processor import process_event
|
||||
from animaltrack.events.store import EventStore
|
||||
@@ -15,6 +17,8 @@ from animaltrack.events.types import (
|
||||
ANIMAL_ATTRIBUTES_UPDATED,
|
||||
ANIMAL_COHORT_CREATED,
|
||||
ANIMAL_MOVED,
|
||||
ANIMAL_TAG_ENDED,
|
||||
ANIMAL_TAGGED,
|
||||
)
|
||||
from animaltrack.id_gen import generate_id
|
||||
from animaltrack.models.events import Event
|
||||
@@ -370,3 +374,178 @@ class AnimalService:
|
||||
changed_attrs[animal_id] = animal_changes
|
||||
|
||||
return changed_attrs
|
||||
|
||||
def add_tag(
|
||||
self,
|
||||
payload: AnimalTaggedPayload,
|
||||
ts_utc: int,
|
||||
actor: str,
|
||||
nonce: str | None = None,
|
||||
route: str | None = None,
|
||||
) -> Event:
|
||||
"""Add a tag to animals.
|
||||
|
||||
Creates an AnimalTagged event and processes it through
|
||||
all registered projections. Animals that already have this
|
||||
tag active will be skipped (no-op behavior).
|
||||
|
||||
Args:
|
||||
payload: Validated tag payload with resolved_ids and tag.
|
||||
ts_utc: Timestamp in milliseconds since epoch.
|
||||
actor: The user performing the action.
|
||||
nonce: Optional idempotency nonce.
|
||||
route: Required if nonce provided.
|
||||
|
||||
Returns:
|
||||
The created event.
|
||||
|
||||
Raises:
|
||||
ValidationError: If validation fails.
|
||||
"""
|
||||
# Validate all animals exist
|
||||
self._validate_animals_exist(payload.resolved_ids)
|
||||
|
||||
# Determine which animals don't already have this tag active
|
||||
actually_tagged = self._find_animals_without_active_tag(payload.resolved_ids, payload.tag)
|
||||
|
||||
# Build entity_refs
|
||||
entity_refs = {
|
||||
"animal_ids": payload.resolved_ids,
|
||||
"tag": payload.tag,
|
||||
"actually_tagged": actually_tagged,
|
||||
}
|
||||
|
||||
with transaction(self.db):
|
||||
event = self.event_store.append_event(
|
||||
event_type=ANIMAL_TAGGED,
|
||||
ts_utc=ts_utc,
|
||||
actor=actor,
|
||||
entity_refs=entity_refs,
|
||||
payload=payload.model_dump(),
|
||||
nonce=nonce,
|
||||
route=route,
|
||||
)
|
||||
|
||||
process_event(event, self.registry)
|
||||
|
||||
return event
|
||||
|
||||
def end_tag(
|
||||
self,
|
||||
payload: AnimalTagEndedPayload,
|
||||
ts_utc: int,
|
||||
actor: str,
|
||||
nonce: str | None = None,
|
||||
route: str | None = None,
|
||||
) -> Event:
|
||||
"""End a tag for animals.
|
||||
|
||||
Creates an AnimalTagEnded event and processes it through
|
||||
all registered projections. Animals that don't have this
|
||||
tag active will be skipped (no-op behavior).
|
||||
|
||||
Args:
|
||||
payload: Validated tag end payload with resolved_ids and tag.
|
||||
ts_utc: Timestamp in milliseconds since epoch.
|
||||
actor: The user performing the action.
|
||||
nonce: Optional idempotency nonce.
|
||||
route: Required if nonce provided.
|
||||
|
||||
Returns:
|
||||
The created event.
|
||||
|
||||
Raises:
|
||||
ValidationError: If validation fails.
|
||||
"""
|
||||
# Validate all animals exist
|
||||
self._validate_animals_exist(payload.resolved_ids)
|
||||
|
||||
# Determine which animals actually have this tag active
|
||||
actually_ended = self._find_animals_with_active_tag(payload.resolved_ids, payload.tag)
|
||||
|
||||
# Build entity_refs
|
||||
entity_refs = {
|
||||
"animal_ids": payload.resolved_ids,
|
||||
"tag": payload.tag,
|
||||
"actually_ended": actually_ended,
|
||||
}
|
||||
|
||||
with transaction(self.db):
|
||||
event = self.event_store.append_event(
|
||||
event_type=ANIMAL_TAG_ENDED,
|
||||
ts_utc=ts_utc,
|
||||
actor=actor,
|
||||
entity_refs=entity_refs,
|
||||
payload=payload.model_dump(),
|
||||
nonce=nonce,
|
||||
route=route,
|
||||
)
|
||||
|
||||
process_event(event, self.registry)
|
||||
|
||||
return event
|
||||
|
||||
def _validate_animals_exist(self, animal_ids: list[str]) -> None:
|
||||
"""Validate all animals exist.
|
||||
|
||||
Args:
|
||||
animal_ids: List of animal IDs to validate.
|
||||
|
||||
Raises:
|
||||
ValidationError: If any animal doesn't exist.
|
||||
"""
|
||||
for animal_id in animal_ids:
|
||||
row = self.db.execute(
|
||||
"SELECT 1 FROM animal_registry WHERE animal_id = ?",
|
||||
(animal_id,),
|
||||
).fetchone()
|
||||
|
||||
if row is None:
|
||||
msg = f"Animal {animal_id} not found"
|
||||
raise ValidationError(msg)
|
||||
|
||||
def _find_animals_without_active_tag(self, animal_ids: list[str], tag: str) -> list[str]:
|
||||
"""Find animals that don't have the tag active.
|
||||
|
||||
Args:
|
||||
animal_ids: List of animal IDs to check.
|
||||
tag: The tag to look for.
|
||||
|
||||
Returns:
|
||||
List of animal IDs that don't have an open interval for this tag.
|
||||
"""
|
||||
result = []
|
||||
for animal_id in animal_ids:
|
||||
row = self.db.execute(
|
||||
"""SELECT 1 FROM animal_tag_intervals
|
||||
WHERE animal_id = ? AND tag = ? AND end_utc IS NULL""",
|
||||
(animal_id, tag),
|
||||
).fetchone()
|
||||
|
||||
if row is None:
|
||||
result.append(animal_id)
|
||||
|
||||
return result
|
||||
|
||||
def _find_animals_with_active_tag(self, animal_ids: list[str], tag: str) -> list[str]:
|
||||
"""Find animals that have the tag active.
|
||||
|
||||
Args:
|
||||
animal_ids: List of animal IDs to check.
|
||||
tag: The tag to look for.
|
||||
|
||||
Returns:
|
||||
List of animal IDs that have an open interval for this tag.
|
||||
"""
|
||||
result = []
|
||||
for animal_id in animal_ids:
|
||||
row = self.db.execute(
|
||||
"""SELECT 1 FROM animal_tag_intervals
|
||||
WHERE animal_id = ? AND tag = ? AND end_utc IS NULL""",
|
||||
(animal_id, tag),
|
||||
).fetchone()
|
||||
|
||||
if row is not None:
|
||||
result.append(animal_id)
|
||||
|
||||
return result
|
||||
|
||||
Reference in New Issue
Block a user