Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
120 changes: 120 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,126 @@ and this file MUST be updated together whenever `__version__` changes.

---

## [0.8.0-dev5] — First sensory publisher: SNMP link-state → NATS → :ReflexEvent

Closes the loop on the dev1–dev4 foundation. The bus has been live in
production since the dev4 deploy but idle (no publishers, no consumers
acting). Dev5 wires the **first end-to-end pipeline**:

```
SNMP poll → NATS subject → link_down reflex handler → Neo4j :ReflexEvent
```

When an interface transitions between `up` and `down` between two
consecutive SNMP polls of a device, the adapter publishes
`sensory.link_down.snmp_poll.<device>|<ifname>` (or `link_up`) to NATS.
The reflex runner picks it up, runs the existing `link_down` handler
(dedup + outcome construction), and persists the resulting outcome as
a `:ReflexEvent` node in the graph with an `:AFFECTS` edge to the
matched device.

After this lands operators can query reflex history with:

```cypher
MATCH (e:ReflexEvent {handler: 'link_down'})
RETURN e ORDER BY e.observed_at_ms DESC LIMIT 10
```

### Added
- `netcortex.contracts.reflex_event_sink.ReflexEventSink` Protocol —
the narrow persistence surface every reflex outcome sink must satisfy
(idempotent `record`, `close`). Adding a NetBox-journal sink or an
OpenSearch sink later means writing one class against this Protocol.
- `netcortex.episodic` package — new "episodic memory" layer in the
brain metaphor. Houses:
- `InMemoryReflexEventSink` — used by tests, deduplicates via stable
sha256 of `(handler, subject, target, occurred_at)`.
- `Neo4jReflexEventSink` — production default. Writes `:ReflexEvent`
nodes via the shared driver, with an optional `:AFFECTS` edge to
`:Device` when the target parses as `<device>|<interface>`. MERGE
on the stable id makes it at-least-once safe.
- `netcortex.thalamus.SensoryPublisher` — thin facade over the
`EventBus` Protocol. Validates `source` against `SENSORY_SOURCES` at
construction (so wiring typos crash at import, not at 3am), builds
validated `sensory_subject()` per publish, injects `source` /
`recorded_at` defaults, swallows + logs bus failures so a transient
NATS hiccup doesn't kill the poll loop.
- `netcortex.adapters.snmp_link_state_publisher.emit_link_state_transitions`
— focused module the SNMP adapter calls each poll. Reads prior
`oper_status` for the device's interfaces from Neo4j, diffs against
the fresh `if_map`, publishes one event per transition. Silently
no-ops on missing publisher / driver / empty map so the SNMP poller
doesn't need feature-flag branching. **First-ever poll** of a device
only emits for currently-`down` interfaces — prevents a 96-port
switch from publishing 96 `link_up` events on first observation.
- `SnmpAdapter.attach_event_publisher(publisher)` — worker startup
calls this on every SNMP adapter instance after the bus is ready.
- `ReflexContext.event_sink` field — when set, the runner persists
every returned outcome (logged / applied / skipped / errored) to
the sink. Handlers stay pure functions; persistence is the runner's
job.
- Worker reflex pipeline startup (`_start_reflex_pipeline`):
constructs `NatsEventBus`, `SensoryPublisher(source='snmp_poll')`,
`Neo4jReflexEventSink`, and a `ReflexRunner` with full context.
Logs `worker.reflex_pipeline_started` on success. Graceful no-op
when `NATS_URL` is unset (degraded service, not a crash). Clean
drain of runner and bus on shutdown.
- Tests:
- `tests/contracts/reflex_event_sink/` — Protocol-conformance suite,
runs against `InMemoryReflexEventSink` today, ready for the Neo4j
sink when CI gets a Neo4j service container.
- `tests/thalamus/test_sensory_publisher.py` — covers source
validation, default injection, bus-failure swallowing, and
caller-provided override of source/recorded_at.
- `tests/adapters/test_snmp_link_state_publisher.py` — fake-driver
tests for all 4 transition combinations plus first-observation,
Neo4j-read failure, unknown oper_status, no-publisher, no-driver,
empty-map cases.
- `tests/reflex/test_runner.py` extended: outcomes persist to a
configured `event_sink`; sink failures don't kill dispatch and
don't drop in-process `runner.outcomes`.

### Changed
- `ReflexRunner` now writes every recorded outcome to
`context.event_sink` (when set). Sink exceptions are logged and
swallowed — the runner stays online for the next event.
- `ReflexRunner` module docstring updated: dev2/dev3 ("only logged")
language replaced with dev5 ("written to ReflexContext.event_sink").
- `LinkDownHandler` module docstring updated: dev2/dev3 ("still idle")
language replaced with the dev5 pipeline description. Rationale text
on the `logged` outcome now says "persisted to episodic memory"
instead of "no remediation yet".

### Why this matters
- This is the **first** operational use of the NATS bus that has been
sitting idle since dev4. Until now, an operator looking at the
cluster could see `netcortex-nats-0` running but had no way to
verify the whole brain-shaped pipeline actually works.
- Establishes the pattern for every future publisher (Meraki webhook,
SNMP trap, gNMI dial-out): build a `SensoryPublisher`, hand it to
the adapter, emit at observation time. Each new publisher is now a
~50-line change, not an architectural one.
- Validates the dedup design (dev3) for real: when the SNMP trap
receiver lands in dev6, the *same* logical link-down will arrive
from both `snmp_poll` and `snmp_trap` sources, and the
`DedupStore`-backed handler will correctly suppress the duplicate
within its 60s window.

### Operational verification on `cpn-ful-netcortex1`
After deploying:
```cypher
MATCH (e:ReflexEvent {handler: 'link_down'})
WHERE e.recorded_at > timestamp() - 86400000
RETURN e.subject, e.target, e.observed_at_ms, e.outcome
ORDER BY e.observed_at_ms DESC LIMIT 20
```
Expect entries within minutes of any interface flap. Log lines to
correlate: `snmp.link_state.transitions_published`,
`bus.published subject=sensory.link_down.snmp_poll.*`,
`reflex.outcome handler=link_down`.

---

## [0.8.0-dev4] — Deployment-safety: NetBox writeback dry-run knob

> Note: dev4 was originally planned for the first SensoryEvent publisher.
Expand Down
2 changes: 1 addition & 1 deletion netcortex/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,4 +22,4 @@
``CHANGELOG.md`` MUST be kept in sync whenever ``__version__`` changes.
"""

__version__ = "0.8.0-dev4"
__version__ = "0.8.0-dev5"
41 changes: 41 additions & 0 deletions netcortex/adapters/snmp.py
Original file line number Diff line number Diff line change
Expand Up @@ -3174,6 +3174,20 @@ def __init__(self, config: dict, instance_name: str = "default") -> None:
# ``(instance_name, network_id)``. See _fetch_meraki_network_snmp_creds
# and _resolve_device_creds for the consumption path.
self._meraki_network_snmp_creds: dict[tuple[str, str], dict[str, str]] = {}
# Sensory event publisher (0.8.0-dev5). Optional — the adapter
# functions identically when None (the worker doesn't wire NATS).
# Set by the worker via :meth:`attach_event_publisher` after the
# bus and publisher are constructed at startup.
self._event_publisher: Any | None = None

def attach_event_publisher(self, publisher: Any) -> None:
"""Wire a :class:`SensoryPublisher` for emitting state-change events.

Called once at worker startup, after the NATS bus and publisher
are constructed. ``publisher`` should have ``source='snmp_poll'``.
Passing ``None`` is a no-op for symmetry with shutdown.
"""
self._event_publisher = publisher

async def authenticate(self) -> None:
"""No separate auth step — credentials resolved per-device at poll time."""
Expand Down Expand Up @@ -4215,6 +4229,33 @@ async def _poll_device(
except Exception as exc:
log.debug("snmp.interface_emit.failed", host=ip, error=str(exc))

# ── Sensory publish (0.8.0-dev5) ────────────────────────────────
# Detect ifOperStatus transitions vs prior Neo4j state and emit
# `sensory.link_down|link_up.snmp_poll.<device>|<ifname>` events
# to NATS so reflex handlers (e.g. link_down) can react.
# Silently no-ops when the publisher is not wired (worker without
# NATS) or the graph driver isn't initialized yet.
if self._event_publisher is not None and if_map:
try:
from netcortex.adapters.snmp_link_state_publisher import (
emit_link_state_transitions,
)
from netcortex.graph.client import get_driver as _get_driver
try:
driver = _get_driver()
except Exception:
driver = None
await emit_link_state_transitions(
publisher=self._event_publisher,
driver=driver,
device_node_id=dev_node_id,
device_name=dev_name,
if_map=if_map,
)
except Exception as exc:
# Never let publishing break a poll cycle.
log.debug("snmp.link_state.emit_failed", host=ip, error=str(exc))

# ── MIB coverage probe ───────────────────────────────────────────
# Cheap (~12 single-OID walks with 2 repetitions each, bounded
# parallelism), but still run only on the topology cadence so a
Expand Down
190 changes: 190 additions & 0 deletions netcortex/adapters/snmp_link_state_publisher.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""SNMP link-state change publisher (0.8.0-dev5 — first sensory publisher).

The SNMP poller already walks ``ifOperStatus`` every poll cycle. This
module wires that data to the NATS event bus: when an interface
transitions between ``up`` and ``down`` between two consecutive polls,
we emit a ``sensory.link_down.snmp_poll.<device>|<ifname>`` (or
``link_up``) event so reflex handlers can react.

Why not compare against the prior in-memory poll?
-------------------------------------------------
The poller is sharded — a given device may be polled by a different
worker pod on consecutive cycles when we scale out. Reading prior state
from Neo4j gives a single source of truth for "what we believed last
time" regardless of which process did the polling.

Why not piggyback on ingest's status_changed_at stamping?
---------------------------------------------------------
Ingest currently stamps ``status_changed_at`` only on the :Device label,
not on interfaces (see the exploration in the dev5 PR description). Going
through ingest also adds latency — the poller has the new data in hand
*before* ingest writes it; emitting at poll time keeps the reflex chain
under one second from observation to ``:ReflexEvent`` write.

Deduplication
-------------
This module does not dedup. Dedup is the reflex handler's job (via
:class:`DedupStore`), because the same logical link-down may legitimately
arrive from multiple sources (poller + trap + Meraki webhook) and the
handler is the only place with enough context to collapse them. Each
publisher is responsible only for "what I observed", not "what's already
been observed by someone else".

Failure modes
-------------
* Neo4j read fails → log + skip the diff; no events published this cycle.
The next poll cycle will pick up any transitions that span the outage.
* Bus publish fails → :class:`SensoryPublisher` swallows + logs; the
graph still gets the new state via the normal ingest path so we don't
lose the observation, only the reflex hook for this one event.
* No prior state in Neo4j (first-ever poll of a device) → every
currently-down interface emits one ``link_down``. The dedup window in
the handler suppresses re-emission on the next cycle when the prior
state is now known.
"""

from __future__ import annotations

import logging
from typing import TYPE_CHECKING, Any

if TYPE_CHECKING:
from netcortex.thalamus.sensory_publisher import SensoryPublisher

_LOG = logging.getLogger(__name__)


async def emit_link_state_transitions(
*,
publisher: SensoryPublisher | None,
driver: Any | None,
device_node_id: str,
device_name: str,
if_map: dict[str, dict[str, Any]],
) -> int:
"""Compare freshly-polled ``if_map`` against prior Neo4j state, publish deltas.

Returns the number of events published. Returns 0 (silently) if any
of the prerequisites are missing — publisher not configured, driver
not initialized, empty if_map. That degradation path is what lets
the SNMP adapter call this unconditionally without branching on
"is dev5 wiring up yet".

Parameters
----------
publisher:
Wired ``SensoryPublisher`` with ``source='snmp_poll'``. ``None``
disables publishing for this poll (the adapter still ingests the
observation as usual).
driver:
The shared Neo4j async driver, or ``None`` if the graph layer
isn't initialized yet. ``None`` means we can't compute a diff,
so we silently emit nothing.
device_node_id:
The ``Device.id`` whose interfaces we're diffing. Comes from
``SnmpAdapter._poll_device``'s ``dev_node_id``.
device_name:
Human-readable device name used in the published subject and
payload. Comes from ``SnmpAdapter._poll_device``'s ``dev_name``.
if_map:
Fresh poll result keyed by ifIndex. Each value is the dict
``_poll_interfaces`` populated with ``name``, ``oper_status``,
and the other IF-MIB fields.
"""
if publisher is None or driver is None or not if_map or not device_name:
return 0

# Build the set of (interface_name, current_oper_status) we just observed.
# We use the canonical id shape that `_build_interface_health_nodes` uses
# so the Cypher MATCH below finds the same nodes ingest writes.
fresh: dict[str, str] = {}
for iface in if_map.values():
name = iface.get("name")
oper = iface.get("oper_status")
if not name or oper not in ("up", "down"):
continue
fresh[str(name)] = str(oper)

if not fresh:
return 0

# Read prior oper_status for these interfaces in one query. We match
# by canonical id (same shape `_build_interface_health_nodes` writes)
# AND by name, because some interfaces are written by both SNMP and
# platform adapters and may have non-snmp ids — we want the most
# recent observation regardless of provenance.
iface_ids = [f"snmp-if:{device_node_id}:{name}" for name in fresh]
try:
async with driver.session() as session:
result = await session.run(
"""
MATCH (i:Interface)
WHERE i.id IN $ids
RETURN i.id AS id, i.name AS name, i.oper_status AS oper
""",
ids=iface_ids,
)
prior: dict[str, str | None] = {}
async for record in result:
prior[str(record["name"])] = (
str(record["oper"]) if record["oper"] is not None else None
)
except Exception as exc:
_LOG.warning(
"snmp.link_state.prior_read_failed device=%s error=%s",
device_name, exc,
)
return 0

transitions: list[tuple[str, str, str | None, str]] = []
for ifname, new_oper in fresh.items():
prev = prior.get(ifname)
if prev is None:
# First-ever observation for this interface. We only emit
# for currently-down so a noisy first-time poll of a 96-port
# switch doesn't publish 96 link_up events. Currently-up
# interfaces are the steady state we expect.
if new_oper == "down":
transitions.append((ifname, "link_down", None, new_oper))
continue
if prev == new_oper:
continue
if new_oper == "down":
transitions.append((ifname, "link_down", prev, new_oper))
elif new_oper == "up":
transitions.append((ifname, "link_up", prev, new_oper))

if not transitions:
return 0

published = 0
for ifname, event_class, prev, new in transitions:
target = f"{device_name}|{ifname}"
payload: dict[str, Any] = {
"device": device_name,
"device_id": device_node_id,
"interface": ifname,
"oper_status": new,
"prior_oper_status": prev,
}
try:
await publisher.publish(event_class, target, payload=payload)
published += 1
except Exception as exc:
# SensoryPublisher already swallows bus errors; this catches
# programmer errors (bad subject, unknown event_class) so
# one bad transition doesn't kill the rest of the batch.
_LOG.warning(
"snmp.link_state.publish_failed device=%s interface=%s class=%s error=%s",
device_name, ifname, event_class, exc,
)

if published:
_LOG.info(
"snmp.link_state.transitions_published device=%s count=%d",
device_name, published,
)
return published


__all__ = ["emit_link_state_transitions"]
2 changes: 2 additions & 0 deletions netcortex/contracts/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from netcortex.contracts.dedup_store import DedupStore
from netcortex.contracts.event_bus import EventBus, EventBusValidationError, EventMessage
from netcortex.contracts.policy import Decision, Policy, PolicyContext
from netcortex.contracts.reflex_event_sink import ReflexEventSink
from netcortex.contracts.sensory_adapter import SensoryAdapter, SensoryEvent

__all__ = [
Expand All @@ -46,6 +47,7 @@
"EventMessage",
"Policy",
"PolicyContext",
"ReflexEventSink",
"SensoryAdapter",
"SensoryEvent",
]
Loading
Loading