diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e31997..f751d7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,109 @@ and this file MUST be updated together whenever `__version__` changes. --- +## [0.8.0-dev8] — Second sensory publisher: Meraki webhook → NATS + +First webhook-side sensory publisher. The web process now publishes +`sensory.*` events when Meraki Dashboard webhooks arrive, so a port +flap detected at the network edge reaches the reflex pipeline in +50–200ms instead of waiting up to a full SNMP poll cycle (5 minutes). + +This is also the first release where the dedup store added in +0.8.0-dev3 actually does anything in production: the same port flap +will now be observed by both the Meraki webhook (immediate) and the +SNMP poller (next cycle), and the second arrival within the 60s dedup +window collapses to a single `:ReflexEvent`. + +### Added +- `netcortex/webhooks/event_publisher.py` — web-process bus + coordination. One `NatsEventBus` per process on `app.state.event_bus`; + one cached `SensoryPublisher` per source token on + `app.state._sensory_publishers`. Graceful degradation when + `NATS_URL` is unset (publishing disabled, sync trigger still fires). +- `netcortex/webhooks/meraki_events.py` — pure mapper from Meraki's + vendor dialect (`alertType` + `alertData`) to our closed + `SENSORY_EVENT_CLASSES` vocabulary. Initial coverage: + - `Port connectivity` / `Switch port connection changed` → `link_up` / `link_down` + - `Uplink status change` → `link_up` / `link_down` + - `IDS alerted` / `Malware detected` → `security_alert` + - `Settings changed` / `Configuration change` → `config_change` + - Unknown alertTypes log `webhook.meraki.unmapped_alert_type` and skip + (so we grow coverage based on real traffic, not speculation) +- Target shape: `meraki:{serial}|{port}` for port events, + `meraki:{serial}|{client_mac}` for IDS, etc. Uses the same + `meraki:` prefix as `Device.id` in the live graph so the dev7 + `:AFFECTS` edge resolver lands the edge automatically. 
+ +### Changed +- `netcortex/webhooks/meraki.py::handle_meraki_webhook` accepts an + optional `publisher: SensoryPublisher | None` parameter. When + provided, the handler runs the mapper and publishes one event per + mapped alert. Pre-dev8 sync-trigger behavior is unchanged when + the publisher is absent. +- `netcortex/webhooks/router.py` resolves the publisher via + `get_publisher(request.app, "meraki_webhook")` and threads it + through to the handler. +- `netcortex/main.py` lifespan now initializes the NATS bus on + startup (`init_event_bus`) and closes it on shutdown + (`close_event_bus`). Both steps are best-effort — a missing + `NATS_URL` or transient outage degrades webhooks back to their + pre-dev8 sync-only behavior, never blocks startup. +- Webhook response body adds `sensory_events_published: {count}` so + operators can confirm at-a-glance which webhooks made it through + the publish path. + +### Tests +- `tests/webhooks/test_meraki_events.py` — 18 mapper unit tests + covering every supported alertType, both up and down states, + synonym handling, missing-field degradation, and a subject-builder + compatibility smoke test. +- `tests/webhooks/test_meraki_webhook_route.py` — 10 HTTP integration + tests covering HMAC valid/invalid/missing, signature prefix + compatibility, malformed JSON, no-secret-configured degradation, + unknown alertType, IDS publish, publisher-unavailable path, and + bus-failure-during-publish path. +- This is the first FastAPI `TestClient` test module in the repo; + pattern (capturing bus + minimal app + secret fixture) is + generalized via `conftest.py` so dev9/dev10/dev11 vendor + receivers can reuse it. + +### Operational notes +- The web deployment in `deploy/helm` already sets `NATS_URL` when + `nats.enabled` is true — no Helm change required. +- HMAC secrets continue to live at + `netcortex/webhooks/meraki/` in AWS Secrets + Manager (pre-existing storage path). 
+- A new Cypher query to find webhook-sourced events: + ```cypher + MATCH (e:ReflexEvent {source: 'meraki_webhook'}) + WHERE e.recorded_at > timestamp() - 3600000 + RETURN e.subject, e.target, e.outcome + ORDER BY e.observed_at_ms DESC + ``` +- To see dedup in action (webhook + SNMP poll observing the same + flap), join on target within a tight window: + ```cypher + MATCH (e1:ReflexEvent), (e2:ReflexEvent) + WHERE e1.target = e2.target + AND e1.source = 'meraki_webhook' + AND e2.source = 'snmp_poll' + AND e1.event_class = e2.event_class + AND abs(e1.observed_at_ms - e2.observed_at_ms) < 60000 + RETURN e1.target, e1.outcome AS webhook_outcome, e2.outcome AS snmp_outcome + ``` + In a well-dedupping system the SNMP-side outcome will be `skipped` + for any flap the webhook caught first. + +### Not in this release (deferred) +- `:ThousandEyes` / `:NexusDashboard` / `:cdFMC` webhook receivers + (dev9, dev10, dev11). +- `nexus_dashboard_webhook` and `fmc_webhook` source tokens (added + alongside their respective receivers). +- JetStream durable subscriptions for guaranteed at-least-once across + worker restarts. + +--- + ## [0.8.0-dev7] — :AFFECTS edge resolution + target/subject consistency Second small polish to the dev5 pipeline, surfaced by deployment diff --git a/netcortex/__init__.py b/netcortex/__init__.py index 75abd62..43231f5 100644 --- a/netcortex/__init__.py +++ b/netcortex/__init__.py @@ -22,4 +22,4 @@ ``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes. 
""" -__version__ = "0.8.0-dev7" +__version__ = "0.8.0-dev8" diff --git a/netcortex/main.py b/netcortex/main.py index dd98c4a..701c470 100644 --- a/netcortex/main.py +++ b/netcortex/main.py @@ -278,6 +278,21 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg] state.redis_status = redis_status state.redis_message = redis_msg + # ── NATS sensory publisher bus (0.8.0-dev8+) ────────────────────────── + # The web process publishes `sensory.*` events when webhooks arrive; + # the worker process owns the runner that consumes them. Bus init is + # best-effort: a missing NATS_URL or a transient NATS outage degrades + # webhooks back to their pre-dev8 sync-trigger behavior, which still + # reconciles the live-state graph. + nats_url = ( + getattr(cfg, "nats_url", "") if cfg else "" + ) or os.environ.get("NATS_URL", "") + try: + from netcortex.webhooks.event_publisher import init_event_bus + await init_event_bus(app, nats_url=nats_url or None) + except Exception as exc: + log.warning("netcortex.sensory_publisher_init_failed", error=str(exc)) + # ── Load and health-check adapters ─────────────────────────────────── if cfg: await _probe_adapters() @@ -332,6 +347,11 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg] # ── Shutdown ────────────────────────────────────────────────────────── refresh_adapter_task.cancel() refresh_graph_task.cancel() + try: + from netcortex.webhooks.event_publisher import close_event_bus + await close_event_bus(app) + except Exception as exc: + log.warning("netcortex.sensory_publisher_close_failed", error=str(exc)) await neo4j_client.close() log.info("netcortex.shutdown") diff --git a/netcortex/webhooks/event_publisher.py b/netcortex/webhooks/event_publisher.py new file mode 100644 index 0000000..dfab878 --- /dev/null +++ b/netcortex/webhooks/event_publisher.py @@ -0,0 +1,141 @@ +"""Web-process sensory publisher coordination. 
+ +The web process needs to publish ``sensory.*`` events when webhooks +arrive from upstream platforms (Meraki, ThousandEyes, Nexus Dashboard, +cdFMC, …). The worker process owns the :class:`ReflexRunner` that +*consumes* those events; web only ever publishes. Keeping the two +responsibilities split means: + +* the web process stays a thin HTTP front (no dedup state, no Neo4j + sink, no handler scheduling — those all live in worker) +* webhook latency stays low: publish, return 200, done. The reflex + pipeline runs asynchronously on the worker subscription side. +* a worker outage does not fail webhook handling, but core NATS is at-most-once: events published while the worker is disconnected are not held for it + (durable delivery arrives with JetStream later; until then the sync trigger still reconciles live state) + +Design +------ +One :class:`NatsEventBus` per process, cached on ``app.state.event_bus``. +One :class:`SensoryPublisher` per source token, lazily created and +cached on ``app.state._sensory_publishers``. A typo in a source name +fails at the first call (validated against ``SENSORY_SOURCES``) rather +than silently producing unsubscribable subjects. + +If ``NATS_URL`` is not configured the bus is ``None`` and +:func:`get_publisher` returns ``None``. Routes that try to publish in +that case should degrade silently (the existing sync-trigger behavior +remains intact — losing the sensory side is a recoverable annoyance, +not a data-loss bug). This mirrors the worker's +``_start_reflex_pipeline`` behavior from 0.8.0-dev5. + +Shutdown semantics +------------------ +:func:`close_event_bus` flushes pending publishes and closes the NATS +connection. Called from ``main.py``'s lifespan teardown. Idempotent. 
+""" + +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Any + +import structlog + +from netcortex.contracts.subjects import SENSORY_SOURCES + +if TYPE_CHECKING: + from fastapi import FastAPI + from netcortex.thalamus import NatsEventBus, SensoryPublisher + +log = structlog.get_logger(__name__) + + +async def init_event_bus(app: "FastAPI", nats_url: str | None = None) -> "NatsEventBus | None": + """Initialize the web process's NATS publisher bus. + + Resolves ``nats_url`` in this order: + 1. explicit ``nats_url`` argument (test injection) + 2. ``NATS_URL`` environment variable (Helm-provided) + + Stores the bus on ``app.state.event_bus`` so route handlers and the + teardown path can reach it. Returns the bus (or ``None`` if NATS is + not configured / unreachable). + + Failure here logs and returns ``None`` rather than raising — + the web process must boot even when NATS is down. The reflex/sensory + feature degrades; the rest of the API keeps working. + """ + resolved = nats_url or os.environ.get("NATS_URL", "") + if not resolved: + log.info("web.sensory_publisher_disabled", reason="NATS_URL not set") + app.state.event_bus = None + app.state._sensory_publishers = {} + return None + try: + from netcortex.thalamus import NatsEventBus + bus = NatsEventBus(resolved) + app.state.event_bus = bus + app.state._sensory_publishers = {} + log.info("web.sensory_publisher_ready", nats_url=resolved) + return bus + except Exception as exc: + log.warning("web.sensory_publisher_init_failed", error=str(exc)) + app.state.event_bus = None + app.state._sensory_publishers = {} + return None + + +def get_publisher(app: Any, source: str) -> "SensoryPublisher | None": + """Return a cached :class:`SensoryPublisher` for ``source``, or None. + + Cheap to call per-request: the first call for a given source builds + the publisher (which validates ``source`` against ``SENSORY_SOURCES`` + at construction); subsequent calls return the cached instance. 
+ + Returns ``None`` when the bus isn't initialized — callers should + handle that as "publishing disabled, fall through to other side- + effects" rather than as an error condition. + + ``source`` is validated against the closed vocabulary even if the + bus is unavailable, so a typo in a webhook route still fails fast + in unit tests (we don't want "all webhook routes silently skip + publishing because of a typo nobody noticed"). + """ + if source not in SENSORY_SOURCES: + raise ValueError( + f"unknown sensory source {source!r}; " + f"add to SENSORY_SOURCES in netcortex.contracts.subjects " + f"and update docs/architecture/subjects.md" + ) + bus = getattr(app.state, "event_bus", None) + if bus is None: + return None + publishers: dict[str, Any] = getattr(app.state, "_sensory_publishers", None) or {} + if source not in publishers: + from netcortex.thalamus import SensoryPublisher + publishers[source] = SensoryPublisher(bus, source=source) + app.state._sensory_publishers = publishers + return publishers[source] + + +async def close_event_bus(app: Any) -> None: + """Flush and close the publisher bus. Safe to call multiple times. + + Called from the FastAPI lifespan teardown. Errors are logged and + swallowed — a connection that already died will fail to close + cleanly, and that should not block the shutdown sequence. 
+ """ + bus = getattr(app.state, "event_bus", None) + if bus is None: + return + try: + await bus.close() + log.info("web.sensory_publisher_closed") + except Exception as exc: + log.warning("web.sensory_publisher_close_failed", error=str(exc)) + finally: + app.state.event_bus = None + app.state._sensory_publishers = {} + + +__all__ = ["init_event_bus", "get_publisher", "close_event_bus"] diff --git a/netcortex/webhooks/meraki.py b/netcortex/webhooks/meraki.py index cb0b699..b79c9da 100644 --- a/netcortex/webhooks/meraki.py +++ b/netcortex/webhooks/meraki.py @@ -14,11 +14,14 @@ import hashlib import hmac import json -from typing import Any +from typing import TYPE_CHECKING, Any import structlog from fastapi import BackgroundTasks, HTTPException, status +if TYPE_CHECKING: + from netcortex.thalamus import SensoryPublisher + log = structlog.get_logger(__name__) _SECRET_CACHE: dict[str, str] = {} # instance_name → shared_secret @@ -62,8 +65,26 @@ async def handle_meraki_webhook( body: bytes, signature_header: str | None, background_tasks: BackgroundTasks, + publisher: "SensoryPublisher | None" = None, ) -> dict[str, str]: - """Validate and enqueue a Meraki webhook event.""" + """Validate and process a Meraki webhook event. + + Two side effects, both best-effort and independent: + + 1. **Publish sensory events to NATS** (0.8.0-dev8 and later) so + reflex handlers and the episodic memory layer see the event in + seconds, not minutes. Driven by the + :mod:`netcortex.webhooks.meraki_events` mapper. + 2. **Trigger a targeted adapter sync** (pre-dev8 behavior) so the + Neo4j live-state graph reflects the change. This stays even + when the sensory side is wired in: graph sync re-derives + authoritative state, sensory events feed the reflex layer. + + The two paths are intentionally decoupled: a NATS hiccup must not + block the sync, and a misbehaving adapter must not block the + publish. 
Both run inside the request handler (publish is async + + fast; sync is in BackgroundTasks). + """ shared_secret = await _get_shared_secret(instance_name) if shared_secret is not None: @@ -101,7 +122,32 @@ async def handle_meraki_webhook( org_id=org_id, ) - # Targeted refresh: only re-sync the affected network if possible. + # ── Publish sensory events ──────────────────────────────────────────── + # Pure mapper from vendor dialect to our subjects taxonomy. The + # publisher itself is best-effort; per-event publish failures are + # logged inside :class:`SensoryPublisher` and don't propagate. + sensory_published = 0 + if publisher is not None: + try: + from netcortex.webhooks.meraki_events import map_meraki_payload + events = map_meraki_payload(payload) + for ev in events: + await publisher.publish( + ev.event_class, *ev.target_parts, payload=ev.payload + ) + sensory_published = len(events) + except Exception as exc: + # Mapper bugs or malformed payloads should not 5xx the + # webhook — they should log and let the sync trigger fire + # so live state still reconciles. + log.warning( + "webhook.meraki.publish_failed", + instance=instance_name, + event_type=event_type, + error=str(exc), + ) + + # ── Targeted adapter sync (pre-dev8 behavior, retained) ─────────────── background_tasks.add_task( _sync_meraki_network, instance_name=instance_name, @@ -114,6 +160,7 @@ async def handle_meraki_webhook( "status": "queued", "adapter": f"meraki/{instance_name}", "event_type": event_type, + "sensory_events_published": str(sensory_published), } diff --git a/netcortex/webhooks/meraki_events.py b/netcortex/webhooks/meraki_events.py new file mode 100644 index 0000000..b0b11ce --- /dev/null +++ b/netcortex/webhooks/meraki_events.py @@ -0,0 +1,385 @@ +"""Meraki webhook payload → sensory event mapper. + +Meraki Dashboard sends one webhook POST per alert with a top-level +``alertType`` and a vendor-specific ``alertData`` blob. 
This module is +the pure-function bridge from that vendor dialect to our closed +:data:`SENSORY_EVENT_CLASSES` vocabulary. + +Why pure functions +------------------ +The mapper has no I/O — no NATS, no Neo4j, no HTTP. It takes a parsed +payload and returns a list of structured events. That makes it trivial +to unit-test with sample payloads and equally easy to evolve when +Meraki adds a new alert type (just add a clause + a test). + +Coverage in 0.8.0-dev8 +---------------------- +We deliberately ship a small, high-value initial set: + +* **Port connectivity** (``port_connectivity``) → ``link_up`` / + ``link_down``. This is the headline use case: the same port flap that + the SNMP poller will detect on its next poll cycle now arrives + in 50–200ms via webhook, and the dedup store added in 0.8.0-dev3 + collapses the duplicate into a single ``:ReflexEvent``. +* **Switch port connection changed** — same mapping, treated as a + synonym (Meraki uses both names interchangeably). +* **IDS alert** (``ids_alerted``) → ``security_alert``. +* **Malware detected** → ``security_alert``. +* **Settings changed** / **Configuration change** → ``config_change``. +* **Uplink status change** → ``link_up`` / ``link_down`` (treating the + WAN uplink as an interface; matches existing reflex handler pattern). + +Everything else returns an empty list and emits a single +``webhook.meraki.unmapped_alert_type`` log line so we can spot new +types in production and add them in subsequent releases. We do +**not** publish a generic ``sensory.unknown.*`` event — that would be +unsubscribable noise. + +Target shape +------------ +Targets follow the existing convention from the SNMP publisher: + +* Port events: ``meraki:{serial}|{portName}`` (or + ``Port_{portNum}`` when portName is absent). The ``meraki:`` prefix + matches how the Meraki adapter keys ``Device.id`` in Neo4j, so the + ``:AFFECTS`` edge resolver added in 0.8.0-dev7 matches against + ``Device.id`` and the edge lands cleanly. 
+* Device-level events: ``meraki:{serial}``. +* IDS events: ``meraki:{serial}|{clientMac}`` when client MAC is + known, otherwise ``meraki:{serial}``. +* Network-level events with no specific device: ``meraki:{networkId}``. + +Payload echo +------------ +Every emitted event carries a normalized payload with: + +* ``device`` (human name when present, else serial-prefixed form) +* ``device_id`` (``meraki:{serial}``; ``None`` for network-scoped events) +* ``interface`` (link events only; port name verbatim, original whitespace preserved) +* ``meraki_alert_type`` (the raw alertType for forensics) +* ``meraki_network_id`` (for network/org-level grouping queries) +* ``occurred_at`` (Meraki's ``occurredAt``, ISO-8601) + +The :class:`SensoryPublisher` will add ``source`` and ``recorded_at`` +automatically. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +import structlog + +log = structlog.get_logger(__name__) +_LOG = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class MerakiSensoryEvent: + """One sensory event derived from a Meraki webhook payload. + + Instances of this class are pure data — the mapper produces them, + the route consumes them and hands them to a + :class:`SensoryPublisher`. Tests assert on the dataclass equality + rather than on side effects. + """ + + event_class: str + target_parts: tuple[str, ...] + payload: dict[str, Any] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def map_meraki_payload(payload: dict[str, Any]) -> list[MerakiSensoryEvent]: + """Map one Meraki webhook payload to zero-or-more sensory events. 
+ + Returns an empty list for: + + * unrecognized ``alertType`` values (logged once at INFO so we can + grow coverage based on real traffic) + * payloads where the required identifiers (``deviceSerial`` / + ``networkId``) are missing — without those we can't construct a + meaningful target and would only produce subjects that no one + subscribes to + """ + alert_type_raw = payload.get("alertType") or payload.get("alertTypeId") or "" + alert_type = str(alert_type_raw).strip() + if not alert_type: + log.info("webhook.meraki.no_alert_type", payload_keys=sorted(payload.keys())[:16]) + return [] + + handler = _ROUTE.get(_normalize_alert_type(alert_type)) + if handler is None: + log.info( + "webhook.meraki.unmapped_alert_type", + alert_type=alert_type, + payload_keys=sorted(payload.keys())[:16], + ) + return [] + return handler(payload, alert_type) + + +# --------------------------------------------------------------------------- +# Per-alert-type handlers +# --------------------------------------------------------------------------- + + +def _handle_port_connectivity( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """Port up/down events. + + Meraki sends ``alertData.currentValue`` and ``previousValue`` as + one of ``"Connected"`` / ``"Disconnected"``. We map on the new + state only — the reflex layer (and ``:ReflexEvent`` queries) tell + the up/down story over time without us baking the transition into + the event_class. 
+ """ + alert_data = _alert_data(payload) + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + if not device_serial: + return [] + + port_name = ( + alert_data.get("portName") + or alert_data.get("port") + or (f"Port {alert_data['portNum']}" if alert_data.get("portNum") else None) + ) + if not port_name: + log.info( + "webhook.meraki.port_event_missing_port", + alert_type=alert_type, + device_serial=device_serial, + alert_data_keys=sorted(alert_data.keys())[:16], + ) + return [] + + current = str(alert_data.get("currentValue") or "").strip().lower() + if current == "connected": + event_class = "link_up" + elif current == "disconnected": + event_class = "link_down" + else: + log.info( + "webhook.meraki.port_event_unknown_state", + alert_type=alert_type, + device_serial=device_serial, + current_value=current, + ) + return [] + + device_id = f"meraki:{device_serial}" + device_name = payload.get("deviceName") or device_id + target_parts = (device_id, str(port_name)) + return [ + MerakiSensoryEvent( + event_class=event_class, + target_parts=target_parts, + payload={ + "device": device_name, + "device_id": device_id, + "interface": port_name, + "meraki_alert_type": alert_type, + "meraki_network_id": payload.get("networkId"), + "occurred_at": payload.get("occurredAt"), + "previous_state": alert_data.get("previousValue"), + "current_state": alert_data.get("currentValue"), + }, + ) + ] + + +def _handle_uplink_status( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """WAN uplink up/down — same mapping as a switch port, but the + "interface" is the uplink interface (``wan1`` / ``wan2`` / ``cellular``). 
+ """ + alert_data = _alert_data(payload) + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + if not device_serial: + return [] + + uplink_name = ( + alert_data.get("uplinkName") + or alert_data.get("uplink") + or alert_data.get("interface") + or "wan" + ) + current = str(alert_data.get("currentValue") or alert_data.get("status") or "").strip().lower() + # Meraki uplink states observed in the wild: "active" / "ready" / + # "failed" / "not connected". We classify the binary down vs not-down. + if current in {"active", "ready", "connected", "up"}: + event_class = "link_up" + elif current in {"failed", "not connected", "disconnected", "down"}: + event_class = "link_down" + else: + log.info( + "webhook.meraki.uplink_unknown_state", + alert_type=alert_type, + device_serial=device_serial, + current_value=current, + ) + return [] + + device_id = f"meraki:{device_serial}" + device_name = payload.get("deviceName") or device_id + return [ + MerakiSensoryEvent( + event_class=event_class, + target_parts=(device_id, str(uplink_name)), + payload={ + "device": device_name, + "device_id": device_id, + "interface": uplink_name, + "meraki_alert_type": alert_type, + "meraki_network_id": payload.get("networkId"), + "occurred_at": payload.get("occurredAt"), + "previous_state": alert_data.get("previousValue"), + "current_state": alert_data.get("currentValue") or alert_data.get("status"), + }, + ) + ] + + +def _handle_ids_alert( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """IDS / IPS alerts from MX appliances.""" + alert_data = _alert_data(payload) + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + network_id = payload.get("networkId") or alert_data.get("networkId") + if not device_serial and not network_id: + return [] + + client_mac = ( + alert_data.get("clientMac") + or alert_data.get("srcMac") + or alert_data.get("src_mac") + ) + # IDS targets prefer device-scoped form when we have one, 
falling + # back to network-scoped. Including client MAC when known so the + # dedup key naturally distinguishes per-client incidents. + if device_serial: + device_id = f"meraki:{device_serial}" + target_parts: tuple[str, ...] = ( + (device_id, str(client_mac)) if client_mac else (device_id,) + ) + device_name = payload.get("deviceName") or device_id + else: + target_parts = (f"meraki:{network_id}",) if not client_mac else ( + f"meraki:{network_id}", str(client_mac) + ) + device_id = None + device_name = payload.get("networkName") or f"meraki:{network_id}" + + return [ + MerakiSensoryEvent( + event_class="security_alert", + target_parts=target_parts, + payload={ + "device": device_name, + "device_id": device_id, + "alertType": alert_type, + "meraki_alert_type": alert_type, + "meraki_network_id": network_id, + "clientMac": client_mac, + "deviceSerial": device_serial, + "networkId": network_id, + "occurred_at": payload.get("occurredAt"), + "signature": alert_data.get("signature") or alert_data.get("signatureId"), + "message": alert_data.get("message") or alert_data.get("description"), + "severity_raw": alert_data.get("severity"), + }, + ) + ] + + +def _handle_config_change( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """Settings/configuration change events.""" + alert_data = _alert_data(payload) + network_id = payload.get("networkId") or alert_data.get("networkId") + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + if not network_id and not device_serial: + return [] + + if device_serial: + device_id = f"meraki:{device_serial}" + device_name = payload.get("deviceName") or device_id + target_parts: tuple[str, ...] 
= (device_id,) + else: + device_id = None + device_name = payload.get("networkName") or f"meraki:{network_id}" + target_parts = (f"meraki:{network_id}",) + + return [ + MerakiSensoryEvent( + event_class="config_change", + target_parts=target_parts, + payload={ + "device": device_name, + "device_id": device_id, + "meraki_alert_type": alert_type, + "meraki_network_id": network_id, + "occurred_at": payload.get("occurredAt"), + "admin_email": alert_data.get("adminEmail") or alert_data.get("admin"), + "change_summary": alert_data.get("description") or alert_data.get("changes"), + }, + ) + ] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _alert_data(payload: dict[str, Any]) -> dict[str, Any]: + """Return ``payload['alertData']`` as a dict, or ``{}`` if absent / malformed. + + Meraki sometimes nests the useful fields under ``alertData`` and + sometimes lifts them to the top level. The handlers in this module tolerate + both for the identifier fields (``deviceSerial`` / ``networkId``): they read the top-level payload first and use ``alertData`` as + a fallback. + """ + data = payload.get("alertData") + return data if isinstance(data, dict) else {} + + +def _normalize_alert_type(alert_type: str) -> str: + """Fold Meraki's two equivalent naming conventions into one key. + + Meraki sends both human ("Port connectivity") and machine + ("port_connectivity") forms depending on the dashboard version and + the specific alert. We strip, lowercase, and replace each space with an + underscore so a single ``_ROUTE`` entry handles both. + """ + return alert_type.strip().lower().replace(" ", "_") + + +# Mapping from normalized alert_type to handler. Edit this dict to add +# new coverage — every entry MUST have a corresponding test in +# tests/webhooks/test_meraki_events.py. 
+_ROUTE: dict[str, Any] = { + "port_connectivity": _handle_port_connectivity, + "switch_port_connection_changed": _handle_port_connectivity, + "uplink_status_change": _handle_uplink_status, + "uplink_status": _handle_uplink_status, + "ids_alerted": _handle_ids_alert, + "ids_alert": _handle_ids_alert, + "malware_detected": _handle_ids_alert, + "settings_changed": _handle_config_change, + "configuration_change": _handle_config_change, + "configuration_changes": _handle_config_change, +} + + +__all__ = ["MerakiSensoryEvent", "map_meraki_payload"] diff --git a/netcortex/webhooks/router.py b/netcortex/webhooks/router.py index a25376d..01a4e2d 100644 --- a/netcortex/webhooks/router.py +++ b/netcortex/webhooks/router.py @@ -24,6 +24,7 @@ from netcortex.webhooks.meraki import handle_meraki_webhook from netcortex.webhooks.catalyst_center import handle_catalyst_center_webhook +from netcortex.webhooks.event_publisher import get_publisher from netcortex.webhooks.telemetry import handle_telemetry_push, telemetry_event_stream log = structlog.get_logger(__name__) @@ -62,11 +63,16 @@ async def meraki_webhook( bytes=len(body), has_sig=x_cisco_meraki_signature is not None, ) + # Lazily fetch the per-app SensoryPublisher for the meraki_webhook + # source. Returns None when NATS_URL isn't configured — handler + # degrades to its pre-dev8 sync-only behavior in that case. 
+ publisher = get_publisher(request.app, "meraki_webhook") result = await handle_meraki_webhook( instance_name=instance_name, body=body, signature_header=x_cisco_meraki_signature, background_tasks=background_tasks, + publisher=publisher, ) return result diff --git a/pyproject.toml b/pyproject.toml index f04fcea..d873398 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "netcortex" -version = "0.8.0.dev7" +version = "0.8.0.dev8" description = "The intelligence layer for your network — multi-dimensional graph of the network bridging Meraki, Catalyst Center, Intersight, and more with NetBox as SoT" readme = "README.md" requires-python = ">=3.12" diff --git a/tests/webhooks/__init__.py b/tests/webhooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/webhooks/conftest.py b/tests/webhooks/conftest.py new file mode 100644 index 0000000..c4dfe6c --- /dev/null +++ b/tests/webhooks/conftest.py @@ -0,0 +1,158 @@ +"""Shared fixtures for webhook integration tests. + +This is the first test module in the repo that drives FastAPI routes +end-to-end (previous tests stayed below the HTTP layer). The fixtures +here exist to keep that pattern consistent for the dev9/dev10/dev11 +vendor receivers that will land in subsequent releases. + +Design choices +-------------- +* We build a **minimal** FastAPI app per test, mounting only the + ``webhook_router``. This avoids dragging in the full ``netcortex.main`` + app, which needs Neo4j, Redis, the secret backend, MCP, etc. — none + of which a webhook route touches. + +* The bus is a small in-test :class:`CapturingEventBus` that records + every publish in a list. We deliberately do not use the production + :class:`NatsEventBus` (would need a NATS server) or the + :class:`InMemoryEventBus` from the contract tests (which is built for + async-iterator subscriptions — overkill here). 
The capturing bus + exercises the full validation path via :class:`SensoryPublisher` + while letting tests assert synchronously on what was published. + +* HMAC shared secrets are injected directly into the + :mod:`netcortex.webhooks.meraki` module-level cache so tests do not + reach the AWS Secrets Manager backend. + +* Adapter instances are patched to an empty dict so the + ``BackgroundTasks`` adapter-sync callback no-ops cleanly. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from netcortex.thalamus import SensoryPublisher +from netcortex.webhooks.router import router as webhook_router + + +class CapturingEventBus: + """Test-only :class:`EventBus` substitute that records every publish. + + Implements the same async ``publish`` / ``subscribe`` / ``close`` + surface so :class:`SensoryPublisher` accepts it. Subscribe is a + no-op iterator because the webhook tests assert on the publish + side only — runner/handler behavior is covered separately in + ``tests/reflex/`` against the real :class:`InMemoryEventBus`. 
+ """ + + def __init__(self) -> None: + self.published: list[tuple[str, dict[str, Any]]] = [] + self.publish_failures: list[Exception] = [] + self.closed = False + + async def publish( + self, + subject: str, + payload: dict[str, Any], + *, + headers: dict[str, str] | None = None, + ) -> None: + if self.publish_failures: + raise self.publish_failures.pop(0) + self.published.append((subject, dict(payload))) + + async def subscribe(self, pattern: str) -> Any: # pragma: no cover - unused + async def _empty() -> Any: + if False: + yield None + return _empty() + + async def close(self) -> None: + self.closed = True + + +@pytest.fixture +def capturing_bus() -> CapturingEventBus: + return CapturingEventBus() + + +@pytest.fixture +def webhook_app(capturing_bus: CapturingEventBus) -> FastAPI: + """Minimal FastAPI app with just the webhook router and a captured bus.""" + app = FastAPI() + app.include_router(webhook_router) + app.state.event_bus = capturing_bus + app.state._sensory_publishers = { + "meraki_webhook": SensoryPublisher(capturing_bus, source="meraki_webhook"), + } + return app + + +@pytest.fixture +def webhook_client(webhook_app: FastAPI) -> TestClient: + """Synchronous TestClient against the minimal webhook app.""" + return TestClient(webhook_app) + + +@pytest.fixture +def webhook_app_no_bus() -> FastAPI: + """Variant with ``event_bus = None`` — exercises the + publisher-unavailable degradation path.""" + app = FastAPI() + app.include_router(webhook_router) + app.state.event_bus = None + app.state._sensory_publishers = {} + return app + + +@pytest.fixture +def webhook_client_no_bus(webhook_app_no_bus: FastAPI) -> TestClient: + return TestClient(webhook_app_no_bus) + + +@pytest.fixture +def meraki_shared_secret(monkeypatch: pytest.MonkeyPatch) -> str: + """Inject a known HMAC shared secret for instance 'TEST_TENANT'. + + Bypasses the AWS Secrets Manager backend entirely by pre-populating + the module-level ``_SECRET_CACHE`` dict. 
+ """ + from netcortex.webhooks import meraki as meraki_module + + secret = "test-shared-secret-do-not-use-in-prod" # noqa: S105 — fixture-scoped test secret + monkeypatch.setitem(meraki_module._SECRET_CACHE, "TEST_TENANT", secret) + return secret + + +@pytest.fixture +def meraki_no_secret(monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure ``_get_shared_secret`` returns None — exercises the + 'no signing key configured' degradation path (still accepts the + request but emits a warning).""" + from netcortex.webhooks import meraki as meraki_module + + async def _no_secret(_instance_name: str) -> str | None: + return None + + monkeypatch.setattr(meraki_module, "_get_shared_secret", _no_secret) + + +@pytest.fixture(autouse=True) +def stub_adapter_instances(monkeypatch: pytest.MonkeyPatch) -> None: + """Patch ``get_instances`` to return an empty mapping for every + webhook test, so the ``BackgroundTasks`` adapter-sync callback + finishes cleanly instead of trying to reach a real Meraki API. + """ + import netcortex.adapters # noqa: F401 — load the package so the dotted patch target below resolves + + def _empty_instances() -> dict[str, Any]: + return {} + + monkeypatch.setattr( + "netcortex.adapters.get_instances", _empty_instances + ) diff --git a/tests/webhooks/test_meraki_events.py b/tests/webhooks/test_meraki_events.py new file mode 100644 index 0000000..63658ae --- /dev/null +++ b/tests/webhooks/test_meraki_events.py @@ -0,0 +1,403 @@ +"""Unit tests for the Meraki webhook → sensory event mapper. + +The mapper is a pure function so tests are blazingly fast and don't +need any fixtures beyond raw sample payloads. + +Sample payload shapes are intentionally varied — the mapper has to +handle Meraki's habit of sending the same field under different keys +depending on the alert type and dashboard version (``portName`` vs +``port``, ``adminEmail`` vs ``admin``, ``alertType`` vs ``alertTypeId``). 
+ +All identifiers (deviceSerial, networkId) and email addresses are +fabricated and contain ``EXAMPLE`` or use RFC 5737 / RFC 2606 +documentation values where applicable. +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from netcortex.webhooks.meraki_events import ( + MerakiSensoryEvent, + map_meraki_payload, +) + + +# --------------------------------------------------------------------------- +# Port connectivity (the headline dev8 use case — dedup overlap with SNMP) +# --------------------------------------------------------------------------- + + +def test_port_connectivity_disconnected_emits_link_down() -> None: + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "alertTypeId": "port_connectivity", + "networkId": "L_EXAMPLE001", + "networkName": "main-office", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "occurredAt": "2026-06-05T20:00:00Z", + "alertData": { + "portNum": "12", + "portName": "Port 12", + "previousValue": "Connected", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + ev = events[0] + assert ev.event_class == "link_down" + assert ev.target_parts == ("meraki:Q4CD-EXAM-PLE1", "Port 12") + assert ev.payload["device"] == "cpn-arlington-ms1" + assert ev.payload["device_id"] == "meraki:Q4CD-EXAM-PLE1" + assert ev.payload["interface"] == "Port 12" + assert ev.payload["meraki_alert_type"] == "Port connectivity" + assert ev.payload["meraki_network_id"] == "L_EXAMPLE001" + assert ev.payload["previous_state"] == "Connected" + assert ev.payload["current_state"] == "Disconnected" + + +def test_port_connectivity_connected_emits_link_up() -> None: + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 12", + "currentValue": "Connected", + "previousValue": 
"Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_up" + assert events[0].target_parts == ("meraki:Q4CD-EXAM-PLE1", "Port 12") + + +def test_port_connectivity_synonym_switch_port_connection_changed() -> None: + """Meraki uses two interchangeable names for the same alert.""" + payload: dict[str, Any] = { + "alertType": "Switch port connection changed", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 3", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_down" + + +def test_port_connectivity_builds_port_name_from_portnum_when_missing() -> None: + """Older payloads omit portName and only send numeric portNum.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portNum": "5", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].target_parts == ("meraki:Q4CD-EXAM-PLE1", "Port 5") + assert events[0].payload["interface"] == "Port 5" + + +def test_port_event_missing_device_serial_returns_empty() -> None: + """No way to construct a meaningful target without the device serial.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 1", + "currentValue": "Disconnected", + }, + } + assert map_meraki_payload(payload) == [] + + +def test_port_event_missing_port_returns_empty() -> None: + """No port identifier at all — skip rather than publish a malformed target.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"currentValue": "Disconnected"}, + } + assert map_meraki_payload(payload) == [] + + 
+def test_port_event_unknown_state_returns_empty() -> None: + """``currentValue`` of something we don't recognize is not silently mapped.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 1", + "currentValue": "PartiallyOnline", + }, + } + assert map_meraki_payload(payload) == [] + + +def test_port_event_device_name_falls_back_to_serial_form() -> None: + """When deviceName is missing, payload['device'] falls back to the + serial-prefixed form so the dev7 :AFFECTS resolver still matches + on ``Device.id`` (which uses the same ``meraki:`` form).""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 1", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].payload["device"] == "meraki:Q4CD-EXAM-PLE1" + assert events[0].payload["device_id"] == "meraki:Q4CD-EXAM-PLE1" + + +# --------------------------------------------------------------------------- +# Uplink status +# --------------------------------------------------------------------------- + + +def test_uplink_status_change_failed_emits_link_down() -> None: + payload: dict[str, Any] = { + "alertType": "Uplink status change", + "deviceSerial": "Q4XX-EXAM-PLE2", + "deviceName": "cpn-mx-arlington", + "networkId": "L_EXAMPLE001", + "alertData": { + "uplinkName": "wan1", + "currentValue": "failed", + "previousValue": "active", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_down" + assert events[0].target_parts == ("meraki:Q4XX-EXAM-PLE2", "wan1") + assert events[0].payload["interface"] == "wan1" + + +def test_uplink_status_change_active_emits_link_up() -> None: + payload: dict[str, Any] = { + "alertType": "Uplink status change", + "deviceSerial": 
"Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": { + "uplinkName": "wan2", + "currentValue": "active", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_up" + + +# --------------------------------------------------------------------------- +# IDS / security +# --------------------------------------------------------------------------- + + +def test_ids_alerted_with_client_mac_emits_security_alert() -> None: + payload: dict[str, Any] = { + "alertType": "IDS alerted", + "alertTypeId": "ids_alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "deviceName": "cpn-mx-arlington", + "networkId": "L_EXAMPLE001", + "occurredAt": "2026-06-05T20:01:00Z", + "alertData": { + "clientMac": "aa:bb:cc:dd:ee:ff", + "signature": "1:2008725:1", + "message": "ET POLICY example", + "severity": "high", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + ev = events[0] + assert ev.event_class == "security_alert" + assert ev.target_parts == ("meraki:Q4XX-EXAM-PLE2", "aa:bb:cc:dd:ee:ff") + assert ev.payload["clientMac"] == "aa:bb:cc:dd:ee:ff" + assert ev.payload["signature"] == "1:2008725:1" + assert ev.payload["severity_raw"] == "high" + + +def test_ids_alerted_without_client_mac_uses_device_only_target() -> None: + payload: dict[str, Any] = { + "alertType": "IDS alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": {"signature": "1:2008725:1"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].target_parts == ("meraki:Q4XX-EXAM-PLE2",) + + +def test_malware_detected_maps_to_security_alert() -> None: + payload: dict[str, Any] = { + "alertType": "Malware detected", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": {"clientMac": "aa:bb:cc:dd:ee:ff"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "security_alert" + + +# 
--------------------------------------------------------------------------- +# Config changes +# --------------------------------------------------------------------------- + + +def test_settings_changed_at_network_level_emits_config_change() -> None: + payload: dict[str, Any] = { + "alertType": "Settings changed", + "networkId": "L_EXAMPLE001", + "networkName": "main-office", + "alertData": { + "adminEmail": "ops@example.com", + "description": "Updated SSID encryption", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + ev = events[0] + assert ev.event_class == "config_change" + assert ev.target_parts == ("meraki:L_EXAMPLE001",) + assert ev.payload["admin_email"] == "ops@example.com" + assert ev.payload["change_summary"] == "Updated SSID encryption" + + +def test_settings_changed_at_device_level_uses_device_target() -> None: + payload: dict[str, Any] = { + "alertType": "Configuration change", + "networkId": "L_EXAMPLE001", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "alertData": {"adminEmail": "ops@example.com"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].target_parts == ("meraki:Q4CD-EXAM-PLE1",) + assert events[0].payload["device"] == "cpn-arlington-ms1" + + +# --------------------------------------------------------------------------- +# Unmapped / malformed +# --------------------------------------------------------------------------- + + +def test_unknown_alert_type_returns_empty_list() -> None: + """Unknown alertTypes do not crash and do not silently publish.""" + payload: dict[str, Any] = { + "alertType": "Camera detected motion", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"foo": "bar"}, + } + assert map_meraki_payload(payload) == [] + + +def test_missing_alert_type_returns_empty_list() -> None: + """A payload with no alertType at all — never publish.""" + payload: dict[str, Any] = {"deviceSerial": "Q4CD-EXAM-PLE1"} + 
assert map_meraki_payload(payload) == [] + + +def test_alert_type_id_fallback_when_alert_type_absent() -> None: + """Some dashboard variants send only the snake_case ID.""" + payload: dict[str, Any] = { + "alertTypeId": "port_connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"portName": "Port 1", "currentValue": "Disconnected"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_down" + + +def test_alert_data_missing_returns_empty_for_port_event() -> None: + """Mapper must not raise on a payload that has no alertData.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + } + assert map_meraki_payload(payload) == [] + + +def test_alert_data_wrong_type_does_not_raise() -> None: + """alertData is a string by accident — handled gracefully.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": "not a dict", + } + assert map_meraki_payload(payload) == [] + + +# --------------------------------------------------------------------------- +# Subject builder compatibility +# --------------------------------------------------------------------------- + + +def test_emitted_events_compose_into_valid_sensory_subjects() -> None: + """Smoke-test: every emitted target_parts tuple must be acceptable to + :func:`sensory_subject` after the publisher's whitespace sanitization. 
+ + The publisher (added in 0.8.0-dev6) sanitizes whitespace in target + parts but rejects dots; this guards against introducing an alert + type whose target parts contain dots.""" + from netcortex.contracts.subjects import sensory_subject + + payloads = [ + { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"portName": "Port 12", "currentValue": "Disconnected"}, + }, + { + "alertType": "IDS alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": {"clientMac": "aa:bb:cc:dd:ee:ff"}, + }, + { + "alertType": "Settings changed", + "networkId": "L_EXAMPLE001", + "alertData": {"adminEmail": "ops@example.com"}, + }, + ] + for p in payloads: + events = map_meraki_payload(p) + assert events, f"expected at least one event for {p['alertType']}" + for ev in events: + # Sanitize whitespace as the publisher would, then construct + # the subject. This must not raise. + sanitized = tuple(part.replace(" ", "_") for part in ev.target_parts) + subject = sensory_subject(ev.event_class, "meraki_webhook", *sanitized) + assert subject.startswith(f"sensory.{ev.event_class}.meraki_webhook.") + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/webhooks/test_meraki_webhook_route.py b/tests/webhooks/test_meraki_webhook_route.py new file mode 100644 index 0000000..0ce4792 --- /dev/null +++ b/tests/webhooks/test_meraki_webhook_route.py @@ -0,0 +1,344 @@ +"""End-to-end HTTP tests for the Meraki webhook route. + +These tests are the first in the repo that drive FastAPI routes via +:class:`TestClient`. The pattern (capturing bus + minimal app + secret +fixture) is intentionally generic so dev9/dev10/dev11 webhook +receivers can reuse it. 
+ +Coverage +-------- +* HMAC verification: valid, invalid, missing signature header +* No-secret-configured degradation (logs warning, still publishes) +* Body parsing: valid JSON, malformed JSON +* End-to-end publish: a port-flap payload produces the expected + ``sensory.link_down.meraki_webhook.{device_id}.{port}`` subject +* Mapper integration: unknown alertType returns 200 with zero + published events (sync trigger still queues) +* Publisher unavailable: app with ``event_bus = None`` still 200s + and routes the sync trigger correctly +* Bus failure: a transient publish error doesn't 5xx the webhook +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +from typing import Any + +import pytest +from fastapi.testclient import TestClient + +from tests.webhooks.conftest import CapturingEventBus + + +def _sign(body: bytes, secret: str) -> str: + return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + + +def _port_disconnected_payload() -> dict[str, Any]: + """Realistic Meraki port-disconnect payload (sanitized identifiers).""" + return { + "alertType": "Port connectivity", + "alertTypeId": "port_connectivity", + "version": "0.1", + "sentAt": "2026-06-05T20:00:00Z", + "organizationId": "EXAMPLE_ORG", + "organizationName": "Example Org", + "networkId": "L_EXAMPLE001", + "networkName": "main-office", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "alertId": "test-alert-001", + "occurredAt": "2026-06-05T20:00:00Z", + "alertData": { + "portNum": "12", + "portName": "Port 12", + "previousValue": "Connected", + "currentValue": "Disconnected", + }, + } + + +# --------------------------------------------------------------------------- +# HMAC verification +# --------------------------------------------------------------------------- + + +def test_valid_hmac_returns_200_and_publishes( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + body = 
json.dumps(_port_disconnected_payload()).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + j = resp.json() + assert j["status"] == "queued" + assert j["adapter"] == "meraki/TEST_TENANT" + assert j["sensory_events_published"] == "1" + + assert len(capturing_bus.published) == 1 + subject, payload = capturing_bus.published[0] + # The publisher sanitizes whitespace in target parts (dev6); the + # original ``Port 12`` is preserved in the payload (dev6 contract). + assert subject == "sensory.link_down.meraki_webhook.meraki:Q4CD-EXAM-PLE1.Port_12" + assert payload["device"] == "cpn-arlington-ms1" + assert payload["device_id"] == "meraki:Q4CD-EXAM-PLE1" + assert payload["interface"] == "Port 12" + assert payload["meraki_alert_type"] == "Port connectivity" + assert payload["source"] == "meraki_webhook" # injected by publisher + + +def test_invalid_hmac_returns_401_no_publish( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": "deadbeef" * 8, + }, + ) + + assert resp.status_code == 401 + assert "Invalid Meraki webhook signature" in resp.text + assert capturing_bus.published == [] + + +def test_missing_signature_returns_401_no_publish( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 401 + assert 
capturing_bus.published == [] + + +def test_signature_prefix_sha256_accepted( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """Newer Meraki firmware prefixes the signature with 'sha256='.""" + body = json.dumps(_port_disconnected_payload()).encode() + sig = "sha256=" + _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + assert len(capturing_bus.published) == 1 + + +# --------------------------------------------------------------------------- +# Body parsing +# --------------------------------------------------------------------------- + + +def test_malformed_json_returns_400( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_no_secret: None, +) -> None: + body = b"this is not json at all" + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 400 + assert capturing_bus.published == [] + + +# --------------------------------------------------------------------------- +# No-secret-configured degradation +# --------------------------------------------------------------------------- + + +def test_no_secret_configured_still_publishes_with_warning( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_no_secret: None, + caplog: pytest.LogCaptureFixture, +) -> None: + """When the per-tenant secret isn't in the backend, the handler + accepts the request and logs a loud warning. 
We still process the + event (operators bootstrapping the integration would otherwise + see no events at all and assume the pipeline is broken).""" + import logging + caplog.set_level(logging.WARNING) + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 200 + assert len(capturing_bus.published) == 1 + + +# --------------------------------------------------------------------------- +# Mapper integration +# --------------------------------------------------------------------------- + + +def test_unknown_alert_type_returns_200_with_zero_published( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """Unmapped alertTypes still 200 (the adapter sync is still + valuable for ground-truth reconciliation) but produce no + sensory events.""" + payload: dict[str, Any] = { + "alertType": "Camera detected motion", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"foo": "bar"}, + } + body = json.dumps(payload).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + assert resp.json()["sensory_events_published"] == "0" + assert capturing_bus.published == [] + + +def test_ids_alerted_publishes_security_alert( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + payload: dict[str, Any] = { + "alertType": "IDS alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "deviceName": "cpn-mx-arlington", + "networkId": "L_EXAMPLE001", + "occurredAt": "2026-06-05T20:01:00Z", + "alertData": { + "clientMac": "aa:bb:cc:dd:ee:ff", + "signature": "1:2008725:1", + "message": "ET POLICY 
example", + "severity": "high", + }, + } + body = json.dumps(payload).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + assert len(capturing_bus.published) == 1 + subject, pub_payload = capturing_bus.published[0] + assert subject.startswith("sensory.security_alert.meraki_webhook.") + assert pub_payload["clientMac"] == "aa:bb:cc:dd:ee:ff" + + +# --------------------------------------------------------------------------- +# Publisher unavailable / failure degradation +# --------------------------------------------------------------------------- + + +def test_route_works_without_publisher_configured( + webhook_client_no_bus: TestClient, + meraki_shared_secret: str, +) -> None: + """When NATS_URL is unset (``app.state.event_bus = None``), the + route still accepts the webhook and queues the sync trigger. + Pre-dev8 behavior is preserved end-to-end.""" + body = json.dumps(_port_disconnected_payload()).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client_no_bus.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + # No publisher → no events counted, but request succeeded. + assert resp.json()["sensory_events_published"] == "0" + + +def test_bus_publish_failure_does_not_fail_request( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """A transient NATS failure mid-publish must not 5xx the webhook; + the sync trigger is still queued. 
The SensoryPublisher swallows + the underlying bus error.""" + capturing_bus.publish_failures.append(RuntimeError("nats connection lost")) + body = json.dumps(_port_disconnected_payload()).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + # The publish failed and was swallowed inside SensoryPublisher; + # the handler still reports one event was attempted (the mapper + # produced one) so the count reflects mapper output, not bus + # success — operationally we have ``bus.published`` and + # ``sensory_publisher.publish_failed`` log lines to disambiguate. + assert resp.json()["sensory_events_published"] == "1" + assert capturing_bus.published == []