aqueue provides three concurrency models for protecting shared state in async Rust,
each backed by a different locking primitive — pick the one that best fits your workload:
| Type | Primitive | Best for |
|---|---|---|
Actor<I> |
async_lock::Mutex |
Serial / write-heavy / stateful workloads |
RwModel<I> |
async_lock::RwLock |
Read-heavy workloads |
PCModel<I> |
async_lock::Semaphore |
Bounded parallelism / rate-limiting |
The low-level queue primitives (AQueue, RwQueue, SemaphoreQueue) are also exported for custom use cases. |
[dependencies]
aqueue = "1.4"
# Optional: enable timeout macro support (tokio runtime)
# aqueue = { version = "1.4", features = ["tokio_time"] }| Flag | Description |
|---|---|
tokio_time |
Enables inner_wait!, call_wait!, call_mut_wait! backed by tokio timeouts |
async_std_time |
Same three macros backed by async-std timeouts |
Neither flag is enabled by default. The core library only depends on async-lock. |
Best for: write-heavy, exclusive-access, or stateful workloads.
Actor<I>wraps any valueIbehind an async Mutex queue (AQueue). All operations execute serially — only one closure runs at a time — without anyMutex<I>in user code. The closure parameter is&'a InnerStore<I>whose lifetime'ais tied to&'a self, guaranteeing the reference stays valid across every.awaitpoint inside the closure. Public API: | Method | Description | |--------|-------------| |Actor::new(x)| Wrapxin a newActor| |async fn inner_call(f)| Runfexclusively under the queue lock (FIFO) | |unsafe fn deref_inner()| Borrow the inner value directly, bypassing the lock |
Best for: read-heavy workloads with occasional exclusive mutations.
RwModel<I>is backed by an async RwLock queue (RwQueue):
callacquires a shared read lock — multiple readers run concurrently.call_mutacquires an exclusive write lock — no other reads or writes run concurrently. Synchronous variantssync_call/sync_mut_callare available for non-async call sites (they yield the thread viathread::yield_nowuntil the lock is free). Public API: | Method | Description | |--------|-------------| |RwModel::new(x)| Wrapxin a newRwModel| |async fn call(f)| Shared read lock — closure receivesRefInner<'a, I>| |async fn call_mut(f)| Exclusive write lock — closure receivesRefMutInner<'a, I>| |fn sync_call(f)| Synchronous read (yields thread until lock is free) | |fn sync_mut_call(f)| Synchronous write (yields thread until lock is free) |
Best for: capping the number of simultaneously executing tasks (e.g. HTTP connections, DB query concurrency).
PCModel<I>uses an async Semaphore (SemaphoreQueue) to enforce a maximum concurrency ofn. Callers that exceed the limit suspend asynchronously until a permit becomes available. Warning:PCModel::inner()returns a direct reference to the inner value without acquiring any semaphore permit. Calls made through that reference are not subject to the configured concurrency limit, silently defeating the purpose ofPCModel. Only useinner()for metadata that is safe to read without counting against the parallelism budget (e.g. configuration flags, addresses). Usecall()for everything else. Public API: | Method | Description | |--------|-------------| |PCModel::new(inner, n)| Create a model allowing at mostnconcurrent executions | |fn inner()| Direct reference — does not acquire a semaphore permit | |async fn call(f)| Acquire one permit, runf, release permit on completion |
The following primitives are publicly exported for advanced use cases:
| Type | Primitive | Use case |
|---|---|---|
AQueue |
async_lock::Mutex |
Serial async execution |
RwQueue |
async_lock::RwLock |
Concurrent reads / exclusive writes |
SemaphoreQueue |
async_lock::Semaphore |
Bounded concurrency |
Enable tokio_time (or async_std_time) to get three timeout macros that wrap a call with a millisecond deadline, returning Err on expiry:
aqueue = { version = "1.4", features = ["tokio_time"] }| Macro | Wraps |
|---|---|
inner_wait!(actor, ms, f) |
actor.inner_call(f) |
call_mut_wait!(model, ms, f) |
model.call_mut(f) |
call_wait!(model, ms, f) |
model.call(f) |
use aqueue::{inner_wait, Actor};
// Returns Err if the operation takes longer than 5 000 ms
let result = inner_wait!(my_actor, 5000, |inner| async move {
inner.get_mut().do_work()
}).await?;Four concurrent tasks accumulate into the same counter with zero data races:
use aqueue::Actor;
use std::sync::Arc;
use tokio::try_join;
#[derive(Default)]
struct Foo { count: u64, i: i128 }
impl Foo {
fn add(&mut self, x: i32) -> i128 { self.count += 1; self.i += x as i128; self.i }
fn get(&self) -> i128 { self.i }
fn get_count(&self) -> u64 { self.count }
}
trait FooRunner {
async fn add(&self, x: i32) -> i128;
async fn get(&self) -> i128;
async fn get_count(&self) -> u64;
}
impl FooRunner for Actor<Foo> {
async fn add(&self, x: i32) -> i128 {
self.inner_call(|inner| async move { inner.get_mut().add(x) }).await
}
async fn get(&self) -> i128 {
self.inner_call(|inner| async move { inner.get().get() }).await
}
async fn get_count(&self) -> u64 {
self.inner_call(|inner| async move { inner.get().get_count() }).await
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let tf = Arc::new(Actor::new(Foo::default()));
let (a, b, c, d) = (tf.clone(), tf.clone(), tf.clone(), tf.clone());
let t1 = tokio::spawn(async move { for i in 0..25_000_000 { a.add(i).await; } });
let t2 = tokio::spawn(async move { for i in 25_000_000..50_000_000 { b.add(i).await; } });
let t3 = tokio::spawn(async move { for i in 50_000_000..75_000_000 { c.add(i).await; } });
let t4 = tokio::spawn(async move { for i in 75_000_000..100_000_000 { d.add(i).await; } });
try_join!(t1, t2, t3, t4)?;
// count=100000000 value=4999999950000000
println!("count={} value={}", tf.get_count().await, tf.get().await);
Ok(())
}Wrapping a SQLite connection pool in an Actor serialises all writes so the auto-incrementing ID is always unique, even across 100 concurrent tasks:
use anyhow::Result;
use aqueue::Actor;
use sqlx::{SqlitePool, sqlite::SqlitePoolOptions};
pub struct DataBases { auto_id: u32, pool: SqlitePool }
unsafe impl Send for DataBases {}
unsafe impl Sync for DataBases {}
impl DataBases {
pub fn new(max: u32) -> Result<Actor<DataBases>> {
let pool = SqlitePoolOptions::new()
.max_connections(max)
.connect_lazy("sqlite://:memory:")?;
Ok(Actor::new(DataBases { auto_id: 0, pool }))
}
async fn create_table(&self) -> Result<()> {
sqlx::query(
r#"CREATE TABLE "user"("id" integer NOT NULL PRIMARY KEY,"name" text,"gold" real);"#,
).execute(&self.pool).await?;
Ok(())
}
async fn insert_user(&mut self, name: &str, gold: f64) -> Result<bool> {
self.auto_id += 1;
let rows = sqlx::query(r#"INSERT INTO `user`(`id`,`name`,`gold`) VALUES(?,?,?)"#)
.bind(self.auto_id).bind(name).bind(gold)
.execute(&self.pool).await?.rows_affected();
Ok(rows == 1)
}
}
trait IDatabase {
async fn create_table(&self) -> Result<()>;
async fn insert_user(&self, name: String, gold: f64) -> Result<bool>;
}
impl IDatabase for Actor<DataBases> {
async fn create_table(&self) -> Result<()> {
self.inner_call(|inner| async move { inner.get().create_table().await }).await
}
async fn insert_user(&self, name: String, gold: f64) -> Result<bool> {
self.inner_call(|inner| async move {
inner.get_mut().insert_user(&name, gold).await
}).await
}
}
lazy_static::lazy_static! {
static ref DB: Actor<DataBases> = DataBases::new(50).expect("db init failed");
}
#[tokio::main]
async fn main() -> Result<()> {
DB.create_table().await?;
// 100 tasks * 1 000 inserts = 100 000 rows; auto_id is always unique
let handles: Vec<_> = (0..100i64).map(|i| {
tokio::spawn(async move {
for j in 0..1000i64 {
DB.insert_user(i.to_string(), j as f64).await?;
}
anyhow::Ok(())
})
}).collect();
for h in handles { h.await??; }
Ok(())
}call permits multiple simultaneous readers; call_mut takes an exclusive write lock:
use aqueue::RwModel;
use std::sync::Arc;
use tokio::try_join;
#[derive(Default)]
struct Foo { count: u64, i: i128 }
impl Foo {
fn add(&mut self, x: i32) -> i128 { self.count += 1; self.i += x as i128; self.i }
fn get(&self) -> i128 { self.i }
fn get_count(&self) -> u64 { self.count }
}
trait FooRunner {
async fn add(&self, x: i32) -> i128;
async fn get(&self) -> i128;
async fn get_count(&self) -> u64;
}
impl FooRunner for RwModel<Foo> {
async fn add(&self, x: i32) -> i128 {
self.call_mut(|mut inner| async move { inner.add(x) }).await // exclusive write
}
async fn get(&self) -> i128 {
self.call(|inner| async move { inner.get() }).await // shared read
}
async fn get_count(&self) -> u64 {
self.call(|inner| async move { inner.get_count() }).await
}
}
#[tokio::main]
async fn main() -> anyhow::Result<()> {
let tf = Arc::new(RwModel::new(Foo::default()));
let (a, b, c, d) = (tf.clone(), tf.clone(), tf.clone(), tf.clone());
let t1 = tokio::spawn(async move { for i in 0..25_000_000 { a.add(i).await; } });
let t2 = tokio::spawn(async move { for i in 25_000_000..50_000_000 { b.add(i).await; } });
let t3 = tokio::spawn(async move { for i in 50_000_000..75_000_000 { c.add(i).await; } });
let t4 = tokio::spawn(async move { for i in 75_000_000..100_000_000 { d.add(i).await; } });
try_join!(t1, t2, t3, t4)?;
println!("count={} value={}", tf.get_count().await, tf.get().await);
Ok(())
}test rw a count:100000000 value:4999999950000000 time:5.18s qps:19,308,000
test rw b count:100000000 value:4999999950000000 time:5.29s qps:18,892,000
At most 4 requests are in-flight at any time; the rest queue up automatically:
use aqueue::PCModel;
use std::sync::Arc;
struct HttpClient;
impl HttpClient {
async fn get(&self, url: &str) -> Vec<u8> { vec![] }
}
trait IClient {
async fn fetch(&self, url: String) -> Vec<u8>;
}
impl IClient for PCModel<HttpClient> {
async fn fetch(&self, url: String) -> Vec<u8> {
self.call(|c| async move { c.get(&url).await }).await
}
}
#[tokio::main]
async fn main() {
// Allow at most 4 concurrent requests
let client = Arc::new(PCModel::new(HttpClient, 4));
let handles: Vec<_> = (0..20usize).map(|i| {
let c = client.clone();
tokio::spawn(async move {
c.fetch(format!("https://example.com/file/{i}")).await
})
}).collect();
for h in handles { h.await.unwrap(); }
}Run the built-in Criterion benchmarks:
cargo benchFour scenarios are measured (100 000 iterations each):
| Benchmark ID | Description |
|---|---|
single_task_actor_call |
Serial Actor::inner_call on a single-thread runtime |
multi_task_actor_call |
Concurrent Actor::inner_call from 2 tasks (multi-thread runtime) |
single_task_model_call |
Serial RwModel::call_mut on a single-thread runtime |
multi_task_model_call |
Concurrent RwModel::call_mut from 2 tasks (multi-thread runtime) |
aqueue
├── Actor<I> ──► AQueue (async_lock::Mutex) exclusive, serial
├── RwModel<I> ──► RwQueue (async_lock::RwLock) concurrent reads / exclusive writes
├── PCModel<I> ──► SemaphoreQueue (async_lock::Semaphore) bounded parallelism
│
├── InnerStore<T> raw value store (UnsafeCell<T>; safety enforced by the queue above)
├── RefInner<'a,T> shared-reference smart pointer (impl Deref)
└── RefMutInner<'a,T> mutable-reference smart pointer (impl Deref + DerefMut)
All models store their data in InnerStore<T> — an UnsafeCell<T> with manual Send + Sync impls.
Thread safety is enforced entirely by the surrounding async lock primitive, keeping the lock and the
value completely separate with zero additional wrapper overhead.
See CHANGELOG.md for the full release history.
Licensed under either of
- Apache License, Version 2.0 (LICENSE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT License (LICENSE or http://opensource.org/licenses/MIT) at your option.