Skip to content
Merged
4 changes: 2 additions & 2 deletions crates/common/metrics/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ pub mod timing;
// Re-export prometheus types and macros we use
pub use prometheus::{
Encoder, Error as PrometheusError, Histogram, IntCounter, IntCounterVec, IntGauge, IntGaugeVec,
TextEncoder, gather, register_histogram, register_int_counter, register_int_counter_vec,
register_int_gauge, register_int_gauge_vec,
TextEncoder, core::Collector, gather, register_histogram, register_int_counter,
register_int_counter_vec, register_int_gauge, register_int_gauge_vec,
};

// Re-export commonly used items
Expand Down
35 changes: 35 additions & 0 deletions crates/net/p2p/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,15 @@ static LEAN_CONNECTED_PEERS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
.unwrap()
});

static LEAN_GOSSIP_MESH_PEERS: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"lean_gossip_mesh_peers",
"Number of peers in the gossipsub mesh",
&["client"]
)
.unwrap()
});

static LEAN_PEER_CONNECTION_EVENTS_TOTAL: LazyLock<IntCounterVec> = LazyLock::new(|| {
register_int_counter_vec!(
"lean_peer_connection_events_total",
Expand Down Expand Up @@ -168,3 +177,29 @@ pub fn notify_peer_disconnected(peer_id: &Option<PeerId>, direction: &str, reaso
let name = resolve(peer_id);
LEAN_CONNECTED_PEERS.with_label_values(&[name]).dec();
}

/// Refresh the gossipsub mesh peers gauge from the current mesh peer set.
pub fn update_gossip_mesh_peers<'a>(peers: impl Iterator<Item = &'a PeerId>) {
let mut counts: HashMap<String, i64> = HashMap::new();
{
let registry = NODE_NAME_REGISTRY.read().unwrap();
for peer_id in peers {
let name = registry.get(peer_id).copied().unwrap_or("unknown");
*counts.entry(name.to_string()).or_default() += 1;
}
}
// Seed previously-published labels with 0 so departed clients fall to
// zero in the single set() pass below.
for family in LEAN_GOSSIP_MESH_PEERS.collect() {
for metric in family.get_metric() {
for label in metric.get_label() {
counts.entry(label.value().to_string()).or_insert(0);
}
}
}
Comment thread
greptile-apps[bot] marked this conversation as resolved.
for (name, count) in counts {
LEAN_GOSSIP_MESH_PEERS
.with_label_values(&[&name])
.set(count);
}
}
16 changes: 14 additions & 2 deletions crates/net/p2p/src/swarm_adapter.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,18 @@
use std::time::Duration;

use libp2p::{
Multiaddr, PeerId, StreamProtocol,
futures::StreamExt,
request_response::{self, OutboundRequestId},
swarm::SwarmEvent,
};
use tokio::sync::mpsc;
use tokio::{sync::mpsc, time::MissedTickBehavior};
use tracing::{error, warn};

use crate::{Behaviour, BehaviourEvent, req_resp::Request, req_resp::Response};
use crate::{Behaviour, BehaviourEvent, metrics, req_resp::Request, req_resp::Response};

/// Interval between gossipsub mesh peer metric refreshes.
const MESH_METRIC_REFRESH_INTERVAL: Duration = Duration::from_secs(10);

pub enum SwarmCommand {
Publish {
Expand Down Expand Up @@ -106,6 +111,8 @@ async fn swarm_loop(
event_tx: mpsc::UnboundedSender<SwarmEvent<BehaviourEvent>>,
mut cmd_rx: mpsc::UnboundedReceiver<SwarmCommand>,
) {
let mut mesh_metric_tick = tokio::time::interval(MESH_METRIC_REFRESH_INTERVAL);
mesh_metric_tick.set_missed_tick_behavior(MissedTickBehavior::Skip);
loop {
tokio::select! {
event = swarm.next() => {
Expand All @@ -116,6 +123,11 @@ async fn swarm_loop(
let Some(cmd) = cmd else { break };
execute_command(&mut swarm, cmd);
}
_ = mesh_metric_tick.tick() => {
metrics::update_gossip_mesh_peers(
swarm.behaviour().gossipsub.all_mesh_peers(),
);
}
}
}
error!("Swarm adapter loop exited β€” P2P networking is no longer functional");
Expand Down