diff --git a/crates/common/metrics/src/lib.rs b/crates/common/metrics/src/lib.rs index 9e508bbd..83539a2e 100644 --- a/crates/common/metrics/src/lib.rs +++ b/crates/common/metrics/src/lib.rs @@ -6,8 +6,8 @@ pub mod timing; // Re-export prometheus types and macros we use pub use prometheus::{ Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec, - TextEncoder, gather, register_histogram, register_int_counter, register_int_counter_vec, - register_int_gauge, register_int_gauge_vec, + TextEncoder, core::Collector, gather, register_histogram, register_int_counter, + register_int_counter_vec, register_int_gauge, register_int_gauge_vec, }; // Re-export commonly used items diff --git a/crates/net/p2p/src/metrics.rs b/crates/net/p2p/src/metrics.rs index d6d2c29b..19845258 100644 --- a/crates/net/p2p/src/metrics.rs +++ b/crates/net/p2p/src/metrics.rs @@ -50,6 +50,15 @@ static LEAN_CONNECTED_PEERS: LazyLock = LazyLock::new(|| { .unwrap() }); +static LEAN_GOSSIP_MESH_PEERS: LazyLock = LazyLock::new(|| { + register_int_gauge_vec!( + "lean_gossip_mesh_peers", + "Number of peers in the gossipsub mesh", + &["client"] + ) + .unwrap() +}); + static LEAN_PEER_CONNECTION_EVENTS_TOTAL: LazyLock = LazyLock::new(|| { register_int_counter_vec!( "lean_peer_connection_events_total", @@ -168,3 +177,29 @@ pub fn notify_peer_disconnected(peer_id: &Option, direction: &str, reaso let name = resolve(peer_id); LEAN_CONNECTED_PEERS.with_label_values(&[name]).dec(); } + +/// Refresh the gossipsub mesh peers gauge from the current mesh peer set. +pub fn update_gossip_mesh_peers<'a>(peers: impl Iterator) { + let mut counts: HashMap = HashMap::new(); + { + let registry = NODE_NAME_REGISTRY.read().unwrap(); + for peer_id in peers { + let name = registry.get(peer_id).copied().unwrap_or("unknown"); + *counts.entry(name.to_string()).or_default() += 1; + } + } + // Seed previously-published labels with 0 so departed clients fall to + // zero in the single set() pass below. + for family in LEAN_GOSSIP_MESH_PEERS.collect() { + for metric in family.get_metric() { + for label in metric.get_label() { + counts.entry(label.value().to_string()).or_insert(0); + } + } + } + for (name, count) in counts { + LEAN_GOSSIP_MESH_PEERS + .with_label_values(&[&name]) + .set(count); + } +} diff --git a/crates/net/p2p/src/swarm_adapter.rs b/crates/net/p2p/src/swarm_adapter.rs index 1def5406..79ee03e8 100644 --- a/crates/net/p2p/src/swarm_adapter.rs +++ b/crates/net/p2p/src/swarm_adapter.rs @@ -1,13 +1,18 @@ +use std::time::Duration; + use libp2p::{ Multiaddr, PeerId, StreamProtocol, futures::StreamExt, request_response::{self, OutboundRequestId}, swarm::SwarmEvent, }; -use tokio::sync::mpsc; +use tokio::{sync::mpsc, time::MissedTickBehavior}; use tracing::{error, warn}; -use crate::{Behaviour, BehaviourEvent, req_resp::Request, req_resp::Response}; +use crate::{Behaviour, BehaviourEvent, metrics, req_resp::Request, req_resp::Response}; + +/// Interval between gossipsub mesh peer metric refreshes. +const MESH_METRIC_REFRESH_INTERVAL: Duration = Duration::from_secs(10); pub enum SwarmCommand { Publish { @@ -106,6 +111,8 @@ async fn swarm_loop( event_tx: mpsc::UnboundedSender>, mut cmd_rx: mpsc::UnboundedReceiver, ) { + let mut mesh_metric_tick = tokio::time::interval(MESH_METRIC_REFRESH_INTERVAL); + mesh_metric_tick.set_missed_tick_behavior(MissedTickBehavior::Skip); loop { tokio::select! { event = swarm.next() => { @@ -116,6 +123,11 @@ async fn swarm_loop( let Some(cmd) = cmd else { break }; execute_command(&mut swarm, cmd); } + _ = mesh_metric_tick.tick() => { + metrics::update_gossip_mesh_peers( + swarm.behaviour().gossipsub.all_mesh_peers(), + ); + } } } error!("Swarm adapter loop exited — P2P networking is no longer functional");