From 096f977e75976ad235c265ec7c3d6722646bb719 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 13:52:17 +0530 Subject: [PATCH 01/14] add util functions for processing waker sets --- libkernel/src/sync/waker_set.rs | 79 ++++++++++++++++++++++++++++++++- 1 file changed, 78 insertions(+), 1 deletion(-) diff --git a/libkernel/src/sync/waker_set.rs b/libkernel/src/sync/waker_set.rs index 8e164a2c..83829df4 100644 --- a/libkernel/src/sync/waker_set.rs +++ b/libkernel/src/sync/waker_set.rs @@ -89,11 +89,40 @@ impl WakerSet { } } + /// Removes and returns the first (lowest-token, i.e. FIFO) entry whose + /// data matches `predicate`, without waking it. + pub fn take_if(&mut self, predicate: impl Fn(&T) -> bool) -> Option<(Waker, T)> { + let key = self + .waiters + .iter() + .find(|(_, (_, data))| predicate(data)) + .map(|(key, _)| *key)?; + + self.waiters.remove(&key) + } + + /// Removes and returns the first (lowest-token, i.e. FIFO) entry, without + /// waking it. + pub fn take_first(&mut self) -> Option<(Waker, T)> { + self.waiters.pop_first().map(|(_, entry)| entry) + } + + /// Returns `true` if no wakers are registered. + pub fn is_empty(&self) -> bool { + self.waiters.is_empty() + } + /// Registers a waker together with associated data, returning its token. pub fn register_with_data(&mut self, waker: &Waker, data: T) -> u64 { + self.insert(waker.clone(), data) + } + + /// Inserts an already-owned waker with associated data, returning its + /// token. 
+ pub fn insert(&mut self, waker: Waker, data: T) -> u64 { let id = self.allocate_id(); - self.waiters.insert(id, (waker.clone(), data)); + self.waiters.insert(id, (waker, data)); id } @@ -193,6 +222,54 @@ where } } +#[cfg(test)] +mod waker_set_tests { + use super::*; + + fn set_with_data(data: &[u32]) -> WakerSet { + let mut set = WakerSet::new(); + for &d in data { + set.insert(Waker::noop().clone(), d); + } + set + } + + #[test] + fn take_if_removes_first_match_in_fifo_order() { + let mut set = set_with_data(&[0b01, 0b10, 0b11]); + + let (_, data) = set.take_if(|d| d & 0b10 != 0).unwrap(); + assert_eq!(data, 0b10); + + let (_, data) = set.take_if(|d| d & 0b10 != 0).unwrap(); + assert_eq!(data, 0b11); + + assert!(set.take_if(|d| d & 0b10 != 0).is_none()); + assert!(!set.is_empty()); + } + + #[test] + fn take_first_is_fifo() { + let mut set = set_with_data(&[1, 2, 3]); + + assert_eq!(set.take_first().unwrap().1, 1); + assert_eq!(set.take_first().unwrap().1, 2); + assert_eq!(set.take_first().unwrap().1, 3); + assert!(set.take_first().is_none()); + assert!(set.is_empty()); + } + + #[test] + fn insert_then_remove_by_token() { + let mut set = WakerSet::new(); + let token = set.insert(Waker::noop().clone(), 7u32); + + assert!(set.contains_token(token)); + set.remove(token); + assert!(set.is_empty()); + } +} + #[cfg(test)] mod wait_until_tests { use super::*; From 6657af912b818198f8c83d1f8b42735d556572c5 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 13:59:57 +0530 Subject: [PATCH 02/14] data structures for futex queues --- src/process/threading/futex/mod.rs | 3 + src/process/threading/futex/waiter.rs | 110 ++++++++++++++++++++++++++ 2 files changed, 113 insertions(+) create mode 100644 src/process/threading/futex/waiter.rs diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index 2d101da3..81d2f97e 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -18,6 +18,9 @@ use 
wait::FutexWait; pub mod key; mod wait; +// TODO(futex2): expect(dead_code) is temporary until wake/requeue use WaiterCell. +#[expect(dead_code)] +mod waiter; const FUTEX_WAIT: i32 = 0; const FUTEX_WAKE: i32 = 1; diff --git a/src/process/threading/futex/waiter.rs b/src/process/threading/futex/waiter.rs new file mode 100644 index 00000000..fb2dcb00 --- /dev/null +++ b/src/process/threading/futex/waiter.rs @@ -0,0 +1,110 @@ +use alloc::sync::Arc; +use libkernel::sync::waker_set::WakerSet; + +use crate::sync::SpinLock; + +/// A futex wait queue: the set of waiters parked on one [`FutexKey`]. +/// +/// [`FutexKey`]: super::key::FutexKey +pub type FutexQueue = Arc<SpinLock<WakerSet<Arc<WaiterCell>>>>; + +/// Per-waiter shared state, stored both in the waiting future and as the +/// data payload of its [`WakerSet`] entry. +/// +/// The cell tracks which queue currently holds the waiter, which is what +/// keeps requeueing sound: `futex_requeue` can move an entry to another +/// queue while the waiting future is asleep, and the future still finds its +/// real location when it cancels or completes. +/// +/// # Lock ordering +/// +/// Futex table lock → queue lock(s) (ordered by `FutexKey` when taking two) +/// → cell lock. Code that only knows the cell and needs the queue +/// ([`Self::unregister`]) snapshots the location under the cell lock, drops +/// it, locks the queue, then re-locks the cell and verifies the location is +/// unchanged — retrying if a concurrent requeue moved it. Queue tokens are +/// allocated monotonically, so a stale `(queue, token)` snapshot can never +/// alias a new registration. +pub struct WaiterCell { + /// Wake mask; a wake with mask `m` wakes this waiter iff `mask & m != 0`. + pub mask: u64, + state: SpinLock<CellState>, +} + +struct CellState { + /// `Some((queue, token))` while enqueued; `None` once woken or + /// unregistered. + location: Option<(FutexQueue, u64)>, + /// Set when a waker removed this entry, distinguishing a genuine wake + /// from self-unregistration. 
+ woken: bool, +} + +impl WaiterCell { + pub fn new(mask: u64) -> Arc<Self> { + Arc::new(Self { + mask, + state: SpinLock::new(CellState { + location: None, + woken: false, + }), + }) + } + + pub fn is_woken(&self) -> bool { + self.state.lock_save_irq().woken + } + + /// Records the queue entry for this waiter. Caller must hold the lock of + /// `queue` (the registration and the location update must be atomic with + /// respect to wake/requeue). + pub fn set_location(&self, queue: FutexQueue, token: u64) { + self.state.lock_save_irq().location = Some((queue, token)); + } + + /// Marks the waiter woken and detaches it from its queue. Caller must + /// hold the lock of the queue it was just removed from. + pub fn mark_woken(&self) { + let mut state = self.state.lock_save_irq(); + state.woken = true; + state.location = None; + } + + /// Updates the location after the entry moved to `queue`. Caller must + /// hold the locks of both the source and destination queues. + pub fn requeue_to(&self, queue: FutexQueue, token: u64) { + self.state.lock_save_irq().location = Some((queue, token)); + } + + /// Removes this waiter from whichever queue currently holds it. + /// + /// Returns `true` if a wake landed first (that wake is then consumed by + /// the caller). + pub fn unregister(&self) -> bool { + loop { + // Snapshot the location; we cannot lock the queue while holding + // the cell lock without inverting the queue → cell order. + let (queue, token) = { + let state = self.state.lock_save_irq(); + match &state.location { + Some((queue, token)) => (queue.clone(), *token), + None => return state.woken, + } + }; + + let mut queue_guard = queue.lock_save_irq(); + let mut state = self.state.lock_save_irq(); + + match &state.location { + None => return state.woken, + Some((q, t)) if Arc::ptr_eq(q, &queue) && *t == token => { + queue_guard.remove(token); + state.location = None; + return false; + } + // Requeued between the snapshot and taking the queue lock. 
+ _ => continue, + } + } + } +} From 95d1575040f73e5ca270920f62810bc8e85792c3 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 14:07:04 +0530 Subject: [PATCH 03/14] write one core for both futex impl; futex 1 uses futex_wait_multi with 1 waiter --- src/process/exit.rs | 2 +- src/process/threading/futex/mod.rs | 132 +++++++++++---- src/process/threading/futex/wait.rs | 223 ++++++++++++++++---------- src/process/threading/futex/waiter.rs | 3 + 4 files changed, 242 insertions(+), 118 deletions(-) diff --git a/src/process/exit.rs b/src/process/exit.rs index c1da55a8..5ffb135d 100644 --- a/src/process/exit.rs +++ b/src/process/exit.rs @@ -125,7 +125,7 @@ pub async fn sys_exit(ctx: &mut ProcessCtx, exit_code: usize) -> Result { copy_to_user(ptr, 0u32).await?; if let Ok(key) = FutexKey::new_shared(ctx, ptr) { - futex::wake_key(1, key, u32::MAX); + futex::wake_key(1, key, u32::MAX as u64); } else { warn!("Failed to get futex wake key on sys_exit"); } diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index 81d2f97e..b8aa6bf8 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -1,25 +1,22 @@ use crate::clock::realtime::date; use crate::clock::timespec::TimeSpec; -use crate::drivers::timer::sleep; use crate::process::thread_group::signal::{InterruptResult, Interruptable}; use crate::sched::syscall_ctx::ProcessCtx; use crate::sync::{OnceLock, SpinLock}; -use alloc::boxed::Box; +use alloc::vec::Vec; use alloc::{collections::btree_map::BTreeMap, sync::Arc}; use core::time::Duration; -use futures::FutureExt; use key::FutexKey; use libkernel::{ error::{KernelError, Result}, memory::address::TUA, sync::waker_set::WakerSet, }; -use wait::FutexWait; +use wait::{ParsedWaiter, futex_wait_multi}; +use waiter::{FutexQueue, WaiterCell}; pub mod key; mod wait; -// TODO(futex2): expect(dead_code) is temporary until wake/requeue use WaiterCell. 
-#[expect(dead_code)] mod waiter; const FUTEX_WAIT: i32 = 0; @@ -28,7 +25,7 @@ const FUTEX_WAIT_BITSET: i32 = 9; const FUTEX_WAKE_BITSET: i32 = 10; const FUTEX_PRIVATE_FLAG: i32 = 128; -type FutexTable = BTreeMap>>>; +type FutexTable = BTreeMap; /// Global futex table mapping a futex key to its wait queue. #[allow(clippy::type_complexity)] @@ -38,7 +35,7 @@ fn futex_table() -> &'static SpinLock { FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new())) } -fn get_or_create_queue(key: FutexKey) -> Arc>> { +fn get_or_create_queue(key: FutexKey) -> FutexQueue { let table = futex_table(); table @@ -48,22 +45,92 @@ fn get_or_create_queue(key: FutexKey) -> Arc>> { .clone() } -pub fn wake_key(nr_wake: usize, key: FutexKey, bitmask: u32) -> usize { - let mut woke = 0; +pub fn wake_key(nr_wake: usize, key: FutexKey, mask: u64) -> usize { + let mut wakers = Vec::new(); let table = futex_table(); if let Some(waitq_arc) = table.lock_save_irq().get(&key).cloned() { let mut waitq = waitq_arc.lock_save_irq(); - for _ in 0..nr_wake { - if waitq.wake_if(|x| *x & bitmask != 0) { - woke += 1; - } else { - break; + + while wakers.len() < nr_wake { + match waitq.take_if(|cell: &Arc| cell.mask & mask != 0) { + Some((waker, cell)) => { + cell.mark_woken(); + wakers.push(waker); + } + None => break, } } } + // Wake outside the queue lock; a woken task may run immediately and + // re-take futex locks. + let woke = wakers.len(); + for waker in wakers { + waker.wake(); + } + + woke +} + +/// Wakes up to `nr_wake` waiters on `key1`, then moves up to `nr_requeue` of +/// the remaining waiters onto `key2`'s queue without waking them. +/// +/// Wake masks are ignored, matching Linux requeue semantics. Returns the +/// number of waiters woken. +// TODO(futex2): expect(dead_code) is temporary until sys_futex_requeue lands. 
+#[expect(dead_code)] +pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: usize) -> usize { + if key1 == key2 { + // Requeueing onto the same queue is a no-op; just wake. + return wake_key(nr_wake, key1, u64::MAX); + } + + let q1_arc = get_or_create_queue(key1); + let q2_arc = get_or_create_queue(key2); + + let mut wakers = Vec::new(); + + { + // Lock both queues in key order so concurrent requeues can't + // deadlock. + let (mut q1, mut q2) = if key1 < key2 { + let q1 = q1_arc.lock_save_irq(); + let q2 = q2_arc.lock_save_irq(); + (q1, q2) + } else { + let q2 = q2_arc.lock_save_irq(); + let q1 = q1_arc.lock_save_irq(); + (q1, q2) + }; + + while wakers.len() < nr_wake { + match q1.take_first() { + Some((waker, cell)) => { + cell.mark_woken(); + wakers.push(waker); + } + None => break, + } + } + + for _ in 0..nr_requeue { + match q1.take_first() { + Some((waker, cell)) => { + let token = q2.insert(waker, cell.clone()); + cell.requeue_to(q2_arc.clone(), token); + } + None => break, + } + } + } + + let woke = wakers.len(); + for waker in wakers { + waker.wake(); + } + woke } @@ -74,24 +141,17 @@ async fn do_futex_wait( bitmask: u32, timeout: Option, ) -> Result { - // Obtain (or create) the wait-queue for this futex word. - let slot = get_or_create_queue(key); + let waiter = ParsedWaiter { + key, + uaddr, + val, + mask: bitmask as u64, + }; // Return 0 on success. - if let Some(dur) = timeout { - let mut wait = FutexWait::new(uaddr, val, bitmask, slot).fuse(); - let mut sleep = Box::pin(sleep(dur).fuse()); - futures::select_biased! 
{ - res = wait => { - res.map(|_| 0) - }, - _ = sleep => { - Err(KernelError::TimedOut) - } - } - } else { - FutexWait::new(uaddr, val, bitmask, slot).await.map(|_| 0) - } + futex_wait_multi(core::slice::from_ref(&waiter), timeout) + .await + .map(|_| 0) } pub async fn sys_futex( @@ -119,7 +179,9 @@ pub async fn sys_futex( } else { let timeout = TimeSpec::copy_from_user(timeout).await?; if matches!(cmd, FUTEX_WAIT_BITSET) { - Some(Duration::from(timeout) - date()) + // The deadline is absolute and may already have passed; + // a zero timeout still performs the value check below. + Some(Duration::from(timeout).saturating_sub(date())) } else { Some(Duration::from(timeout)) } @@ -143,7 +205,11 @@ pub async fn sys_futex( FUTEX_WAKE | FUTEX_WAKE_BITSET => Ok(wake_key( val as _, key, - if cmd == FUTEX_WAKE { u32::MAX } else { val3 }, + if cmd == FUTEX_WAKE { + u32::MAX as u64 + } else { + val3 as u64 + }, )), _ => Err(KernelError::NotSupported), diff --git a/src/process/threading/futex/wait.rs b/src/process/threading/futex/wait.rs index bec44460..a6a9dcd5 100644 --- a/src/process/threading/futex/wait.rs +++ b/src/process/threading/futex/wait.rs @@ -1,116 +1,171 @@ -use alloc::{boxed::Box, sync::Arc}; -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; +use alloc::boxed::Box; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::future::poll_fn; +use core::task::{Poll, Waker}; +use core::time::Duration; +use futures::FutureExt; use libkernel::{ error::{KernelError, Result}, memory::address::TUA, - sync::waker_set::WakerSet, -}; - -use crate::{ - memory::uaccess::{copy_from_user, try_copy_from_user}, - sync::SpinLock, }; -enum WaitState { - Init, - HandlingFault(Pin> + Send + 'static>>), - Waiting { token: u64 }, +use super::get_or_create_queue; +use super::key::FutexKey; +use super::waiter::WaiterCell; +use crate::drivers::timer::sleep; +use crate::memory::uaccess::{copy_from_user, try_copy_from_user}; + +/// One decoded wait request: wait on `key` 
while `*uaddr == val`, wakeable by +/// any wake whose mask overlaps `mask`. +pub struct ParsedWaiter { + pub key: FutexKey, + pub uaddr: TUA, + pub val: u32, + pub mask: u64, } -pub struct FutexWait { - uaddr: TUA, - val: u32, - queue: Arc>>, - bitmask: u32, - state: WaitState, +/// Owns the queue registrations of an in-progress multi-wait, so that +/// cancellation (signal, timeout, fault retry) always unregisters them. +#[derive(Default)] +struct WaitGuard { + cells: Vec>, } -impl FutexWait { - pub fn new( - uaddr: TUA, - val: u32, - bitmask: u32, - queue: Arc>>, - ) -> Self { - Self { - uaddr, - val, - bitmask, - queue, - state: WaitState::Init, +impl WaitGuard { + /// Ready with the index of the first woken waiter, if any. + /// + /// Does not re-register the waker: each cell's queue entry already holds + /// the task's waker from setup, and that waker is assumed stable across + /// polls (the same assumption the rest of the kernel makes). + fn poll_woken(&self) -> Poll { + match self.cells.iter().position(|cell| cell.is_woken()) { + Some(idx) => Poll::Ready(idx), + None => Poll::Pending, } } + + /// Unregisters every remaining cell. If any wake was consumed (either + /// before this call or racing with it), returns the lowest such index. 
+ fn finish(&mut self) -> Option { + let mut woken = None; + + for (idx, cell) in self.cells.iter().enumerate() { + if cell.unregister() && woken.is_none() { + woken = Some(idx); + } + } + + self.cells.clear(); + woken + } } -impl Drop for FutexWait { +impl Drop for WaitGuard { fn drop(&mut self) { - if let WaitState::Waiting { token } = &self.state { - self.queue.lock_save_irq().remove(*token); + for cell in &self.cells { + cell.unregister(); } } } -impl Future for FutexWait { - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Safe because we are inside Pin and not moving out of Self - let this = unsafe { self.as_mut().get_unchecked_mut() }; +enum Setup { + /// All waiters value-checked and enqueued. + Queued, + /// A futex word didn't match its expected value. + Mismatch, + /// Reading the futex word at this index faulted; fault it in and retry. + NeedFault(usize), +} - loop { - match &mut this.state { - WaitState::Init => { - let mut wait_queue = this.queue.lock_save_irq(); +/// Enqueues each waiter, checking its futex value under the queue lock. +/// +/// Synchronous; runs inside a single poll so no wake can slip between the +/// value check and registration of any one waiter (the queue lock covers +/// both). Holds at most one queue lock at a time. 
+fn setup_all(waiters: &[ParsedWaiter], waker: &Waker, guard: &mut WaitGuard) -> Setup { + for (idx, waiter) in waiters.iter().enumerate() { + let queue = get_or_create_queue(waiter.key); + let mut queue_guard = queue.lock_save_irq(); + + match try_copy_from_user(waiter.uaddr) { + Ok(val) => { + if val != waiter.val { + return Setup::Mismatch; + } - match try_copy_from_user(this.uaddr) { - Ok(val) => { - if val != this.val { - return Poll::Ready(Err(KernelError::TryAgain)); - } + let cell = WaiterCell::new(waiter.mask); + let token = queue_guard.register_with_data(waker, cell.clone()); + cell.set_location(queue.clone(), token); + guard.cells.push(cell); + } + Err(_) => return Setup::NeedFault(idx), + } + } - let token = wait_queue.register_with_data(cx.waker(), this.bitmask); + Setup::Queued +} - this.state = WaitState::Waiting { token }; +/// Waits until any of `waiters` is woken, returning its index. +/// +/// Linux `futex_wait_multiple` semantics: all waiters are enqueued with their +/// values checked atomically against concurrent wakes; on value mismatch the +/// queued prefix is unwound and, if one of those waiters was already woken, +/// that counts as a successful wake rather than `EAGAIN`. +pub async fn futex_wait_multi( + waiters: &[ParsedWaiter], + timeout: Option, +) -> Result { + loop { + let mut guard = WaitGuard::default(); + + // poll_fn is used purely to obtain the task's waker; setup itself + // never returns Pending. 
+ let setup = poll_fn(|cx| Poll::Ready(setup_all(waiters, cx.waker(), &mut guard))).await; + + match setup { + Setup::NeedFault(idx) => { + if let Some(woken) = guard.finish() { + return Ok(woken); + } - return Poll::Pending; - } - Err(_) => { - drop(wait_queue); + copy_from_user(waiters[idx].uaddr).await?; + continue; + } + Setup::Mismatch => { + return guard.finish().ok_or(KernelError::TryAgain); + } + Setup::Queued => {} + } - let uaddr = this.uaddr; - let fault_handler = Box::pin(async move { - copy_from_user(uaddr).await?; - Ok(()) - }); + return match timeout { + None => { + let woken = poll_fn(|_| guard.poll_woken()).await; + guard.finish(); + Ok(woken) + } + Some(dur) => { + let mut wait = poll_fn(|_| guard.poll_woken()).fuse(); + let mut sleep_fut = Box::pin(sleep(dur).fuse()); - this.state = WaitState::HandlingFault(fault_handler); - continue; - } - } - } + let woken = futures::select_biased! { + idx = wait => Some(idx), + _ = sleep_fut => None, + }; - WaitState::Waiting { token } => { - let wait_queue = this.queue.lock_save_irq(); + drop(wait); - if !wait_queue.contains_token(*token) { - return Poll::Ready(Ok(())); - } else { - return Poll::Pending; + match woken { + Some(idx) => { + guard.finish(); + Ok(idx) } + // A wake may have landed between the timer firing and us + // unregistering; finish() reports it so the wake isn't + // lost. + None => guard.finish().ok_or(KernelError::TimedOut), } - - WaitState::HandlingFault(fut) => match fut.as_mut().poll(cx) { - Poll::Ready(Ok(_)) => { - this.state = WaitState::Init; - } - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Pending => return Poll::Pending, - }, } - } + }; } } diff --git a/src/process/threading/futex/waiter.rs b/src/process/threading/futex/waiter.rs index fb2dcb00..990b31e1 100644 --- a/src/process/threading/futex/waiter.rs +++ b/src/process/threading/futex/waiter.rs @@ -72,6 +72,9 @@ impl WaiterCell { /// Updates the location after the entry moved to `queue`. 
Caller must /// hold the locks of both the source and destination queues. + // TODO(futex2): expect(dead_code) is temporary until sys_futex_requeue + // lands. + #[expect(dead_code)] pub fn requeue_to(&self, queue: FutexQueue, token: u64) { self.state.lock_save_irq().location = Some((queue, token)); } From 97a8e2123ebce4d6cd9bb779ec85b456909e3e6c Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 14:13:45 +0530 Subject: [PATCH 04/14] wire up the futex2 syscalls --- etc/syscalls_linux_aarch64.md | 8 +- src/arch/arm64/exceptions/syscall.rs | 40 +++- src/process/threading/futex/futex2.rs | 261 ++++++++++++++++++++++++++ src/process/threading/futex/mod.rs | 3 +- src/process/threading/futex/waiter.rs | 3 - 5 files changed, 305 insertions(+), 10 deletions(-) create mode 100644 src/process/threading/futex/futex2.rs diff --git a/etc/syscalls_linux_aarch64.md b/etc/syscalls_linux_aarch64.md index c3b28eb8..b9deee15 100644 --- a/etc/syscalls_linux_aarch64.md +++ b/etc/syscalls_linux_aarch64.md @@ -304,14 +304,14 @@ | 0x1be (446) | landlock_restrict_self | (const int ruleset_fd, const __u32 flags) | __arm64_sys_landlock_restrict_self | false | | 0x1bf (447) | memfd_secret | (unsigned int flags) | __arm64_sys_memfd_secret | false | | 0x1c0 (448) | process_mrelease | (int pidfd, unsigned int flags) | __arm64_sys_process_mrelease | false | -| 0x1c1 (449) | futex_waitv | (struct futex_waitv *waiters, unsigned int nr_futexes, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_waitv | false | +| 0x1c1 (449) | futex_waitv | (struct futex_waitv *waiters, unsigned int nr_futexes, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_waitv | true | | 0x1c2 (450) | set_mempolicy_home_node | (unsigned long start, unsigned long len, unsigned long home_node, unsigned long flags) | __arm64_sys_set_mempolicy_home_node | false | | 0x1c3 (451) | cachestat | (unsigned int fd, struct cachestat_range 
*cstat_range, struct cachestat *cstat, unsigned int flags) | __arm64_sys_cachestat | false | | 0x1c4 (452) | fchmodat2 | (int dfd, const char *filename, umode_t mode, unsigned int flags) | __arm64_sys_fchmodat2 | false | | 0x1c5 (453) | map_shadow_stack | (unsigned long addr, unsigned long size, unsigned int flags) | __arm64_sys_map_shadow_stack | false | -| 0x1c6 (454) | futex_wake | (void *uaddr, unsigned long mask, int nr, unsigned int flags) | __arm64_sys_futex_wake | false | -| 0x1c7 (455) | futex_wait | (void *uaddr, unsigned long val, unsigned long mask, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_wait | false | -| 0x1c8 (456) | futex_requeue | (struct futex_waitv *waiters, unsigned int flags, int nr_wake, int nr_requeue) | __arm64_sys_futex_requeue | false | +| 0x1c6 (454) | futex_wake | (void *uaddr, unsigned long mask, int nr, unsigned int flags) | __arm64_sys_futex_wake | true | +| 0x1c7 (455) | futex_wait | (void *uaddr, unsigned long val, unsigned long mask, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_wait | true | +| 0x1c8 (456) | futex_requeue | (struct futex_waitv *waiters, unsigned int flags, int nr_wake, int nr_requeue) | __arm64_sys_futex_requeue | true | | 0x1c9 (457) | statmount | (const struct mnt_id_req *req, struct statmount *buf, size_t bufsize, unsigned int flags) | __arm64_sys_statmount | false | | 0x1ca (458) | listmount | (const struct mnt_id_req *req, u64 *mnt_ids, size_t nr_mnt_ids, unsigned int flags) | __arm64_sys_listmount | false | | 0x1cb (459) | lsm_get_self_attr | (unsigned int attr, struct lsm_ctx *ctx, u32 *size, u32 flags) | __arm64_sys_lsm_get_self_attr | false | diff --git a/src/arch/arm64/exceptions/syscall.rs b/src/arch/arm64/exceptions/syscall.rs index cd572ea7..7e9243a1 100644 --- a/src/arch/arm64/exceptions/syscall.rs +++ b/src/arch/arm64/exceptions/syscall.rs @@ -103,7 +103,11 @@ use crate::{ umask::sys_umask, 
wait::{sys_wait4, sys_waitid}, }, - threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address}, + threading::{ + futex::futex2::{sys_futex_requeue, sys_futex_wait, sys_futex_waitv, sys_futex_wake}, + futex::sys_futex, + sys_set_robust_list, sys_set_tid_address, + }, }, sched::{ self, @@ -824,6 +828,40 @@ pub async fn handle_syscall(mut ctx: ProcessCtx) { .await } 0x1b8 => Ok(0), // process_madvise is a no-op + 0x1c1 => { + sys_futex_waitv( + &ctx, + TUA::from_value(arg1 as _), + arg2 as _, + arg3 as _, + TUA::from_value(arg4 as _), + arg5 as _, + ) + .await + } + 0x1c6 => sys_futex_wake(&ctx, arg1, arg2, arg3 as _, arg4 as _), + 0x1c7 => { + sys_futex_wait( + &ctx, + arg1, + arg2, + arg3, + arg4 as _, + TUA::from_value(arg5 as _), + arg6 as _, + ) + .await + } + 0x1c8 => { + sys_futex_requeue( + &ctx, + TUA::from_value(arg1 as _), + arg2 as _, + arg3 as _, + arg4 as _, + ) + .await + } _ => panic!( "Unhandled syscall 0x{nr:x}, PC: 0x{:x}", ctx.task().ctx.user().elr_el1 diff --git a/src/process/threading/futex/futex2.rs b/src/process/threading/futex/futex2.rs new file mode 100644 index 00000000..ac910ad6 --- /dev/null +++ b/src/process/threading/futex/futex2.rs @@ -0,0 +1,261 @@ +//! The futex2 syscall family: `futex_waitv`, `futex_wake`, `futex_wait` and +//! `futex_requeue`. +//! +//! These share the wait/wake core with the legacy `futex` syscall; only the +//! argument decoding differs. Timeouts are absolute against the given clock, +//! unlike most of the legacy ops. 
+ +use alloc::vec::Vec; +use core::time::Duration; + +use libkernel::error::{KernelError, Result}; +use libkernel::memory::address::TUA; + +use super::key::FutexKey; +use super::wait::{ParsedWaiter, futex_wait_multi}; +use super::{requeue_key, wake_key}; +use crate::clock::realtime::date; +use crate::clock::timespec::TimeSpec; +use crate::drivers::timer::uptime; +use crate::memory::uaccess::{UserCopyable, copy_obj_array_from_user}; +use crate::process::thread_group::signal::{InterruptResult, Interruptable}; +use crate::sched::syscall_ctx::ProcessCtx; + +const FUTEX2_SIZE_U32: u32 = 0x02; +const FUTEX2_SIZE_MASK: u32 = 0x03; +const FUTEX2_PRIVATE: u32 = 0x80; +const FUTEX2_VALID_MASK: u32 = FUTEX2_SIZE_MASK | FUTEX2_PRIVATE; + +const FUTEX_WAITV_MAX: usize = 128; +const FUTEX_BITSET_MATCH_ANY: u64 = 0xffff_ffff; + +const CLOCK_REALTIME: u32 = 0; +const CLOCK_MONOTONIC: u32 = 1; + +/// Userspace `struct futex_waitv`. +#[repr(C)] +#[derive(Clone, Copy)] +pub struct FutexWaitvUser { + val: u64, + uaddr: u64, + flags: u32, + reserved: u32, +} + +// SAFETY: `FutexWaitvUser` is a plain-old-data `repr(C)` struct; any bit +// pattern is a valid value (validation happens after the copy). +unsafe impl UserCopyable for FutexWaitvUser {} + +/// Validates a futex2 flags word, returning whether `FUTEX2_PRIVATE` is set. +/// +/// Only 32-bit futexes are supported (as in Linux today), so any size other +/// than `FUTEX2_SIZE_U32` is rejected. +fn check_flags(flags: u32) -> Result { + if flags & !FUTEX2_VALID_MASK != 0 { + return Err(KernelError::InvalidValue); + } + + if flags & FUTEX2_SIZE_MASK != FUTEX2_SIZE_U32 { + return Err(KernelError::InvalidValue); + } + + Ok(flags & FUTEX2_PRIVATE != 0) +} + +/// Builds a [`FutexKey`] for a futex2 user address, enforcing the natural +/// alignment futex2 requires. 
+fn make_key(ctx: &ProcessCtx, uaddr: u64, private: bool) -> Result<(FutexKey, TUA)> { + let addr = TUA::::from_value(uaddr as usize); + + if addr.is_null() { + return Err(KernelError::Fault); + } + + if !uaddr.is_multiple_of(core::mem::size_of::() as u64) { + return Err(KernelError::InvalidValue); + } + + let key = if private { + FutexKey::new_private(ctx, addr) + } else { + FutexKey::new_shared(ctx, addr)? + }; + + Ok((key, addr)) +} + +/// Converts a futex2 absolute timeout into a relative [`Duration`]. +/// +/// The clockid is only validated when a timeout is supplied, matching Linux. +/// A deadline already in the past yields a zero timeout; the futex value is +/// still checked first, so `EAGAIN` takes precedence over `ETIMEDOUT`. +/// +/// `CLOCK_REALTIME` deadlines are converted to a relative sleep up front, so +/// a concurrent `clock_settime` does not retarget an in-progress wait (same +/// simplification as the legacy `FUTEX_WAIT_BITSET` path). +async fn abs_timeout(timeout: TUA, clockid: u32) -> Result> { + if timeout.is_null() { + return Ok(None); + } + + let deadline = Duration::from(TimeSpec::copy_from_user(timeout).await?); + + let base = match clockid { + CLOCK_REALTIME => date(), + CLOCK_MONOTONIC => uptime(), + _ => return Err(KernelError::InvalidValue), + }; + + Ok(Some(deadline.saturating_sub(base))) +} + +/// `futex_wait(uaddr, val, mask, flags, timeout, clockid)`: wait on a single +/// futex word while `*uaddr == val`. Returns 0 once woken by a wake whose +/// mask overlaps `mask`. +pub async fn sys_futex_wait( + ctx: &ProcessCtx, + uaddr: u64, + val: u64, + mask: u64, + flags: u32, + timeout: TUA, + clockid: u32, +) -> Result { + let private = check_flags(flags)?; + + // Only 32-bit futexes exist, so values and masks must fit in 32 bits and + // an empty mask could never be woken. 
+ if val > u64::from(u32::MAX) || mask > u64::from(u32::MAX) || mask == 0 { + return Err(KernelError::InvalidValue); + } + + let timeout = abs_timeout(timeout, clockid).await?; + let (key, uaddr) = make_key(ctx, uaddr, private)?; + + let waiter = ParsedWaiter { + key, + uaddr, + val: val as u32, + mask, + }; + + match futex_wait_multi(core::slice::from_ref(&waiter), timeout) + .interruptable() + .await + { + InterruptResult::Interrupted => Err(KernelError::Interrupted), + InterruptResult::Uninterrupted(res) => res.map(|_| 0), + } +} + +/// `futex_wake(uaddr, mask, nr, flags)`: wake up to `nr` waiters whose masks +/// overlap `mask`. Returns the number woken. +pub fn sys_futex_wake( + ctx: &ProcessCtx, + uaddr: u64, + mask: u64, + nr: i32, + flags: u32, +) -> Result { + let private = check_flags(flags)?; + + if mask > u64::from(u32::MAX) || mask == 0 { + return Err(KernelError::InvalidValue); + } + + let (key, _) = make_key(ctx, uaddr, private)?; + + if nr <= 0 { + return Ok(0); + } + + Ok(wake_key(nr as usize, key, mask)) +} + +/// `futex_waitv(waiters, nr_futexes, flags, timeout, clockid)`: wait on up to +/// [`FUTEX_WAITV_MAX`] futexes at once. Returns the array index of the woken +/// waiter. +pub async fn sys_futex_waitv( + ctx: &ProcessCtx, + uwaiters: TUA, + nr_futexes: u32, + flags: u32, + timeout: TUA, + clockid: u32, +) -> Result { + // No syscall-level flags are defined. 
+ if flags != 0 { + return Err(KernelError::InvalidValue); + } + + if nr_futexes == 0 || nr_futexes as usize > FUTEX_WAITV_MAX { + return Err(KernelError::InvalidValue); + } + + let timeout = abs_timeout(timeout, clockid).await?; + + let entries = copy_obj_array_from_user(uwaiters, nr_futexes as usize).await?; + + let mut waiters = Vec::with_capacity(entries.len()); + for entry in entries { + if entry.reserved != 0 || entry.val > u64::from(u32::MAX) { + return Err(KernelError::InvalidValue); + } + + let private = check_flags(entry.flags)?; + let (key, uaddr) = make_key(ctx, entry.uaddr, private)?; + + waiters.push(ParsedWaiter { + key, + uaddr, + val: entry.val as u32, + mask: FUTEX_BITSET_MATCH_ANY, + }); + } + + match futex_wait_multi(&waiters, timeout).interruptable().await { + InterruptResult::Interrupted => Err(KernelError::Interrupted), + InterruptResult::Uninterrupted(res) => res, + } +} + +/// `futex_requeue(waiters, flags, nr_wake, nr_requeue)`: wake up to `nr_wake` +/// waiters on `waiters[0].uaddr`, then move up to `nr_requeue` of the rest +/// onto `waiters[1].uaddr`. No value comparison is performed (unlike legacy +/// `FUTEX_CMP_REQUEUE`). Returns the number woken. +pub async fn sys_futex_requeue( + ctx: &ProcessCtx, + uwaiters: TUA, + flags: u32, + nr_wake: i32, + nr_requeue: i32, +) -> Result { + // No syscall-level flags are defined. + if flags != 0 { + return Err(KernelError::InvalidValue); + } + + if nr_wake < 0 || nr_requeue < 0 { + return Err(KernelError::InvalidValue); + } + + let entries = copy_obj_array_from_user(uwaiters, 2).await?; + + let mut keys = [FutexKey::Private { pid: 0, addr: 0 }; 2]; + for (key, entry) in keys.iter_mut().zip(&entries) { + if entry.reserved != 0 { + return Err(KernelError::InvalidValue); + } + + // `val` is unused by this op and deliberately not validated. 
+ let private = check_flags(entry.flags)?; + (*key, _) = make_key(ctx, entry.uaddr, private)?; + } + + Ok(requeue_key( + keys[0], + keys[1], + nr_wake as usize, + nr_requeue as usize, + )) +} diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index b8aa6bf8..751ffc66 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -15,6 +15,7 @@ use libkernel::{ use wait::{ParsedWaiter, futex_wait_multi}; use waiter::{FutexQueue, WaiterCell}; +pub mod futex2; pub mod key; mod wait; mod waiter; @@ -79,8 +80,6 @@ pub fn wake_key(nr_wake: usize, key: FutexKey, mask: u64) -> usize { /// /// Wake masks are ignored, matching Linux requeue semantics. Returns the /// number of waiters woken. -// TODO(futex2): expect(dead_code) is temporary until sys_futex_requeue lands. -#[expect(dead_code)] pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: usize) -> usize { if key1 == key2 { // Requeueing onto the same queue is a no-op; just wake. diff --git a/src/process/threading/futex/waiter.rs b/src/process/threading/futex/waiter.rs index 990b31e1..fb2dcb00 100644 --- a/src/process/threading/futex/waiter.rs +++ b/src/process/threading/futex/waiter.rs @@ -72,9 +72,6 @@ impl WaiterCell { /// Updates the location after the entry moved to `queue`. Caller must /// hold the locks of both the source and destination queues. - // TODO(futex2): expect(dead_code) is temporary until sys_futex_requeue - // lands. 
- #[expect(dead_code)] pub fn requeue_to(&self, queue: FutexQueue, token: u64) { self.state.lock_save_irq().location = Some((queue, token)); } From cd1987718b55c58f58bf39a2fd8f5517317b623f Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 14:27:15 +0530 Subject: [PATCH 05/14] add tests for futex2 --- usertest/src/futex2.rs | 529 +++++++++++++++++++++++++++++++++++++++++ usertest/src/main.rs | 1 + 2 files changed, 530 insertions(+) create mode 100644 usertest/src/futex2.rs diff --git a/usertest/src/futex2.rs b/usertest/src/futex2.rs new file mode 100644 index 00000000..eb80b910 --- /dev/null +++ b/usertest/src/futex2.rs @@ -0,0 +1,529 @@ +//! Tests for the futex2 syscall family (futex_wait, futex_wake, futex_waitv, +//! futex_requeue). libc has no wrappers for these, so we issue raw syscalls. + +use crate::register_test; +use std::sync::{ + Arc, + atomic::{AtomicU32, Ordering}, +}; +use std::thread; +use std::time::{Duration, Instant}; + +const SYS_FUTEX_WAITV: libc::c_long = 449; +const SYS_FUTEX_WAKE: libc::c_long = 454; +const SYS_FUTEX_WAIT: libc::c_long = 455; +const SYS_FUTEX_REQUEUE: libc::c_long = 456; + +const FUTEX2_SIZE_U32: u32 = 0x02; +const FUTEX2_PRIVATE: u32 = 0x80; +const MATCH_ANY: u64 = 0xffff_ffff; + +/// Userspace mirror of the kernel's `struct futex_waitv`. 
+#[repr(C)] +#[derive(Clone, Copy)] +struct FutexWaitv { + val: u64, + uaddr: u64, + flags: u32, + reserved: u32, +} + +impl FutexWaitv { + fn new(addr: *const u32, val: u32, flags: u32) -> Self { + Self { + val: val as u64, + uaddr: addr as u64, + flags, + reserved: 0, + } + } +} + +fn errno() -> i32 { + std::io::Error::last_os_error().raw_os_error().unwrap() +} + +unsafe fn futex2_wait( + addr: *const u32, + val: u64, + mask: u64, + flags: u32, + timeout: *const libc::timespec, + clockid: i32, +) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_WAIT, addr, val, mask, flags, timeout, clockid) } +} + +unsafe fn futex2_wake(addr: *const u32, mask: u64, nr: i32, flags: u32) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_WAKE, addr, mask, nr, flags) } +} + +unsafe fn futex2_waitv( + waiters: *const FutexWaitv, + nr: u32, + flags: u32, + timeout: *const libc::timespec, + clockid: i32, +) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_WAITV, waiters, nr, flags, timeout, clockid) } +} + +unsafe fn futex2_requeue( + waiters: *const FutexWaitv, + flags: u32, + nr_wake: i32, + nr_requeue: i32, +) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_REQUEUE, waiters, flags, nr_wake, nr_requeue) } +} + +/// Absolute deadline `offset_ms` in the future on `clockid`. +fn abs_deadline(clockid: i32, offset_ms: i64) -> libc::timespec { + let mut ts: libc::timespec = unsafe { std::mem::zeroed() }; + unsafe { libc::clock_gettime(clockid, &mut ts) }; + + let nsec = ts.tv_nsec + (offset_ms % 1000) * 1_000_000; + ts.tv_sec += offset_ms / 1000 + nsec / 1_000_000_000; + ts.tv_nsec = nsec % 1_000_000_000; + ts +} + +fn test_futex2_basic() { + let futex_word: u32 = 0; + let addr = &futex_word as *const u32; + + unsafe { + // Wake with no waiters wakes nothing. + let ret = futex2_wake(addr, MATCH_ANY, 1, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("futex_wake with no waiters returned {ret}"); + } + + // Wait with a mismatched expected value fails immediately with EAGAIN. 
+ let ret = futex2_wait( + addr, + 1, // actual value is 0 + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ); + if ret != -1 || errno() != libc::EAGAIN { + panic!("futex_wait value mismatch: ret {ret}, errno {}", errno()); + } + } +} + +register_test!(test_futex2_basic); + +fn test_futex2_wait_wake() { + let futex_word = Arc::new(AtomicU32::new(0)); + let futex_clone = futex_word.clone(); + + let t = thread::spawn(move || { + let addr = futex_clone.as_ptr() as *const u32; + let ret = unsafe { + futex2_wait( + addr, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("futex_wait returned {ret}, errno {}", errno()); + } + }); + + thread::sleep(Duration::from_millis(100)); + + let addr = futex_word.as_ptr() as *const u32; + unsafe { + let ret = futex2_wake(addr, MATCH_ANY, 1, FUTEX2_SIZE_U32); + if ret != 1 { + panic!("expected to wake 1 waiter, woke {ret}"); + } + } + + t.join().expect("waiter thread panicked"); +} + +register_test!(test_futex2_wait_wake); + +fn test_futex2_mask() { + let futex_word = Arc::new(AtomicU32::new(0)); + let futex_clone = futex_word.clone(); + + let t = thread::spawn(move || { + let addr = futex_clone.as_ptr() as *const u32; + let ret = unsafe { + futex2_wait( + addr, + 0, + 0x1, // waiter mask + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("masked futex_wait returned {ret}, errno {}", errno()); + } + }); + + thread::sleep(Duration::from_millis(100)); + + let addr = futex_word.as_ptr() as *const u32; + unsafe { + // Non-overlapping mask must not wake the waiter. + let ret = futex2_wake(addr, 0x2, 1, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("woke {ret} waiters despite non-overlapping mask"); + } + + // Overlapping mask wakes it. 
+ let ret = futex2_wake(addr, 0x1, 1, FUTEX2_SIZE_U32); + if ret != 1 { + panic!("expected to wake 1 waiter with matching mask, woke {ret}"); + } + } + + t.join().expect("waiter thread panicked"); +} + +register_test!(test_futex2_mask); + +fn test_futex2_waitv() { + // Three futex words; the waiter sleeps on all of them and must report + // the index of the one that got woken. + let words = Arc::new([AtomicU32::new(0), AtomicU32::new(0), AtomicU32::new(0)]); + let words_clone = words.clone(); + + let flags = FUTEX2_SIZE_U32 | FUTEX2_PRIVATE; + + let t = thread::spawn(move || { + let waiters: Vec = words_clone + .iter() + .map(|w| FutexWaitv::new(w.as_ptr() as *const u32, 0, flags)) + .collect(); + + let ret = unsafe { + futex2_waitv( + waiters.as_ptr(), + waiters.len() as u32, + 0, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 1 { + panic!("futex_waitv returned {ret}, expected woken index 1"); + } + }); + + thread::sleep(Duration::from_millis(100)); + + unsafe { + let ret = futex2_wake(words[1].as_ptr() as *const u32, MATCH_ANY, 1, flags); + if ret != 1 { + panic!("expected to wake 1 waitv waiter, woke {ret}"); + } + } + + t.join().expect("waitv thread panicked"); +} + +register_test!(test_futex2_waitv); + +fn test_futex2_invalid_args() { + let word: u32 = 0; + let addr = &word as *const u32; + let valid = [FutexWaitv::new(addr, 0, FUTEX2_SIZE_U32)]; + + unsafe { + // waitv: one entry's expected value mismatches -> EAGAIN. + let pair = [AtomicU32::new(0), AtomicU32::new(7)]; + let waiters = [ + FutexWaitv::new(pair[0].as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(pair[1].as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + ]; + let ret = futex2_waitv(waiters.as_ptr(), 2, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EAGAIN { + panic!("waitv mismatch: ret {ret}, errno {}", errno()); + } + + // waitv: nr_futexes == 0 and > 128 are invalid. 
+ for nr in [0u32, 129] { + let ret = futex2_waitv(valid.as_ptr(), nr, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv nr={nr}: ret {ret}, errno {}", errno()); + } + } + + // waitv: non-zero syscall-level flags are invalid. + let ret = futex2_waitv(valid.as_ptr(), 1, 1, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv flags=1: ret {ret}, errno {}", errno()); + } + + // waitv: reserved field must be zero. + let mut bad = valid; + bad[0].reserved = 1; + let ret = futex2_waitv(bad.as_ptr(), 1, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv reserved!=0: ret {ret}, errno {}", errno()); + } + + // waitv: entry flags must include FUTEX2_SIZE_U32. + let mut bad = valid; + bad[0].flags = 0; + let ret = futex2_waitv(bad.as_ptr(), 1, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv no-size flags: ret {ret}, errno {}", errno()); + } + + // waitv: NULL waiter array faults. + let ret = futex2_waitv(std::ptr::null(), 1, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EFAULT { + panic!("waitv NULL array: ret {ret}, errno {}", errno()); + } + + // wait: bad clockid is only rejected when a timeout is supplied. + let ts = abs_deadline(libc::CLOCK_MONOTONIC, 50); + let ret = futex2_wait(addr, 1, MATCH_ANY, FUTEX2_SIZE_U32, &ts, 7); + if ret != -1 || errno() != libc::EINVAL { + panic!("wait bad clockid: ret {ret}, errno {}", errno()); + } + + // wait: zero mask can never be woken. + let ret = futex2_wait( + addr, + 0, + 0, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ); + if ret != -1 || errno() != libc::EINVAL { + panic!("wait mask=0: ret {ret}, errno {}", errno()); + } + + // wait: futex word must be 4-byte aligned. 
+ let unaligned = (addr as usize + 2) as *const u32; + let ret = futex2_wait( + unaligned, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ); + if ret != -1 || errno() != libc::EINVAL { + panic!("wait unaligned: ret {ret}, errno {}", errno()); + } + + // requeue: negative counts are invalid. + let pair = [valid[0], valid[0]]; + let ret = futex2_requeue(pair.as_ptr(), 0, -1, 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("requeue nr_wake=-1: ret {ret}, errno {}", errno()); + } + } +} + +register_test!(test_futex2_invalid_args); + +fn test_futex2_timeout() { + let word: u32 = 0; + let addr = &word as *const u32; + + for clockid in [libc::CLOCK_MONOTONIC, libc::CLOCK_REALTIME] { + let ts = abs_deadline(clockid, 50); + let start = Instant::now(); + + let ret = unsafe { futex2_wait(addr, 0, MATCH_ANY, FUTEX2_SIZE_U32, &ts, clockid) }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!( + "wait clockid={clockid}: ret {ret}, errno {}, expected ETIMEDOUT", + errno() + ); + } + if start.elapsed() < Duration::from_millis(50) { + panic!("wait on clockid={clockid} timed out early"); + } + } + + // A deadline that has already passed times out immediately (the zero + // point of both clocks is in the past). + let ts: libc::timespec = unsafe { std::mem::zeroed() }; + let ret = unsafe { + futex2_wait( + addr, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_MONOTONIC, + ) + }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!("wait past deadline: ret {ret}, errno {}", errno()); + } + + // ...but a value mismatch still takes precedence over the timeout. 
+ let ret = unsafe { + futex2_wait( + addr, + 1, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_MONOTONIC, + ) + }; + if ret != -1 || errno() != libc::EAGAIN { + panic!("wait mismatch+past deadline: ret {ret}, errno {}", errno()); + } +} + +register_test!(test_futex2_timeout); + +fn test_futex2_requeue() { + const NR_THREADS: usize = 4; + + let f1 = Arc::new(AtomicU32::new(0)); + let f2 = Arc::new(AtomicU32::new(0)); + let woken = Arc::new(AtomicU32::new(0)); + + let threads: Vec<_> = (0..NR_THREADS) + .map(|_| { + let f1 = f1.clone(); + let woken = woken.clone(); + thread::spawn(move || { + let ret = unsafe { + futex2_wait( + f1.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("requeued futex_wait returned {ret}, errno {}", errno()); + } + woken.fetch_add(1, Ordering::SeqCst); + }) + }) + .collect(); + + // Let all threads park on f1. + thread::sleep(Duration::from_millis(150)); + + let pair = [ + FutexWaitv::new(f1.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(f2.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + ]; + + unsafe { + // Wake one waiter, move the rest to f2. + let ret = futex2_requeue(pair.as_ptr(), 0, 1, NR_THREADS as i32 - 1); + if ret != 1 { + panic!("futex_requeue woke {ret}, expected 1"); + } + } + + thread::sleep(Duration::from_millis(100)); + + let woken_now = woken.load(Ordering::SeqCst); + if woken_now != 1 { + panic!("{woken_now} threads ran after requeue, expected 1"); + } + + unsafe { + // The other three must now be waiting on f2, not f1. 
+ let ret = futex2_wake(f1.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("woke {ret} waiters still on f1 after requeue"); + } + + let ret = futex2_wake(f2.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != NR_THREADS as i64 - 1 { + panic!("woke {ret} waiters on f2, expected {}", NR_THREADS - 1); + } + } + + for t in threads { + t.join().expect("waiter thread panicked"); + } + + if woken.load(Ordering::SeqCst) != NR_THREADS as u32 { + panic!("not all requeued threads completed"); + } +} + +register_test!(test_futex2_requeue); + +fn test_futex2_requeue_timeout_race() { + // Requeue waiters that are about to time out: their timeout-path + // unregistration must find them on the destination queue. + const NR_THREADS: usize = 4; + + let f1 = Arc::new(AtomicU32::new(0)); + let f2 = Arc::new(AtomicU32::new(0)); + + let threads: Vec<_> = (0..NR_THREADS) + .map(|_| { + let f1 = f1.clone(); + thread::spawn(move || { + let ts = abs_deadline(libc::CLOCK_MONOTONIC, 150); + let ret = unsafe { + futex2_wait( + f1.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_MONOTONIC, + ) + }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!("racing wait: ret {ret}, errno {}", errno()); + } + }) + }) + .collect(); + + thread::sleep(Duration::from_millis(50)); + + let pair = [ + FutexWaitv::new(f1.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(f2.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + ]; + + unsafe { + // Move everyone to f2 without waking anybody. + let ret = futex2_requeue(pair.as_ptr(), 0, 0, NR_THREADS as i32); + if ret != 0 { + panic!("requeue woke {ret}, expected 0"); + } + } + + // All threads now time out while parked on f2. + for t in threads { + t.join().expect("waiter thread panicked"); + } + + unsafe { + // Every waiter unregistered itself from f2 on timeout. 
+ let ret = futex2_wake(f2.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("{ret} stale waiters left on f2"); + } + } +} + +register_test!(test_futex2_requeue_timeout_race); diff --git a/usertest/src/main.rs b/usertest/src/main.rs index cad9d145..e9fb6fe7 100644 --- a/usertest/src/main.rs +++ b/usertest/src/main.rs @@ -8,6 +8,7 @@ use std::{ mod epoll; mod fs; mod futex; +mod futex2; mod inotify; mod signalfd; mod signals; From cce47fdf42e50ecf107b750294d4f1d6514cc47d Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 14:41:44 +0530 Subject: [PATCH 06/14] add doc for futex2 changes --- etc/futex2.md | 108 ++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 108 insertions(+) create mode 100644 etc/futex2.md diff --git a/etc/futex2.md b/etc/futex2.md new file mode 100644 index 00000000..1eceef67 --- /dev/null +++ b/etc/futex2.md @@ -0,0 +1,108 @@ +# futex2 in moss + +## What futex2 is + +A futex ("fast userspace mutex") is the kernel primitive userspace locking is +built on. A lock is just a 32-bit word in user memory; the fast path (taking an +uncontended lock) never enters the kernel. Only on contention does a thread ask +the kernel to put it to sleep until another thread wakes it. + +futex2 is the modern generation of this interface, added to Linux in stages +from 5.16 onward. Instead of one multiplexed `futex` syscall with an `op` +argument, it provides separate syscalls: + +- **futex_wait** — sleep on one futex word, as long as it still holds an + expected value. Returns once woken. +- **futex_wake** — wake up to a given number of threads sleeping on a word. +- **futex_waitv** — sleep on *several* futex words at once (up to 128) and + return the index of whichever one got woken. This is the headline feature; + it was driven by Wine/Proton, which needs Windows' "wait for any of these + objects" semantics to run games efficiently. 
+- **futex_requeue** — wake some waiters of one word and silently move the rest + onto a different word, without waking them. Used by condition-variable + implementations to avoid thundering herds: one broadcast moves everyone onto + the mutex's wait queue instead of waking them all to fight over the lock. + +All waits take a wake *mask*: a waiter is only woken by a wake whose mask +shares at least one bit with its own. Timeouts are *absolute* deadlines against +a chosen clock (monotonic or realtime), not relative durations. + +moss implements all four on aarch64 (syscall numbers 449, 454, 455, 456), with +Linux-compatible argument validation and error codes. + +## How futex2 differs from legacy futex + +- Separate syscalls instead of one `op`-multiplexed entry point. +- Sized futexes: each call carries a flag stating the width of the futex word. + Like Linux today, moss accepts only the 32-bit size. +- Wake masks are first-class on every wait/wake, not a special bitset op + bolted on (legacy `FUTEX_WAIT_BITSET`/`FUTEX_WAKE_BITSET`). +- Timeouts are always absolute, with an explicit clock choice per call. +- Multi-wait (`futex_waitv`) exists only in futex2. +- futex2's requeue performs no value comparison, unlike legacy + `FUTEX_CMP_REQUEUE`; it trusts userspace and simply moves waiters. +- Stricter validation: unaligned futex addresses, unknown flags, reserved + fields and oversized values are rejected outright. + +The legacy `futex` syscall remains fully supported; nothing changed in its +userspace-visible behaviour. + +## How moss implements it + +Both generations share one wait/wake core; the syscalls differ only in how +they decode arguments. + +Every futex word is identified by a key — for process-private futexes the +process id plus the virtual address, for shared ones the physical frame plus +offset, so different mappings of the same memory still meet on the same futex. +A global table maps each key to its wait queue. 
+ +Each sleeping thread owns a small shared record (its "waiter cell") holding +its wake mask, whether it has been woken, and *which queue currently holds +it*. That last part is what makes requeueing sound: requeue moves entries +between queues while their owners sleep, and a waiter that later needs to +cancel (timeout, signal) consults its own cell to find where it currently +lives rather than assuming it is still where it went to sleep. Because lock +ordering forbids going from a cell to its queue directly, cancellation +snapshots the location, locks the queue, then re-checks the location and +retries if a concurrent requeue moved it in the meantime. + +Waiting — single or multi — is one routine: for each requested futex it locks +the queue, re-reads the user word, and only enqueues if the value still +matches, so a wake cannot slip between the check and the sleep. If any value +mismatches, everything already enqueued is unwound; if one of those waiters +was woken during the unwind, that wake is honoured rather than lost. Legacy +wait and futex2's wait are this routine with a list of one; waitv passes the +whole list and reports the index that fired. Timeouts race the wait against a +timer, with a final "did a wake land anyway" check before reporting a timeout. + +Waking finds the queue, removes up to the requested number of waiters whose +masks overlap, marks their cells woken, and only then — after dropping the +queue lock — actually wakes the threads. Requeueing locks both queues (in a +fixed order, so two concurrent requeues cannot deadlock), wakes from the +first, and transplants the remainder onto the second, updating each moved +cell's location. + +## Remaining issues + +- **Realtime deadlines do not track clock changes.** An absolute + `CLOCK_REALTIME` deadline is converted into a relative sleep when the + syscall starts. If the wall clock is stepped while a thread waits, the + deadline does not move with it, where Linux would honour the new clock. 
+ Fixing this needs realtime-aware timers in the timer subsystem. The legacy + bitset-wait path has always had the same simplification. +- **A wake can be lost when a signal interrupts a waiter.** If a wake and a + signal arrive at almost the same moment, the wake may be consumed (the waker + is told it woke someone) while the waiter returns "interrupted" instead of + "woken". Linux closes this window with syscall-restart machinery, which moss + does not yet have. Inherited from the legacy implementation. The equivalent + wake-versus-timeout race *is* handled. +- **The futex table never shrinks.** A queue entry is created for every futex + address ever waited on and is kept after the queue drains. Long-running + systems touching many distinct addresses leak table entries slowly. + Pre-existing behaviour; pruning empty queues on the last waiter's exit is + the obvious fix. +- **Host unit tests skip the sync primitives by default.** The unit-test + recipe builds the kernel library without optional features, so the tests + for the synchronisation primitives (including the waker-set operations this + work added) only run when the full feature set is enabled explicitly. From 402391d604b12dc2d7b3439d47ad44214f43fbfb Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 10 Jun 2026 14:42:40 +0530 Subject: [PATCH 07/14] change readme supported syscall number --- README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 5b24398e..ef8c04e1 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ model within the kernel context: * Full task management including both UP and SMP scheduling via EEVDF and task migration via IPIs. * Capable of running dynamically linked ELF binaries from Arch Linux. -* Currently implements [105 Linux syscalls](./etc/syscalls_linux_aarch64.md) +* Currently implements [109 Linux syscalls](./etc/syscalls_linux_aarch64.md) * `fork()`, `execve()`, `clone()`, and full process lifecycle management. 
* Job control support (process groups, waitpid, background tasks). * Signal delivery, masking, and propagation (SIGTERM, SIGSTOP, SIGCONT, SIGCHLD, @@ -172,7 +172,7 @@ moss is under active development. Current focus areas include: * Networking Stack: TCP/IP implementation. * A fully read/write capable filesystem driver. -* Expanding coverage beyond the current 105 calls. +* Expanding coverage beyond the current 109 calls. * systemd bringup. ## Non-Goals (for now) From 6055fc53472884a4b78ce748c3a170a13b2e5eb4 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 1 Jul 2026 01:06:53 +0530 Subject: [PATCH 08/14] tests: additional tests after looking at linux src --- usertest/src/futex2.rs | 150 +++++++++++++++++++++++++++++++++++++++++ 1 file changed, 150 insertions(+) diff --git a/usertest/src/futex2.rs b/usertest/src/futex2.rs index eb80b910..ef889515 100644 --- a/usertest/src/futex2.rs +++ b/usertest/src/futex2.rs @@ -467,6 +467,156 @@ fn test_futex2_requeue() { register_test!(test_futex2_requeue); +fn test_futex2_wake_n_of_m() { + // Six waiters park on one futex; waking 2 must wake exactly 2 (the two + // oldest, FIFO), leaving 4. A final wake-all drains the rest. 
+ const NR_THREADS: usize = 6; + + let f = Arc::new(AtomicU32::new(0)); + let woken = Arc::new(AtomicU32::new(0)); + + let threads: Vec<_> = (0..NR_THREADS) + .map(|_| { + let f = f.clone(); + let woken = woken.clone(); + thread::spawn(move || { + let ret = unsafe { + futex2_wait( + f.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("futex_wait returned {ret}, errno {}", errno()); + } + woken.fetch_add(1, Ordering::SeqCst); + }) + }) + .collect(); + + thread::sleep(Duration::from_millis(150)); + + unsafe { + let ret = futex2_wake(f.as_ptr() as *const u32, MATCH_ANY, 2, FUTEX2_SIZE_U32); + if ret != 2 { + panic!("wake nr=2 woke {ret}, expected 2"); + } + } + + thread::sleep(Duration::from_millis(100)); + let woken_now = woken.load(Ordering::SeqCst); + if woken_now != 2 { + panic!("{woken_now} threads ran after wake nr=2, expected 2"); + } + + unsafe { + let ret = futex2_wake(f.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != NR_THREADS as i64 - 2 { + panic!("wake-all woke {ret}, expected {}", NR_THREADS - 2); + } + } + + for t in threads { + t.join().expect("waiter thread panicked"); + } +} + +register_test!(test_futex2_wake_n_of_m); + +fn test_futex2_wake_nr_zero() { + // Waking 0 waiters is a no-op that returns 0, never EFAULT, even when the + // address has no queue. Guards against computing the key (and faulting on + // a shared translation) before the nr<=0 short-circuit. + let word: u32 = 0; + let addr = &word as *const u32; + + unsafe { + let ret = futex2_wake(addr, MATCH_ANY, 0, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("wake nr=0 returned {ret}, errno {}", errno()); + } + + // Negative count is likewise a 0-wake no-op (not EINVAL). 
+ let ret = futex2_wake(addr, MATCH_ANY, -1, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("wake nr=-1 returned {ret}, errno {}", errno()); + } + } +} + +register_test!(test_futex2_wake_nr_zero); + +fn test_futex2_wake_no_waiters() { + // Waking an address nobody has ever waited on returns 0. + let word: u32 = 0; + let addr = &word as *const u32; + + unsafe { + let ret = futex2_wake(addr, MATCH_ANY, 64, FUTEX2_SIZE_U32 | FUTEX2_PRIVATE); + if ret != 0 { + panic!("wake of un-waited address woke {ret}, expected 0"); + } + } +} + +register_test!(test_futex2_wake_no_waiters); + +fn test_futex2_requeue_to_self() { + // Requeueing a futex onto itself (uaddr1 == uaddr2) is rejected with + // EINVAL on Linux. moss currently short-circuits this to a plain wake and + // returns success, silently dropping nr_requeue -- this test encodes the + // Linux contract and is expected to fail until that is fixed. + let f = Arc::new(AtomicU32::new(0)); + let woken = Arc::new(AtomicU32::new(0)); + + let f1 = f.clone(); + let woken1 = woken.clone(); + let t = thread::spawn(move || { + let ret = unsafe { + futex2_wait( + f1.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("futex_wait returned {ret}, errno {}", errno()); + } + woken1.fetch_add(1, Ordering::SeqCst); + }); + + thread::sleep(Duration::from_millis(100)); + + let same = f.as_ptr() as *const u32; + let pair = [ + FutexWaitv::new(same, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(same, 0, FUTEX2_SIZE_U32), + ]; + + unsafe { + let ret = futex2_requeue(pair.as_ptr(), 0, 1, 1); + if ret != -1 || errno() != libc::EINVAL { + panic!("requeue-to-self: ret {ret}, errno {}, expected EINVAL", errno()); + } + } + + // The waiter is still parked (requeue was rejected); wake it so the + // thread can exit cleanly. 
+ unsafe { + futex2_wake(same, MATCH_ANY, 1, FUTEX2_SIZE_U32); + } + t.join().expect("waiter thread panicked"); +} + +register_test!(test_futex2_requeue_to_self); + fn test_futex2_requeue_timeout_race() { // Requeue waiters that are about to time out: their timeout-path // unregistration must find them on the destination queue. From 992aff3425085d4bcf1498ee26891adc12343af7 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 1 Jul 2026 01:17:11 +0530 Subject: [PATCH 09/14] fixes from code review --- etc/futex2_review.md | 81 +++++++++++++++++++++++++++ src/process/exit.rs | 2 +- src/process/threading/futex/futex2.rs | 47 ++++++++-------- src/process/threading/futex/key.rs | 2 +- src/process/threading/futex/mod.rs | 70 ++++++++++------------- src/process/threading/futex/wait.rs | 61 ++++++++++++-------- src/process/threading/futex/waiter.rs | 5 +- 7 files changed, 178 insertions(+), 90 deletions(-) create mode 100644 etc/futex2_review.md diff --git a/etc/futex2_review.md b/etc/futex2_review.md new file mode 100644 index 00000000..64d345b7 --- /dev/null +++ b/etc/futex2_review.md @@ -0,0 +1,81 @@ +# futex2 review findings + +Review of branch `futex2` vs `master`. High-effort multi-agent pass (5 finders, verified). + +Ranked most-severe first. Confirmed = traced in code; Plausible = real & reachable, lower confidence or known/benign. + +## Correctness — confirmed + +### 1. Lost wake on signal interrupt *(top bug)* +`src/process/threading/futex/wait.rs:64` + +On signal, the wait future is dropped and `WaitGuard::drop` calls `cell.unregister()` but discards its bool. If `futex_wake` already consumed the wake (counted it via `mark_woken`), the waiter returns `EINTR` and the wake is lost — userspace condvar/mutex can hang. The timeout branch recovers this race via `finish().ok_or(TimedOut)`; the interrupt path has no equivalent. Affects both futex1 and futex2 (shared core). 
+ +**Fix:** on interrupt, inspect `unregister()` return — if a wake was consumed, return success (mirror timeout `finish()` recovery) or re-wake another waiter. + +*Acknowledged in etc/futex2.md but still live.* + +### 2. requeue-to-self silently drops nr_requeue +`src/process/threading/futex/mod.rs:84` + +`requeue_key` shortcuts `key1==key2` → `wake_key(nr_wake, key1, u64::MAX)`, ignoring `nr_requeue`. Linux returns `EINVAL` for requeue onto the same address. Shared futexes: two VAs mapping the same frame collapse to one key and hit this unexpectedly. + +### 3. Zero bitset not rejected on legacy path +`src/process/threading/futex/mod.rs:204` + +`FUTEX_WAIT_BITSET` / `FUTEX_WAKE_BITSET` forward `val3` as the mask with no zero check. `mask==0` → `cell.mask & mask` always 0 → waiter permanently unwakeable. Linux returns `EINVAL`. The futex2 paths already guard `mask==0`; the legacy path doesn't. Inconsistent. + +### 4. wake computes key before nr<=0 check +`src/process/threading/futex/futex2.rs:166` + +`make_key` (translates the address, can fault for a shared futex) runs before the `nr<=0` early return. `futex_wake(nr=0)` on an unmapped shared address returns `EFAULT` instead of `Ok(0)`. Move the `nr` check above `make_key`. + +## Correctness — plausible + +### 5. Duplicate uaddr in waitv inflates wake count +`src/process/threading/futex/futex2.rs:200` + +Same `uaddr` at two `futex_waitv` indices → two cells on one queue for one task. `futex_wake(nr>=2)` matches both, returns `woke=2` though only one logical waiter exists. No panic, returned index valid, but count is wrong. No dedup guard. (requeue has the analogous issue via `take_first`.) + +### 6. abs_timeout realtime semantics +`src/process/threading/futex/futex2.rs:438` + +Absolute `CLOCK_REALTIME` deadline converted to a relative sleep once at syscall entry. If `set_date()` was never called, `date()` falls back to `uptime()` → deadline saturates to an effectively infinite timeout. 
Also a concurrent `clock_settime` doesn't retarget an in-progress wait. Documented limitation in etc/futex2.md, but user-visible. + +## Quality + +### 7. do_futex_wait duplicates futex2 wait core +`src/process/threading/futex/mod.rs:136` + +Both `do_futex_wait` and `sys_futex_wait` build a single `ParsedWaiter`, call `futex_wait_multi(slice::from_ref(&waiter), timeout)`, and `.map(|_| 0)`. Extract one shared single-waiter helper — also lets the #1 fix land in one place instead of two. + +### 8. Box::pin(sleep) heap-allocs the hot path +`src/process/threading/futex/wait.rs:147` + +The timeout branch does `Box::pin(sleep(dur))` + a fresh `select_biased` on every timed wait — a heap alloc/free on every contended timed lock acquire. Pin on the stack with `core::pin::pin!` / `pin_mut!`. + +### 9. Sentinel FutexKey in requeue +`src/process/threading/futex/futex2.rs:244` + +Seeds the keys array with a dummy `FutexKey::Private { pid: 0, addr: 0 }` and overwrites it in the loop. If a future refactor leaves an entry unwritten, the bogus key leaks into `requeue_key`. Collect the two keys directly (map entries into the array) so no sentinel exists. + +### 10. u64 mask is unused width +`src/process/threading/futex/mod.rs:49` + +Wake mask widened to `u64` everywhere (`WaiterCell.mask`, `ParsedWaiter.mask`, `wake_key`, `requeue_key`) but no path carries >32 bits — the futex2 entry points (`sys_futex_wait`, `sys_futex_wake`) reject masks > `u32::MAX` and every other caller casts a u32. Unused width + casts at every call site. Should be `u32` (matches the only supported `FUTEX2_SIZE_U32`). + +--- + +## Linux reference + +futex2 = the `futex_wait`, `futex_wake`, `futex_waitv` and `futex_requeue` syscall family.
Real kernel source: + +- `kernel/futex/` — https://github.com/torvalds/linux/tree/master/kernel/futex + - `core.c` — hash buckets, queue, wait/wake core + - `waitwake.c` — `futex_wait`, `futex_wake`, **`futex_wait_multiple`** (the multi-wait core; our `futex_wait_multi` analog) + - `syscalls.c` — `sys_futex`, **`sys_futex_waitv`**, `futex_parse_waitv` + - `requeue.c`, `pi.c` — requeue + priority inheritance (out of scope here) + - `futex.h` — `futex_q`, `futex_hash_bucket` +- UAPI: `include/uapi/linux/futex.h` — flags, `struct futex_waitv` +- Docs: https://docs.kernel.org/userspace-api/futex2.html +- man: `man 2 futex_waitv`, `man 2 futex` diff --git a/src/process/exit.rs b/src/process/exit.rs index 5ffb135d..c1da55a8 100644 --- a/src/process/exit.rs +++ b/src/process/exit.rs @@ -125,7 +125,7 @@ pub async fn sys_exit(ctx: &mut ProcessCtx, exit_code: usize) -> Result { copy_to_user(ptr, 0u32).await?; if let Ok(key) = FutexKey::new_shared(ctx, ptr) { - futex::wake_key(1, key, u32::MAX as u64); + futex::wake_key(1, key, u32::MAX); } else { warn!("Failed to get futex wake key on sys_exit"); } diff --git a/src/process/threading/futex/futex2.rs b/src/process/threading/futex/futex2.rs index ac910ad6..d76a2874 100644 --- a/src/process/threading/futex/futex2.rs +++ b/src/process/threading/futex/futex2.rs @@ -13,12 +13,11 @@ use libkernel::memory::address::TUA; use super::key::FutexKey; use super::wait::{ParsedWaiter, futex_wait_multi}; -use super::{requeue_key, wake_key}; +use super::{futex_wait_single, requeue_key, wake_key}; use crate::clock::realtime::date; use crate::clock::timespec::TimeSpec; use crate::drivers::timer::uptime; use crate::memory::uaccess::{UserCopyable, copy_obj_array_from_user}; -use crate::process::thread_group::signal::{InterruptResult, Interruptable}; use crate::sched::syscall_ctx::ProcessCtx; const FUTEX2_SIZE_U32: u32 = 0x02; @@ -136,16 +135,10 @@ pub async fn sys_futex_wait( key, uaddr, val: val as u32, - mask, + mask: mask as u32, }; - match 
futex_wait_multi(core::slice::from_ref(&waiter), timeout) - .interruptable() - .await - { - InterruptResult::Interrupted => Err(KernelError::Interrupted), - InterruptResult::Uninterrupted(res) => res.map(|_| 0), - } + futex_wait_single(waiter, timeout).await } /// `futex_wake(uaddr, mask, nr, flags)`: wake up to `nr` waiters whose masks @@ -163,13 +156,15 @@ pub fn sys_futex_wake( return Err(KernelError::InvalidValue); } - let (key, _) = make_key(ctx, uaddr, private)?; - + // Waking zero (or fewer) waiters is a no-op; short-circuit before + // computing the key, which can fault when translating a shared address. if nr <= 0 { return Ok(0); } - Ok(wake_key(nr as usize, key, mask)) + let (key, _) = make_key(ctx, uaddr, private)?; + + Ok(wake_key(nr as usize, key, mask as u32)) } /// `futex_waitv(waiters, nr_futexes, flags, timeout, clockid)`: wait on up to @@ -209,14 +204,11 @@ pub async fn sys_futex_waitv( key, uaddr, val: entry.val as u32, - mask: FUTEX_BITSET_MATCH_ANY, + mask: FUTEX_BITSET_MATCH_ANY as u32, }); } - match futex_wait_multi(&waiters, timeout).interruptable().await { - InterruptResult::Interrupted => Err(KernelError::Interrupted), - InterruptResult::Uninterrupted(res) => res, - } + futex_wait_multi(&waiters, timeout).await } /// `futex_requeue(waiters, flags, nr_wake, nr_requeue)`: wake up to `nr_wake` @@ -241,20 +233,29 @@ pub async fn sys_futex_requeue( let entries = copy_obj_array_from_user(uwaiters, 2).await?; - let mut keys = [FutexKey::Private { pid: 0, addr: 0 }; 2]; - for (key, entry) in keys.iter_mut().zip(&entries) { + let resolve = |entry: &FutexWaitvUser| -> Result { if entry.reserved != 0 { return Err(KernelError::InvalidValue); } // `val` is unused by this op and deliberately not validated. 
let private = check_flags(entry.flags)?; - (*key, _) = make_key(ctx, entry.uaddr, private)?; + let (key, _) = make_key(ctx, entry.uaddr, private)?; + Ok(key) + }; + + let key1 = resolve(&entries[0])?; + let key2 = resolve(&entries[1])?; + + // Requeueing a futex onto itself is invalid; this also catches distinct + // virtual addresses that alias the same shared frame. + if key1 == key2 { + return Err(KernelError::InvalidValue); } Ok(requeue_key( - keys[0], - keys[1], + key1, + key2, nr_wake as usize, nr_requeue as usize, )) diff --git a/src/process/threading/futex/key.rs b/src/process/threading/futex/key.rs index f64abd30..6e9bb7bd 100644 --- a/src/process/threading/futex/key.rs +++ b/src/process/threading/futex/key.rs @@ -3,7 +3,7 @@ use libkernel::error::{KernelError, Result}; use libkernel::memory::address::{TUA, VA}; use libkernel::memory::proc_vm::address_space::UserAddressSpace; -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, Debug)] pub enum FutexKey { Private { pid: u32, addr: usize }, Shared { frame: usize, offset: usize }, diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index 751ffc66..025b5493 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -1,6 +1,5 @@ use crate::clock::realtime::date; use crate::clock::timespec::TimeSpec; -use crate::process::thread_group::signal::{InterruptResult, Interruptable}; use crate::sched::syscall_ctx::ProcessCtx; use crate::sync::{OnceLock, SpinLock}; use alloc::vec::Vec; @@ -46,7 +45,7 @@ fn get_or_create_queue(key: FutexKey) -> FutexQueue { .clone() } -pub fn wake_key(nr_wake: usize, key: FutexKey, mask: u64) -> usize { +pub fn wake_key(nr_wake: usize, key: FutexKey, mask: u32) -> usize { let mut wakers = Vec::new(); let table = futex_table(); @@ -81,10 +80,9 @@ pub fn wake_key(nr_wake: usize, key: FutexKey, mask: u64) -> usize { /// Wake masks are ignored, matching 
Linux requeue semantics. Returns the /// number of waiters woken. pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: usize) -> usize { - if key1 == key2 { - // Requeueing onto the same queue is a no-op; just wake. - return wake_key(nr_wake, key1, u64::MAX); - } + // Callers must reject `key1 == key2` (requeue-to-self) before this point; + // the two-queue locking below assumes distinct queues. + debug_assert_ne!(key1, key2); let q1_arc = get_or_create_queue(key1); let q2_arc = get_or_create_queue(key2); @@ -133,20 +131,10 @@ pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: u woke } -async fn do_futex_wait( - key: FutexKey, - uaddr: TUA, - val: u32, - bitmask: u32, - timeout: Option, -) -> Result { - let waiter = ParsedWaiter { - key, - uaddr, - val, - mask: bitmask as u64, - }; - +/// Waits on a single futex word, the common case shared by the legacy +/// `FUTEX_WAIT` ops and futex2 `sys_futex_wait`. Interruption (and recovery of +/// a wake that raced a signal) is handled inside [`futex_wait_multi`]. +pub(super) async fn futex_wait_single(waiter: ParsedWaiter, timeout: Option) -> Result { // Return 0 on success. futex_wait_multi(core::slice::from_ref(&waiter), timeout) .await @@ -186,30 +174,34 @@ pub async fn sys_futex( } }; - match do_futex_wait( + let bitmask = if cmd == FUTEX_WAIT { u32::MAX } else { val3 }; + + // A zero bitset can never be matched by any wake, so the waiter + // would be unwakeable; reject it as Linux does. 
+ if matches!(cmd, FUTEX_WAIT_BITSET) && bitmask == 0 { + return Err(KernelError::InvalidValue); + } + + let waiter = ParsedWaiter { key, uaddr, val, - if cmd == FUTEX_WAIT { u32::MAX } else { val3 }, - timeout, - ) - .interruptable() - .await - { - InterruptResult::Interrupted => Err(KernelError::Interrupted), - InterruptResult::Uninterrupted(v) => v, - } + mask: bitmask, + }; + + futex_wait_single(waiter, timeout).await } - FUTEX_WAKE | FUTEX_WAKE_BITSET => Ok(wake_key( - val as _, - key, - if cmd == FUTEX_WAKE { - u32::MAX as u64 - } else { - val3 as u64 - }, - )), + FUTEX_WAKE | FUTEX_WAKE_BITSET => { + let mask = if cmd == FUTEX_WAKE { u32::MAX } else { val3 }; + + // A zero bitset matches no waiter; reject it as Linux does. + if matches!(cmd, FUTEX_WAKE_BITSET) && mask == 0 { + return Err(KernelError::InvalidValue); + } + + Ok(wake_key(val as _, key, mask)) + } _ => Err(KernelError::NotSupported), } diff --git a/src/process/threading/futex/wait.rs b/src/process/threading/futex/wait.rs index a6a9dcd5..9e50cfbb 100644 --- a/src/process/threading/futex/wait.rs +++ b/src/process/threading/futex/wait.rs @@ -15,6 +15,7 @@ use super::key::FutexKey; use super::waiter::WaiterCell; use crate::drivers::timer::sleep; use crate::memory::uaccess::{copy_from_user, try_copy_from_user}; +use crate::process::thread_group::signal::{InterruptResult, Interruptable}; /// One decoded wait request: wait on `key` while `*uaddr == val`, wakeable by /// any wake whose mask overlaps `mask`. @@ -22,7 +23,7 @@ pub struct ParsedWaiter { pub key: FutexKey, pub uaddr: TUA, pub val: u32, - pub mask: u64, + pub mask: u32, } /// Owns the queue registrations of an in-progress multi-wait, so that @@ -138,33 +139,45 @@ pub async fn futex_wait_multi( Setup::Queued => {} } - return match timeout { - None => { - let woken = poll_fn(|_| guard.poll_woken()).await; - guard.finish(); - Ok(woken) - } + // Wait for a wake or the timer. 
Interruption is handled here, while + // `guard` is still alive, so a wake consumed concurrently with a + // signal is recovered via `finish()` rather than lost to EINTR. + let woken = match timeout { + None => poll_fn(|_| guard.poll_woken()).interruptable().await, Some(dur) => { - let mut wait = poll_fn(|_| guard.poll_woken()).fuse(); - let mut sleep_fut = Box::pin(sleep(dur).fuse()); - - let woken = futures::select_biased! { - idx = wait => Some(idx), - _ = sleep_fut => None, + let wait = poll_fn(|_| guard.poll_woken()); + let sleep_fut = Box::pin(sleep(dur).fuse()); + + // Map the timer firing to a sentinel so the outer match can + // distinguish it from a real wake. + let timed = async move { + let mut wait = wait.fuse(); + let mut sleep_fut = sleep_fut; + futures::select_biased! { + idx = wait => idx, + _ = sleep_fut => usize::MAX, + } }; - drop(wait); + timed.interruptable().await + } + }; - match woken { - Some(idx) => { - guard.finish(); - Ok(idx) - } - // A wake may have landed between the timer firing and us - // unregistering; finish() reports it so the wake isn't - // lost. - None => guard.finish().ok_or(KernelError::TimedOut), - } + return match woken { + // A real wake landed and was observed. + InterruptResult::Uninterrupted(idx) if idx != usize::MAX => { + guard.finish(); + Ok(idx) + } + // Timer fired, or a signal arrived: a wake may have landed in the + // race window, so `finish()` reports it and we return success in + // that case rather than losing the wake. Otherwise it is the + // genuine timeout / interrupt result. 
+ InterruptResult::Uninterrupted(_) => { + guard.finish().ok_or(KernelError::TimedOut) + } + InterruptResult::Interrupted => { + guard.finish().ok_or(KernelError::Interrupted) } }; } diff --git a/src/process/threading/futex/waiter.rs b/src/process/threading/futex/waiter.rs index fb2dcb00..36cb2cef 100644 --- a/src/process/threading/futex/waiter.rs +++ b/src/process/threading/futex/waiter.rs @@ -27,7 +27,8 @@ pub type FutexQueue = Arc>>>; /// alias a new registration. pub struct WaiterCell { /// Wake mask; a wake with mask `m` wakes this waiter iff `mask & m != 0`. - pub mask: u64, + /// Only 32-bit futexes exist, so a 32-bit mask is sufficient. + pub mask: u32, state: SpinLock, } @@ -41,7 +42,7 @@ struct CellState { } impl WaiterCell { - pub fn new(mask: u64) -> Arc { + pub fn new(mask: u32) -> Arc { Arc::new(Self { mask, state: SpinLock::new(CellState { From 92a48ab5b36c93704731d5bc962c15300beb39d0 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 1 Jul 2026 01:33:56 +0530 Subject: [PATCH 10/14] fix a bug with the calculation of time --- src/clock/mod.rs | 85 +++++++++++++++++++++++++++ src/clock/realtime.rs | 54 ++++++++++++++++- src/process/threading/futex/futex2.rs | 24 ++++---- src/process/threading/futex/mod.rs | 16 ++--- src/process/threading/futex/wait.rs | 21 +++---- usertest/src/futex2.rs | 54 +++++++++++++++++ 6 files changed, 220 insertions(+), 34 deletions(-) diff --git a/src/clock/mod.rs b/src/clock/mod.rs index 1d989da9..0a3ebdc1 100644 --- a/src/clock/mod.rs +++ b/src/clock/mod.rs @@ -3,6 +3,91 @@ pub mod syscalls; pub mod timer; pub mod timespec; +use core::time::Duration; + +use futures::FutureExt; + +use crate::drivers::timer::{sleep, uptime}; +use realtime::{clock_set_generation, clock_was_set_since, date}; + +/// An absolute deadline expressed against a particular clock. 
+/// +/// Keeping the clock alongside the deadline (rather than pre-flattening to a +/// relative duration) lets [`Deadline::sleep`] re-evaluate against the live +/// clock, so a `CLOCK_REALTIME` deadline still fires at the right wall-clock +/// instant even if the clock is stepped (e.g. by `clock_settime`) while a +/// wait is in progress. +#[derive(Clone, Copy)] +pub enum Deadline { + /// Absolute instant on the monotonic clock (`CLOCK_MONOTONIC`). + Monotonic(Duration), + /// Absolute instant on the realtime clock (`CLOCK_REALTIME`). + Realtime(Duration), +} + +impl Deadline { + /// The clock's current reading. + fn clock_now(self) -> Duration { + match self { + Deadline::Monotonic(_) => uptime(), + Deadline::Realtime(_) => date(), + } + } + + /// The absolute deadline value. + fn target(self) -> Duration { + match self { + Deadline::Monotonic(d) | Deadline::Realtime(d) => d, + } + } + + /// Returns `true` if the deadline has already passed on its clock. + pub fn has_passed(self) -> bool { + self.clock_now() >= self.target() + } + + /// Sleeps until this deadline. + /// + /// The monotonic clock advances uniformly, so a single relative sleep is + /// exact. The realtime clock can be stepped by `clock_settime`, so a + /// realtime wait races the timer against a clock-was-set notification: on + /// either it re-evaluates the deadline against the live clock and re-arms + /// if the target has not yet been reached. This retargets an in-progress + /// wait in both directions across a step. + pub async fn sleep(self) { + loop { + let now = self.clock_now(); + let target = self.target(); + + if now >= target { + return; + } + + let remaining = target - now; + + match self { + // The monotonic clock never steps, so one relative sleep is + // exact. + Deadline::Monotonic(_) => { + sleep(remaining).await; + return; + } + // A realtime step (in either direction) wakes the notifier; + // loop to re-evaluate against the new wall time. 
+ Deadline::Realtime(_) => { + let generation = clock_set_generation(); + let mut timer = core::pin::pin!(sleep(remaining).fuse()); + let mut was_set = core::pin::pin!(clock_was_set_since(generation).fuse()); + futures::select_biased! { + _ = timer => {} + _ = was_set => {} + } + } + } + } + } +} + pub enum ClockId { Realtime = 0, Monotonic = 1, diff --git a/src/clock/realtime.rs b/src/clock/realtime.rs index d30ac6c6..f314c897 100644 --- a/src/clock/realtime.rs +++ b/src/clock/realtime.rs @@ -1,8 +1,11 @@ use crate::{ drivers::timer::{Instant, now, uptime}, - sync::SpinLock, + sync::{OnceLock, SpinLock}, }; +use core::future::poll_fn; +use core::task::Poll; use core::time::Duration; +use libkernel::sync::waker_set::WakerSet; // Return a duration from the epoch. pub fn date() -> Duration { @@ -23,11 +26,60 @@ pub fn set_date(duration: Duration) { let mut epoch_info = EPOCH_DURATION.lock_save_irq(); *epoch_info = Some((duration, now)); } + + // The realtime clock was stepped; wake anyone sleeping against an absolute + // realtime deadline so they can re-evaluate (and re-arm) against the new + // wall time. + let mut waiters = clock_set_waiters().lock_save_irq(); + *CLOCK_SET_GEN.lock_save_irq() += 1; + waiters.wake_all(); } // Represents a known duration since the epoch at the associated instant. static EPOCH_DURATION: SpinLock> = SpinLock::new(None); +/// Tasks waiting to be notified when the realtime clock is stepped. +static CLOCK_SET_WAITERS: OnceLock> = OnceLock::new(); + +fn clock_set_waiters() -> &'static SpinLock { + CLOCK_SET_WAITERS.get_or_init(|| SpinLock::new(WakerSet::new())) +} + +/// Bumped on every realtime clock step, so a waiter can detect a step that +/// happened between checking the clock and parking (closing the lost-wakeup +/// race). +static CLOCK_SET_GEN: SpinLock = SpinLock::new(0); + +/// The current clock-set generation. Sample this before reading the clock, and +/// pass it to [`clock_was_set_since`] to wait for the next step. 
+pub fn clock_set_generation() -> u64 { + *CLOCK_SET_GEN.lock_save_irq() +} + +/// Resolves once the realtime clock is stepped after `generation` was sampled. +/// If a step already happened since `generation`, returns immediately. +pub async fn clock_was_set_since(generation: u64) { + let mut registered = false; + + poll_fn(|cx| { + // Register before re-checking the generation so a step that races our + // poll cannot be missed. + let mut waiters = clock_set_waiters().lock_save_irq(); + + if *CLOCK_SET_GEN.lock_save_irq() != generation { + return Poll::Ready(()); + } + + if !registered { + waiters.register(cx.waker()); + registered = true; + } + + Poll::Pending + }) + .await; +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/process/threading/futex/futex2.rs b/src/process/threading/futex/futex2.rs index d76a2874..2dd3b1d8 100644 --- a/src/process/threading/futex/futex2.rs +++ b/src/process/threading/futex/futex2.rs @@ -14,9 +14,8 @@ use libkernel::memory::address::TUA; use super::key::FutexKey; use super::wait::{ParsedWaiter, futex_wait_multi}; use super::{futex_wait_single, requeue_key, wake_key}; -use crate::clock::realtime::date; +use crate::clock::Deadline; use crate::clock::timespec::TimeSpec; -use crate::drivers::timer::uptime; use crate::memory::uaccess::{UserCopyable, copy_obj_array_from_user}; use crate::sched::syscall_ctx::ProcessCtx; @@ -83,29 +82,26 @@ fn make_key(ctx: &ProcessCtx, uaddr: u64, private: bool) -> Result<(FutexKey, TU Ok((key, addr)) } -/// Converts a futex2 absolute timeout into a relative [`Duration`]. +/// Parses a futex2 absolute timeout into a clock-tagged [`Deadline`]. /// /// The clockid is only validated when a timeout is supplied, matching Linux. -/// A deadline already in the past yields a zero timeout; the futex value is -/// still checked first, so `EAGAIN` takes precedence over `ETIMEDOUT`. 
-/// -/// `CLOCK_REALTIME` deadlines are converted to a relative sleep up front, so -/// a concurrent `clock_settime` does not retarget an in-progress wait (same -/// simplification as the legacy `FUTEX_WAIT_BITSET` path). -async fn abs_timeout(timeout: TUA, clockid: u32) -> Result> { +/// The deadline keeps its clock so the wait can re-evaluate against the live +/// clock; a `CLOCK_REALTIME` deadline therefore still fires at the right +/// wall-clock instant if the clock is stepped during the wait. +async fn abs_timeout(timeout: TUA, clockid: u32) -> Result> { if timeout.is_null() { return Ok(None); } let deadline = Duration::from(TimeSpec::copy_from_user(timeout).await?); - let base = match clockid { - CLOCK_REALTIME => date(), - CLOCK_MONOTONIC => uptime(), + let deadline = match clockid { + CLOCK_REALTIME => Deadline::Realtime(deadline), + CLOCK_MONOTONIC => Deadline::Monotonic(deadline), _ => return Err(KernelError::InvalidValue), }; - Ok(Some(deadline.saturating_sub(base))) + Ok(Some(deadline)) } /// `futex_wait(uaddr, val, mask, flags, timeout, clockid)`: wait on a single diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index 025b5493..4e60beaf 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -1,5 +1,6 @@ -use crate::clock::realtime::date; +use crate::clock::Deadline; use crate::clock::timespec::TimeSpec; +use crate::drivers::timer::uptime; use crate::sched::syscall_ctx::ProcessCtx; use crate::sync::{OnceLock, SpinLock}; use alloc::vec::Vec; @@ -134,7 +135,7 @@ pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: u /// Waits on a single futex word, the common case shared by the legacy /// `FUTEX_WAIT` ops and futex2 `sys_futex_wait`. Interruption (and recovery of /// a wake that raced a signal) is handled inside [`futex_wait_multi`]. 
-pub(super) async fn futex_wait_single(waiter: ParsedWaiter, timeout: Option) -> Result { +pub(super) async fn futex_wait_single(waiter: ParsedWaiter, timeout: Option) -> Result { // Return 0 on success. futex_wait_multi(core::slice::from_ref(&waiter), timeout) .await @@ -164,13 +165,14 @@ pub async fn sys_futex( let timeout = if timeout.is_null() { None } else { - let timeout = TimeSpec::copy_from_user(timeout).await?; + let ts = Duration::from(TimeSpec::copy_from_user(timeout).await?); if matches!(cmd, FUTEX_WAIT_BITSET) { - // The deadline is absolute and may already have passed; - // a zero timeout still performs the value check below. - Some(Duration::from(timeout).saturating_sub(date())) + // FUTEX_WAIT_BITSET takes an absolute realtime deadline. + Some(Deadline::Realtime(ts)) } else { - Some(Duration::from(timeout)) + // FUTEX_WAIT takes a relative timeout on the monotonic + // clock; convert to an absolute monotonic deadline. + Some(Deadline::Monotonic(uptime() + ts)) } }; diff --git a/src/process/threading/futex/wait.rs b/src/process/threading/futex/wait.rs index 9e50cfbb..e071b43e 100644 --- a/src/process/threading/futex/wait.rs +++ b/src/process/threading/futex/wait.rs @@ -1,9 +1,7 @@ -use alloc::boxed::Box; use alloc::sync::Arc; use alloc::vec::Vec; use core::future::poll_fn; use core::task::{Poll, Waker}; -use core::time::Duration; use futures::FutureExt; use libkernel::{ error::{KernelError, Result}, @@ -13,7 +11,7 @@ use libkernel::{ use super::get_or_create_queue; use super::key::FutexKey; use super::waiter::WaiterCell; -use crate::drivers::timer::sleep; +use crate::clock::Deadline; use crate::memory::uaccess::{copy_from_user, try_copy_from_user}; use crate::process::thread_group::signal::{InterruptResult, Interruptable}; @@ -115,7 +113,7 @@ fn setup_all(waiters: &[ParsedWaiter], waker: &Waker, guard: &mut WaitGuard) -> /// that counts as a successful wake rather than `EAGAIN`. 
pub async fn futex_wait_multi( waiters: &[ParsedWaiter], - timeout: Option, + timeout: Option, ) -> Result { loop { let mut guard = WaitGuard::default(); @@ -144,15 +142,14 @@ pub async fn futex_wait_multi( // signal is recovered via `finish()` rather than lost to EINTR. let woken = match timeout { None => poll_fn(|_| guard.poll_woken()).interruptable().await, - Some(dur) => { - let wait = poll_fn(|_| guard.poll_woken()); - let sleep_fut = Box::pin(sleep(dur).fuse()); - + Some(deadline) => { // Map the timer firing to a sentinel so the outer match can - // distinguish it from a real wake. - let timed = async move { - let mut wait = wait.fuse(); - let mut sleep_fut = sleep_fut; + // distinguish it from a real wake. The sleep future is pinned + // on the stack to avoid a per-wait heap allocation. + let timed = async { + let mut wait = poll_fn(|_| guard.poll_woken()).fuse(); + let sleep_fut = deadline.sleep().fuse(); + let mut sleep_fut = core::pin::pin!(sleep_fut); futures::select_biased! { idx = wait => idx, _ = sleep_fut => usize::MAX, diff --git a/usertest/src/futex2.rs b/usertest/src/futex2.rs index ef889515..ab87fd20 100644 --- a/usertest/src/futex2.rs +++ b/usertest/src/futex2.rs @@ -617,6 +617,60 @@ fn test_futex2_requeue_to_self() { register_test!(test_futex2_requeue_to_self); +fn test_futex2_realtime_retarget() { + // A CLOCK_REALTIME wait with a far-future deadline must retarget when the + // wall clock is stepped forward past the deadline: the waiter should time + // out promptly rather than sleeping out the original (now-past) deadline. + let word = Arc::new(AtomicU32::new(0)); + let word_clone = word.clone(); + + // Snapshot the current realtime so we can restore it afterwards. + let mut saved: libc::timespec = unsafe { std::mem::zeroed() }; + unsafe { libc::clock_gettime(libc::CLOCK_REALTIME, &mut saved) }; + + let t = thread::spawn(move || { + // Deadline 10s out on the realtime clock. 
+ let ts = abs_deadline(libc::CLOCK_REALTIME, 10_000); + let start = Instant::now(); + let ret = unsafe { + futex2_wait( + word_clone.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_REALTIME, + ) + }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!("realtime wait: ret {ret}, errno {}", errno()); + } + // Must have woken well before the original 10s deadline. + if start.elapsed() > Duration::from_secs(5) { + panic!("realtime wait did not retarget after clock step"); + } + }); + + // Let the waiter park, then step the wall clock 20s forward, past the + // waiter's deadline. + thread::sleep(Duration::from_millis(150)); + let mut stepped = saved; + stepped.tv_sec += 20; + let ret = unsafe { libc::clock_settime(libc::CLOCK_REALTIME, &stepped) }; + if ret != 0 { + // Restore and surface the failure. + unsafe { libc::clock_settime(libc::CLOCK_REALTIME, &saved) }; + panic!("clock_settime failed: errno {}", errno()); + } + + t.join().expect("realtime waiter thread panicked"); + + // Restore the clock so later tests see a sane wall time. + unsafe { libc::clock_settime(libc::CLOCK_REALTIME, &saved) }; +} + +register_test!(test_futex2_realtime_retarget); + fn test_futex2_requeue_timeout_race() { // Requeue waiters that are about to time out: their timeout-path // unregistration must find them on the destination queue. 
From ae9ee6a8ebb552ac22ea7270bbe96c97b577163f Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 1 Jul 2026 01:36:34 +0530 Subject: [PATCH 11/14] remove docs --- etc/futex2.md | 108 ------------------------------------------- etc/futex2_review.md | 81 -------------------------------- 2 files changed, 189 deletions(-) delete mode 100644 etc/futex2.md delete mode 100644 etc/futex2_review.md diff --git a/etc/futex2.md b/etc/futex2.md deleted file mode 100644 index 1eceef67..00000000 --- a/etc/futex2.md +++ /dev/null @@ -1,108 +0,0 @@ -# futex2 in moss - -## What futex2 is - -A futex ("fast userspace mutex") is the kernel primitive userspace locking is -built on. A lock is just a 32-bit word in user memory; the fast path (taking an -uncontended lock) never enters the kernel. Only on contention does a thread ask -the kernel to put it to sleep until another thread wakes it. - -futex2 is the modern generation of this interface, added to Linux in stages -from 5.16 onward. Instead of one multiplexed `futex` syscall with an `op` -argument, it provides separate syscalls: - -- **futex_wait** — sleep on one futex word, as long as it still holds an - expected value. Returns once woken. -- **futex_wake** — wake up to a given number of threads sleeping on a word. -- **futex_waitv** — sleep on *several* futex words at once (up to 128) and - return the index of whichever one got woken. This is the headline feature; - it was driven by Wine/Proton, which needs Windows' "wait for any of these - objects" semantics to run games efficiently. -- **futex_requeue** — wake some waiters of one word and silently move the rest - onto a different word, without waking them. Used by condition-variable - implementations to avoid thundering herds: one broadcast moves everyone onto - the mutex's wait queue instead of waking them all to fight over the lock. - -All waits take a wake *mask*: a waiter is only woken by a wake whose mask -shares at least one bit with its own. 
Timeouts are *absolute* deadlines against -a chosen clock (monotonic or realtime), not relative durations. - -moss implements all four on aarch64 (syscall numbers 449, 454, 455, 456), with -Linux-compatible argument validation and error codes. - -## How futex2 differs from legacy futex - -- Separate syscalls instead of one `op`-multiplexed entry point. -- Sized futexes: each call carries a flag stating the width of the futex word. - Like Linux today, moss accepts only the 32-bit size. -- Wake masks are first-class on every wait/wake, not a special bitset op - bolted on (legacy `FUTEX_WAIT_BITSET`/`FUTEX_WAKE_BITSET`). -- Timeouts are always absolute, with an explicit clock choice per call. -- Multi-wait (`futex_waitv`) exists only in futex2. -- futex2's requeue performs no value comparison, unlike legacy - `FUTEX_CMP_REQUEUE`; it trusts userspace and simply moves waiters. -- Stricter validation: unaligned futex addresses, unknown flags, reserved - fields and oversized values are rejected outright. - -The legacy `futex` syscall remains fully supported; nothing changed in its -userspace-visible behaviour. - -## How moss implements it - -Both generations share one wait/wake core; the syscalls differ only in how -they decode arguments. - -Every futex word is identified by a key — for process-private futexes the -process id plus the virtual address, for shared ones the physical frame plus -offset, so different mappings of the same memory still meet on the same futex. -A global table maps each key to its wait queue. - -Each sleeping thread owns a small shared record (its "waiter cell") holding -its wake mask, whether it has been woken, and *which queue currently holds -it*. That last part is what makes requeueing sound: requeue moves entries -between queues while their owners sleep, and a waiter that later needs to -cancel (timeout, signal) consults its own cell to find where it currently -lives rather than assuming it is still where it went to sleep. 
Because lock -ordering forbids going from a cell to its queue directly, cancellation -snapshots the location, locks the queue, then re-checks the location and -retries if a concurrent requeue moved it in the meantime. - -Waiting — single or multi — is one routine: for each requested futex it locks -the queue, re-reads the user word, and only enqueues if the value still -matches, so a wake cannot slip between the check and the sleep. If any value -mismatches, everything already enqueued is unwound; if one of those waiters -was woken during the unwind, that wake is honoured rather than lost. Legacy -wait and futex2's wait are this routine with a list of one; waitv passes the -whole list and reports the index that fired. Timeouts race the wait against a -timer, with a final "did a wake land anyway" check before reporting a timeout. - -Waking finds the queue, removes up to the requested number of waiters whose -masks overlap, marks their cells woken, and only then — after dropping the -queue lock — actually wakes the threads. Requeueing locks both queues (in a -fixed order, so two concurrent requeues cannot deadlock), wakes from the -first, and transplants the remainder onto the second, updating each moved -cell's location. - -## Remaining issues - -- **Realtime deadlines do not track clock changes.** An absolute - `CLOCK_REALTIME` deadline is converted into a relative sleep when the - syscall starts. If the wall clock is stepped while a thread waits, the - deadline does not move with it, where Linux would honour the new clock. - Fixing this needs realtime-aware timers in the timer subsystem. The legacy - bitset-wait path has always had the same simplification. -- **A wake can be lost when a signal interrupts a waiter.** If a wake and a - signal arrive at almost the same moment, the wake may be consumed (the waker - is told it woke someone) while the waiter returns "interrupted" instead of - "woken". 
Linux closes this window with syscall-restart machinery, which moss - does not yet have. Inherited from the legacy implementation. The equivalent - wake-versus-timeout race *is* handled. -- **The futex table never shrinks.** A queue entry is created for every futex - address ever waited on and is kept after the queue drains. Long-running - systems touching many distinct addresses leak table entries slowly. - Pre-existing behaviour; pruning empty queues on the last waiter's exit is - the obvious fix. -- **Host unit tests skip the sync primitives by default.** The unit-test - recipe builds the kernel library without optional features, so the tests - for the synchronisation primitives (including the waker-set operations this - work added) only run when the full feature set is enabled explicitly. diff --git a/etc/futex2_review.md b/etc/futex2_review.md deleted file mode 100644 index 64d345b7..00000000 --- a/etc/futex2_review.md +++ /dev/null @@ -1,81 +0,0 @@ -# futex2 review findings - -Review of branch `futex2` vs `master`. High-effort multi-agent pass (5 finders, verified). - -Ranked most-severe first. Confirmed = traced in code; Plausible = real & reachable, lower confidence or known/benign. - -## Correctness — confirmed - -### 1. Lost wake on signal interrupt *(top bug)* -`src/process/threading/futex/wait.rs:64` - -On signal, the wait future is dropped and `WaitGuard::drop` calls `cell.unregister()` but discards its bool. If `futex_wake` already consumed the wake (counted it via `mark_woken`), the waiter returns `EINTR` and the wake is lost — userspace condvar/mutex can hang. The timeout branch recovers this race via `finish().ok_or(TimedOut)`; the interrupt path has no equivalent. Affects both futex1 and futex2 (shared core). - -**Fix:** on interrupt, inspect `unregister()` return — if a wake was consumed, return success (mirror timeout `finish()` recovery) or re-wake another waiter. - -*Acknowledged in etc/futex2.md but still live.* - -### 2. 
requeue-to-self silently drops nr_requeue -`src/process/threading/futex/mod.rs:84` - -`requeue_key` shortcuts `key1==key2` → `wake_key(nr_wake, key1, u64::MAX)`, ignoring `nr_requeue`. Linux returns `EINVAL` for requeue onto the same address. Shared futexes: two VAs mapping the same frame collapse to one key and hit this unexpectedly. - -### 3. Zero bitset not rejected on legacy path -`src/process/threading/futex/mod.rs:204` - -`FUTEX_WAIT_BITSET` / `FUTEX_WAKE_BITSET` forward `val3` as the mask with no zero check. `mask==0` → `cell.mask & mask` always 0 → waiter permanently unwakeable. Linux returns `EINVAL`. The futex2 paths already guard `mask==0`; the legacy path doesn't. Inconsistent. - -### 4. wake computes key before nr<=0 check -`src/process/threading/futex/futex2.rs:166` - -`make_key` (translates the address, can fault for a shared futex) runs before the `nr<=0` early return. `futex_wake(nr=0)` on an unmapped shared address returns `EFAULT` instead of `Ok(0)`. Move the `nr` check above `make_key`. - -## Correctness — plausible - -### 5. Duplicate uaddr in waitv inflates wake count -`src/process/threading/futex/futex2.rs:200` - -Same `uaddr` at two `futex_waitv` indices → two cells on one queue for one task. `futex_wake(nr>=2)` matches both, returns `woke=2` though only one logical waiter exists. No panic, returned index valid, but count is wrong. No dedup guard. (requeue has the analogous issue via `take_first`.) - -### 6. abs_timeout realtime semantics -`src/process/threading/futex/futex2.rs:438` - -Absolute `CLOCK_REALTIME` deadline converted to a relative sleep once at syscall entry. If `set_date()` was never called, `date()` falls back to `uptime()` → deadline saturates to an effectively infinite timeout. Also a concurrent `clock_settime` doesn't retarget an in-progress wait. Documented limitation in etc/futex2.md, but user-visible. - -## Quality - -### 7. 
do_futex_wait duplicates futex2 wait core -`src/process/threading/futex/mod.rs:136` - -Both `do_futex_wait` and `sys_futex_wait` build a single `ParsedWaiter`, call `futex_wait_multi(slice::from_ref(&waiter), timeout)`, and `.map(|_| 0)`. Extract one shared single-waiter helper — also lets the #1 fix land in one place instead of two. - -### 8. Box::pin(sleep) heap-allocs the hot path -`src/process/threading/futex/wait.rs:147` - -The timeout branch does `Box::pin(sleep(dur))` + a fresh `select_biased` on every timed wait — a heap alloc/free on every contended timed lock acquire. Pin on the stack with `core::pin::pin!` / `pin_mut!`. - -### 9. Sentinel FutexKey in requeue -`src/process/threading/futex/futex2.rs:244` - -Seeds the keys array with a dummy `FutexKey::Private { pid: 0, addr: 0 }` and overwrites it in the loop. If a future refactor leaves an entry unwritten, the bogus key leaks into `requeue_key`. Collect the two keys directly (map entries into the array) so no sentinel exists. - -### 10. u64 mask is unused width -`src/process/threading/futex/mod.rs:49` - -Wake mask widened to `u64` everywhere (`WaiterCell.mask`, `ParsedWaiter.mask`, `wake_key`, `requeue_key`) but no path carries >32 bits — `check_flags` rejects masks > `u32::MAX` and every caller casts a u32. Unused width + casts at every call site. Should be `u32` (matches the only supported `FUTEX2_SIZE_U32`). - ---- - -## Linux reference - -futex2 = the `futex_waitv` syscall. 
Real kernel source: - -- `kernel/futex/` — https://github.com/torvalds/linux/tree/master/kernel/futex - - `core.c` — hash buckets, queue, wait/wake core - - `waitwake.c` — `futex_wait`, `futex_wake`, **`futex_wait_multiple`** (the multi-wait core; our `futex_wait_multi` analog) - - `syscalls.c` — `sys_futex`, **`sys_futex_waitv`**, `futex_parse_waitv` - - `requeue.c`, `pi.c` — requeue + priority inheritance (out of scope here) - - `futex.h` — `futex_q`, `futex_hash_bucket` -- UAPI: `include/uapi/linux/futex.h` — flags, `struct futex_waitv` -- Docs: https://docs.kernel.org/userspace-api/futex2.html -- man: `man 2 futex_waitv`, `man 2 futex` From fddedd7c3c78126dc807246d85f5927b6147d378 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 1 Jul 2026 01:40:44 +0530 Subject: [PATCH 12/14] code formatting --- src/process/threading/futex/mod.rs | 5 ++++- src/process/threading/futex/wait.rs | 8 ++------ usertest/src/futex2.rs | 5 ++++- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index 4e60beaf..f7fa7a30 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -135,7 +135,10 @@ pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: u /// Waits on a single futex word, the common case shared by the legacy /// `FUTEX_WAIT` ops and futex2 `sys_futex_wait`. Interruption (and recovery of /// a wake that raced a signal) is handled inside [`futex_wait_multi`]. -pub(super) async fn futex_wait_single(waiter: ParsedWaiter, timeout: Option) -> Result { +pub(super) async fn futex_wait_single( + waiter: ParsedWaiter, + timeout: Option, +) -> Result { // Return 0 on success. 
futex_wait_multi(core::slice::from_ref(&waiter), timeout) .await diff --git a/src/process/threading/futex/wait.rs b/src/process/threading/futex/wait.rs index e071b43e..35b41736 100644 --- a/src/process/threading/futex/wait.rs +++ b/src/process/threading/futex/wait.rs @@ -170,12 +170,8 @@ pub async fn futex_wait_multi( // race window, so `finish()` reports it and we return success in // that case rather than losing the wake. Otherwise it is the // genuine timeout / interrupt result. - InterruptResult::Uninterrupted(_) => { - guard.finish().ok_or(KernelError::TimedOut) - } - InterruptResult::Interrupted => { - guard.finish().ok_or(KernelError::Interrupted) - } + InterruptResult::Uninterrupted(_) => guard.finish().ok_or(KernelError::TimedOut), + InterruptResult::Interrupted => guard.finish().ok_or(KernelError::Interrupted), }; } } diff --git a/usertest/src/futex2.rs b/usertest/src/futex2.rs index ab87fd20..9a9d598e 100644 --- a/usertest/src/futex2.rs +++ b/usertest/src/futex2.rs @@ -603,7 +603,10 @@ fn test_futex2_requeue_to_self() { unsafe { let ret = futex2_requeue(pair.as_ptr(), 0, 1, 1); if ret != -1 || errno() != libc::EINVAL { - panic!("requeue-to-self: ret {ret}, errno {}, expected EINVAL", errno()); + panic!( + "requeue-to-self: ret {ret}, errno {}, expected EINVAL", + errno() + ); } } From 10aac3e52e3389a585b8a1fa6798ac00c214b1c0 Mon Sep 17 00:00:00 2001 From: samarth Date: Wed, 1 Jul 2026 01:46:47 +0530 Subject: [PATCH 13/14] remove unused function --- src/clock/mod.rs | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/clock/mod.rs b/src/clock/mod.rs index 0a3ebdc1..0d1991af 100644 --- a/src/clock/mod.rs +++ b/src/clock/mod.rs @@ -41,11 +41,6 @@ impl Deadline { } } - /// Returns `true` if the deadline has already passed on its clock. - pub fn has_passed(self) -> bool { - self.clock_now() >= self.target() - } - /// Sleeps until this deadline. 
/// /// The monotonic clock advances uniformly, so a single relative sleep is From ee130ad177f1985e9cb717b4b235b8545e5be86e Mon Sep 17 00:00:00 2001 From: samarth Date: Sat, 4 Jul 2026 14:23:43 +0530 Subject: [PATCH 14/14] update outdated comment about not returning EINVAL when you requeue a futex on itself --- usertest/src/futex2.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/usertest/src/futex2.rs b/usertest/src/futex2.rs index 9a9d598e..07a7d0ca 100644 --- a/usertest/src/futex2.rs +++ b/usertest/src/futex2.rs @@ -567,9 +567,9 @@ register_test!(test_futex2_wake_no_waiters); fn test_futex2_requeue_to_self() { // Requeueing a futex onto itself (uaddr1 == uaddr2) is rejected with - // EINVAL on Linux. moss currently short-circuits this to a plain wake and - // returns success, silently dropping nr_requeue -- this test encodes the - // Linux contract and is expected to fail until that is fixed. + // EINVAL, matching Linux. moss detects key1 == key2 before requeueing + // (this also catches distinct virtual addresses aliasing one shared + // frame); this test guards against that rejection regressing. let f = Arc::new(AtomicU32::new(0)); let woken = Arc::new(AtomicU32::new(0));