From ae5890247343c910c0e821de36d16cd276746f7b Mon Sep 17 00:00:00 2001 From: Leynos Date: Wed, 27 Aug 2025 13:29:28 +0100 Subject: [PATCH] Introduce builder for push queues --- src/push/mod.rs | 5 + src/{push.rs => push/queues.rs} | 273 ++++++++++++++++------------- tests/advanced/concurrency_loom.rs | 3 +- tests/advanced/interaction_fuzz.rs | 6 +- tests/async_stream.rs | 6 +- tests/connection_actor.rs | 20 ++- tests/correlation_id.rs | 6 +- tests/push.rs | 57 ++++-- tests/push_policies.rs | 20 ++- tests/session_registry.rs | 8 +- tests/stream_end.rs | 12 +- tests/wireframe_protocol.rs | 12 +- tests/world.rs | 12 +- 13 files changed, 293 insertions(+), 147 deletions(-) create mode 100644 src/push/mod.rs rename src/{push.rs => push/queues.rs} (74%) diff --git a/src/push/mod.rs b/src/push/mod.rs new file mode 100644 index 00000000..5b1b547f --- /dev/null +++ b/src/push/mod.rs @@ -0,0 +1,5 @@ +//! Prioritised queues used for asynchronously pushing frames to a connection. + +mod queues; + +pub use queues::*; diff --git a/src/push.rs b/src/push/queues.rs similarity index 74% rename from src/push.rs rename to src/push/queues.rs index 317f5250..d9698b20 100644 --- a/src/push.rs +++ b/src/push/queues.rs @@ -1,11 +1,10 @@ -//! Prioritised queues used for asynchronously pushing frames to a connection. +//! Queue management used by [`PushHandle`] and [`PushQueues`]. //! -//! `PushQueues` maintain separate high- and low-priority channels so -//! background tasks can send messages without blocking the request/response -//! cycle. Producers interact with these queues through a cloneable -//! [`PushHandle`]. Queued frames are delivered in FIFO order within each -//! priority level. An optional rate limiter caps throughput at -//! [`MAX_PUSH_RATE`] pushes per second. +//! Provides the core implementation for prioritised queues delivering frames +//! to a connection. Background tasks can send messages without blocking the +//! request/response cycle. Frames maintain FIFO order within each priority +//! level. An optional rate limiter caps throughput at [`MAX_PUSH_RATE`] pushes +//! per second. use std::{ sync::{Arc, Weak}, @@ -27,7 +26,7 @@ impl FrameLike for T where T: Send + 'static {} // Default maximum pushes per second when no custom rate is specified. // This is an internal implementation detail and may change. const DEFAULT_PUSH_RATE: usize = 100; -/// Highest supported rate for [`PushQueues::bounded_with_rate`]. +/// Highest supported rate for [`PushQueuesBuilder::rate`]. pub const MAX_PUSH_RATE: usize = 10_000; // Compile-time guard: DEFAULT_PUSH_RATE must not exceed MAX_PUSH_RATE. @@ -145,7 +144,12 @@ impl PushHandle { /// /// #[tokio::test] /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)); + /// let (mut queues, handle) = PushQueues::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .rate(Some(1)) + /// .build() + /// .unwrap(); /// handle.push_high_priority(42u8).await.unwrap(); /// let (priority, frame) = queues.recv().await.unwrap(); /// assert_eq!(priority, PushPriority::High); @@ -171,7 +175,12 @@ impl PushHandle { /// /// #[tokio::test] /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)); + /// let (mut queues, handle) = PushQueues::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .rate(Some(1)) + /// .build() + /// .unwrap(); /// handle.push_low_priority(10u8).await.unwrap(); /// let (priority, frame) = queues.recv().await.unwrap(); /// assert_eq!(priority, PushPriority::Low); @@ -219,8 +228,13 @@ impl PushHandle { /// #[tokio::test] /// async fn example() { /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - /// let (mut queues, handle) = - /// PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); + /// let (mut queues, handle) = PushQueues::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .rate(None) + /// .dlq(Some(dlq_tx)) + /// .build() + /// .unwrap(); /// handle.push_high_priority(1u8).await.unwrap(); /// /// handle @@ -276,159 +290,178 @@ pub struct PushQueues { pub(crate) low_priority_rx: mpsc::Receiver, } +/// Builder for [`PushQueues`]. +/// +/// Allows configuration of queue capacities, rate limiting and an optional +/// dead-letter queue before constructing [`PushQueues`] and its paired +/// [`PushHandle`]. Defaults mirror the previous constructors: both queues have +/// a capacity of one and pushes are limited to [`DEFAULT_PUSH_RATE`] per second +/// unless overridden. +pub struct PushQueuesBuilder { + high_capacity: usize, + low_capacity: usize, + rate: Option, + dlq: Option>, +} + +impl PushQueuesBuilder { + fn new() -> Self { + Self { + high_capacity: 1, + low_capacity: 1, + rate: Some(DEFAULT_PUSH_RATE), + dlq: None, + } + } + + /// Set the capacity of the high-priority queue. + #[must_use] + pub fn high_capacity(mut self, capacity: usize) -> Self { + self.high_capacity = capacity; + self + } + + /// Set the capacity of the low-priority queue. + #[must_use] + pub fn low_capacity(mut self, capacity: usize) -> Self { + self.low_capacity = capacity; + self + } + + /// Set the global push rate limit in pushes per second. + #[must_use] + pub fn rate(mut self, rate: Option) -> Self { + self.rate = rate; + self + } + + /// Provide a dead-letter queue for discarded frames. + #[must_use] + pub fn dlq(mut self, dlq: Option>) -> Self { + self.dlq = dlq; + self + } + + /// Build the configured [`PushQueues`] and associated [`PushHandle`]. + /// + /// # Errors + /// + /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above + /// [`MAX_PUSH_RATE`]. + pub fn build(self) -> Result<(PushQueues, PushHandle), PushConfigError> { + PushQueues::build_with_rate_dlq(self.high_capacity, self.low_capacity, self.rate, self.dlq) + } +} + impl PushQueues { + /// Start building a new set of push queues. + #[must_use] + pub fn builder() -> PushQueuesBuilder { PushQueuesBuilder::new() } + + fn build_with_rate_dlq( + high_capacity: usize, + low_capacity: usize, + rate: Option, + dlq: Option>, + ) -> Result<(Self, PushHandle), PushConfigError> { + if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { + // Reject unsupported rates early to avoid building queues that cannot + // be used. The bounds prevent runaway resource consumption. + return Err(PushConfigError::InvalidRate(r)); + } + let (high_tx, high_rx) = mpsc::channel(high_capacity); + let (low_tx, low_rx) = mpsc::channel(low_capacity); + let limiter = rate.map(|r| { + RateLimiter::builder() + .initial(r) + .refill(r) + .interval(Duration::from_secs(1)) + .max(r) + .build() + }); + let inner = PushHandleInner { + high_prio_tx: high_tx, + low_prio_tx: low_tx, + limiter, + dlq_tx: dlq, + }; + Ok(( + Self { + high_priority_rx: high_rx, + low_priority_rx: low_rx, + }, + PushHandle(Arc::new(inner)), + )) + } + /// Create a new set of queues with the specified bounds for each priority /// and return them along with a [`PushHandle`] for producers. /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::{PushPriority, PushQueues}; - /// - /// #[tokio::test] - /// async fn example() { - /// let (mut queues, handle) = PushQueues::::bounded(1, 1); - /// handle.push_high_priority(7u8).await.unwrap(); - /// let (priority, frame) = queues.recv().await.unwrap(); - /// assert_eq!(priority, PushPriority::High); - /// assert_eq!(frame, 7); - /// } - /// ``` - /// /// # Panics /// /// Panics if an internal invariant is violated. This should never occur. + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] #[must_use] pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle) { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None) + Self::builder() + .high_capacity(high_capacity) + .low_capacity(low_capacity) + .build() .expect("DEFAULT_PUSH_RATE is always valid") } /// Create queues with no rate limiting. /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::PushQueues; - /// - /// let (_queues, handle) = PushQueues::::bounded_no_rate_limit(1, 1); - /// let _ = handle; - /// ``` - /// /// # Panics /// /// Panics if an internal invariant is violated. This should never occur. + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] #[must_use] pub fn bounded_no_rate_limit( high_capacity: usize, low_capacity: usize, ) -> (Self, PushHandle) { - // `bounded_with_rate_dlq` only fails when given an invalid rate. Passing - // `None` disables rate limiting entirely so the call is infallible. The - // debug assertion guards against future regressions. - let result = Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None); - debug_assert!(result.is_ok(), "bounded_no_rate_limit should not fail"); - result.expect("bounded_no_rate_limit should not fail") + Self::builder() + .high_capacity(high_capacity) + .low_capacity(low_capacity) + .rate(None) + .build() + .expect("bounded_no_rate_limit should not fail") } /// Create queues with a custom rate limit in pushes per second. /// - /// The limiter enforces fairness by allowing at most `rate` pushes - /// per second across all producers for the returned [`PushHandle`]. - /// Pass `None` to disable rate limiting entirely. - /// /// # Errors /// /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater /// than [`MAX_PUSH_RATE`]. - /// - /// # Examples - /// - /// ```rust,no_run - /// use wireframe::push::PushQueues; - /// - /// #[tokio::main] - /// async fn main() { - /// let (mut queues, handle) = PushQueues::::bounded_with_rate(1, 1, Some(10)).unwrap(); - /// handle.push_low_priority(1u8).await.unwrap(); - /// let (_prio, frame) = queues.recv().await.unwrap(); - /// assert_eq!(frame, 1); - /// } - /// ``` + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] pub fn bounded_with_rate( high_capacity: usize, low_capacity: usize, rate: Option, ) -> Result<(Self, PushHandle), PushConfigError> { - Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None) + Self::builder() + .high_capacity(high_capacity) + .low_capacity(low_capacity) + .rate(rate) + .build() } /// Create queues with a custom rate limit and optional dead letter queue. /// - /// Frames that would be dropped by [`try_push`](PushHandle::try_push) when - /// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] - /// are routed to `dlq` if provided. - /// /// # Errors /// /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater /// than [`MAX_PUSH_RATE`]. - /// - /// # Examples - /// - /// ```rust,no_run - /// use tokio::sync::mpsc; - /// use wireframe::push::{PushPolicy, PushPriority, PushQueues}; - /// - /// #[tokio::main] - /// async fn main() { - /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - /// let (mut queues, handle) = - /// PushQueues::::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); - /// handle.push_high_priority(1u8).await.unwrap(); - /// handle - /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) - /// .unwrap(); - /// - /// let (_, val) = queues.recv().await.unwrap(); - /// assert_eq!(val, 1); - /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); - /// } - /// ``` + #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] pub fn bounded_with_rate_dlq( high_capacity: usize, low_capacity: usize, rate: Option, dlq: Option>, ) -> Result<(Self, PushHandle), PushConfigError> { - if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { - // Reject unsupported rates early to avoid building queues that cannot - // be used. The bounds prevent runaway resource consumption. - return Err(PushConfigError::InvalidRate(r)); - } - let (high_tx, high_rx) = mpsc::channel(high_capacity); - let (low_tx, low_rx) = mpsc::channel(low_capacity); - let limiter = rate.map(|r| { - RateLimiter::builder() - .initial(r) - .refill(r) - .interval(Duration::from_secs(1)) - .max(r) - .build() - }); - let inner = PushHandleInner { - high_prio_tx: high_tx, - low_prio_tx: low_tx, - limiter, - dlq_tx: dlq, - }; - Ok(( - Self { - high_priority_rx: high_rx, - low_priority_rx: low_rx, - }, - PushHandle(Arc::new(inner)), - )) + Self::build_with_rate_dlq(high_capacity, low_capacity, rate, dlq) } /// Receive the next frame, preferring high priority frames when available. @@ -442,7 +475,11 @@ impl PushQueues { /// /// #[tokio::test] /// async fn example() { - /// let (mut queues, handle) = PushQueues::bounded(1, 1); + /// let (mut queues, handle) = PushQueues::builder() + /// .high_capacity(1) + /// .low_capacity(1) + /// .build() + /// .unwrap(); /// handle.push_high_priority(2u8).await.unwrap(); /// let (priority, frame) = queues.recv().await.unwrap(); /// assert_eq!(priority, PushPriority::High); @@ -467,7 +504,7 @@ impl PushQueues { /// ```rust,no_run /// use wireframe::push::PushQueues; /// - /// let (mut queues, _handle) = PushQueues::::bounded(1, 1); + /// let (mut queues, _handle) = PushQueues::::builder().build().unwrap(); /// queues.close(); /// ``` pub fn close(&mut self) { diff --git a/tests/advanced/concurrency_loom.rs b/tests/advanced/concurrency_loom.rs index ff997bad..cfbfca5c 100644 --- a/tests/advanced/concurrency_loom.rs +++ b/tests/advanced/concurrency_loom.rs @@ -21,7 +21,8 @@ fn concurrent_push_delivery() { .expect("failed to build tokio runtime"); rt.block_on(async { - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = + PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap(); let token = CancellationToken::new(); let out = loom::sync::Arc::new(loom::sync::Mutex::new(Vec::new())); diff --git a/tests/advanced/interaction_fuzz.rs b/tests/advanced/interaction_fuzz.rs index 99d71632..3b4f2e13 100644 --- a/tests/advanced/interaction_fuzz.rs +++ b/tests/advanced/interaction_fuzz.rs @@ -23,7 +23,11 @@ enum Action { } async fn run_actions(actions: &[Action]) -> Vec { - let (queues, handle) = PushQueues::bounded(16, 16); + let (queues, handle) = PushQueues::builder() + .high_capacity(16) + .low_capacity(16) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let mut stream: Option> = None; diff --git a/tests/async_stream.rs b/tests/async_stream.rs index 86f1482f..b7510c74 100644 --- a/tests/async_stream.rs +++ b/tests/async_stream.rs @@ -23,7 +23,11 @@ fn frame_stream() -> impl futures::Stream> { #[rstest] #[tokio::test] async fn async_stream_frames_processed_in_order() { - let (queues, handle) = PushQueues::::bounded(8, 8); + let (queues, handle) = PushQueues::::builder() + .high_capacity(8) + .low_capacity(8) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let stream: FrameStream = Box::pin(frame_stream()); diff --git a/tests/connection_actor.rs b/tests/connection_actor.rs index ef1dbe79..86feb047 100644 --- a/tests/connection_actor.rs +++ b/tests/connection_actor.rs @@ -22,7 +22,13 @@ use wireframe_testing::push_expect; unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn queues() -> (PushQueues, wireframe::push::PushHandle) { PushQueues::bounded(8, 8) } +fn queues() -> (PushQueues, wireframe::push::PushHandle) { + PushQueues::builder() + .high_capacity(8) + .low_capacity(8) + .build() + .unwrap() +} #[fixture] #[allow( @@ -380,7 +386,11 @@ async fn interleaved_shutdown_during_stream( #[tokio::test] #[serial] async fn push_queue_exhaustion_backpressure() { - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); push_expect!(handle.push_high_priority(1), "push high-priority"); let blocked = timeout(Duration::from_millis(50), handle.push_high_priority(2)).await; @@ -467,7 +477,11 @@ async fn graceful_shutdown_waits_for_tasks() { let mut handles = Vec::new(); for _ in 0..5 { - let (queues, handle) = PushQueues::::bounded(1, 1); + let (queues, handle) = PushQueues::::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let mut actor: ConnectionActor<_, ()> = ConnectionActor::new(queues, handle.clone(), None, token.clone()); handles.push(handle); diff --git a/tests/correlation_id.rs b/tests/correlation_id.rs index e515473b..f6ae785d 100644 --- a/tests/correlation_id.rs +++ b/tests/correlation_id.rs @@ -15,7 +15,11 @@ async fn stream_frames_carry_request_correlation_id() { yield Envelope::new(1, Some(cid), vec![1]); yield Envelope::new(1, Some(cid), vec![2]); }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); let mut out = Vec::new(); diff --git a/tests/push.rs b/tests/push.rs index ed3f991b..e9103756 100644 --- a/tests/push.rs +++ b/tests/push.rs @@ -10,7 +10,11 @@ use wireframe_testing::{push_expect, recv_expect}; /// Frames are delivered to queues matching their push priority. #[tokio::test] async fn frames_routed_to_correct_priority_queues() { - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); push_expect!(handle.push_low_priority(1u8)); push_expect!(handle.push_high_priority(2u8)); @@ -30,7 +34,11 @@ async fn frames_routed_to_correct_priority_queues() { /// return `PushError::Full` once the queue is at capacity. #[tokio::test] async fn try_push_respects_policy() { - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); push_expect!(handle.push_high_priority(1u8)); let result = handle.try_push(2u8, PushPriority::High, PushPolicy::ReturnErrorIfFull); @@ -46,7 +54,11 @@ async fn try_push_respects_policy() { /// Push attempts return `Closed` when all queues have been shut down. #[tokio::test] async fn push_queues_error_on_closed() { - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let mut queues = queues; queues.close(); @@ -66,8 +78,12 @@ async fn push_queues_error_on_closed() { #[tokio::test] async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(2) + .low_capacity(2) + .rate(Some(1)) + .build() + .expect("queue creation failed"); match priority { PushPriority::High => push_expect!(handle.push_high_priority(1u8)), @@ -100,8 +116,12 @@ async fn rate_limiter_blocks_when_exceeded(#[case] priority: PushPriority) { #[tokio::test] async fn rate_limiter_allows_after_wait() { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(2) + .low_capacity(2) + .rate(Some(1)) + .build() + .expect("queue creation failed"); push_expect!(handle.push_high_priority(1u8)); time::advance(Duration::from_secs(1)).await; push_expect!(handle.push_high_priority(2u8)); @@ -117,8 +137,12 @@ async fn rate_limiter_allows_after_wait() { #[tokio::test] async fn rate_limiter_shared_across_priorities() { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(2, 2, Some(1)).expect("queue creation failed"); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(2) + .low_capacity(2) + .rate(Some(1)) + .build() + .expect("queue creation failed"); push_expect!(handle.push_high_priority(1u8)); let attempt = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; @@ -139,7 +163,12 @@ async fn rate_limiter_shared_across_priorities() { #[tokio::test] async fn unlimited_queues_do_not_block() { time::pause(); - let (mut queues, handle) = PushQueues::bounded_no_rate_limit(1, 1); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .rate(None) + .build() + .unwrap(); push_expect!(handle.push_high_priority(1u8)); let res = time::timeout(Duration::from_millis(10), handle.push_low_priority(2u8)).await; assert!(res.is_ok(), "pushes should not block when unlimited"); @@ -154,8 +183,12 @@ async fn unlimited_queues_do_not_block() { #[tokio::test] async fn rate_limiter_allows_burst_within_capacity_and_blocks_excess() { time::pause(); - let (mut queues, handle) = - PushQueues::bounded_with_rate(4, 4, Some(3)).expect("queue creation failed"); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(4) + .low_capacity(4) + .rate(Some(3)) + .build() + .expect("queue creation failed"); for i in 0u8..3 { push_expect!(handle.push_high_priority(i)); diff --git a/tests/push_policies.rs b/tests/push_policies.rs index 68bfec99..0c5f5a1b 100644 --- a/tests/push_policies.rs +++ b/tests/push_policies.rs @@ -38,7 +38,11 @@ fn push_policy_behaviour( ) { rt.block_on(async { while logger.pop().is_some() {} - let (mut queues, handle) = PushQueues::bounded(1, 1); + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); handle .push_high_priority(1u8) @@ -76,7 +80,12 @@ fn push_policy_behaviour( fn dropped_frame_goes_to_dlq(rt: Runtime) { rt.block_on(async { let (dlq_tx, mut dlq_rx) = mpsc::channel(1); - let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)) + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .rate(None) + .dlq(Some(dlq_tx)) + .build() .expect("queue creation failed"); handle @@ -135,7 +144,12 @@ fn dlq_error_scenarios( let (dlq_tx, dlq_rx) = mpsc::channel(1); let mut dlq_rx = Some(dlq_rx); setup(&dlq_tx, &mut dlq_rx); - let (mut queues, handle) = PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)) + let (mut queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .rate(None) + .dlq(Some(dlq_tx)) + .build() .expect("queue creation failed"); handle diff --git a/tests/session_registry.rs b/tests/session_registry.rs index 12cede5e..d04d0419 100644 --- a/tests/session_registry.rs +++ b/tests/session_registry.rs @@ -17,7 +17,13 @@ fn registry() -> SessionRegistry { SessionRegistry::default() } unused_braces, reason = "rustc false positive for single line rstest fixtures" )] -fn push_setup() -> (PushQueues, PushHandle) { PushQueues::bounded(1, 1) } +fn push_setup() -> (PushQueues, PushHandle) { + PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap() +} /// Test that handles can be retrieved whilst the connection remains alive. #[rstest] diff --git a/tests/stream_end.rs b/tests/stream_end.rs index 383e2b72..57cbedea 100644 --- a/tests/stream_end.rs +++ b/tests/stream_end.rs @@ -23,7 +23,11 @@ async fn emits_end_frame() { yield 2; }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); @@ -51,7 +55,11 @@ async fn emits_no_end_frame_when_none() { yield 8; }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let hooks = ProtocolHooks::from_protocol(&Arc::new(NoTerminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks); diff --git a/tests/wireframe_protocol.rs b/tests/wireframe_protocol.rs index 774cb79e..7a66d77a 100644 --- a/tests/wireframe_protocol.rs +++ b/tests/wireframe_protocol.rs @@ -56,7 +56,11 @@ async fn builder_produces_protocol_hooks() { .with_protocol(protocol); let mut hooks = app.protocol_hooks(); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); hooks.on_connection_setup(handle, &mut ConnectionContext); drop(queues); // silence unused warnings @@ -80,7 +84,11 @@ async fn connection_actor_uses_protocol_from_builder() { .with_protocol(protocol); let hooks = app.protocol_hooks(); - let (queues, handle) = PushQueues::bounded(8, 8); + let (queues, handle) = PushQueues::builder() + .high_capacity(8) + .low_capacity(8) + .build() + .unwrap(); handle .push_high_priority(vec![1]) .await diff --git a/tests/world.rs b/tests/world.rs index 9162a876..37623363 100644 --- a/tests/world.rs +++ b/tests/world.rs @@ -142,7 +142,11 @@ impl CorrelationWorld { yield Envelope::new(1, Some(cid), vec![1]); yield Envelope::new(1, Some(cid), vec![2]); }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let mut actor = ConnectionActor::new(queues, handle, Some(stream), shutdown); actor.run(&mut self.frames).await.expect("actor run failed"); @@ -180,7 +184,11 @@ impl StreamEndWorld { yield 2u8; }); - let (queues, handle) = PushQueues::bounded(1, 1); + let (queues, handle) = PushQueues::builder() + .high_capacity(1) + .low_capacity(1) + .build() + .unwrap(); let shutdown = CancellationToken::new(); let hooks = ProtocolHooks::from_protocol(&Arc::new(Terminator)); let mut actor = ConnectionActor::with_hooks(queues, handle, Some(stream), shutdown, hooks);