implement rate limiting for authentication routes and add custom error handling page

This commit is contained in:
Eduardo Silva
2026-03-16 13:42:20 -03:00
parent 685b4eb971
commit e1f128f217
6 changed files with 61 additions and 2 deletions

View File

@@ -0,0 +1,14 @@
from fastapi import Request
from slowapi import Limiter
AUTH_RATE_LIMIT = "5/minute"
def get_real_client_ip(request: Request) -> str:
forwarded_for = request.headers.get("x-forwarded-for", "")
if forwarded_for:
return forwarded_for.split(",")[0].strip()
return request.client.host if request.client else "unknown"
limiter = Limiter(key_func=get_real_client_ip)

View File

@@ -4,6 +4,7 @@ from contextlib import asynccontextmanager
from pathlib import Path
from auth_gateway.config_loader import RuntimeConfigStore
from auth_gateway.limiter import get_real_client_ip, limiter
from auth_gateway.services.oidc_service import OIDCService
from auth_gateway.services.session_service import SessionService
from auth_gateway.settings import settings
@@ -11,8 +12,10 @@ from auth_gateway.storage.sqlite import SQLiteStorage
from auth_gateway.web.auth_routes import router as auth_router
from auth_gateway.web.login_routes import router as login_router
from fastapi import FastAPI, Request
from fastapi.responses import HTMLResponse
from fastapi.staticfiles import StaticFiles
from fastapi.templating import Jinja2Templates
from slowapi.errors import RateLimitExceeded
BASE_DIR = Path(__file__).resolve().parent
_access_logger = logging.getLogger("uvicorn.error")
@@ -30,6 +33,32 @@ async def lifespan(app: FastAPI):
app = FastAPI(title="Auth Gateway", lifespan=lifespan)
app.state.limiter = limiter
async def _rate_limit_handler(request: Request, exc: RateLimitExceeded) -> HTMLResponse:
username = None
try:
form = await request.form()
username = form.get("username")
except Exception:
pass
client = get_real_client_ip(request)
if username:
_access_logger.warning("AUTH rate limit exceeded for '%s' on %s from %s", username, request.url.path, client)
else:
_access_logger.warning("AUTH rate limit exceeded on %s from %s", request.url.path, client)
templates = request.app.state.templates
external_path = request.app.state.settings.external_path
return templates.TemplateResponse(
"ratelimit.html",
{"request": request, "external_path": external_path, "back_url": str(request.url.path)},
status_code=429,
)
app.add_exception_handler(RateLimitExceeded, _rate_limit_handler)
@app.middleware("http")
@@ -39,7 +68,7 @@ async def access_log(request: Request, call_next):
if request.url.path == "/auth/check" and response.status_code == 200:
return response
ms = (time.monotonic() - start) * 1000
client = request.client.host if request.client else "-"
client = get_real_client_ip(request)
_access_logger.info('%s - "%s %s" %d (%.0fms)', client, request.method, request.url.path, response.status_code, ms)
return response

View File

@@ -0,0 +1,7 @@
{% extends "base.html" %}
{% block title %}Too many attempts — Gatekeeper{% endblock %}
{% block content %}
<h1 class="card-title">Too many attempts</h1>
<p class="card-subtitle">You have made too many requests in a short period. Please wait a moment before trying again.</p>
<a class="btn btn-secondary" href="{{ back_url }}">Try again</a>
{% endblock %}

View File

@@ -19,6 +19,7 @@ from auth_gateway.web.dependencies import (
resolve_context_from_request,
session_is_allowed,
)
from auth_gateway.limiter import AUTH_RATE_LIMIT, limiter
from fastapi import APIRouter, Form, Request
from fastapi.responses import HTMLResponse, RedirectResponse
@@ -118,6 +119,7 @@ async def login_password_page(request: Request, next: str = "/"):
@router.post("/login/password")
@limiter.limit(AUTH_RATE_LIMIT)
async def login_password_submit(request: Request, next: str = Form("/"), username: str = Form(...), password: str = Form(...)):
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)
@@ -168,6 +170,7 @@ async def login_totp_page(request: Request, next: str = "/"):
@router.post("/login/totp")
@limiter.limit(AUTH_RATE_LIMIT)
async def login_totp_submit(request: Request, next: str = Form("/"), token: str = Form(...)):
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)
@@ -204,6 +207,7 @@ async def login_totp_submit(request: Request, next: str = Form("/"), token: str
@router.get("/login/oidc/start")
@limiter.limit(AUTH_RATE_LIMIT)
async def login_oidc_start(request: Request, next: str = "/"):
runtime_config = get_runtime_config(request)
context = resolve_context_from_request(request, runtime_config, next)