Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,22 @@ All notable changes to this project will be documented in this file.
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).

## [mqtt5-wasm 1.3.1] - 2026-05-15

### Fixed

- **Retained message properties dropped (wasm broker)** - pulls in the `mqtt5 0.31.5` fix for issue #77. The wasm broker uses the same `broker::storage::RetainedMessage` as the native broker and shared the same v5-property loss on retained delivery to late subscribers; transitively fixed by the upstream change.

## [mqtt5 0.31.5] - 2026-05-15

### Fixed

- **Retained message properties dropped** - `response_topic`, `correlation_data`, `content_type`, `user_properties`, and `payload_format_indicator` were stripped when a retained message was stored and never restored on delivery to a late subscriber (issue #77). `broker::storage::RetainedMessage` had no fields for them; added the fields (each `#[serde(default)]` for file-backend back-compat) and routed extraction/restoration through a shared `V5PublishProps` helper now used by both `RetainedMessage` and `InflightMessage`.

### Deprecated

- **`session::retained::{RetainedMessage, RetainedMessageStore}` and `SessionState::{store_retained_message, get_retained_messages, retained_messages}`** - the session-level retained store is unused by the broker (the broker uses `broker::storage::RetainedMessage`). Scheduled for removal in 0.32.0.

## [mqttv5-cli 0.27.2] - 2026-04-13

### Fixed
Expand Down
2 changes: 1 addition & 1 deletion crates/mqtt5-wasm/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqtt5-wasm"
version = "1.3.0"
version = "1.3.1"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
2 changes: 1 addition & 1 deletion crates/mqtt5-wasm/src/broker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -324,7 +324,7 @@ impl WasmBroker {

let stats = Arc::new(BrokerStats::new());

let max_clients = config.read().map(|c| c.max_clients).unwrap_or(1000);
let max_clients = config.read().map_or(1000, |c| c.max_clients);
let limits = ResourceLimits {
max_connections: max_clients,
..Default::default()
Expand Down
2 changes: 1 addition & 1 deletion crates/mqtt5/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "mqtt5"
version = "0.31.4"
version = "0.31.5"
edition.workspace = true
rust-version.workspace = true
authors.workspace = true
Expand Down
3 changes: 1 addition & 2 deletions crates/mqtt5/src/broker/auth_mechanisms/jwt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,8 +166,7 @@ impl JwtAuthProvider {

let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
.map_or(0, |d| d.as_secs());

let exp = claims.exp.ok_or(JwtError::MissingClaim("exp"))?;
if now > exp + self.clock_skew_secs {
Expand Down
3 changes: 1 addition & 2 deletions crates/mqtt5/src/broker/auth_mechanisms/jwt_federated.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,8 +225,7 @@ impl FederatedJwtAuthProvider {

let now = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
.map_or(0, |d| d.as_secs());

let clock_skew = issuer_state
.config
Expand Down
16 changes: 8 additions & 8 deletions crates/mqtt5/src/broker/quic_acceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,17 +650,17 @@ fn spawn_datagram_reader(
label,
peer_addr
);
match decode_datagram_packet(&datagram) {
Some(Ok(packet)) => {
if packet_tx.send((packet, None)).await.is_err() {
debug!("Datagram packet channel closed for {}", peer_addr);
break;
}
}
let packet = match decode_datagram_packet(&datagram) {
Some(Ok(packet)) => packet,
Some(Err(e)) => {
warn!("Failed to decode datagram from {}: {}", peer_addr, e);
continue;
}
None => {}
None => continue,
};
if packet_tx.send((packet, None)).await.is_err() {
debug!("Datagram packet channel closed for {}", peer_addr);
break;
}
}
Err(e) => {
Expand Down
15 changes: 3 additions & 12 deletions crates/mqtt5/src/broker/storage/file_backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -320,10 +320,7 @@ impl FileBackend {
.map_err(|e| MqttError::Io(format!("Failed to read directory entry: {e}")))?
{
let path = entry.path();
let is_file = fs::metadata(&path)
.await
.map(|m| m.is_file())
.unwrap_or(false);
let is_file = fs::metadata(&path).await.is_ok_and(|m| m.is_file());
if is_file && path.extension().is_some_and(|ext| ext == extension) {
files.push(path);
}
Expand All @@ -338,10 +335,7 @@ impl FileBackend {
if let Ok(mut inflight_entries) = fs::read_dir(&self.inflight_dir).await {
while let Ok(Some(entry)) = inflight_entries.next_entry().await {
let client_dir = entry.path();
let is_dir = fs::metadata(&client_dir)
.await
.map(|m| m.is_dir())
.unwrap_or(false);
let is_dir = fs::metadata(&client_dir).await.is_ok_and(|m| m.is_dir());
if is_dir {
let files = self.list_files(&client_dir, "json").await?;
for file_path in files {
Expand Down Expand Up @@ -665,10 +659,7 @@ impl StorageBackend for FileBackend {
.map_err(|e| MqttError::Io(format!("Failed to read queue entry: {e}")))?
{
let client_dir = entry.path();
let is_dir = fs::metadata(&client_dir)
.await
.map(|m| m.is_dir())
.unwrap_or(false);
let is_dir = fs::metadata(&client_dir).await.is_ok_and(|m| m.is_dir());
if is_dir {
let queue_files = self.list_files(&client_dir, "json").await?;
for file_path in queue_files {
Expand Down
Loading
Loading