-
Notifications
You must be signed in to change notification settings - Fork 0
Add builder for push queues #335
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,5 @@ | ||
| //! Prioritised queues used for asynchronously pushing frames to a connection. | ||
|
|
||
| mod queues; | ||
|
|
||
| pub use queues::*; | ||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -1,11 +1,10 @@ | ||||||||||||||||||||||||||||||
| //! Prioritised queues used for asynchronously pushing frames to a connection. | ||||||||||||||||||||||||||||||
| //! Queue management used by [`PushHandle`] and [`PushQueues`]. | ||||||||||||||||||||||||||||||
| //! | ||||||||||||||||||||||||||||||
| //! `PushQueues` maintain separate high- and low-priority channels so | ||||||||||||||||||||||||||||||
| //! background tasks can send messages without blocking the request/response | ||||||||||||||||||||||||||||||
| //! cycle. Producers interact with these queues through a cloneable | ||||||||||||||||||||||||||||||
| //! [`PushHandle`]. Queued frames are delivered in FIFO order within each | ||||||||||||||||||||||||||||||
| //! priority level. An optional rate limiter caps throughput at | ||||||||||||||||||||||||||||||
| //! [`MAX_PUSH_RATE`] pushes per second. | ||||||||||||||||||||||||||||||
| //! Provides the core implementation for prioritised queues delivering frames | ||||||||||||||||||||||||||||||
| //! to a connection. Background tasks can send messages without blocking the | ||||||||||||||||||||||||||||||
| //! request/response cycle. Frames maintain FIFO order within each priority | ||||||||||||||||||||||||||||||
| //! level. An optional rate limiter caps throughput at [`MAX_PUSH_RATE`] pushes | ||||||||||||||||||||||||||||||
| //! per second. | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
|
Comment on lines
+1
to
8
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Split this module; file length exceeds the 400-line guideline. Extract builder + errors into submodules (e.g., push/queues/{builder.rs, errors.rs, handle.rs}) and re-export from mod.rs. |
||||||||||||||||||||||||||||||
| use std::{ | ||||||||||||||||||||||||||||||
| sync::{Arc, Weak}, | ||||||||||||||||||||||||||||||
|
|
@@ -27,7 +26,7 @@ impl<T> FrameLike for T where T: Send + 'static {} | |||||||||||||||||||||||||||||
| // Default maximum pushes per second when no custom rate is specified. | ||||||||||||||||||||||||||||||
| // This is an internal implementation detail and may change. | ||||||||||||||||||||||||||||||
| const DEFAULT_PUSH_RATE: usize = 100; | ||||||||||||||||||||||||||||||
| /// Highest supported rate for [`PushQueues::bounded_with_rate`]. | ||||||||||||||||||||||||||||||
| /// Highest supported rate for [`PushQueuesBuilder::rate`]. | ||||||||||||||||||||||||||||||
| pub const MAX_PUSH_RATE: usize = 10_000; | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| // Compile-time guard: DEFAULT_PUSH_RATE must not exceed MAX_PUSH_RATE. | ||||||||||||||||||||||||||||||
|
|
@@ -145,7 +144,12 @@ impl<F: FrameLike> PushHandle<F> { | |||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// #[tokio::test] | ||||||||||||||||||||||||||||||
| /// async fn example() { | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)); | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::builder() | ||||||||||||||||||||||||||||||
| /// .high_capacity(1) | ||||||||||||||||||||||||||||||
| /// .low_capacity(1) | ||||||||||||||||||||||||||||||
| /// .rate(Some(1)) | ||||||||||||||||||||||||||||||
| /// .build() | ||||||||||||||||||||||||||||||
| /// .unwrap(); | ||||||||||||||||||||||||||||||
| /// handle.push_high_priority(42u8).await.unwrap(); | ||||||||||||||||||||||||||||||
|
Comment on lines
+147
to
153
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Use expect in doctest. Avoid unwrap in examples per house style. - /// .build()
- /// .unwrap();
+ /// .build()
+ /// .expect("failed to build queues");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| /// let (priority, frame) = queues.recv().await.unwrap(); | ||||||||||||||||||||||||||||||
| /// assert_eq!(priority, PushPriority::High); | ||||||||||||||||||||||||||||||
|
|
@@ -171,7 +175,12 @@ impl<F: FrameLike> PushHandle<F> { | |||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// #[tokio::test] | ||||||||||||||||||||||||||||||
| /// async fn example() { | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::bounded_with_rate(1, 1, Some(1)); | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::builder() | ||||||||||||||||||||||||||||||
| /// .high_capacity(1) | ||||||||||||||||||||||||||||||
| /// .low_capacity(1) | ||||||||||||||||||||||||||||||
| /// .rate(Some(1)) | ||||||||||||||||||||||||||||||
| /// .build() | ||||||||||||||||||||||||||||||
| /// .unwrap(); | ||||||||||||||||||||||||||||||
| /// handle.push_low_priority(10u8).await.unwrap(); | ||||||||||||||||||||||||||||||
|
Comment on lines
+178
to
184
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Use expect in doctest. - /// .build()
- /// .unwrap();
+ /// .build()
+ /// .expect("failed to build queues");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| /// let (priority, frame) = queues.recv().await.unwrap(); | ||||||||||||||||||||||||||||||
| /// assert_eq!(priority, PushPriority::Low); | ||||||||||||||||||||||||||||||
|
|
@@ -219,8 +228,13 @@ impl<F: FrameLike> PushHandle<F> { | |||||||||||||||||||||||||||||
| /// #[tokio::test] | ||||||||||||||||||||||||||||||
| /// async fn example() { | ||||||||||||||||||||||||||||||
| /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = | ||||||||||||||||||||||||||||||
| /// PushQueues::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::builder() | ||||||||||||||||||||||||||||||
| /// .high_capacity(1) | ||||||||||||||||||||||||||||||
| /// .low_capacity(1) | ||||||||||||||||||||||||||||||
| /// .rate(None) | ||||||||||||||||||||||||||||||
| /// .dlq(Some(dlq_tx)) | ||||||||||||||||||||||||||||||
| /// .build() | ||||||||||||||||||||||||||||||
| /// .unwrap(); | ||||||||||||||||||||||||||||||
|
Comment on lines
+231
to
237
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Use expect in doctest. - /// .build()
- /// .unwrap();
+ /// .build()
+ /// .expect("failed to build queues");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| /// handle.push_high_priority(1u8).await.unwrap(); | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// handle | ||||||||||||||||||||||||||||||
|
|
@@ -276,159 +290,178 @@ pub struct PushQueues<F> { | |||||||||||||||||||||||||||||
| pub(crate) low_priority_rx: mpsc::Receiver<F>, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Builder for [`PushQueues`]. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Allows configuration of queue capacities, rate limiting and an optional | ||||||||||||||||||||||||||||||
| /// dead-letter queue before constructing [`PushQueues`] and its paired | ||||||||||||||||||||||||||||||
| /// [`PushHandle`]. Defaults mirror the previous constructors: both queues have | ||||||||||||||||||||||||||||||
| /// a capacity of one and pushes are limited to [`DEFAULT_PUSH_RATE`] per second | ||||||||||||||||||||||||||||||
| /// unless overridden. | ||||||||||||||||||||||||||||||
| pub struct PushQueuesBuilder<F> { | ||||||||||||||||||||||||||||||
| high_capacity: usize, | ||||||||||||||||||||||||||||||
| low_capacity: usize, | ||||||||||||||||||||||||||||||
| rate: Option<usize>, | ||||||||||||||||||||||||||||||
| dlq: Option<mpsc::Sender<F>>, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| impl<F: FrameLike> PushQueuesBuilder<F> { | ||||||||||||||||||||||||||||||
| fn new() -> Self { | ||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||
| high_capacity: 1, | ||||||||||||||||||||||||||||||
| low_capacity: 1, | ||||||||||||||||||||||||||||||
| rate: Some(DEFAULT_PUSH_RATE), | ||||||||||||||||||||||||||||||
| dlq: None, | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Set the capacity of the high-priority queue. | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn high_capacity(mut self, capacity: usize) -> Self { | ||||||||||||||||||||||||||||||
| self.high_capacity = capacity; | ||||||||||||||||||||||||||||||
| self | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Set the capacity of the low-priority queue. | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn low_capacity(mut self, capacity: usize) -> Self { | ||||||||||||||||||||||||||||||
| self.low_capacity = capacity; | ||||||||||||||||||||||||||||||
| self | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Set the global push rate limit in pushes per second. | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn rate(mut self, rate: Option<usize>) -> Self { | ||||||||||||||||||||||||||||||
| self.rate = rate; | ||||||||||||||||||||||||||||||
| self | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Provide a dead-letter queue for discarded frames. | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn dlq(mut self, dlq: Option<mpsc::Sender<F>>) -> Self { | ||||||||||||||||||||||||||||||
| self.dlq = dlq; | ||||||||||||||||||||||||||||||
| self | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Build the configured [`PushQueues`] and associated [`PushHandle`]. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Errors | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above | ||||||||||||||||||||||||||||||
| /// [`MAX_PUSH_RATE`]. | ||||||||||||||||||||||||||||||
| pub fn build(self) -> Result<(PushQueues<F>, PushHandle<F>), PushConfigError> { | ||||||||||||||||||||||||||||||
| PushQueues::build_with_rate_dlq(self.high_capacity, self.low_capacity, self.rate, self.dlq) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| impl<F: FrameLike> PushQueues<F> { | ||||||||||||||||||||||||||||||
| /// Start building a new set of push queues. | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn builder() -> PushQueuesBuilder<F> { PushQueuesBuilder::new() } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| fn build_with_rate_dlq( | ||||||||||||||||||||||||||||||
| high_capacity: usize, | ||||||||||||||||||||||||||||||
| low_capacity: usize, | ||||||||||||||||||||||||||||||
| rate: Option<usize>, | ||||||||||||||||||||||||||||||
| dlq: Option<mpsc::Sender<F>>, | ||||||||||||||||||||||||||||||
| ) -> Result<(Self, PushHandle<F>), PushConfigError> { | ||||||||||||||||||||||||||||||
| if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { | ||||||||||||||||||||||||||||||
| // Reject unsupported rates early to avoid building queues that cannot | ||||||||||||||||||||||||||||||
| // be used. The bounds prevent runaway resource consumption. | ||||||||||||||||||||||||||||||
| return Err(PushConfigError::InvalidRate(r)); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
Comment on lines
+361
to
+371
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Prevent runtime panic: validate capacities ≥ 1 and return a config error instead. tokio::sync::mpsc::channel(0) panics; adhere to “Handle errors with Result instead of panicking”. Apply: pub fn build(self) -> Result<(PushQueues<F>, PushHandle<F>), PushConfigError> {
PushQueues::build_with_rate_dlq(self.high_capacity, self.low_capacity, self.rate, self.dlq)
}
@@
) -> Result<(Self, PushHandle<F>), PushConfigError> {
- if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) {
+ if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) {
// Reject unsupported rates early to avoid building queues that cannot
// be used. The bounds prevent runaway resource consumption.
return Err(PushConfigError::InvalidRate(r));
}
+ if high_capacity == 0 || low_capacity == 0 {
+ return Err(PushConfigError::InvalidCapacity { high: high_capacity, low: low_capacity });
+ }And extend the error type + display: pub enum PushConfigError {
/// The provided rate was zero or exceeded [`MAX_PUSH_RATE`].
InvalidRate(usize),
+ /// Queue capacities must be ≥ 1.
+ InvalidCapacity { high: usize, low: usize },
}
@@
match self {
Self::InvalidRate(r) => {
write!(f, "invalid rate {r}; must be between 1 and {MAX_PUSH_RATE}")
}
+ Self::InvalidCapacity { high, low } => {
+ write!(f, "invalid capacities; high={high}, low={low}; each must be ≥ 1")
+ }
}Update the build() docs: - /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above
- /// [`MAX_PUSH_RATE`].
+ /// Returns [`PushConfigError::InvalidRate`] if the rate is zero or above
+ /// [`MAX_PUSH_RATE`], or [`PushConfigError::InvalidCapacity`] if any capacity is zero.🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| let (high_tx, high_rx) = mpsc::channel(high_capacity); | ||||||||||||||||||||||||||||||
| let (low_tx, low_rx) = mpsc::channel(low_capacity); | ||||||||||||||||||||||||||||||
| let limiter = rate.map(|r| { | ||||||||||||||||||||||||||||||
| RateLimiter::builder() | ||||||||||||||||||||||||||||||
| .initial(r) | ||||||||||||||||||||||||||||||
| .refill(r) | ||||||||||||||||||||||||||||||
| .interval(Duration::from_secs(1)) | ||||||||||||||||||||||||||||||
| .max(r) | ||||||||||||||||||||||||||||||
| .build() | ||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||
| let inner = PushHandleInner { | ||||||||||||||||||||||||||||||
| high_prio_tx: high_tx, | ||||||||||||||||||||||||||||||
| low_prio_tx: low_tx, | ||||||||||||||||||||||||||||||
| limiter, | ||||||||||||||||||||||||||||||
| dlq_tx: dlq, | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
| Ok(( | ||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||
| high_priority_rx: high_rx, | ||||||||||||||||||||||||||||||
| low_priority_rx: low_rx, | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| PushHandle(Arc::new(inner)), | ||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Create a new set of queues with the specified bounds for each priority | ||||||||||||||||||||||||||||||
| /// and return them along with a [`PushHandle`] for producers. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Examples | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// ```rust,no_run | ||||||||||||||||||||||||||||||
| /// use wireframe::push::{PushPriority, PushQueues}; | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// #[tokio::test] | ||||||||||||||||||||||||||||||
| /// async fn example() { | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::<u8>::bounded(1, 1); | ||||||||||||||||||||||||||||||
| /// handle.push_high_priority(7u8).await.unwrap(); | ||||||||||||||||||||||||||||||
| /// let (priority, frame) = queues.recv().await.unwrap(); | ||||||||||||||||||||||||||||||
| /// assert_eq!(priority, PushPriority::High); | ||||||||||||||||||||||||||||||
| /// assert_eq!(frame, 7); | ||||||||||||||||||||||||||||||
| /// } | ||||||||||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Panics | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Panics if an internal invariant is violated. This should never occur. | ||||||||||||||||||||||||||||||
| #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn bounded(high_capacity: usize, low_capacity: usize) -> (Self, PushHandle<F>) { | ||||||||||||||||||||||||||||||
| Self::bounded_with_rate_dlq(high_capacity, low_capacity, Some(DEFAULT_PUSH_RATE), None) | ||||||||||||||||||||||||||||||
| Self::builder() | ||||||||||||||||||||||||||||||
| .high_capacity(high_capacity) | ||||||||||||||||||||||||||||||
| .low_capacity(low_capacity) | ||||||||||||||||||||||||||||||
| .build() | ||||||||||||||||||||||||||||||
| .expect("DEFAULT_PUSH_RATE is always valid") | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Create queues with no rate limiting. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Examples | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// ```rust,no_run | ||||||||||||||||||||||||||||||
| /// use wireframe::push::PushQueues; | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// let (_queues, handle) = PushQueues::<u8>::bounded_no_rate_limit(1, 1); | ||||||||||||||||||||||||||||||
| /// let _ = handle; | ||||||||||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Panics | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Panics if an internal invariant is violated. This should never occur. | ||||||||||||||||||||||||||||||
| #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] | ||||||||||||||||||||||||||||||
| #[must_use] | ||||||||||||||||||||||||||||||
| pub fn bounded_no_rate_limit( | ||||||||||||||||||||||||||||||
| high_capacity: usize, | ||||||||||||||||||||||||||||||
| low_capacity: usize, | ||||||||||||||||||||||||||||||
| ) -> (Self, PushHandle<F>) { | ||||||||||||||||||||||||||||||
| // `bounded_with_rate_dlq` only fails when given an invalid rate. Passing | ||||||||||||||||||||||||||||||
| // `None` disables rate limiting entirely so the call is infallible. The | ||||||||||||||||||||||||||||||
| // debug assertion guards against future regressions. | ||||||||||||||||||||||||||||||
| let result = Self::bounded_with_rate_dlq(high_capacity, low_capacity, None, None); | ||||||||||||||||||||||||||||||
| debug_assert!(result.is_ok(), "bounded_no_rate_limit should not fail"); | ||||||||||||||||||||||||||||||
| result.expect("bounded_no_rate_limit should not fail") | ||||||||||||||||||||||||||||||
| Self::builder() | ||||||||||||||||||||||||||||||
| .high_capacity(high_capacity) | ||||||||||||||||||||||||||||||
| .low_capacity(low_capacity) | ||||||||||||||||||||||||||||||
| .rate(None) | ||||||||||||||||||||||||||||||
| .build() | ||||||||||||||||||||||||||||||
| .expect("bounded_no_rate_limit should not fail") | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Create queues with a custom rate limit in pushes per second. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// The limiter enforces fairness by allowing at most `rate` pushes | ||||||||||||||||||||||||||||||
| /// per second across all producers for the returned [`PushHandle`]. | ||||||||||||||||||||||||||||||
| /// Pass `None` to disable rate limiting entirely. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Errors | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater | ||||||||||||||||||||||||||||||
| /// than [`MAX_PUSH_RATE`]. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Examples | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// ```rust,no_run | ||||||||||||||||||||||||||||||
| /// use wireframe::push::PushQueues; | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// #[tokio::main] | ||||||||||||||||||||||||||||||
| /// async fn main() { | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::<u8>::bounded_with_rate(1, 1, Some(10)).unwrap(); | ||||||||||||||||||||||||||||||
| /// handle.push_low_priority(1u8).await.unwrap(); | ||||||||||||||||||||||||||||||
| /// let (_prio, frame) = queues.recv().await.unwrap(); | ||||||||||||||||||||||||||||||
| /// assert_eq!(frame, 1); | ||||||||||||||||||||||||||||||
| /// } | ||||||||||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||||||||||
| #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] | ||||||||||||||||||||||||||||||
| pub fn bounded_with_rate( | ||||||||||||||||||||||||||||||
| high_capacity: usize, | ||||||||||||||||||||||||||||||
| low_capacity: usize, | ||||||||||||||||||||||||||||||
| rate: Option<usize>, | ||||||||||||||||||||||||||||||
| ) -> Result<(Self, PushHandle<F>), PushConfigError> { | ||||||||||||||||||||||||||||||
| Self::bounded_with_rate_dlq(high_capacity, low_capacity, rate, None) | ||||||||||||||||||||||||||||||
| Self::builder() | ||||||||||||||||||||||||||||||
| .high_capacity(high_capacity) | ||||||||||||||||||||||||||||||
| .low_capacity(low_capacity) | ||||||||||||||||||||||||||||||
| .rate(rate) | ||||||||||||||||||||||||||||||
| .build() | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Create queues with a custom rate limit and optional dead letter queue. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Frames that would be dropped by [`try_push`](PushHandle::try_push) when | ||||||||||||||||||||||||||||||
| /// using [`PushPolicy::DropIfFull`] or [`PushPolicy::WarnAndDropIfFull`] | ||||||||||||||||||||||||||||||
| /// are routed to `dlq` if provided. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Errors | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// Returns [`PushConfigError::InvalidRate`] if `rate` is zero or greater | ||||||||||||||||||||||||||||||
| /// than [`MAX_PUSH_RATE`]. | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// # Examples | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// ```rust,no_run | ||||||||||||||||||||||||||||||
| /// use tokio::sync::mpsc; | ||||||||||||||||||||||||||||||
| /// use wireframe::push::{PushPolicy, PushPriority, PushQueues}; | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// #[tokio::main] | ||||||||||||||||||||||||||||||
| /// async fn main() { | ||||||||||||||||||||||||||||||
| /// let (dlq_tx, mut dlq_rx) = mpsc::channel(1); | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = | ||||||||||||||||||||||||||||||
| /// PushQueues::<u8>::bounded_with_rate_dlq(1, 1, None, Some(dlq_tx)).unwrap(); | ||||||||||||||||||||||||||||||
| /// handle.push_high_priority(1u8).await.unwrap(); | ||||||||||||||||||||||||||||||
| /// handle | ||||||||||||||||||||||||||||||
| /// .try_push(2u8, PushPriority::High, PushPolicy::DropIfFull) | ||||||||||||||||||||||||||||||
| /// .unwrap(); | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// let (_, val) = queues.recv().await.unwrap(); | ||||||||||||||||||||||||||||||
| /// assert_eq!(val, 1); | ||||||||||||||||||||||||||||||
| /// assert_eq!(dlq_rx.recv().await.unwrap(), 2); | ||||||||||||||||||||||||||||||
| /// } | ||||||||||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||||||||||
| #[deprecated(since = "0.1.0", note = "Use `PushQueues::builder` instead")] | ||||||||||||||||||||||||||||||
| pub fn bounded_with_rate_dlq( | ||||||||||||||||||||||||||||||
| high_capacity: usize, | ||||||||||||||||||||||||||||||
| low_capacity: usize, | ||||||||||||||||||||||||||||||
| rate: Option<usize>, | ||||||||||||||||||||||||||||||
| dlq: Option<mpsc::Sender<F>>, | ||||||||||||||||||||||||||||||
| ) -> Result<(Self, PushHandle<F>), PushConfigError> { | ||||||||||||||||||||||||||||||
| if let Some(r) = rate.filter(|r| *r == 0 || *r > MAX_PUSH_RATE) { | ||||||||||||||||||||||||||||||
| // Reject unsupported rates early to avoid building queues that cannot | ||||||||||||||||||||||||||||||
| // be used. The bounds prevent runaway resource consumption. | ||||||||||||||||||||||||||||||
| return Err(PushConfigError::InvalidRate(r)); | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
| let (high_tx, high_rx) = mpsc::channel(high_capacity); | ||||||||||||||||||||||||||||||
| let (low_tx, low_rx) = mpsc::channel(low_capacity); | ||||||||||||||||||||||||||||||
| let limiter = rate.map(|r| { | ||||||||||||||||||||||||||||||
| RateLimiter::builder() | ||||||||||||||||||||||||||||||
| .initial(r) | ||||||||||||||||||||||||||||||
| .refill(r) | ||||||||||||||||||||||||||||||
| .interval(Duration::from_secs(1)) | ||||||||||||||||||||||||||||||
| .max(r) | ||||||||||||||||||||||||||||||
| .build() | ||||||||||||||||||||||||||||||
| }); | ||||||||||||||||||||||||||||||
| let inner = PushHandleInner { | ||||||||||||||||||||||||||||||
| high_prio_tx: high_tx, | ||||||||||||||||||||||||||||||
| low_prio_tx: low_tx, | ||||||||||||||||||||||||||||||
| limiter, | ||||||||||||||||||||||||||||||
| dlq_tx: dlq, | ||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||
| Ok(( | ||||||||||||||||||||||||||||||
| Self { | ||||||||||||||||||||||||||||||
| high_priority_rx: high_rx, | ||||||||||||||||||||||||||||||
| low_priority_rx: low_rx, | ||||||||||||||||||||||||||||||
| }, | ||||||||||||||||||||||||||||||
| PushHandle(Arc::new(inner)), | ||||||||||||||||||||||||||||||
| )) | ||||||||||||||||||||||||||||||
| Self::build_with_rate_dlq(high_capacity, low_capacity, rate, dlq) | ||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| /// Receive the next frame, preferring high priority frames when available. | ||||||||||||||||||||||||||||||
|
|
@@ -442,7 +475,11 @@ impl<F: FrameLike> PushQueues<F> { | |||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// #[tokio::test] | ||||||||||||||||||||||||||||||
| /// async fn example() { | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::bounded(1, 1); | ||||||||||||||||||||||||||||||
| /// let (mut queues, handle) = PushQueues::builder() | ||||||||||||||||||||||||||||||
| /// .high_capacity(1) | ||||||||||||||||||||||||||||||
| /// .low_capacity(1) | ||||||||||||||||||||||||||||||
| /// .build() | ||||||||||||||||||||||||||||||
| /// .unwrap(); | ||||||||||||||||||||||||||||||
| /// handle.push_high_priority(2u8).await.unwrap(); | ||||||||||||||||||||||||||||||
|
Comment on lines
+478
to
483
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Use expect in doctest. - /// .build()
- /// .unwrap();
+ /// .build()
+ /// .expect("failed to build queues");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| /// let (priority, frame) = queues.recv().await.unwrap(); | ||||||||||||||||||||||||||||||
| /// assert_eq!(priority, PushPriority::High); | ||||||||||||||||||||||||||||||
|
|
@@ -467,7 +504,7 @@ impl<F: FrameLike> PushQueues<F> { | |||||||||||||||||||||||||||||
| /// ```rust,no_run | ||||||||||||||||||||||||||||||
| /// use wireframe::push::PushQueues; | ||||||||||||||||||||||||||||||
| /// | ||||||||||||||||||||||||||||||
| /// let (mut queues, _handle) = PushQueues::<u8>::bounded(1, 1); | ||||||||||||||||||||||||||||||
| /// let (mut queues, _handle) = PushQueues::<u8>::builder().build().unwrap(); | ||||||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) Use expect in doctest. - /// let (mut queues, _handle) = PushQueues::<u8>::builder().build().unwrap();
+ /// let (mut queues, _handle) = PushQueues::<u8>::builder()
+ /// .build()
+ /// .expect("failed to build queues");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||||||||||||||||||||||
| /// queues.close(); | ||||||||||||||||||||||||||||||
| /// ``` | ||||||||||||||||||||||||||||||
| pub fn close(&mut self) { | ||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -21,7 +21,8 @@ fn concurrent_push_delivery() { | |||||||||
| .expect("failed to build tokio runtime"); | ||||||||||
|
|
||||||||||
| rt.block_on(async { | ||||||||||
| let (queues, handle) = PushQueues::bounded(1, 1); | ||||||||||
| let (queues, handle) = | ||||||||||
| PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap(); | ||||||||||
|
Comment on lines
+24
to
+25
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. 🧹 Nitpick (assertive) 🛠️ Refactor suggestion Replace unwrap() with expect() and make the generic explicit Eliminate Clippy’s unwrap_used and align with the guideline “Prefer .expect() over .unwrap()”. Make the type parameter explicit for consistency across tests. - let (queues, handle) =
- PushQueues::builder().high_capacity(1).low_capacity(1).build().unwrap();
+ let (queues, handle) =
+ PushQueues::<u8>::builder().high_capacity(1).low_capacity(1).build().expect("failed to build push queues");📝 Committable suggestion
Suggested change
🤖 Prompt for AI Agents |
||||||||||
| let token = CancellationToken::new(); | ||||||||||
|
|
||||||||||
| let out = loom::sync::Arc::new(loom::sync::Mutex::new(Vec::new())); | ||||||||||
|
|
||||||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧹 Nitpick (assertive)
Elevate module docs with an example and deprecation note
Document the builder-centric API here with a short example and call out deprecated constructors to meet the public API documentation guideline.
Apply:
🤖 Prompt for AI Agents