Compare commits
4 Commits
240cf440cb
...
3ac1e1140a
| Author | SHA1 | Date | |
|---|---|---|---|
| 3ac1e1140a | |||
| 743fe9d68d | |||
| 06421f38bb | |||
| f2145e4827 |
52
.gitea/workflows/deploy.yaml
Normal file
52
.gitea/workflows/deploy.yaml
Normal file
@@ -0,0 +1,52 @@
|
|||||||
|
name: Build and Deploy
|
||||||
|
|
||||||
|
on:
|
||||||
|
push:
|
||||||
|
branches: [main]
|
||||||
|
|
||||||
|
env:
|
||||||
|
REGISTRY: gitea.v.paler.net
|
||||||
|
IMAGE: ppetru/animaltrack
|
||||||
|
|
||||||
|
jobs:
|
||||||
|
build-and-deploy:
|
||||||
|
runs-on: nix
|
||||||
|
steps:
|
||||||
|
- uses: actions/checkout@v4
|
||||||
|
|
||||||
|
- name: Build Docker image
|
||||||
|
run: |
|
||||||
|
nix build .#dockerImage --out-link result
|
||||||
|
|
||||||
|
- name: Push to registry
|
||||||
|
run: |
|
||||||
|
nix shell nixpkgs#skopeo -c skopeo copy \
|
||||||
|
--dest-creds "${{ secrets.REGISTRY_USERNAME }}:${{ secrets.REGISTRY_PASSWORD }}" \
|
||||||
|
--insecure-policy \
|
||||||
|
docker-archive:result \
|
||||||
|
docker://${{ env.REGISTRY }}/${{ env.IMAGE }}:latest
|
||||||
|
|
||||||
|
- name: Deploy to Nomad
|
||||||
|
env:
|
||||||
|
NOMAD_ADDR: ${{ secrets.NOMAD_ADDR }}
|
||||||
|
run: |
|
||||||
|
# Force re-evaluation (uuid changes on each job run)
|
||||||
|
curl -sS -X POST "$NOMAD_ADDR/v1/job/animaltrack/evaluate"
|
||||||
|
|
||||||
|
# Wait for deployment
|
||||||
|
sleep 5
|
||||||
|
DEPLOY_ID=$(curl -sS "$NOMAD_ADDR/v1/job/animaltrack/deployments" | \
|
||||||
|
nix shell nixpkgs#jq -c jq -r '.[0].ID')
|
||||||
|
|
||||||
|
for i in $(seq 1 30); do
|
||||||
|
STATUS=$(curl -sS "$NOMAD_ADDR/v1/deployment/$DEPLOY_ID" | \
|
||||||
|
nix shell nixpkgs#jq -c jq -r '.Status')
|
||||||
|
echo "Deployment status: $STATUS"
|
||||||
|
case $STATUS in
|
||||||
|
successful) exit 0 ;;
|
||||||
|
failed|cancelled) exit 1 ;;
|
||||||
|
esac
|
||||||
|
sleep 10
|
||||||
|
done
|
||||||
|
echo "Timeout waiting for deployment"
|
||||||
|
exit 1
|
||||||
@@ -53,8 +53,8 @@ pkgs.dockerTools.buildImage {
|
|||||||
config = {
|
config = {
|
||||||
Env = [
|
Env = [
|
||||||
"DB_PATH=/var/lib/animaltrack/animaltrack.db"
|
"DB_PATH=/var/lib/animaltrack/animaltrack.db"
|
||||||
"PATH=${pkgs.lib.makeBinPath [ pkgs.busybox pkgs.bash pkgs.sqlite pythonEnv ]}"
|
"PATH=${pkgs.lib.makeBinPath [ pkgs.busybox pkgs.bash pkgs.sqlite pythonEnv animaltrack ]}"
|
||||||
"PYTHONPATH=${pythonEnv}/${pythonEnv.sitePackages}"
|
"PYTHONPATH=${pythonEnv}/${pythonEnv.sitePackages}:${animaltrack}/${pythonEnv.sitePackages}"
|
||||||
"PYTHONUNBUFFERED=1"
|
"PYTHONUNBUFFERED=1"
|
||||||
];
|
];
|
||||||
ExposedPorts = {
|
ExposedPorts = {
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
# ABOUTME: Application configuration loaded from environment variables.
|
# ABOUTME: Application configuration loaded from environment variables.
|
||||||
# ABOUTME: Uses Pydantic Settings for validation and type coercion.
|
# ABOUTME: Uses Pydantic Settings for validation and type coercion.
|
||||||
|
|
||||||
|
import ipaddress
|
||||||
import logging
|
import logging
|
||||||
from functools import cached_property
|
from functools import cached_property
|
||||||
|
|
||||||
@@ -44,6 +45,43 @@ class Settings(BaseSettings):
|
|||||||
"""Parse trusted proxy IPs from comma-separated raw string."""
|
"""Parse trusted proxy IPs from comma-separated raw string."""
|
||||||
return _parse_comma_separated(self.trusted_proxy_ips_raw)
|
return _parse_comma_separated(self.trusted_proxy_ips_raw)
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def trusted_proxy_networks(
|
||||||
|
self,
|
||||||
|
) -> list[ipaddress.IPv4Network | ipaddress.IPv6Network]:
|
||||||
|
"""Parse trusted proxy IPs/CIDRs into network objects.
|
||||||
|
|
||||||
|
Plain IPs become /32 (IPv4) or /128 (IPv6) networks.
|
||||||
|
CIDR notation is parsed directly.
|
||||||
|
Entries that cannot be parsed as IP/CIDR are skipped (handled by
|
||||||
|
trusted_proxy_literals for backwards compatibility).
|
||||||
|
"""
|
||||||
|
networks = []
|
||||||
|
for entry in self.trusted_proxy_ips:
|
||||||
|
try:
|
||||||
|
# ip_network with strict=False allows "192.168.1.1/24" to work
|
||||||
|
# (normalizes to "192.168.1.0/24")
|
||||||
|
network = ipaddress.ip_network(entry, strict=False)
|
||||||
|
networks.append(network)
|
||||||
|
except ValueError:
|
||||||
|
# Not a valid IP/network - will be handled by trusted_proxy_literals
|
||||||
|
pass
|
||||||
|
return networks
|
||||||
|
|
||||||
|
@cached_property
|
||||||
|
def trusted_proxy_literals(self) -> frozenset[str]:
|
||||||
|
"""Get non-IP entries for exact string matching.
|
||||||
|
|
||||||
|
For backwards compatibility with entries like "testclient" in tests.
|
||||||
|
"""
|
||||||
|
literals = set()
|
||||||
|
for entry in self.trusted_proxy_ips:
|
||||||
|
try:
|
||||||
|
ipaddress.ip_network(entry, strict=False)
|
||||||
|
except ValueError:
|
||||||
|
literals.add(entry)
|
||||||
|
return frozenset(literals)
|
||||||
|
|
||||||
@field_validator("log_level", mode="before")
|
@field_validator("log_level", mode="before")
|
||||||
@classmethod
|
@classmethod
|
||||||
def normalize_and_validate_log_level(cls, v: str) -> str:
|
def normalize_and_validate_log_level(cls, v: str) -> str:
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
# ABOUTME: Middleware functions for authentication, CSRF, and request logging.
|
# ABOUTME: Middleware functions for authentication, CSRF, and request logging.
|
||||||
# ABOUTME: Implements Beforeware pattern for FastHTML request processing.
|
# ABOUTME: Implements Beforeware pattern for FastHTML request processing.
|
||||||
|
|
||||||
|
import ipaddress
|
||||||
import json
|
import json
|
||||||
import time
|
import time
|
||||||
from urllib.parse import urlparse
|
from urllib.parse import urlparse
|
||||||
@@ -112,7 +113,7 @@ def get_client_ip(req: Request) -> str:
|
|||||||
|
|
||||||
|
|
||||||
def is_trusted_proxy(req: Request, settings: Settings) -> bool:
|
def is_trusted_proxy(req: Request, settings: Settings) -> bool:
|
||||||
"""Check if request comes from a trusted proxy IP.
|
"""Check if request comes from a trusted proxy IP or CIDR range.
|
||||||
|
|
||||||
Args:
|
Args:
|
||||||
req: The Starlette request object.
|
req: The Starlette request object.
|
||||||
@@ -121,18 +122,35 @@ def is_trusted_proxy(req: Request, settings: Settings) -> bool:
|
|||||||
Returns:
|
Returns:
|
||||||
True if request is from trusted proxy, False otherwise.
|
True if request is from trusted proxy, False otherwise.
|
||||||
"""
|
"""
|
||||||
trusted_ips = settings.trusted_proxy_ips
|
trusted_networks = settings.trusted_proxy_networks
|
||||||
if not trusted_ips:
|
trusted_literals = settings.trusted_proxy_literals
|
||||||
|
|
||||||
|
if not trusted_networks and not trusted_literals:
|
||||||
# If no trusted IPs configured, reject all (fail-secure)
|
# If no trusted IPs configured, reject all (fail-secure)
|
||||||
return False
|
return False
|
||||||
|
|
||||||
# Get the immediate connection IP (not X-Forwarded-For)
|
# Get the immediate connection IP (not X-Forwarded-For)
|
||||||
if req.client:
|
if req.client:
|
||||||
client_ip = req.client.host
|
client_ip_str = req.client.host
|
||||||
else:
|
else:
|
||||||
return False
|
return False
|
||||||
|
|
||||||
return client_ip in trusted_ips
|
# Check literal matches first (for backwards compatibility with "testclient" etc)
|
||||||
|
if client_ip_str in trusted_literals:
|
||||||
|
return True
|
||||||
|
|
||||||
|
# Try to parse as IP address for network matching
|
||||||
|
try:
|
||||||
|
client_ip = ipaddress.ip_address(client_ip_str)
|
||||||
|
except ValueError:
|
||||||
|
# Not a valid IP and not in literals
|
||||||
|
return False
|
||||||
|
|
||||||
|
for network in trusted_networks:
|
||||||
|
if client_ip in network:
|
||||||
|
return True
|
||||||
|
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def get_expected_host(req: Request, settings: Settings) -> str:
|
def get_expected_host(req: Request, settings: Settings) -> str:
|
||||||
@@ -194,7 +212,10 @@ def auth_before(req: Request, settings: Settings, db) -> Response | None:
|
|||||||
|
|
||||||
# Check trusted proxy
|
# Check trusted proxy
|
||||||
if not is_trusted_proxy(req, settings):
|
if not is_trusted_proxy(req, settings):
|
||||||
return PlainTextResponse("Forbidden: Request not from trusted proxy", status_code=403)
|
client_ip = req.client.host if req.client else "unknown"
|
||||||
|
return PlainTextResponse(
|
||||||
|
f"Forbidden: Request not from trusted proxy (source: {client_ip})", status_code=403
|
||||||
|
)
|
||||||
|
|
||||||
# Extract username from auth header
|
# Extract username from auth header
|
||||||
username = req.headers.get(settings.auth_header_name.lower())
|
username = req.headers.get(settings.auth_header_name.lower())
|
||||||
|
|||||||
@@ -227,6 +227,109 @@ class TestTrustedProxyIPs:
|
|||||||
settings = Settings()
|
settings = Settings()
|
||||||
assert settings.trusted_proxy_ips == []
|
assert settings.trusted_proxy_ips == []
|
||||||
|
|
||||||
|
def test_cidr_notation_parsed(self):
|
||||||
|
"""CIDR notation should be parsed into networks."""
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"TRUSTED_PROXY_IPS": "192.168.1.0/24", "CSRF_SECRET": "test-secret"},
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
networks = settings.trusted_proxy_networks
|
||||||
|
assert len(networks) == 1
|
||||||
|
assert isinstance(networks[0], ipaddress.IPv4Network)
|
||||||
|
assert str(networks[0]) == "192.168.1.0/24"
|
||||||
|
|
||||||
|
def test_plain_ip_parsed_as_single_host_network(self):
|
||||||
|
"""Plain IP should be parsed as /32 network."""
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"TRUSTED_PROXY_IPS": "10.0.0.1", "CSRF_SECRET": "test-secret"},
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
networks = settings.trusted_proxy_networks
|
||||||
|
assert len(networks) == 1
|
||||||
|
assert isinstance(networks[0], ipaddress.IPv4Network)
|
||||||
|
# Plain IP becomes /32 network
|
||||||
|
assert networks[0].num_addresses == 1
|
||||||
|
|
||||||
|
def test_mixed_ips_and_cidrs(self):
|
||||||
|
"""Mix of plain IPs and CIDR notation should all be parsed."""
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{
|
||||||
|
"TRUSTED_PROXY_IPS": "10.0.0.1,192.168.0.0/16,172.16.0.0/12",
|
||||||
|
"CSRF_SECRET": "test-secret",
|
||||||
|
},
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
networks = settings.trusted_proxy_networks
|
||||||
|
assert len(networks) == 3
|
||||||
|
# All should be network objects
|
||||||
|
assert all(
|
||||||
|
isinstance(n, (ipaddress.IPv4Network, ipaddress.IPv6Network)) for n in networks
|
||||||
|
)
|
||||||
|
|
||||||
|
def test_ipv6_cidr_supported(self):
|
||||||
|
"""IPv6 CIDR notation should be supported."""
|
||||||
|
import ipaddress
|
||||||
|
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"TRUSTED_PROXY_IPS": "::1,fd00::/8", "CSRF_SECRET": "test-secret"},
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
networks = settings.trusted_proxy_networks
|
||||||
|
assert len(networks) == 2
|
||||||
|
assert any(isinstance(n, ipaddress.IPv6Network) for n in networks)
|
||||||
|
|
||||||
|
def test_invalid_cidr_becomes_literal(self):
|
||||||
|
"""Invalid CIDR notation should become a literal for string matching."""
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"TRUSTED_PROXY_IPS": "192.168.1.0/33", "CSRF_SECRET": "test-secret"},
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
# Invalid CIDR should not appear in networks
|
||||||
|
assert len(settings.trusted_proxy_networks) == 0
|
||||||
|
# But should appear in literals
|
||||||
|
assert "192.168.1.0/33" in settings.trusted_proxy_literals
|
||||||
|
|
||||||
|
def test_invalid_ip_becomes_literal(self):
|
||||||
|
"""Invalid IP address should become a literal for string matching."""
|
||||||
|
with patch.dict(
|
||||||
|
os.environ,
|
||||||
|
{"TRUSTED_PROXY_IPS": "not-an-ip", "CSRF_SECRET": "test-secret"},
|
||||||
|
clear=True,
|
||||||
|
):
|
||||||
|
from animaltrack.config import Settings
|
||||||
|
|
||||||
|
settings = Settings()
|
||||||
|
# Invalid IP should not appear in networks
|
||||||
|
assert len(settings.trusted_proxy_networks) == 0
|
||||||
|
# But should appear in literals
|
||||||
|
assert "not-an-ip" in settings.trusted_proxy_literals
|
||||||
|
|
||||||
|
|
||||||
class TestCsrfSecretRequired:
|
class TestCsrfSecretRequired:
|
||||||
"""Test that CSRF_SECRET is required."""
|
"""Test that CSRF_SECRET is required."""
|
||||||
|
|||||||
@@ -300,3 +300,89 @@ class TestLoggingAfter:
|
|||||||
assert before_ms <= parsed["ts"] <= after_ms
|
assert before_ms <= parsed["ts"] <= after_ms
|
||||||
# Should be a reasonable timestamp (year 2020+)
|
# Should be a reasonable timestamp (year 2020+)
|
||||||
assert parsed["ts"] > 1577836800000 # 2020-01-01
|
assert parsed["ts"] > 1577836800000 # 2020-01-01
|
||||||
|
|
||||||
|
|
||||||
|
class TestIsTrustedProxyCIDR:
|
||||||
|
"""Tests for CIDR support in is_trusted_proxy."""
|
||||||
|
|
||||||
|
def test_ip_within_cidr_is_trusted(self):
|
||||||
|
"""IP within CIDR range should be trusted."""
|
||||||
|
from animaltrack.web.middleware import is_trusted_proxy
|
||||||
|
|
||||||
|
req = MagicMock()
|
||||||
|
req.client = MagicMock(host="192.168.1.50")
|
||||||
|
|
||||||
|
settings = make_test_settings(trusted_proxy_ips="192.168.1.0/24")
|
||||||
|
|
||||||
|
assert is_trusted_proxy(req, settings) is True
|
||||||
|
|
||||||
|
def test_ip_outside_cidr_not_trusted(self):
|
||||||
|
"""IP outside CIDR range should not be trusted."""
|
||||||
|
from animaltrack.web.middleware import is_trusted_proxy
|
||||||
|
|
||||||
|
req = MagicMock()
|
||||||
|
req.client = MagicMock(host="192.168.2.50")
|
||||||
|
|
||||||
|
settings = make_test_settings(trusted_proxy_ips="192.168.1.0/24")
|
||||||
|
|
||||||
|
assert is_trusted_proxy(req, settings) is False
|
||||||
|
|
||||||
|
def test_exact_ip_still_works(self):
|
||||||
|
"""Exact IP matching should still work."""
|
||||||
|
from animaltrack.web.middleware import is_trusted_proxy
|
||||||
|
|
||||||
|
req = MagicMock()
|
||||||
|
req.client = MagicMock(host="10.0.0.1")
|
||||||
|
|
||||||
|
settings = make_test_settings(trusted_proxy_ips="10.0.0.1")
|
||||||
|
|
||||||
|
assert is_trusted_proxy(req, settings) is True
|
||||||
|
|
||||||
|
def test_mixed_exact_and_cidr(self):
|
||||||
|
"""Mix of exact IPs and CIDR should work."""
|
||||||
|
from animaltrack.web.middleware import is_trusted_proxy
|
||||||
|
|
||||||
|
settings = make_test_settings(trusted_proxy_ips="10.0.0.1,192.168.0.0/16")
|
||||||
|
|
||||||
|
# Exact IP match
|
||||||
|
req1 = MagicMock()
|
||||||
|
req1.client = MagicMock(host="10.0.0.1")
|
||||||
|
assert is_trusted_proxy(req1, settings) is True
|
||||||
|
|
||||||
|
# CIDR match
|
||||||
|
req2 = MagicMock()
|
||||||
|
req2.client = MagicMock(host="192.168.100.200")
|
||||||
|
assert is_trusted_proxy(req2, settings) is True
|
||||||
|
|
||||||
|
# No match
|
||||||
|
req3 = MagicMock()
|
||||||
|
req3.client = MagicMock(host="172.16.0.1")
|
||||||
|
assert is_trusted_proxy(req3, settings) is False
|
||||||
|
|
||||||
|
def test_ipv6_cidr_matching(self):
|
||||||
|
"""IPv6 CIDR matching should work."""
|
||||||
|
from animaltrack.web.middleware import is_trusted_proxy
|
||||||
|
|
||||||
|
settings = make_test_settings(trusted_proxy_ips="fd00::/8")
|
||||||
|
|
||||||
|
req = MagicMock()
|
||||||
|
req.client = MagicMock(host="fd12:3456:789a::1")
|
||||||
|
assert is_trusted_proxy(req, settings) is True
|
||||||
|
|
||||||
|
req2 = MagicMock()
|
||||||
|
req2.client = MagicMock(host="fe80::1")
|
||||||
|
assert is_trusted_proxy(req2, settings) is False
|
||||||
|
|
||||||
|
def test_localhost_cidr(self):
|
||||||
|
"""Localhost CIDR should work."""
|
||||||
|
from animaltrack.web.middleware import is_trusted_proxy
|
||||||
|
|
||||||
|
settings = make_test_settings(trusted_proxy_ips="127.0.0.0/8")
|
||||||
|
|
||||||
|
req = MagicMock()
|
||||||
|
req.client = MagicMock(host="127.0.0.1")
|
||||||
|
assert is_trusted_proxy(req, settings) is True
|
||||||
|
|
||||||
|
req2 = MagicMock()
|
||||||
|
req2.client = MagicMock(host="127.255.255.255")
|
||||||
|
assert is_trusted_proxy(req2, settings) is True
|
||||||
|
|||||||
Reference in New Issue
Block a user