diff --git a/CHANGELOG.md b/CHANGELOG.md index fd8955b..204d3d6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,38 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [mqttv5-cli 0.27.3] - 2026-05-16 + +### Changed + +- **Bump `mqtt5` dep to `0.32`** - transitive bump for the breaking `ConnectionEvent::Connected` field addition. No CLI surface changes. + +## [mqtt5-wasm 1.3.2] - 2026-05-16 + +### Changed + +- **Bump `mqtt5` dep to `0.32` and `mqtt5-protocol` to `0.13`** - transitive bump for the breaking `ConnectionEvent::Connected` field addition. No wasm surface changes. + +## [mqtt5 0.32.0] - 2026-05-16 + +### Changed + +- **BREAKING: `ConnectionEvent::Connected` gained `keep_alive: Duration`** - exposes the broker-negotiated keep-alive interval from CONNACK to event consumers. Existing match arms that destructure the variant must be updated. + +### Added + +- **Honor MQTT v5 ServerKeepAlive negotiation** - the client now adopts the broker-supplied keep-alive interval from the CONNACK `Server Keep Alive` property when present ([MQTT-3.2.2-22]), driving PINGREQ cadence and read timeouts off the negotiated value rather than the requested one. The originally configured interval is preserved on `ConnectOptions::keep_alive` so subsequent CONNECTs re-negotiate from the user's intent. New `MqttClient::keep_alive() -> Duration` accessor exposes the current effective value. + +## [mqtt5-protocol 0.13.0] - 2026-05-16 + +### Changed + +- **BREAKING: `ConnectionEvent::Connected` variant gained `keep_alive: Duration`** - carries the broker-negotiated keep-alive interval out to event consumers. Adding a field to a public enum variant is a SemVer-major change. + +### Added + +- **`Properties::get_server_keep_alive() -> Option`** - getter for the v5 `ServerKeepAlive` CONNACK property, symmetric with the existing `set_server_keep_alive` setter. + ## [mqtt5-wasm 1.3.1] - 2026-05-15 ### Fixed diff --git a/crates/mqtt5-protocol/Cargo.toml b/crates/mqtt5-protocol/Cargo.toml index 3d2d876..8131660 100644 --- a/crates/mqtt5-protocol/Cargo.toml +++ b/crates/mqtt5-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5-protocol" -version = "0.12.0" +version = "0.13.0" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/mqtt5-protocol/src/connection.rs b/crates/mqtt5-protocol/src/connection.rs index cb4e2fd..34a7657 100644 --- a/crates/mqtt5-protocol/src/connection.rs +++ b/crates/mqtt5-protocol/src/connection.rs @@ -52,10 +52,22 @@ pub enum DisconnectReason { #[derive(Debug, Clone)] pub enum ConnectionEvent { Connecting, - Connected { session_present: bool }, - Disconnected { reason: DisconnectReason }, - Reconnecting { attempt: u32 }, - ReconnectFailed { error: MqttError }, + Connected { + session_present: bool, + /// Effective keep-alive interval after MQTT v5 `ServerKeepAlive` negotiation. + /// + /// Equals `Duration::ZERO` if keep-alive is disabled. + keep_alive: Duration, + }, + Disconnected { + reason: DisconnectReason, + }, + Reconnecting { + attempt: u32, + }, + ReconnectFailed { + error: MqttError, + }, } #[derive(Debug, Clone, Default)] @@ -197,9 +209,13 @@ impl ConnectionStateMachine { ConnectionEvent::Connecting => { self.state = ConnectionState::Connecting; } - ConnectionEvent::Connected { session_present } => { + ConnectionEvent::Connected { + session_present, + keep_alive, + } => { self.state = ConnectionState::Connected; self.info.session_present = *session_present; + self.info.server_keep_alive = u16::try_from(keep_alive.as_secs()).ok(); } ConnectionEvent::Disconnected { .. } | ConnectionEvent::ReconnectFailed { .. } => { self.state = ConnectionState::Disconnected; @@ -275,9 +291,11 @@ mod tests { sm.transition(&ConnectionEvent::Connected { session_present: true, + keep_alive: Duration::from_secs(60), }); assert!(sm.is_connected()); assert!(sm.info().session_present); + assert_eq!(sm.info().server_keep_alive, Some(60)); sm.transition(&ConnectionEvent::Disconnected { reason: DisconnectReason::NetworkError("timeout".into()), @@ -338,6 +356,7 @@ mod tests { sm.transition(&ConnectionEvent::Connecting); sm.transition(&ConnectionEvent::Connected { session_present: false, + keep_alive: Duration::from_secs(60), }); assert!(sm.is_connected()); diff --git a/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs b/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs index 03534b9..5db7505 100644 --- a/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs +++ b/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs @@ -173,6 +173,20 @@ impl Properties { .push(PropertyValue::TwoByteInteger(seconds)); } + #[must_use] + pub fn get_server_keep_alive(&self) -> Option { + self.properties + .get(&PropertyId::ServerKeepAlive) + .and_then(|values| values.first()) + .and_then(|value| { + if let PropertyValue::TwoByteInteger(v) = value { + Some(*v) + } else { + None + } + }) + } + pub fn set_authentication_method(&mut self, method: String) { self.properties .entry(PropertyId::AuthenticationMethod) diff --git a/crates/mqtt5-wasm/Cargo.toml b/crates/mqtt5-wasm/Cargo.toml index 5ed138b..61ba312 100644 --- a/crates/mqtt5-wasm/Cargo.toml +++ b/crates/mqtt5-wasm/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5-wasm" -version = "1.3.1" +version = "1.3.2" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -27,8 +27,8 @@ broker = ["client", "dep:mqtt5", "dep:tokio"] codec = ["client", "dep:miniz_oxide"] [dependencies] -mqtt5-protocol = "0.12.0" -mqtt5 = { version = "0.31", optional = true, default-features = false, features = [ +mqtt5-protocol = "0.13.0" +mqtt5 = { version = "0.32", optional = true, default-features = false, features = [ "tokio", ] } diff --git a/crates/mqtt5/Cargo.toml b/crates/mqtt5/Cargo.toml index 01a23bd..68a5421 100644 --- a/crates/mqtt5/Cargo.toml +++ b/crates/mqtt5/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5" -version = "0.31.5" +version = "0.32.0" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -120,7 +120,7 @@ opentelemetry_sdk = { version = "0.31", features = ["trace", "rt-tokio", "metric opentelemetry-otlp = { version = "0.31", features = ["trace", "grpc-tonic", "metrics"], optional = true } tracing-opentelemetry = { version = "0.32", optional = true } opentelemetry = { version = "0.31", features = ["metrics", "trace"], optional = true } -mqtt5-protocol = "0.12.0" +mqtt5-protocol = "0.13.0" argon2 = "0.5" getrandom = "0.3" bebytes = { version = "3.0", features = ["bytes"] } diff --git a/crates/mqtt5/examples/simple_client.rs b/crates/mqtt5/examples/simple_client.rs index efe8db6..4b3d61f 100644 --- a/crates/mqtt5/examples/simple_client.rs +++ b/crates/mqtt5/examples/simple_client.rs @@ -72,8 +72,13 @@ async fn main() -> Result<(), Box> { ConnectionEvent::Connecting => { println!("🔌 Connecting to MQTT broker..."); } - ConnectionEvent::Connected { session_present } => { - println!("✅ Connected to MQTT broker (session_present: {session_present})"); + ConnectionEvent::Connected { + session_present, + keep_alive, + } => { + println!( + "✅ Connected to MQTT broker (session_present: {session_present}, keep_alive: {keep_alive:?})" + ); } ConnectionEvent::Disconnected { reason } => { println!("❌ Disconnected from broker: {reason:?}"); diff --git a/crates/mqtt5/src/broker/quic_acceptor.rs b/crates/mqtt5/src/broker/quic_acceptor.rs index b7e31a3..a894e98 100644 --- a/crates/mqtt5/src/broker/quic_acceptor.rs +++ b/crates/mqtt5/src/broker/quic_acceptor.rs @@ -650,17 +650,21 @@ fn spawn_datagram_reader( label, peer_addr ); - let packet = match decode_datagram_packet(&datagram) { - Some(Ok(packet)) => packet, + #[allow( + clippy::collapsible_match, + reason = "cannot move `packet` into async send from a pattern guard" + )] + match decode_datagram_packet(&datagram) { + Some(Ok(packet)) => { + if packet_tx.send((packet, None)).await.is_err() { + debug!("Datagram packet channel closed for {}", peer_addr); + break; + } + } Some(Err(e)) => { warn!("Failed to decode datagram from {}: {}", peer_addr, e); - continue; } - None => continue, - }; - if packet_tx.send((packet, None)).await.is_err() { - debug!("Datagram packet channel closed for {}", peer_addr); - break; + None => {} } } Err(e) => { diff --git a/crates/mqtt5/src/client/direct/mod.rs b/crates/mqtt5/src/client/direct/mod.rs index ef7ea6b..084cbe7 100644 --- a/crates/mqtt5/src/client/direct/mod.rs +++ b/crates/mqtt5/src/client/direct/mod.rs @@ -110,6 +110,7 @@ pub struct DirectClientInner { pub auth_handler: Option>, pub auth_method: Option, pub keepalive_state: Arc>, + pub negotiated_keep_alive_secs: AtomicU64, #[cfg(feature = "transport-quic")] pub cached_quic_client_config: Option, #[cfg(feature = "transport-quic")] @@ -126,6 +127,7 @@ impl DirectClientInner { let queue_on_disconnect = !options.clean_start; let auth_method = options.properties.authentication_method.clone(); + let initial_keep_alive_secs = options.keep_alive.as_secs(); Self { writer: None, @@ -166,6 +168,7 @@ impl DirectClientInner { auth_handler: None, auth_method, keepalive_state: Arc::new(Mutex::new(KeepaliveState::default())), + negotiated_keep_alive_secs: AtomicU64::new(initial_keep_alive_secs), #[cfg(feature = "transport-quic")] cached_quic_client_config: None, #[cfg(feature = "transport-quic")] @@ -181,6 +184,38 @@ impl DirectClientInner { self.connected.load(Ordering::SeqCst) } + pub fn negotiated_keep_alive(&self) -> Duration { + Duration::from_secs(self.negotiated_keep_alive_secs.load(Ordering::Relaxed)) + } + + fn configured_keep_alive_u16(&self) -> u16 { + let requested = self.options.keep_alive.as_secs(); + u16::try_from(requested).unwrap_or_else(|_| { + tracing::warn!( + "Configured keep-alive {}s exceeds the u16 wire range; clamping to {}s", + requested, + u16::MAX, + ); + u16::MAX + }) + } + + fn apply_negotiated_keep_alive(&self, server_value: Option) { + let effective = server_value.map_or_else( + || self.configured_keep_alive_u16(), + |v| { + tracing::debug!( + "Server overrode keep-alive: requested={}s, negotiated={}s", + self.options.keep_alive.as_secs(), + v, + ); + v + }, + ); + self.negotiated_keep_alive_secs + .store(u64::from(effective), Ordering::Relaxed); + } + pub fn set_connected(&self, connected: bool) { self.connected.store(connected, Ordering::SeqCst); } @@ -359,6 +394,8 @@ impl DirectClientInner { *self.server_max_qos.lock() = None; } + self.apply_negotiated_keep_alive(connack.properties.get_server_keep_alive()); + let protocol_version = self.options.protocol_version.as_u8(); let (reader, writer) = match transport { TransportType::Tcp(tcp) => { @@ -1139,12 +1176,7 @@ impl DirectClientInner { ConnectPacket { protocol_version: self.options.protocol_version.as_u8(), clean_start: self.options.clean_start, - keep_alive: self - .options - .keep_alive - .as_secs() - .try_into() - .unwrap_or(u16::MAX), + keep_alive: self.configured_keep_alive_u16(), client_id: session.client_id().to_string(), will: self.options.will.clone(), username: self.options.username.clone(), @@ -1201,7 +1233,7 @@ impl DirectClientInner { tracing::debug!("📦 PACKET READER - Task exited"); })); - let keepalive_interval = self.options.keep_alive; + let keepalive_interval = self.negotiated_keep_alive(); if keepalive_interval.is_zero() { tracing::debug!("💓 KEEPALIVE - Disabled (interval is zero)"); } else { diff --git a/crates/mqtt5/src/client/inner.rs b/crates/mqtt5/src/client/inner.rs index 6be8165..785c9e7 100644 --- a/crates/mqtt5/src/client/inner.rs +++ b/crates/mqtt5/src/client/inner.rs @@ -28,9 +28,13 @@ impl MqttClient { &self, stored_subs: Vec, session_present: bool, + keep_alive: std::time::Duration, ) { - self.trigger_connection_event(ConnectionEvent::Connected { session_present }) - .await; + self.trigger_connection_event(ConnectionEvent::Connected { + session_present, + keep_alive, + }) + .await; self.recover_quic_flows().await; self.restore_subscriptions_after_connect(stored_subs, session_present) .await; @@ -419,9 +423,10 @@ impl MqttClient { } let stored_subs = inner.stored_subscriptions.lock().clone(); let session_present = result.session_present; + let keep_alive = inner.negotiated_keep_alive(); drop(inner); - self.on_successful_connect(stored_subs, session_present) + self.on_successful_connect(stored_subs, session_present, keep_alive) .await; return Ok(result); @@ -525,9 +530,10 @@ impl MqttClient { Ok(result) => { let stored_subs = inner.stored_subscriptions.lock().clone(); let session_present = result.session_present; + let keep_alive = inner.negotiated_keep_alive(); drop(inner); - self.on_successful_connect(stored_subs, session_present) + self.on_successful_connect(stored_subs, session_present, keep_alive) .await; Ok(result) diff --git a/crates/mqtt5/src/client/mod.rs b/crates/mqtt5/src/client/mod.rs index 1c60120..57ae093 100644 --- a/crates/mqtt5/src/client/mod.rs +++ b/crates/mqtt5/src/client/mod.rs @@ -125,6 +125,25 @@ impl MqttClient { self.inner.read().await.is_connected() } + /// Returns the keep-alive interval currently in effect. + /// + /// On MQTT v5 this reflects the value after `ServerKeepAlive` negotiation: + /// if the broker returned a `ServerKeepAlive` property in CONNACK the + /// returned value matches the broker-imposed interval, otherwise it matches + /// the value passed to [`ConnectOptions::with_keep_alive`]. + /// + /// Per `[MQTT-3.2.2-22]` a broker-returned `ServerKeepAlive` always wins, + /// including the case where the client requested `Duration::ZERO` + /// (disabled): a non-zero `ServerKeepAlive` will re-enable keep-alive on + /// the connection. A `ServerKeepAlive` of zero disables keep-alive. + /// + /// Before the first successful `connect()` call this returns the value + /// configured on the [`ConnectOptions`]. A returned value of + /// `Duration::ZERO` means keep-alive is disabled. + pub async fn keep_alive(&self) -> std::time::Duration { + self.inner.read().await.negotiated_keep_alive() + } + /// Gets the client ID pub async fn client_id(&self) -> String { self.inner @@ -152,8 +171,8 @@ impl MqttClient { /// ConnectionEvent::Connecting => { /// println!("Connecting..."); /// } - /// ConnectionEvent::Connected { session_present } => { - /// println!("Connected! Session present: {}", session_present); + /// ConnectionEvent::Connected { session_present, keep_alive } => { + /// println!("Connected! Session present: {session_present}, keep-alive: {keep_alive:?}"); /// } /// ConnectionEvent::Disconnected { reason } => { /// println!("Disconnected: {:?}", reason); diff --git a/crates/mqtt5/src/lib.rs b/crates/mqtt5/src/lib.rs index df28186..4f6e23e 100644 --- a/crates/mqtt5/src/lib.rs +++ b/crates/mqtt5/src/lib.rs @@ -61,8 +61,8 @@ //! ConnectionEvent::Connecting => { //! println!("Connecting..."); //! } -//! ConnectionEvent::Connected { session_present } => { -//! println!("Connected! Session present: {}", session_present); +//! ConnectionEvent::Connected { session_present, keep_alive } => { +//! println!("Connected! Session present: {session_present}, keep-alive: {keep_alive:?}"); //! } //! ConnectionEvent::Disconnected { reason } => { //! println!("Disconnected: {:?}", reason); diff --git a/crates/mqtt5/tests/connection_events.rs b/crates/mqtt5/tests/connection_events.rs index 981ea58..b922741 100644 --- a/crates/mqtt5/tests/connection_events.rs +++ b/crates/mqtt5/tests/connection_events.rs @@ -21,6 +21,7 @@ async fn test_connection_event_callback() { .on_connection_event(move |event| match event { ConnectionEvent::Connected { session_present: sp, + .. } => { connected_count_clone.fetch_add(1, Ordering::Relaxed); session_present_clone.store(sp, Ordering::Relaxed); diff --git a/crates/mqtt5/tests/keepalive_negotiation.rs b/crates/mqtt5/tests/keepalive_negotiation.rs new file mode 100644 index 0000000..5a47141 --- /dev/null +++ b/crates/mqtt5/tests/keepalive_negotiation.rs @@ -0,0 +1,152 @@ +mod common; + +use common::TestBroker; + +use mqtt5::broker::config::{BrokerConfig, StorageBackend, StorageConfig}; +use mqtt5::time::Duration; +use mqtt5::{ConnectOptions, ConnectionEvent, MqttClient}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[tokio::test] +async fn server_keep_alive_overrides_client_request() { + let storage_config = StorageConfig { + backend: StorageBackend::Memory, + enable_persistence: true, + ..Default::default() + }; + let mut config = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config); + config.server_keep_alive = Some(Duration::from_secs(10)); + + let broker = TestBroker::start_with_config(config).await; + + let mut options = ConnectOptions::new("kanego-1"); + options.keep_alive = Duration::from_secs(60); + + let observed = Arc::new(Mutex::new(None::)); + let observed_clone = Arc::clone(&observed); + + let client = MqttClient::with_options(options); + client + .on_connection_event(move |event| { + if let ConnectionEvent::Connected { keep_alive, .. } = event { + let observed = Arc::clone(&observed_clone); + tokio::spawn(async move { + *observed.lock().await = Some(keep_alive); + }); + } + }) + .await + .unwrap(); + + client.connect(broker.address()).await.unwrap(); + + assert_eq!(client.keep_alive().await, Duration::from_secs(10)); + + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + *observed.lock().await, + Some(Duration::from_secs(10)), + "ConnectionEvent::Connected must carry the broker-negotiated keep-alive", + ); + + client.disconnect().await.unwrap(); +} + +#[tokio::test] +async fn keep_alive_falls_back_to_client_value_without_server_override() { + let broker = TestBroker::start().await; + + let mut options = ConnectOptions::new("kanego-2"); + options.keep_alive = Duration::from_secs(30); + + let client = MqttClient::with_options(options); + client.connect(broker.address()).await.unwrap(); + + assert_eq!(client.keep_alive().await, Duration::from_secs(30)); + + client.disconnect().await.unwrap(); +} + +#[tokio::test] +async fn keep_alive_before_connect_returns_configured_value() { + let mut options = ConnectOptions::new("kanego-3"); + options.keep_alive = Duration::from_secs(45); + + let client = MqttClient::with_options(options); + assert_eq!(client.keep_alive().await, Duration::from_secs(45)); +} + +#[tokio::test] +async fn keep_alive_renegotiates_against_each_broker_on_reconnect() { + let storage_config = StorageConfig { + backend: StorageBackend::Memory, + enable_persistence: true, + ..Default::default() + }; + + let mut config_a = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config.clone()); + config_a.server_keep_alive = Some(Duration::from_secs(10)); + let broker_a = TestBroker::start_with_config(config_a).await; + + let mut config_b = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config); + config_b.server_keep_alive = Some(Duration::from_secs(25)); + let broker_b = TestBroker::start_with_config(config_b).await; + + let mut options = ConnectOptions::new("kanego-4"); + options.keep_alive = Duration::from_secs(60); + + let client = MqttClient::with_options(options); + + client.connect(broker_a.address()).await.unwrap(); + assert_eq!( + client.keep_alive().await, + Duration::from_secs(10), + "first CONNECT must adopt broker A's ServerKeepAlive", + ); + client.disconnect().await.unwrap(); + + client.connect(broker_b.address()).await.unwrap(); + assert_eq!( + client.keep_alive().await, + Duration::from_secs(25), + "reconnect must re-negotiate from the original 60s request, not the previously-negotiated 10s", + ); + client.disconnect().await.unwrap(); +} + +#[tokio::test] +async fn server_keep_alive_zero_disables_keepalive_even_with_nonzero_client_request() { + let storage_config = StorageConfig { + backend: StorageBackend::Memory, + enable_persistence: true, + ..Default::default() + }; + let mut config = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config); + config.server_keep_alive = Some(Duration::ZERO); + + let broker = TestBroker::start_with_config(config).await; + + let mut options = ConnectOptions::new("kanego-5"); + options.keep_alive = Duration::from_secs(60); + + let client = MqttClient::with_options(options); + client.connect(broker.address()).await.unwrap(); + + assert_eq!( + client.keep_alive().await, + Duration::ZERO, + "ServerKeepAlive=0 must disable keep-alive even when the client requested a non-zero interval", + ); + + client.disconnect().await.unwrap(); +} diff --git a/crates/mqttv5-cli/Cargo.toml b/crates/mqttv5-cli/Cargo.toml index 3935596..799403d 100644 --- a/crates/mqttv5-cli/Cargo.toml +++ b/crates/mqttv5-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqttv5-cli" -version = "0.27.2" +version = "0.27.3" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -22,7 +22,7 @@ opentelemetry = ["mqtt5/opentelemetry"] codec = ["mqtt5/codec-all"] [dependencies] -mqtt5 = "0.31" +mqtt5 = { path = "../mqtt5", version = "0.32" } anyhow = "1.0" tracing = "0.1" serde = { version = "1.0", features = ["derive"] } diff --git a/crates/mqttv5-cli/src/commands/bench_cmd.rs b/crates/mqttv5-cli/src/commands/bench_cmd.rs index 4fb864e..30010eb 100644 --- a/crates/mqttv5-cli/src/commands/bench_cmd.rs +++ b/crates/mqttv5-cli/src/commands/bench_cmd.rs @@ -1451,15 +1451,8 @@ async fn run_hol_blocking(cmd: BenchCommand) -> Result<()> { ) .await?; - let per_topic_interval_us = if cmd.rate > 0 { - #[allow(clippy::cast_possible_truncation)] - let interval = (1_000_000u64 * (num_topics as u64)) - .checked_div(cmd.rate) - .unwrap_or(0); - Some(interval) - } else { - None - }; + #[allow(clippy::cast_possible_truncation)] + let per_topic_interval_us = (1_000_000u64 * (num_topics as u64)).checked_div(cmd.rate); let rate_label = if cmd.rate > 0 { format!("{} msg/s", cmd.rate) } else { diff --git a/crates/mqttv5-cli/src/commands/pub_cmd.rs b/crates/mqttv5-cli/src/commands/pub_cmd.rs index fc4bc3a..92adb39 100644 --- a/crates/mqttv5-cli/src/commands/pub_cmd.rs +++ b/crates/mqttv5-cli/src/commands/pub_cmd.rs @@ -715,7 +715,9 @@ async fn keep_alive_loop(client: &MqttClient, auto_reconnect: bool) -> Result<() ConnectionEvent::Connecting => { info!("Connecting to broker..."); } - ConnectionEvent::Connected { session_present } => { + ConnectionEvent::Connected { + session_present, .. + } => { if session_present { info!("✓ Reconnected (session present)"); } else {