From b1c06a35252539e4f41aebac76d63ca774af8a22 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Fri, 15 May 2026 11:37:34 -0700 Subject: [PATCH 1/4] preserve v5 properties on retained messages --- crates/mqtt5/src/broker/storage/mod.rs | 91 ++++++++++++++++ .../tests/retained_properties_preserved.rs | 100 ++++++++++++++++++ 2 files changed, 191 insertions(+) create mode 100644 crates/mqtt5/tests/retained_properties_preserved.rs diff --git a/crates/mqtt5/src/broker/storage/mod.rs b/crates/mqtt5/src/broker/storage/mod.rs index 33a9c48..894d0a4 100644 --- a/crates/mqtt5/src/broker/storage/mod.rs +++ b/crates/mqtt5/src/broker/storage/mod.rs @@ -51,6 +51,16 @@ pub struct RetainedMessage { /// Message expiry time (computed from `stored_at` + interval) #[serde(skip)] pub expires_at: Option, + #[serde(default)] + pub user_properties: Vec<(String, String)>, + #[serde(default)] + pub content_type: Option, + #[serde(default)] + pub response_topic: Option, + #[serde(default)] + pub correlation_data: Option>, + #[serde(default)] + pub payload_format_indicator: Option, } const CHANGE_ONLY_MAX_TOPICS: usize = 10_000; @@ -582,6 +592,8 @@ impl RetainedMessage { /// Create new retained message from PUBLISH packet #[must_use] pub fn new(packet: PublishPacket) -> Self { + use crate::protocol::v5::properties::{PropertyId, PropertyValue}; + let now = SystemTime::now(); let stored_at_secs = now .duration_since(SystemTime::UNIX_EPOCH) @@ -591,6 +603,58 @@ impl RetainedMessage { let expires_at = message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval))); + let user_properties = packet + .properties + .get_all(PropertyId::UserProperty) + .map(|values| { + values + .iter() + .filter_map(|v| { + if let PropertyValue::Utf8StringPair(k, val) = v { + Some((k.clone(), val.clone())) + } else { + None + } + }) + .collect() + }) + .unwrap_or_default(); + + let content_type = packet.properties.get_content_type(); + + let response_topic = packet + .properties + .get(PropertyId::ResponseTopic) + .and_then(|v| { + if let PropertyValue::Utf8String(s) = v { + Some(s.clone()) + } else { + None + } + }); + + let correlation_data = packet + .properties + .get(PropertyId::CorrelationData) + .and_then(|v| { + if let PropertyValue::BinaryData(b) = v { + Some(b.to_vec()) + } else { + None + } + }); + + let payload_format_indicator = packet + .properties + .get(PropertyId::PayloadFormatIndicator) + .and_then(|v| { + if let PropertyValue::Byte(b) = v { + Some(*b != 0) + } else { + None + } + }); + Self { topic: packet.topic_name, payload: packet.payload.to_vec(), @@ -599,6 +663,11 @@ impl RetainedMessage { stored_at_secs, message_expiry_interval, expires_at, + user_properties, + content_type, + response_topic, + correlation_data, + payload_format_indicator, } } @@ -620,6 +689,28 @@ impl RetainedMessage { packet.properties.set_message_expiry_interval(remaining); } + for (key, value) in &self.user_properties { + packet + .properties + .add_user_property(key.clone(), value.clone()); + } + + if let Some(ref ct) = self.content_type { + packet.properties.set_content_type(ct.clone()); + } + + if let Some(ref rt) = self.response_topic { + packet.properties.set_response_topic(rt.clone()); + } + + if let Some(ref cd) = self.correlation_data { + packet.properties.set_correlation_data(cd.clone().into()); + } + + if let Some(pfi) = self.payload_format_indicator { + packet.properties.set_payload_format_indicator(pfi); + } + packet } diff --git a/crates/mqtt5/tests/retained_properties_preserved.rs b/crates/mqtt5/tests/retained_properties_preserved.rs new file mode 100644 index 0000000..ef94208 --- /dev/null +++ b/crates/mqtt5/tests/retained_properties_preserved.rs @@ -0,0 +1,100 @@ +mod common; + +use common::{create_test_client_with_broker, test_client_id, TestBroker}; +use mqtt5::time::Duration; +use mqtt5::types::{Message, PublishProperties}; +use mqtt5::{MqttClient, PublishOptions, QoS}; +use std::sync::Arc; +use tokio::sync::Mutex; + +#[tokio::test] +async fn retained_message_preserves_v5_properties_for_late_subscriber() { + let broker = TestBroker::start().await; + + let publisher = create_test_client_with_broker("retained-props-pub", broker.address()).await; + + let options = PublishOptions { + qos: QoS::ExactlyOnce, + retain: true, + properties: PublishProperties { + response_topic: Some("sensors/response".to_string()), + correlation_data: Some(b"corr-77".to_vec()), + content_type: Some("text/plain".to_string()), + payload_format_indicator: Some(true), + user_properties: vec![("trace-id".to_string(), "issue-77".to_string())], + ..Default::default() + }, + skip_codec: false, + }; + + publisher + .publish_with_options("sensors/request", b"25C", options) + .await + .expect("publish retained failed"); + + tokio::time::sleep(Duration::from_millis(200)).await; + + let subscriber = MqttClient::new(test_client_id("retained-props-sub")); + subscriber + .connect(broker.address()) + .await + .expect("subscriber connect failed"); + + let received: Arc>> = Arc::new(Mutex::new(None)); + let received_clone = Arc::clone(&received); + + subscriber + .subscribe("sensors/request", move |msg| { + let slot = Arc::clone(&received_clone); + tokio::spawn(async move { + *slot.lock().await = Some(msg); + }); + }) + .await + .expect("subscribe failed"); + + let start = tokio::time::Instant::now(); + let msg = loop { + if let Some(m) = received.lock().await.clone() { + break m; + } + assert!( + start.elapsed() <= Duration::from_secs(3), + "did not receive retained message within 3s" + ); + tokio::time::sleep(Duration::from_millis(20)).await; + }; + + assert_eq!(msg.topic, "sensors/request"); + assert_eq!(&msg.payload[..], b"25C"); + assert!(msg.retain, "retain flag should be set on retained delivery"); + + assert_eq!( + msg.properties.response_topic.as_deref(), + Some("sensors/response"), + "response_topic was dropped by the broker on retained delivery" + ); + assert_eq!( + msg.properties.correlation_data.as_deref(), + Some(&b"corr-77"[..]), + "correlation_data was dropped by the broker on retained delivery" + ); + assert_eq!( + msg.properties.content_type.as_deref(), + Some("text/plain"), + "content_type was dropped by the broker on retained delivery" + ); + assert_eq!( + msg.properties.payload_format_indicator, + Some(true), + "payload_format_indicator was dropped by the broker on retained delivery" + ); + assert!( + msg.properties + .user_properties + .iter() + .any(|(k, v)| k == "trace-id" && v == "issue-77"), + "user property 'trace-id=issue-77' was dropped by the broker on retained delivery; got {:?}", + msg.properties.user_properties + ); +} From fd057cb5bbec903aa7fdd73fd707cccfc5fa6cd4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Fri, 15 May 2026 12:03:59 -0700 Subject: [PATCH 2/4] address PR review: dedupe v5 props extraction, expand tests, deprecate session::retained --- crates/mqtt5/src/broker/storage/mod.rs | 295 ++++++++---------- crates/mqtt5/src/broker/storage/tests.rs | 103 ++++++ crates/mqtt5/src/session.rs | 1 + crates/mqtt5/src/session/retained.rs | 10 + crates/mqtt5/src/session/state.rs | 25 +- crates/mqtt5/src/test_utils.rs | 3 + .../tests/retained_properties_preserved.rs | 86 ++++- .../tests/session_state_property_tests.rs | 1 + 8 files changed, 347 insertions(+), 177 deletions(-) diff --git a/crates/mqtt5/src/broker/storage/mod.rs b/crates/mqtt5/src/broker/storage/mod.rs index 894d0a4..5d42a73 100644 --- a/crates/mqtt5/src/broker/storage/mod.rs +++ b/crates/mqtt5/src/broker/storage/mod.rs @@ -33,6 +33,115 @@ use std::sync::Arc; use tokio::sync::RwLock; use tracing::warn; +/// MQTT v5 publish properties extracted from a `PublishPacket` for storage. +/// +/// Shared by `RetainedMessage` and `InflightMessage` to avoid duplicating the +/// `PropertyId`/`PropertyValue` extraction and round-trip logic. +struct V5PublishProps { + user_properties: Vec<(String, String)>, + content_type: Option, + response_topic: Option, + correlation_data: Option>, + payload_format_indicator: Option, +} + +impl V5PublishProps { + fn from_packet(packet: &PublishPacket) -> Self { + use crate::protocol::v5::properties::{PropertyId, PropertyValue}; + + let user_properties = packet + .properties + .get_all(PropertyId::UserProperty) + .map(|values| { + values + .iter() + .filter_map(|v| { + if let PropertyValue::Utf8StringPair(k, val) = v { + Some((k.clone(), val.clone())) + } else { + None + } + }) + .collect() + }) + .unwrap_or_default(); + + let content_type = packet.properties.get_content_type(); + + let response_topic = packet + .properties + .get(PropertyId::ResponseTopic) + .and_then(|v| { + if let PropertyValue::Utf8String(s) = v { + Some(s.clone()) + } else { + None + } + }); + + let correlation_data = packet + .properties + .get(PropertyId::CorrelationData) + .and_then(|v| { + if let PropertyValue::BinaryData(b) = v { + Some(b.to_vec()) + } else { + None + } + }); + + let payload_format_indicator = packet + .properties + .get(PropertyId::PayloadFormatIndicator) + .and_then(|v| { + if let PropertyValue::Byte(b) = v { + Some(*b != 0) + } else { + None + } + }); + + Self { + user_properties, + content_type, + response_topic, + correlation_data, + payload_format_indicator, + } + } +} + +fn apply_v5_props( + packet: &mut PublishPacket, + user_properties: &[(String, String)], + content_type: Option<&str>, + response_topic: Option<&str>, + correlation_data: Option<&[u8]>, + payload_format_indicator: Option, +) { + for (key, value) in user_properties { + packet + .properties + .add_user_property(key.clone(), value.clone()); + } + + if let Some(ct) = content_type { + packet.properties.set_content_type(ct.to_string()); + } + + if let Some(rt) = response_topic { + packet.properties.set_response_topic(rt.to_string()); + } + + if let Some(cd) = correlation_data { + packet.properties.set_correlation_data(cd.to_vec().into()); + } + + if let Some(pfi) = payload_format_indicator { + packet.properties.set_payload_format_indicator(pfi); + } +} + /// Retained message with metadata #[derive(Debug, Clone, Serialize, Deserialize)] pub struct RetainedMessage { @@ -592,8 +701,6 @@ impl RetainedMessage { /// Create new retained message from PUBLISH packet #[must_use] pub fn new(packet: PublishPacket) -> Self { - use crate::protocol::v5::properties::{PropertyId, PropertyValue}; - let now = SystemTime::now(); let stored_at_secs = now .duration_since(SystemTime::UNIX_EPOCH) @@ -603,57 +710,7 @@ impl RetainedMessage { let expires_at = message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval))); - let user_properties = packet - .properties - .get_all(PropertyId::UserProperty) - .map(|values| { - values - .iter() - .filter_map(|v| { - if let PropertyValue::Utf8StringPair(k, val) = v { - Some((k.clone(), val.clone())) - } else { - None - } - }) - .collect() - }) - .unwrap_or_default(); - - let content_type = packet.properties.get_content_type(); - - let response_topic = packet - .properties - .get(PropertyId::ResponseTopic) - .and_then(|v| { - if let PropertyValue::Utf8String(s) = v { - Some(s.clone()) - } else { - None - } - }); - - let correlation_data = packet - .properties - .get(PropertyId::CorrelationData) - .and_then(|v| { - if let PropertyValue::BinaryData(b) = v { - Some(b.to_vec()) - } else { - None - } - }); - - let payload_format_indicator = packet - .properties - .get(PropertyId::PayloadFormatIndicator) - .and_then(|v| { - if let PropertyValue::Byte(b) = v { - Some(*b != 0) - } else { - None - } - }); + let v5 = V5PublishProps::from_packet(&packet); Self { topic: packet.topic_name, @@ -663,11 +720,11 @@ impl RetainedMessage { stored_at_secs, message_expiry_interval, expires_at, - user_properties, - content_type, - response_topic, - correlation_data, - payload_format_indicator, + user_properties: v5.user_properties, + content_type: v5.content_type, + response_topic: v5.response_topic, + correlation_data: v5.correlation_data, + payload_format_indicator: v5.payload_format_indicator, } } @@ -689,27 +746,14 @@ impl RetainedMessage { packet.properties.set_message_expiry_interval(remaining); } - for (key, value) in &self.user_properties { - packet - .properties - .add_user_property(key.clone(), value.clone()); - } - - if let Some(ref ct) = self.content_type { - packet.properties.set_content_type(ct.clone()); - } - - if let Some(ref rt) = self.response_topic { - packet.properties.set_response_topic(rt.clone()); - } - - if let Some(ref cd) = self.correlation_data { - packet.properties.set_correlation_data(cd.clone().into()); - } - - if let Some(pfi) = self.payload_format_indicator { - packet.properties.set_payload_format_indicator(pfi); - } + apply_v5_props( + &mut packet, + &self.user_properties, + self.content_type.as_deref(), + self.response_topic.as_deref(), + self.correlation_data.as_deref(), + self.payload_format_indicator, + ); packet } @@ -923,8 +967,6 @@ impl InflightMessage { direction: InflightDirection, phase: InflightPhase, ) -> Self { - use crate::protocol::v5::properties::{PropertyId, PropertyValue}; - let now = SystemTime::now(); let stored_at_secs = now .duration_since(SystemTime::UNIX_EPOCH) @@ -936,57 +978,7 @@ impl InflightMessage { let expires_at_cache = message_expiry_interval.map(|interval| now + Duration::from_secs(u64::from(interval))); - let user_properties = packet - .properties - .get_all(PropertyId::UserProperty) - .map(|values| { - values - .iter() - .filter_map(|v| { - if let PropertyValue::Utf8StringPair(k, val) = v { - Some((k.clone(), val.clone())) - } else { - None - } - }) - .collect() - }) - .unwrap_or_default(); - - let content_type = packet.properties.get_content_type(); - - let response_topic = packet - .properties - .get(PropertyId::ResponseTopic) - .and_then(|v| { - if let PropertyValue::Utf8String(s) = v { - Some(s.clone()) - } else { - None - } - }); - - let correlation_data = packet - .properties - .get(PropertyId::CorrelationData) - .and_then(|v| { - if let PropertyValue::BinaryData(b) = v { - Some(b.to_vec()) - } else { - None - } - }); - - let payload_format_indicator = packet - .properties - .get(PropertyId::PayloadFormatIndicator) - .and_then(|v| { - if let PropertyValue::Byte(b) = v { - Some(*b != 0) - } else { - None - } - }); + let v5 = V5PublishProps::from_packet(packet); Self { client_id, @@ -1001,11 +993,11 @@ impl InflightMessage { message_expiry_interval, expires_at_secs, expires_at_cache, - user_properties, - content_type, - response_topic, - correlation_data, - payload_format_indicator, + user_properties: v5.user_properties, + content_type: v5.content_type, + response_topic: v5.response_topic, + correlation_data: v5.correlation_data, + payload_format_indicator: v5.payload_format_indicator, } } @@ -1019,27 +1011,14 @@ impl InflightMessage { packet.properties.set_message_expiry_interval(remaining); } - for (key, value) in &self.user_properties { - packet - .properties - .add_user_property(key.clone(), value.clone()); - } - - if let Some(ref ct) = self.content_type { - packet.properties.set_content_type(ct.clone()); - } - - if let Some(ref rt) = self.response_topic { - packet.properties.set_response_topic(rt.clone()); - } - - if let Some(ref cd) = self.correlation_data { - packet.properties.set_correlation_data(cd.clone().into()); - } - - if let Some(pfi) = self.payload_format_indicator { - packet.properties.set_payload_format_indicator(pfi); - } + apply_v5_props( + &mut packet, + &self.user_properties, + self.content_type.as_deref(), + self.response_topic.as_deref(), + self.correlation_data.as_deref(), + self.payload_format_indicator, + ); packet } diff --git a/crates/mqtt5/src/broker/storage/tests.rs b/crates/mqtt5/src/broker/storage/tests.rs index 918d608..e657492 100644 --- a/crates/mqtt5/src/broker/storage/tests.rs +++ b/crates/mqtt5/src/broker/storage/tests.rs @@ -240,6 +240,109 @@ async fn test_file_backend_persistence() { } } +#[tokio::test] +async fn test_file_backend_retained_v5_properties_round_trip() { + let dir = tempfile::tempdir().unwrap(); + let backend_path = dir.path().to_path_buf(); + + { + let backend = FileBackend::new(&backend_path).await.unwrap(); + + let mut packet = + PublishPacket::new("sensors/request", &b"25C"[..], QoS::ExactlyOnce).with_retain(true); + packet.properties.set_message_expiry_interval(3600); + packet + .properties + .set_response_topic("sensors/response".to_string()); + packet + .properties + .set_correlation_data(Bytes::from_static(b"corr-77")); + packet.properties.set_content_type("text/plain".to_string()); + packet.properties.set_payload_format_indicator(true); + packet + .properties + .add_user_property("trace-id".to_string(), "issue-77".to_string()); + + let retained = RetainedMessage::new(packet); + backend + .store_retained_message("sensors/request", retained) + .await + .unwrap(); + } + + let backend = FileBackend::new(&backend_path).await.unwrap(); + let loaded = backend + .get_retained_message("sensors/request") + .await + .unwrap() + .expect("retained message should have been persisted"); + + assert_eq!(loaded.response_topic.as_deref(), Some("sensors/response")); + assert_eq!(loaded.correlation_data.as_deref(), Some(&b"corr-77"[..])); + assert_eq!(loaded.content_type.as_deref(), Some("text/plain")); + assert_eq!(loaded.payload_format_indicator, Some(true)); + assert!( + loaded + .user_properties + .iter() + .any(|(k, v)| k == "trace-id" && v == "issue-77"), + "user property survived round-trip; got {:?}", + loaded.user_properties + ); + + let packet = loaded.to_publish_packet(); + let restored_response_topic = packet + .properties + .get(PropertyId::ResponseTopic) + .and_then(|v| { + if let PropertyValue::Utf8String(s) = v { + Some(s.clone()) + } else { + None + } + }); + assert_eq!(restored_response_topic.as_deref(), Some("sensors/response")); +} + +#[tokio::test] +async fn test_file_backend_retained_legacy_json_deserializes() { + let dir = tempfile::tempdir().unwrap(); + let backend_path = dir.path().to_path_buf(); + + let backend = FileBackend::new(&backend_path).await.unwrap(); + let retained_dir = backend_path.join("retained"); + tokio::fs::create_dir_all(&retained_dir).await.unwrap(); + + let legacy_json = serde_json::json!({ + "topic": "legacytopic", + "payload": [104, 105], + "qos": "AtLeastOnce", + "retain": true, + "stored_at_secs": 1_700_000_000u64, + "message_expiry_interval": null, + }); + tokio::fs::write( + retained_dir.join("legacytopic.json"), + serde_json::to_vec_pretty(&legacy_json).unwrap(), + ) + .await + .unwrap(); + + let loaded = backend + .get_retained_message("legacytopic") + .await + .unwrap() + .expect("legacy JSON should deserialize via serde defaults"); + + assert_eq!(loaded.topic, "legacytopic"); + assert_eq!(&loaded.payload[..], b"hi"); + assert!(loaded.user_properties.is_empty()); + assert!(loaded.content_type.is_none()); + assert!(loaded.response_topic.is_none()); + assert!(loaded.correlation_data.is_none()); + assert!(loaded.payload_format_indicator.is_none()); +} + #[tokio::test] async fn test_concurrent_access() { let storage = Arc::new(create_memory_storage()); diff --git a/crates/mqtt5/src/session.rs b/crates/mqtt5/src/session.rs index 6a228be..15a1a18 100644 --- a/crates/mqtt5/src/session.rs +++ b/crates/mqtt5/src/session.rs @@ -14,6 +14,7 @@ pub use limits::{ExpiringMessage, LimitsConfig, LimitsManager}; pub use queue::{MessageQueue, QueueResult, QueueStats, QueuedMessage}; #[cfg(not(target_arch = "wasm32"))] pub use quic_flow::{FlowRegistry, FlowState, FlowType}; +#[allow(deprecated)] pub use retained::{RetainedMessage, RetainedMessageStore}; pub use state::{SessionConfig, SessionState, SessionStats}; pub use subscription::{Subscription, SubscriptionManager}; diff --git a/crates/mqtt5/src/session/retained.rs b/crates/mqtt5/src/session/retained.rs index 2cdbab8..31b1ba4 100644 --- a/crates/mqtt5/src/session/retained.rs +++ b/crates/mqtt5/src/session/retained.rs @@ -1,3 +1,5 @@ +#![allow(deprecated)] + use crate::packet::publish::PublishPacket; use crate::topic_matching::matches as topic_matches; use crate::QoS; @@ -6,6 +8,10 @@ use std::sync::Arc; use tokio::sync::RwLock; /// Storage for retained messages +#[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; use the broker's storage backend (broker::storage::RetainedMessage) instead. Scheduled for removal in 0.32.0." +)] #[derive(Debug, Clone)] pub struct RetainedMessageStore { /// Map of topic names to retained messages @@ -13,6 +19,10 @@ pub struct RetainedMessageStore { } /// A retained message +#[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; use broker::storage::RetainedMessage instead. Scheduled for removal in 0.32.0." +)] #[derive(Debug, Clone)] pub struct RetainedMessage { /// The topic name diff --git a/crates/mqtt5/src/session/state.rs b/crates/mqtt5/src/session/state.rs index b693f22..d2afe62 100644 --- a/crates/mqtt5/src/session/state.rs +++ b/crates/mqtt5/src/session/state.rs @@ -5,6 +5,7 @@ use crate::session::limits::LimitsManager; use crate::session::queue::{MessageQueue, QueuedMessage}; #[cfg(not(target_arch = "wasm32"))] use crate::session::quic_flow::{FlowRegistry, FlowState}; +#[allow(deprecated)] use crate::session::retained::{RetainedMessage, RetainedMessageStore}; use crate::session::subscription::{Subscription, SubscriptionManager}; use crate::time::{Duration, Instant}; @@ -70,6 +71,7 @@ pub struct SessionState { /// Topic alias manager for incoming messages topic_alias_in: Arc>, /// Retained message store + #[allow(deprecated)] retained_messages: Arc, /// Will message (to be published on abnormal disconnection) will_message: Arc>>, @@ -104,7 +106,10 @@ impl SessionState { flow_control: Arc::new(RwLock::new(FlowControlManager::new(65535))), // Default to max topic_alias_out: Arc::new(RwLock::new(TopicAliasManager::new(0))), // Default to disabled topic_alias_in: Arc::new(RwLock::new(TopicAliasManager::new(0))), // Default to disabled - retained_messages: Arc::new(RetainedMessageStore::new()), + retained_messages: Arc::new({ + #[allow(deprecated)] + RetainedMessageStore::new() + }), will_message: Arc::new(RwLock::new(None)), will_delay_handle: Arc::new(RwLock::new(None)), limits: Arc::new(RwLock::new(LimitsManager::with_defaults())), @@ -432,26 +437,39 @@ impl SessionState { } /// Stores or clears a retained message + #[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; the broker uses broker::storage::RetainedMessage. Scheduled for removal in 0.32.0." + )] + #[allow(deprecated)] pub async fn store_retained_message(&self, packet: &PublishPacket) { let topic = packet.topic_name.clone(); if packet.payload.is_empty() { - // Empty payload clears the retained message self.retained_messages.store(topic, None).await; } else { - // Store the retained message let message = RetainedMessage::from(packet); self.retained_messages.store(topic, Some(message)).await; } } /// Gets retained messages matching a topic filter + #[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; the broker uses broker::storage::RetainedMessage. Scheduled for removal in 0.32.0." + )] + #[allow(deprecated)] pub async fn get_retained_messages(&self, topic_filter: &str) -> Vec { self.retained_messages.get_matching(topic_filter).await } #[must_use] /// Gets the retained message store + #[deprecated( + since = "0.31.5", + note = "session-level retained store is unused by the broker; the broker uses broker::storage::RetainedMessage. Scheduled for removal in 0.32.0." + )] + #[allow(deprecated)] pub fn retained_messages(&self) -> &Arc { &self.retained_messages } @@ -692,6 +710,7 @@ pub struct SessionStats { } #[cfg(test)] +#[allow(deprecated)] mod tests { use super::*; use crate::packet::subscribe::SubscriptionOptions; diff --git a/crates/mqtt5/src/test_utils.rs b/crates/mqtt5/src/test_utils.rs index 25fb869..f0bcefa 100644 --- a/crates/mqtt5/src/test_utils.rs +++ b/crates/mqtt5/src/test_utils.rs @@ -9,6 +9,7 @@ use crate::packet::Packet; use crate::protocol::v5::properties::Properties; use crate::protocol::v5::reason_codes::ReasonCode; use crate::session::limits::{ExpiringMessage, LimitsManager}; +#[allow(deprecated)] use crate::session::retained::RetainedMessage; use crate::time::Duration; use crate::{MqttClient, QoS, Result}; @@ -296,6 +297,7 @@ pub fn test_expiring_message(index: u8) -> ExpiringMessage { /// Creates a test retained message with standard defaults #[must_use] +#[allow(deprecated)] pub fn test_retained_message(index: u8) -> RetainedMessage { RetainedMessage { topic: format!("topic/{index}"), @@ -364,6 +366,7 @@ impl TestMessageBuilder { /// Builds a batch of retained messages #[must_use] + #[allow(deprecated)] pub fn build_retained_batch(self, count: u8) -> Vec { (0..count) .map(|i| RetainedMessage { diff --git a/crates/mqtt5/tests/retained_properties_preserved.rs b/crates/mqtt5/tests/retained_properties_preserved.rs index ef94208..d78e561 100644 --- a/crates/mqtt5/tests/retained_properties_preserved.rs +++ b/crates/mqtt5/tests/retained_properties_preserved.rs @@ -1,3 +1,5 @@ +#![allow(clippy::large_futures)] + mod common; use common::{create_test_client_with_broker, test_client_id, TestBroker}; @@ -7,14 +9,24 @@ use mqtt5::{MqttClient, PublishOptions, QoS}; use std::sync::Arc; use tokio::sync::Mutex; -#[tokio::test] -async fn retained_message_preserves_v5_properties_for_late_subscriber() { +const PUBLISH_TOPIC: &str = "sensors/request"; + +async fn run_retained_v5_props_case( + qos: QoS, + subscribe_filter: &str, + message_expiry_interval: Option, + case_label: &str, +) { let broker = TestBroker::start().await; - let publisher = create_test_client_with_broker("retained-props-pub", broker.address()).await; + let publisher = create_test_client_with_broker( + &format!("retained-props-pub-{case_label}"), + broker.address(), + ) + .await; let options = PublishOptions { - qos: QoS::ExactlyOnce, + qos, retain: true, properties: PublishProperties { response_topic: Some("sensors/response".to_string()), @@ -22,19 +34,20 @@ async fn retained_message_preserves_v5_properties_for_late_subscriber() { content_type: Some("text/plain".to_string()), payload_format_indicator: Some(true), user_properties: vec![("trace-id".to_string(), "issue-77".to_string())], + message_expiry_interval, ..Default::default() }, skip_codec: false, }; publisher - .publish_with_options("sensors/request", b"25C", options) + .publish_with_options(PUBLISH_TOPIC, b"25C", options) .await .expect("publish retained failed"); tokio::time::sleep(Duration::from_millis(200)).await; - let subscriber = MqttClient::new(test_client_id("retained-props-sub")); + let subscriber = MqttClient::new(test_client_id(&format!("retained-props-sub-{case_label}"))); subscriber .connect(broker.address()) .await @@ -44,7 +57,7 @@ async fn retained_message_preserves_v5_properties_for_late_subscriber() { let received_clone = Arc::clone(&received); subscriber - .subscribe("sensors/request", move |msg| { + .subscribe(subscribe_filter, move |msg| { let slot = Arc::clone(&received_clone); tokio::spawn(async move { *slot.lock().await = Some(msg); @@ -60,41 +73,82 @@ async fn retained_message_preserves_v5_properties_for_late_subscriber() { } assert!( start.elapsed() <= Duration::from_secs(3), - "did not receive retained message within 3s" + "[{case_label}] did not receive retained message within 3s" ); tokio::time::sleep(Duration::from_millis(20)).await; }; - assert_eq!(msg.topic, "sensors/request"); - assert_eq!(&msg.payload[..], b"25C"); - assert!(msg.retain, "retain flag should be set on retained delivery"); + assert_eq!(msg.topic, PUBLISH_TOPIC, "[{case_label}] topic mismatch"); + assert_eq!(&msg.payload[..], b"25C", "[{case_label}] payload mismatch"); + assert!(msg.retain, "[{case_label}] retain flag should be set"); assert_eq!( msg.properties.response_topic.as_deref(), Some("sensors/response"), - "response_topic was dropped by the broker on retained delivery" + "[{case_label}] response_topic was dropped" ); assert_eq!( msg.properties.correlation_data.as_deref(), Some(&b"corr-77"[..]), - "correlation_data was dropped by the broker on retained delivery" + "[{case_label}] correlation_data was dropped" ); assert_eq!( msg.properties.content_type.as_deref(), Some("text/plain"), - "content_type was dropped by the broker on retained delivery" + "[{case_label}] content_type was dropped" ); assert_eq!( msg.properties.payload_format_indicator, Some(true), - "payload_format_indicator was dropped by the broker on retained delivery" + "[{case_label}] payload_format_indicator was dropped" ); assert!( msg.properties .user_properties .iter() .any(|(k, v)| k == "trace-id" && v == "issue-77"), - "user property 'trace-id=issue-77' was dropped by the broker on retained delivery; got {:?}", + "[{case_label}] user property 'trace-id=issue-77' was dropped; got {:?}", msg.properties.user_properties ); + + if let Some(expected_expiry) = message_expiry_interval { + let actual = msg + .properties + .message_expiry_interval + .expect("message_expiry_interval should still be set"); + assert!( + actual <= expected_expiry, + "[{case_label}] expiry should be <= original {expected_expiry}, got {actual}" + ); + } +} + +#[tokio::test] +async fn retained_v5_props_qos2_exact_topic() { + run_retained_v5_props_case(QoS::ExactlyOnce, PUBLISH_TOPIC, None, "qos2-exact").await; +} + +#[tokio::test] +async fn retained_v5_props_qos1_exact_topic() { + run_retained_v5_props_case(QoS::AtLeastOnce, PUBLISH_TOPIC, None, "qos1-exact").await; +} + +#[tokio::test] +async fn retained_v5_props_qos0_exact_topic() { + run_retained_v5_props_case(QoS::AtMostOnce, PUBLISH_TOPIC, None, "qos0-exact").await; +} + +#[tokio::test] +async fn retained_v5_props_qos1_wildcard_subscribe() { + run_retained_v5_props_case(QoS::AtLeastOnce, "sensors/+", None, "qos1-wildcard").await; +} + +#[tokio::test] +async fn retained_v5_props_qos1_multi_level_wildcard() { + run_retained_v5_props_case(QoS::AtLeastOnce, "sensors/#", None, "qos1-multi-wildcard").await; +} + +#[tokio::test] +async fn retained_v5_props_survives_message_expiry_interval() { + run_retained_v5_props_case(QoS::AtLeastOnce, PUBLISH_TOPIC, Some(3600), "expiry").await; } diff --git a/crates/mqtt5/tests/session_state_property_tests.rs b/crates/mqtt5/tests/session_state_property_tests.rs index ebe5722..01063fa 100644 --- a/crates/mqtt5/tests/session_state_property_tests.rs +++ b/crates/mqtt5/tests/session_state_property_tests.rs @@ -10,6 +10,7 @@ //! - Subscription management #![allow(clippy::cast_possible_truncation)] #![allow(clippy::cast_precision_loss)] +#![allow(deprecated)] use mqtt5::packet::publish::PublishPacket; use mqtt5::packet::subscribe::{RetainHandling, SubscriptionOptions}; From 91cca4f57084eef1e6f977e1c4b2ac2f0b1a7788 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Fri, 15 May 2026 12:20:02 -0700 Subject: [PATCH 3/4] bump mqtt5 to 0.31.5 and mqtt5-wasm to 1.3.1 --- CHANGELOG.md | 16 ++++++++++++++++ crates/mqtt5-wasm/Cargo.toml | 2 +- crates/mqtt5/Cargo.toml | 2 +- 3 files changed, 18 insertions(+), 2 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 44a958b..fd8955b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +## [mqtt5-wasm 1.3.1] - 2026-05-15 + +### Fixed + +- **Retained message properties dropped (wasm broker)** - pulls in the `mqtt5 0.31.5` fix for issue #77. The wasm broker uses the same `broker::storage::RetainedMessage` as the native broker and shared the same v5-property loss on retained delivery to late subscribers; transitively fixed by the upstream change. + +## [mqtt5 0.31.5] - 2026-05-15 + +### Fixed + +- **Retained message properties dropped** - `response_topic`, `correlation_data`, `content_type`, `user_properties`, and `payload_format_indicator` were stripped when a retained message was stored and never restored on delivery to a late subscriber (issue #77). `broker::storage::RetainedMessage` had no fields for them; added the fields (each `#[serde(default)]` for file-backend back-compat) and routed extraction/restoration through a shared `V5PublishProps` helper now used by both `RetainedMessage` and `InflightMessage`. + +### Deprecated + +- **`session::retained::{RetainedMessage, RetainedMessageStore}` and `SessionState::{store_retained_message, get_retained_messages, retained_messages}`** - the session-level retained store is unused by the broker (the broker uses `broker::storage::RetainedMessage`). Scheduled for removal in 0.32.0. + ## [mqttv5-cli 0.27.2] - 2026-04-13 ### Fixed diff --git a/crates/mqtt5-wasm/Cargo.toml b/crates/mqtt5-wasm/Cargo.toml index d7085fa..5ed138b 100644 --- a/crates/mqtt5-wasm/Cargo.toml +++ b/crates/mqtt5-wasm/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5-wasm" -version = "1.3.0" +version = "1.3.1" edition.workspace = true rust-version.workspace = true authors.workspace = true diff --git a/crates/mqtt5/Cargo.toml b/crates/mqtt5/Cargo.toml index 71351ba..01a23bd 100644 --- a/crates/mqtt5/Cargo.toml +++ b/crates/mqtt5/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "mqtt5" -version = "0.31.4" +version = "0.31.5" edition.workspace = true rust-version.workspace = true authors.workspace = true From cf2a3626ec1eb4d0f4241e2bbd1796e70acaf604 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Fabr=C3=ADcio=20Bracht?= Date: Fri, 15 May 2026 16:05:51 -0700 Subject: [PATCH 4/4] fix Rust 1.95 clippy lints across workspace --- crates/mqtt5-wasm/src/broker.rs | 2 +- .../mqtt5/src/broker/auth_mechanisms/jwt.rs | 3 +- .../broker/auth_mechanisms/jwt_federated.rs | 3 +- crates/mqtt5/src/broker/quic_acceptor.rs | 16 +++++----- .../mqtt5/src/broker/storage/file_backend.rs | 15 ++------- .../mqtt5/src/client/auth_handlers/scram.rs | 6 ++-- crates/mqtt5/src/client/direct/reader.rs | 31 +++++++++---------- crates/mqtt5/src/client/error_recovery.rs | 2 +- crates/mqttv5-cli/src/commands/bench_cmd.rs | 4 ++- 9 files changed, 35 insertions(+), 47 deletions(-) diff --git a/crates/mqtt5-wasm/src/broker.rs b/crates/mqtt5-wasm/src/broker.rs index 1bde435..2fae473 100644 --- a/crates/mqtt5-wasm/src/broker.rs +++ b/crates/mqtt5-wasm/src/broker.rs @@ -324,7 +324,7 @@ impl WasmBroker { let stats = Arc::new(BrokerStats::new()); - let max_clients = config.read().map(|c| c.max_clients).unwrap_or(1000); + let max_clients = config.read().map_or(1000, |c| c.max_clients); let limits = ResourceLimits { max_connections: max_clients, ..Default::default() diff --git a/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs b/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs index a0926e4..6062b76 100644 --- a/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs +++ b/crates/mqtt5/src/broker/auth_mechanisms/jwt.rs @@ -166,8 +166,7 @@ impl JwtAuthProvider { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); + .map_or(0, |d| d.as_secs()); let exp = claims.exp.ok_or(JwtError::MissingClaim("exp"))?; if now > exp + self.clock_skew_secs { diff --git a/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs b/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs index 522bee8..a2e9568 100644 --- a/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs +++ b/crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs @@ -225,8 +225,7 @@ impl FederatedJwtAuthProvider { let now = std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) - .map(|d| d.as_secs()) - .unwrap_or(0); + .map_or(0, |d| d.as_secs()); let clock_skew = issuer_state .config diff --git a/crates/mqtt5/src/broker/quic_acceptor.rs b/crates/mqtt5/src/broker/quic_acceptor.rs index dd79754..b7e31a3 100644 --- a/crates/mqtt5/src/broker/quic_acceptor.rs +++ b/crates/mqtt5/src/broker/quic_acceptor.rs @@ -650,17 +650,17 @@ fn spawn_datagram_reader( label, peer_addr ); - match decode_datagram_packet(&datagram) { - Some(Ok(packet)) => { - if packet_tx.send((packet, None)).await.is_err() { - debug!("Datagram packet channel closed for {}", peer_addr); - break; - } - } + let packet = match decode_datagram_packet(&datagram) { + Some(Ok(packet)) => packet, Some(Err(e)) => { warn!("Failed to decode datagram from {}: {}", peer_addr, e); + continue; } - None => {} + None => continue, + }; + if packet_tx.send((packet, None)).await.is_err() { + debug!("Datagram packet channel closed for {}", peer_addr); + break; } } Err(e) => { diff --git a/crates/mqtt5/src/broker/storage/file_backend.rs b/crates/mqtt5/src/broker/storage/file_backend.rs index 49b57f3..ae4bb50 100644 --- a/crates/mqtt5/src/broker/storage/file_backend.rs +++ b/crates/mqtt5/src/broker/storage/file_backend.rs @@ -320,10 +320,7 @@ impl FileBackend { .map_err(|e| MqttError::Io(format!("Failed to read directory entry: {e}")))? { let path = entry.path(); - let is_file = fs::metadata(&path) - .await - .map(|m| m.is_file()) - .unwrap_or(false); + let is_file = fs::metadata(&path).await.is_ok_and(|m| m.is_file()); if is_file && path.extension().is_some_and(|ext| ext == extension) { files.push(path); } @@ -338,10 +335,7 @@ impl FileBackend { if let Ok(mut inflight_entries) = fs::read_dir(&self.inflight_dir).await { while let Ok(Some(entry)) = inflight_entries.next_entry().await { let client_dir = entry.path(); - let is_dir = fs::metadata(&client_dir) - .await - .map(|m| m.is_dir()) - .unwrap_or(false); + let is_dir = fs::metadata(&client_dir).await.is_ok_and(|m| m.is_dir()); if is_dir { let files = self.list_files(&client_dir, "json").await?; for file_path in files { @@ -665,10 +659,7 @@ impl StorageBackend for FileBackend { .map_err(|e| MqttError::Io(format!("Failed to read queue entry: {e}")))? { let client_dir = entry.path(); - let is_dir = fs::metadata(&client_dir) - .await - .map(|m| m.is_dir()) - .unwrap_or(false); + let is_dir = fs::metadata(&client_dir).await.is_ok_and(|m| m.is_dir()); if is_dir { let queue_files = self.list_files(&client_dir, "json").await?; for file_path in queue_files { diff --git a/crates/mqtt5/src/client/auth_handlers/scram.rs b/crates/mqtt5/src/client/auth_handlers/scram.rs index e0c0772..03db0da 100644 --- a/crates/mqtt5/src/client/auth_handlers/scram.rs +++ b/crates/mqtt5/src/client/auth_handlers/scram.rs @@ -272,7 +272,7 @@ mod tests { assert!(msg.starts_with("c=biws,r=")); assert!(msg.contains(",p=")); } - other => panic!("Expected Continue, got {other:?}",), + other => panic!("Expected Continue, got {other:?}"), } } @@ -314,7 +314,7 @@ mod tests { AuthResponse::Abort(msg) => { assert!(msg.contains("Server signature verification failed")); } - other => panic!("Expected Abort, got {other:?}",), + other => panic!("Expected Abort, got {other:?}"), } } @@ -338,7 +338,7 @@ mod tests { AuthResponse::Abort(msg) => { assert!(msg.contains("nonce")); } - other => panic!("Expected Abort, got {other:?}",), + other => panic!("Expected Abort, got {other:?}"), } } } diff --git a/crates/mqtt5/src/client/direct/reader.rs b/crates/mqtt5/src/client/direct/reader.rs index 549aa29..b3d2ad8 100644 --- a/crates/mqtt5/src/client/direct/reader.rs +++ b/crates/mqtt5/src/client/direct/reader.rs @@ -108,24 +108,21 @@ pub(super) async fn packet_reader_task_with_responses( continue; } } - Packet::PubRec(pubrec) => { - if pubrec.reason_code.is_error() { - tracing::debug!( - packet_id = pubrec.packet_id, - reason_code = ?pubrec.reason_code, - "QoS 2 PUBREC rejected" - ); - if let Some(tx) = ctx.pubcomp_channels.lock().remove(&pubrec.packet_id) - { - let _ = tx.send(pubrec.reason_code); - } - ctx.session - .write() - .await - .remove_unacked_publish(pubrec.packet_id) - .await; - continue; + Packet::PubRec(pubrec) if pubrec.reason_code.is_error() => { + tracing::debug!( + packet_id = pubrec.packet_id, + reason_code = ?pubrec.reason_code, + "QoS 2 PUBREC rejected" + ); + if let Some(tx) = ctx.pubcomp_channels.lock().remove(&pubrec.packet_id) { + let _ = tx.send(pubrec.reason_code); } + ctx.session + .write() + .await + .remove_unacked_publish(pubrec.packet_id) + .await; + continue; } Packet::PubComp(pubcomp) => { if let Some(tx) = ctx.pubcomp_channels.lock().remove(&pubcomp.packet_id) { diff --git a/crates/mqtt5/src/client/error_recovery.rs b/crates/mqtt5/src/client/error_recovery.rs index 1be84d8..cb0ba09 100644 --- a/crates/mqtt5/src/client/error_recovery.rs +++ b/crates/mqtt5/src/client/error_recovery.rs @@ -201,7 +201,7 @@ mod tests { ); assert_eq!( retry_delay(RecoverableError::QuotaExceeded, 0, &config), - Duration::from_millis(1000) + Duration::from_secs(1) ); } diff --git a/crates/mqttv5-cli/src/commands/bench_cmd.rs b/crates/mqttv5-cli/src/commands/bench_cmd.rs index 2589b8b..4fb864e 100644 --- a/crates/mqttv5-cli/src/commands/bench_cmd.rs +++ b/crates/mqttv5-cli/src/commands/bench_cmd.rs @@ -1453,7 +1453,9 @@ async fn run_hol_blocking(cmd: BenchCommand) -> Result<()> { let per_topic_interval_us = if cmd.rate > 0 { #[allow(clippy::cast_possible_truncation)] - let interval = 1_000_000u64 * (num_topics as u64) / cmd.rate; + let interval = (1_000_000u64 * (num_topics as u64)) + .checked_div(cmd.rate) + .unwrap_or(0); Some(interval) } else { None