diff --git a/PLAN.md b/PLAN.md
index 3348ea3..bd6feb5 100644
--- a/PLAN.md
+++ b/PLAN.md
@@ -190,12 +190,12 @@ Check off items as completed. Each phase builds on the previous.
 ## Phase 5: Selection & Historical Queries
 
 ### Step 5.1: Selection Filter DSL Parser
-- [ ] Create `selection/parser.py` for filter parsing
-- [ ] Support: AND (default), OR (|), negate (-), quotes
-- [ ] Fields: location, species, sex, life_stage, identified, tag
-- [ ] Create `selection/ast.py` for filter AST nodes
-- [ ] Write tests for all filter syntax variations
-- [ ] **Commit checkpoint**
+- [x] Create `selection/parser.py` for filter parsing
+- [x] Support: AND (default), OR (|), negate (-), quotes
+- [x] Fields: location, species, sex, life_stage, identified, tag
+- [x] Create `selection/ast.py` for filter AST nodes
+- [x] Write tests for all filter syntax variations
+- [x] **Commit checkpoint**
 
 ### Step 5.2: Historical State Resolver
 - [ ] Update `selection/resolver.py` for point-in-time resolution
diff --git a/src/animaltrack/selection/__init__.py b/src/animaltrack/selection/__init__.py
index dbc63ec..2a3fcdb 100644
--- a/src/animaltrack/selection/__init__.py
+++ b/src/animaltrack/selection/__init__.py
@@ -1,9 +1,15 @@
 # ABOUTME: Selection system for resolving animal sets from filters.
-# ABOUTME: Provides resolver functions for animal selection contexts.
+# ABOUTME: Provides parser, AST, and resolver for animal selection contexts.
 
+from animaltrack.selection.ast import FieldFilter, FilterAST
+from animaltrack.selection.parser import ParseError, parse_filter
 from animaltrack.selection.resolver import SelectionResolverError, resolve_selection
 
 __all__ = [
+    "FieldFilter",
+    "FilterAST",
+    "ParseError",
     "SelectionResolverError",
+    "parse_filter",
     "resolve_selection",
 ]
diff --git a/src/animaltrack/selection/ast.py b/src/animaltrack/selection/ast.py
new file mode 100644
index 0000000..510e030
--- /dev/null
+++ b/src/animaltrack/selection/ast.py
@@ -0,0 +1,49 @@
+# ABOUTME: AST node classes for the selection filter DSL.
+# ABOUTME: Represents parsed filter expressions as a tree structure.
+
+from collections.abc import Sequence
+from dataclasses import dataclass
+
+
+@dataclass(frozen=True)
+class FieldFilter:
+    """A single field filter condition.
+
+    ``values`` holds the alternatives accepted for ``field`` (OR semantics);
+    ``negated`` records whether the condition was written with a ``-`` prefix.
+    Any sequence may be passed for ``values``; it is stored as a tuple so
+    instances stay immutable and hashable.
+
+    Examples:
+        - species:duck -> FieldFilter("species", ("duck",), False)
+        - species:duck|goose -> FieldFilter("species", ("duck", "goose"), False)
+        - -sex:male -> FieldFilter("sex", ("male",), True)
+    """
+
+    field: str
+    values: Sequence[str]
+    negated: bool = False
+
+    def __post_init__(self) -> None:
+        # frozen=True blocks normal assignment, so bypass it to normalise the
+        # caller's sequence into a tuple (a list would make the node unhashable).
+        object.__setattr__(self, "values", tuple(self.values))
+
+
+@dataclass(frozen=True)
+class FilterAST:
+    """Root AST node containing all field filters combined with AND.
+
+    An empty ``filters`` sequence means "match all". The sequence passed in
+    is stored as a tuple so the node stays immutable and hashable.
+    """
+
+    filters: Sequence[FieldFilter]
+
+    def __post_init__(self) -> None:
+        # Same normalisation as FieldFilter: store an immutable tuple.
+        object.__setattr__(self, "filters", tuple(self.filters))
+
+    def is_match_all(self) -> bool:
+        """Return True if this filter matches all animals."""
+        return not self.filters
diff --git a/src/animaltrack/selection/parser.py b/src/animaltrack/selection/parser.py
new file mode 100644
index 0000000..a70ffc2
--- /dev/null
+++ b/src/animaltrack/selection/parser.py
@@ -0,0 +1,189 @@
+# ABOUTME: Parser for the selection filter DSL.
+# ABOUTME: Converts filter strings into FilterAST for query execution.
+
+from collections.abc import Iterator
+
+from animaltrack.selection.ast import FieldFilter, FilterAST
+
+# Supported filter fields
+VALID_FIELDS = frozenset({"location", "species", "sex", "life_stage", "identified", "tag"})
+
+# Fields that can be used as flags (without :value)
+FLAG_FIELDS = frozenset({"identified"})
+
+
+class ParseError(Exception):
+    """Raised when filter string cannot be parsed."""
+
+
+def _tokenize(filter_str: str) -> Iterator[str]:
+    """Split filter string into tokens, respecting quoted strings.
+
+    Tokens are separated by any whitespace (spaces, tabs, newlines, ...);
+    whitespace inside a quoted value is kept. A leading "-" and the quotes
+    themselves stay in the token for _parse_token to interpret.
+
+    Args:
+        filter_str: Raw filter string.
+
+    Yields:
+        Tokens like:
+        - "species:duck"
+        - "location:\"Strip 1\""
+        - "-tag:sick"
+        - "identified" (bare word without a colon)
+
+    Raises:
+        ParseError: If the input ends right after "-" or ":", a quote is
+            never closed, or a quote appears before any colon.
+    """
+    i = 0
+    n = len(filter_str)
+
+    while i < n:
+        # Skip whitespace
+        while i < n and filter_str[i].isspace():
+            i += 1
+        if i >= n:
+            break
+
+        # Start of a token
+        token_start = i
+
+        # Handle negation prefix
+        if filter_str[i] == "-":
+            i += 1
+            if i >= n:
+                raise ParseError("Unexpected end after negation '-'")
+
+        # Read the field name: stop at a colon, a quote, or ANY whitespace.
+        # isspace() (rather than just " \t") matches the separator skipping
+        # above, so "identified\nspecies:duck" splits into two tokens.
+        while i < n and filter_str[i] not in ":\"'" and not filter_str[i].isspace():
+            i += 1
+
+        if i >= n or filter_str[i].isspace():
+            # No colon - could be a flag field or error
+            yield filter_str[token_start:i]
+            continue
+
+        if filter_str[i] != ":":
+            # Quote without colon
+            raise ParseError(f"Unexpected quote in token starting at position {token_start}")
+
+        i += 1  # consume colon
+
+        if i >= n:
+            raise ParseError(f"Empty value after colon in '{filter_str[token_start:i]}'")
+
+        if filter_str[i] in "\"'":
+            # Quoted value - scan to the matching closing quote
+            quote_char = filter_str[i]
+            i += 1  # consume opening quote
+            value_start = i
+
+            while i < n and filter_str[i] != quote_char:
+                i += 1
+
+            if i >= n:
+                raise ParseError(
+                    f"Unclosed quote in filter starting at '{filter_str[token_start:value_start]}'"
+                )
+
+            i += 1  # consume closing quote
+        else:
+            # Unquoted value - read until whitespace
+            while i < n and not filter_str[i].isspace():
+                i += 1
+
+        yield filter_str[token_start:i]
+
+
+def _parse_token(token: str) -> FieldFilter:
+    """Parse a single token into a FieldFilter.
+
+    Args:
+        token: One token as produced by _tokenize, e.g. "-species:duck|goose".
+
+    Returns:
+        The FieldFilter for the token. A bare flag field such as "identified"
+        is treated as "identified:1".
+
+    Raises:
+        ParseError: If the colon is missing on a non-flag token, the field is
+            unknown, or a value is empty.
+    """
+    negated = token.startswith("-")
+    if negated:
+        token = token[1:]
+
+    # Check for flag field (no colon)
+    if ":" not in token:
+        if token in FLAG_FIELDS:
+            return FieldFilter(field=token, values=["1"], negated=negated)
+        raise ParseError(f"Missing ':' in token '{token}' (not a flag field)")
+
+    # Split on first colon
+    field, _, value_part = token.partition(":")
+
+    # Validate field
+    if field not in VALID_FIELDS:
+        raise ParseError(
+            f"Unknown field '{field}'. Valid fields: {', '.join(sorted(VALID_FIELDS))}"
+        )
+
+    # Strip one pair of matching quotes from the value if present
+    for quote in ('"', "'"):
+        if value_part.startswith(quote) and value_part.endswith(quote):
+            value_part = value_part[1:-1]
+            break
+
+    # Check for empty value
+    if not value_part:
+        raise ParseError(f"Empty value for field '{field}'")
+
+    # Split on pipe for OR values
+    values = value_part.split("|")
+
+    # Check for empty values in OR
+    if "" in values:
+        raise ParseError(f"Empty value in OR expression for field '{field}'")
+
+    return FieldFilter(field=field, values=values, negated=negated)
+
+
+def parse_filter(filter_str: str) -> FilterAST:
+    """Parse a filter string into a FilterAST.
+
+    Args:
+        filter_str: Filter string like "species:duck sex:female -tag:sick"
+
+    Returns:
+        FilterAST containing parsed FieldFilter nodes.
+
+    Raises:
+        ParseError: If the filter string is invalid.
+
+    Examples:
+        >>> parse_filter("species:duck")
+        FilterAST(filters=(FieldFilter(field='species', values=('duck',), negated=False),))
+
+        >>> parse_filter("species:duck|goose sex:female")
+        FilterAST(filters=(
+            FieldFilter(field='species', values=('duck', 'goose'), negated=False),
+            FieldFilter(field='sex', values=('female',), negated=False),
+        ))
+
+        >>> parse_filter("")
+        FilterAST(filters=())  # matches all
+    """
+    filter_str = filter_str.strip()
+
+    if not filter_str:
+        return FilterAST(filters=[])
+
+    # Tokenize fully before parsing so tokenizer errors are reported first
+    tokens = list(_tokenize(filter_str))
+    filters = [_parse_token(token) for token in tokens]
+
+    return FilterAST(filters=filters)
diff --git a/tests/test_selection_parser.py b/tests/test_selection_parser.py
new file mode 100644
index 0000000..5f26adf
--- /dev/null
+++ b/tests/test_selection_parser.py
@@ -0,0 +1,244 @@
+# ABOUTME: Tests for the selection filter DSL parser.
+# ABOUTME: Covers all syntax variations: field:value, OR, negation, quotes.
+
+import pytest
+
+from animaltrack.selection.ast import FieldFilter, FilterAST
+from animaltrack.selection.parser import ParseError, parse_filter
+
+
+class TestSimpleFilters:
+    """Test basic field:value syntax."""
+
+    def test_single_field(self) -> None:
+        """species:duck -> single field filter."""
+        result = parse_filter("species:duck")
+        assert result == FilterAST([FieldFilter("species", ["duck"])])
+
+    def test_multiple_fields_and(self) -> None:
+        """species:duck sex:female -> AND of two filters."""
+        result = parse_filter("species:duck sex:female")
+        assert result == FilterAST(
+            [
+                FieldFilter("species", ["duck"]),
+                FieldFilter("sex", ["female"]),
+            ]
+        )
+
+    def test_all_supported_fields(self) -> None:
+        """All supported fields should parse."""
+        result = parse_filter("location:strip1 species:duck sex:male life_stage:adult tag:healthy")
+        assert len(result.filters) == 5
+        assert result.filters[0].field == "location"
+        assert result.filters[1].field == "species"
+        assert result.filters[2].field == "sex"
+        assert result.filters[3].field == "life_stage"
+        assert result.filters[4].field == "tag"
+
+
+class TestOrSyntax:
+    """Test OR with pipe character."""
+
+    def test_or_values(self) -> None:
+        """species:duck|goose -> single filter with two values."""
+        result = parse_filter("species:duck|goose")
+        assert result == FilterAST([FieldFilter("species", ["duck", "goose"])])
+
+    def test_multiple_or_values(self) -> None:
+        """species:duck|goose|chicken -> three values."""
+        result = parse_filter("species:duck|goose|chicken")
+        assert result == FilterAST([FieldFilter("species", ["duck", "goose", "chicken"])])
+
+    def test_or_combined_with_and(self) -> None:
+        """species:duck|goose sex:female -> OR within field, AND between fields."""
+        result = parse_filter("species:duck|goose sex:female")
+        assert result == FilterAST(
+            [
+                FieldFilter("species", ["duck", "goose"]),
+                FieldFilter("sex", ["female"]),
+            ]
+        )
+
+
+class TestNegation:
+    """Test negation with - prefix."""
+
+    def test_negated_field(self) -> None:
+        """-sex:male -> negated filter."""
+        result = parse_filter("-sex:male")
+        assert result == FilterAST([FieldFilter("sex", ["male"], negated=True)])
+
+    def test_negated_with_or(self) -> None:
+        """-species:duck|goose -> negated with OR values."""
+        result = parse_filter("-species:duck|goose")
+        assert result == FilterAST([FieldFilter("species", ["duck", "goose"], negated=True)])
+
+    def test_mixed_negated_and_positive(self) -> None:
+        """species:duck -tag:sick -> mix of positive and negated."""
+        result = parse_filter("species:duck -tag:sick")
+        assert result == FilterAST(
+            [
+                FieldFilter("species", ["duck"]),
+                FieldFilter("tag", ["sick"], negated=True),
+            ]
+        )
+
+
+class TestQuotedValues:
+    """Test quoted strings for values with spaces."""
+
+    def test_quoted_value(self) -> None:
+        """location:"Strip 1" -> value with space."""
+        result = parse_filter('location:"Strip 1"')
+        assert result == FilterAST([FieldFilter("location", ["Strip 1"])])
+
+    def test_quoted_with_other_fields(self) -> None:
+        """location:"Strip 1" species:duck -> quoted and unquoted."""
+        result = parse_filter('location:"Strip 1" species:duck')
+        assert result == FilterAST(
+            [
+                FieldFilter("location", ["Strip 1"]),
+                FieldFilter("species", ["duck"]),
+            ]
+        )
+
+    def test_quoted_negated(self) -> None:
+        """-location:"Strip 1" -> negated quoted value."""
+        result = parse_filter('-location:"Strip 1"')
+        assert result == FilterAST([FieldFilter("location", ["Strip 1"], negated=True)])
+
+    def test_single_quoted_value(self) -> None:
+        """location:'Strip 1' -> single quotes also work."""
+        result = parse_filter("location:'Strip 1'")
+        assert result == FilterAST([FieldFilter("location", ["Strip 1"])])
+
+
+class TestIdentifiedField:
+    """Test the identified field with flag syntax."""
+
+    def test_identified_with_value(self) -> None:
+        """identified:1 -> explicit value."""
+        result = parse_filter("identified:1")
+        assert result == FilterAST([FieldFilter("identified", ["1"])])
+
+    def test_identified_zero(self) -> None:
+        """identified:0 -> explicit false."""
+        result = parse_filter("identified:0")
+        assert result == FilterAST([FieldFilter("identified", ["0"])])
+
+    def test_identified_flag(self) -> None:
+        """identified -> shorthand for identified:1."""
+        result = parse_filter("identified")
+        assert result == FilterAST([FieldFilter("identified", ["1"])])
+
+    def test_negated_identified_flag(self) -> None:
+        """-identified -> shorthand for -identified:1."""
+        result = parse_filter("-identified")
+        assert result == FilterAST([FieldFilter("identified", ["1"], negated=True)])
+
+
+class TestEmptyAndMatchAll:
+    """Test empty filter string."""
+
+    def test_empty_string(self) -> None:
+        """Empty string -> match all."""
+        result = parse_filter("")
+        assert result == FilterAST([])
+        assert result.is_match_all()
+
+    def test_whitespace_only(self) -> None:
+        """Whitespace only -> match all."""
+        result = parse_filter(" ")
+        assert result == FilterAST([])
+        assert result.is_match_all()
+
+
+class TestComplexFilters:
+    """Test complex combinations."""
+
+    def test_complex_filter(self) -> None:
+        """Complex filter with all features."""
+        result = parse_filter('species:duck|goose sex:female -tag:old location:"Strip 1"')
+        assert result == FilterAST(
+            [
+                FieldFilter("species", ["duck", "goose"]),
+                FieldFilter("sex", ["female"]),
+                FieldFilter("tag", ["old"], negated=True),
+                FieldFilter("location", ["Strip 1"]),
+            ]
+        )
+
+    def test_multiple_negations(self) -> None:
+        """Multiple negated filters."""
+        result = parse_filter("-tag:sick -tag:old species:duck")
+        assert result == FilterAST(
+            [
+                FieldFilter("tag", ["sick"], negated=True),
+                FieldFilter("tag", ["old"], negated=True),
+                FieldFilter("species", ["duck"]),
+            ]
+        )
+
+
+class TestParseErrors:
+    """Test error cases."""
+
+    def test_unknown_field(self) -> None:
+        """Unknown field raises ParseError."""
+        with pytest.raises(ParseError) as exc_info:
+            parse_filter("unknown:value")
+        assert "unknown field" in str(exc_info.value).lower()
+
+    def test_missing_colon(self) -> None:
+        """Missing colon raises ParseError for non-flag fields."""
+        with pytest.raises(ParseError) as exc_info:
+            parse_filter("species")
+        assert "missing" in str(exc_info.value).lower() or "invalid" in str(exc_info.value).lower()
+
+    def test_empty_value(self) -> None:
+        """Empty value after colon raises ParseError."""
+        with pytest.raises(ParseError) as exc_info:
+            parse_filter("species:")
+        assert "empty" in str(exc_info.value).lower() or "value" in str(exc_info.value).lower()
+
+    def test_unclosed_quote(self) -> None:
+        """Unclosed quote raises ParseError."""
+        with pytest.raises(ParseError) as exc_info:
+            parse_filter('location:"Strip 1')
+        assert "quote" in str(exc_info.value).lower() or "unclosed" in str(exc_info.value).lower()
+
+    def test_empty_or_value(self) -> None:
+        """Empty value in OR raises ParseError."""
+        with pytest.raises(ParseError) as exc_info:
+            parse_filter("species:duck|")
+        assert "empty" in str(exc_info.value).lower()
+
+
+class TestWhitespaceHandling:
+    """Test whitespace handling."""
+
+    def test_extra_spaces(self) -> None:
+        """Extra spaces between terms are ignored."""
+        result = parse_filter("species:duck    sex:female")
+        assert result == FilterAST(
+            [
+                FieldFilter("species", ["duck"]),
+                FieldFilter("sex", ["female"]),
+            ]
+        )
+
+    def test_leading_trailing_spaces(self) -> None:
+        """Leading and trailing spaces are trimmed."""
+        result = parse_filter(" species:duck ")
+        assert result == FilterAST([FieldFilter("species", ["duck"])])
+
+    def test_newline_and_tab_separators(self) -> None:
+        """Newlines and tabs separate terms just like spaces."""
+        result = parse_filter("identified\nspecies:duck\tsex:female")
+        assert result == FilterAST(
+            [
+                FieldFilter("identified", ["1"]),
+                FieldFilter("species", ["duck"]),
+                FieldFilter("sex", ["female"]),
+            ]
+        )