feat(client): honor MQTT v5 ServerKeepAlive negotiation#78
Conversation
fddf512 to
29f9b1b
Compare
|
Feature is solid — addresses a real Heads-up: the
Blockers / asks before merge:
Minor (non-blocking):
Deferred broker-side fix you mention is the right call — with a spec-compliant client, the broker's wider window is safe. |
|
Intentional — { path, version } is the standard workspace pattern (used by tokio, serde, etc.): path for local builds, version for crates.io publish validation. It doesn't lose publishability, just requires publishing mqtt5 before |
The client used to ignore `ServerKeepAlive` in CONNACK and keep
pinging on its requested cadence, violating `[MQTT-3.2.2-22]`. Now
the negotiated value drives the keep-alive task for the connection,
and the original `ConnectOptions::keep_alive` is preserved so the
next CONNECT re-negotiates.
- **Breaking**: `ConnectionEvent::Connected` gains
`keep_alive: Duration`.
- New `MqttClient::keep_alive()` getter for the active interval.
- New `Properties::get_server_keep_alive()` accessor on
`mqtt5-protocol`.
- `mqttv5-cli` switched from `mqtt5 = "0.29"` to
`{ path = "../mqtt5", version = "0.31" }` so the breaking change
to the patched protocol crate doesn't break the old registry copy.
New `tests/keepalive_negotiation.rs` covers broker override,
client-value fallback, and the pre-connect getter.
After LabOverWire#79 landed lint cleanup ahead of this PR, the rebase dropped the overlapping `chore(lint)` commit but reverted two spots where the reviewer noted my variants were preferable. - `quic_acceptor.rs`: `#[allow(clippy::collapsible_match, reason = ...)]` on the nested match preserves the original control flow rather than refactoring to early-`continue`. - `bench_cmd.rs`: collapse the rate calc to `(1_000_000u64 * ...).checked_div(cmd.rate)` returning `Option<u64>` directly, replacing the `if cmd.rate > 0 { ... .unwrap_or(0) }` wrap.
- Drop `Ordering::SeqCst` to `Ordering::Relaxed` on `negotiated_keep_alive_secs` — it is a single value with no cross-variable ordering requirement. - Replace the silent `u16::try_from(...).unwrap_or(u16::MAX)` for configured keep-alive with an `unwrap_or_else` that emits a `tracing::warn!` so wire-range truncation surfaces in logs. - Add `keep_alive_renegotiates_against_each_broker_on_reconnect` to cover the previously-untested path where `ConnectOptions::keep_alive` is preserved so subsequent CONNECTs re-negotiate from the user's original request rather than the previously-negotiated value.
…ing change `ConnectionEvent::Connected` gaining `server_keep_alive_secs` is a SemVer-major change per the Rust API guidelines (adding a field to a public enum variant). Propagate the bump: - `mqtt5-protocol` 0.12.0 → 0.13.0 (defines the variant) - `mqtt5` 0.31.5 → 0.32.0 (re-exports the variant, adds `negotiated_keep_alive_secs` getter) - `mqtt5-wasm` 1.3.1 → 1.3.2 (transitive) - `mqttv5-cli` 0.27.2 → 0.27.3, plus its `mqtt5 = "0.31"` dep → `"0.32"` CHANGELOG entries added for all four crates.
29f9b1b to
1a61666
Compare
The CHANGELOG entries for mqtt5 0.32.0 and mqtt5-protocol 0.13.0 described `server_keep_alive_secs: Option<u16>` and `Client::negotiated_keep_alive_secs()`, but the implemented API uses `keep_alive: Duration` on `ConnectionEvent::Connected` and `MqttClient::keep_alive() -> Duration`. Also adds the missing `Properties::get_server_keep_alive() -> Option<u16>` under mqtt5-protocol 0.13.0 Added.
|
Pushed a small With that fixed I think this is good to land. Nice work on the negotiation logic and the reconnect test. |
- Drop gratuitous `Arc<AtomicU64>` wrapper on `negotiated_keep_alive_secs` — the atomic is never cloned and `AtomicU64` is already `Sync`. - Extract `configured_keep_alive_u16()` helper so both `build_connect_packet` and `apply_negotiated_keep_alive`'s fallback share the truncation+warn logic instead of one warning and the other clamping silently. - Document the `keep_alive=0` override edge case on `MqttClient::keep_alive()`: per [MQTT-3.2.2-22] a non-zero `ServerKeepAlive` re-enables keep-alive even when the client requested `Duration::ZERO`. - Add `server_keep_alive_zero_disables_keepalive_even_with_nonzero_client_request` to cover the previously-uncovered `ServerKeepAlive=0` disable path.
fabracht
left a comment
There was a problem hiding this comment.
LGTM. Spec-compliance gap is addressed correctly per [MQTT-3.2.2-22], atomic ordering is sound, version cascade and CHANGELOG are accurate, and the new test suite covers the negotiation paths including the ServerKeepAlive=0 disable case added in the followup.
The broker-side read-timeout asymmetry you flagged in the PR body is the right call to defer — I'll file a separate issue tracking it so it doesn't get lost.
|
Heads-up on the two commits I pushed to the branch before merge, in case the rebase log buried the context:
The broker-side asymmetry you flagged in the PR body is now tracked as #80. One observation I deliberately didn't act on — the integration tests in Thanks for the clean implementation and quick turnaround on review feedback. |
|
Thanks for the polish commits — the Arc removal and the shared configured_keep_alive_u16() helper are both real Fair point on the wire-level PINGREQ gap — agreed the current tests only cover the negotiated value, not the actual |
Closes #80. ## Summary When the broker overrides the client's requested keep-alive via `BrokerConfig.server_keep_alive` (writing `ServerKeepAlive` into CONNACK per `[MQTT-3.2.2-22]`), it was continuing to compute its own read-timeout from the **client's original** requested value rather than the value it just imposed. PR #78 fixed the client side; this is the symmetric broker-side fix. In `build_connack_properties`, after writing the `ServerKeepAlive` property the broker now also updates `self.keep_alive`, so the downstream `read_timeout = self.keep_alive * 1.5` reflects the negotiated value. A small `clamp_keep_alive_to_u16` helper logs a warning if a `server_keep_alive > u16::MAX` seconds is configured (previously it silently clamped). ## Test `broker_read_timeout_uses_negotiated_keep_alive_not_client_request` opens a raw TCP socket, sends a hand-crafted CONNECT with `keep_alive = 600s` against a broker configured with `server_keep_alive = 1s`, sends no PINGREQs, and asserts the broker drops the connection within ~1.5s (negotiated) rather than ~900s (client request). Verified that reverting just the `self.keep_alive` update makes this test fail with the expected `broker did not close connection within 4s — read_timeout still using client's 600s request` message. ## Test plan - [x] `cargo test -p mqtt5 --test keepalive_negotiation` (all 7 tests pass) - [x] Counter-example: temporarily reverted the fix and confirmed the new test fails - [x] `cargo make ci-verify` clean
Summary
ServerKeepAliveproperty in CONNACK — even when the broker imposed a smaller interval the client kept pinging on its originally requested cadence, violating[MQTT-3.2.2-22].ServerKeepAlivefrom CONNACK and uses the negotiated value to spawn the keep-alive task; falls back to the client-configured value when the broker did not override.MqttClient::keep_alive()getter and a newkeep_alive: Durationfield onConnectionEvent::Connected.Public API changes
ConnectionEvent::Connectednow carrieskeep_alive: Durationin addition tosession_present. All consumer match arms must be updated.MqttClient::keep_alive() -> Durationreturns the currently-active interval (configured value pre-connect, negotiated value post-CONNACK).Duration::ZEROcontinues to mean keep-alive disabled.Properties::get_server_keep_alive() -> Option<u16>accessor onmqtt5-protocolfor symmetry with the existing setter.Behavior
ServerKeepAlive, the negotiated value overridesConnectOptions::keep_alivefor the duration of that connection only — the configured option is preserved so subsequent CONNECTs (e.g. after reconnect) still advertise the user's original request and let the broker re-negotiate.Duration::ZEROkeeps current semantics: no keep-alive task is spawned.Workspace dep change
mqttv5-cliwas pinned tomqtt5 = \"0.29\"from crates.io while the workspace[patch.crates-io]rewritesmqtt5-protocolto the local path. Adding a field to a public enum variant in protocol means the old registry copy ofmqtt5no longer builds against the patched protocol crate. Switched the CLI tomqtt5 = { path = \"../mqtt5\", version = \"0.31\" }so it tracks the workspace while still being publishable.Bundled cleanup
A separate commit (`chore(lint)`) fixes pre-existing clippy lints surfaced by rustc 1.95 that would otherwise fail the workspace's `-D warnings` gate on toolchain bumps. Purely mechanical; no behavior change.
Test plan
New integration tests under `crates/mqtt5/tests/keepalive_negotiation.rs`:
Not in this PR
Broker-side fix: when `server_keep_alive` is configured, the broker still uses `connect.keep_alive` to compute its own read timeout. With a spec-compliant client (this PR) the timeout window is still safe because the client adjusts to the smaller cadence, but the broker should ideally drive its own timeout off the same negotiated value. Happy to follow up in a separate PR if desired.