Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
103 changes: 103 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,109 @@ and this file MUST be updated together whenever `__version__` changes.

---

## [0.8.0-dev8] — Second sensory publisher: Meraki webhook → NATS

First webhook-side sensory publisher. The web process now publishes
`sensory.*` events when Meraki Dashboard webhooks arrive, so a port
flap detected at the network edge reaches the reflex pipeline in
50–200ms instead of waiting up to a full SNMP poll cycle (5 minutes).

This is also the first release where the dedup store added in
0.8.0-dev3 actually does anything in production: the same port flap
will now be observed by both the Meraki webhook (immediate) and the
SNMP poller (next cycle), and the second arrival within the 60s dedup
window collapses to a single `:ReflexEvent`.

### Added
- `netcortex/webhooks/event_publisher.py` — web-process bus
coordination. One `NatsEventBus` per process on `app.state.event_bus`;
one cached `SensoryPublisher` per source token on
`app.state._sensory_publishers`. Graceful degradation when
`NATS_URL` is unset (publishing disabled, sync trigger still fires).
- `netcortex/webhooks/meraki_events.py` — pure mapper from Meraki's
vendor dialect (`alertType` + `alertData`) to our closed
`SENSORY_EVENT_CLASSES` vocabulary. Initial coverage:
- `Port connectivity` / `Switch port connection changed` → `link_up` / `link_down`
- `Uplink status change` → `link_up` / `link_down`
- `IDS alerted` / `Malware detected` → `security_alert`
- `Settings changed` / `Configuration change` → `config_change`
- Unknown alertTypes log `webhook.meraki.unmapped_alert_type` and skip
(so we grow coverage based on real traffic, not speculation)
- Target shape: `meraki:<deviceSerial>|<portName>` for port events,
`meraki:<deviceSerial>|<clientMac>` for IDS, etc. Uses the same
`meraki:` prefix as `Device.id` in the live graph so the dev7
`:AFFECTS` edge resolver lands the edge automatically.

### Changed
- `netcortex/webhooks/meraki.py::handle_meraki_webhook` accepts an
optional `publisher: SensoryPublisher | None` parameter. When
provided, the handler runs the mapper and publishes one event per
mapped alert. Pre-dev8 sync-trigger behavior is unchanged when
the publisher is absent.
- `netcortex/webhooks/router.py` resolves the publisher via
`get_publisher(request.app, "meraki_webhook")` and threads it
through to the handler.
- `netcortex/main.py` lifespan now initializes the NATS bus on
startup (`init_event_bus`) and closes it on shutdown
(`close_event_bus`). Both steps are best-effort — a missing
`NATS_URL` or transient outage degrades webhooks back to their
pre-dev8 sync-only behavior, never blocks startup.
- Webhook response body adds `sensory_events_published: <count>` so
operators can confirm at-a-glance which webhooks made it through
the publish path.

### Tests
- `tests/webhooks/test_meraki_events.py` — 18 mapper unit tests
covering every supported alertType, both up and down states,
synonym handling, missing-field degradation, and a subject-builder
compatibility smoke test.
- `tests/webhooks/test_meraki_webhook_route.py` — 10 HTTP integration
tests covering HMAC valid/invalid/missing, signature prefix
compatibility, malformed JSON, no-secret-configured degradation,
unknown alertType, IDS publish, publisher-unavailable path, and
bus-failure-during-publish path.
- This is the first FastAPI `TestClient` test module in the repo;
pattern (capturing bus + minimal app + secret fixture) is
generalized via `conftest.py` so dev9/dev10/dev11 vendor
receivers can reuse it.

### Operational notes
- The web deployment in `deploy/helm` already sets `NATS_URL` when
`nats.enabled` is true — no Helm change required.
- HMAC secrets continue to live at
`netcortex/webhooks/meraki/<instance_name>` in AWS Secrets
Manager (pre-existing storage path).
- A new Cypher query to find webhook-sourced events:
```cypher
MATCH (e:ReflexEvent {source: 'meraki_webhook'})
WHERE e.recorded_at > timestamp() - 3600000
RETURN e.subject, e.target, e.outcome
ORDER BY e.observed_at_ms DESC
```
- To see dedup in action (webhook + SNMP poll observing the same
flap), join on target within a tight window:
```cypher
MATCH (e1:ReflexEvent), (e2:ReflexEvent)
WHERE e1.target = e2.target
AND e1.source = 'meraki_webhook'
AND e2.source = 'snmp_poll'
AND e1.event_class = e2.event_class
AND abs(e1.observed_at_ms - e2.observed_at_ms) < 60000
RETURN e1.target, e1.outcome AS webhook_outcome, e2.outcome AS snmp_outcome
```
In a well-dedupping system the SNMP-side outcome will be `skipped`
for any flap the webhook caught first.

### Not in this release (deferred)
- `:ThousandEyes` / `:NexusDashboard` / `:cdFMC` webhook receivers
(dev9, dev10, dev11).
- `nexus_dashboard_webhook` and `fmc_webhook` source tokens (added
alongside their respective receivers).
- JetStream durable subscriptions for guaranteed at-least-once across
worker restarts.

---

## [0.8.0-dev7] — :AFFECTS edge resolution + target/subject consistency

Second small polish to the dev5 pipeline, surfaced by deployment
Expand Down
2 changes: 1 addition & 1 deletion netcortex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes.
"""

__version__ = "0.8.0-dev7"
__version__ = "0.8.0-dev8"
20 changes: 20 additions & 0 deletions netcortex/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,21 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg]
state.redis_status = redis_status
state.redis_message = redis_msg

# ── NATS sensory publisher bus (0.8.0-dev8+) ──────────────────────────
# The web process publishes `sensory.*` events when webhooks arrive;
# the worker process owns the runner that consumes them. Bus init is
# best-effort: a missing NATS_URL or a transient NATS outage degrades
# webhooks back to their pre-dev8 sync-trigger behavior, which still
# reconciles the live-state graph.
nats_url = (
getattr(cfg, "nats_url", "") if cfg else ""
) or os.environ.get("NATS_URL", "")
try:
from netcortex.webhooks.event_publisher import init_event_bus
await init_event_bus(app, nats_url=nats_url or None)
except Exception as exc:
log.warning("netcortex.sensory_publisher_init_failed", error=str(exc))

# ── Load and health-check adapters ───────────────────────────────────
if cfg:
await _probe_adapters()
Expand Down Expand Up @@ -332,6 +347,11 @@ async def lifespan(app: FastAPI): # type: ignore[type-arg]
# ── Shutdown ──────────────────────────────────────────────────────────
refresh_adapter_task.cancel()
refresh_graph_task.cancel()
try:
from netcortex.webhooks.event_publisher import close_event_bus
await close_event_bus(app)
except Exception as exc:
log.warning("netcortex.sensory_publisher_close_failed", error=str(exc))
await neo4j_client.close()
log.info("netcortex.shutdown")

Expand Down
141 changes: 141 additions & 0 deletions netcortex/webhooks/event_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,141 @@
"""Web-process sensory publisher coordination.

The web process needs to publish ``sensory.*`` events when webhooks
arrive from upstream platforms (Meraki, ThousandEyes, Nexus Dashboard,
cdFMC, …). The worker process owns the :class:`ReflexRunner` that
*consumes* those events; web only ever publishes. Keeping the two
responsibilities split means:

* the web process stays a thin HTTP front (no dedup state, no Neo4j
sink, no handler scheduling — those all live in worker)
* webhook latency stays low: publish, return 200, done. The reflex
pipeline runs asynchronously on the worker subscription side.
* a worker outage does not lose webhook events — NATS holds them until
the worker reconnects (or, with JetStream later, persists them durably)

Design
------
One :class:`NatsEventBus` per process, cached on ``app.state.event_bus``.
One :class:`SensoryPublisher` per source token, lazily created and
cached on ``app.state._sensory_publishers``. A typo in a source name
fails at the first call (validated against ``SENSORY_SOURCES``) rather
than silently producing unsubscribable subjects.

If ``NATS_URL`` is not configured the bus is ``None`` and
:func:`get_publisher` returns ``None``. Routes that try to publish in
that case should degrade silently (the existing sync-trigger behavior
remains intact — losing the sensory side is a recoverable annoyance,
not a data-loss bug). This mirrors the worker's
``_start_reflex_pipeline`` behavior from 0.8.0-dev5.

Shutdown semantics
------------------
:func:`close_event_bus` flushes pending publishes and closes the NATS
connection. Called from ``main.py``'s lifespan teardown. Idempotent.
"""

from __future__ import annotations

import os
from typing import TYPE_CHECKING, Any

import structlog

from netcortex.contracts.subjects import SENSORY_SOURCES

if TYPE_CHECKING:
from fastapi import FastAPI
from netcortex.thalamus import NatsEventBus, SensoryPublisher

log = structlog.get_logger(__name__)


async def init_event_bus(app: "FastAPI", nats_url: str | None = None) -> "NatsEventBus | None":
"""Initialize the web process's NATS publisher bus.

Resolves ``nats_url`` in this order:
1. explicit ``nats_url`` argument (test injection)
2. ``NATS_URL`` environment variable (Helm-provided)

Stores the bus on ``app.state.event_bus`` so route handlers and the
teardown path can reach it. Returns the bus (or ``None`` if NATS is
not configured / unreachable).

Failure here logs and returns ``None`` rather than raising —
the web process must boot even when NATS is down. The reflex/sensory
feature degrades; the rest of the API keeps working.
"""
resolved = nats_url or os.environ.get("NATS_URL", "")
if not resolved:
log.info("web.sensory_publisher_disabled", reason="NATS_URL not set")
app.state.event_bus = None
app.state._sensory_publishers = {}
return None
try:
from netcortex.thalamus import NatsEventBus
bus = NatsEventBus(resolved)
app.state.event_bus = bus
app.state._sensory_publishers = {}
log.info("web.sensory_publisher_ready", nats_url=resolved)
return bus
except Exception as exc:
log.warning("web.sensory_publisher_init_failed", error=str(exc))
app.state.event_bus = None
app.state._sensory_publishers = {}
return None


def get_publisher(app: Any, source: str) -> "SensoryPublisher | None":
"""Return a cached :class:`SensoryPublisher` for ``source``, or None.

Cheap to call per-request: the first call for a given source builds
the publisher (which validates ``source`` against ``SENSORY_SOURCES``
at construction); subsequent calls return the cached instance.

Returns ``None`` when the bus isn't initialized — callers should
handle that as "publishing disabled, fall through to other side-
effects" rather than as an error condition.

``source`` is validated against the closed vocabulary even if the
bus is unavailable, so a typo in a webhook route still fails fast
in unit tests (we don't want "all webhook routes silently skip
publishing because of a typo nobody noticed").
"""
if source not in SENSORY_SOURCES:
raise ValueError(
f"unknown sensory source {source!r}; "
f"add to SENSORY_SOURCES in netcortex.contracts.subjects "
f"and update docs/architecture/subjects.md"
)
bus = getattr(app.state, "event_bus", None)
if bus is None:
return None
publishers: dict[str, Any] = getattr(app.state, "_sensory_publishers", None) or {}
if source not in publishers:
from netcortex.thalamus import SensoryPublisher
publishers[source] = SensoryPublisher(bus, source=source)
app.state._sensory_publishers = publishers
return publishers[source]


async def close_event_bus(app: Any) -> None:
"""Flush and close the publisher bus. Safe to call multiple times.

Called from the FastAPI lifespan teardown. Errors are logged and
swallowed — a connection that already died will fail to close
cleanly, and that should not block the shutdown sequence.
"""
bus = getattr(app.state, "event_bus", None)
if bus is None:
return
try:
await bus.close()
log.info("web.sensory_publisher_closed")
except Exception as exc:
log.warning("web.sensory_publisher_close_failed", error=str(exc))
finally:
app.state.event_bus = None
app.state._sensory_publishers = {}


__all__ = ["init_event_bus", "get_publisher", "close_event_bus"]
53 changes: 50 additions & 3 deletions netcortex/webhooks/meraki.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@
import hashlib
import hmac
import json
from typing import Any
from typing import TYPE_CHECKING, Any

import structlog
from fastapi import BackgroundTasks, HTTPException, status

if TYPE_CHECKING:
from netcortex.thalamus import SensoryPublisher

log = structlog.get_logger(__name__)

_SECRET_CACHE: dict[str, str] = {} # instance_name → shared_secret
Expand Down Expand Up @@ -62,8 +65,26 @@ async def handle_meraki_webhook(
body: bytes,
signature_header: str | None,
background_tasks: BackgroundTasks,
publisher: "SensoryPublisher | None" = None,
) -> dict[str, str]:
"""Validate and enqueue a Meraki webhook event."""
"""Validate and process a Meraki webhook event.

Two side effects, both best-effort and independent:

1. **Publish sensory events to NATS** (0.8.0-dev8 and later) so
reflex handlers and the episodic memory layer see the event in
seconds, not minutes. Driven by the
:mod:`netcortex.webhooks.meraki_events` mapper.
2. **Trigger a targeted adapter sync** (pre-dev8 behavior) so the
Neo4j live-state graph reflects the change. This stays even
when the sensory side is wired in: graph sync re-derives
authoritative state, sensory events feed the reflex layer.

The two paths are intentionally decoupled: a NATS hiccup must not
block the sync, and a misbehaving adapter must not block the
publish. Both run inside the request handler (publish is async +
fast; sync is in BackgroundTasks).
"""
shared_secret = await _get_shared_secret(instance_name)

if shared_secret is not None:
Expand Down Expand Up @@ -101,7 +122,32 @@ async def handle_meraki_webhook(
org_id=org_id,
)

# Targeted refresh: only re-sync the affected network if possible.
# ── Publish sensory events ────────────────────────────────────────────
# Pure mapper from vendor dialect to our subjects taxonomy. The
# publisher itself is best-effort; per-event publish failures are
# logged inside :class:`SensoryPublisher` and don't propagate.
sensory_published = 0
if publisher is not None:
try:
from netcortex.webhooks.meraki_events import map_meraki_payload
events = map_meraki_payload(payload)
for ev in events:
await publisher.publish(
ev.event_class, *ev.target_parts, payload=ev.payload
)
sensory_published = len(events)
except Exception as exc:
# Mapper bugs or malformed payloads should not 5xx the
# webhook — they should log and let the sync trigger fire
# so live state still reconciles.
log.warning(
"webhook.meraki.publish_failed",
instance=instance_name,
event_type=event_type,
error=str(exc),
)

# ── Targeted adapter sync (pre-dev8 behavior, retained) ───────────────
background_tasks.add_task(
_sync_meraki_network,
instance_name=instance_name,
Expand All @@ -114,6 +160,7 @@ async def handle_meraki_webhook(
"status": "queued",
"adapter": f"meraki/{instance_name}",
"event_type": event_type,
"sensory_events_published": str(sensory_published),
}


Expand Down
Loading
Loading