diff --git a/CHANGELOG.md b/CHANGELOG.md index 44a958b1..fd8955b3 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [mqtt5-wasm 1.3.1] - 2026-05-15 + +### Fixed + +- **Retained message properties dropped (wasm broker)** - pulls in the `mqtt5 0.31.5` fix for issue #77. The wasm broker uses the same `broker::storage::RetainedMessage` as the native broker and shared the same v5-property loss on retained delivery to late subscribers; transitively fixed by the upstream change. + +## [mqtt5 0.31.5] - 2026-05-15 + +### Fixed + +- **Retained message properties dropped** - `response_topic`, `correlation_data`, `content_type`, `user_properties`, and `payload_format_indicator` were stripped when a retained message was stored and never restored on delivery to a late subscriber (issue #77). `broker::storage::RetainedMessage` had no fields for them; added the fields (each `#[serde(default)]` for file-backend back-compat) and routed extraction/restoration through a shared `V5PublishProps` helper now used by both `RetainedMessage` and `InflightMessage`. + +### Deprecated + +- **`session::retained::{RetainedMessage, RetainedMessageStore}` and `SessionState::{store_retained_message, get_retained_messages, retained_messages}`** - the session-level retained store is unused by the broker (the broker uses `broker::storage::RetainedMessage`). Scheduled for removal in 0.32.0. + ## [mqttv5-cli 0.27.2] - 2026-04-13 ### Fixed diff --git a/crates/mqtt5-wasm/Cargo.toml b/crates/mqtt5-wasm/Cargo.toml index d7085fab..5ed138bb 100644 --- a/crates/mqtt5-wasm/Cargo.toml +++ b/crates/mqtt5-wasm/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5-wasm" -version = "1.3.0" +version = "1.3.1" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/mqtt5-wasm/src/broker.rs b/crates/mqtt5-wasm/src/broker.rs index 1bde435f..2fae4733 100644 --- a/crates/mqtt5-wasm/src/broker.rs +++ b/crates/mqtt5-wasm/src/broker.rs @@ -324,7 +324,7 @@ impl WasmBroker { let stats = Arc::new(BrokerStats::new()); - let max_clients = config.read().map(|c| c.max_clients).unwrap_or(1000); + let max_clients = config.read().map_or(1000, |c| c.max_clients); let limits = ResourceLimits { max_connections: max_clients, ..Default::default() diff --git a/crates/mqtt5/Cargo.toml b/crates/mqtt5/Cargo.toml index 71351ba4..01a23bd4 100644 --- a/crates/mqtt5/Cargo.toml +++ b/crates/mqtt5/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5" -version = "0.31.4" +version = "0.31.5" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs b/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs index a0926e43..6062b766 100644 --- a/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs +++ b/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs @@ -166,8 +166,7 @@ impl JwtAuthProvider { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); + .map_or(0, |d| d.as_secs()); let exp = claims.exp.ok_or(JwtError::MissingClaim("exp"))?; if now > exp + self.clock_skew_secs { diff --git a/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs b/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs index 522bee81..a2e9568f 100644 --- a/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs +++ b/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs @@ -225,8 +225,7 @@ impl FederatedJwtAuthProvider { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); + .map_or(0, |d| d.as_secs()); let clock_skew = issuer_state .config diff --git a/crates/mqtt5/src/broker/quic_acceptor.rs b/crates/mqtt5/src/broker/quic_acceptor.rs index dd797547..b7e31a3a 100644 --- a/crates/mqtt5/src/broker/quic_acceptor.rs +++ b/crates/mqtt5/src/broker/quic_acceptor.rs @@ -650,17 +650,17 @@ fn spawn_datagram_reader( label, peer_addr ); - match decode_datagram_packet(&datagram) { - Some(Ok(packet)) => { - if packet_tx.send((packet, None)).await.is_err() { - debug!("Datagram packet channel closed for {}", peer_addr); - break; - } - } + let packet = match decode_datagram_packet(&datagram) { + Some(Ok(packet)) => packet, Some(Err(e)) => { warn!("Failed to decode datagram from {}: {}", peer_addr, e); + continue; } - None => {} + None => continue, + }; + if packet_tx.send((packet, None)).await.is_err() { + debug!("Datagram packet channel closed for {}", peer_addr); + break; } } Err(e) => { diff --git a/crates/mqtt5/src/broker/storage/file_backend.rs b/crates/mqtt5/src/broker/storage/file_backend.rs index 49b57f32..ae4bb50d 100644 --- a/crates/mqtt5/src/broker/storage/file_backend.rs +++ b/crates/mqtt5/src/broker/storage/file_backend.rs @@ -320,10 +320,7 @@ impl FileBackend { .map_err(|e| MqttError::Io(format!("Failed to read directory entry: {e}")))? { let path = entry.path(); - let is_file = fs::metadata(&path) - .await - .map(|m| m.is_file()) - .unwrap_or(false); + let is_file = fs::metadata(&path).await.is_ok_and(|m| m.is_file()); if is_file && path.extension().is_some_and(|ext| ext == extension) { files.push(path); } @@ -338,10 +335,7 @@ impl FileBackend { if let Ok(mut inflight_entries) = fs::read_dir(&self.inflight_dir).await { while let Ok(Some(entry)) = inflight_entries.next_entry().await { let client_dir = entry.path(); - let is_dir = fs::metadata(&client_dir) - .await - .map(|m| m.is_dir()) - .unwrap_or(false); + let is_dir = fs::metadata(&client_dir).await.is_ok_and(|m| m.is_dir()); if is_dir { let files = self.list_files(&client_dir, "json").await?; for file_path in files { @@ -665,10 +659,7 @@ impl StorageBackend for FileBackend { .map_err(|e| MqttError::Io(format!("Failed to read queue entry: {e}")))? { let client_dir = entry.path(); - let is_dir = fs::metadata(&client_dir) - .await - .map(|m| m.is_dir()) - .unwrap_or(false); + let is_dir = fs::metadata(&client_dir).await.is_ok_and(|m| m.is_dir()); if is_dir { let queue_files = self.list_files(&client_dir, "json").await?; for file_path in queue_files { diff --git a/crates/mqtt5/src/broker/storage/mod.rs b/crates/mqtt5/src/broker/storage/mod.rs index 33a9c48b..5d42a733 100644 --- a/crates/mqtt5/src/broker/storage/mod.rs +++ b/crates/mqtt5/src/broker/storage/mod.rs @@ -33,6 +33,115 @@ use std::sync::Arc; use tokio::sync::RwLock; use tracing::warn; +/// MQTT v5 publish properties extracted from a `PublishPacket` for storage. +/// +/// Shared by `RetainedMessage` and `InflightMessage` to avoid duplicating the +/// `PropertyId`/`PropertyValue` extraction and round-trip logic. +struct V5PublishProps { + user_properties: Vec<(String, String)>, + content_type: Option, + response_topic: Option, + correlation_data: Option>, + payload_format_indicator: Option, +} + +impl V5PublishProps { + fn from_packet(packet: &PublishPacket) -> Self { + use crate::protocol::v5::properties::{PropertyId, PropertyValue}; + + let user_properties = packet + .properties + .get_all(PropertyId::UserProperty) + .map(|values| { + values + .iter() + .filter_map(|v| { + if let PropertyValue::Utf8StringPair(k, val) = v { + Some((k.clone(), val.clone())) + } else { + None + } + }) + .collect() + }) + .unwrap_or_default(); + + let content_type = packet.properties.get_content_type(); + + let response_topic = packet + .properties + .get(PropertyId::ResponseTopic) + .and_then(|v| { + if let PropertyValue::Utf8String(s) = v { + Some(s.clone()) + } else { + None + } + }); + + let correlation_data = packet + .properties + .get(PropertyId::CorrelationData) + .and_then(|v| { + if let PropertyValue::BinaryData(b) = v { + Some(b.to_vec()) + } else { + None + } + }); + + let payload_format_indicator = packet + .properties + .get(PropertyId::PayloadFormatIndicator) + .and_then(|v| { + if let PropertyValue::Byte(b) = v { + Some(*b != 0) + } else { + None + } + }); + + Self { + user_properties, + content_type, + response_topic, + correlation_data, + payload_format_indicator, + } + } +} + +fn apply_v5_props( + packet: &mut PublishPacket, + user_properties: &[(String, String)], + content_type: Option<&str>, + response_topic: Option<&str>, + correlation_data: Option<&[u8]>, + payload_format_indicator: Option, +) { + for (key, value) in user_properties { + packet + .properties + .add_user_property(key.clone(), value.clone()); + } + + if let Some(ct) = content_type { + packet.properties.set_content_type(ct.to_string()); + } + + if let Some(rt) = response_topic { + packet.properties.set_response_topic(rt.to_string()); + } + + if let Some(cd) = correlation_data { + packet.properties.set_correlation_data(cd.to_vec().into()); + } + + if let Some(pfi) = payload_format_indicator { + packet.properties.set_payload_format_indicator(pfi); + } +} + /// Retained message with metadata #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RetainedMessage { @@ -51,6 +160,16 @@ pub struct RetainedMessage { /// Message expiry time (computed from `stored_at` + interval) #[serde(skip)] pub expires_at: Option, + #[serde(default)] + pub user_properties: Vec<(String, String)>, + #[serde(default)] + pub content_type: Option, + #[serde(default)] + pub response_topic: Option, + #[serde(default)] + pub correlation_data: Option>, + #[serde(default)] + pub payload_format_indicator: Option, } const CHANGE_ONLY_MAX_TOPICS: usize = 10_000; @@ -591,6 +710,8 @@ impl RetainedMessage { let expires_at = message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval))); + let v5 = V5PublishProps::from_packet(&packet); + Self { topic: packet.topic_name, payload: packet.payload.to_vec(), @@ -599,6 +720,11 @@ impl RetainedMessage { stored_at_secs, message_expiry_interval, expires_at, + user_properties: v5.user_properties, + content_type: v5.content_type, + response_topic: v5.response_topic, + correlation_data: v5.correlation_data, + payload_format_indicator: v5.payload_format_indicator, } } @@ -620,6 +746,15 @@ impl RetainedMessage { packet.properties.set_message_expiry_interval(remaining); } + apply_v5_props( + &mut packet, + &self.user_properties, + self.content_type.as_deref(), + self.response_topic.as_deref(), + self.correlation_data.as_deref(), + self.payload_format_indicator, + ); + packet } @@ -832,8 +967,6 @@ impl InflightMessage { direction: InflightDirection, phase: InflightPhase, ) -> Self { - use crate::protocol::v5::properties::{PropertyId, PropertyValue}; - let now = SystemTime::now(); let stored_at_secs = now .duration_since(SystemTime::UNIX_EPOCH) @@ -845,57 +978,7 @@ impl InflightMessage { let expires_at_cache = message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval))); - let user_properties = packet - .properties - .get_all(PropertyId::UserProperty) - .map(|values| { - values - .iter() - .filter_map(|v| { - if let PropertyValue::Utf8StringPair(k, val) = v { - Some((k.clone(), val.clone())) - } else { - None - } - }) - .collect() - }) - .unwrap_or_default(); - - let content_type = packet.properties.get_content_type(); - - let response_topic = packet - .properties - .get(PropertyId::ResponseTopic) - .and_then(|v| { - if let PropertyValue::Utf8String(s) = v { - Some(s.clone()) - } else { - None - } - }); - - let correlation_data = packet - .properties - .get(PropertyId::CorrelationData) - .and_then(|v| { - if let PropertyValue::BinaryData(b) = v { - Some(b.to_vec()) - } else { - None - } - }); - - let payload_format_indicator = packet - .properties - .get(PropertyId::PayloadFormatIndicator) - .and_then(|v| { - if let PropertyValue::Byte(b) = v { - Some(*b != 0) - } else { - None - } - }); + let v5 = V5PublishProps::from_packet(packet); Self { client_id, @@ -910,11 +993,11 @@ impl InflightMessage { message_expiry_interval, expires_at_secs, expires_at_cache, - user_properties, - content_type, - response_topic, - correlation_data, - payload_format_indicator, + user_properties: v5.user_properties, + content_type: v5.content_type, + response_topic: v5.response_topic, + correlation_data: v5.correlation_data, + payload_format_indicator: v5.payload_format_indicator, } } @@ -928,27 +1011,14 @@ impl InflightMessage { packet.properties.set_message_expiry_interval(remaining); } - for (key, value) in &self.user_properties { - packet - .properties - .add_user_property(key.clone(), value.clone()); - } - - if let Some(ref ct) = self.content_type { - packet.properties.set_content_type(ct.clone()); - } - - if let Some(ref rt) = self.response_topic { - packet.properties.set_response_topic(rt.clone()); - } - - if let Some(ref cd) = self.correlation_data { - packet.properties.set_correlation_data(cd.clone().into()); - } - - if let Some(pfi) = self.payload_format_indicator { - packet.properties.set_payload_format_indicator(pfi); - } + apply_v5_props( + &mut packet, + &self.user_properties, + self.content_type.as_deref(), + self.response_topic.as_deref(), + self.correlation_data.as_deref(), + self.payload_format_indicator, + ); packet } diff --git a/crates/mqtt5/src/broker/storage/tests.rs b/crates/mqtt5/src/broker/storage/tests.rs index 918d6088..e657492f 100644 --- a/crates/mqtt5/src/broker/storage/tests.rs +++ b/crates/mqtt5/src/broker/storage/tests.rs @@ -240,6 +240,109 @@ async fn test_file_backend_persistence() { } } +#[tokio::test] +async fn test_file_backend_retained_v5_properties_round_trip() { + let dir = tempfile::tempdir().unwrap(); + let backend_path = dir.path().to_path_buf(); + + { + let backend = FileBackend::new(&backend_path).await.unwrap(); + + let mut packet = + PublishPacket::new("sensors/request", &b"25C"[..], QoS::ExactlyOnce).with_retain(true); + packet.properties.set_message_expiry_interval(3600); + packet + .properties + .set_response_topic("sensors/response".to_string()); + packet + .properties + .set_correlation_data(Bytes::from_static(b"corr-77")); + packet.properties.set_content_type("text/plain".to_string()); + packet.properties.set_payload_format_indicator(true); + packet + .properties + .add_user_property("trace-id".to_string(), "issue-77".to_string()); + + let retained = RetainedMessage::new(packet); + backend + .store_retained_message("sensors/request", retained) + .await + .unwrap(); + } + + let backend = FileBackend::new(&backend_path).await.unwrap(); + let loaded = backend + .get_retained_message("sensors/request") + .await + .unwrap() + .expect("retained message should have been persisted"); + + assert_eq!(loaded.response_topic.as_deref(), Some("sensors/response")); + assert_eq!(loaded.correlation_data.as_deref(), Some(&b"corr-77"[..])); + assert_eq!(loaded.content_type.as_deref(), Some("text/plain")); + assert_eq!(loaded.payload_format_indicator, Some(true)); + assert!( + loaded + .user_properties + .iter() + .any(|(k, v)| k == "trace-id" && v == "issue-77"), + "user property survived round-trip; got {:?}", + loaded.user_properties + ); + + let packet = loaded.to_publish_packet(); + let restored_response_topic = packet + .properties + .get(PropertyId::ResponseTopic) + .and_then(|v| { + if let PropertyValue::Utf8String(s) = v { + Some(s.clone()) + } else { + None + } + }); + assert_eq!(restored_response_topic.as_deref(), Some("sensors/response")); +} + +#[tokio::test] +async fn test_file_backend_retained_legacy_json_deserializes() { + let dir = tempfile::tempdir().unwrap(); + let backend_path = dir.path().to_path_buf(); + + let backend = FileBackend::new(&backend_path).await.unwrap(); + let retained_dir = backend_path.join("retained"); + tokio::fs::create_dir_all(&retained_dir).await.unwrap(); + + let legacy_json = serde_json::json!({ + "topic": "legacytopic", + "payload": [104, 105], + "qos": "AtLeastOnce", + "retain": true, + "stored_at_secs": 1_700_000_000u64, + "message_expiry_interval": null, + }); + tokio::fs::write( + retained_dir.join("legacytopic.json"), + serde_json::to_vec_pretty(&legacy_json).unwrap(), + ) + .await + .unwrap(); + + let loaded = backend + .get_retained_message("legacytopic") + .await + .unwrap() + .expect("legacy JSON should deserialize via serde defaults"); + + assert_eq!(loaded.topic, "legacytopic"); + assert_eq!(&loaded.payload[..], b"hi"); + assert!(loaded.user_properties.is_empty()); + assert!(loaded.content_type.is_none()); + assert!(loaded.response_topic.is_none()); + assert!(loaded.correlation_data.is_none()); + assert!(loaded.payload_format_indicator.is_none()); +} + #[tokio::test] async fn test_concurrent_access() { let storage = Arc::new(create_memory_storage()); diff --git a/crates/mqtt5/src/client/auth_handlers/scram.rs b/crates/mqtt5/src/client/auth_handlers/scram.rs index e0c07728..03db0da2 100644 --- a/crates/mqtt5/src/client/auth_handlers/scram.rs +++ b/crates/mqtt5/src/client/auth_handlers/scram.rs @@ -272,7 +272,7 @@ mod tests { assert!(msg.starts_with("c=biws,r=")); assert!(msg.contains(",p=")); } - other => panic!("Expected Continue, got {other:?}",), + other => panic!("Expected Continue, got {other:?}"), } } @@ -314,7 +314,7 @@ mod tests { AuthResponse::Abort(msg) => { assert!(msg.contains("Server signature verification failed")); } - other => panic!("Expected Abort, got {other:?}",), + other => panic!("Expected Abort, got {other:?}"), } } @@ -338,7 +338,7 @@ mod tests { AuthResponse::Abort(msg) => { assert!(msg.contains("nonce")); } - other => panic!("Expected Abort, got {other:?}",), + other => panic!("Expected Abort, got {other:?}"), } } } diff --git a/crates/mqtt5/src/client/direct/reader.rs b/crates/mqtt5/src/client/direct/reader.rs index 549aa29b..b3d2ad89 100644 --- a/crates/mqtt5/src/client/direct/reader.rs +++ b/crates/mqtt5/src/client/direct/reader.rs @@ -108,24 +108,21 @@ pub(super) async fn packet_reader_task_with_responses( continue; } } - Packet::PubRec(pubrec) => { - if pubrec.reason_code.is_error() { - tracing::debug!( - packet_id = pubrec.packet_id, - reason_code = ?pubrec.reason_code, - "QoS 2 PUBREC rejected" - ); - if let Some(tx) = ctx.pubcomp_channels.lock().remove(&pubrec.packet_id) - { - let _ = tx.send(pubrec.reason_code); - } - ctx.session - .write() - .await - .remove_unacked_publish(pubrec.packet_id) - .await; - continue; + Packet::PubRec(pubrec) if pubrec.reason_code.is_error() => { + tracing::debug!( + packet_id = pubrec.packet_id, + reason_code = ?pubrec.reason_code, + "QoS 2 PUBREC rejected" + ); + if let Some(tx) = ctx.pubcomp_channels.lock().remove(&pubrec.packet_id) { + let _ = tx.send(pubrec.reason_code); } + ctx.session + .write() + .await + .remove_unacked_publish(pubrec.packet_id) + .await; + continue; } Packet::PubComp(pubcomp) => { if let Some(tx) = ctx.pubcomp_channels.lock().remove(&pubcomp.packet_id) { diff --git a/crates/mqtt5/src/client/error_recovery.rs b/crates/mqtt5/src/client/error_recovery.rs index 1be84d85..cb0ba09c 100644 --- a/crates/mqtt5/src/client/error_recovery.rs +++ b/crates/mqtt5/src/client/error_recovery.rs @@ -201,7 +201,7 @@ mod tests { ); assert_eq!( retry_delay(RecoverableError::QuotaExceeded, 0, &config), - Duration::from_millis(1000) + Duration::from_secs(1) ); } diff --git a/crates/mqtt5/src/session.rs b/crates/mqtt5/src/session.rs index 6a228bea..15a1a18a 100644 --- a/crates/mqtt5/src/session.rs +++ b/crates/mqtt5/src/session.rs @@ -14,6 +14,7 @@ pub use limits::{ExpiringMessage, LimitsConfig, LimitsManager}; pub use queue::{MessageQueue, QueueResult, QueueStats, QueuedMessage}; #[cfg(not(target_arch = "wasm32"))] pub use quic_flow::{FlowRegistry, FlowState, FlowType}; +#[allow(deprecated)] pub use retained::{RetainedMessage, RetainedMessageStore}; pub use state::{SessionConfig, SessionState, SessionStats}; pub use subscription::{Subscription, SubscriptionManager}; diff --git a/crates/mqtt5/src/session/retained.rs b/crates/mqtt5/src/session/retained.rs index 2cdbab84..31b1ba48 100644 --- a/crates/mqtt5/src/session/retained.rs +++ b/crates/mqtt5/src/session/retained.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use crate::packet::publish::PublishPacket; use crate::topic_matching::matches as topic_matches; use crate::QoS; @@ -6,6 +8,10 @@ use std::sync::Arc; use tokio::sync::RwLock; /// Storage for retained messages +#[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; use the broker's storage backend (broker::storage::RetainedMessage) instead. Scheduled for removal in 0.32.0." +)] #[derive(Debug, Clone)] pub struct RetainedMessageStore { /// Map of topic names to retained messages @@ -13,6 +19,10 @@ pub struct RetainedMessageStore { } /// A retained message +#[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; use broker::storage::RetainedMessage instead. Scheduled for removal in 0.32.0." +)] #[derive(Debug, Clone)] pub struct RetainedMessage { /// The topic name diff --git a/crates/mqtt5/src/session/state.rs b/crates/mqtt5/src/session/state.rs index b693f227..d2afe629 100644 --- a/crates/mqtt5/src/session/state.rs +++ b/crates/mqtt5/src/session/state.rs @@ -5,6 +5,7 @@ use crate::session::limits::LimitsManager; use crate::session::queue::{MessageQueue, QueuedMessage}; #[cfg(not(target_arch = "wasm32"))] use crate::session::quic_flow::{FlowRegistry, FlowState}; +#[allow(deprecated)] use crate::session::retained::{RetainedMessage, RetainedMessageStore}; use crate::session::subscription::{Subscription, SubscriptionManager}; use crate::time::{Duration, Instant}; @@ -70,6 +71,7 @@ pub struct SessionState { /// Topic alias manager for incoming messages topic_alias_in: Arc>, /// Retained message store + #[allow(deprecated)] retained_messages: Arc, /// Will message (to be published on abnormal disconnection) will_message: Arc>>, @@ -104,7 +106,10 @@ impl SessionState { flow_control: Arc::new(RwLock::new(FlowControlManager::new(65535))), // Default to max topic_alias_out: Arc::new(RwLock::new(TopicAliasManager::new(0))), // Default to disabled topic_alias_in: Arc::new(RwLock::new(TopicAliasManager::new(0))), // Default to disabled - retained_messages: Arc::new(RetainedMessageStore::new()), + retained_messages: Arc::new({ + #[allow(deprecated)] + RetainedMessageStore::new() + }), will_message: Arc::new(RwLock::new(None)), will_delay_handle: Arc::new(RwLock::new(None)), limits: Arc::new(RwLock::new(LimitsManager::with_defaults())), @@ -432,26 +437,39 @@ impl SessionState { } /// Stores or clears a retained message + #[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; the broker uses broker::storage::RetainedMessage. Scheduled for removal in 0.32.0." + )] + #[allow(deprecated)] pub async fn store_retained_message(&self, packet: &PublishPacket) { let topic = packet.topic_name.clone(); if packet.payload.is_empty() { - // Empty payload clears the retained message self.retained_messages.store(topic, None).await; } else { - // Store the retained message let message = RetainedMessage::from(packet); self.retained_messages.store(topic, Some(message)).await; } } /// Gets retained messages matching a topic filter + #[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; the broker uses broker::storage::RetainedMessage. Scheduled for removal in 0.32.0." + )] + #[allow(deprecated)] pub async fn get_retained_messages(&self, topic_filter: &str) -> Vec { self.retained_messages.get_matching(topic_filter).await } #[must_use] /// Gets the retained message store + #[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; the broker uses broker::storage::RetainedMessage. Scheduled for removal in 0.32.0." + )] + #[allow(deprecated)] pub fn retained_messages(&self) -> &Arc { &self.retained_messages } @@ -692,6 +710,7 @@ pub struct SessionStats { } #[cfg(test)] +#[allow(deprecated)] mod tests { use super::*; use crate::packet::subscribe::SubscriptionOptions; diff --git a/crates/mqtt5/src/test_utils.rs b/crates/mqtt5/src/test_utils.rs index 25fb8692..f0bcefa2 100644 --- a/crates/mqtt5/src/test_utils.rs +++ b/crates/mqtt5/src/test_utils.rs @@ -9,6 +9,7 @@ use crate::packet::Packet; use crate::protocol::v5::properties::Properties; use crate::protocol::v5::reason_codes::ReasonCode; use crate::session::limits::{ExpiringMessage, LimitsManager}; +#[allow(deprecated)] use crate::session::retained::RetainedMessage; use crate::time::Duration; use crate::{MqttClient, QoS, Result}; @@ -296,6 +297,7 @@ pub fn test_expiring_message(index: u8) -> ExpiringMessage { /// Creates a test retained message with standard defaults #[must_use] +#[allow(deprecated)] pub fn test_retained_message(index: u8) -> RetainedMessage { RetainedMessage { topic: format!("topic/{index}"), @@ -364,6 +366,7 @@ impl TestMessageBuilder { /// Builds a batch of retained messages #[must_use] + #[allow(deprecated)] pub fn build_retained_batch(self, count: u8) -> Vec { (0..count) .map(|i| RetainedMessage { diff --git a/crates/mqtt5/tests/retained_properties_preserved.rs b/crates/mqtt5/tests/retained_properties_preserved.rs new file mode 100644 index 00000000..d78e5610 --- /dev/null +++ b/crates/mqtt5/tests/retained_properties_preserved.rs @@ -0,0 +1,154 @@ +#![allow(clippy::large_futures)] + +mod common; + +use common::{create_test_client_with_broker, test_client_id, TestBroker}; +use mqtt5::time::Duration; +use mqtt5::types::{Message, PublishProperties}; +use mqtt5::{MqttClient, PublishOptions, QoS}; +use std::sync::Arc; +use tokio::sync::Mutex; + +const PUBLISH_TOPIC: &str = "sensors/request"; + +async fn run_retained_v5_props_case( + qos: QoS, + subscribe_filter: &str, + message_expiry_interval: Option, + case_label: &str, +) { + let broker = TestBroker::start().await; + + let publisher = create_test_client_with_broker( + &format!("retained-props-pub-{case_label}"), + broker.address(), + ) + .await; + + let options = PublishOptions { + qos, + retain: true, + properties: PublishProperties { + response_topic: Some("sensors/response".to_string()), + correlation_data: Some(b"corr-77".to_vec()), + content_type: Some("text/plain".to_string()), + payload_format_indicator: Some(true), + user_properties: vec![("trace-id".to_string(), "issue-77".to_string())], + message_expiry_interval, + ..Default::default() + }, + skip_codec: false, + }; + + publisher + .publish_with_options(PUBLISH_TOPIC, b"25C", options) + .await + .expect("publish retained failed"); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let subscriber = MqttClient::new(test_client_id(&format!("retained-props-sub-{case_label}"))); + subscriber + .connect(broker.address()) + .await + .expect("subscriber connect failed"); + + let received: Arc>> = Arc::new(Mutex::new(None)); + let received_clone = Arc::clone(&received); + + subscriber + .subscribe(subscribe_filter, move |msg| { + let slot = Arc::clone(&received_clone); + tokio::spawn(async move { + *slot.lock().await = Some(msg); + }); + }) + .await + .expect("subscribe failed"); + + let start = tokio::time::Instant::now(); + let msg = loop { + if let Some(m) = received.lock().await.clone() { + break m; + } + assert!( + start.elapsed() <= Duration::from_secs(3), + "[{case_label}] did not receive retained message within 3s" + ); + tokio::time::sleep(Duration::from_millis(20)).await; + }; + + assert_eq!(msg.topic, PUBLISH_TOPIC, "[{case_label}] topic mismatch"); + assert_eq!(&msg.payload[..], b"25C", "[{case_label}] payload mismatch"); + assert!(msg.retain, "[{case_label}] retain flag should be set"); + + assert_eq!( + msg.properties.response_topic.as_deref(), + Some("sensors/response"), + "[{case_label}] response_topic was dropped" + ); + assert_eq!( + msg.properties.correlation_data.as_deref(), + Some(&b"corr-77"[..]), + "[{case_label}] correlation_data was dropped" + ); + assert_eq!( + msg.properties.content_type.as_deref(), + Some("text/plain"), + "[{case_label}] content_type was dropped" + ); + assert_eq!( + msg.properties.payload_format_indicator, + Some(true), + "[{case_label}] payload_format_indicator was dropped" + ); + assert!( + msg.properties + .user_properties + .iter() + .any(|(k, v)| k == "trace-id" && v == "issue-77"), + "[{case_label}] user property 'trace-id=issue-77' was dropped; got {:?}", + msg.properties.user_properties + ); + + if let Some(expected_expiry) = message_expiry_interval { + let actual = msg + .properties + .message_expiry_interval + .expect("message_expiry_interval should still be set"); + assert!( + actual <= expected_expiry, + "[{case_label}] expiry should be <= original {expected_expiry}, got {actual}" + ); + } +} + +#[tokio::test] +async fn retained_v5_props_qos2_exact_topic() { + run_retained_v5_props_case(QoS::ExactlyOnce, PUBLISH_TOPIC, None, "qos2-exact").await; +} + +#[tokio::test] +async fn retained_v5_props_qos1_exact_topic() { + run_retained_v5_props_case(QoS::AtLeastOnce, PUBLISH_TOPIC, None, "qos1-exact").await; +} + +#[tokio::test] +async fn retained_v5_props_qos0_exact_topic() { + run_retained_v5_props_case(QoS::AtMostOnce, PUBLISH_TOPIC, None, "qos0-exact").await; +} + +#[tokio::test] +async fn retained_v5_props_qos1_wildcard_subscribe() { + run_retained_v5_props_case(QoS::AtLeastOnce, "sensors/+", None, "qos1-wildcard").await; +} + +#[tokio::test] +async fn retained_v5_props_qos1_multi_level_wildcard() { + run_retained_v5_props_case(QoS::AtLeastOnce, "sensors/#", None, "qos1-multi-wildcard").await; +} + +#[tokio::test] +async fn retained_v5_props_survives_message_expiry_interval() { + run_retained_v5_props_case(QoS::AtLeastOnce, PUBLISH_TOPIC, Some(3600), "expiry").await; +} diff --git a/crates/mqtt5/tests/session_state_property_tests.rs b/crates/mqtt5/tests/session_state_property_tests.rs index ebe57228..01063fa3 100644 --- a/crates/mqtt5/tests/session_state_property_tests.rs +++ b/crates/mqtt5/tests/session_state_property_tests.rs @@ -10,6 +10,7 @@ //! - Subscription management #![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_precision_loss)] +#![allow(deprecated)] use mqtt5::packet::publish::PublishPacket; use mqtt5::packet::subscribe::{RetainHandling, SubscriptionOptions}; diff --git a/crates/mqttv5-cli/src/commands/bench_cmd.rs b/crates/mqttv5-cli/src/commands/bench_cmd.rs index 2589b8bd..4fb864e6 100644 --- a/crates/mqttv5-cli/src/commands/bench_cmd.rs +++ b/crates/mqttv5-cli/src/commands/bench_cmd.rs @@ -1453,7 +1453,9 @@ async fn run_hol_blocking(cmd: BenchCommand) -> Result<()> { let per_topic_interval_us = if cmd.rate > 0 { #[allow(clippy::cast_possible_truncation)] - let interval = 1_000_000u64 * (num_topics as u64) / cmd.rate; + let interval = (1_000_000u64 * (num_topics as u64)) + .checked_div(cmd.rate) + .unwrap_or(0); Some(interval) } else { None