|
| 1 | +"""Request pipeline middleware: request ID, correlation, security headers, logging. |
| 2 | +
|
| 3 | +Generates a UUID v4 request_id for every request, propagates correlation IDs, |
| 4 | +applies security headers, and logs each request with status-appropriate level. |
| 5 | +
|
| 6 | +This middleware MUST be the outermost middleware (added last in code) so that |
| 7 | +security headers and X-Request-ID are set on ALL responses, including CORS |
| 8 | +preflight OPTIONS responses handled by CORSMiddleware. |
| 9 | +""" |
| 10 | + |
| 11 | +from __future__ import annotations |
| 12 | + |
| 13 | +import re |
| 14 | +import time |
| 15 | +from typing import Any |
| 16 | +from uuid import uuid4 |
| 17 | + |
| 18 | +import structlog |
| 19 | +from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint |
| 20 | +from starlette.requests import Request |
| 21 | +from starlette.responses import Response |
| 22 | +from starlette.types import ASGIApp |
| 23 | + |
# Correlation ID validation: ASCII letters, digits, hyphens, underscores and
# dots; between 1 and 128 characters.
# Rejects injection payloads (newlines, control chars) that could forge log
# entries. The pattern is anchored with \Z rather than $ because $ also matches
# just before a trailing newline, which would let a value such as "abc\n" pass
# when checked with match().
_CORRELATION_ID_PATTERN = re.compile(r"^[a-zA-Z0-9\-_.]{1,128}\Z")

# Security headers applied to every response (PRD Section 4.1.12)
_SECURITY_HEADERS: dict[str, str] = {
    "X-Content-Type-Options": "nosniff",
    "X-Frame-Options": "DENY",
    "X-XSS-Protection": "0",
    "Referrer-Policy": "strict-origin-when-cross-origin",
    "Permissions-Policy": "camera=(), microphone=(), geolocation=()",
}

# HSTS header only applied in production
_HSTS_VALUE = "max-age=31536000; includeSubDomains"
| 39 | + |
| 40 | + |
class RequestPipelineMiddleware(BaseHTTPMiddleware):
    """Middleware that provides request tracing, security headers, and request logging.

    For every request it generates a UUID v4 request ID, resolves a correlation
    ID, exposes both on ``request.state`` and in structlog's contextvars,
    applies the security headers, sets ``X-Request-ID`` on the response and
    logs the completed request.

    Args:
        app: The ASGI application.
        environment: Deployment environment (e.g. "local", "staging", "production").
            Passed to ``_apply_security_headers``, which adds the HSTS header
            only when the value is "production".
    """

    def __init__(self, app: ASGIApp, environment: str = "local") -> None:
        super().__init__(app)
        self.environment = environment

    async def dispatch(
        self, request: Request, call_next: RequestResponseEndpoint
    ) -> Response:
        """Run one request through the pipeline and return the decorated response.

        Args:
            request: The incoming request.
            call_next: Callable that invokes the rest of the application.

        Returns:
            The downstream response, or a generic JSON 500 response when an
            exception escapes the downstream application. In both cases the
            security headers and ``X-Request-ID`` are set.
        """
        # 1. Generate request_id (UUID v4)
        request_id = str(uuid4())

        # 2. Read X-Correlation-ID or fall back to request_id.
        #    Validate format to prevent log injection (SEC-001). fullmatch()
        #    is used instead of match() so the whole value must conform: a
        #    "$" anchor alone would still accept a trailing newline.
        raw_correlation = request.headers.get("x-correlation-id")
        if raw_correlation and _CORRELATION_ID_PATTERN.fullmatch(raw_correlation):
            correlation_id = raw_correlation
        else:
            correlation_id = request_id

        # 3. Store in request.state for downstream handlers and error handlers
        request.state.request_id = request_id
        request.state.correlation_id = correlation_id

        # 4. Bind to structlog contextvars for automatic inclusion in all logs
        structlog.contextvars.clear_contextvars()
        structlog.contextvars.bind_contextvars(
            request_id=request_id,
            correlation_id=correlation_id,
        )

        try:
            # 5. Record start time
            start = time.perf_counter()

            # 6. Process request — wrap in try/except so headers are set even
            #    when an exception propagates past all exception handlers.
            try:
                response = await call_next(request)
            except Exception:
                # Log the exception so it's not silently swallowed (BUG-001).
                # In practice, global exception handlers (errors.py) catch most
                # exceptions before they reach here.
                structlog.get_logger().exception(
                    "unhandled_exception_in_middleware",
                    method=request.method,
                    path=request.url.path,
                )
                response = Response(
                    content='{"error":"INTERNAL_ERROR","message":"An unexpected error occurred."}',
                    status_code=500,
                    media_type="application/json",
                )

            # 7. Calculate duration
            duration_ms = round((time.perf_counter() - start) * 1000, 2)

            # 8. Apply security headers
            _apply_security_headers(response, self.environment)

            # 9. Set X-Request-ID response header
            response.headers["X-Request-ID"] = request_id

            # 10. Log request with appropriate level based on status code
            _log_request(request, response, duration_ms)

            return response
        finally:
            # 11. Clear contextvars to prevent leakage (FUNC-001). Done in
            #     ``finally`` so the bound IDs are also removed when something
            #     other than ``Exception`` (e.g. cancellation) interrupts the
            #     request, or when header/logging code itself raises.
            structlog.contextvars.clear_contextvars()
| 117 | + |
| 118 | + |
def _apply_security_headers(response: Response, environment: str) -> None:
    """Write the standard security headers onto *response* in place.

    Every entry of ``_SECURITY_HEADERS`` is set on the response, replacing any
    value already present under the same name. ``Strict-Transport-Security``
    is added only when *environment* equals ``"production"``.
    """
    headers = response.headers
    for name in _SECURITY_HEADERS:
        headers[name] = _SECURITY_HEADERS[name]

    # HSTS is limited to production; every other environment stops here.
    if environment != "production":
        return
    headers["Strict-Transport-Security"] = _HSTS_VALUE
| 126 | + |
| 127 | + |
def _log_request(request: Request, response: Response, duration_ms: float) -> None:
    """Emit one ``request_completed`` log event for a finished request.

    The level is chosen from the response status code:

    - status >= 500 → error
    - 400 <= status < 500 → warning
    - anything below 400 → info

    The event carries the method, path, status code and duration, plus
    ``user_id`` when ``request.state`` has a non-None ``user_id`` attribute.
    """
    log: Any = structlog.get_logger()
    status_code = response.status_code

    fields: dict[str, Any] = {
        "method": request.method,
        "path": request.url.path,
        "status_code": status_code,
        "duration_ms": duration_ms,
    }

    # user_id is optional: it is only present if something upstream set it.
    maybe_user = getattr(request.state, "user_id", None)
    if maybe_user is not None:
        fields["user_id"] = maybe_user

    if status_code < 400:
        emit = log.info
    elif status_code < 500:
        emit = log.warning
    else:
        emit = log.error
    emit("request_completed", **fields)
0 commit comments