Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,38 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [mqttv5-cli 0.27.3] - 2026-05-16

### Changed

- **Bump `mqtt5` dep to `0.32`** - transitive bump for the breaking `ConnectionEvent::Connected` field addition. No CLI surface changes.

## [mqtt5-wasm 1.3.2] - 2026-05-16

### Changed

- **Bump `mqtt5` dep to `0.32` and `mqtt5-protocol` to `0.13`** - transitive bump for the breaking `ConnectionEvent::Connected` field addition. No wasm surface changes.

## [mqtt5 0.32.0] - 2026-05-16

### Changed

- **BREAKING: `ConnectionEvent::Connected` gained `keep_alive: Duration`** - exposes the broker-negotiated keep-alive interval from CONNACK to event consumers. Existing match arms that destructure the variant must be updated.

### Added

- **Honor MQTT v5 ServerKeepAlive negotiation** - the client now adopts the broker-supplied keep-alive interval from the CONNACK `Server Keep Alive` property when present ([MQTT-3.2.2-22]), driving PINGREQ cadence and read timeouts off the negotiated value rather than the requested one. The originally configured interval is preserved on `ConnectOptions::keep_alive` so subsequent CONNECTs re-negotiate from the user's intent. New `MqttClient::keep_alive() -> Duration` accessor exposes the current effective value.

## [mqtt5-protocol 0.13.0] - 2026-05-16

### Changed

- **BREAKING: `ConnectionEvent::Connected` variant gained `keep_alive: Duration`** - carries the broker-negotiated keep-alive interval out to event consumers. Adding a field to a public enum variant is a SemVer-major change.

### Added

- **`Properties::get_server_keep_alive() -> Option<u16>`** - getter for the v5 `ServerKeepAlive` CONNACK property, symmetric with the existing `set_server_keep_alive` setter.

## [mqtt5-wasm 1.3.1] - 2026-05-15

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion crates/mqtt5-protocol/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqtt5-protocol"
version = "0.12.0"
version = "0.13.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
29 changes: 24 additions & 5 deletions crates/mqtt5-protocol/src/connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,22 @@ pub enum DisconnectReason {
#[derive(Debug, Clone)]
pub enum ConnectionEvent {
Connecting,
Connected { session_present: bool },
Disconnected { reason: DisconnectReason },
Reconnecting { attempt: u32 },
ReconnectFailed { error: MqttError },
Connected {
session_present: bool,
/// Effective keep-alive interval after MQTT v5 `ServerKeepAlive` negotiation.
///
/// Equals `Duration::ZERO` if keep-alive is disabled.
keep_alive: Duration,
},
Disconnected {
reason: DisconnectReason,
},
Reconnecting {
attempt: u32,
},
ReconnectFailed {
error: MqttError,
},
}

#[derive(Debug, Clone, Default)]
Expand Down Expand Up @@ -197,9 +209,13 @@ impl ConnectionStateMachine {
ConnectionEvent::Connecting => {
self.state = ConnectionState::Connecting;
}
ConnectionEvent::Connected { session_present } => {
ConnectionEvent::Connected {
session_present,
keep_alive,
} => {
self.state = ConnectionState::Connected;
self.info.session_present = *session_present;
self.info.server_keep_alive = u16::try_from(keep_alive.as_secs()).ok();
}
ConnectionEvent::Disconnected { .. } | ConnectionEvent::ReconnectFailed { .. } => {
self.state = ConnectionState::Disconnected;
Expand Down Expand Up @@ -275,9 +291,11 @@ mod tests {

sm.transition(&ConnectionEvent::Connected {
session_present: true,
keep_alive: Duration::from_secs(60),
});
assert!(sm.is_connected());
assert!(sm.info().session_present);
assert_eq!(sm.info().server_keep_alive, Some(60));

sm.transition(&ConnectionEvent::Disconnected {
reason: DisconnectReason::NetworkError("timeout".into()),
Expand Down Expand Up @@ -338,6 +356,7 @@ mod tests {
sm.transition(&ConnectionEvent::Connecting);
sm.transition(&ConnectionEvent::Connected {
session_present: false,
keep_alive: Duration::from_secs(60),
});
assert!(sm.is_connected());

Expand Down
14 changes: 14 additions & 0 deletions crates/mqtt5-protocol/src/protocol/v5/properties/accessors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,20 @@ impl Properties {
.push(PropertyValue::TwoByteInteger(seconds));
}

#[must_use]
pub fn get_server_keep_alive(&self) -> Option<u16> {
self.properties
.get(&PropertyId::ServerKeepAlive)
.and_then(|values| values.first())
.and_then(|value| {
if let PropertyValue::TwoByteInteger(v) = value {
Some(*v)
} else {
None
}
})
}

pub fn set_authentication_method(&mut self, method: String) {
self.properties
.entry(PropertyId::AuthenticationMethod)
Expand Down
6 changes: 3 additions & 3 deletions crates/mqtt5-wasm/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqtt5-wasm"
version = "1.3.1"
version = "1.3.2"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand All @@ -27,8 +27,8 @@ broker = ["client", "dep:mqtt5", "dep:tokio"]
codec = ["client", "dep:miniz_oxide"]

[dependencies]
mqtt5-protocol = "0.12.0"
mqtt5 = { version = "0.31", optional = true, default-features = false, features = [
mqtt5-protocol = "0.13.0"
mqtt5 = { version = "0.32", optional = true, default-features = false, features = [
"tokio",
] }

Expand Down
4 changes: 2 additions & 2 deletions crates/mqtt5/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqtt5"
version = "0.31.5"
version = "0.32.0"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down Expand Up @@ -120,7 +120,7 @@ opentelemetry_sdk = { version = "0.31", features = ["trace", "rt-tokio", "metric
opentelemetry-otlp = { version = "0.31", features = ["trace", "grpc-tonic", "metrics"], optional = true }
tracing-opentelemetry = { version = "0.32", optional = true }
opentelemetry = { version = "0.31", features = ["metrics", "trace"], optional = true }
mqtt5-protocol = "0.12.0"
mqtt5-protocol = "0.13.0"
argon2 = "0.5"
getrandom = "0.3"
bebytes = { version = "3.0", features = ["bytes"] }
Expand Down
9 changes: 7 additions & 2 deletions crates/mqtt5/examples/simple_client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,13 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
ConnectionEvent::Connecting => {
println!("🔌 Connecting to MQTT broker...");
}
ConnectionEvent::Connected { session_present } => {
println!("✅ Connected to MQTT broker (session_present: {session_present})");
ConnectionEvent::Connected {
session_present,
keep_alive,
} => {
println!(
"✅ Connected to MQTT broker (session_present: {session_present}, keep_alive: {keep_alive:?})"
);
}
ConnectionEvent::Disconnected { reason } => {
println!("❌ Disconnected from broker: {reason:?}");
Expand Down
20 changes: 12 additions & 8 deletions crates/mqtt5/src/broker/quic_acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,17 +650,21 @@ fn spawn_datagram_reader(
label,
peer_addr
);
let packet = match decode_datagram_packet(&datagram) {
Some(Ok(packet)) => packet,
#[allow(
clippy::collapsible_match,
reason = "cannot move `packet` into async send from a pattern guard"
)]
match decode_datagram_packet(&datagram) {
Some(Ok(packet)) => {
if packet_tx.send((packet, None)).await.is_err() {
debug!("Datagram packet channel closed for {}", peer_addr);
break;
}
}
Some(Err(e)) => {
warn!("Failed to decode datagram from {}: {}", peer_addr, e);
continue;
}
None => continue,
};
if packet_tx.send((packet, None)).await.is_err() {
debug!("Datagram packet channel closed for {}", peer_addr);
break;
None => {}
}
}
Err(e) => {
Expand Down
46 changes: 39 additions & 7 deletions crates/mqtt5/src/client/direct/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ pub struct DirectClientInner {
pub auth_handler: Option<Arc<dyn AuthHandler>>,
pub auth_method: Option<String>,
pub keepalive_state: Arc<Mutex<KeepaliveState>>,
pub negotiated_keep_alive_secs: AtomicU64,
#[cfg(feature = "transport-quic")]
pub cached_quic_client_config: Option<quinn::ClientConfig>,
#[cfg(feature = "transport-quic")]
Expand All @@ -126,6 +127,7 @@ impl DirectClientInner {

let queue_on_disconnect = !options.clean_start;
let auth_method = options.properties.authentication_method.clone();
let initial_keep_alive_secs = options.keep_alive.as_secs();

Self {
writer: None,
Expand Down Expand Up @@ -166,6 +168,7 @@ impl DirectClientInner {
auth_handler: None,
auth_method,
keepalive_state: Arc::new(Mutex::new(KeepaliveState::default())),
negotiated_keep_alive_secs: AtomicU64::new(initial_keep_alive_secs),
#[cfg(feature = "transport-quic")]
cached_quic_client_config: None,
#[cfg(feature = "transport-quic")]
Expand All @@ -181,6 +184,38 @@ impl DirectClientInner {
self.connected.load(Ordering::SeqCst)
}

pub fn negotiated_keep_alive(&self) -> Duration {
Duration::from_secs(self.negotiated_keep_alive_secs.load(Ordering::Relaxed))
}

fn configured_keep_alive_u16(&self) -> u16 {
let requested = self.options.keep_alive.as_secs();
u16::try_from(requested).unwrap_or_else(|_| {
tracing::warn!(
"Configured keep-alive {}s exceeds the u16 wire range; clamping to {}s",
requested,
u16::MAX,
);
u16::MAX
})
}

fn apply_negotiated_keep_alive(&self, server_value: Option<u16>) {
let effective = server_value.map_or_else(
|| self.configured_keep_alive_u16(),
|v| {
tracing::debug!(
"Server overrode keep-alive: requested={}s, negotiated={}s",
self.options.keep_alive.as_secs(),
v,
);
v
},
);
self.negotiated_keep_alive_secs
.store(u64::from(effective), Ordering::Relaxed);
}

pub fn set_connected(&self, connected: bool) {
self.connected.store(connected, Ordering::SeqCst);
}
Expand Down Expand Up @@ -359,6 +394,8 @@ impl DirectClientInner {
*self.server_max_qos.lock() = None;
}

self.apply_negotiated_keep_alive(connack.properties.get_server_keep_alive());

let protocol_version = self.options.protocol_version.as_u8();
let (reader, writer) = match transport {
TransportType::Tcp(tcp) => {
Expand Down Expand Up @@ -1139,12 +1176,7 @@ impl DirectClientInner {
ConnectPacket {
protocol_version: self.options.protocol_version.as_u8(),
clean_start: self.options.clean_start,
keep_alive: self
.options
.keep_alive
.as_secs()
.try_into()
.unwrap_or(u16::MAX),
keep_alive: self.configured_keep_alive_u16(),
client_id: session.client_id().to_string(),
will: self.options.will.clone(),
username: self.options.username.clone(),
Expand Down Expand Up @@ -1201,7 +1233,7 @@ impl DirectClientInner {
tracing::debug!("📦 PACKET READER - Task exited");
}));

let keepalive_interval = self.options.keep_alive;
let keepalive_interval = self.negotiated_keep_alive();
if keepalive_interval.is_zero() {
tracing::debug!("💓 KEEPALIVE - Disabled (interval is zero)");
} else {
Expand Down
14 changes: 10 additions & 4 deletions crates/mqtt5/src/client/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ impl MqttClient {
&self,
stored_subs: Vec<StoredSubscription>,
session_present: bool,
keep_alive: std::time::Duration,
) {
self.trigger_connection_event(ConnectionEvent::Connected { session_present })
.await;
self.trigger_connection_event(ConnectionEvent::Connected {
session_present,
keep_alive,
})
.await;
self.recover_quic_flows().await;
self.restore_subscriptions_after_connect(stored_subs, session_present)
.await;
Expand Down Expand Up @@ -419,9 +423,10 @@ impl MqttClient {
}
let stored_subs = inner.stored_subscriptions.lock().clone();
let session_present = result.session_present;
let keep_alive = inner.negotiated_keep_alive();
drop(inner);

self.on_successful_connect(stored_subs, session_present)
self.on_successful_connect(stored_subs, session_present, keep_alive)
.await;

return Ok(result);
Expand Down Expand Up @@ -525,9 +530,10 @@ impl MqttClient {
Ok(result) => {
let stored_subs = inner.stored_subscriptions.lock().clone();
let session_present = result.session_present;
let keep_alive = inner.negotiated_keep_alive();
drop(inner);

self.on_successful_connect(stored_subs, session_present)
self.on_successful_connect(stored_subs, session_present, keep_alive)
.await;

Ok(result)
Expand Down
23 changes: 21 additions & 2 deletions crates/mqtt5/src/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,25 @@ impl MqttClient {
self.inner.read().await.is_connected()
}

/// Returns the keep-alive interval currently in effect.
///
/// On MQTT v5 this reflects the value after `ServerKeepAlive` negotiation:
/// if the broker returned a `ServerKeepAlive` property in CONNACK the
/// returned value matches the broker-imposed interval, otherwise it matches
/// the value passed to [`ConnectOptions::with_keep_alive`].
///
/// Per `[MQTT-3.2.2-22]` a broker-returned `ServerKeepAlive` always wins,
/// including the case where the client requested `Duration::ZERO`
/// (disabled): a non-zero `ServerKeepAlive` will re-enable keep-alive on
/// the connection. A `ServerKeepAlive` of zero disables keep-alive.
///
/// Before the first successful `connect()` call this returns the value
/// configured on the [`ConnectOptions`]. A returned value of
/// `Duration::ZERO` means keep-alive is disabled.
pub async fn keep_alive(&self) -> std::time::Duration {
self.inner.read().await.negotiated_keep_alive()
}

/// Gets the client ID
pub async fn client_id(&self) -> String {
self.inner
Expand Down Expand Up @@ -152,8 +171,8 @@ impl MqttClient {
/// ConnectionEvent::Connecting => {
/// println!("Connecting...");
/// }
/// ConnectionEvent::Connected { session_present } => {
/// println!("Connected! Session present: {}", session_present);
/// ConnectionEvent::Connected { session_present, keep_alive } => {
/// println!("Connected! Session present: {session_present}, keep-alive: {keep_alive:?}");
/// }
/// ConnectionEvent::Disconnected { reason } => {
/// println!("Disconnected: {:?}", reason);
Expand Down
4 changes: 2 additions & 2 deletions crates/mqtt5/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,8 @@
//! ConnectionEvent::Connecting => {
//! println!("Connecting...");
//! }
//! ConnectionEvent::Connected { session_present } => {
//! println!("Connected! Session present: {}", session_present);
//! ConnectionEvent::Connected { session_present, keep_alive } => {
//! println!("Connected! Session present: {session_present}, keep-alive: {keep_alive:?}");
//! }
//! ConnectionEvent::Disconnected { reason } => {
//! println!("Disconnected: {:?}", reason);
Expand Down
Loading
Loading