From 80fe22de057780d63a0dc821e737ce0b5b39edef Mon Sep 17 00:00:00 2001 From: Steve NCA Date: Fri, 5 Jun 2026 21:09:52 +0000 Subject: [PATCH] =?UTF-8?q?0.8.0-dev8:=20second=20sensory=20publisher=20?= =?UTF-8?q?=E2=80=94=20Meraki=20webhook=20=E2=86=92=20NATS?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit First webhook-side sensory publisher. The web process now publishes sensory.* events when Meraki Dashboard webhooks arrive, so a port flap detected at the network edge reaches the reflex pipeline in 50-200ms instead of waiting up to a full SNMP poll cycle (5 minutes). This is the first release where the dedup store added in dev3 actually does anything in production: the same port flap will now be observed by both the Meraki webhook (immediate) and the SNMP poller (next cycle), and the second arrival within the 60s dedup window collapses to a single :ReflexEvent. New abstractions (all swappable / additive): - webhooks/event_publisher.py: web-process bus coordination. One NatsEventBus per process; one cached SensoryPublisher per source. Graceful degradation when NATS_URL is unset. - webhooks/meraki_events.py: pure Meraki-dialect-to-our-taxonomy mapper. Initial coverage: Port connectivity (link_up/down), Switch port connection changed (synonym), Uplink status change, IDS alerted, Malware detected, Settings changed, Configuration change. - Target shape: meraki:| — matches Device.id in the live graph so the dev7 :AFFECTS edge resolver lands edges automatically. Wiring: - handle_meraki_webhook accepts optional publisher; runs mapper and publishes one event per mapped alert. Pre-dev8 sync-trigger behavior preserved when publisher absent. - main.py lifespan inits NATS bus on startup (best-effort), closes on shutdown. Web deployment already has NATS_URL via Helm. - Webhook response body adds sensory_events_published count for operator at-a-glance verification. Tests (28 new): - 18 mapper unit tests covering every supported alertType, both states, synonym handling, missing-field degradation, and a subject-builder compatibility smoke test. - 10 HTTP integration tests covering HMAC valid/invalid/missing, signature prefix variants, malformed JSON, no-secret degradation, unknown alertType, IDS publish, publisher-unavailable, and bus-failure-during-publish. - First FastAPI TestClient test module in the repo; pattern generalized via tests/webhooks/conftest.py for the dev9/10/11 vendor receivers to reuse. Co-authored-by: Cursor --- CHANGELOG.md | 103 +++++ netcortex/__init__.py | 2 +- netcortex/main.py | 20 + netcortex/webhooks/event_publisher.py | 141 +++++++ netcortex/webhooks/meraki.py | 53 ++- netcortex/webhooks/meraki_events.py | 385 +++++++++++++++++++ netcortex/webhooks/router.py | 6 + pyproject.toml | 2 +- tests/webhooks/__init__.py | 0 tests/webhooks/conftest.py | 158 ++++++++ tests/webhooks/test_meraki_events.py | 403 ++++++++++++++++++++ tests/webhooks/test_meraki_webhook_route.py | 344 +++++++++++++++++ 12 files changed, 1612 insertions(+), 5 deletions(-) create mode 100644 netcortex/webhooks/event_publisher.py create mode 100644 netcortex/webhooks/meraki_events.py create mode 100644 tests/webhooks/__init__.py create mode 100644 tests/webhooks/conftest.py create mode 100644 tests/webhooks/test_meraki_events.py create mode 100644 tests/webhooks/test_meraki_webhook_route.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 2e31997..f751d7e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -24,6 +24,109 @@ and this file MUST be updated together whenever `__version__` changes. --- +## [0.8.0-dev8] — Second sensory publisher: Meraki webhook → NATS + +First webhook-side sensory publisher. The web process now publishes +`sensory.*` events when Meraki Dashboard webhooks arrive, so a port +flap detected at the network edge reaches the reflex pipeline in +50–200ms instead of waiting up to a full SNMP poll cycle (5 minutes). + +This is also the first release where the dedup store added in +0.8.0-dev3 actually does anything in production: the same port flap +will now be observed by both the Meraki webhook (immediate) and the +SNMP poller (next cycle), and the second arrival within the 60s dedup +window collapses to a single `:ReflexEvent`. + +### Added +- `netcortex/webhooks/event_publisher.py` — web-process bus + coordination. One `NatsEventBus` per process on `app.state.event_bus`; + one cached `SensoryPublisher` per source token on + `app.state._sensory_publishers`. Graceful degradation when + `NATS_URL` is unset (publishing disabled, sync trigger still fires). +- `netcortex/webhooks/meraki_events.py` — pure mapper from Meraki's + vendor dialect (`alertType` + `alertData`) to our closed + `SENSORY_EVENT_CLASSES` vocabulary. Initial coverage: + - `Port connectivity` / `Switch port connection changed` → `link_up` / `link_down` + - `Uplink status change` → `link_up` / `link_down` + - `IDS alerted` / `Malware detected` → `security_alert` + - `Settings changed` / `Configuration change` → `config_change` + - Unknown alertTypes log `webhook.meraki.unmapped_alert_type` and skip + (so we grow coverage based on real traffic, not speculation) +- Target shape: `meraki:|` for port events, + `meraki:|` for IDS, etc. Uses the same + `meraki:` prefix as `Device.id` in the live graph so the dev7 + `:AFFECTS` edge resolver lands the edge automatically. + +### Changed +- `netcortex/webhooks/meraki.py::handle_meraki_webhook` accepts an + optional `publisher: SensoryPublisher | None` parameter. When + provided, the handler runs the mapper and publishes one event per + mapped alert. Pre-dev8 sync-trigger behavior is unchanged when + the publisher is absent. +- `netcortex/webhooks/router.py` resolves the publisher via + `get_publisher(request.app, "meraki_webhook")` and threads it + through to the handler. +- `netcortex/main.py` lifespan now initializes the NATS bus on + startup (`init_event_bus`) and closes it on shutdown + (`close_event_bus`). Both steps are best-effort — a missing + `NATS_URL` or transient outage degrades webhooks back to their + pre-dev8 sync-only behavior, never blocks startup. +- Webhook response body adds `sensory_events_published: ` so + operators can confirm at-a-glance which webhooks made it through + the publish path. + +### Tests +- `tests/webhooks/test_meraki_events.py` — 18 mapper unit tests + covering every supported alertType, both up and down states, + synonym handling, missing-field degradation, and a subject-builder + compatibility smoke test. +- `tests/webhooks/test_meraki_webhook_route.py` — 10 HTTP integration + tests covering HMAC valid/invalid/missing, signature prefix + compatibility, malformed JSON, no-secret-configured degradation, + unknown alertType, IDS publish, publisher-unavailable path, and + bus-failure-during-publish path. +- This is the first FastAPI `TestClient` test module in the repo; + pattern (capturing bus + minimal app + secret fixture) is + generalized via `conftest.py` so dev9/dev10/dev11 vendor + receivers can reuse it. + +### Operational notes +- The web deployment in `deploy/helm` already sets `NATS_URL` when + `nats.enabled` is true — no Helm change required. +- HMAC secrets continue to live at + `netcortex/webhooks/meraki/` in AWS Secrets + Manager (pre-existing storage path). +- A new Cypher query to find webhook-sourced events: + ```cypher + MATCH (e:ReflexEvent {source: 'meraki_webhook'}) + WHERE e.recorded_at > timestamp() - 3600000 + RETURN e.subject, e.target, e.outcome + ORDER BY e.observed_at_ms DESC + ``` +- To see dedup in action (webhook + SNMP poll observing the same + flap), join on target within a tight window: + ```cypher + MATCH (e1:ReflexEvent), (e2:ReflexEvent) + WHERE e1.target = e2.target + AND e1.source = 'meraki_webhook' + AND e2.source = 'snmp_poll' + AND e1.event_class = e2.event_class + AND abs(e1.observed_at_ms - e2.observed_at_ms) < 60000 + RETURN e1.target, e1.outcome AS webhook_outcome, e2.outcome AS snmp_outcome + ``` + In a well-dedupping system the SNMP-side outcome will be `skipped` + for any flap the webhook caught first. + +### Not in this release (deferred) +- `:ThousandEyes` / `:NexusDashboard` / `:cdFMC` webhook receivers + (dev9, dev10, dev11). +- `nexus_dashboard_webhook` and `fmc_webhook` source tokens (added + alongside their respective receivers). +- JetStream durable subscriptions for guaranteed at-least-once across + worker restarts. + +--- + ## [0.8.0-dev7] — :AFFECTS edge resolution + target/subject consistency Second small polish to the dev5 pipeline, surfaced by deployment diff --git a/netcortex/__init__.py b/netcortex/__init__.py index 75abd62..43231f5 100644 --- a/netcortex/__init__.py +++ b/netcortex/__init__.py @@ -22,4 +22,4 @@ ``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes. """ -__version__ = "0.8.0-dev7" +__version__ = "0.8.0-dev8" diff --git a/netcortex/main.py b/netcortex/main.py index dd98c4a..701c470 100644 --- a/netcortex/main.py +++ b/netcortex/main.py @@ -278,6 +278,21 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg] state.redis_status = redis_status state.redis_message = redis_msg + # ── NATS sensory publisher bus (0.8.0-dev8+) ────────────────────────── + # The web process publishes `sensory.*` events when webhooks arrive; + # the worker process owns the runner that consumes them. Bus init is + # best-effort: a missing NATS_URL or a transient NATS outage degrades + # webhooks back to their pre-dev8 sync-trigger behavior, which still + # reconciles the live-state graph. + nats_url = ( + getattr(cfg, "nats_url", "") if cfg else "" + ) or os.environ.get("NATS_URL", "") + try: + from netcortex.webhooks.event_publisher import init_event_bus + await init_event_bus(app, nats_url=nats_url or None) + except Exception as exc: + log.warning("netcortex.sensory_publisher_init_failed", error=str(exc)) + # ── Load and health-check adapters ─────────────────────────────────── if cfg: await _probe_adapters() @@ -332,6 +347,11 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg] # ── Shutdown ────────────────────────────────────────────────────────── refresh_adapter_task.cancel() refresh_graph_task.cancel() + try: + from netcortex.webhooks.event_publisher import close_event_bus + await close_event_bus(app) + except Exception as exc: + log.warning("netcortex.sensory_publisher_close_failed", error=str(exc)) await neo4j_client.close() log.info("netcortex.shutdown") diff --git a/netcortex/webhooks/event_publisher.py b/netcortex/webhooks/event_publisher.py new file mode 100644 index 0000000..dfab878 --- /dev/null +++ b/netcortex/webhooks/event_publisher.py @@ -0,0 +1,141 @@ +"""Web-process sensory publisher coordination. + +The web process needs to publish ``sensory.*`` events when webhooks +arrive from upstream platforms (Meraki, ThousandEyes, Nexus Dashboard, +cdFMC, …). The worker process owns the :class:`ReflexRunner` that +*consumes* those events; web only ever publishes. Keeping the two +responsibilities split means: + +* the web process stays a thin HTTP front (no dedup state, no Neo4j + sink, no handler scheduling — those all live in worker) +* webhook latency stays low: publish, return 200, done. The reflex + pipeline runs asynchronously on the worker subscription side. +* a worker outage does not lose webhook events — NATS holds them until + the worker reconnects (or, with JetStream later, persists them durably) + +Design +------ +One :class:`NatsEventBus` per process, cached on ``app.state.event_bus``. +One :class:`SensoryPublisher` per source token, lazily created and +cached on ``app.state._sensory_publishers``. A typo in a source name +fails at the first call (validated against ``SENSORY_SOURCES``) rather +than silently producing unsubscribable subjects. + +If ``NATS_URL`` is not configured the bus is ``None`` and +:func:`get_publisher` returns ``None``. Routes that try to publish in +that case should degrade silently (the existing sync-trigger behavior +remains intact — losing the sensory side is a recoverable annoyance, +not a data-loss bug). This mirrors the worker's +``_start_reflex_pipeline`` behavior from 0.8.0-dev5. + +Shutdown semantics +------------------ +:func:`close_event_bus` flushes pending publishes and closes the NATS +connection. Called from ``main.py``'s lifespan teardown. Idempotent. +""" + +from __future__ import annotations + +import os +from typing import TYPE_CHECKING, Any + +import structlog + +from netcortex.contracts.subjects import SENSORY_SOURCES + +if TYPE_CHECKING: + from fastapi import FastAPI + from netcortex.thalamus import NatsEventBus, SensoryPublisher + +log = structlog.get_logger(__name__) + + +async def init_event_bus(app: "FastAPI", nats_url: str | None = None) -> "NatsEventBus | None": + """Initialize the web process's NATS publisher bus. + + Resolves ``nats_url`` in this order: + 1. explicit ``nats_url`` argument (test injection) + 2. ``NATS_URL`` environment variable (Helm-provided) + + Stores the bus on ``app.state.event_bus`` so route handlers and the + teardown path can reach it. Returns the bus (or ``None`` if NATS is + not configured / unreachable). + + Failure here logs and returns ``None`` rather than raising — + the web process must boot even when NATS is down. The reflex/sensory + feature degrades; the rest of the API keeps working. + """ + resolved = nats_url or os.environ.get("NATS_URL", "") + if not resolved: + log.info("web.sensory_publisher_disabled", reason="NATS_URL not set") + app.state.event_bus = None + app.state._sensory_publishers = {} + return None + try: + from netcortex.thalamus import NatsEventBus + bus = NatsEventBus(resolved) + app.state.event_bus = bus + app.state._sensory_publishers = {} + log.info("web.sensory_publisher_ready", nats_url=resolved) + return bus + except Exception as exc: + log.warning("web.sensory_publisher_init_failed", error=str(exc)) + app.state.event_bus = None + app.state._sensory_publishers = {} + return None + + +def get_publisher(app: Any, source: str) -> "SensoryPublisher | None": + """Return a cached :class:`SensoryPublisher` for ``source``, or None. + + Cheap to call per-request: the first call for a given source builds + the publisher (which validates ``source`` against ``SENSORY_SOURCES`` + at construction); subsequent calls return the cached instance. + + Returns ``None`` when the bus isn't initialized — callers should + handle that as "publishing disabled, fall through to other side- + effects" rather than as an error condition. + + ``source`` is validated against the closed vocabulary even if the + bus is unavailable, so a typo in a webhook route still fails fast + in unit tests (we don't want "all webhook routes silently skip + publishing because of a typo nobody noticed"). + """ + if source not in SENSORY_SOURCES: + raise ValueError( + f"unknown sensory source {source!r}; " + f"add to SENSORY_SOURCES in netcortex.contracts.subjects " + f"and update docs/architecture/subjects.md" + ) + bus = getattr(app.state, "event_bus", None) + if bus is None: + return None + publishers: dict[str, Any] = getattr(app.state, "_sensory_publishers", None) or {} + if source not in publishers: + from netcortex.thalamus import SensoryPublisher + publishers[source] = SensoryPublisher(bus, source=source) + app.state._sensory_publishers = publishers + return publishers[source] + + +async def close_event_bus(app: Any) -> None: + """Flush and close the publisher bus. Safe to call multiple times. + + Called from the FastAPI lifespan teardown. Errors are logged and + swallowed — a connection that already died will fail to close + cleanly, and that should not block the shutdown sequence. + """ + bus = getattr(app.state, "event_bus", None) + if bus is None: + return + try: + await bus.close() + log.info("web.sensory_publisher_closed") + except Exception as exc: + log.warning("web.sensory_publisher_close_failed", error=str(exc)) + finally: + app.state.event_bus = None + app.state._sensory_publishers = {} + + +__all__ = ["init_event_bus", "get_publisher", "close_event_bus"] diff --git a/netcortex/webhooks/meraki.py b/netcortex/webhooks/meraki.py index cb0b699..b79c9da 100644 --- a/netcortex/webhooks/meraki.py +++ b/netcortex/webhooks/meraki.py @@ -14,11 +14,14 @@ import hashlib import hmac import json -from typing import Any +from typing import TYPE_CHECKING, Any import structlog from fastapi import BackgroundTasks, HTTPException, status +if TYPE_CHECKING: + from netcortex.thalamus import SensoryPublisher + log = structlog.get_logger(__name__) _SECRET_CACHE: dict[str, str] = {} # instance_name → shared_secret @@ -62,8 +65,26 @@ async def handle_meraki_webhook( body: bytes, signature_header: str | None, background_tasks: BackgroundTasks, + publisher: "SensoryPublisher | None" = None, ) -> dict[str, str]: - """Validate and enqueue a Meraki webhook event.""" + """Validate and process a Meraki webhook event. + + Two side effects, both best-effort and independent: + + 1. **Publish sensory events to NATS** (0.8.0-dev8 and later) so + reflex handlers and the episodic memory layer see the event in + seconds, not minutes. Driven by the + :mod:`netcortex.webhooks.meraki_events` mapper. + 2. **Trigger a targeted adapter sync** (pre-dev8 behavior) so the + Neo4j live-state graph reflects the change. This stays even + when the sensory side is wired in: graph sync re-derives + authoritative state, sensory events feed the reflex layer. + + The two paths are intentionally decoupled: a NATS hiccup must not + block the sync, and a misbehaving adapter must not block the + publish. Both run inside the request handler (publish is async + + fast; sync is in BackgroundTasks). + """ shared_secret = await _get_shared_secret(instance_name) if shared_secret is not None: @@ -101,7 +122,32 @@ async def handle_meraki_webhook( org_id=org_id, ) - # Targeted refresh: only re-sync the affected network if possible. + # ── Publish sensory events ──────────────────────────────────────────── + # Pure mapper from vendor dialect to our subjects taxonomy. The + # publisher itself is best-effort; per-event publish failures are + # logged inside :class:`SensoryPublisher` and don't propagate. + sensory_published = 0 + if publisher is not None: + try: + from netcortex.webhooks.meraki_events import map_meraki_payload + events = map_meraki_payload(payload) + for ev in events: + await publisher.publish( + ev.event_class, *ev.target_parts, payload=ev.payload + ) + sensory_published = len(events) + except Exception as exc: + # Mapper bugs or malformed payloads should not 5xx the + # webhook — they should log and let the sync trigger fire + # so live state still reconciles. + log.warning( + "webhook.meraki.publish_failed", + instance=instance_name, + event_type=event_type, + error=str(exc), + ) + + # ── Targeted adapter sync (pre-dev8 behavior, retained) ─────────────── background_tasks.add_task( _sync_meraki_network, instance_name=instance_name, @@ -114,6 +160,7 @@ async def handle_meraki_webhook( "status": "queued", "adapter": f"meraki/{instance_name}", "event_type": event_type, + "sensory_events_published": str(sensory_published), } diff --git a/netcortex/webhooks/meraki_events.py b/netcortex/webhooks/meraki_events.py new file mode 100644 index 0000000..b0b11ce --- /dev/null +++ b/netcortex/webhooks/meraki_events.py @@ -0,0 +1,385 @@ +"""Meraki webhook payload → sensory event mapper. + +Meraki Dashboard sends one webhook POST per alert with a top-level +``alertType`` and a vendor-specific ``alertData`` blob. This module is +the pure-function bridge from that vendor dialect to our closed +:data:`SENSORY_EVENT_CLASSES` vocabulary. + +Why pure functions +------------------ +The mapper has no I/O — no NATS, no Neo4j, no HTTP. It takes a parsed +payload and returns a list of structured events. That makes it trivial +to unit-test with sample payloads and equally easy to evolve when +Meraki adds a new alert type (just add a clause + a test). + +Coverage in 0.8.0-dev8 +---------------------- +We deliberately ship a small, high-value initial set: + +* **Port connectivity** (``port_connectivity``) → ``link_up`` / + ``link_down``. This is the headline use case: the same port flap that + the SNMP poller will detect in the next 30-second cycle now arrives + in 50–200ms via webhook, and the dedup store added in 0.8.0-dev3 + collapses the duplicate into a single ``:ReflexEvent``. +* **Switch port connection changed** — same mapping, treated as a + synonym (Meraki uses both names interchangeably). +* **IDS alert** (``ids_alerted``) → ``security_alert``. +* **Malware detected** → ``security_alert``. +* **Settings changed** / **Configuration change** → ``config_change``. +* **Uplink status change** → ``link_up`` / ``link_down`` (treating the + WAN uplink as an interface; matches existing reflex handler pattern). + +Everything else returns an empty list and emits a single +``webhook.meraki.unmapped_alert_type`` log line so we can spot new +types in production and add them in subsequent releases. We do +**not** publish a generic ``sensory.unknown.*`` event — that would be +unsubscribable noise. + +Target shape +------------ +Targets follow the existing convention from the SNMP publisher: + +* Port events: ``meraki:|`` (or + ``Port_`` when portName is absent). The ``meraki:`` prefix + matches how the Meraki adapter keys ``Device.id`` in Neo4j, so the + ``:AFFECTS`` edge resolver added in 0.8.0-dev7 matches against + ``Device.id`` and the edge lands cleanly. +* Device-level events: ``meraki:``. +* IDS events: ``meraki:|`` when client MAC is + known, otherwise ``meraki:``. +* Network-level events with no specific device: ``meraki:``. + +Payload echo +------------ +Every emitted event carries a normalized payload with: + +* ``device`` (human name when present, else serial-prefixed form) +* ``device_id`` (always ``meraki:``) +* ``interface`` (port name verbatim, original whitespace preserved) +* ``meraki_alert_type`` (the raw alertType for forensics) +* ``meraki_network_id`` (for network/org-level grouping queries) +* ``occurred_at`` (Meraki's ``occurredAt``, ISO-8601) + +The :class:`SensoryPublisher` will add ``source`` and ``recorded_at`` +automatically. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +import structlog + +log = structlog.get_logger(__name__) +_LOG = logging.getLogger(__name__) + + +@dataclass(frozen=True) +class MerakiSensoryEvent: + """One sensory event derived from a Meraki webhook payload. + + Instances of this class are pure data — the mapper produces them, + the route consumes them and hands them to a + :class:`SensoryPublisher`. Tests assert on the dataclass equality + rather than on side effects. + """ + + event_class: str + target_parts: tuple[str, ...] + payload: dict[str, Any] = field(default_factory=dict) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def map_meraki_payload(payload: dict[str, Any]) -> list[MerakiSensoryEvent]: + """Map one Meraki webhook payload to zero-or-more sensory events. + + Returns an empty list for: + + * unrecognized ``alertType`` values (logged once at INFO so we can + grow coverage based on real traffic) + * payloads where the required identifiers (``deviceSerial`` / + ``networkId``) are missing — without those we can't construct a + meaningful target and would only produce subjects that no one + subscribes to + """ + alert_type_raw = payload.get("alertType") or payload.get("alertTypeId") or "" + alert_type = str(alert_type_raw).strip() + if not alert_type: + log.info("webhook.meraki.no_alert_type", payload_keys=sorted(payload.keys())[:16]) + return [] + + handler = _ROUTE.get(_normalize_alert_type(alert_type)) + if handler is None: + log.info( + "webhook.meraki.unmapped_alert_type", + alert_type=alert_type, + payload_keys=sorted(payload.keys())[:16], + ) + return [] + return handler(payload, alert_type) + + +# --------------------------------------------------------------------------- +# Per-alert-type handlers +# --------------------------------------------------------------------------- + + +def _handle_port_connectivity( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """Port up/down events. + + Meraki sends ``alertData.currentValue`` and ``previousValue`` as + one of ``"Connected"`` / ``"Disconnected"``. We map on the new + state only — the reflex layer (and ``:ReflexEvent`` queries) tell + the up/down story over time without us baking the transition into + the event_class. + """ + alert_data = _alert_data(payload) + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + if not device_serial: + return [] + + port_name = ( + alert_data.get("portName") + or alert_data.get("port") + or (f"Port {alert_data['portNum']}" if alert_data.get("portNum") else None) + ) + if not port_name: + log.info( + "webhook.meraki.port_event_missing_port", + alert_type=alert_type, + device_serial=device_serial, + alert_data_keys=sorted(alert_data.keys())[:16], + ) + return [] + + current = str(alert_data.get("currentValue") or "").strip().lower() + if current == "connected": + event_class = "link_up" + elif current == "disconnected": + event_class = "link_down" + else: + log.info( + "webhook.meraki.port_event_unknown_state", + alert_type=alert_type, + device_serial=device_serial, + current_value=current, + ) + return [] + + device_id = f"meraki:{device_serial}" + device_name = payload.get("deviceName") or device_id + target_parts = (device_id, str(port_name)) + return [ + MerakiSensoryEvent( + event_class=event_class, + target_parts=target_parts, + payload={ + "device": device_name, + "device_id": device_id, + "interface": port_name, + "meraki_alert_type": alert_type, + "meraki_network_id": payload.get("networkId"), + "occurred_at": payload.get("occurredAt"), + "previous_state": alert_data.get("previousValue"), + "current_state": alert_data.get("currentValue"), + }, + ) + ] + + +def _handle_uplink_status( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """WAN uplink up/down — same mapping as a switch port, but the + "interface" is the uplink interface (``wan1`` / ``wan2`` / ``cellular``). + """ + alert_data = _alert_data(payload) + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + if not device_serial: + return [] + + uplink_name = ( + alert_data.get("uplinkName") + or alert_data.get("uplink") + or alert_data.get("interface") + or "wan" + ) + current = str(alert_data.get("currentValue") or alert_data.get("status") or "").strip().lower() + # Meraki uplink states observed in the wild: "active" / "ready" / + # "failed" / "not connected". We classify the binary down vs not-down. + if current in {"active", "ready", "connected", "up"}: + event_class = "link_up" + elif current in {"failed", "not connected", "disconnected", "down"}: + event_class = "link_down" + else: + log.info( + "webhook.meraki.uplink_unknown_state", + alert_type=alert_type, + device_serial=device_serial, + current_value=current, + ) + return [] + + device_id = f"meraki:{device_serial}" + device_name = payload.get("deviceName") or device_id + return [ + MerakiSensoryEvent( + event_class=event_class, + target_parts=(device_id, str(uplink_name)), + payload={ + "device": device_name, + "device_id": device_id, + "interface": uplink_name, + "meraki_alert_type": alert_type, + "meraki_network_id": payload.get("networkId"), + "occurred_at": payload.get("occurredAt"), + "previous_state": alert_data.get("previousValue"), + "current_state": alert_data.get("currentValue") or alert_data.get("status"), + }, + ) + ] + + +def _handle_ids_alert( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """IDS / IPS alerts from MX appliances.""" + alert_data = _alert_data(payload) + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + network_id = payload.get("networkId") or alert_data.get("networkId") + if not device_serial and not network_id: + return [] + + client_mac = ( + alert_data.get("clientMac") + or alert_data.get("srcMac") + or alert_data.get("src_mac") + ) + # IDS targets prefer device-scoped form when we have one, falling + # back to network-scoped. Including client MAC when known so the + # dedup key naturally distinguishes per-client incidents. + if device_serial: + device_id = f"meraki:{device_serial}" + target_parts: tuple[str, ...] = ( + (device_id, str(client_mac)) if client_mac else (device_id,) + ) + device_name = payload.get("deviceName") or device_id + else: + target_parts = (f"meraki:{network_id}",) if not client_mac else ( + f"meraki:{network_id}", str(client_mac) + ) + device_id = None + device_name = payload.get("networkName") or f"meraki:{network_id}" + + return [ + MerakiSensoryEvent( + event_class="security_alert", + target_parts=target_parts, + payload={ + "device": device_name, + "device_id": device_id, + "alertType": alert_type, + "meraki_alert_type": alert_type, + "meraki_network_id": network_id, + "clientMac": client_mac, + "deviceSerial": device_serial, + "networkId": network_id, + "occurred_at": payload.get("occurredAt"), + "signature": alert_data.get("signature") or alert_data.get("signatureId"), + "message": alert_data.get("message") or alert_data.get("description"), + "severity_raw": alert_data.get("severity"), + }, + ) + ] + + +def _handle_config_change( + payload: dict[str, Any], alert_type: str +) -> list[MerakiSensoryEvent]: + """Settings/configuration change events.""" + alert_data = _alert_data(payload) + network_id = payload.get("networkId") or alert_data.get("networkId") + device_serial = payload.get("deviceSerial") or alert_data.get("deviceSerial") + if not network_id and not device_serial: + return [] + + if device_serial: + device_id = f"meraki:{device_serial}" + device_name = payload.get("deviceName") or device_id + target_parts: tuple[str, ...] = (device_id,) + else: + device_id = None + device_name = payload.get("networkName") or f"meraki:{network_id}" + target_parts = (f"meraki:{network_id}",) + + return [ + MerakiSensoryEvent( + event_class="config_change", + target_parts=target_parts, + payload={ + "device": device_name, + "device_id": device_id, + "meraki_alert_type": alert_type, + "meraki_network_id": network_id, + "occurred_at": payload.get("occurredAt"), + "admin_email": alert_data.get("adminEmail") or alert_data.get("admin"), + "change_summary": alert_data.get("description") or alert_data.get("changes"), + }, + ) + ] + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _alert_data(payload: dict[str, Any]) -> dict[str, Any]: + """Return ``payload['alertData']`` as a dict, or ``{}`` if absent / malformed. + + Meraki sometimes nests the useful fields under ``alertData`` and + sometimes lifts them to the top level. The handlers below tolerate + both by looking up keys in ``alertData`` first and the payload as + a fallback. + """ + data = payload.get("alertData") + return data if isinstance(data, dict) else {} + + +def _normalize_alert_type(alert_type: str) -> str: + """Fold Meraki's two equivalent naming conventions into one key. + + Meraki sends both human ("Port connectivity") and machine + ("port_connectivity") forms depending on the dashboard version and + the specific alert. We lowercase, strip, and collapse spaces to + underscores so a single ``_ROUTE`` entry handles both. + """ + return alert_type.strip().lower().replace(" ", "_") + + +# Mapping from normalized alert_type to handler. Edit this dict to add +# new coverage — every entry MUST have a corresponding test in +# tests/webhooks/test_meraki_events.py. +_ROUTE: dict[str, Any] = { + "port_connectivity": _handle_port_connectivity, + "switch_port_connection_changed": _handle_port_connectivity, + "uplink_status_change": _handle_uplink_status, + "uplink_status": _handle_uplink_status, + "ids_alerted": _handle_ids_alert, + "ids_alert": _handle_ids_alert, + "malware_detected": _handle_ids_alert, + "settings_changed": _handle_config_change, + "configuration_change": _handle_config_change, + "configuration_changes": _handle_config_change, +} + + +__all__ = ["MerakiSensoryEvent", "map_meraki_payload"] diff --git a/netcortex/webhooks/router.py b/netcortex/webhooks/router.py index a25376d..01a4e2d 100644 --- a/netcortex/webhooks/router.py +++ b/netcortex/webhooks/router.py @@ -24,6 +24,7 @@ from netcortex.webhooks.meraki import handle_meraki_webhook from netcortex.webhooks.catalyst_center import handle_catalyst_center_webhook +from netcortex.webhooks.event_publisher import get_publisher from netcortex.webhooks.telemetry import handle_telemetry_push, telemetry_event_stream log = structlog.get_logger(__name__) @@ -62,11 +63,16 @@ async def meraki_webhook( bytes=len(body), has_sig=x_cisco_meraki_signature is not None, ) + # Lazily fetch the per-app SensoryPublisher for the meraki_webhook + # source. Returns None when NATS_URL isn't configured — handler + # degrades to its pre-dev8 sync-only behavior in that case. + publisher = get_publisher(request.app, "meraki_webhook") result = await handle_meraki_webhook( instance_name=instance_name, body=body, signature_header=x_cisco_meraki_signature, background_tasks=background_tasks, + publisher=publisher, ) return result diff --git a/pyproject.toml b/pyproject.toml index f04fcea..d873398 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,7 +4,7 @@ build-backend = "hatchling.build" [project] name = "netcortex" -version = "0.8.0.dev7" +version = "0.8.0.dev8" description = "The intelligence layer for your network — multi-dimensional graph of the network bridging Meraki, Catalyst Center, Intersight, and more with NetBox as SoT" readme = "README.md" requires-python = ">=3.12" diff --git a/tests/webhooks/__init__.py b/tests/webhooks/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/webhooks/conftest.py b/tests/webhooks/conftest.py new file mode 100644 index 0000000..c4dfe6c --- /dev/null +++ b/tests/webhooks/conftest.py @@ -0,0 +1,158 @@ +"""Shared fixtures for webhook integration tests. + +This is the first test module in the repo that drives FastAPI routes +end-to-end (previous tests stayed below the HTTP layer). The fixtures +here exist to keep that pattern consistent for the dev9/dev10/dev11 +vendor receivers that will land in subsequent releases. + +Design choices +-------------- +* We build a **minimal** FastAPI app per test, mounting only the + ``webhook_router``. This avoids dragging in the full ``netcortex.main`` + app, which needs Neo4j, Redis, the secret backend, MCP, etc. — none + of which a webhook route touches. + +* The bus is a small in-test :class:`CapturingEventBus` that records + every publish in a list. We deliberately do not use the production + :class:`NatsEventBus` (would need a NATS server) or the + :class:`InMemoryEventBus` from the contract tests (which is built for + async-iterator subscriptions — overkill here). The capturing bus + exercises the full validation path via :class:`SensoryPublisher` + while letting tests assert synchronously on what was published. + +* HMAC shared secrets are injected directly into the + :mod:`netcortex.webhooks.meraki` module-level cache so tests do not + reach the AWS Secrets Manager backend. + +* Adapter instances are patched to an empty dict so the + ``BackgroundTasks`` adapter-sync callback no-ops cleanly. +""" + +from __future__ import annotations + +from typing import Any + +import pytest +from fastapi import FastAPI +from fastapi.testclient import TestClient + +from netcortex.thalamus import SensoryPublisher +from netcortex.webhooks.router import router as webhook_router + + +class CapturingEventBus: + """Test-only :class:`EventBus` substitute that records every publish. + + Implements the same async ``publish`` / ``subscribe`` / ``close`` + surface so :class:`SensoryPublisher` accepts it. Subscribe is a + no-op iterator because the webhook tests assert on the publish + side only — runner/handler behavior is covered separately in + ``tests/reflex/`` against the real :class:`InMemoryEventBus`. + """ + + def __init__(self) -> None: + self.published: list[tuple[str, dict[str, Any]]] = [] + self.publish_failures: list[Exception] = [] + self.closed = False + + async def publish( + self, + subject: str, + payload: dict[str, Any], + *, + headers: dict[str, str] | None = None, + ) -> None: + if self.publish_failures: + raise self.publish_failures.pop(0) + self.published.append((subject, dict(payload))) + + async def subscribe(self, pattern: str) -> Any: # pragma: no cover - unused + async def _empty() -> Any: + if False: + yield None + return _empty() + + async def close(self) -> None: + self.closed = True + + +@pytest.fixture +def capturing_bus() -> CapturingEventBus: + return CapturingEventBus() + + +@pytest.fixture +def webhook_app(capturing_bus: CapturingEventBus) -> FastAPI: + """Minimal FastAPI app with just the webhook router and a captured bus.""" + app = FastAPI() + app.include_router(webhook_router) + app.state.event_bus = capturing_bus + app.state._sensory_publishers = { + "meraki_webhook": SensoryPublisher(capturing_bus, source="meraki_webhook"), + } + return app + + +@pytest.fixture +def webhook_client(webhook_app: FastAPI) -> TestClient: + """Synchronous TestClient against the minimal webhook app.""" + return TestClient(webhook_app) + + +@pytest.fixture +def webhook_app_no_bus() -> FastAPI: + """Variant with ``event_bus = None`` — exercises the + publisher-unavailable degradation path.""" + app = FastAPI() + app.include_router(webhook_router) + app.state.event_bus = None + app.state._sensory_publishers = {} + return app + + +@pytest.fixture +def webhook_client_no_bus(webhook_app_no_bus: FastAPI) -> TestClient: + return TestClient(webhook_app_no_bus) + + +@pytest.fixture +def meraki_shared_secret(monkeypatch: pytest.MonkeyPatch) -> str: + """Inject a known HMAC shared secret for instance 'TEST_TENANT'. + + Bypasses the AWS Secrets Manager backend entirely by pre-populating + the module-level ``_SECRET_CACHE`` dict. + """ + from netcortex.webhooks import meraki as meraki_module + + secret = "test-shared-secret-do-not-use-in-prod" # noqa: S105 — fixture-scoped test secret + monkeypatch.setitem(meraki_module._SECRET_CACHE, "TEST_TENANT", secret) + return secret + + +@pytest.fixture +def meraki_no_secret(monkeypatch: pytest.MonkeyPatch) -> None: + """Ensure ``_get_shared_secret`` returns None — exercises the + 'no signing key configured' degradation path (still accepts the + request but emits a warning).""" + from netcortex.webhooks import meraki as meraki_module + + async def _no_secret(_instance_name: str) -> str | None: + return None + + monkeypatch.setattr(meraki_module, "_get_shared_secret", _no_secret) + + +@pytest.fixture(autouse=True) +def stub_adapter_instances(monkeypatch: pytest.MonkeyPatch) -> None: + """Patch ``get_instances`` to return an empty mapping for every + webhook test, so the ``BackgroundTasks`` adapter-sync callback + finishes cleanly instead of trying to reach a real Meraki API. + """ + from netcortex.adapters import __init__ as adapters_init # noqa: F401 + + def _empty_instances() -> dict[str, Any]: + return {} + + monkeypatch.setattr( + "netcortex.adapters.get_instances", _empty_instances + ) diff --git a/tests/webhooks/test_meraki_events.py b/tests/webhooks/test_meraki_events.py new file mode 100644 index 0000000..63658ae --- /dev/null +++ b/tests/webhooks/test_meraki_events.py @@ -0,0 +1,403 @@ +"""Unit tests for the Meraki webhook → sensory event mapper. + +The mapper is a pure function so tests are blazingly fast and don't +need any fixtures beyond raw sample payloads. + +Sample payload shapes are intentionally varied — the mapper has to +handle Meraki's habit of sending the same field under different keys +depending on the alert type and dashboard version (``portName`` vs +``port``, ``adminEmail`` vs ``admin``, ``alertType`` vs ``alertTypeId``). + +All identifiers (deviceSerial, networkId) and email addresses are +fabricated and contain ``EXAMPLE`` or use RFC 5737 / RFC 2606 +documentation values where applicable. +""" + +from __future__ import annotations + +from typing import Any + +import pytest + +from netcortex.webhooks.meraki_events import ( + MerakiSensoryEvent, + map_meraki_payload, +) + + +# --------------------------------------------------------------------------- +# Port connectivity (the headline dev8 use case — dedup overlap with SNMP) +# --------------------------------------------------------------------------- + + +def test_port_connectivity_disconnected_emits_link_down() -> None: + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "alertTypeId": "port_connectivity", + "networkId": "L_EXAMPLE001", + "networkName": "main-office", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "occurredAt": "2026-06-05T20:00:00Z", + "alertData": { + "portNum": "12", + "portName": "Port 12", + "previousValue": "Connected", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + ev = events[0] + assert ev.event_class == "link_down" + assert ev.target_parts == ("meraki:Q4CD-EXAM-PLE1", "Port 12") + assert ev.payload["device"] == "cpn-arlington-ms1" + assert ev.payload["device_id"] == "meraki:Q4CD-EXAM-PLE1" + assert ev.payload["interface"] == "Port 12" + assert ev.payload["meraki_alert_type"] == "Port connectivity" + assert ev.payload["meraki_network_id"] == "L_EXAMPLE001" + assert ev.payload["previous_state"] == "Connected" + assert ev.payload["current_state"] == "Disconnected" + + +def test_port_connectivity_connected_emits_link_up() -> None: + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 12", + "currentValue": "Connected", + "previousValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_up" + assert events[0].target_parts == ("meraki:Q4CD-EXAM-PLE1", "Port 12") + + +def test_port_connectivity_synonym_switch_port_connection_changed() -> None: + """Meraki uses two interchangeable names for the same alert.""" + payload: dict[str, Any] = { + "alertType": "Switch port connection changed", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 3", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_down" + + +def test_port_connectivity_builds_port_name_from_portnum_when_missing() -> None: + """Older payloads omit portName and only send numeric portNum.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portNum": "5", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].target_parts == ("meraki:Q4CD-EXAM-PLE1", "Port 5") + assert events[0].payload["interface"] == "Port 5" + + +def test_port_event_missing_device_serial_returns_empty() -> None: + """No way to construct a meaningful target without the device serial.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 1", + "currentValue": "Disconnected", + }, + } + assert map_meraki_payload(payload) == [] + + +def test_port_event_missing_port_returns_empty() -> None: + """No port identifier at all — skip rather than publish a malformed target.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"currentValue": "Disconnected"}, + } + assert map_meraki_payload(payload) == [] + + +def test_port_event_unknown_state_returns_empty() -> None: + """``currentValue`` of something we don't recognize is not silently mapped.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 1", + "currentValue": "PartiallyOnline", + }, + } + assert map_meraki_payload(payload) == [] + + +def test_port_event_device_name_falls_back_to_serial_form() -> None: + """When deviceName is missing, payload['device'] falls back to the + serial-prefixed form so the dev7 :AFFECTS resolver still matches + on ``Device.id`` (which uses the same ``meraki:`` form).""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": { + "portName": "Port 1", + "currentValue": "Disconnected", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].payload["device"] == "meraki:Q4CD-EXAM-PLE1" + assert events[0].payload["device_id"] == "meraki:Q4CD-EXAM-PLE1" + + +# --------------------------------------------------------------------------- +# Uplink status +# --------------------------------------------------------------------------- + + +def test_uplink_status_change_failed_emits_link_down() -> None: + payload: dict[str, Any] = { + "alertType": "Uplink status change", + "deviceSerial": "Q4XX-EXAM-PLE2", + "deviceName": "cpn-mx-arlington", + "networkId": "L_EXAMPLE001", + "alertData": { + "uplinkName": "wan1", + "currentValue": "failed", + "previousValue": "active", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_down" + assert events[0].target_parts == ("meraki:Q4XX-EXAM-PLE2", "wan1") + assert events[0].payload["interface"] == "wan1" + + +def test_uplink_status_change_active_emits_link_up() -> None: + payload: dict[str, Any] = { + "alertType": "Uplink status change", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": { + "uplinkName": "wan2", + "currentValue": "active", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_up" + + +# --------------------------------------------------------------------------- +# IDS / security +# --------------------------------------------------------------------------- + + +def test_ids_alerted_with_client_mac_emits_security_alert() -> None: + payload: dict[str, Any] = { + "alertType": "IDS alerted", + "alertTypeId": "ids_alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "deviceName": "cpn-mx-arlington", + "networkId": "L_EXAMPLE001", + "occurredAt": "2026-06-05T20:01:00Z", + "alertData": { + "clientMac": "aa:bb:cc:dd:ee:ff", + "signature": "1:2008725:1", + "message": "ET POLICY example", + "severity": "high", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + ev = events[0] + assert ev.event_class == "security_alert" + assert ev.target_parts == ("meraki:Q4XX-EXAM-PLE2", "aa:bb:cc:dd:ee:ff") + assert ev.payload["clientMac"] == "aa:bb:cc:dd:ee:ff" + assert ev.payload["signature"] == "1:2008725:1" + assert ev.payload["severity_raw"] == "high" + + +def test_ids_alerted_without_client_mac_uses_device_only_target() -> None: + payload: dict[str, Any] = { + "alertType": "IDS alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": {"signature": "1:2008725:1"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].target_parts == ("meraki:Q4XX-EXAM-PLE2",) + + +def test_malware_detected_maps_to_security_alert() -> None: + payload: dict[str, Any] = { + "alertType": "Malware detected", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": {"clientMac": "aa:bb:cc:dd:ee:ff"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "security_alert" + + +# --------------------------------------------------------------------------- +# Config changes +# --------------------------------------------------------------------------- + + +def test_settings_changed_at_network_level_emits_config_change() -> None: + payload: dict[str, Any] = { + "alertType": "Settings changed", + "networkId": "L_EXAMPLE001", + "networkName": "main-office", + "alertData": { + "adminEmail": "ops@example.com", + "description": "Updated SSID encryption", + }, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + ev = events[0] + assert ev.event_class == "config_change" + assert ev.target_parts == ("meraki:L_EXAMPLE001",) + assert ev.payload["admin_email"] == "ops@example.com" + assert ev.payload["change_summary"] == "Updated SSID encryption" + + +def test_settings_changed_at_device_level_uses_device_target() -> None: + payload: dict[str, Any] = { + "alertType": "Configuration change", + "networkId": "L_EXAMPLE001", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "alertData": {"adminEmail": "ops@example.com"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].target_parts == ("meraki:Q4CD-EXAM-PLE1",) + assert events[0].payload["device"] == "cpn-arlington-ms1" + + +# --------------------------------------------------------------------------- +# Unmapped / malformed +# --------------------------------------------------------------------------- + + +def test_unknown_alert_type_returns_empty_list() -> None: + """Unknown alertTypes do not crash and do not silently publish.""" + payload: dict[str, Any] = { + "alertType": "Camera detected motion", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"foo": "bar"}, + } + assert map_meraki_payload(payload) == [] + + +def test_missing_alert_type_returns_empty_list() -> None: + """A payload with no alertType at all — never publish.""" + payload: dict[str, Any] = {"deviceSerial": "Q4CD-EXAM-PLE1"} + assert map_meraki_payload(payload) == [] + + +def test_alert_type_id_fallback_when_alert_type_absent() -> None: + """Some dashboard variants send only the snake_case ID.""" + payload: dict[str, Any] = { + "alertTypeId": "port_connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"portName": "Port 1", "currentValue": "Disconnected"}, + } + events = map_meraki_payload(payload) + assert len(events) == 1 + assert events[0].event_class == "link_down" + + +def test_alert_data_missing_returns_empty_for_port_event() -> None: + """Mapper must not raise on a payload that has no alertData.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + } + assert map_meraki_payload(payload) == [] + + +def test_alert_data_wrong_type_does_not_raise() -> None: + """alertData is a string by accident — handled gracefully.""" + payload: dict[str, Any] = { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": "not a dict", + } + assert map_meraki_payload(payload) == [] + + +# --------------------------------------------------------------------------- +# Subject builder compatibility +# --------------------------------------------------------------------------- + + +def test_emitted_events_compose_into_valid_sensory_subjects() -> None: + """Smoke-test: every emitted target_parts tuple must be acceptable to + :func:`sensory_subject` after the publisher's whitespace sanitization. + + The publisher (added in 0.8.0-dev6) sanitizes whitespace in target + parts but rejects dots; this guards against introducing an alert + type whose target parts contain dots.""" + from netcortex.contracts.subjects import sensory_subject + + payloads = [ + { + "alertType": "Port connectivity", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"portName": "Port 12", "currentValue": "Disconnected"}, + }, + { + "alertType": "IDS alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "networkId": "L_EXAMPLE001", + "alertData": {"clientMac": "aa:bb:cc:dd:ee:ff"}, + }, + { + "alertType": "Settings changed", + "networkId": "L_EXAMPLE001", + "alertData": {"adminEmail": "ops@example.com"}, + }, + ] + for p in payloads: + events = map_meraki_payload(p) + assert events, f"expected at least one event for {p['alertType']}" + for ev in events: + # Sanitize whitespace as the publisher would, then construct + # the subject. This must not raise. + sanitized = tuple(part.replace(" ", "_") for part in ev.target_parts) + subject = sensory_subject(ev.event_class, "meraki_webhook", *sanitized) + assert subject.startswith(f"sensory.{ev.event_class}.meraki_webhook.") + + +if __name__ == "__main__": + pytest.main([__file__, "-v"]) diff --git a/tests/webhooks/test_meraki_webhook_route.py b/tests/webhooks/test_meraki_webhook_route.py new file mode 100644 index 0000000..0ce4792 --- /dev/null +++ b/tests/webhooks/test_meraki_webhook_route.py @@ -0,0 +1,344 @@ +"""End-to-end HTTP tests for the Meraki webhook route. + +These tests are the first in the repo that drive FastAPI routes via +:class:`TestClient`. The pattern (capturing bus + minimal app + secret +fixture) is intentionally generic so dev9/dev10/dev11 webhook +receivers can reuse it. + +Coverage +-------- +* HMAC verification: valid, invalid, missing signature header +* No-secret-configured degradation (logs warning, still publishes) +* Body parsing: valid JSON, malformed JSON +* End-to-end publish: a port-flap payload produces the expected + ``sensory.link_down.meraki_webhook.`` subject +* Mapper integration: unknown alertType returns 200 with zero + published events (sync trigger still queues) +* Publisher unavailable: app with ``event_bus = None`` still 200s + and routes the sync trigger correctly +* Bus failure: a transient publish error doesn't 5xx the webhook +""" + +from __future__ import annotations + +import hashlib +import hmac +import json +from typing import Any + +import pytest +from fastapi.testclient import TestClient + +from tests.webhooks.conftest import CapturingEventBus + + +def _sign(body: bytes, secret: str) -> str: + return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + + +def _port_disconnected_payload() -> dict[str, Any]: + """Realistic Meraki port-disconnect payload (sanitized identifiers).""" + return { + "alertType": "Port connectivity", + "alertTypeId": "port_connectivity", + "version": "0.1", + "sentAt": "2026-06-05T20:00:00Z", + "organizationId": "EXAMPLE_ORG", + "organizationName": "Example Org", + "networkId": "L_EXAMPLE001", + "networkName": "main-office", + "deviceSerial": "Q4CD-EXAM-PLE1", + "deviceName": "cpn-arlington-ms1", + "alertId": "test-alert-001", + "occurredAt": "2026-06-05T20:00:00Z", + "alertData": { + "portNum": "12", + "portName": "Port 12", + "previousValue": "Connected", + "currentValue": "Disconnected", + }, + } + + +# --------------------------------------------------------------------------- +# HMAC verification +# --------------------------------------------------------------------------- + + +def test_valid_hmac_returns_200_and_publishes( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + body = json.dumps(_port_disconnected_payload()).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + j = resp.json() + assert j["status"] == "queued" + assert j["adapter"] == "meraki/TEST_TENANT" + assert j["sensory_events_published"] == "1" + + assert len(capturing_bus.published) == 1 + subject, payload = capturing_bus.published[0] + # The publisher sanitizes whitespace in target parts (dev6); the + # original ``Port 12`` is preserved in the payload (dev6 contract). + assert subject == "sensory.link_down.meraki_webhook.meraki:Q4CD-EXAM-PLE1.Port_12" + assert payload["device"] == "cpn-arlington-ms1" + assert payload["device_id"] == "meraki:Q4CD-EXAM-PLE1" + assert payload["interface"] == "Port 12" + assert payload["meraki_alert_type"] == "Port connectivity" + assert payload["source"] == "meraki_webhook" # injected by publisher + + +def test_invalid_hmac_returns_401_no_publish( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": "deadbeef" * 8, + }, + ) + + assert resp.status_code == 401 + assert "Invalid Meraki webhook signature" in resp.text + assert capturing_bus.published == [] + + +def test_missing_signature_returns_401_no_publish( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 401 + assert capturing_bus.published == [] + + +def test_signature_prefix_sha256_accepted( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """Newer Meraki firmware prefixes the signature with 'sha256='.""" + body = json.dumps(_port_disconnected_payload()).encode() + sig = "sha256=" + _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + assert len(capturing_bus.published) == 1 + + +# --------------------------------------------------------------------------- +# Body parsing +# --------------------------------------------------------------------------- + + +def test_malformed_json_returns_400( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_no_secret: None, +) -> None: + body = b"this is not json at all" + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 400 + assert capturing_bus.published == [] + + +# --------------------------------------------------------------------------- +# No-secret-configured degradation +# --------------------------------------------------------------------------- + + +def test_no_secret_configured_still_publishes_with_warning( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_no_secret: None, + caplog: pytest.LogCaptureFixture, +) -> None: + """When the per-tenant secret isn't in the backend, the handler + accepts the request and logs a loud warning. We still process the + event (operators bootstrapping the integration would otherwise + see no events at all and assume the pipeline is broken).""" + import logging + caplog.set_level(logging.WARNING) + body = json.dumps(_port_disconnected_payload()).encode() + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={"Content-Type": "application/json"}, + ) + + assert resp.status_code == 200 + assert len(capturing_bus.published) == 1 + + +# --------------------------------------------------------------------------- +# Mapper integration +# --------------------------------------------------------------------------- + + +def test_unknown_alert_type_returns_200_with_zero_published( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """Unmapped alertTypes still 200 (the adapter sync is still + valuable for ground-truth reconciliation) but produce no + sensory events.""" + payload: dict[str, Any] = { + "alertType": "Camera detected motion", + "deviceSerial": "Q4CD-EXAM-PLE1", + "networkId": "L_EXAMPLE001", + "alertData": {"foo": "bar"}, + } + body = json.dumps(payload).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + assert resp.json()["sensory_events_published"] == "0" + assert capturing_bus.published == [] + + +def test_ids_alerted_publishes_security_alert( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + payload: dict[str, Any] = { + "alertType": "IDS alerted", + "deviceSerial": "Q4XX-EXAM-PLE2", + "deviceName": "cpn-mx-arlington", + "networkId": "L_EXAMPLE001", + "occurredAt": "2026-06-05T20:01:00Z", + "alertData": { + "clientMac": "aa:bb:cc:dd:ee:ff", + "signature": "1:2008725:1", + "message": "ET POLICY example", + "severity": "high", + }, + } + body = json.dumps(payload).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + assert len(capturing_bus.published) == 1 + subject, pub_payload = capturing_bus.published[0] + assert subject.startswith("sensory.security_alert.meraki_webhook.") + assert pub_payload["clientMac"] == "aa:bb:cc:dd:ee:ff" + + +# --------------------------------------------------------------------------- +# Publisher unavailable / failure degradation +# --------------------------------------------------------------------------- + + +def test_route_works_without_publisher_configured( + webhook_client_no_bus: TestClient, + meraki_shared_secret: str, +) -> None: + """When NATS_URL is unset (``app.state.event_bus = None``), the + route still accepts the webhook and queues the sync trigger. + Pre-dev8 behavior is preserved end-to-end.""" + body = json.dumps(_port_disconnected_payload()).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client_no_bus.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + # No publisher → no events counted, but request succeeded. + assert resp.json()["sensory_events_published"] == "0" + + +def test_bus_publish_failure_does_not_fail_request( + webhook_client: TestClient, + capturing_bus: CapturingEventBus, + meraki_shared_secret: str, +) -> None: + """A transient NATS failure mid-publish must not 5xx the webhook; + the sync trigger is still queued. The SensoryPublisher swallows + the underlying bus error.""" + capturing_bus.publish_failures.append(RuntimeError("nats connection lost")) + body = json.dumps(_port_disconnected_payload()).encode() + sig = _sign(body, meraki_shared_secret) + + resp = webhook_client.post( + "/webhooks/meraki/TEST_TENANT", + content=body, + headers={ + "Content-Type": "application/json", + "X-Cisco-Meraki-Signature": sig, + }, + ) + + assert resp.status_code == 200 + # The publish failed and was swallowed inside SensoryPublisher; + # the handler still reports one event was attempted (the mapper + # produced one) so the count reflects mapper output, not bus + # success — operationally we have ``bus.published`` and + # ``sensory_publisher.publish_failed`` log lines to disambiguate. + assert resp.json()["sensory_events_published"] == "1" + assert capturing_bus.published == []