Add web layer foundation: - FastHTML app factory with Beforeware pattern - Auth middleware validating trusted proxy IPs and X-Oidc-Username header - CSRF dual-token validation (cookie + header + Origin/Referer) - Request ID generation (ULID) and NDJSON request logging - Role-based permission helpers (can_edit_event, can_delete_event) 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
394 lines
13 KiB
Python
394 lines
13 KiB
Python
# ABOUTME: Tests for web authentication and authorization helpers.
|
|
# ABOUTME: Covers role checks, permission logic, and user extraction.
|
|
|
|
import os
|
|
|
|
import pytest
|
|
|
|
from animaltrack.models.reference import User, UserRole
|
|
|
|
|
|
def make_test_settings(
|
|
csrf_secret: str = "test-secret",
|
|
trusted_proxy_ips: str = "",
|
|
auth_header_name: str = "X-Oidc-Username",
|
|
):
|
|
"""Create Settings for testing by setting env vars temporarily."""
|
|
from animaltrack.config import Settings
|
|
|
|
# Settings loads from env, so we set env vars temporarily
|
|
old_env = os.environ.copy()
|
|
try:
|
|
os.environ["CSRF_SECRET"] = csrf_secret
|
|
os.environ["TRUSTED_PROXY_IPS"] = trusted_proxy_ips
|
|
os.environ["AUTH_HEADER_NAME"] = auth_header_name
|
|
return Settings()
|
|
finally:
|
|
os.environ.clear()
|
|
os.environ.update(old_env)
|
|
|
|
|
|
# Fixtures for test users
|
|
@pytest.fixture
|
|
def admin_user():
|
|
"""Create an admin user for testing."""
|
|
return User(
|
|
username="admin",
|
|
role=UserRole.ADMIN,
|
|
active=True,
|
|
created_at_utc=1000000,
|
|
updated_at_utc=1000000,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def recorder_user():
|
|
"""Create a recorder user for testing."""
|
|
return User(
|
|
username="recorder",
|
|
role=UserRole.RECORDER,
|
|
active=True,
|
|
created_at_utc=1000000,
|
|
updated_at_utc=1000000,
|
|
)
|
|
|
|
|
|
@pytest.fixture
|
|
def inactive_user():
|
|
"""Create an inactive user for testing."""
|
|
return User(
|
|
username="inactive",
|
|
role=UserRole.RECORDER,
|
|
active=False,
|
|
created_at_utc=1000000,
|
|
updated_at_utc=1000000,
|
|
)
|
|
|
|
|
|
class TestIsAdmin:
|
|
"""Tests for is_admin helper function."""
|
|
|
|
def test_returns_true_for_admin(self, admin_user):
|
|
"""is_admin returns True for admin role."""
|
|
from animaltrack.web.auth import is_admin
|
|
|
|
assert is_admin(admin_user) is True
|
|
|
|
def test_returns_false_for_recorder(self, recorder_user):
|
|
"""is_admin returns False for recorder role."""
|
|
from animaltrack.web.auth import is_admin
|
|
|
|
assert is_admin(recorder_user) is False
|
|
|
|
|
|
class TestIsRecorder:
|
|
"""Tests for is_recorder helper function."""
|
|
|
|
def test_returns_true_for_recorder(self, recorder_user):
|
|
"""is_recorder returns True for recorder role."""
|
|
from animaltrack.web.auth import is_recorder
|
|
|
|
assert is_recorder(recorder_user) is True
|
|
|
|
def test_returns_false_for_admin(self, admin_user):
|
|
"""is_recorder returns False for admin role."""
|
|
from animaltrack.web.auth import is_recorder
|
|
|
|
assert is_recorder(admin_user) is False
|
|
|
|
|
|
class TestCanEditEvent:
|
|
"""Tests for can_edit_event permission check."""
|
|
|
|
def test_admin_can_edit_any_event(self, admin_user):
|
|
"""Admin can edit events created by any user."""
|
|
from animaltrack.web.auth import can_edit_event
|
|
|
|
assert can_edit_event(admin_user, "other_user") is True
|
|
assert can_edit_event(admin_user, "admin") is True
|
|
|
|
def test_recorder_can_edit_own_events(self, recorder_user):
|
|
"""Recorder can edit their own events."""
|
|
from animaltrack.web.auth import can_edit_event
|
|
|
|
assert can_edit_event(recorder_user, "recorder") is True
|
|
|
|
def test_recorder_cannot_edit_other_events(self, recorder_user):
|
|
"""Recorder cannot edit events created by others."""
|
|
from animaltrack.web.auth import can_edit_event
|
|
|
|
assert can_edit_event(recorder_user, "other_user") is False
|
|
|
|
|
|
class TestCanDeleteEvent:
|
|
"""Tests for can_delete_event permission check."""
|
|
|
|
def test_admin_can_delete_any_event_without_dependents(self, admin_user):
|
|
"""Admin can delete any event without dependents."""
|
|
from animaltrack.web.auth import can_delete_event
|
|
|
|
assert can_delete_event(admin_user, "other_user", has_dependents=False) is True
|
|
assert can_delete_event(admin_user, "admin", has_dependents=False) is True
|
|
|
|
def test_admin_can_delete_any_event_with_dependents(self, admin_user):
|
|
"""Admin can cascade delete events with dependents."""
|
|
from animaltrack.web.auth import can_delete_event
|
|
|
|
assert can_delete_event(admin_user, "other_user", has_dependents=True) is True
|
|
assert can_delete_event(admin_user, "admin", has_dependents=True) is True
|
|
|
|
def test_recorder_can_delete_own_event_without_dependents(self, recorder_user):
|
|
"""Recorder can delete their own events without dependents."""
|
|
from animaltrack.web.auth import can_delete_event
|
|
|
|
assert can_delete_event(recorder_user, "recorder", has_dependents=False) is True
|
|
|
|
def test_recorder_cannot_delete_own_event_with_dependents(self, recorder_user):
|
|
"""Recorder cannot delete their own events if they have dependents."""
|
|
from animaltrack.web.auth import can_delete_event
|
|
|
|
assert can_delete_event(recorder_user, "recorder", has_dependents=True) is False
|
|
|
|
def test_recorder_cannot_delete_other_events(self, recorder_user):
|
|
"""Recorder cannot delete events created by others."""
|
|
from animaltrack.web.auth import can_delete_event
|
|
|
|
assert can_delete_event(recorder_user, "other_user", has_dependents=False) is False
|
|
assert can_delete_event(recorder_user, "other_user", has_dependents=True) is False
|
|
|
|
|
|
# HTTP-level auth middleware tests
|
|
|
|
|
|
class TestGetClientIp:
|
|
"""Tests for client IP extraction."""
|
|
|
|
def test_extracts_from_x_forwarded_for(self):
|
|
"""Extracts client IP from X-Forwarded-For header."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.headers = {"x-forwarded-for": "203.0.113.50"}
|
|
req.client = MagicMock(host="10.0.0.1")
|
|
|
|
assert get_client_ip(req) == "203.0.113.50"
|
|
|
|
def test_extracts_first_ip_from_chain(self):
|
|
"""Extracts first IP when multiple proxies in chain."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.headers = {"x-forwarded-for": "203.0.113.50, 10.0.0.2, 10.0.0.1"}
|
|
req.client = MagicMock(host="10.0.0.1")
|
|
|
|
assert get_client_ip(req) == "203.0.113.50"
|
|
|
|
def test_falls_back_to_client_host(self):
|
|
"""Falls back to direct client IP when no X-Forwarded-For."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.headers = {}
|
|
req.client = MagicMock(host="192.168.1.100")
|
|
|
|
assert get_client_ip(req) == "192.168.1.100"
|
|
|
|
def test_returns_unknown_when_no_client(self):
|
|
"""Returns 'unknown' when no client info available."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import get_client_ip
|
|
|
|
req = MagicMock()
|
|
req.headers = {}
|
|
req.client = None
|
|
|
|
assert get_client_ip(req) == "unknown"
|
|
|
|
|
|
class TestIsTrustedProxy:
|
|
"""Tests for trusted proxy validation."""
|
|
|
|
def test_accepts_trusted_ip(self):
|
|
"""Request from trusted IP proceeds."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import is_trusted_proxy
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1,10.0.0.1")
|
|
|
|
assert is_trusted_proxy(req, settings) is True
|
|
|
|
def test_rejects_untrusted_ip(self):
|
|
"""Returns False for untrusted IP."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import is_trusted_proxy
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="192.168.1.1")
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1,10.0.0.1")
|
|
|
|
assert is_trusted_proxy(req, settings) is False
|
|
|
|
def test_empty_trusted_list_rejects_all(self):
|
|
"""When no trusted IPs configured, all requests rejected."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import is_trusted_proxy
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="")
|
|
|
|
assert is_trusted_proxy(req, settings) is False
|
|
|
|
def test_rejects_when_no_client(self):
|
|
"""Returns False when no client info available."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import is_trusted_proxy
|
|
|
|
req = MagicMock()
|
|
req.client = None
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1")
|
|
|
|
assert is_trusted_proxy(req, settings) is False
|
|
|
|
|
|
class TestAuthBefore:
|
|
"""Tests for auth_before middleware function."""
|
|
|
|
def test_rejects_untrusted_proxy(self, seeded_db):
|
|
"""Returns 403 when request from untrusted IP."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import auth_before
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="192.168.1.1")
|
|
req.headers = {"x-oidc-username": "ppetru"}
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1")
|
|
|
|
resp = auth_before(req, settings, seeded_db)
|
|
assert resp is not None
|
|
assert resp.status_code == 403
|
|
assert b"not from trusted proxy" in resp.body
|
|
|
|
def test_rejects_missing_auth_header(self, seeded_db):
|
|
"""Returns 401 when auth header is missing."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import auth_before
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
req.headers = {} # No auth header
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1")
|
|
|
|
resp = auth_before(req, settings, seeded_db)
|
|
assert resp is not None
|
|
assert resp.status_code == 401
|
|
assert b"Missing auth header" in resp.body
|
|
|
|
def test_rejects_unknown_user(self, seeded_db):
|
|
"""Returns 401 when user not in database."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import auth_before
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
req.headers = {"x-oidc-username": "nonexistent_user"}
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1")
|
|
|
|
resp = auth_before(req, settings, seeded_db)
|
|
assert resp is not None
|
|
assert resp.status_code == 401
|
|
assert b"Unknown user" in resp.body
|
|
|
|
def test_rejects_inactive_user(self, seeded_db):
|
|
"""Returns 401 when user is inactive."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.models.reference import User, UserRole
|
|
from animaltrack.repositories.users import UserRepository
|
|
from animaltrack.web.middleware import auth_before
|
|
|
|
# Create an inactive user
|
|
user_repo = UserRepository(seeded_db)
|
|
inactive = User(
|
|
username="inactive_test",
|
|
role=UserRole.RECORDER,
|
|
active=False,
|
|
created_at_utc=1000000,
|
|
updated_at_utc=1000000,
|
|
)
|
|
user_repo.upsert(inactive)
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
req.headers = {"x-oidc-username": "inactive_test"}
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1")
|
|
|
|
resp = auth_before(req, settings, seeded_db)
|
|
assert resp is not None
|
|
assert resp.status_code == 401
|
|
assert b"Inactive user" in resp.body
|
|
|
|
def test_sets_user_in_scope(self, seeded_db):
|
|
"""User object is set in req.scope['auth'] on success."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import auth_before
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
req.headers = {"x-oidc-username": "ppetru"} # From seeds
|
|
req.scope = {}
|
|
|
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.1")
|
|
|
|
resp = auth_before(req, settings, seeded_db)
|
|
assert resp is None # Continue processing
|
|
assert "auth" in req.scope
|
|
assert req.scope["auth"].username == "ppetru"
|
|
assert req.scope["auth"].role == UserRole.ADMIN
|
|
|
|
def test_respects_custom_auth_header_name(self, seeded_db):
|
|
"""Uses AUTH_HEADER_NAME setting for header lookup."""
|
|
from unittest.mock import MagicMock
|
|
|
|
from animaltrack.web.middleware import auth_before
|
|
|
|
req = MagicMock()
|
|
req.client = MagicMock(host="127.0.0.1")
|
|
req.headers = {"x-custom-user": "ppetru"}
|
|
req.scope = {}
|
|
|
|
settings = make_test_settings(
|
|
trusted_proxy_ips="127.0.0.1",
|
|
auth_header_name="X-Custom-User",
|
|
)
|
|
|
|
resp = auth_before(req, settings, seeded_db)
|
|
assert resp is None # Continue processing
|
|
assert "auth" in req.scope
|
|
assert req.scope["auth"].username == "ppetru"
|