From 2a5ff58ec31f0024ccb8fd45f8757300d6ec5050 Mon Sep 17 00:00:00 2001 From: Michael Date: Thu, 14 May 2026 11:12:08 +0800 Subject: [PATCH 1/6] feat(client): honor MQTT v5 ServerKeepAlive negotiation The client used to ignore `ServerKeepAlive` in CONNACK and keep pinging on its requested cadence, violating `[MQTT-3.2.2-22]`. Now the negotiated value drives the keep-alive task for the connection, and the original `ConnectOptions::keep_alive` is preserved so the next CONNECT re-negotiates. - **Breaking**: `ConnectionEvent::Connected` gains `keep_alive: Duration`. - New `MqttClient::keep_alive()` getter for the active interval. - New `Properties::get_server_keep_alive()` accessor on `mqtt5-protocol`. - `mqttv5-cli` switched from `mqtt5 = "0.29"` to `{ path = "../mqtt5", version = "0.31" }` so the breaking change to the patched protocol crate doesn't break the old registry copy. New `tests/keepalive_negotiation.rs` covers broker override, client-value fallback, and the pre-connect getter. --- crates/mqtt5-protocol/src/connection.rs | 29 +++++-- .../src/protocol/v5/properties/accessors.rs | 14 ++++ crates/mqtt5/examples/simple_client.rs | 9 ++- crates/mqtt5/src/client/direct/mod.rs | 27 ++++++- crates/mqtt5/src/client/inner.rs | 14 +++- crates/mqtt5/src/client/mod.rs | 18 ++++- crates/mqtt5/src/lib.rs | 4 +- crates/mqtt5/tests/connection_events.rs | 1 + crates/mqtt5/tests/keepalive_negotiation.rs | 81 +++++++++++++++++++ crates/mqttv5-cli/Cargo.toml | 2 +- crates/mqttv5-cli/src/commands/pub_cmd.rs | 4 +- 11 files changed, 185 insertions(+), 18 deletions(-) create mode 100644 crates/mqtt5/tests/keepalive_negotiation.rs diff --git a/crates/mqtt5-protocol/src/connection.rs b/crates/mqtt5-protocol/src/connection.rs index cb4e2fdf..34a76574 100644 --- a/crates/mqtt5-protocol/src/connection.rs +++ b/crates/mqtt5-protocol/src/connection.rs @@ -52,10 +52,22 @@ pub enum DisconnectReason { #[derive(Debug, Clone)] pub enum ConnectionEvent { Connecting, - Connected { session_present: bool }, - Disconnected { reason: DisconnectReason }, - Reconnecting { attempt: u32 }, - ReconnectFailed { error: MqttError }, + Connected { + session_present: bool, + /// Effective keep-alive interval after MQTT v5 `ServerKeepAlive` negotiation. + /// + /// Equals `Duration::ZERO` if keep-alive is disabled. + keep_alive: Duration, + }, + Disconnected { + reason: DisconnectReason, + }, + Reconnecting { + attempt: u32, + }, + ReconnectFailed { + error: MqttError, + }, } #[derive(Debug, Clone, Default)] @@ -197,9 +209,13 @@ impl ConnectionStateMachine { ConnectionEvent::Connecting => { self.state = ConnectionState::Connecting; } - ConnectionEvent::Connected { session_present } => { + ConnectionEvent::Connected { + session_present, + keep_alive, + } => { self.state = ConnectionState::Connected; self.info.session_present = *session_present; + self.info.server_keep_alive = u16::try_from(keep_alive.as_secs()).ok(); } ConnectionEvent::Disconnected { .. } | ConnectionEvent::ReconnectFailed { .. } => { self.state = ConnectionState::Disconnected; @@ -275,9 +291,11 @@ mod tests { sm.transition(&ConnectionEvent::Connected { session_present: true, + keep_alive: Duration::from_secs(60), }); assert!(sm.is_connected()); assert!(sm.info().session_present); + assert_eq!(sm.info().server_keep_alive, Some(60)); sm.transition(&ConnectionEvent::Disconnected { reason: DisconnectReason::NetworkError("timeout".into()), @@ -338,6 +356,7 @@ mod tests { sm.transition(&ConnectionEvent::Connecting); sm.transition(&ConnectionEvent::Connected { session_present: false, + keep_alive: Duration::from_secs(60), }); assert!(sm.is_connected()); diff --git a/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs b/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs index 03534b92..5db75057 100644 --- a/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs +++ b/crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs @@ -173,6 +173,20 @@ impl Properties { .push(PropertyValue::TwoByteInteger(seconds)); } + #[must_use] + pub fn get_server_keep_alive(&self) -> Option { + self.properties + .get(&PropertyId::ServerKeepAlive) + .and_then(|values| values.first()) + .and_then(|value| { + if let PropertyValue::TwoByteInteger(v) = value { + Some(*v) + } else { + None + } + }) + } + pub fn set_authentication_method(&mut self, method: String) { self.properties .entry(PropertyId::AuthenticationMethod) diff --git a/crates/mqtt5/examples/simple_client.rs b/crates/mqtt5/examples/simple_client.rs index efe8db65..4b3d61f2 100644 --- a/crates/mqtt5/examples/simple_client.rs +++ b/crates/mqtt5/examples/simple_client.rs @@ -72,8 +72,13 @@ async fn main() -> Result<(), Box> { ConnectionEvent::Connecting => { println!("🔌 Connecting to MQTT broker..."); } - ConnectionEvent::Connected { session_present } => { - println!("✅ Connected to MQTT broker (session_present: {session_present})"); + ConnectionEvent::Connected { + session_present, + keep_alive, + } => { + println!( + "✅ Connected to MQTT broker (session_present: {session_present}, keep_alive: {keep_alive:?})" + ); } ConnectionEvent::Disconnected { reason } => { println!("❌ Disconnected from broker: {reason:?}"); diff --git a/crates/mqtt5/src/client/direct/mod.rs b/crates/mqtt5/src/client/direct/mod.rs index ef7ea6ba..9f9f4f52 100644 --- a/crates/mqtt5/src/client/direct/mod.rs +++ b/crates/mqtt5/src/client/direct/mod.rs @@ -110,6 +110,7 @@ pub struct DirectClientInner { pub auth_handler: Option>, pub auth_method: Option, pub keepalive_state: Arc>, + pub negotiated_keep_alive_secs: Arc, #[cfg(feature = "transport-quic")] pub cached_quic_client_config: Option, #[cfg(feature = "transport-quic")] @@ -126,6 +127,7 @@ impl DirectClientInner { let queue_on_disconnect = !options.clean_start; let auth_method = options.properties.authentication_method.clone(); + let initial_keep_alive_secs = options.keep_alive.as_secs(); Self { writer: None, @@ -166,6 +168,7 @@ impl DirectClientInner { auth_handler: None, auth_method, keepalive_state: Arc::new(Mutex::new(KeepaliveState::default())), + negotiated_keep_alive_secs: Arc::new(AtomicU64::new(initial_keep_alive_secs)), #[cfg(feature = "transport-quic")] cached_quic_client_config: None, #[cfg(feature = "transport-quic")] @@ -181,6 +184,26 @@ impl DirectClientInner { self.connected.load(Ordering::SeqCst) } + pub fn negotiated_keep_alive(&self) -> Duration { + Duration::from_secs(self.negotiated_keep_alive_secs.load(Ordering::SeqCst)) + } + + fn apply_negotiated_keep_alive(&self, server_value: Option) { + let effective = server_value.map_or_else( + || u16::try_from(self.options.keep_alive.as_secs()).unwrap_or(u16::MAX), + |v| { + tracing::debug!( + "Server overrode keep-alive: requested={}s, negotiated={}s", + self.options.keep_alive.as_secs(), + v, + ); + v + }, + ); + self.negotiated_keep_alive_secs + .store(u64::from(effective), Ordering::SeqCst); + } + pub fn set_connected(&self, connected: bool) { self.connected.store(connected, Ordering::SeqCst); } @@ -359,6 +382,8 @@ impl DirectClientInner { *self.server_max_qos.lock() = None; } + self.apply_negotiated_keep_alive(connack.properties.get_server_keep_alive()); + let protocol_version = self.options.protocol_version.as_u8(); let (reader, writer) = match transport { TransportType::Tcp(tcp) => { @@ -1201,7 +1226,7 @@ impl DirectClientInner { tracing::debug!("📦 PACKET READER - Task exited"); })); - let keepalive_interval = self.options.keep_alive; + let keepalive_interval = self.negotiated_keep_alive(); if keepalive_interval.is_zero() { tracing::debug!("💓 KEEPALIVE - Disabled (interval is zero)"); } else { diff --git a/crates/mqtt5/src/client/inner.rs b/crates/mqtt5/src/client/inner.rs index 6be81657..785c9e7e 100644 --- a/crates/mqtt5/src/client/inner.rs +++ b/crates/mqtt5/src/client/inner.rs @@ -28,9 +28,13 @@ impl MqttClient { &self, stored_subs: Vec, session_present: bool, + keep_alive: std::time::Duration, ) { - self.trigger_connection_event(ConnectionEvent::Connected { session_present }) - .await; + self.trigger_connection_event(ConnectionEvent::Connected { + session_present, + keep_alive, + }) + .await; self.recover_quic_flows().await; self.restore_subscriptions_after_connect(stored_subs, session_present) .await; @@ -419,9 +423,10 @@ impl MqttClient { } let stored_subs = inner.stored_subscriptions.lock().clone(); let session_present = result.session_present; + let keep_alive = inner.negotiated_keep_alive(); drop(inner); - self.on_successful_connect(stored_subs, session_present) + self.on_successful_connect(stored_subs, session_present, keep_alive) .await; return Ok(result); @@ -525,9 +530,10 @@ impl MqttClient { Ok(result) => { let stored_subs = inner.stored_subscriptions.lock().clone(); let session_present = result.session_present; + let keep_alive = inner.negotiated_keep_alive(); drop(inner); - self.on_successful_connect(stored_subs, session_present) + self.on_successful_connect(stored_subs, session_present, keep_alive) .await; Ok(result) diff --git a/crates/mqtt5/src/client/mod.rs b/crates/mqtt5/src/client/mod.rs index 1c601203..1af6adfd 100644 --- a/crates/mqtt5/src/client/mod.rs +++ b/crates/mqtt5/src/client/mod.rs @@ -125,6 +125,20 @@ impl MqttClient { self.inner.read().await.is_connected() } + /// Returns the keep-alive interval currently in effect. + /// + /// On MQTT v5 this reflects the value after `ServerKeepAlive` negotiation: + /// if the broker returned a `ServerKeepAlive` property in CONNACK the + /// returned value matches the broker-imposed interval, otherwise it matches + /// the value passed to [`ConnectOptions::with_keep_alive`]. + /// + /// Before the first successful `connect()` call this returns the value + /// configured on the [`ConnectOptions`]. A returned value of + /// `Duration::ZERO` means keep-alive is disabled. + pub async fn keep_alive(&self) -> std::time::Duration { + self.inner.read().await.negotiated_keep_alive() + } + /// Gets the client ID pub async fn client_id(&self) -> String { self.inner @@ -152,8 +166,8 @@ impl MqttClient { /// ConnectionEvent::Connecting => { /// println!("Connecting..."); /// } - /// ConnectionEvent::Connected { session_present } => { - /// println!("Connected! Session present: {}", session_present); + /// ConnectionEvent::Connected { session_present, keep_alive } => { + /// println!("Connected! Session present: {session_present}, keep-alive: {keep_alive:?}"); /// } /// ConnectionEvent::Disconnected { reason } => { /// println!("Disconnected: {:?}", reason); diff --git a/crates/mqtt5/src/lib.rs b/crates/mqtt5/src/lib.rs index df28186e..4f6e23e7 100644 --- a/crates/mqtt5/src/lib.rs +++ b/crates/mqtt5/src/lib.rs @@ -61,8 +61,8 @@ //! ConnectionEvent::Connecting => { //! println!("Connecting..."); //! } -//! ConnectionEvent::Connected { session_present } => { -//! println!("Connected! Session present: {}", session_present); +//! ConnectionEvent::Connected { session_present, keep_alive } => { +//! println!("Connected! Session present: {session_present}, keep-alive: {keep_alive:?}"); //! } //! ConnectionEvent::Disconnected { reason } => { //! println!("Disconnected: {:?}", reason); diff --git a/crates/mqtt5/tests/connection_events.rs b/crates/mqtt5/tests/connection_events.rs index 981ea589..b9227413 100644 --- a/crates/mqtt5/tests/connection_events.rs +++ b/crates/mqtt5/tests/connection_events.rs @@ -21,6 +21,7 @@ async fn test_connection_event_callback() { .on_connection_event(move |event| match event { ConnectionEvent::Connected { session_present: sp, + .. } => { connected_count_clone.fetch_add(1, Ordering::Relaxed); session_present_clone.store(sp, Ordering::Relaxed); diff --git a/crates/mqtt5/tests/keepalive_negotiation.rs b/crates/mqtt5/tests/keepalive_negotiation.rs new file mode 100644 index 00000000..972ca08d --- /dev/null +++ b/crates/mqtt5/tests/keepalive_negotiation.rs @@ -0,0 +1,81 @@ +mod common; + +use common::TestBroker; + +use mqtt5::broker::config::{BrokerConfig, StorageBackend, StorageConfig}; +use mqtt5::time::Duration; +use mqtt5::{ConnectOptions, ConnectionEvent, MqttClient}; +use std::net::SocketAddr; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[tokio::test] +async fn server_keep_alive_overrides_client_request() { + let storage_config = StorageConfig { + backend: StorageBackend::Memory, + enable_persistence: true, + ..Default::default() + }; + let mut config = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config); + config.server_keep_alive = Some(Duration::from_secs(10)); + + let broker = TestBroker::start_with_config(config).await; + + let mut options = ConnectOptions::new("kanego-1"); + options.keep_alive = Duration::from_secs(60); + + let observed = Arc::new(Mutex::new(None::)); + let observed_clone = Arc::clone(&observed); + + let client = MqttClient::with_options(options); + client + .on_connection_event(move |event| { + if let ConnectionEvent::Connected { keep_alive, .. } = event { + let observed = Arc::clone(&observed_clone); + tokio::spawn(async move { + *observed.lock().await = Some(keep_alive); + }); + } + }) + .await + .unwrap(); + + client.connect(broker.address()).await.unwrap(); + + assert_eq!(client.keep_alive().await, Duration::from_secs(10)); + + tokio::time::sleep(Duration::from_millis(100)).await; + assert_eq!( + *observed.lock().await, + Some(Duration::from_secs(10)), + "ConnectionEvent::Connected must carry the broker-negotiated keep-alive", + ); + + client.disconnect().await.unwrap(); +} + +#[tokio::test] +async fn keep_alive_falls_back_to_client_value_without_server_override() { + let broker = TestBroker::start().await; + + let mut options = ConnectOptions::new("kanego-2"); + options.keep_alive = Duration::from_secs(30); + + let client = MqttClient::with_options(options); + client.connect(broker.address()).await.unwrap(); + + assert_eq!(client.keep_alive().await, Duration::from_secs(30)); + + client.disconnect().await.unwrap(); +} + +#[tokio::test] +async fn keep_alive_before_connect_returns_configured_value() { + let mut options = ConnectOptions::new("kanego-3"); + options.keep_alive = Duration::from_secs(45); + + let client = MqttClient::with_options(options); + assert_eq!(client.keep_alive().await, Duration::from_secs(45)); +} diff --git a/crates/mqttv5-cli/Cargo.toml b/crates/mqttv5-cli/Cargo.toml index 39355964..f29040cb 100644 --- a/crates/mqttv5-cli/Cargo.toml +++ b/crates/mqttv5-cli/Cargo.toml @@ -22,7 +22,7 @@ opentelemetry = ["mqtt5/opentelemetry"] codec = ["mqtt5/codec-all"] [dependencies] -mqtt5 = "0.31" +mqtt5 = { path = "../mqtt5", version = "0.31" } anyhow = "1.0" tracing = "0.1" serde = { version = "1.0", features = ["derive"] } diff --git a/crates/mqttv5-cli/src/commands/pub_cmd.rs b/crates/mqttv5-cli/src/commands/pub_cmd.rs index fc4bc3ad..92adb39e 100644 --- a/crates/mqttv5-cli/src/commands/pub_cmd.rs +++ b/crates/mqttv5-cli/src/commands/pub_cmd.rs @@ -715,7 +715,9 @@ async fn keep_alive_loop(client: &MqttClient, auto_reconnect: bool) -> Result<() ConnectionEvent::Connecting => { info!("Connecting to broker..."); } - ConnectionEvent::Connected { session_present } => { + ConnectionEvent::Connected { + session_present, .. + } => { if session_present { info!("✓ Reconnected (session present)"); } else { From 721697fa1f6052638f81ee0994a85d413d903a30 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 16 May 2026 23:13:20 +0800 Subject: [PATCH 2/6] chore(lint): keep two stylistic refinements over #79 After #79 landed lint cleanup ahead of this PR, the rebase dropped the overlapping `chore(lint)` commit but reverted two spots where the reviewer noted my variants were preferable. - `quic_acceptor.rs`: `#[allow(clippy::collapsible_match, reason = ...)]` on the nested match preserves the original control flow rather than refactoring to early-`continue`. - `bench_cmd.rs`: collapse the rate calc to `(1_000_000u64 * ...).checked_div(cmd.rate)` returning `Option` directly, replacing the `if cmd.rate > 0 { ... .unwrap_or(0) }` wrap. --- crates/mqtt5/src/broker/quic_acceptor.rs | 20 ++++++++++++-------- crates/mqttv5-cli/src/commands/bench_cmd.rs | 11 ++--------- 2 files changed, 14 insertions(+), 17 deletions(-) diff --git a/crates/mqtt5/src/broker/quic_acceptor.rs b/crates/mqtt5/src/broker/quic_acceptor.rs index b7e31a3a..a894e986 100644 --- a/crates/mqtt5/src/broker/quic_acceptor.rs +++ b/crates/mqtt5/src/broker/quic_acceptor.rs @@ -650,17 +650,21 @@ fn spawn_datagram_reader( label, peer_addr ); - let packet = match decode_datagram_packet(&datagram) { - Some(Ok(packet)) => packet, + #[allow( + clippy::collapsible_match, + reason = "cannot move `packet` into async send from a pattern guard" + )] + match decode_datagram_packet(&datagram) { + Some(Ok(packet)) => { + if packet_tx.send((packet, None)).await.is_err() { + debug!("Datagram packet channel closed for {}", peer_addr); + break; + } + } Some(Err(e)) => { warn!("Failed to decode datagram from {}: {}", peer_addr, e); - continue; } - None => continue, - }; - if packet_tx.send((packet, None)).await.is_err() { - debug!("Datagram packet channel closed for {}", peer_addr); - break; + None => {} } } Err(e) => { diff --git a/crates/mqttv5-cli/src/commands/bench_cmd.rs b/crates/mqttv5-cli/src/commands/bench_cmd.rs index 4fb864e6..30010ebc 100644 --- a/crates/mqttv5-cli/src/commands/bench_cmd.rs +++ b/crates/mqttv5-cli/src/commands/bench_cmd.rs @@ -1451,15 +1451,8 @@ async fn run_hol_blocking(cmd: BenchCommand) -> Result<()> { ) .await?; - let per_topic_interval_us = if cmd.rate > 0 { - #[allow(clippy::cast_possible_truncation)] - let interval = (1_000_000u64 * (num_topics as u64)) - .checked_div(cmd.rate) - .unwrap_or(0); - Some(interval) - } else { - None - }; + #[allow(clippy::cast_possible_truncation)] + let per_topic_interval_us = (1_000_000u64 * (num_topics as u64)).checked_div(cmd.rate); let rate_label = if cmd.rate > 0 { format!("{} msg/s", cmd.rate) } else { From cebb091d934cfd2480f74df6827392a60e9d235c Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 16 May 2026 23:23:26 +0800 Subject: [PATCH 3/6] fix(client): tighten keep-alive negotiation per review MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop `Ordering::SeqCst` to `Ordering::Relaxed` on `negotiated_keep_alive_secs` — it is a single value with no cross-variable ordering requirement. - Replace the silent `u16::try_from(...).unwrap_or(u16::MAX)` for configured keep-alive with an `unwrap_or_else` that emits a `tracing::warn!` so wire-range truncation surfaces in logs. - Add `keep_alive_renegotiates_against_each_broker_on_reconnect` to cover the previously-untested path where `ConnectOptions::keep_alive` is preserved so subsequent CONNECTs re-negotiate from the user's original request rather than the previously-negotiated value. --- crates/mqtt5/src/client/direct/mod.rs | 16 ++++++-- crates/mqtt5/tests/keepalive_negotiation.rs | 42 +++++++++++++++++++++ 2 files changed, 55 insertions(+), 3 deletions(-) diff --git a/crates/mqtt5/src/client/direct/mod.rs b/crates/mqtt5/src/client/direct/mod.rs index 9f9f4f52..1a8f811f 100644 --- a/crates/mqtt5/src/client/direct/mod.rs +++ b/crates/mqtt5/src/client/direct/mod.rs @@ -185,12 +185,22 @@ impl DirectClientInner { } pub fn negotiated_keep_alive(&self) -> Duration { - Duration::from_secs(self.negotiated_keep_alive_secs.load(Ordering::SeqCst)) + Duration::from_secs(self.negotiated_keep_alive_secs.load(Ordering::Relaxed)) } fn apply_negotiated_keep_alive(&self, server_value: Option) { let effective = server_value.map_or_else( - || u16::try_from(self.options.keep_alive.as_secs()).unwrap_or(u16::MAX), + || { + let requested = self.options.keep_alive.as_secs(); + u16::try_from(requested).unwrap_or_else(|_| { + tracing::warn!( + "Configured keep-alive {}s exceeds the u16 wire range; clamping to {}s", + requested, + u16::MAX, + ); + u16::MAX + }) + }, |v| { tracing::debug!( "Server overrode keep-alive: requested={}s, negotiated={}s", @@ -201,7 +211,7 @@ impl DirectClientInner { }, ); self.negotiated_keep_alive_secs - .store(u64::from(effective), Ordering::SeqCst); + .store(u64::from(effective), Ordering::Relaxed); } pub fn set_connected(&self, connected: bool) { diff --git a/crates/mqtt5/tests/keepalive_negotiation.rs b/crates/mqtt5/tests/keepalive_negotiation.rs index 972ca08d..93feb073 100644 --- a/crates/mqtt5/tests/keepalive_negotiation.rs +++ b/crates/mqtt5/tests/keepalive_negotiation.rs @@ -79,3 +79,45 @@ async fn keep_alive_before_connect_returns_configured_value() { let client = MqttClient::with_options(options); assert_eq!(client.keep_alive().await, Duration::from_secs(45)); } + +#[tokio::test] +async fn keep_alive_renegotiates_against_each_broker_on_reconnect() { + let storage_config = StorageConfig { + backend: StorageBackend::Memory, + enable_persistence: true, + ..Default::default() + }; + + let mut config_a = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config.clone()); + config_a.server_keep_alive = Some(Duration::from_secs(10)); + let broker_a = TestBroker::start_with_config(config_a).await; + + let mut config_b = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config); + config_b.server_keep_alive = Some(Duration::from_secs(25)); + let broker_b = TestBroker::start_with_config(config_b).await; + + let mut options = ConnectOptions::new("kanego-4"); + options.keep_alive = Duration::from_secs(60); + + let client = MqttClient::with_options(options); + + client.connect(broker_a.address()).await.unwrap(); + assert_eq!( + client.keep_alive().await, + Duration::from_secs(10), + "first CONNECT must adopt broker A's ServerKeepAlive", + ); + client.disconnect().await.unwrap(); + + client.connect(broker_b.address()).await.unwrap(); + assert_eq!( + client.keep_alive().await, + Duration::from_secs(25), + "reconnect must re-negotiate from the original 60s request, not the previously-negotiated 10s", + ); + client.disconnect().await.unwrap(); +} From 1a61666ac3acad24d9795a51e78b7e2f68e9e0b6 Mon Sep 17 00:00:00 2001 From: Michael Date: Sat, 16 May 2026 23:23:26 +0800 Subject: [PATCH 4/6] chore(release): bump versions and CHANGELOG for ConnectionEvent breaking change MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit `ConnectionEvent::Connected` gaining `server_keep_alive_secs` is a SemVer-major change per the Rust API guidelines (adding a field to a public enum variant). Propagate the bump: - `mqtt5-protocol` 0.12.0 → 0.13.0 (defines the variant) - `mqtt5` 0.31.5 → 0.32.0 (re-exports the variant, adds `negotiated_keep_alive_secs` getter) - `mqtt5-wasm` 1.3.1 → 1.3.2 (transitive) - `mqttv5-cli` 0.27.2 → 0.27.3, plus its `mqtt5 = "0.31"` dep → `"0.32"` CHANGELOG entries added for all four crates. --- CHANGELOG.md | 28 ++++++++++++++++++++++++++++ crates/mqtt5-protocol/Cargo.toml | 2 +- crates/mqtt5-wasm/Cargo.toml | 6 +++--- crates/mqtt5/Cargo.toml | 4 ++-- crates/mqttv5-cli/Cargo.toml | 4 ++-- 5 files changed, 36 insertions(+), 8 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index fd8955b3..37f79a4e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,34 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [mqttv5-cli 0.27.3] - 2026-05-16 + +### Changed + +- **Bump `mqtt5` dep to `0.32`** - transitive bump for the breaking `ConnectionEvent::Connected` field addition. No CLI surface changes. + +## [mqtt5-wasm 1.3.2] - 2026-05-16 + +### Changed + +- **Bump `mqtt5` dep to `0.32` and `mqtt5-protocol` to `0.13`** - transitive bump for the breaking `ConnectionEvent::Connected` field addition. No wasm surface changes. + +## [mqtt5 0.32.0] - 2026-05-16 + +### Changed + +- **BREAKING: `ConnectionEvent::Connected` gained `server_keep_alive_secs: Option`** - exposes the broker-negotiated keep-alive interval from CONNACK to event consumers. Existing match arms that destructure the variant must be updated. + +### Added + +- **Honor MQTT v5 ServerKeepAlive negotiation** - the client now adopts the broker-supplied keep-alive interval from the CONNACK `Server Keep Alive` property when present ([MQTT-3.2.2-22]), driving PINGREQ cadence and read timeouts off the negotiated value rather than the requested one. The originally configured interval is preserved on `ConnectOptions::keep_alive` so subsequent CONNECTs re-negotiate from the user's intent. New `Client::negotiated_keep_alive_secs()` accessor exposes the current effective value. + +## [mqtt5-protocol 0.13.0] - 2026-05-16 + +### Changed + +- **BREAKING: `ConnectionEvent::Connected` variant gained `server_keep_alive_secs: Option`** - carries the broker-negotiated keep-alive interval out to event consumers. Adding a field to a public enum variant is a SemVer-major change. + ## [mqtt5-wasm 1.3.1] - 2026-05-15 ### Fixed diff --git a/crates/mqtt5-protocol/Cargo.toml b/crates/mqtt5-protocol/Cargo.toml index 3d2d8764..8131660d 100644 --- a/crates/mqtt5-protocol/Cargo.toml +++ b/crates/mqtt5-protocol/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5-protocol" -version = "0.12.0" +version = "0.13.0" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/mqtt5-wasm/Cargo.toml b/crates/mqtt5-wasm/Cargo.toml index 5ed138bb..61ba3125 100644 --- a/crates/mqtt5-wasm/Cargo.toml +++ b/crates/mqtt5-wasm/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5-wasm" -version = "1.3.1" +version = "1.3.2" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -27,8 +27,8 @@ broker = ["client", "dep:mqtt5", "dep:tokio"] codec = ["client", "dep:miniz_oxide"] [dependencies] -mqtt5-protocol = "0.12.0" -mqtt5 = { version = "0.31", optional = true, default-features = false, features = [ +mqtt5-protocol = "0.13.0" +mqtt5 = { version = "0.32", optional = true, default-features = false, features = [ "tokio", ] } diff --git a/crates/mqtt5/Cargo.toml b/crates/mqtt5/Cargo.toml index 01a23bd4..68a54213 100644 --- a/crates/mqtt5/Cargo.toml +++ b/crates/mqtt5/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5" -version = "0.31.5" +version = "0.32.0" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -120,7 +120,7 @@ opentelemetry_sdk = { version = "0.31", features = ["trace", "rt-tokio", "metric opentelemetry-otlp = { version = "0.31", features = ["trace", "grpc-tonic", "metrics"], optional = true } tracing-opentelemetry = { version = "0.32", optional = true } opentelemetry = { version = "0.31", features = ["metrics", "trace"], optional = true } -mqtt5-protocol = "0.12.0" +mqtt5-protocol = "0.13.0" argon2 = "0.5" getrandom = "0.3" bebytes = { version = "3.0", features = ["bytes"] } diff --git a/crates/mqttv5-cli/Cargo.toml b/crates/mqttv5-cli/Cargo.toml index f29040cb..799403d4 100644 --- a/crates/mqttv5-cli/Cargo.toml +++ b/crates/mqttv5-cli/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqttv5-cli" -version = "0.27.2" +version = "0.27.3" edition.workspace = true rust-version.workspace = true authors.workspace = true @@ -22,7 +22,7 @@ opentelemetry = ["mqtt5/opentelemetry"] codec = ["mqtt5/codec-all"] [dependencies] -mqtt5 = { path = "../mqtt5", version = "0.31" } +mqtt5 = { path = "../mqtt5", version = "0.32" } anyhow = "1.0" tracing = "0.1" serde = { version = "1.0", features = ["derive"] } From 85e5bc026ab4f48022b699aa890b6625734a306c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Sat, 16 May 2026 15:14:13 -0700 Subject: [PATCH 5/6] docs(changelog): match actual API surface for 0.32 / 0.13 entries The CHANGELOG entries for mqtt5 0.32.0 and mqtt5-protocol 0.13.0 described `server_keep_alive_secs: Option` and `Client::negotiated_keep_alive_secs()`, but the implemented API uses `keep_alive: Duration` on `ConnectionEvent::Connected` and `MqttClient::keep_alive() -> Duration`. Also adds the missing `Properties::get_server_keep_alive() -> Option` under mqtt5-protocol 0.13.0 Added. --- CHANGELOG.md | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 37f79a4e..204d3d6b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -21,17 +21,21 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed -- **BREAKING: `ConnectionEvent::Connected` gained `server_keep_alive_secs: Option`** - exposes the broker-negotiated keep-alive interval from CONNACK to event consumers. Existing match arms that destructure the variant must be updated. +- **BREAKING: `ConnectionEvent::Connected` gained `keep_alive: Duration`** - exposes the broker-negotiated keep-alive interval from CONNACK to event consumers. Existing match arms that destructure the variant must be updated. ### Added -- **Honor MQTT v5 ServerKeepAlive negotiation** - the client now adopts the broker-supplied keep-alive interval from the CONNACK `Server Keep Alive` property when present ([MQTT-3.2.2-22]), driving PINGREQ cadence and read timeouts off the negotiated value rather than the requested one. The originally configured interval is preserved on `ConnectOptions::keep_alive` so subsequent CONNECTs re-negotiate from the user's intent. New `Client::negotiated_keep_alive_secs()` accessor exposes the current effective value. +- **Honor MQTT v5 ServerKeepAlive negotiation** - the client now adopts the broker-supplied keep-alive interval from the CONNACK `Server Keep Alive` property when present ([MQTT-3.2.2-22]), driving PINGREQ cadence and read timeouts off the negotiated value rather than the requested one. The originally configured interval is preserved on `ConnectOptions::keep_alive` so subsequent CONNECTs re-negotiate from the user's intent. New `MqttClient::keep_alive() -> Duration` accessor exposes the current effective value. ## [mqtt5-protocol 0.13.0] - 2026-05-16 ### Changed -- **BREAKING: `ConnectionEvent::Connected` variant gained `server_keep_alive_secs: Option`** - carries the broker-negotiated keep-alive interval out to event consumers. Adding a field to a public enum variant is a SemVer-major change. +- **BREAKING: `ConnectionEvent::Connected` variant gained `keep_alive: Duration`** - carries the broker-negotiated keep-alive interval out to event consumers. Adding a field to a public enum variant is a SemVer-major change. + +### Added + +- **`Properties::get_server_keep_alive() -> Option`** - getter for the v5 `ServerKeepAlive` CONNACK property, symmetric with the existing `set_server_keep_alive` setter. ## [mqtt5-wasm 1.3.1] - 2026-05-15 From f7131ad4b14dd928537daf1835bc4ab4f8dffa72 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Sat, 16 May 2026 16:27:02 -0700 Subject: [PATCH 6/6] fix(client): post-review followups for keep-alive negotiation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Drop gratuitous `Arc` wrapper on `negotiated_keep_alive_secs` — the atomic is never cloned and `AtomicU64` is already `Sync`. - Extract `configured_keep_alive_u16()` helper so both `build_connect_packet` and `apply_negotiated_keep_alive`'s fallback share the truncation+warn logic instead of one warning and the other clamping silently. - Document the `keep_alive=0` override edge case on `MqttClient::keep_alive()`: per [MQTT-3.2.2-22] a non-zero `ServerKeepAlive` re-enables keep-alive even when the client requested `Duration::ZERO`. - Add `server_keep_alive_zero_disables_keepalive_even_with_nonzero_client_request` to cover the previously-uncovered `ServerKeepAlive=0` disable path. --- crates/mqtt5/src/client/direct/mod.rs | 35 ++++++++++----------- crates/mqtt5/src/client/mod.rs | 5 +++ crates/mqtt5/tests/keepalive_negotiation.rs | 29 +++++++++++++++++ 3 files changed, 50 insertions(+), 19 deletions(-) diff --git a/crates/mqtt5/src/client/direct/mod.rs b/crates/mqtt5/src/client/direct/mod.rs index 1a8f811f..084cbe7f 100644 --- a/crates/mqtt5/src/client/direct/mod.rs +++ b/crates/mqtt5/src/client/direct/mod.rs @@ -110,7 +110,7 @@ pub struct DirectClientInner { pub auth_handler: Option>, pub auth_method: Option, pub keepalive_state: Arc>, - pub negotiated_keep_alive_secs: Arc, + pub negotiated_keep_alive_secs: AtomicU64, #[cfg(feature = "transport-quic")] pub cached_quic_client_config: Option, #[cfg(feature = "transport-quic")] @@ -168,7 +168,7 @@ impl DirectClientInner { auth_handler: None, auth_method, keepalive_state: Arc::new(Mutex::new(KeepaliveState::default())), - negotiated_keep_alive_secs: Arc::new(AtomicU64::new(initial_keep_alive_secs)), + negotiated_keep_alive_secs: AtomicU64::new(initial_keep_alive_secs), #[cfg(feature = "transport-quic")] cached_quic_client_config: None, #[cfg(feature = "transport-quic")] @@ -188,19 +188,21 @@ impl DirectClientInner { Duration::from_secs(self.negotiated_keep_alive_secs.load(Ordering::Relaxed)) } + fn configured_keep_alive_u16(&self) -> u16 { + let requested = self.options.keep_alive.as_secs(); + u16::try_from(requested).unwrap_or_else(|_| { + tracing::warn!( + "Configured keep-alive {}s exceeds the u16 wire range; clamping to {}s", + requested, + u16::MAX, + ); + u16::MAX + }) + } + fn apply_negotiated_keep_alive(&self, server_value: Option) { let effective = server_value.map_or_else( - || { - let requested = self.options.keep_alive.as_secs(); - u16::try_from(requested).unwrap_or_else(|_| { - tracing::warn!( - "Configured keep-alive {}s exceeds the u16 wire range; clamping to {}s", - requested, - u16::MAX, - ); - u16::MAX - }) - }, + || self.configured_keep_alive_u16(), |v| { tracing::debug!( "Server overrode keep-alive: requested={}s, negotiated={}s", @@ -1174,12 +1176,7 @@ impl DirectClientInner { ConnectPacket { protocol_version: self.options.protocol_version.as_u8(), clean_start: self.options.clean_start, - keep_alive: self - .options - .keep_alive - .as_secs() - .try_into() - .unwrap_or(u16::MAX), + keep_alive: self.configured_keep_alive_u16(), client_id: session.client_id().to_string(), will: self.options.will.clone(), username: self.options.username.clone(), diff --git a/crates/mqtt5/src/client/mod.rs b/crates/mqtt5/src/client/mod.rs index 1af6adfd..57ae093f 100644 --- a/crates/mqtt5/src/client/mod.rs +++ b/crates/mqtt5/src/client/mod.rs @@ -132,6 +132,11 @@ impl MqttClient { /// returned value matches the broker-imposed interval, otherwise it matches /// the value passed to [`ConnectOptions::with_keep_alive`]. /// + /// Per `[MQTT-3.2.2-22]` a broker-returned `ServerKeepAlive` always wins, + /// including the case where the client requested `Duration::ZERO` + /// (disabled): a non-zero `ServerKeepAlive` will re-enable keep-alive on + /// the connection. A `ServerKeepAlive` of zero disables keep-alive. + /// /// Before the first successful `connect()` call this returns the value /// configured on the [`ConnectOptions`]. A returned value of /// `Duration::ZERO` means keep-alive is disabled. diff --git a/crates/mqtt5/tests/keepalive_negotiation.rs b/crates/mqtt5/tests/keepalive_negotiation.rs index 93feb073..5a47141c 100644 --- a/crates/mqtt5/tests/keepalive_negotiation.rs +++ b/crates/mqtt5/tests/keepalive_negotiation.rs @@ -121,3 +121,32 @@ async fn keep_alive_renegotiates_against_each_broker_on_reconnect() { ); client.disconnect().await.unwrap(); } + +#[tokio::test] +async fn server_keep_alive_zero_disables_keepalive_even_with_nonzero_client_request() { + let storage_config = StorageConfig { + backend: StorageBackend::Memory, + enable_persistence: true, + ..Default::default() + }; + let mut config = BrokerConfig::default() + .with_bind_address("127.0.0.1:0".parse::().unwrap()) + .with_storage(storage_config); + config.server_keep_alive = Some(Duration::ZERO); + + let broker = TestBroker::start_with_config(config).await; + + let mut options = ConnectOptions::new("kanego-5"); + options.keep_alive = Duration::from_secs(60); + + let client = MqttClient::with_options(options); + client.connect(broker.address()).await.unwrap(); + + assert_eq!( + client.keep_alive().await, + Duration::ZERO, + "ServerKeepAlive=0 must disable keep-alive even when the client requested a non-zero interval", + ); + + client.disconnect().await.unwrap(); +}