Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/push/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
//! Prioritised queues used for asynchronously pushing frames to a connection.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Elevate module docs with an example and deprecation note

Document the builder-centric API here with a short example and call out deprecated constructors to meet the public API documentation guideline.

Apply:

-//! Prioritised queues used for asynchronously pushing frames to a connection.
+//! Prioritised queues used for asynchronously pushing frames to a connection.
+//!
+//! Example
+//! -------
+//! Build queues and a handle using the builder:
+//!
+//! ```ignore
+//! use wireframe::push::PushQueues;
+//!
+//! let (queues, handle) = PushQueues::<u8>::builder()
+//!     .high_capacity(8)
+//!     .low_capacity(8)
+//!     .build()
+//!     .expect("failed to build push queues");
+//! ```
+//!
+//! Note: legacy `bounded*` constructors are deprecated; use `PushQueues::builder()` instead.
🤖 Prompt for AI Agents
In src/push/mod.rs around lines 1 to 1, augment the module-level documentation
to include a short builder-centric example and a deprecation note: add a doc
example showing how to construct PushQueues using PushQueues::<T>::builder()
with high_capacity, low_capacity and build() (including expect), and append a
single-line note that legacy bounded* constructors are deprecated and users
should use PushQueues::builder() instead; ensure the example is fenced as a Rust
doc test and the deprecation note is concise and placed in the module docs.


mod queues;

pub use queues::*;
273 changes: 155 additions & 118 deletions src/push.rs → src/push/queues.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,10 @@
//! Prioritised queues used for asynchronously pushing frames to a connection.
//! Queue management used by [`PushHandle`] and [`PushQueues`].
//!
//! `PushQueues` maintain separate high- and low-priority channels so
//! background tasks can send messages without blocking the request/response
//! cycle. Producers interact with these queues through a cloneable
//! [`PushHandle`]. Queued frames are delivered in FIFO order within each
//! priority level. An optional rate limiter caps throughput at
//! [`MAX_PUSH_RATE`] pushes per second.
//! Provides the core implementation for prioritised queues delivering frames
//! to a connection. Background tasks can send messages without blocking the
//! request/response cycle. Frames maintain FIFO order within each priority
//! level. An optional rate limiter caps throughput at [`MAX_PUSH_RATE`] pushes
//! per second.

Comment on lines +1 to 8
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Split this module; file length exceeds the 400-line guideline.

Extract builder + errors into submodules (e.g., push/queues/{builder.rs, errors.rs, handle.rs}) and re-export from mod.rs.

use std::{
sync::{Arc, Weak},
Expand All @@ -27,7 +26,7 @@ impl<T> FrameLike for T where T: Send + 'static {}
// Default maximum pushes per second when no custom rate is specified.
// This is an internal implementation detail and may change.
const DEFAULT_PUSH_RATE: usize = 100;
/// Highest supported rate for [`PushQueues::bounded_with_rate`].
/// Highest supported rate for [`PushQueuesBuilder::rate`].
pub const MAX_PUSH_RATE: usize = 10_000;

// Compile-time guard: DEFAULT_PUSH_RATE must not exceed MAX_PUSH_RATE.
Expand Down Expand Up @@ -145,7 +144,12 @@ impl<F: FrameLike> PushHandle<F> {
///
/// #[tokio::test]
/// async fn example() {
/// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1));
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(Some(1))
/// .build()
/// .unwrap();
/// handle.push_high_priority(42u8).await.unwrap();
Comment on lines +147 to 153
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Use expect in doctest.

Avoid unwrap in examples per house style.

-    ///         .build()
-    ///         .unwrap();
+    ///         .build()
+    ///         .expect("failed to build queues");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(Some(1))
/// .build()
/// .unwrap();
/// handle.push_high_priority(42u8).await.unwrap();
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(Some(1))
/// .build()
/// .expect("failed to build queues");
/// handle.push_high_priority(42u8).await.unwrap();
🤖 Prompt for AI Agents
In src/push/queues.rs around lines 147 to 153, the doctest uses unwrap() which
violates house style; replace unwrap() calls with expect(...) that include
brief, clear failure messages (e.g., "failed to build PushQueues" and "failed to
push_high_priority") so the example still panics with useful context while
avoiding unwrap().

/// let (priority, frame) = queues.recv().await.unwrap();
/// assert_eq!(priority, PushPriority::High);
Expand All @@ -171,7 +175,12 @@ impl<F: FrameLike> PushHandle<F> {
///
/// #[tokio::test]
/// async fn example() {
/// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1));
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(Some(1))
/// .build()
/// .unwrap();
/// handle.push_low_priority(10u8).await.unwrap();
Comment on lines +178 to 184
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Use expect in doctest.

-    ///         .build()
-    ///         .unwrap();
+    ///         .build()
+    ///         .expect("failed to build queues");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(Some(1))
/// .build()
/// .unwrap();
/// handle.push_low_priority(10u8).await.unwrap();
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(Some(1))
/// .build()
/// .expect("failed to build queues");
/// handle.push_low_priority(10u8).await.unwrap();
🤖 Prompt for AI Agents
In src/push/queues.rs around lines 178 to 184, the doctest uses unwrap() which
is discouraged; replace unwrap() calls with expect(...) that include clear
messages (e.g., .build().expect("failed to build PushQueues") and
.await.expect("push_low_priority failed")) so test failures give helpful
context; update both unwraps in the snippet accordingly.

/// let (priority, frame) = queues.recv().await.unwrap();
/// assert_eq!(priority, PushPriority::Low);
Expand Down Expand Up @@ -219,8 +228,13 @@ impl<F: FrameLike> PushHandle<F> {
/// #[tokio::test]
/// async fn example() {
/// let (dlq_tx, mut dlq_rx) = mpsc::channel(1);
/// let (mut queues, handle) =
/// PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap();
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(None)
/// .dlq(Some(dlq_tx))
/// .build()
/// .unwrap();
Comment on lines +231 to 237
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Use expect in doctest.

-    ///         .build()
-    ///         .unwrap();
+    ///         .build()
+    ///         .expect("failed to build queues");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(None)
/// .dlq(Some(dlq_tx))
/// .build()
/// .unwrap();
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .rate(None)
/// .dlq(Some(dlq_tx))
/// .build()
/// .expect("failed to build queues");
🤖 Prompt for AI Agents
In src/push/queues.rs around lines 231 to 237, the doctest currently ends with
.build().unwrap(); — replace the unwrap() with expect(...) to provide a clear
failure message in tests; update the line to use .build().expect("failed to
build PushQueues in doctest") so the doctest fails with a helpful message
instead of panicking without context.

/// handle.push_high_priority(1u8).await.unwrap();
///
/// handle
Expand Down Expand Up @@ -276,159 +290,178 @@ pub struct PushQueues<F> {
pub(crate) low_priority_rx: mpsc::Receiver<F>,
}

/// Builder for [`PushQueues`].
///
/// Allows configuration of queue capacities, rate limiting and an optional
/// dead-letter queue before constructing [`PushQueues`] and its paired
/// [`PushHandle`]. Defaults mirror the previous constructors: both queues have
/// a capacity of one and pushes are limited to [`DEFAULT_PUSH_RATE`] per second
/// unless overridden.
pub struct PushQueuesBuilder<F> {
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
}

impl<F: FrameLike> PushQueuesBuilder<F> {
fn new() -> Self {
Self {
high_capacity: 1,
low_capacity: 1,
rate: Some(DEFAULT_PUSH_RATE),
dlq: None,
}
}

/// Set the capacity of the high-priority queue.
#[must_use]
pub fn high_capacity(mut self, capacity: usize) -> Self {
self.high_capacity = capacity;
self
}

/// Set the capacity of the low-priority queue.
#[must_use]
pub fn low_capacity(mut self, capacity: usize) -> Self {
self.low_capacity = capacity;
self
}

/// Set the global push rate limit in pushes per second.
#[must_use]
pub fn rate(mut self, rate: Option<usize>) -> Self {
self.rate = rate;
self
}

/// Provide a dead-letter queue for discarded frames.
#[must_use]
pub fn dlq(mut self, dlq: Option<mpsc::Sender<F>>) -> Self {
self.dlq = dlq;
self
}

/// Build the configured [`PushQueues`] and associated [`PushHandle`].
///
/// # Errors
///
/// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above
/// [`MAX_PUSH_RATE`].
pub fn build(self) -> Result<(PushQueues<F>, PushHandle<F>), PushConfigError> {
PushQueues::build_with_rate_dlq(self.high_capacity, self.low_capacity, self.rate, self.dlq)
}
}

impl<F: FrameLike> PushQueues<F> {
/// Start building a new set of push queues.
#[must_use]
pub fn builder() -> PushQueuesBuilder<F> { PushQueuesBuilder::new() }

fn build_with_rate_dlq(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) {
// Reject unsupported rates early to avoid building queues that cannot
// be used. The bounds prevent runaway resource consumption.
return Err(PushConfigError::InvalidRate(r));
}
Comment on lines +361 to +371
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Prevent runtime panic: validate capacities ≥ 1 and return a config error instead.

tokio::sync::mpsc::channel(0) panics; adhere to “Handle errors with Result instead of panicking”.

Apply:

 pub fn build(self) -> Result<(PushQueues<F>, PushHandle<F>), PushConfigError> {
     PushQueues::build_with_rate_dlq(self.high_capacity, self.low_capacity, self.rate, self.dlq)
 }
@@
     ) -> Result<(Self, PushHandle<F>), PushConfigError> {
-        if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) {
+        if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) {
             // Reject unsupported rates early to avoid building queues that cannot
             // be used. The bounds prevent runaway resource consumption.
             return Err(PushConfigError::InvalidRate(r));
         }
+        if high_capacity == 0 || low_capacity == 0 {
+            return Err(PushConfigError::InvalidCapacity { high: high_capacity, low: low_capacity });
+        }

And extend the error type + display:

 pub enum PushConfigError {
     /// The provided rate was zero or exceeded [`MAX_PUSH_RATE`].
     InvalidRate(usize),
+    /// Queue capacities must be ≥ 1.
+    InvalidCapacity { high: usize, low: usize },
 }
@@
         match self {
             Self::InvalidRate(r) => {
                 write!(f, "invalid rate {r}; must be between 1 and {MAX_PUSH_RATE}")
             }
+            Self::InvalidCapacity { high, low } => {
+                write!(f, "invalid capacities; high={high}, low={low}; each must be ≥ 1")
+            }
         }

Update the build() docs:

-    /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above
-    /// [`MAX_PUSH_RATE`].
+    /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above
+    /// [`MAX_PUSH_RATE`], or [`PushConfigError::InvalidCapacity`] if any capacity is zero.
🤖 Prompt for AI Agents
In src/push/queues.rs around lines 361 to 371, the builder currently allows
capacity values that can be zero which would cause tokio::sync::mpsc::channel(0)
to panic; add validation ahead of creating channels to ensure both high_capacity
and low_capacity are >= 1 and return Err(PushConfigError::InvalidCapacity {
field: ..., value: ... }) instead of proceeding. Update the PushConfigError enum
to include an InvalidCapacity variant (with which field and value), update its
Display/Debug formatting to include a clear human-readable message, and update
the build() docs comment to state capacities must be >= 1; ensure the new error
variant is used where capacities are validated and that no code path calls
mpsc::channel with 0.

let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
let limiter = rate.map(|r| {
RateLimiter::builder()
.initial(r)
.refill(r)
.interval(Duration::from_secs(1))
.max(r)
.build()
});
let inner = PushHandleInner {
high_prio_tx: high_tx,
low_prio_tx: low_tx,
limiter,
dlq_tx: dlq,
};
Ok((
Self {
high_priority_rx: high_rx,
low_priority_rx: low_rx,
},
PushHandle(Arc::new(inner)),
))
}

/// Create a new set of queues with the specified bounds for each priority
/// and return them along with a [`PushHandle`] for producers.
///
/// # Examples
///
/// ```rust,no_run
/// use wireframe::push::{PushPriority, PushQueues};
///
/// #[tokio::test]
/// async fn example() {
/// let (mut queues, handle) = PushQueues::<u8>::bounded(1, 1);
/// handle.push_high_priority(7u8).await.unwrap();
/// let (priority, frame) = queues.recv().await.unwrap();
/// assert_eq!(priority, PushPriority::High);
/// assert_eq!(frame, 7);
/// }
/// ```
///
/// # Panics
///
/// Panics if an internal invariant is violated. This should never occur.
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
#[must_use]
pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) {
Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None)
Self::builder()
.high_capacity(high_capacity)
.low_capacity(low_capacity)
.build()
.expect("DEFAULT_PUSH_RATE is always valid")
}

/// Create queues with no rate limiting.
///
/// # Examples
///
/// ```rust,no_run
/// use wireframe::push::PushQueues;
///
/// let (_queues, handle) = PushQueues::<u8>::bounded_no_rate_limit(1, 1);
/// let _ = handle;
/// ```
///
/// # Panics
///
/// Panics if an internal invariant is violated. This should never occur.
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
#[must_use]
pub fn bounded_no_rate_limit(
high_capacity: usize,
low_capacity: usize,
) -> (Self, PushHandle<F>) {
// `bounded_with_rate_dlq` only fails when given an invalid rate. Passing
// `None` disables rate limiting entirely so the call is infallible. The
// debug assertion guards against future regressions.
let result = Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None);
debug_assert!(result.is_ok(), "bounded_no_rate_limit should not fail");
result.expect("bounded_no_rate_limit should not fail")
Self::builder()
.high_capacity(high_capacity)
.low_capacity(low_capacity)
.rate(None)
.build()
.expect("bounded_no_rate_limit should not fail")
}

/// Create queues with a custom rate limit in pushes per second.
///
/// The limiter enforces fairness by allowing at most `rate` pushes
/// per second across all producers for the returned [`PushHandle`].
/// Pass `None` to disable rate limiting entirely.
///
/// # Errors
///
/// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater
/// than [`MAX_PUSH_RATE`].
///
/// # Examples
///
/// ```rust,no_run
/// use wireframe::push::PushQueues;
///
/// #[tokio::main]
/// async fn main() {
/// let (mut queues, handle) = PushQueues::<u8>::bounded_with_rate(1, 1, Some(10)).unwrap();
/// handle.push_low_priority(1u8).await.unwrap();
/// let (_prio, frame) = queues.recv().await.unwrap();
/// assert_eq!(frame, 1);
/// }
/// ```
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
pub fn bounded_with_rate(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None)
Self::builder()
.high_capacity(high_capacity)
.low_capacity(low_capacity)
.rate(rate)
.build()
}

/// Create queues with a custom rate limit and optional dead letter queue.
///
/// Frames that would be dropped by [`try_push`](PushHandle::try_push) when
/// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`]
/// are routed to `dlq` if provided.
///
/// # Errors
///
/// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater
/// than [`MAX_PUSH_RATE`].
///
/// # Examples
///
/// ```rust,no_run
/// use tokio::sync::mpsc;
/// use wireframe::push::{PushPolicy, PushPriority, PushQueues};
///
/// #[tokio::main]
/// async fn main() {
/// let (dlq_tx, mut dlq_rx) = mpsc::channel(1);
/// let (mut queues, handle) =
/// PushQueues::<u8>::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap();
/// handle.push_high_priority(1u8).await.unwrap();
/// handle
/// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull)
/// .unwrap();
///
/// let (_, val) = queues.recv().await.unwrap();
/// assert_eq!(val, 1);
/// assert_eq!(dlq_rx.recv().await.unwrap(), 2);
/// }
/// ```
#[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")]
pub fn bounded_with_rate_dlq(
high_capacity: usize,
low_capacity: usize,
rate: Option<usize>,
dlq: Option<mpsc::Sender<F>>,
) -> Result<(Self, PushHandle<F>), PushConfigError> {
if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) {
// Reject unsupported rates early to avoid building queues that cannot
// be used. The bounds prevent runaway resource consumption.
return Err(PushConfigError::InvalidRate(r));
}
let (high_tx, high_rx) = mpsc::channel(high_capacity);
let (low_tx, low_rx) = mpsc::channel(low_capacity);
let limiter = rate.map(|r| {
RateLimiter::builder()
.initial(r)
.refill(r)
.interval(Duration::from_secs(1))
.max(r)
.build()
});
let inner = PushHandleInner {
high_prio_tx: high_tx,
low_prio_tx: low_tx,
limiter,
dlq_tx: dlq,
};
Ok((
Self {
high_priority_rx: high_rx,
low_priority_rx: low_rx,
},
PushHandle(Arc::new(inner)),
))
Self::build_with_rate_dlq(high_capacity, low_capacity, rate, dlq)
}

/// Receive the next frame, preferring high priority frames when available.
Expand All @@ -442,7 +475,11 @@ impl<F: FrameLike> PushQueues<F> {
///
/// #[tokio::test]
/// async fn example() {
/// let (mut queues, handle) = PushQueues::bounded(1, 1);
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .build()
/// .unwrap();
/// handle.push_high_priority(2u8).await.unwrap();
Comment on lines +478 to 483
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Use expect in doctest.

-    ///         .build()
-    ///         .unwrap();
+    ///         .build()
+    ///         .expect("failed to build queues");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .build()
/// .unwrap();
/// handle.push_high_priority(2u8).await.unwrap();
/// let (mut queues, handle) = PushQueues::builder()
/// .high_capacity(1)
/// .low_capacity(1)
/// .build()
/// .expect("failed to build queues");
/// handle.push_high_priority(2u8).await.unwrap();
🤖 Prompt for AI Agents
In src/push/queues.rs around lines 478 to 483, the doctest uses unwrap() which
can produce unhelpful panic messages; replace unwrap() calls in the example with
expect("descriptive message") (e.g., on build result and on push_high_priority)
so the doctest fails with clear context — update both unwrap() occurrences to
expect(...) with brief, specific messages.

/// let (priority, frame) = queues.recv().await.unwrap();
/// assert_eq!(priority, PushPriority::High);
Expand All @@ -467,7 +504,7 @@ impl<F: FrameLike> PushQueues<F> {
/// ```rust,no_run
/// use wireframe::push::PushQueues;
///
/// let (mut queues, _handle) = PushQueues::<u8>::bounded(1, 1);
/// let (mut queues, _handle) = PushQueues::<u8>::builder().build().unwrap();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

Use expect in doctest.

-    /// let (mut queues, _handle) = PushQueues::<u8>::builder().build().unwrap();
+    /// let (mut queues, _handle) = PushQueues::<u8>::builder()
+    ///     .build()
+    ///     .expect("failed to build queues");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/// let (mut queues, _handle) = PushQueues::<u8>::builder().build().unwrap();
/// let (mut queues, _handle) = PushQueues::<u8>::builder()
/// .build()
/// .expect("failed to build queues");
🤖 Prompt for AI Agents
In src/push/queues.rs around line 507, the doctest currently uses unwrap() which
gives poor failure diagnostics; replace the unwrap() call with expect("failed to
build PushQueues in doctest") so test failures include a clear message — update
the example line to call .expect(...) with a brief descriptive string.

/// queues.close();
/// ```
pub fn close(&mut self) {
Expand Down
3 changes: 2 additions & 1 deletion tests/advanced/concurrency_loom.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ fn concurrent_push_delivery() {
.expect("failed to build tokio runtime");

rt.block_on(async {
let (queues, handle) = PushQueues::bounded(1, 1);
let (queues, handle) =
PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap();
Comment on lines +24 to +25
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🧹 Nitpick (assertive)

🛠️ Refactor suggestion

Replace unwrap() with expect() and make the generic explicit

Eliminate Clippy’s unwrap_used and align with the guideline “Prefer .expect() over .unwrap()”. Make the type parameter explicit for consistency across tests.

-            let (queues, handle) =
-                PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap();
+            let (queues, handle) =
+                PushQueues::<u8>::builder().high_capacity(1).low_capacity(1).build().expect("failed to build push queues");
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
let (queues, handle) =
PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap();
let (queues, handle) =
PushQueues::<u8>::builder().high_capacity(1).low_capacity(1).build().expect("failed to build push queues");
🤖 Prompt for AI Agents
In tests/advanced/concurrency_loom.rs around lines 24–25, replace the call that
uses .unwrap() on
PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap() with an
explicit generic on PushQueues (e.g. PushQueues::<YourItemType>::builder() — use
the item type used elsewhere in this test) and replace .unwrap() with
.expect("failed to build PushQueues") so Clippy unwrap_used is eliminated and
the type parameter is explicit and consistent with other tests.

let token = CancellationToken::new();

let out = loom::sync::Arc::new(loom::sync::Mutex::new(Vec::new()));
Expand Down
Loading
Loading