Skip to content

feat(client): honor MQTT v5 ServerKeepAlive negotiation#78

Merged
fabracht merged 6 commits into
LabOverWire:mainfrom
butterflyfish:feat/server-keep-alive-negotiation
May 16, 2026
Merged

feat(client): honor MQTT v5 ServerKeepAlive negotiation#78
fabracht merged 6 commits into
LabOverWire:mainfrom
butterflyfish:feat/server-keep-alive-negotiation

Conversation

@butterflyfish

Copy link
Copy Markdown
Contributor

Summary

  • The client previously ignored the ServerKeepAlive property in CONNACK — even when the broker imposed a smaller interval the client kept pinging on its originally requested cadence, violating [MQTT-3.2.2-22].
  • Reads ServerKeepAlive from CONNACK and uses the negotiated value to spawn the keep-alive task; falls back to the client-configured value when the broker did not override.
  • Surfaces the effective value through a new MqttClient::keep_alive() getter and a new keep_alive: Duration field on ConnectionEvent::Connected.

Public API changes

  • Breaking: ConnectionEvent::Connected now carries keep_alive: Duration in addition to session_present. All consumer match arms must be updated.
  • New: MqttClient::keep_alive() -> Duration returns the currently-active interval (configured value pre-connect, negotiated value post-CONNACK). Duration::ZERO continues to mean keep-alive disabled.
  • New: Properties::get_server_keep_alive() -> Option<u16> accessor on mqtt5-protocol for symmetry with the existing setter.

Behavior

  • When CONNACK contains ServerKeepAlive, the negotiated value overrides ConnectOptions::keep_alive for the duration of that connection only — the configured option is preserved so subsequent CONNECTs (e.g. after reconnect) still advertise the user's original request and let the broker re-negotiate.
  • Duration::ZERO keeps current semantics: no keep-alive task is spawned.

Workspace dep change

mqttv5-cli was pinned to mqtt5 = \"0.29\" from crates.io while the workspace [patch.crates-io] rewrites mqtt5-protocol to the local path. Adding a field to a public enum variant in protocol means the old registry copy of mqtt5 no longer builds against the patched protocol crate. Switched the CLI to mqtt5 = { path = \"../mqtt5\", version = \"0.31\" } so it tracks the workspace while still being publishable.

Bundled cleanup

A separate commit (`chore(lint)`) fixes pre-existing clippy lints surfaced by rustc 1.95 that would otherwise fail the workspace's `-D warnings` gate on toolchain bumps. Purely mechanical; no behavior change.

Test plan

  • `cargo fmt --all -- --check`
  • `cargo clippy --all-targets --workspace --features opentelemetry -- -D warnings`
  • `cargo test --lib --bins` (test-fast)
  • `cargo test --test cli_functionality` (test-cli, 7/7)
  • `cargo test -p mqtt5` (full integration, including new `keepalive_negotiation` suite)
  • `cargo test -p mqtt5-conformance -p mqtt5-protocol`
  • `RUSTDOCFLAGS=-D warnings cargo doc --no-deps --workspace`
  • `cargo check --target wasm32-unknown-unknown --all-features -p mqtt5-wasm`
  • `cargo build -p mqtt5-protocol --target thumbv7em-none-eabihf --no-default-features`
  • `cargo check --workspace` (MSRV check)

New integration tests under `crates/mqtt5/tests/keepalive_negotiation.rs`:

  1. Broker configured with `server_keep_alive=10s`, client requests 60s — `client.keep_alive()` returns 10s, `ConnectionEvent::Connected` carries 10s.
  2. No broker override — `client.keep_alive()` returns the client-configured value (30s).
  3. Pre-connect getter returns the configured value (45s).

Not in this PR

Broker-side fix: when `server_keep_alive` is configured, the broker still uses `connect.keep_alive` to compute its own read timeout. With a spec-compliant client (this PR) the timeout window is still safe because the client adjusts to the smaller cadence, but the broker should ideally drive its own timeout off the same negotiated value. Happy to follow up in a separate PR if desired.

@butterflyfish butterflyfish force-pushed the feat/server-keep-alive-negotiation branch 2 times, most recently from fddf512 to 29f9b1b Compare May 14, 2026 03:24
@fabracht

Copy link
Copy Markdown
Contributor

Feature is solid — addresses a real [MQTT-3.2.2-22] compliance gap that the test broker doesn't catch. Implementation is clean: AtomicU64 for the negotiated value, options preserved across reconnects, new public getter and event field are well-named.

Heads-up: the chore(lint) commit fully overlaps with what just landed in #79 — sorry, I duplicated your lint work without seeing this PR. On rebase you'll want to drop that commit entirely. While rebasing, two stylistic choices in your commit are arguably better than what main now has and worth re-applying:

  • quic_acceptor.rs — your #[allow(clippy::collapsible_match, reason = "...")] is more conservative than my refactor-to-early-continue. Yours preserves the original control flow.
  • bench_cmd.rs — your (1_000_000u64 * ...).checked_div(cmd.rate) one-liner is cleaner than my if cmd.rate > 0 { ... .unwrap_or(0) }. Equivalent behavior; happy to swap to yours.

Blockers / asks before merge:

  1. Version bumps + CHANGELOG. ConnectionEvent::Connected gaining a field is a SemVer-major break per Rust API guidelines. Needs:

    • mqtt5 0.31.5 → 0.32.0
    • mqtt5-protocol minor/major (same breaking variant)
    • mqtt5-wasm patch (transitive)
    • mqttv5-cli patch (consumer)
    • CHANGELOG entry for each.
  2. mqttv5-cli workspace dep change. Switching mqtt5 = "0.29"{ path = "../mqtt5", version = "0.31" } is the right local fix for the breaking protocol change, but loses crates.io publishability for mqttv5-cli unless both crates publish in lockstep. Either confirm that's the intent, or split this change out and just bump mqtt5 = "0.32" once 0.32.0 ships.

Minor (non-blocking):

  • Ordering::SeqCst on negotiated_keep_alive_secs can be Relaxed — single value, no cross-variable ordering requirement.
  • u16::try_from(self.options.keep_alive.as_secs()).unwrap_or(u16::MAX) silently truncates configured intervals >65535s; a tracing::warn! would surface that.
  • Test gap: no reconnect-with-renegotiation case. The PR carefully preserves ConnectOptions::keep_alive so subsequent CONNECTs re-negotiate, but that isn't exercised — easy to add.

Deferred broker-side fix you mention is the right call — with a spec-compliant client, the broker's wider window is safe.

@butterflyfish

Copy link
Copy Markdown
Contributor Author

Intentional — { path, version } is the standard workspace pattern (used by tokio, serde, etc.): path for local builds, version for crates.io publish validation. It doesn't lose publishability, just requires publishing mqtt5 before
mqttv5-cli on each release. Given mqttv5-cli is the official CLI for this lib, lockstep publishing is the desired behavior anyway. I'll bump the version field to "0.32" to match.

The client used to ignore `ServerKeepAlive` in CONNACK and keep
pinging on its requested cadence, violating `[MQTT-3.2.2-22]`. Now
the negotiated value drives the keep-alive task for the connection,
and the original `ConnectOptions::keep_alive` is preserved so the
next CONNECT re-negotiates.

- **Breaking**: `ConnectionEvent::Connected` gains
  `keep_alive: Duration`.
- New `MqttClient::keep_alive()` getter for the active interval.
- New `Properties::get_server_keep_alive()` accessor on
  `mqtt5-protocol`.

- `mqttv5-cli` switched from `mqtt5 = "0.29"` to
  `{ path = "../mqtt5", version = "0.31" }` so the breaking change
  to the patched protocol crate doesn't break the old registry copy.

New `tests/keepalive_negotiation.rs` covers broker override,
client-value fallback, and the pre-connect getter.
After LabOverWire#79 landed lint cleanup ahead of this PR, the rebase dropped the
overlapping `chore(lint)` commit but reverted two spots where the
reviewer noted my variants were preferable.

- `quic_acceptor.rs`: `#[allow(clippy::collapsible_match, reason = ...)]`
  on the nested match preserves the original control flow rather than
  refactoring to early-`continue`.
- `bench_cmd.rs`: collapse the rate calc to
  `(1_000_000u64 * ...).checked_div(cmd.rate)` returning `Option<u64>`
  directly, replacing the `if cmd.rate > 0 { ... .unwrap_or(0) }` wrap.
- Drop `Ordering::SeqCst` to `Ordering::Relaxed` on
  `negotiated_keep_alive_secs` — it is a single value with no
  cross-variable ordering requirement.
- Replace the silent `u16::try_from(...).unwrap_or(u16::MAX)` for
  configured keep-alive with an `unwrap_or_else` that emits a
  `tracing::warn!` so wire-range truncation surfaces in logs.
- Add `keep_alive_renegotiates_against_each_broker_on_reconnect` to
  cover the previously-untested path where `ConnectOptions::keep_alive`
  is preserved so subsequent CONNECTs re-negotiate from the user's
  original request rather than the previously-negotiated value.
…ing change

`ConnectionEvent::Connected` gaining `server_keep_alive_secs` is a
SemVer-major change per the Rust API guidelines (adding a field to a
public enum variant). Propagate the bump:

- `mqtt5-protocol` 0.12.0 → 0.13.0 (defines the variant)
- `mqtt5` 0.31.5 → 0.32.0 (re-exports the variant, adds `negotiated_keep_alive_secs` getter)
- `mqtt5-wasm` 1.3.1 → 1.3.2 (transitive)
- `mqttv5-cli` 0.27.2 → 0.27.3, plus its `mqtt5 = "0.31"` dep → `"0.32"`

CHANGELOG entries added for all four crates.
@butterflyfish butterflyfish force-pushed the feat/server-keep-alive-negotiation branch from 29f9b1b to 1a61666 Compare May 16, 2026 15:34
The CHANGELOG entries for mqtt5 0.32.0 and mqtt5-protocol 0.13.0
described `server_keep_alive_secs: Option<u16>` and
`Client::negotiated_keep_alive_secs()`, but the implemented API uses
`keep_alive: Duration` on `ConnectionEvent::Connected` and
`MqttClient::keep_alive() -> Duration`.

Also adds the missing `Properties::get_server_keep_alive() -> Option<u16>`
under mqtt5-protocol 0.13.0 Added.
@fabracht

Copy link
Copy Markdown
Contributor

Pushed a small docs(changelog) commit to your branch (85e5bc0) to align the 0.32 / 0.13 CHANGELOG entries with the actual API surface — keep_alive: Duration and MqttClient::keep_alive() -> Duration instead of the *_secs: Option<u16> shape, plus the missing Properties::get_server_keep_alive() mention under mqtt5-protocol Added. Force-push freely if you'd prefer to redo it; just wanted to spare you a round trip on something purely textual.

With that fixed I think this is good to land. Nice work on the negotiation logic and the reconnect test.

@fabracht fabracht self-assigned this May 16, 2026
- Drop gratuitous `Arc<AtomicU64>` wrapper on `negotiated_keep_alive_secs`
  — the atomic is never cloned and `AtomicU64` is already `Sync`.
- Extract `configured_keep_alive_u16()` helper so both `build_connect_packet`
  and `apply_negotiated_keep_alive`'s fallback share the truncation+warn
  logic instead of one warning and the other clamping silently.
- Document the `keep_alive=0` override edge case on
  `MqttClient::keep_alive()`: per [MQTT-3.2.2-22] a non-zero
  `ServerKeepAlive` re-enables keep-alive even when the client requested
  `Duration::ZERO`.
- Add `server_keep_alive_zero_disables_keepalive_even_with_nonzero_client_request`
  to cover the previously-uncovered `ServerKeepAlive=0` disable path.

@fabracht fabracht left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Spec-compliance gap is addressed correctly per [MQTT-3.2.2-22], atomic ordering is sound, version cascade and CHANGELOG are accurate, and the new test suite covers the negotiation paths including the ServerKeepAlive=0 disable case added in the followup.

The broker-side read-timeout asymmetry you flagged in the PR body is the right call to defer — I'll file a separate issue tracking it so it doesn't get lost.

@fabracht fabracht merged commit 681c7ac into LabOverWire:main May 16, 2026
10 checks passed
@fabracht

Copy link
Copy Markdown
Contributor

Heads-up on the two commits I pushed to the branch before merge, in case the rebase log buried the context:

85e5bc0 docs(changelog): match actual API surface for 0.32 / 0.13 entries — aligned the 0.32 / 0.13 CHANGELOG entries with the actual API names (keep_alive: Duration and MqttClient::keep_alive() -> Duration instead of the *_secs: Option<u16> shape that was drafted), plus added the missing Properties::get_server_keep_alive() note under mqtt5-protocol Added.

f7131ad fix(client): post-review followups for keep-alive negotiation — four post-review polish items verified against the code:

  • Dropped Arc<AtomicU64>AtomicU64 on negotiated_keep_alive_secs (client/direct/mod.rs:113,171). The atomic is never cloned anywhere, and AtomicU64 is already Sync, so the Arc was pure indirection.
  • Extracted configured_keep_alive_u16() helper so build_connect_packet and apply_negotiated_keep_alive's fallback share the same truncation+warn instead of one warning and the other clamping silently with unwrap_or(u16::MAX).
  • Documented the keep_alive=0 override edge case on MqttClient::keep_alive(): per [MQTT-3.2.2-22] a non-zero ServerKeepAlive re-enables keep-alive even when the client requested Duration::ZERO. Spec-compliant but surprising for apps that opted out for battery/cost reasons.
  • New test server_keep_alive_zero_disables_keepalive_even_with_nonzero_client_request covering the ServerKeepAlive=0 disable path (handled correctly by is_zero() skip at mod.rs:1240 but previously untested).

The broker-side asymmetry you flagged in the PR body is now tracked as #80.

One observation I deliberately didn't act on — the integration tests in keepalive_negotiation.rs all assert on the cached atomic via client.keep_alive().await. They don't observe wire-level PINGREQ cadence. The keepalive task snapshots keepalive_interval by value at spawn (mod.rs:1239,1251), so a future refactor that mis-wires task spawning could still pass these tests. Not a blocker — just worth noting in case you ever want to add a probe-broker test.

Thanks for the clean implementation and quick turnaround on review feedback.

@butterflyfish

Copy link
Copy Markdown
Contributor Author

Thanks for the polish commits — the Arc removal and the shared configured_keep_alive_u16() helper are both real
improvements over what I had, and appreciate you writing the ServerKeepAlive=0 test.

Fair point on the wire-level PINGREQ gap — agreed the current tests only cover the negotiated value, not the actual
ping cadence. I won't be opening a follow-up PR for it myself, but if you think it's worth a probe-broker test,
would you be open to handling it directly? No worries either way if you'd rather leave it.

fabracht added a commit that referenced this pull request May 17, 2026
Closes #80.

## Summary

When the broker overrides the client's requested keep-alive via
`BrokerConfig.server_keep_alive` (writing `ServerKeepAlive` into CONNACK
per `[MQTT-3.2.2-22]`), it was continuing to compute its own
read-timeout from the **client's original** requested value rather than
the value it just imposed. PR #78 fixed the client side; this is the
symmetric broker-side fix.

In `build_connack_properties`, after writing the `ServerKeepAlive`
property the broker now also updates `self.keep_alive`, so the
downstream `read_timeout = self.keep_alive * 1.5` reflects the
negotiated value.

A small `clamp_keep_alive_to_u16` helper logs a warning if a
`server_keep_alive > u16::MAX` seconds is configured (previously it
silently clamped).

## Test

`broker_read_timeout_uses_negotiated_keep_alive_not_client_request`
opens a raw TCP socket, sends a hand-crafted CONNECT with `keep_alive =
600s` against a broker configured with `server_keep_alive = 1s`, sends
no PINGREQs, and asserts the broker drops the connection within ~1.5s
(negotiated) rather than ~900s (client request). Verified that reverting
just the `self.keep_alive` update makes this test fail with the expected
`broker did not close connection within 4s — read_timeout still using
client's 600s request` message.

## Test plan
- [x] `cargo test -p mqtt5 --test keepalive_negotiation` (all 7 tests
pass)
- [x] Counter-example: temporarily reverted the fix and confirmed the
new test fails
- [x] `cargo make ci-verify` clean
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants