diff --git a/CHANGELOG.md b/CHANGELOG.md index 1f0b1e7..0e6b767 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,93 @@ and this file MUST be updated together whenever `__version__` changes. --- +## [0.8.0-dev10] — Webhook & API security hardening (security) + +A full security review of the inbound webhook surface (requested before +opening a public firewall port for the Meraki receiver) found nine +issues spanning authentication, exposure, DoS, and replay. This release +fixes all of them. The theme is **fail closed**: receivers reject +requests they cannot authenticate, the public ingress exposes only the +receiver/health paths, and request bodies are bounded everywhere. + +### New configuration (core secret / env) + +All optional; secure defaults. Set via the `netcortex/core` secret or +the matching `NETCORTEX_*` env var. + +| Key | Default | Purpose | +| --- | --- | --- | +| `api_secret` | `""` | When set, `/`, `/api/*`, `/metrics`, docs, and the telemetry SSE monitor require `Authorization: Bearer `. | +| `webhook_allow_unsigned` | `false` | Master fail-open switch. `false` = reject webhooks for tenants with no configured secret. Enable only to bootstrap a new receiver. | +| `webhook_max_body_bytes` | `1048576` (1 MiB) | Hard cap on webhook/telemetry request bodies (413 over cap). | +| `webhook_replay_window_seconds` | `300` | Reject webhooks whose trusted timestamp (e.g. Meraki `sentAt`) is outside this window. `0` disables. | +| `telemetry_secret` | `""` | Shared `X-Telemetry-Token` required on the HTTP telemetry-push ingest. | +| `cors_allow_origins` | `[]` | Explicit browser-origin allow-list. We never ship `*`. | + +### Findings fixed + +- **F1 — HMAC/token auth failed open when no secret was configured.** + `meraki.py` and `catalyst_center.py` now **fail closed** (503) when no + signing secret is provisioned for a tenant, unless + `webhook_allow_unsigned=true`. Previously they accepted the request + with only a warning. +- **F2 — Entire API + topology exposed unauthenticated on the public + port.** Two layers: (a) a new app-level `_api_auth` middleware gates + every non-receiver path behind `api_secret` when set; (b) the Helm + ingress now defaults to `exposeApi=false`, publishing only + `/webhooks`, `/ingest`, and `/health` — the UI/API/metrics/MCP stay + cluster-internal unless `exposeApi=true` is opted into. +- **F3 — No request body-size limit (memory-exhaustion DoS).** New + in-handler `Content-Length` + materialized-body guards (413) plus an + ingress `proxy-body-size: 1m` annotation. +- **F4 — Unauthenticated amplification via background adapter sync.** + Webhook-driven syncs now funnel through a coalescing scheduler + (`sync_coalesce.py`): single-flight + trailing coalesce + min-interval + + global concurrency cap. The generic catch-all endpoint is now gated + behind the administrative `api_secret`. +- **F5 — Non-constant-time token comparison (Catalyst Center).** Now + uses `hmac.compare_digest`. +- **F6 — Unauthenticated telemetry ingest + SSE stream.** Ingest + requires `X-Telemetry-Token` (`telemetry_secret`); the SSE monitor is + gated behind `api_secret`. Both are body-size capped. +- **F7 — Nexus Dashboard API key never verified.** New dedicated + `nexus_dashboard.py` handler verifies `X-ND-API-Key` in constant time + and fails closed. +- **F8 — Broad CORS + info leak via `/api` and `/metrics`.** CORS no + longer ships `*` (explicit allow-list via `NETCORTEX_CORS_ALLOW_ORIGINS`, + default none); `/metrics` and `/api/*` are covered by `_api_auth`. +- **F9 — No webhook replay protection.** Meraki deliveries are rejected + (403) when `sentAt` falls outside `webhook_replay_window_seconds`. + +### Shared hardening module + +New `netcortex/webhooks/auth.py` centralizes body-size enforcement, +fail-closed gating, constant-time comparison, replay checks, and the +admin/telemetry token gates — so every current and future receiver +inherits the same controls in one place (the same consolidation that +would have prevented the dev9 per-vendor regression). + +### Operational notes + +- **The status UI is no longer public by default.** After deploy, reach + it via `kubectl port-forward svc/-web 8000:8000`, or set + `ingress.exposeApi=true` **and** an `api_secret`. +- Store webhook secrets at `netcortex/webhooks//` + (`shared_secret`, or `api_key` for Nexus Dashboard) before the + receiver will accept traffic. + +### Tests + +- `tests/webhooks/test_auth_helpers.py` — the shared auth/guard helpers. +- `tests/webhooks/test_other_webhook_routes.py` — Catalyst Center, + Nexus Dashboard, generic, and telemetry route auth. +- `tests/webhooks/test_sync_coalesce.py` — coalescing scheduler. +- `tests/test_api_auth.py` — API-auth middleware + path classification. +- Updated `tests/webhooks/test_meraki_webhook_route.py` for fail-closed, + replay (403), and body-size (413) behavior. + +--- + ## [0.8.0-dev9] — Fix webhook HMAC verify silently failing open (security) **Pre-existing bug caught by the dev8 deploy.** While verifying the diff --git a/deploy/helm/templates/ingress.yaml b/deploy/helm/templates/ingress.yaml index de75c20..99b00fb 100644 --- a/deploy/helm/templates/ingress.yaml +++ b/deploy/helm/templates/ingress.yaml @@ -5,8 +5,17 @@ metadata: name: {{ include "netcortex.fullname" . }} labels: {{- include "netcortex.labels" . | nindent 4 }} - {{- with .Values.ingress.annotations }} annotations: + # Body-size cap (F3, 0.8.0-dev10): outermost defense against + # memory-exhaustion via large unauthenticated POST bodies. Keep aligned + # with the in-app webhook_max_body_bytes guard (default 1 MiB). + nginx.ingress.kubernetes.io/proxy-body-size: {{ .Values.ingress.proxyBodySize | quote }} + # Per-source request-rate cap (defense-in-depth against webhook floods). + {{- if .Values.ingress.rateLimit.enabled }} + nginx.ingress.kubernetes.io/limit-rps: {{ .Values.ingress.rateLimit.rps | quote }} + nginx.ingress.kubernetes.io/limit-connections: {{ .Values.ingress.rateLimit.connections | quote }} + {{- end }} + {{- with .Values.ingress.annotations }} {{- toYaml . | nindent 4 }} {{- end }} spec: @@ -23,6 +32,10 @@ spec: - host: {{ .Values.ingress.hostname }} http: paths: + {{- if .Values.ingress.exposeApi }} + # exposeApi=true: the whole app (status UI, /api, /metrics, /mcp) + # is reachable on this host. ONLY do this on a trusted network or + # with api_secret set so /api and /metrics require a bearer token. - path: / pathType: Prefix backend: @@ -30,4 +43,19 @@ spec: name: {{ include "netcortex.fullname" . }}-web port: number: {{ .Values.service.port }} + {{- else }} + # Default (F2, 0.8.0-dev10): expose ONLY the public receiver and + # health surface. The status UI, topology/inventory API, metrics, + # and MCP endpoint stay cluster-internal (reach them via + # `kubectl port-forward` or a separate internal-only ingress). + {{- range .Values.ingress.publicPaths }} + - path: {{ . }} + pathType: Prefix + backend: + service: + name: {{ include "netcortex.fullname" $ }}-web + port: + number: {{ $.Values.service.port }} + {{- end }} + {{- end }} {{- end }} diff --git a/deploy/helm/values.yaml b/deploy/helm/values.yaml index ac0470c..1c7b08d 100644 --- a/deploy/helm/values.yaml +++ b/deploy/helm/values.yaml @@ -220,6 +220,34 @@ ingress: # annotations: # route.openshift.io/termination: edge + # --------------------------------------------------------------------------- + # Public exposure surface (0.8.0-dev10 security hardening) + # --------------------------------------------------------------------------- + # exposeApi=false (default, SECURE): the public ingress serves ONLY the + # webhook/telemetry receiver and health paths below. The status UI, the + # topology/inventory API (/api/*), /metrics, and the MCP endpoint stay + # cluster-internal. Reach them with `kubectl port-forward svc/-web + # 8000:8000` or a separate internal-only ingress. + # + # exposeApi=true: serve the WHOLE app on this host. Only do this on a + # trusted network, or set `api_secret` in the core secret so /api, + # /metrics, /mcp, and the UI require `Authorization: Bearer `. + exposeApi: false + publicPaths: + - /webhooks + - /ingest + - /health + + # Hard cap on request body size at the ingress (F3). Mirror the in-app + # webhook_max_body_bytes (default 1 MiB). + proxyBodySize: "1m" + + # Per-source rate limiting (defense-in-depth against webhook floods). + rateLimit: + enabled: true + rps: 20 # requests/second per client IP + connections: 10 # concurrent connections per client IP + # ----------------------------------------------------------------------------- # Service Account # ----------------------------------------------------------------------------- diff --git a/docs/secrets.md b/docs/secrets.md index 05f7d73..af64997 100644 --- a/docs/secrets.md +++ b/docs/secrets.md @@ -108,10 +108,44 @@ The **instance ID** used throughout NetCortex (MCP tools, sync status, logs, Net "ssh_timeout": 30, "netconf_port": 830, "restconf_port": 443, - "status_refresh_interval": 30 + "status_refresh_interval": 30, + + "api_secret": "random-32-byte-hex-secret", + "webhook_allow_unsigned": false, + "webhook_max_body_bytes": 1048576, + "webhook_replay_window_seconds": 300, + "telemetry_secret": "random-32-byte-hex-secret", + "cors_allow_origins": [] } ``` +### HTTP API & webhook security keys (0.8.0-dev10) + +All optional with secure defaults. Each also has a `NETCORTEX_*` env +equivalent for bootstrap before the backend is reachable. + +| Key | Default | Effect | +| --- | --- | --- | +| `api_secret` | `""` | When set, every non-receiver HTTP path (`/`, `/api/*`, `/metrics`, docs, telemetry SSE) requires `Authorization: Bearer `. Leave empty only when the API is reached solely cluster-internally (the default ingress keeps it private). | +| `webhook_allow_unsigned` | `false` | Master fail-open switch. `false` = a webhook for a tenant with **no** configured secret is rejected (503). Set `true` only to bootstrap a brand-new receiver before its secret is stored, then turn it back off. | +| `webhook_max_body_bytes` | `1048576` | Hard body-size cap (413 over cap). Keep aligned with the ingress `proxy-body-size`. | +| `webhook_replay_window_seconds` | `300` | Reject webhooks whose trusted timestamp (e.g. Meraki `sentAt`) is outside ±this window. `0` disables. | +| `telemetry_secret` | `""` | Shared token required in `X-Telemetry-Token` on `POST /ingest/telemetry/{device}`. Fails closed when unset (unless `webhook_allow_unsigned`). | +| `cors_allow_origins` | `[]` | Browser-origin allow-list. Never `*`. The bundled UI is same-origin so it needs none. | + +#### Per-vendor webhook secrets + +Stored at `netcortex/webhooks//`: + +```json +// netcortex/webhooks/meraki/ → {"shared_secret": "..."} +// netcortex/webhooks/catalyst_center/ → {"shared_secret": "..."} +// netcortex/webhooks/nexus_dashboard/ → {"api_key": "..."} +``` + +A receiver **rejects all traffic (503)** until its secret is provisioned +(fail closed), unless `webhook_allow_unsigned=true`. + ### `netcortex/devices/site/building-a` example ```json diff --git a/netcortex/__init__.py b/netcortex/__init__.py index ed2c263..0179b77 100644 --- a/netcortex/__init__.py +++ b/netcortex/__init__.py @@ -22,4 +22,4 @@ ``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes. """ -__version__ = "0.8.0-dev9" +__version__ = "0.8.0-dev10" diff --git a/netcortex/config.py b/netcortex/config.py index 8fbddf6..0bfcc3e 100644 --- a/netcortex/config.py +++ b/netcortex/config.py @@ -117,6 +117,36 @@ class Settings: mcp_transport: str mcp_secret: str + # HTTP API / webhook security (0.8.0-dev10) + # + # api_secret: when non-empty, every non-public HTTP route (the status + # UI, /api/*, /metrics, the telemetry SSE monitor) requires + # `Authorization: Bearer `. Empty disables app-level API + # auth — in that mode the *ingress* is the control: the shipped chart + # only exposes the webhook/health receiver paths publicly, so /api + # stays cluster-internal. Set this whenever the API is reachable from + # an untrusted network. + # webhook_allow_unsigned: master fail-open switch. Default False = + # fail closed: a webhook for a tenant with no configured secret is + # rejected (503) instead of silently accepted. Set True only to + # bootstrap a brand-new receiver before its secret is provisioned. + # webhook_max_body_bytes: hard cap on webhook/telemetry request bodies. + # Requests larger than this are rejected (413) before parsing. + # webhook_replay_window_seconds: when a webhook payload carries a + # trusted timestamp (e.g. Meraki `sentAt`), reject it if older than + # this many seconds. 0 disables the freshness check. + # telemetry_secret: shared token required (header `X-Telemetry-Token`) + # on the HTTP telemetry-push ingest endpoint. Same fail-closed rules + # as webhook secrets. + # cors_allow_origins: explicit allow-list of browser origins. Empty = + # no cross-origin access (same-origin only) — we never ship `*`. + api_secret: str + webhook_allow_unsigned: bool + webhook_max_body_bytes: int + webhook_replay_window_seconds: int + telemetry_secret: str + cors_allow_origins: list[str] + # Sync engine sync_backend: str sync_conflict_policy: str @@ -187,6 +217,21 @@ def __init__(self, bootstrap: BootstrapSettings) -> None: self.redis_url = os.environ.get("REDIS_URL", "redis://redis:6379/0") self.mcp_transport = "http" self.mcp_secret = "" + + # HTTP API / webhook security (0.8.0-dev10). Env vars provide the + # bootstrap defaults; the core secret can override during hydrate(). + self.api_secret = os.environ.get("NETCORTEX_API_SECRET", "") + self.webhook_allow_unsigned = _env_bool( + "NETCORTEX_WEBHOOK_ALLOW_UNSIGNED", default=False + ) + self.webhook_max_body_bytes = _env_int( + "NETCORTEX_WEBHOOK_MAX_BODY_BYTES", default=1_048_576 # 1 MiB + ) + self.webhook_replay_window_seconds = _env_int( + "NETCORTEX_WEBHOOK_REPLAY_WINDOW_SECONDS", default=300 + ) + self.telemetry_secret = os.environ.get("NETCORTEX_TELEMETRY_SECRET", "") + self.cors_allow_origins = _env_csv("NETCORTEX_CORS_ALLOW_ORIGINS") # Secure-by-default. Override with NETBOX_VERIFY_SSL=0 or # core-secret `netbox_verify_ssl=false` for self-signed labs. _verify_env = os.environ.get("NETBOX_VERIFY_SSL") @@ -278,6 +323,40 @@ async def hydrate(self) -> None: self.redis_url = core.get("redis_url") or self.redis_url self.mcp_transport = core.get("mcp_transport", self.mcp_transport) self.mcp_secret = core.get("mcp_secret", self.mcp_secret) + + # HTTP API / webhook security — core secret overrides env defaults. + self.api_secret = core.get("api_secret", self.api_secret) + raw_allow_unsigned = core.get("webhook_allow_unsigned", self.webhook_allow_unsigned) + if isinstance(raw_allow_unsigned, str): + self.webhook_allow_unsigned = raw_allow_unsigned.strip().lower() in { + "1", "true", "yes", "on", + } + else: + self.webhook_allow_unsigned = bool(raw_allow_unsigned) + self.webhook_max_body_bytes = int( + core.get("webhook_max_body_bytes", self.webhook_max_body_bytes) + ) + self.webhook_replay_window_seconds = int( + core.get("webhook_replay_window_seconds", self.webhook_replay_window_seconds) + ) + self.telemetry_secret = core.get("telemetry_secret", self.telemetry_secret) + raw_cors = core.get("cors_allow_origins", None) + if raw_cors is not None: + if isinstance(raw_cors, str): + self.cors_allow_origins = [ + o.strip() for o in raw_cors.split(",") if o.strip() + ] + elif isinstance(raw_cors, (list, tuple)): + self.cors_allow_origins = [str(o).strip() for o in raw_cors if str(o).strip()] + # Loud warning if the operator left fail-open enabled — this should + # never be true in a production deployment. + if self.webhook_allow_unsigned: + log.warning( + "settings.webhook_allow_unsigned_enabled", + hint="Webhooks for tenants without a configured secret will be " + "ACCEPTED. Set webhook_allow_unsigned=false once secrets " + "are provisioned.", + ) self.sync_backend = core.get("sync_backend", self.sync_backend) self.sync_conflict_policy = core.get("sync_conflict_policy", self.sync_conflict_policy) @@ -342,6 +421,29 @@ def _require(d: dict[str, Any], key: str, path: str) -> Any: return d[key] +def _env_bool(name: str, *, default: bool) -> bool: + raw = os.environ.get(name) + if raw is None: + return default + return raw.strip().lower() in {"1", "true", "yes", "on"} + + +def _env_int(name: str, *, default: int) -> int: + raw = os.environ.get(name) + if raw is None or not raw.strip(): + return default + try: + return int(raw) + except ValueError: + log.warning("settings.env_int_invalid", name=name, value=raw, fallback=default) + return default + + +def _env_csv(name: str) -> list[str]: + raw = os.environ.get(name, "") + return [item.strip() for item in raw.split(",") if item.strip()] + + # --------------------------------------------------------------------------- # Singleton — populated at startup via Settings.create() # --------------------------------------------------------------------------- diff --git a/netcortex/main.py b/netcortex/main.py index 701c470..751916b 100644 --- a/netcortex/main.py +++ b/netcortex/main.py @@ -366,15 +366,29 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg] lifespan=lifespan, ) +# CORS (F8, 0.8.0-dev10): never ship a wildcard. The bundled UI is served +# same-origin (from "/") so it needs no cross-origin grant. Browser-based +# MCP/web agents that call from another origin must be explicitly allow- +# listed via NETCORTEX_CORS_ALLOW_ORIGINS (comma-separated). Empty = no +# cross-origin access. This is read from the environment (not the secret +# backend) because middleware is wired at import time, before settings +# hydrate; origins are not secret. +_CORS_ALLOW_ORIGINS = [ + o.strip() + for o in os.environ.get("NETCORTEX_CORS_ALLOW_ORIGINS", "").split(",") + if o.strip() +] app.add_middleware( CORSMiddleware, - allow_origins=["*"], + allow_origins=_CORS_ALLOW_ORIGINS, # MCP clients (Cursor / Claude / web agents) preflight with OPTIONS # and DELETE (DELETE is used to terminate streamable-http sessions), # so the public CORS surface needs to include those verbs. allow_methods=["GET", "POST", "OPTIONS", "DELETE"], allow_headers=["*"], ) +if _CORS_ALLOW_ORIGINS: + log.info("netcortex.cors.configured", origins=_CORS_ALLOW_ORIGINS) # Mount the MCP HTTP transport. This MUST happen AFTER ``app`` is # created but BEFORE the catch-all routes — Starlette resolves mounts @@ -500,6 +514,75 @@ async def _metrics(request, call_next): return response +# ── API authentication (F2 / F8, 0.8.0-dev10) ───────────────────────────────── +# +# When ``api_secret`` is configured, every request EXCEPT the public +# receiver/health surface requires ``Authorization: Bearer ``. +# This protects the status UI ("/"), the topology/inventory API ("/api/*"), +# the OpenAPI docs, and the Prometheus "/metrics" endpoint — all of which +# leak full network state and were previously unauthenticated. +# +# When ``api_secret`` is empty the middleware is a no-op: in that mode the +# *ingress* is the control (the shipped chart only exposes the webhook and +# health paths publicly, so /api stays cluster-internal). Set api_secret +# whenever the API may be reached from an untrusted network. +# +# Paths that authenticate themselves or must stay public are skipped: +# * /webhooks/* — per-vendor HMAC / token auth (and fail-closed) +# * /ingest/* — telemetry token / admin token auth +# * /health — k8s liveness/readiness probes +# * the MCP mount — its own Bearer(mcp_secret) middleware +# Registered last so it is the OUTERMOST middleware (runs before query +# budget / metrics). + +# Public prefixes that the API-auth gate must NOT block. +_API_AUTH_PUBLIC_PREFIXES = ("/webhooks", "/ingest", "/health") + + +def _is_api_auth_public(path: str) -> bool: + if path == "/health": + return True + for prefix in _API_AUTH_PUBLIC_PREFIXES: + if path == prefix or path.startswith(prefix + "/"): + return True + # The MCP mount enforces its own Bearer(mcp_secret); don't double-gate. + if path == _MCP_PATH or path.startswith(_MCP_PATH + "/"): + return True + return False + + +@app.middleware("http") +async def _api_auth(request, call_next): + import hmac as _hmac + from starlette.responses import JSONResponse + + # CORS preflight must never require auth. + if request.method == "OPTIONS": + return await call_next(request) + + path = request.url.path + if _is_api_auth_public(path): + return await call_next(request) + + try: + secret = get_settings().api_secret or "" + except RuntimeError: + secret = "" + + if secret: + auth_header = request.headers.get("authorization", "") + token = auth_header.removeprefix("Bearer ").strip() + if not _hmac.compare_digest(token, secret): + log.warning("api.auth.denied", path=path, has_header=bool(auth_header)) + return JSONResponse( + status_code=401, + content={"error": "unauthorized"}, + headers={"WWW-Authenticate": 'Bearer realm="NetCortex API"'}, + ) + + return await call_next(request) + + @app.get("/metrics", include_in_schema=False) async def metrics() -> "PlainTextResponse": """Prometheus exposition format. No external library required.""" diff --git a/netcortex/webhooks/auth.py b/netcortex/webhooks/auth.py new file mode 100644 index 0000000..61a4b48 --- /dev/null +++ b/netcortex/webhooks/auth.py @@ -0,0 +1,287 @@ +"""Shared webhook authentication and request-hardening helpers. + +Every inbound webhook receiver (Meraki, Catalyst Center, Nexus Dashboard, +the generic catch-all, and HTTP telemetry push) shares the same security +requirements: + +* **Fail closed.** A receiver for a tenant whose signing secret is not + configured must *reject* the request, not silently accept it. The only + exception is an explicit, default-off bootstrap switch + (``webhook_allow_unsigned``) for standing up a brand-new integration + before its secret is provisioned. +* **Bounded body size.** Reject oversized requests (413) before parsing, + so an unauthenticated caller cannot exhaust memory. +* **Constant-time secret comparison.** Never leak secret bytes through a + timing side channel. +* **Replay resistance.** When the payload carries a trustworthy + timestamp, reject stale requests. + +Centralizing these here means a new receiver gets them by calling a +couple of helpers, and a security fix lands in one place for every +vendor at once (the dev9 incident — where one wrong backend-method call +silently disabled HMAC verification — is exactly the class of bug this +consolidation prevents from recurring per-vendor). + +All settings are read defensively: if :func:`get_settings` has not run +yet (e.g. in a unit test that imports a handler directly), we fall back +to the **secure** defaults (fail closed, 1 MiB cap, 300 s replay window). +""" + +from __future__ import annotations + +import hashlib +import hmac +from datetime import datetime, timezone + +import structlog +from fastapi import HTTPException, status + +log = structlog.get_logger(__name__) + +# Secure fallbacks used when Settings is not initialized (unit tests that +# import a handler in isolation). These mirror the config.py defaults. +_DEFAULT_MAX_BODY_BYTES = 1_048_576 # 1 MiB +_DEFAULT_REPLAY_WINDOW_S = 300 + + +def _allow_unsigned() -> bool: + try: + from netcortex.config import get_settings + return bool(get_settings().webhook_allow_unsigned) + except Exception: + # Fail closed when config is unavailable. + return False + + +def _max_body_bytes() -> int: + try: + from netcortex.config import get_settings + val = int(get_settings().webhook_max_body_bytes) + return val if val > 0 else _DEFAULT_MAX_BODY_BYTES + except Exception: + return _DEFAULT_MAX_BODY_BYTES + + +def _replay_window_seconds() -> int: + try: + from netcortex.config import get_settings + return int(get_settings().webhook_replay_window_seconds) + except Exception: + return _DEFAULT_REPLAY_WINDOW_S + + +# --------------------------------------------------------------------------- +# Body-size enforcement (F3) +# --------------------------------------------------------------------------- + + +def enforce_content_length(content_length: str | int | None) -> None: + """Reject (413) based on the ``Content-Length`` header before reading + the body. This is the cheap first gate — it lets us refuse a large + upload without buffering it. ``await request.body()`` is still bounded + by :func:`enforce_body_size` afterwards in case the header lies or is + absent (the ingress ``proxy-body-size`` is the third, outermost gate). + """ + if content_length is None: + return + try: + length = int(content_length) + except (TypeError, ValueError): + return + cap = _max_body_bytes() + if length > cap: + raise HTTPException( + status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, + detail=f"Request body exceeds {cap} bytes", + ) + + +def enforce_body_size(body: bytes) -> None: + """Reject (413) after reading the body — defends against a missing or + dishonest ``Content-Length`` header.""" + cap = _max_body_bytes() + if len(body) > cap: + raise HTTPException( + status_code=status.HTTP_413_REQUEST_ENTITY_TOO_LARGE, + detail=f"Request body exceeds {cap} bytes", + ) + + +# --------------------------------------------------------------------------- +# Fail-closed gate for unconfigured secrets (F1, F7) +# --------------------------------------------------------------------------- + + +def reject_if_unsigned(*, kind: str, instance_name: str) -> None: + """Enforce fail-closed behavior when no signing secret is configured. + + Called by a handler when its secret lookup returned ``None``. Raises + 503 unless the operator has explicitly enabled the + ``webhook_allow_unsigned`` bootstrap switch, in which case it logs a + loud warning and returns (allowing the request through unverified). + """ + if _allow_unsigned(): + log.warning( + "webhook.unsigned_accepted", + kind=kind, + instance=instance_name, + hint="webhook_allow_unsigned is ENABLED — this request was not " + "authenticated. Provision the secret and disable the flag.", + ) + return + log.warning( + "webhook.rejected_no_secret", + kind=kind, + instance=instance_name, + hint=f"No signing secret configured at netcortex/webhooks/{kind}/" + f"{instance_name}. Store it, or set webhook_allow_unsigned=true " + f"to bootstrap (NOT for production).", + ) + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Webhook receiver not provisioned (no signing secret configured)", + ) + + +# --------------------------------------------------------------------------- +# Constant-time secret comparison (F1, F5, F7) +# --------------------------------------------------------------------------- + + +def verify_hmac_sha256(body: bytes, signature_header: str | None, secret: str) -> bool: + """Verify an HMAC-SHA256 hex-digest signature in constant time. + + Tolerates an optional ``sha256=`` prefix (newer Meraki firmware). + """ + if not signature_header: + return False + received = signature_header.removeprefix("sha256=").strip() + expected = hmac.new(secret.encode("utf-8"), body, hashlib.sha256).hexdigest() + return hmac.compare_digest(expected, received) + + +def verify_shared_token(provided: str | None, expected: str) -> bool: + """Constant-time equality for a shared bearer/header token.""" + if not provided or not expected: + return False + return hmac.compare_digest(provided, expected) + + +def _setting(name: str, default: str = "") -> str: + try: + from netcortex.config import get_settings + return getattr(get_settings(), name, default) or default + except Exception: + return default + + +def require_admin_token(provided: str | None) -> None: + """Gate an administrative / high-privilege endpoint behind ``api_secret``. + + Used for the generic webhook catch-all (which can trigger discovery on + *any* adapter — F4 amplification) and the telemetry SSE monitor (which + streams operational data — F6). Fails closed: if no ``api_secret`` is + configured the endpoint is disabled (503) unless the bootstrap switch + is set. + """ + secret = _setting("api_secret") + if not secret: + if _allow_unsigned(): + log.warning( + "webhook.admin_unsigned_accepted", + hint="api_secret unset and webhook_allow_unsigned enabled — " + "administrative endpoint is OPEN.", + ) + return + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Administrative endpoint disabled (no api_secret configured)", + ) + if not verify_shared_token(provided, secret): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or missing administrative token", + ) + + +def require_telemetry_token(provided: str | None) -> None: + """Gate the HTTP telemetry-push ingest behind ``telemetry_secret`` (F6). + + Network devices send a shared ``X-Telemetry-Token`` header. Fails closed + unless the bootstrap switch is set. + """ + secret = _setting("telemetry_secret") + if not secret: + if _allow_unsigned(): + log.warning( + "telemetry.unsigned_accepted", + hint="telemetry_secret unset and webhook_allow_unsigned enabled " + "— telemetry ingest is OPEN.", + ) + return + raise HTTPException( + status_code=status.HTTP_503_SERVICE_UNAVAILABLE, + detail="Telemetry ingest not provisioned (no telemetry_secret configured)", + ) + if not verify_shared_token(provided, secret): + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid or missing telemetry token", + ) + + +# --------------------------------------------------------------------------- +# Replay / freshness (F9) +# --------------------------------------------------------------------------- + + +def is_timestamp_fresh(ts_iso: str | None) -> bool: + """Return True if ``ts_iso`` is within the configured replay window. + + Returns True when: + * the replay window is disabled (0), or + * no timestamp is supplied (we can't judge freshness; the HMAC + signature is still the primary control), or + * the timestamp parses and is within ``+window`` / ``-window``. + + Returns False only when a timestamp is present, parses, and falls + outside the window. A small positive skew is tolerated for clock + drift between the vendor cloud and this host. + """ + window = _replay_window_seconds() + if window <= 0 or not ts_iso: + return True + parsed = _parse_iso8601(ts_iso) + if parsed is None: + # Unparseable timestamp — don't hard-fail on a format we don't + # recognize; the signature remains the authority. + return True + now = datetime.now(tz=timezone.utc) + age = (now - parsed).total_seconds() + # Reject if older than the window, or implausibly far in the future. + return -window <= age <= window + + +def _parse_iso8601(value: str) -> datetime | None: + raw = value.strip() + if raw.endswith("Z"): + raw = raw[:-1] + "+00:00" + try: + dt = datetime.fromisoformat(raw) + except ValueError: + return None + if dt.tzinfo is None: + dt = dt.replace(tzinfo=timezone.utc) + return dt + + +__all__ = [ + "enforce_content_length", + "enforce_body_size", + "reject_if_unsigned", + "require_admin_token", + "require_telemetry_token", + "verify_hmac_sha256", + "verify_shared_token", + "is_timestamp_fresh", +] diff --git a/netcortex/webhooks/catalyst_center.py b/netcortex/webhooks/catalyst_center.py index a867d4e..1844ad3 100644 --- a/netcortex/webhooks/catalyst_center.py +++ b/netcortex/webhooks/catalyst_center.py @@ -18,6 +18,8 @@ import structlog from fastapi import BackgroundTasks, HTTPException, status +from netcortex.webhooks.auth import reject_if_unsigned, verify_shared_token + log = structlog.get_logger(__name__) _SECRET_CACHE: dict[str, str] = {} @@ -64,7 +66,9 @@ async def handle_catalyst_center_webhook( shared_secret = await _get_shared_secret(instance_name) if shared_secret is not None: - if auth_token != shared_secret: + # Constant-time comparison (0.8.0-dev10, F5): the previous `!=` + # leaked the token length/prefix through a timing side channel. + if not verify_shared_token(auth_token, shared_secret): log.warning( "webhook.catc.invalid_token", instance=instance_name, @@ -75,11 +79,8 @@ async def handle_catalyst_center_webhook( detail="Invalid Catalyst Center webhook token", ) else: - log.warning( - "webhook.catc.no_secret_configured", - instance=instance_name, - hint="Store shared_secret at netcortex/webhooks/catalyst_center/", - ) + # Fail closed (0.8.0-dev10, F1): reject when no token is configured. + reject_if_unsigned(kind="catalyst_center", instance_name=instance_name) try: payload: dict[str, Any] = json.loads(body) @@ -106,13 +107,17 @@ async def handle_catalyst_center_webhook( device=device, ) - background_tasks.add_task( - _sync_catalyst_center, - instance_name=instance_name, - event_type=event_type, - domain=domain, - device=device, - payload=payload, + # Coalesced sync (0.8.0-dev10, F4) — single-flight, rate-limited. + from netcortex.webhooks.sync_coalesce import schedule_sync + schedule_sync( + f"catalyst_center/{instance_name}", + lambda: _sync_catalyst_center( + instance_name=instance_name, + event_type=event_type, + domain=domain, + device=device, + payload=payload, + ), ) return { diff --git a/netcortex/webhooks/meraki.py b/netcortex/webhooks/meraki.py index b06e19c..9f5da89 100644 --- a/netcortex/webhooks/meraki.py +++ b/netcortex/webhooks/meraki.py @@ -11,14 +11,18 @@ from __future__ import annotations -import hashlib -import hmac import json from typing import TYPE_CHECKING, Any import structlog from fastapi import BackgroundTasks, HTTPException, status +from netcortex.webhooks.auth import ( + is_timestamp_fresh, + reject_if_unsigned, + verify_hmac_sha256, +) + if TYPE_CHECKING: from netcortex.thalamus import SensoryPublisher @@ -74,18 +78,12 @@ async def _get_shared_secret(instance_name: str) -> str | None: def _verify_signature(body: bytes, signature_header: str | None, shared_secret: str) -> bool: - """Return True if the Meraki HMAC-SHA256 signature is valid.""" - if not signature_header: - return False - # Meraki sends the raw hex digest (no "sha256=" prefix in older versions; - # newer versions may add it — strip it for compatibility). - received = signature_header.removeprefix("sha256=").strip() - expected = hmac.new( - shared_secret.encode("utf-8"), - body, - hashlib.sha256, - ).hexdigest() - return hmac.compare_digest(expected, received) + """Return True if the Meraki HMAC-SHA256 signature is valid. + + Thin wrapper over the shared :func:`netcortex.webhooks.auth.verify_hmac_sha256` + so all vendors use one constant-time implementation. + """ + return verify_hmac_sha256(body, signature_header, shared_secret) async def handle_meraki_webhook( @@ -128,17 +126,31 @@ async def handle_meraki_webhook( detail="Invalid Meraki webhook signature", ) else: - log.warning( - "webhook.meraki.no_secret_configured", - instance=instance_name, - hint="Store shared_secret at netcortex/webhooks/meraki/", - ) + # Fail closed (0.8.0-dev10): a tenant with no configured secret is + # rejected (503) unless the operator opted into the explicit + # bootstrap switch. Previously this branch accepted the webhook. + reject_if_unsigned(kind="meraki", instance_name=instance_name) try: payload: dict[str, Any] = json.loads(body) except Exception: raise HTTPException(status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON body") + # Replay resistance (0.8.0-dev10): Meraki stamps each alert with an ISO + # `sentAt`. Reject stale deliveries (captured-and-replayed payloads) + # outside the configured freshness window. Only enforced when a secret + # is configured — an unsigned bootstrap request has no integrity anyway. + if shared_secret is not None and not is_timestamp_fresh(payload.get("sentAt")): + log.warning( + "webhook.meraki.stale_timestamp", + instance=instance_name, + sent_at=payload.get("sentAt"), + ) + raise HTTPException( + status_code=status.HTTP_403_FORBIDDEN, + detail="Webhook timestamp outside the accepted freshness window", + ) + event_type = payload.get("alertType") or payload.get("eventType") or "unknown" network_id = payload.get("networkId") org_id = payload.get("organizationId") @@ -177,12 +189,17 @@ async def handle_meraki_webhook( ) # ── Targeted adapter sync (pre-dev8 behavior, retained) ─────────────── - background_tasks.add_task( - _sync_meraki_network, - instance_name=instance_name, - event_type=event_type, - network_id=network_id, - payload=payload, + # Funneled through the coalescing scheduler (0.8.0-dev10, F4) so a + # webhook flood can't spawn unbounded concurrent discoveries. + from netcortex.webhooks.sync_coalesce import schedule_sync + schedule_sync( + f"meraki/{instance_name}", + lambda: _sync_meraki_network( + instance_name=instance_name, + event_type=event_type, + network_id=network_id, + payload=payload, + ), ) return { diff --git a/netcortex/webhooks/nexus_dashboard.py b/netcortex/webhooks/nexus_dashboard.py new file mode 100644 index 0000000..09ce1cd --- /dev/null +++ b/netcortex/webhooks/nexus_dashboard.py @@ -0,0 +1,124 @@ +"""Nexus Dashboard / NDFC webhook handler. + +Nexus Dashboard can POST event notifications to an external endpoint with +a shared API key in the ``X-ND-API-Key`` header. + +Shared secret storage path: + netcortex/webhooks/nexus_dashboard/ → {"api_key": "..."} + +Prior to 0.8.0-dev10 the Nexus Dashboard route accepted *any* request — +the ``X-ND-API-Key`` header was declared but never checked (F7). This +handler verifies it in constant time and fails closed when no key is +provisioned. +""" + +from __future__ import annotations + +import json +from typing import Any + +import structlog +from fastapi import BackgroundTasks, HTTPException, status + +from netcortex.webhooks.auth import reject_if_unsigned, verify_shared_token + +log = structlog.get_logger(__name__) + +_SECRET_CACHE: dict[str, str] = {} + + +async def _get_api_key(instance_name: str) -> str | None: + """Fetch the Nexus Dashboard webhook API key from the secret backend. + + Accepts either ``api_key`` or ``shared_secret`` in the stored blob so + operators can use whichever convention matches the rest of the chart. + """ + if instance_name in _SECRET_CACHE: + return _SECRET_CACHE[instance_name] + path = f"netcortex/webhooks/nexus_dashboard/{instance_name}" + try: + from netcortex.secrets import get_secret_backend + backend = get_secret_backend() + data = await backend.get(path, required=False) + except Exception as exc: + log.warning( + "webhook.nd.secret_fetch_failed", + instance=instance_name, + path=path, + error=str(exc), + ) + return None + if not data: + return None + key = data.get("api_key") or data.get("shared_secret") + if key: + _SECRET_CACHE[instance_name] = key + return key + + +async def handle_nexus_dashboard_webhook( + *, + instance_name: str, + body: bytes, + api_key: str | None, + background_tasks: BackgroundTasks, +) -> dict[str, str]: + """Validate and enqueue a Nexus Dashboard event notification.""" + expected = await _get_api_key(instance_name) + + if expected is not None: + if not verify_shared_token(api_key, expected): + log.warning( + "webhook.nd.invalid_key", + instance=instance_name, + has_key=api_key is not None, + ) + raise HTTPException( + status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid Nexus Dashboard API key", + ) + else: + reject_if_unsigned(kind="nexus_dashboard", instance_name=instance_name) + + try: + payload: dict[str, Any] = json.loads(body) + except Exception: + raise HTTPException( + status_code=status.HTTP_400_BAD_REQUEST, detail="Invalid JSON body" + ) + + event_type = payload.get("eventType") or payload.get("type") + log.info( + "webhook.nd.accepted", + instance=instance_name, + event_type=event_type, + ) + + from netcortex.webhooks.sync_coalesce import schedule_sync + schedule_sync( + f"nexus_dashboard/{instance_name}", + lambda: _sync_nexus_dashboard(instance_name=instance_name, payload=payload), + ) + + return { + "status": "queued", + "adapter": f"nexus_dashboard/{instance_name}", + "event_type": event_type or "", + } + + +async def _sync_nexus_dashboard( + *, instance_name: str, payload: dict[str, Any] +) -> None: + """Trigger a sync of the Nexus Dashboard adapter after an event.""" + instance_id = f"nexus_dashboard/{instance_name}" + try: + from netcortex.adapters import get_instances + adapter = get_instances().get(instance_id) + if adapter is None: + log.warning("webhook.nd.adapter_not_found", instance_id=instance_id) + return + log.info("webhook.nd.full_sync", instance_id=instance_id) + await adapter.discover() + except Exception as exc: + log.error("webhook.nd.sync_failed", instance_id=instance_id, error=str(exc)) diff --git a/netcortex/webhooks/router.py b/netcortex/webhooks/router.py index 01a4e2d..7df0190 100644 --- a/netcortex/webhooks/router.py +++ b/netcortex/webhooks/router.py @@ -22,8 +22,15 @@ from fastapi import APIRouter, BackgroundTasks, Header, HTTPException, Request, status from fastapi.responses import StreamingResponse +from netcortex.webhooks.auth import ( + enforce_body_size, + enforce_content_length, + require_admin_token, + require_telemetry_token, +) from netcortex.webhooks.meraki import handle_meraki_webhook from netcortex.webhooks.catalyst_center import handle_catalyst_center_webhook +from netcortex.webhooks.nexus_dashboard import handle_nexus_dashboard_webhook from netcortex.webhooks.event_publisher import get_publisher from netcortex.webhooks.telemetry import handle_telemetry_push, telemetry_event_stream @@ -32,6 +39,19 @@ router = APIRouter(tags=["webhooks & telemetry"]) +async def _read_bounded_body(request: Request) -> bytes: + """Read the request body with a hard size cap (F3). + + Rejects (413) on the ``Content-Length`` header first, then re-checks + the materialized body in case the header is absent or dishonest. The + ingress ``proxy-body-size`` annotation is the outermost gate. + """ + enforce_content_length(request.headers.get("content-length")) + body = await request.body() + enforce_body_size(body) + return body + + # ── Meraki ──────────────────────────────────────────────────────────────────── @router.post( @@ -56,7 +76,7 @@ async def meraki_webhook( The handler validates the signature then queues a targeted sync of the affected network so state is refreshed within seconds of the event. """ - body = await request.body() + body = await _read_bounded_body(request) log.info( "webhook.meraki.received", instance=instance_name, @@ -96,7 +116,7 @@ async def catalyst_center_webhook( Store the shared token at: ``netcortex/webhooks/catalyst_center`` → ``{"shared_secret": "..."}`` """ - body = await request.body() + body = await _read_bounded_body(request) log.info( "webhook.catalyst_center.received", instance=instance_name, @@ -125,20 +145,24 @@ async def nexus_dashboard_webhook( background_tasks: BackgroundTasks, x_nd_api_key: str | None = Header(None, alias="X-ND-API-Key"), ) -> dict[str, str]: - """Receive Nexus Dashboard / NDFC event notifications.""" - body = await request.body() - try: - payload = json.loads(body) - except Exception: - raise HTTPException(status_code=400, detail="Invalid JSON body") + """Receive Nexus Dashboard / NDFC event notifications. + Authenticated with a shared API key (``X-ND-API-Key``). Store it at: + ``netcortex/webhooks/nexus_dashboard/`` → ``{"api_key": "..."}`` + """ + body = await _read_bounded_body(request) log.info( "webhook.nexus_dashboard.received", instance=instance_name, - event_type=payload.get("eventType") or payload.get("type"), + bytes=len(body), + has_key=x_nd_api_key is not None, + ) + return await handle_nexus_dashboard_webhook( + instance_name=instance_name, + body=body, + api_key=x_nd_api_key, + background_tasks=background_tasks, ) - background_tasks.add_task(_trigger_sync, "nexus_dashboard", instance_name, payload) - return {"status": "queued", "adapter": f"nexus_dashboard/{instance_name}"} # ── Generic catch-all ───────────────────────────────────────────────────────── @@ -153,12 +177,19 @@ async def generic_webhook( instance_name: str, request: Request, background_tasks: BackgroundTasks, + x_netcortex_token: str | None = Header(None, alias="X-NetCortex-Token"), ) -> dict[str, str]: """Accept webhooks from platforms without a dedicated handler. - The payload is logged and queued for a full adapter sync. + Because this endpoint can trigger discovery on *any* configured + adapter (an amplification vector — F4), it is gated behind the + administrative ``api_secret`` (header ``X-NetCortex-Token``). Without + an ``api_secret`` configured it is disabled (503) — prefer adding a + dedicated, per-vendor authenticated handler instead. """ - body = await request.body() + require_admin_token(x_netcortex_token) + + body = await _read_bounded_body(request) try: payload = json.loads(body) except Exception: @@ -170,7 +201,11 @@ async def generic_webhook( instance=instance_name, keys=list(payload.keys()) if isinstance(payload, dict) else None, ) - background_tasks.add_task(_trigger_sync, platform, instance_name, payload) + from netcortex.webhooks.sync_coalesce import schedule_sync + schedule_sync( + f"{platform}/{instance_name}", + lambda: _trigger_sync(platform, instance_name, payload), + ) return {"status": "queued", "adapter": f"{platform}/{instance_name}"} @@ -187,6 +222,7 @@ async def receive_telemetry( background_tasks: BackgroundTasks, x_collection_id: str | None = Header(None, alias="X-Collection-Id"), x_yang_path: str | None = Header(None, alias="X-Yang-Path"), + x_telemetry_token: str | None = Header(None, alias="X-Telemetry-Token"), ) -> dict[str, Any]: """Accept HTTP-push streaming telemetry from network devices. @@ -200,8 +236,12 @@ async def receive_telemetry( For gRPC dial-out MDT (gNMI Subscribe), see the telemetry-grpc sidecar service (port 57500) in the Helm chart. + + Authenticated with a shared ``X-Telemetry-Token`` header (F6); store + the token in the core secret as ``telemetry_secret``. """ - body = await request.body() + require_telemetry_token(x_telemetry_token) + body = await _read_bounded_body(request) content_type = request.headers.get("content-type", "application/json") result = await handle_telemetry_push( @@ -220,12 +260,17 @@ async def receive_telemetry( summary="SSE stream of ingest events", response_class=StreamingResponse, ) -async def telemetry_sse_stream(request: Request) -> StreamingResponse: +async def telemetry_sse_stream( + request: Request, + x_netcortex_token: str | None = Header(None, alias="X-NetCortex-Token"), +) -> StreamingResponse: """Server-Sent Events stream of recent telemetry ingest activity. - Useful for real-time monitoring of what data is flowing in. - Connect with: curl -N https://.../ingest/telemetry/stream + Streams operational data, so it is gated behind the administrative + ``api_secret`` (header ``X-NetCortex-Token``) — F6. Connect with: + curl -N -H "X-NetCortex-Token: " https://.../ingest/telemetry/stream """ + require_admin_token(x_netcortex_token) return StreamingResponse( telemetry_event_stream(request), media_type="text/event-stream", diff --git a/netcortex/webhooks/sync_coalesce.py b/netcortex/webhooks/sync_coalesce.py new file mode 100644 index 0000000..909a0e4 --- /dev/null +++ b/netcortex/webhooks/sync_coalesce.py @@ -0,0 +1,102 @@ +"""Coalescing scheduler for webhook-triggered adapter syncs (F4). + +Webhooks are an *unauthenticated-amplification* vector before this layer: +a flood of webhook deliveries (or a single noisy device flapping) each +scheduled an independent ``adapter.discover()`` background task. A burst +could spawn unbounded concurrent discoveries, hammering the upstream +vendor API and exhausting the event loop / connection pools. + +This module funnels every webhook-driven sync through a small scheduler +that guarantees, per adapter instance: + +* **Single-flight** — at most one sync runs at a time. +* **Trailing coalesce** — deliveries that arrive while a sync is in + flight collapse into a single follow-up run (we only ever need the + latest authoritative state). +* **Minimum interval** — a given instance syncs at most once per + ``_MIN_INTERVAL_S``; extra triggers are debounced. +* **Global concurrency cap** — across *all* instances, no more than + ``_MAX_CONCURRENT`` syncs run simultaneously. + +Handlers call :func:`schedule_sync` with the instance id and a zero-arg +factory that returns the sync coroutine. +""" + +from __future__ import annotations + +import asyncio +import time +from typing import Awaitable, Callable + +import structlog + +log = structlog.get_logger(__name__) + +# Don't sync the same adapter instance more than once per this many seconds. +_MIN_INTERVAL_S = 10.0 +# Hard ceiling on concurrent syncs across all instances. +_MAX_CONCURRENT = 4 + +_SyncFactory = Callable[[], Awaitable[None]] + +_sem: asyncio.Semaphore | None = None +_inflight: dict[str, asyncio.Task] = {} +_pending: dict[str, _SyncFactory] = {} +_last_run: dict[str, float] = {} + + +def _semaphore() -> asyncio.Semaphore: + # Created lazily so the module imports without a running event loop. + global _sem + if _sem is None: + _sem = asyncio.Semaphore(_MAX_CONCURRENT) + return _sem + + +def schedule_sync(instance_id: str, factory: _SyncFactory) -> None: + """Schedule a coalesced, rate-limited sync for ``instance_id``. + + Safe to call on every webhook delivery. If a sync is already running + for this instance, the latest factory is stored and run once after + the current one finishes (trailing coalesce). + """ + if instance_id in _inflight: + _pending[instance_id] = factory + log.debug("webhook.sync.coalesced", instance_id=instance_id) + return + try: + _inflight[instance_id] = asyncio.create_task(_run(instance_id, factory)) + except RuntimeError: + # No running loop (e.g. a unit test calling a handler synchronously + # without an event loop). Nothing to schedule. + log.debug("webhook.sync.no_loop", instance_id=instance_id) + + +async def _run(instance_id: str, factory: _SyncFactory) -> None: + try: + elapsed = time.monotonic() - _last_run.get(instance_id, 0.0) + if elapsed < _MIN_INTERVAL_S: + await asyncio.sleep(_MIN_INTERVAL_S - elapsed) + async with _semaphore(): + _last_run[instance_id] = time.monotonic() + try: + await factory() + except Exception as exc: + log.error("webhook.sync.failed", instance_id=instance_id, error=str(exc)) + finally: + _inflight.pop(instance_id, None) + nxt = _pending.pop(instance_id, None) + if nxt is not None: + schedule_sync(instance_id, nxt) + + +def _reset_for_tests() -> None: + """Clear all scheduler state. Test-only.""" + global _sem + _sem = None + _inflight.clear() + _pending.clear() + _last_run.clear() + + +__all__ = ["schedule_sync"] diff --git a/pyproject.toml b/pyproject.toml index 24feb7b..159a6d4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "netcortex" -version = "0.8.0.dev9" +version = "0.8.0.dev10" description = "The intelligence layer for your network — multi-dimensional graph of the network bridging Meraki, Catalyst Center, Intersight, and more with NetBox as SoT" readme = "README.md" requires-python = ">=3.12" diff --git a/tests/test_api_auth.py b/tests/test_api_auth.py new file mode 100644 index 0000000..1fc8941 --- /dev/null +++ b/tests/test_api_auth.py @@ -0,0 +1,86 @@ +"""Tests for the API-authentication middleware (0.8.0-dev10, F2/F8). + +Builds a fresh FastAPI app and attaches the production ``_api_auth`` +middleware from :mod:`netcortex.main`, so we exercise the real path +classification and bearer-token check without standing up the full app +(which needs Neo4j / Redis / the secret backend). +""" + +from __future__ import annotations + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +import netcortex.main as main_module + + +class _FakeSettings: + def __init__(self, api_secret: str) -> None: + self.api_secret = api_secret + + +def _build_app() -> FastAPI: + app = FastAPI() + app.middleware("http")(main_module._api_auth) + + @app.get("/api/inventory") + async def _inv() -> dict: + return {"ok": True} + + @app.get("/metrics") + async def _metrics() -> dict: + return {"ok": True} + + @app.get("/health") + async def _health() -> dict: + return {"ok": True} + + @app.post("/webhooks/meraki/x") + async def _wh() -> dict: + return {"ok": True} + + @app.get("/") + async def _root() -> dict: + return {"ok": True} + + return app + + +def test_api_blocked_without_token(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(main_module, "get_settings", lambda: _FakeSettings("s3cret")) + c = TestClient(_build_app()) + assert c.get("/api/inventory").status_code == 401 + assert c.get("/metrics").status_code == 401 + assert c.get("/").status_code == 401 + + +def test_api_allowed_with_token(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(main_module, "get_settings", lambda: _FakeSettings("s3cret")) + c = TestClient(_build_app()) + r = c.get("/api/inventory", headers={"Authorization": "Bearer s3cret"}) + assert r.status_code == 200 + + +def test_public_paths_never_blocked(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(main_module, "get_settings", lambda: _FakeSettings("s3cret")) + c = TestClient(_build_app()) + assert c.get("/health").status_code == 200 + assert c.post("/webhooks/meraki/x").status_code == 200 + + +def test_noop_when_no_secret(monkeypatch: pytest.MonkeyPatch) -> None: + """With api_secret unset, the middleware is a no-op (ingress is the + control in that mode).""" + monkeypatch.setattr(main_module, "get_settings", lambda: _FakeSettings("")) + c = TestClient(_build_app()) + assert c.get("/api/inventory").status_code == 200 + + +def test_is_api_auth_public_classification() -> None: + assert main_module._is_api_auth_public("/health") is True + assert main_module._is_api_auth_public("/webhooks/meraki/x") is True + assert main_module._is_api_auth_public("/ingest/telemetry/d") is True + assert main_module._is_api_auth_public("/api/inventory") is False + assert main_module._is_api_auth_public("/metrics") is False + assert main_module._is_api_auth_public("/") is False diff --git a/tests/webhooks/test_auth_helpers.py b/tests/webhooks/test_auth_helpers.py new file mode 100644 index 0000000..2214023 --- /dev/null +++ b/tests/webhooks/test_auth_helpers.py @@ -0,0 +1,130 @@ +"""Unit tests for the shared webhook auth/hardening helpers (0.8.0-dev10).""" + +from __future__ import annotations + +import hashlib +import hmac +from datetime import datetime, timedelta, timezone + +import pytest +from fastapi import HTTPException + +from netcortex.webhooks import auth + + +def _hexsig(body: bytes, secret: str) -> str: + return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + + +# ── HMAC verification ────────────────────────────────────────────────────── + + +def test_verify_hmac_valid() -> None: + body = b'{"a":1}' + sig = _hexsig(body, "sek") + assert auth.verify_hmac_sha256(body, sig, "sek") is True + + +def test_verify_hmac_accepts_sha256_prefix() -> None: + body = b'{"a":1}' + sig = "sha256=" + _hexsig(body, "sek") + assert auth.verify_hmac_sha256(body, sig, "sek") is True + + +def test_verify_hmac_rejects_wrong_secret_and_missing_header() -> None: + body = b'{"a":1}' + assert auth.verify_hmac_sha256(body, _hexsig(body, "other"), "sek") is False + assert auth.verify_hmac_sha256(body, None, "sek") is False + + +# ── Shared-token comparison ──────────────────────────────────────────────── + + +def test_verify_shared_token() -> None: + assert auth.verify_shared_token("tok", "tok") is True + assert auth.verify_shared_token("nope", "tok") is False + assert auth.verify_shared_token(None, "tok") is False + assert auth.verify_shared_token("tok", "") is False + + +# ── Body-size guards (F3) ────────────────────────────────────────────────── + + +def test_enforce_content_length(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_max_body_bytes", lambda: 100) + auth.enforce_content_length("50") # under cap → no raise + auth.enforce_content_length(None) # absent → no raise + auth.enforce_content_length("not-a-number") # unparseable → no raise + with pytest.raises(HTTPException) as ei: + auth.enforce_content_length("101") + assert ei.value.status_code == 413 + + +def test_enforce_body_size(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_max_body_bytes", lambda: 10) + auth.enforce_body_size(b"x" * 10) + with pytest.raises(HTTPException) as ei: + auth.enforce_body_size(b"x" * 11) + assert ei.value.status_code == 413 + + +# ── Fail-closed gate (F1/F7) ─────────────────────────────────────────────── + + +def test_reject_if_unsigned_fails_closed(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_allow_unsigned", lambda: False) + with pytest.raises(HTTPException) as ei: + auth.reject_if_unsigned(kind="meraki", instance_name="t1") + assert ei.value.status_code == 503 + + +def test_reject_if_unsigned_allows_with_flag(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_allow_unsigned", lambda: True) + auth.reject_if_unsigned(kind="meraki", instance_name="t1") # no raise + + +# ── Admin / telemetry gates ──────────────────────────────────────────────── + + +def test_require_admin_token(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_setting", lambda name, default="": "adm" if name == "api_secret" else default) + auth.require_admin_token("adm") # no raise + with pytest.raises(HTTPException) as ei: + auth.require_admin_token("wrong") + assert ei.value.status_code == 401 + + +def test_require_admin_token_disabled_without_secret(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_setting", lambda name, default="": default) + monkeypatch.setattr(auth, "_allow_unsigned", lambda: False) + with pytest.raises(HTTPException) as ei: + auth.require_admin_token("anything") + assert ei.value.status_code == 503 + + +def test_require_telemetry_token(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_setting", lambda name, default="": "tel" if name == "telemetry_secret" else default) + auth.require_telemetry_token("tel") + with pytest.raises(HTTPException) as ei: + auth.require_telemetry_token(None) + assert ei.value.status_code == 401 + + +# ── Replay / freshness (F9) ──────────────────────────────────────────────── + + +def test_is_timestamp_fresh(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_replay_window_seconds", lambda: 300) + now = datetime.now(tz=timezone.utc) + fresh = now.strftime("%Y-%m-%dT%H:%M:%SZ") + stale = (now - timedelta(hours=1)).strftime("%Y-%m-%dT%H:%M:%SZ") + assert auth.is_timestamp_fresh(fresh) is True + assert auth.is_timestamp_fresh(stale) is False + assert auth.is_timestamp_fresh(None) is True # nothing to judge + assert auth.is_timestamp_fresh("garbage") is True # unparseable → defer to HMAC + + +def test_is_timestamp_fresh_window_disabled(monkeypatch: pytest.MonkeyPatch) -> None: + monkeypatch.setattr(auth, "_replay_window_seconds", lambda: 0) + stale = (datetime.now(tz=timezone.utc) - timedelta(days=7)).strftime("%Y-%m-%dT%H:%M:%SZ") + assert auth.is_timestamp_fresh(stale) is True diff --git a/tests/webhooks/test_meraki_webhook_route.py b/tests/webhooks/test_meraki_webhook_route.py index 0ce4792..f1f9f47 100644 --- a/tests/webhooks/test_meraki_webhook_route.py +++ b/tests/webhooks/test_meraki_webhook_route.py @@ -24,6 +24,7 @@ import hashlib import hmac import json +from datetime import datetime, timedelta, timezone from typing import Any import pytest @@ -36,13 +37,19 @@ def _sign(body: bytes, secret: str) -> str: return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() -def _port_disconnected_payload() -> dict[str, Any]: +def _now_iso() -> str: + """Current UTC time as an ISO8601 'Z' string (fresh for the replay + guard added in 0.8.0-dev10).""" + return datetime.now(tz=timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") + + +def _port_disconnected_payload(sent_at: str | None = None) -> dict[str, Any]: """Realistic Meraki port-disconnect payload (sanitized identifiers).""" return { "alertType": "Port connectivity", "alertTypeId": "port_connectivity", "version": "0.1", - "sentAt": "2026-06-05T20:00:00Z", + "sentAt": sent_at or _now_iso(), "organizationId": "EXAMPLE_ORG", "organizationName": "Example Org", "networkId": "L_EXAMPLE001", @@ -168,14 +175,19 @@ def test_signature_prefix_sha256_accepted( def test_malformed_json_returns_400( webhook_client: TestClient, capturing_bus: CapturingEventBus, - meraki_no_secret: None, + meraki_shared_secret: str, ) -> None: + # A correctly-signed but non-JSON body: signature passes, parsing fails. body = b"this is not json at all" + sig = _sign(body, meraki_shared_secret) resp = webhook_client.post( "/webhooks/meraki/TEST_TENANT", content=body, - headers={"Content-Type": "application/json"}, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, ) assert resp.status_code == 400 @@ -183,22 +195,42 @@ def test_malformed_json_returns_400( # --------------------------------------------------------------------------- -# No-secret-configured degradation +# Fail-closed: no-secret-configured (0.8.0-dev10, F1) # --------------------------------------------------------------------------- -def test_no_secret_configured_still_publishes_with_warning( +def test_no_secret_configured_rejected_503( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_no_secret: None, +) -> None: + """When no signing secret is configured for the tenant, the receiver + now FAILS CLOSED (503) instead of silently accepting the unsigned + request. Pre-dev10 this returned 200.""" + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 503 + assert capturing_bus.published == [] + + +def test_no_secret_bootstrap_allows_when_flag_set( webhook_client: TestClient, capturing_bus: CapturingEventBus, meraki_no_secret: None, - caplog: pytest.LogCaptureFixture, + monkeypatch: pytest.MonkeyPatch, ) -> None: - """When the per-tenant secret isn't in the backend, the handler - accepts the request and logs a loud warning. We still process the - event (operators bootstrapping the integration would otherwise - see no events at all and assume the pipeline is broken).""" - import logging - caplog.set_level(logging.WARNING) + """The explicit ``webhook_allow_unsigned`` bootstrap switch lets an + unsigned request through (with a loud warning) so operators can + validate the receive path before provisioning the secret.""" + from netcortex.webhooks import auth as auth_module + + monkeypatch.setattr(auth_module, "_allow_unsigned", lambda: True) body = json.dumps(_port_disconnected_payload()).encode() resp = webhook_client.post( @@ -211,6 +243,65 @@ def test_no_secret_configured_still_publishes_with_warning( assert len(capturing_bus.published) == 1 +# --------------------------------------------------------------------------- +# Replay resistance (0.8.0-dev10, F9) +# --------------------------------------------------------------------------- + + +def test_stale_timestamp_rejected_403( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """A correctly-signed payload whose ``sentAt`` is far outside the + freshness window is rejected as a replay.""" + stale = (datetime.now(tz=timezone.utc) - timedelta(hours=2)).strftime( + "%Y-%m-%dT%H:%M:%SZ" + ) + body = json.dumps(_port_disconnected_payload(sent_at=stale)).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 403 + assert capturing_bus.published == [] + + +# --------------------------------------------------------------------------- +# Body-size cap (0.8.0-dev10, F3) +# --------------------------------------------------------------------------- + + +def test_oversized_body_rejected_413( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, + monkeypatch: pytest.MonkeyPatch, +) -> None: + """A body larger than the configured cap is rejected (413) before + parsing — even with a valid Content-Length header.""" + from netcortex.webhooks import auth as auth_module + + monkeypatch.setattr(auth_module, "_max_body_bytes", lambda: 1024) + big = b"x" * 4096 + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=big, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 413 + assert capturing_bus.published == [] + + # --------------------------------------------------------------------------- # Mapper integration # --------------------------------------------------------------------------- diff --git a/tests/webhooks/test_other_webhook_routes.py b/tests/webhooks/test_other_webhook_routes.py new file mode 100644 index 0000000..5f64f43 --- /dev/null +++ b/tests/webhooks/test_other_webhook_routes.py @@ -0,0 +1,180 @@ +"""HTTP route tests for the non-Meraki receivers and telemetry ingest +after the 0.8.0-dev10 security hardening (fail-closed auth, const-time +token compare, Nexus Dashboard key verification, admin/telemetry gates, +body-size caps).""" + +from __future__ import annotations + +import json + +import pytest +from fastapi.testclient import TestClient + +from netcortex.webhooks import auth as auth_module + + +# ── Catalyst Center (F1 fail-closed, F5 const-time) ───────────────────────── + + +@pytest.fixture +def catc_secret(monkeypatch: pytest.MonkeyPatch) -> str: + from netcortex.webhooks import catalyst_center as cc + secret = "catc-token-test" # noqa: S105 — test fixture + monkeypatch.setitem(cc._SECRET_CACHE, "T1", secret) + return secret + + +def test_catc_valid_token_200(webhook_client: TestClient, catc_secret: str) -> None: + body = json.dumps({"eventId": "e1", "type": "NETWORK-EVENT"}).encode() + resp = webhook_client.post( + "/webhooks/catalyst_center/T1", + content=body, + headers={"Content-Type": "application/json", "X-Auth-Token": catc_secret}, + ) + assert resp.status_code == 200 + assert resp.json()["adapter"] == "catalyst_center/T1" + + +def test_catc_invalid_token_401(webhook_client: TestClient, catc_secret: str) -> None: + body = json.dumps({"eventId": "e1"}).encode() + resp = webhook_client.post( + "/webhooks/catalyst_center/T1", + content=body, + headers={"Content-Type": "application/json", "X-Auth-Token": "wrong"}, + ) + assert resp.status_code == 401 + + +def test_catc_no_secret_fails_closed_503( + webhook_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + from netcortex.webhooks import catalyst_center as cc + + async def _none(_name: str) -> None: + return None + + monkeypatch.setattr(cc, "_get_shared_secret", _none) + monkeypatch.setattr(auth_module, "_allow_unsigned", lambda: False) + resp = webhook_client.post( + "/webhooks/catalyst_center/T1", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 503 + + +# ── Nexus Dashboard (F7 verify key, fail-closed) ──────────────────────────── + + +@pytest.fixture +def nd_key(monkeypatch: pytest.MonkeyPatch) -> str: + from netcortex.webhooks import nexus_dashboard as nd + key = "nd-api-key-test" # noqa: S105 — test fixture + monkeypatch.setitem(nd._SECRET_CACHE, "N1", key) + return key + + +def test_nd_valid_key_200(webhook_client: TestClient, nd_key: str) -> None: + body = json.dumps({"eventType": "fabric.event"}).encode() + resp = webhook_client.post( + "/webhooks/nexus_dashboard/N1", + content=body, + headers={"Content-Type": "application/json", "X-ND-API-Key": nd_key}, + ) + assert resp.status_code == 200 + assert resp.json()["adapter"] == "nexus_dashboard/N1" + + +def test_nd_invalid_key_401(webhook_client: TestClient, nd_key: str) -> None: + resp = webhook_client.post( + "/webhooks/nexus_dashboard/N1", + content=b"{}", + headers={"Content-Type": "application/json", "X-ND-API-Key": "nope"}, + ) + assert resp.status_code == 401 + + +def test_nd_no_key_fails_closed_503( + webhook_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + from netcortex.webhooks import nexus_dashboard as nd + + async def _none(_name: str) -> None: + return None + + monkeypatch.setattr(nd, "_get_api_key", _none) + monkeypatch.setattr(auth_module, "_allow_unsigned", lambda: False) + resp = webhook_client.post( + "/webhooks/nexus_dashboard/N1", + content=b"{}", + headers={"Content-Type": "application/json"}, + ) + assert resp.status_code == 503 + + +# ── Generic catch-all (F4 admin-gated) ────────────────────────────────────── + + +def test_generic_requires_admin_token( + webhook_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + auth_module, "_setting", + lambda name, default="": "adm" if name == "api_secret" else default, + ) + # No token → 401 + r1 = webhook_client.post("/webhooks/generic/foo/bar", content=b"{}") + assert r1.status_code == 401 + # Correct token → 200 + r2 = webhook_client.post( + "/webhooks/generic/foo/bar", + content=b"{}", + headers={"X-NetCortex-Token": "adm"}, + ) + assert r2.status_code == 200 + + +def test_generic_disabled_without_api_secret( + webhook_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr(auth_module, "_setting", lambda name, default="": default) + monkeypatch.setattr(auth_module, "_allow_unsigned", lambda: False) + resp = webhook_client.post("/webhooks/generic/foo/bar", content=b"{}") + assert resp.status_code == 503 + + +# ── Telemetry ingest + SSE (F6) ───────────────────────────────────────────── + + +def test_telemetry_requires_token( + webhook_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + auth_module, "_setting", + lambda name, default="": "tel" if name == "telemetry_secret" else default, + ) + # Missing token → 401 + r1 = webhook_client.post( + "/ingest/telemetry/dev1", + content=json.dumps({"x": 1}).encode(), + headers={"Content-Type": "application/json"}, + ) + assert r1.status_code == 401 + # Correct token → 202 accepted + r2 = webhook_client.post( + "/ingest/telemetry/dev1", + content=json.dumps({"x": 1}).encode(), + headers={"Content-Type": "application/json", "X-Telemetry-Token": "tel"}, + ) + assert r2.status_code == 202 + + +def test_telemetry_sse_admin_gated( + webhook_client: TestClient, monkeypatch: pytest.MonkeyPatch +) -> None: + monkeypatch.setattr( + auth_module, "_setting", + lambda name, default="": "adm" if name == "api_secret" else default, + ) + resp = webhook_client.get("/ingest/telemetry/stream") + assert resp.status_code == 401 diff --git a/tests/webhooks/test_sync_coalesce.py b/tests/webhooks/test_sync_coalesce.py new file mode 100644 index 0000000..1f99113 --- /dev/null +++ b/tests/webhooks/test_sync_coalesce.py @@ -0,0 +1,65 @@ +"""Tests for the webhook sync coalescing scheduler (0.8.0-dev10, F4).""" + +from __future__ import annotations + +import asyncio + +import pytest + +from netcortex.webhooks import sync_coalesce as sc + + +@pytest.fixture(autouse=True) +def _fast_and_clean(monkeypatch: pytest.MonkeyPatch): + # Remove the inter-run debounce delay so tests don't sleep. + monkeypatch.setattr(sc, "_MIN_INTERVAL_S", 0.0) + sc._reset_for_tests() + yield + sc._reset_for_tests() + + +async def test_single_schedule_runs_once() -> None: + calls: list[int] = [] + + async def factory() -> None: + calls.append(1) + + sc.schedule_sync("a", factory) + await asyncio.sleep(0.05) + assert calls == [1] + + +async def test_bursts_coalesce_to_one_trailing_run() -> None: + """While one sync is in flight, a burst of further triggers collapses + into a single trailing run — not one run per trigger.""" + calls: list[int] = [] + started = asyncio.Event() + release = asyncio.Event() + + async def factory() -> None: + calls.append(1) + started.set() + await release.wait() + + sc.schedule_sync("a", factory) # run #1 begins + await started.wait() # confirm it's in flight + for _ in range(5): + sc.schedule_sync("a", factory) # 5 triggers → 1 coalesced trailing + release.set() + await asyncio.sleep(0.1) # let #1 finish + trailing run + + assert calls == [1, 1] # exactly two executions + + +async def test_distinct_instances_run_independently() -> None: + calls: list[str] = [] + + async def make(name: str): + async def factory() -> None: + calls.append(name) + return factory + + sc.schedule_sync("a", await make("a")) + sc.schedule_sync("b", await make("b")) + await asyncio.sleep(0.05) + assert sorted(calls) == ["a", "b"]