diff --git a/README.md b/README.md index 5b24398e..ef8c04e1 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ model within the kernel context: * Full task management including both UP and SMP scheduling via EEVDF and task migration via IPIs. * Capable of running dynamically linked ELF binaries from Arch Linux. -* Currently implements [105 Linux syscalls](./etc/syscalls_linux_aarch64.md) +* Currently implements [109 Linux syscalls](./etc/syscalls_linux_aarch64.md) * `fork()`, `execve()`, `clone()`, and full process lifecycle management. * Job control support (process groups, waitpid, background tasks). * Signal delivery, masking, and propagation (SIGTERM, SIGSTOP, SIGCONT, SIGCHLD, @@ -172,7 +172,7 @@ moss is under active development. Current focus areas include: * Networking Stack: TCP/IP implementation. * A fully read/write capable filesystem driver. -* Expanding coverage beyond the current 105 calls. +* Expanding coverage beyond the current 109 calls. * systemd bringup. ## Non-Goals (for now) diff --git a/etc/syscalls_linux_aarch64.md b/etc/syscalls_linux_aarch64.md index c3b28eb8..b9deee15 100644 --- a/etc/syscalls_linux_aarch64.md +++ b/etc/syscalls_linux_aarch64.md @@ -304,14 +304,14 @@ | 0x1be (446) | landlock_restrict_self | (const int ruleset_fd, const __u32 flags) | __arm64_sys_landlock_restrict_self | false | | 0x1bf (447) | memfd_secret | (unsigned int flags) | __arm64_sys_memfd_secret | false | | 0x1c0 (448) | process_mrelease | (int pidfd, unsigned int flags) | __arm64_sys_process_mrelease | false | -| 0x1c1 (449) | futex_waitv | (struct futex_waitv *waiters, unsigned int nr_futexes, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_waitv | false | +| 0x1c1 (449) | futex_waitv | (struct futex_waitv *waiters, unsigned int nr_futexes, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_waitv | true | | 0x1c2 (450) | set_mempolicy_home_node | (unsigned long start, unsigned long len, unsigned long home_node, unsigned long flags) | __arm64_sys_set_mempolicy_home_node | false | | 0x1c3 (451) | cachestat | (unsigned int fd, struct cachestat_range *cstat_range, struct cachestat *cstat, unsigned int flags) | __arm64_sys_cachestat | false | | 0x1c4 (452) | fchmodat2 | (int dfd, const char *filename, umode_t mode, unsigned int flags) | __arm64_sys_fchmodat2 | false | | 0x1c5 (453) | map_shadow_stack | (unsigned long addr, unsigned long size, unsigned int flags) | __arm64_sys_map_shadow_stack | false | -| 0x1c6 (454) | futex_wake | (void *uaddr, unsigned long mask, int nr, unsigned int flags) | __arm64_sys_futex_wake | false | -| 0x1c7 (455) | futex_wait | (void *uaddr, unsigned long val, unsigned long mask, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_wait | false | -| 0x1c8 (456) | futex_requeue | (struct futex_waitv *waiters, unsigned int flags, int nr_wake, int nr_requeue) | __arm64_sys_futex_requeue | false | +| 0x1c6 (454) | futex_wake | (void *uaddr, unsigned long mask, int nr, unsigned int flags) | __arm64_sys_futex_wake | true | +| 0x1c7 (455) | futex_wait | (void *uaddr, unsigned long val, unsigned long mask, unsigned int flags, struct __kernel_timespec *timeout, clockid_t clockid) | __arm64_sys_futex_wait | true | +| 0x1c8 (456) | futex_requeue | (struct futex_waitv *waiters, unsigned int flags, int nr_wake, int nr_requeue) | __arm64_sys_futex_requeue | true | | 0x1c9 (457) | statmount | (const struct mnt_id_req *req, struct statmount *buf, size_t bufsize, unsigned int flags) | __arm64_sys_statmount | false | | 0x1ca (458) | listmount | (const struct mnt_id_req *req, u64 *mnt_ids, size_t nr_mnt_ids, unsigned int flags) | __arm64_sys_listmount | false | | 0x1cb (459) | lsm_get_self_attr | (unsigned int attr, struct lsm_ctx *ctx, u32 *size, u32 flags) | __arm64_sys_lsm_get_self_attr | false | diff --git a/libkernel/src/sync/waker_set.rs b/libkernel/src/sync/waker_set.rs index 8e164a2c..83829df4 100644 --- a/libkernel/src/sync/waker_set.rs +++ b/libkernel/src/sync/waker_set.rs @@ -89,11 +89,40 @@ impl WakerSet { } } + /// Removes and returns the first (lowest-token, i.e. FIFO) entry whose + /// data matches `predicate`, without waking it. + pub fn take_if(&mut self, predicate: impl Fn(&T) -> bool) -> Option<(Waker, T)> { + let key = self + .waiters + .iter() + .find(|(_, (_, data))| predicate(data)) + .map(|(key, _)| *key)?; + + self.waiters.remove(&key) + } + + /// Removes and returns the first (lowest-token, i.e. FIFO) entry, without + /// waking it. + pub fn take_first(&mut self) -> Option<(Waker, T)> { + self.waiters.pop_first().map(|(_, entry)| entry) + } + + /// Returns `true` if no wakers are registered. + pub fn is_empty(&self) -> bool { + self.waiters.is_empty() + } + /// Registers a waker together with associated data, returning its token. pub fn register_with_data(&mut self, waker: &Waker, data: T) -> u64 { + self.insert(waker.clone(), data) + } + + /// Inserts an already-owned waker with associated data, returning its + /// token. + pub fn insert(&mut self, waker: Waker, data: T) -> u64 { let id = self.allocate_id(); - self.waiters.insert(id, (waker.clone(), data)); + self.waiters.insert(id, (waker, data)); id } @@ -193,6 +222,54 @@ where } } +#[cfg(test)] +mod waker_set_tests { + use super::*; + + fn set_with_data(data: &[u32]) -> WakerSet { + let mut set = WakerSet::new(); + for &d in data { + set.insert(Waker::noop().clone(), d); + } + set + } + + #[test] + fn take_if_removes_first_match_in_fifo_order() { + let mut set = set_with_data(&[0b01, 0b10, 0b11]); + + let (_, data) = set.take_if(|d| d & 0b10 != 0).unwrap(); + assert_eq!(data, 0b10); + + let (_, data) = set.take_if(|d| d & 0b10 != 0).unwrap(); + assert_eq!(data, 0b11); + + assert!(set.take_if(|d| d & 0b10 != 0).is_none()); + assert!(!set.is_empty()); + } + + #[test] + fn take_first_is_fifo() { + let mut set = set_with_data(&[1, 2, 3]); + + assert_eq!(set.take_first().unwrap().1, 1); + assert_eq!(set.take_first().unwrap().1, 2); + assert_eq!(set.take_first().unwrap().1, 3); + assert!(set.take_first().is_none()); + assert!(set.is_empty()); + } + + #[test] + fn insert_then_remove_by_token() { + let mut set = WakerSet::new(); + let token = set.insert(Waker::noop().clone(), 7u32); + + assert!(set.contains_token(token)); + set.remove(token); + assert!(set.is_empty()); + } +} + #[cfg(test)] mod wait_until_tests { use super::*; diff --git a/src/arch/arm64/exceptions/syscall.rs b/src/arch/arm64/exceptions/syscall.rs index cd572ea7..7e9243a1 100644 --- a/src/arch/arm64/exceptions/syscall.rs +++ b/src/arch/arm64/exceptions/syscall.rs @@ -103,7 +103,11 @@ use crate::{ umask::sys_umask, wait::{sys_wait4, sys_waitid}, }, - threading::{futex::sys_futex, sys_set_robust_list, sys_set_tid_address}, + threading::{ + futex::futex2::{sys_futex_requeue, sys_futex_wait, sys_futex_waitv, sys_futex_wake}, + futex::sys_futex, + sys_set_robust_list, sys_set_tid_address, + }, }, sched::{ self, @@ -824,6 +828,40 @@ pub async fn handle_syscall(mut ctx: ProcessCtx) { .await } 0x1b8 => Ok(0), // process_madvise is a no-op + 0x1c1 => { + sys_futex_waitv( + &ctx, + TUA::from_value(arg1 as _), + arg2 as _, + arg3 as _, + TUA::from_value(arg4 as _), + arg5 as _, + ) + .await + } + 0x1c6 => sys_futex_wake(&ctx, arg1, arg2, arg3 as _, arg4 as _), + 0x1c7 => { + sys_futex_wait( + &ctx, + arg1, + arg2, + arg3, + arg4 as _, + TUA::from_value(arg5 as _), + arg6 as _, + ) + .await + } + 0x1c8 => { + sys_futex_requeue( + &ctx, + TUA::from_value(arg1 as _), + arg2 as _, + arg3 as _, + arg4 as _, + ) + .await + } _ => panic!( "Unhandled syscall 0x{nr:x}, PC: 0x{:x}", ctx.task().ctx.user().elr_el1 diff --git a/src/clock/mod.rs b/src/clock/mod.rs index 1d989da9..0d1991af 100644 --- a/src/clock/mod.rs +++ b/src/clock/mod.rs @@ -3,6 +3,86 @@ pub mod syscalls; pub mod timer; pub mod timespec; +use core::time::Duration; + +use futures::FutureExt; + +use crate::drivers::timer::{sleep, uptime}; +use realtime::{clock_set_generation, clock_was_set_since, date}; + +/// An absolute deadline expressed against a particular clock. +/// +/// Keeping the clock alongside the deadline (rather than pre-flattening to a +/// relative duration) lets [`Deadline::sleep`] re-evaluate against the live +/// clock, so a `CLOCK_REALTIME` deadline still fires at the right wall-clock +/// instant even if the clock is stepped (e.g. by `clock_settime`) while a +/// wait is in progress. +#[derive(Clone, Copy)] +pub enum Deadline { + /// Absolute instant on the monotonic clock (`CLOCK_MONOTONIC`). + Monotonic(Duration), + /// Absolute instant on the realtime clock (`CLOCK_REALTIME`). + Realtime(Duration), +} + +impl Deadline { + /// The clock's current reading. + fn clock_now(self) -> Duration { + match self { + Deadline::Monotonic(_) => uptime(), + Deadline::Realtime(_) => date(), + } + } + + /// The absolute deadline value. + fn target(self) -> Duration { + match self { + Deadline::Monotonic(d) | Deadline::Realtime(d) => d, + } + } + + /// Sleeps until this deadline. + /// + /// The monotonic clock advances uniformly, so a single relative sleep is + /// exact. The realtime clock can be stepped by `clock_settime`, so a + /// realtime wait races the timer against a clock-was-set notification: on + /// either it re-evaluates the deadline against the live clock and re-arms + /// if the target has not yet been reached. This retargets an in-progress + /// wait in both directions across a step. + pub async fn sleep(self) { + loop { + let now = self.clock_now(); + let target = self.target(); + + if now >= target { + return; + } + + let remaining = target - now; + + match self { + // The monotonic clock never steps, so one relative sleep is + // exact. + Deadline::Monotonic(_) => { + sleep(remaining).await; + return; + } + // A realtime step (in either direction) wakes the notifier; + // loop to re-evaluate against the new wall time. + Deadline::Realtime(_) => { + let generation = clock_set_generation(); + let mut timer = core::pin::pin!(sleep(remaining).fuse()); + let mut was_set = core::pin::pin!(clock_was_set_since(generation).fuse()); + futures::select_biased! { + _ = timer => {} + _ = was_set => {} + } + } + } + } + } +} + pub enum ClockId { Realtime = 0, Monotonic = 1, diff --git a/src/clock/realtime.rs b/src/clock/realtime.rs index d30ac6c6..f314c897 100644 --- a/src/clock/realtime.rs +++ b/src/clock/realtime.rs @@ -1,8 +1,11 @@ use crate::{ drivers::timer::{Instant, now, uptime}, - sync::SpinLock, + sync::{OnceLock, SpinLock}, }; +use core::future::poll_fn; +use core::task::Poll; use core::time::Duration; +use libkernel::sync::waker_set::WakerSet; // Return a duration from the epoch. pub fn date() -> Duration { @@ -23,11 +26,60 @@ pub fn set_date(duration: Duration) { let mut epoch_info = EPOCH_DURATION.lock_save_irq(); *epoch_info = Some((duration, now)); } + + // The realtime clock was stepped; wake anyone sleeping against an absolute + // realtime deadline so they can re-evaluate (and re-arm) against the new + // wall time. + let mut waiters = clock_set_waiters().lock_save_irq(); + *CLOCK_SET_GEN.lock_save_irq() += 1; + waiters.wake_all(); } // Represents a known duration since the epoch at the associated instant. static EPOCH_DURATION: SpinLock> = SpinLock::new(None); +/// Tasks waiting to be notified when the realtime clock is stepped. +static CLOCK_SET_WAITERS: OnceLock> = OnceLock::new(); + +fn clock_set_waiters() -> &'static SpinLock { + CLOCK_SET_WAITERS.get_or_init(|| SpinLock::new(WakerSet::new())) +} + +/// Bumped on every realtime clock step, so a waiter can detect a step that +/// happened between checking the clock and parking (closing the lost-wakeup +/// race). +static CLOCK_SET_GEN: SpinLock = SpinLock::new(0); + +/// The current clock-set generation. Sample this before reading the clock, and +/// pass it to [`clock_was_set_since`] to wait for the next step. +pub fn clock_set_generation() -> u64 { + *CLOCK_SET_GEN.lock_save_irq() +} + +/// Resolves once the realtime clock is stepped after `generation` was sampled. +/// If a step already happened since `generation`, returns immediately. +pub async fn clock_was_set_since(generation: u64) { + let mut registered = false; + + poll_fn(|cx| { + // Register before re-checking the generation so a step that races our + // poll cannot be missed. + let mut waiters = clock_set_waiters().lock_save_irq(); + + if *CLOCK_SET_GEN.lock_save_irq() != generation { + return Poll::Ready(()); + } + + if !registered { + waiters.register(cx.waker()); + registered = true; + } + + Poll::Pending + }) + .await; +} + #[cfg(test)] mod tests { use super::*; diff --git a/src/process/threading/futex/futex2.rs b/src/process/threading/futex/futex2.rs new file mode 100644 index 00000000..2dd3b1d8 --- /dev/null +++ b/src/process/threading/futex/futex2.rs @@ -0,0 +1,258 @@ +//! The futex2 syscall family: `futex_waitv`, `futex_wake`, `futex_wait` and +//! `futex_requeue`. +//! +//! These share the wait/wake core with the legacy `futex` syscall; only the +//! argument decoding differs. Timeouts are absolute against the given clock, +//! unlike most of the legacy ops. + +use alloc::vec::Vec; +use core::time::Duration; + +use libkernel::error::{KernelError, Result}; +use libkernel::memory::address::TUA; + +use super::key::FutexKey; +use super::wait::{ParsedWaiter, futex_wait_multi}; +use super::{futex_wait_single, requeue_key, wake_key}; +use crate::clock::Deadline; +use crate::clock::timespec::TimeSpec; +use crate::memory::uaccess::{UserCopyable, copy_obj_array_from_user}; +use crate::sched::syscall_ctx::ProcessCtx; + +const FUTEX2_SIZE_U32: u32 = 0x02; +const FUTEX2_SIZE_MASK: u32 = 0x03; +const FUTEX2_PRIVATE: u32 = 0x80; +const FUTEX2_VALID_MASK: u32 = FUTEX2_SIZE_MASK | FUTEX2_PRIVATE; + +const FUTEX_WAITV_MAX: usize = 128; +const FUTEX_BITSET_MATCH_ANY: u64 = 0xffff_ffff; + +const CLOCK_REALTIME: u32 = 0; +const CLOCK_MONOTONIC: u32 = 1; + +/// Userspace `struct futex_waitv`. +#[repr(C)] +#[derive(Clone, Copy)] +pub struct FutexWaitvUser { + val: u64, + uaddr: u64, + flags: u32, + reserved: u32, +} + +// SAFETY: `FutexWaitvUser` is a plain-old-data `repr(C)` struct; any bit +// pattern is a valid value (validation happens after the copy). +unsafe impl UserCopyable for FutexWaitvUser {} + +/// Validates a futex2 flags word, returning whether `FUTEX2_PRIVATE` is set. +/// +/// Only 32-bit futexes are supported (as in Linux today), so any size other +/// than `FUTEX2_SIZE_U32` is rejected. +fn check_flags(flags: u32) -> Result { + if flags & !FUTEX2_VALID_MASK != 0 { + return Err(KernelError::InvalidValue); + } + + if flags & FUTEX2_SIZE_MASK != FUTEX2_SIZE_U32 { + return Err(KernelError::InvalidValue); + } + + Ok(flags & FUTEX2_PRIVATE != 0) +} + +/// Builds a [`FutexKey`] for a futex2 user address, enforcing the natural +/// alignment futex2 requires. +fn make_key(ctx: &ProcessCtx, uaddr: u64, private: bool) -> Result<(FutexKey, TUA)> { + let addr = TUA::::from_value(uaddr as usize); + + if addr.is_null() { + return Err(KernelError::Fault); + } + + if !uaddr.is_multiple_of(core::mem::size_of::() as u64) { + return Err(KernelError::InvalidValue); + } + + let key = if private { + FutexKey::new_private(ctx, addr) + } else { + FutexKey::new_shared(ctx, addr)? + }; + + Ok((key, addr)) +} + +/// Parses a futex2 absolute timeout into a clock-tagged [`Deadline`]. +/// +/// The clockid is only validated when a timeout is supplied, matching Linux. +/// The deadline keeps its clock so the wait can re-evaluate against the live +/// clock; a `CLOCK_REALTIME` deadline therefore still fires at the right +/// wall-clock instant if the clock is stepped during the wait. +async fn abs_timeout(timeout: TUA, clockid: u32) -> Result> { + if timeout.is_null() { + return Ok(None); + } + + let deadline = Duration::from(TimeSpec::copy_from_user(timeout).await?); + + let deadline = match clockid { + CLOCK_REALTIME => Deadline::Realtime(deadline), + CLOCK_MONOTONIC => Deadline::Monotonic(deadline), + _ => return Err(KernelError::InvalidValue), + }; + + Ok(Some(deadline)) +} + +/// `futex_wait(uaddr, val, mask, flags, timeout, clockid)`: wait on a single +/// futex word while `*uaddr == val`. Returns 0 once woken by a wake whose +/// mask overlaps `mask`. +pub async fn sys_futex_wait( + ctx: &ProcessCtx, + uaddr: u64, + val: u64, + mask: u64, + flags: u32, + timeout: TUA, + clockid: u32, +) -> Result { + let private = check_flags(flags)?; + + // Only 32-bit futexes exist, so values and masks must fit in 32 bits and + // an empty mask could never be woken. + if val > u64::from(u32::MAX) || mask > u64::from(u32::MAX) || mask == 0 { + return Err(KernelError::InvalidValue); + } + + let timeout = abs_timeout(timeout, clockid).await?; + let (key, uaddr) = make_key(ctx, uaddr, private)?; + + let waiter = ParsedWaiter { + key, + uaddr, + val: val as u32, + mask: mask as u32, + }; + + futex_wait_single(waiter, timeout).await +} + +/// `futex_wake(uaddr, mask, nr, flags)`: wake up to `nr` waiters whose masks +/// overlap `mask`. Returns the number woken. +pub fn sys_futex_wake( + ctx: &ProcessCtx, + uaddr: u64, + mask: u64, + nr: i32, + flags: u32, +) -> Result { + let private = check_flags(flags)?; + + if mask > u64::from(u32::MAX) || mask == 0 { + return Err(KernelError::InvalidValue); + } + + // Waking zero (or fewer) waiters is a no-op; short-circuit before + // computing the key, which can fault when translating a shared address. + if nr <= 0 { + return Ok(0); + } + + let (key, _) = make_key(ctx, uaddr, private)?; + + Ok(wake_key(nr as usize, key, mask as u32)) +} + +/// `futex_waitv(waiters, nr_futexes, flags, timeout, clockid)`: wait on up to +/// [`FUTEX_WAITV_MAX`] futexes at once. Returns the array index of the woken +/// waiter. +pub async fn sys_futex_waitv( + ctx: &ProcessCtx, + uwaiters: TUA, + nr_futexes: u32, + flags: u32, + timeout: TUA, + clockid: u32, +) -> Result { + // No syscall-level flags are defined. + if flags != 0 { + return Err(KernelError::InvalidValue); + } + + if nr_futexes == 0 || nr_futexes as usize > FUTEX_WAITV_MAX { + return Err(KernelError::InvalidValue); + } + + let timeout = abs_timeout(timeout, clockid).await?; + + let entries = copy_obj_array_from_user(uwaiters, nr_futexes as usize).await?; + + let mut waiters = Vec::with_capacity(entries.len()); + for entry in entries { + if entry.reserved != 0 || entry.val > u64::from(u32::MAX) { + return Err(KernelError::InvalidValue); + } + + let private = check_flags(entry.flags)?; + let (key, uaddr) = make_key(ctx, entry.uaddr, private)?; + + waiters.push(ParsedWaiter { + key, + uaddr, + val: entry.val as u32, + mask: FUTEX_BITSET_MATCH_ANY as u32, + }); + } + + futex_wait_multi(&waiters, timeout).await +} + +/// `futex_requeue(waiters, flags, nr_wake, nr_requeue)`: wake up to `nr_wake` +/// waiters on `waiters[0].uaddr`, then move up to `nr_requeue` of the rest +/// onto `waiters[1].uaddr`. No value comparison is performed (unlike legacy +/// `FUTEX_CMP_REQUEUE`). Returns the number woken. +pub async fn sys_futex_requeue( + ctx: &ProcessCtx, + uwaiters: TUA, + flags: u32, + nr_wake: i32, + nr_requeue: i32, +) -> Result { + // No syscall-level flags are defined. + if flags != 0 { + return Err(KernelError::InvalidValue); + } + + if nr_wake < 0 || nr_requeue < 0 { + return Err(KernelError::InvalidValue); + } + + let entries = copy_obj_array_from_user(uwaiters, 2).await?; + + let resolve = |entry: &FutexWaitvUser| -> Result { + if entry.reserved != 0 { + return Err(KernelError::InvalidValue); + } + + // `val` is unused by this op and deliberately not validated. + let private = check_flags(entry.flags)?; + let (key, _) = make_key(ctx, entry.uaddr, private)?; + Ok(key) + }; + + let key1 = resolve(&entries[0])?; + let key2 = resolve(&entries[1])?; + + // Requeueing a futex onto itself is invalid; this also catches distinct + // virtual addresses that alias the same shared frame. + if key1 == key2 { + return Err(KernelError::InvalidValue); + } + + Ok(requeue_key( + key1, + key2, + nr_wake as usize, + nr_requeue as usize, + )) +} diff --git a/src/process/threading/futex/key.rs b/src/process/threading/futex/key.rs index f64abd30..6e9bb7bd 100644 --- a/src/process/threading/futex/key.rs +++ b/src/process/threading/futex/key.rs @@ -3,7 +3,7 @@ use libkernel::error::{KernelError, Result}; use libkernel::memory::address::{TUA, VA}; use libkernel::memory::proc_vm::address_space::UserAddressSpace; -#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy)] +#[derive(PartialEq, Eq, PartialOrd, Ord, Hash, Clone, Copy, Debug)] pub enum FutexKey { Private { pid: u32, addr: usize }, Shared { frame: usize, offset: usize }, diff --git a/src/process/threading/futex/mod.rs b/src/process/threading/futex/mod.rs index 2d101da3..f7fa7a30 100644 --- a/src/process/threading/futex/mod.rs +++ b/src/process/threading/futex/mod.rs @@ -1,23 +1,24 @@ -use crate::clock::realtime::date; +use crate::clock::Deadline; use crate::clock::timespec::TimeSpec; -use crate::drivers::timer::sleep; -use crate::process::thread_group::signal::{InterruptResult, Interruptable}; +use crate::drivers::timer::uptime; use crate::sched::syscall_ctx::ProcessCtx; use crate::sync::{OnceLock, SpinLock}; -use alloc::boxed::Box; +use alloc::vec::Vec; use alloc::{collections::btree_map::BTreeMap, sync::Arc}; use core::time::Duration; -use futures::FutureExt; use key::FutexKey; use libkernel::{ error::{KernelError, Result}, memory::address::TUA, sync::waker_set::WakerSet, }; -use wait::FutexWait; +use wait::{ParsedWaiter, futex_wait_multi}; +use waiter::{FutexQueue, WaiterCell}; +pub mod futex2; pub mod key; mod wait; +mod waiter; const FUTEX_WAIT: i32 = 0; const FUTEX_WAKE: i32 = 1; @@ -25,7 +26,7 @@ const FUTEX_WAIT_BITSET: i32 = 9; const FUTEX_WAKE_BITSET: i32 = 10; const FUTEX_PRIVATE_FLAG: i32 = 128; -type FutexTable = BTreeMap>>>; +type FutexTable = BTreeMap; /// Global futex table mapping a futex key to its wait queue. #[allow(clippy::type_complexity)] @@ -35,7 +36,7 @@ fn futex_table() -> &'static SpinLock { FUTEX_TABLE.get_or_init(|| SpinLock::new(BTreeMap::new())) } -fn get_or_create_queue(key: FutexKey) -> Arc>> { +fn get_or_create_queue(key: FutexKey) -> FutexQueue { let table = futex_table(); table @@ -45,50 +46,103 @@ fn get_or_create_queue(key: FutexKey) -> Arc>> { .clone() } -pub fn wake_key(nr_wake: usize, key: FutexKey, bitmask: u32) -> usize { - let mut woke = 0; +pub fn wake_key(nr_wake: usize, key: FutexKey, mask: u32) -> usize { + let mut wakers = Vec::new(); let table = futex_table(); if let Some(waitq_arc) = table.lock_save_irq().get(&key).cloned() { let mut waitq = waitq_arc.lock_save_irq(); - for _ in 0..nr_wake { - if waitq.wake_if(|x| *x & bitmask != 0) { - woke += 1; - } else { - break; + + while wakers.len() < nr_wake { + match waitq.take_if(|cell: &Arc| cell.mask & mask != 0) { + Some((waker, cell)) => { + cell.mark_woken(); + wakers.push(waker); + } + None => break, } } } + // Wake outside the queue lock; a woken task may run immediately and + // re-take futex locks. + let woke = wakers.len(); + for waker in wakers { + waker.wake(); + } + woke } -async fn do_futex_wait( - key: FutexKey, - uaddr: TUA, - val: u32, - bitmask: u32, - timeout: Option, -) -> Result { - // Obtain (or create) the wait-queue for this futex word. - let slot = get_or_create_queue(key); +/// Wakes up to `nr_wake` waiters on `key1`, then moves up to `nr_requeue` of +/// the remaining waiters onto `key2`'s queue without waking them. +/// +/// Wake masks are ignored, matching Linux requeue semantics. Returns the +/// number of waiters woken. +pub fn requeue_key(key1: FutexKey, key2: FutexKey, nr_wake: usize, nr_requeue: usize) -> usize { + // Callers must reject `key1 == key2` (requeue-to-self) before this point; + // the two-queue locking below assumes distinct queues. + debug_assert_ne!(key1, key2); + + let q1_arc = get_or_create_queue(key1); + let q2_arc = get_or_create_queue(key2); + + let mut wakers = Vec::new(); + + { + // Lock both queues in key order so concurrent requeues can't + // deadlock. + let (mut q1, mut q2) = if key1 < key2 { + let q1 = q1_arc.lock_save_irq(); + let q2 = q2_arc.lock_save_irq(); + (q1, q2) + } else { + let q2 = q2_arc.lock_save_irq(); + let q1 = q1_arc.lock_save_irq(); + (q1, q2) + }; + + while wakers.len() < nr_wake { + match q1.take_first() { + Some((waker, cell)) => { + cell.mark_woken(); + wakers.push(waker); + } + None => break, + } + } - // Return 0 on success. - if let Some(dur) = timeout { - let mut wait = FutexWait::new(uaddr, val, bitmask, slot).fuse(); - let mut sleep = Box::pin(sleep(dur).fuse()); - futures::select_biased! { - res = wait => { - res.map(|_| 0) - }, - _ = sleep => { - Err(KernelError::TimedOut) + for _ in 0..nr_requeue { + match q1.take_first() { + Some((waker, cell)) => { + let token = q2.insert(waker, cell.clone()); + cell.requeue_to(q2_arc.clone(), token); + } + None => break, } } - } else { - FutexWait::new(uaddr, val, bitmask, slot).await.map(|_| 0) } + + let woke = wakers.len(); + for waker in wakers { + waker.wake(); + } + + woke +} + +/// Waits on a single futex word, the common case shared by the legacy +/// `FUTEX_WAIT` ops and futex2 `sys_futex_wait`. Interruption (and recovery of +/// a wake that raced a signal) is handled inside [`futex_wait_multi`]. +pub(super) async fn futex_wait_single( + waiter: ParsedWaiter, + timeout: Option, +) -> Result { + // Return 0 on success. + futex_wait_multi(core::slice::from_ref(&waiter), timeout) + .await + .map(|_| 0) } pub async fn sys_futex( @@ -114,34 +168,45 @@ pub async fn sys_futex( let timeout = if timeout.is_null() { None } else { - let timeout = TimeSpec::copy_from_user(timeout).await?; + let ts = Duration::from(TimeSpec::copy_from_user(timeout).await?); if matches!(cmd, FUTEX_WAIT_BITSET) { - Some(Duration::from(timeout) - date()) + // FUTEX_WAIT_BITSET takes an absolute realtime deadline. + Some(Deadline::Realtime(ts)) } else { - Some(Duration::from(timeout)) + // FUTEX_WAIT takes a relative timeout on the monotonic + // clock; convert to an absolute monotonic deadline. + Some(Deadline::Monotonic(uptime() + ts)) } }; - match do_futex_wait( + let bitmask = if cmd == FUTEX_WAIT { u32::MAX } else { val3 }; + + // A zero bitset can never be matched by any wake, so the waiter + // would be unwakeable; reject it as Linux does. + if matches!(cmd, FUTEX_WAIT_BITSET) && bitmask == 0 { + return Err(KernelError::InvalidValue); + } + + let waiter = ParsedWaiter { key, uaddr, val, - if cmd == FUTEX_WAIT { u32::MAX } else { val3 }, - timeout, - ) - .interruptable() - .await - { - InterruptResult::Interrupted => Err(KernelError::Interrupted), - InterruptResult::Uninterrupted(v) => v, - } + mask: bitmask, + }; + + futex_wait_single(waiter, timeout).await } - FUTEX_WAKE | FUTEX_WAKE_BITSET => Ok(wake_key( - val as _, - key, - if cmd == FUTEX_WAKE { u32::MAX } else { val3 }, - )), + FUTEX_WAKE | FUTEX_WAKE_BITSET => { + let mask = if cmd == FUTEX_WAKE { u32::MAX } else { val3 }; + + // A zero bitset matches no waiter; reject it as Linux does. + if matches!(cmd, FUTEX_WAKE_BITSET) && mask == 0 { + return Err(KernelError::InvalidValue); + } + + Ok(wake_key(val as _, key, mask)) + } _ => Err(KernelError::NotSupported), } diff --git a/src/process/threading/futex/wait.rs b/src/process/threading/futex/wait.rs index bec44460..35b41736 100644 --- a/src/process/threading/futex/wait.rs +++ b/src/process/threading/futex/wait.rs @@ -1,116 +1,177 @@ -use alloc::{boxed::Box, sync::Arc}; -use core::{ - future::Future, - pin::Pin, - task::{Context, Poll}, -}; +use alloc::sync::Arc; +use alloc::vec::Vec; +use core::future::poll_fn; +use core::task::{Poll, Waker}; +use futures::FutureExt; use libkernel::{ error::{KernelError, Result}, memory::address::TUA, - sync::waker_set::WakerSet, -}; - -use crate::{ - memory::uaccess::{copy_from_user, try_copy_from_user}, - sync::SpinLock, }; -enum WaitState { - Init, - HandlingFault(Pin> + Send + 'static>>), - Waiting { token: u64 }, +use super::get_or_create_queue; +use super::key::FutexKey; +use super::waiter::WaiterCell; +use crate::clock::Deadline; +use crate::memory::uaccess::{copy_from_user, try_copy_from_user}; +use crate::process::thread_group::signal::{InterruptResult, Interruptable}; + +/// One decoded wait request: wait on `key` while `*uaddr == val`, wakeable by +/// any wake whose mask overlaps `mask`. +pub struct ParsedWaiter { + pub key: FutexKey, + pub uaddr: TUA, + pub val: u32, + pub mask: u32, } -pub struct FutexWait { - uaddr: TUA, - val: u32, - queue: Arc>>, - bitmask: u32, - state: WaitState, +/// Owns the queue registrations of an in-progress multi-wait, so that +/// cancellation (signal, timeout, fault retry) always unregisters them. +#[derive(Default)] +struct WaitGuard { + cells: Vec>, } -impl FutexWait { - pub fn new( - uaddr: TUA, - val: u32, - bitmask: u32, - queue: Arc>>, - ) -> Self { - Self { - uaddr, - val, - bitmask, - queue, - state: WaitState::Init, +impl WaitGuard { + /// Ready with the index of the first woken waiter, if any. + /// + /// Does not re-register the waker: each cell's queue entry already holds + /// the task's waker from setup, and that waker is assumed stable across + /// polls (the same assumption the rest of the kernel makes). + fn poll_woken(&self) -> Poll { + match self.cells.iter().position(|cell| cell.is_woken()) { + Some(idx) => Poll::Ready(idx), + None => Poll::Pending, + } + } + + /// Unregisters every remaining cell. If any wake was consumed (either + /// before this call or racing with it), returns the lowest such index. + fn finish(&mut self) -> Option { + let mut woken = None; + + for (idx, cell) in self.cells.iter().enumerate() { + if cell.unregister() && woken.is_none() { + woken = Some(idx); + } } + + self.cells.clear(); + woken } } -impl Drop for FutexWait { +impl Drop for WaitGuard { fn drop(&mut self) { - if let WaitState::Waiting { token } = &self.state { - self.queue.lock_save_irq().remove(*token); + for cell in &self.cells { + cell.unregister(); } } } -impl Future for FutexWait { - type Output = Result<()>; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - // Safe because we are inside Pin and not moving out of Self - let this = unsafe { self.as_mut().get_unchecked_mut() }; - - loop { - match &mut this.state { - WaitState::Init => { - let mut wait_queue = this.queue.lock_save_irq(); - - match try_copy_from_user(this.uaddr) { - Ok(val) => { - if val != this.val { - return Poll::Ready(Err(KernelError::TryAgain)); - } - - let token = wait_queue.register_with_data(cx.waker(), this.bitmask); +enum Setup { + /// All waiters value-checked and enqueued. + Queued, + /// A futex word didn't match its expected value. + Mismatch, + /// Reading the futex word at this index faulted; fault it in and retry. + NeedFault(usize), +} - this.state = WaitState::Waiting { token }; +/// Enqueues each waiter, checking its futex value under the queue lock. +/// +/// Synchronous; runs inside a single poll so no wake can slip between the +/// value check and registration of any one waiter (the queue lock covers +/// both). Holds at most one queue lock at a time. +fn setup_all(waiters: &[ParsedWaiter], waker: &Waker, guard: &mut WaitGuard) -> Setup { + for (idx, waiter) in waiters.iter().enumerate() { + let queue = get_or_create_queue(waiter.key); + let mut queue_guard = queue.lock_save_irq(); + + match try_copy_from_user(waiter.uaddr) { + Ok(val) => { + if val != waiter.val { + return Setup::Mismatch; + } - return Poll::Pending; - } - Err(_) => { - drop(wait_queue); + let cell = WaiterCell::new(waiter.mask); + let token = queue_guard.register_with_data(waker, cell.clone()); + cell.set_location(queue.clone(), token); + guard.cells.push(cell); + } + Err(_) => return Setup::NeedFault(idx), + } + } - let uaddr = this.uaddr; - let fault_handler = Box::pin(async move { - copy_from_user(uaddr).await?; - Ok(()) - }); + Setup::Queued +} - this.state = WaitState::HandlingFault(fault_handler); - continue; - } - } +/// Waits until any of `waiters` is woken, returning its index. +/// +/// Linux `futex_wait_multiple` semantics: all waiters are enqueued with their +/// values checked atomically against concurrent wakes; on value mismatch the +/// queued prefix is unwound and, if one of those waiters was already woken, +/// that counts as a successful wake rather than `EAGAIN`. +pub async fn futex_wait_multi( + waiters: &[ParsedWaiter], + timeout: Option, +) -> Result { + loop { + let mut guard = WaitGuard::default(); + + // poll_fn is used purely to obtain the task's waker; setup itself + // never returns Pending. + let setup = poll_fn(|cx| Poll::Ready(setup_all(waiters, cx.waker(), &mut guard))).await; + + match setup { + Setup::NeedFault(idx) => { + if let Some(woken) = guard.finish() { + return Ok(woken); } - WaitState::Waiting { token } => { - let wait_queue = this.queue.lock_save_irq(); + copy_from_user(waiters[idx].uaddr).await?; + continue; + } + Setup::Mismatch => { + return guard.finish().ok_or(KernelError::TryAgain); + } + Setup::Queued => {} + } - if !wait_queue.contains_token(*token) { - return Poll::Ready(Ok(())); - } else { - return Poll::Pending; + // Wait for a wake or the timer. Interruption is handled here, while + // `guard` is still alive, so a wake consumed concurrently with a + // signal is recovered via `finish()` rather than lost to EINTR. + let woken = match timeout { + None => poll_fn(|_| guard.poll_woken()).interruptable().await, + Some(deadline) => { + // Map the timer firing to a sentinel so the outer match can + // distinguish it from a real wake. The sleep future is pinned + // on the stack to avoid a per-wait heap allocation. + let timed = async { + let mut wait = poll_fn(|_| guard.poll_woken()).fuse(); + let sleep_fut = deadline.sleep().fuse(); + let mut sleep_fut = core::pin::pin!(sleep_fut); + futures::select_biased! { + idx = wait => idx, + _ = sleep_fut => usize::MAX, } - } + }; - WaitState::HandlingFault(fut) => match fut.as_mut().poll(cx) { - Poll::Ready(Ok(_)) => { - this.state = WaitState::Init; - } - Poll::Ready(Err(e)) => return Poll::Ready(Err(e)), - Poll::Pending => return Poll::Pending, - }, + timed.interruptable().await } - } + }; + + return match woken { + // A real wake landed and was observed. + InterruptResult::Uninterrupted(idx) if idx != usize::MAX => { + guard.finish(); + Ok(idx) + } + // Timer fired, or a signal arrived: a wake may have landed in the + // race window, so `finish()` reports it and we return success in + // that case rather than losing the wake. Otherwise it is the + // genuine timeout / interrupt result. + InterruptResult::Uninterrupted(_) => guard.finish().ok_or(KernelError::TimedOut), + InterruptResult::Interrupted => guard.finish().ok_or(KernelError::Interrupted), + }; } } diff --git a/src/process/threading/futex/waiter.rs b/src/process/threading/futex/waiter.rs new file mode 100644 index 00000000..36cb2cef --- /dev/null +++ b/src/process/threading/futex/waiter.rs @@ -0,0 +1,111 @@ +use alloc::sync::Arc; +use libkernel::sync::waker_set::WakerSet; + +use crate::sync::SpinLock; + +/// A futex wait queue: the set of waiters parked on one [`FutexKey`]. +/// +/// [`FutexKey`]: super::key::FutexKey +pub type FutexQueue = Arc>>>; + +/// Per-waiter shared state, stored both in the waiting future and as the +/// data payload of its [`WakerSet`] entry. +/// +/// The cell tracks which queue currently holds the waiter, which is what +/// keeps requeueing sound: `futex_requeue` can move an entry to another +/// queue while the waiting future is asleep, and the future still finds its +/// real location when it cancels or completes. +/// +/// # Lock ordering +/// +/// Futex table lock → queue lock(s) (ordered by `FutexKey` when taking two) +/// → cell lock. Code that only knows the cell and needs the queue +/// ([`Self::unregister`]) snapshots the location under the cell lock, drops +/// it, locks the queue, then re-locks the cell and verifies the location is +/// unchanged — retrying if a concurrent requeue moved it. Queue tokens are +/// allocated monotonically, so a stale `(queue, token)` snapshot can never +/// alias a new registration. +pub struct WaiterCell { + /// Wake mask; a wake with mask `m` wakes this waiter iff `mask & m != 0`. + /// Only 32-bit futexes exist, so a 32-bit mask is sufficient. + pub mask: u32, + state: SpinLock, +} + +struct CellState { + /// `Some((queue, token))` while enqueued; `None` once woken or + /// unregistered. + location: Option<(FutexQueue, u64)>, + /// Set when a waker removed this entry, distinguishing a genuine wake + /// from self-unregistration. + woken: bool, +} + +impl WaiterCell { + pub fn new(mask: u32) -> Arc { + Arc::new(Self { + mask, + state: SpinLock::new(CellState { + location: None, + woken: false, + }), + }) + } + + pub fn is_woken(&self) -> bool { + self.state.lock_save_irq().woken + } + + /// Records the queue entry for this waiter. Caller must hold the lock of + /// `queue` (the registration and the location update must be atomic with + /// respect to wake/requeue). + pub fn set_location(&self, queue: FutexQueue, token: u64) { + self.state.lock_save_irq().location = Some((queue, token)); + } + + /// Marks the waiter woken and detaches it from its queue. Caller must + /// hold the lock of the queue it was just removed from. + pub fn mark_woken(&self) { + let mut state = self.state.lock_save_irq(); + state.woken = true; + state.location = None; + } + + /// Updates the location after the entry moved to `queue`. Caller must + /// hold the locks of both the source and destination queues. + pub fn requeue_to(&self, queue: FutexQueue, token: u64) { + self.state.lock_save_irq().location = Some((queue, token)); + } + + /// Removes this waiter from whichever queue currently holds it. + /// + /// Returns `true` if a wake landed first (that wake is then consumed by + /// the caller). + pub fn unregister(&self) -> bool { + loop { + // Snapshot the location; we cannot lock the queue while holding + // the cell lock without inverting the queue → cell order. + let (queue, token) = { + let state = self.state.lock_save_irq(); + match &state.location { + Some((queue, token)) => (queue.clone(), *token), + None => return state.woken, + } + }; + + let mut queue_guard = queue.lock_save_irq(); + let mut state = self.state.lock_save_irq(); + + match &state.location { + None => return state.woken, + Some((q, t)) if Arc::ptr_eq(q, &queue) && *t == token => { + queue_guard.remove(token); + state.location = None; + return false; + } + // Requeued between the snapshot and taking the queue lock. + _ => continue, + } + } + } +} diff --git a/usertest/src/futex2.rs b/usertest/src/futex2.rs new file mode 100644 index 00000000..07a7d0ca --- /dev/null +++ b/usertest/src/futex2.rs @@ -0,0 +1,736 @@ +//! Tests for the futex2 syscall family (futex_wait, futex_wake, futex_waitv, +//! futex_requeue). libc has no wrappers for these, so we issue raw syscalls. + +use crate::register_test; +use std::sync::{ + Arc, + atomic::{AtomicU32, Ordering}, +}; +use std::thread; +use std::time::{Duration, Instant}; + +const SYS_FUTEX_WAITV: libc::c_long = 449; +const SYS_FUTEX_WAKE: libc::c_long = 454; +const SYS_FUTEX_WAIT: libc::c_long = 455; +const SYS_FUTEX_REQUEUE: libc::c_long = 456; + +const FUTEX2_SIZE_U32: u32 = 0x02; +const FUTEX2_PRIVATE: u32 = 0x80; +const MATCH_ANY: u64 = 0xffff_ffff; + +/// Userspace mirror of the kernel's `struct futex_waitv`. +#[repr(C)] +#[derive(Clone, Copy)] +struct FutexWaitv { + val: u64, + uaddr: u64, + flags: u32, + reserved: u32, +} + +impl FutexWaitv { + fn new(addr: *const u32, val: u32, flags: u32) -> Self { + Self { + val: val as u64, + uaddr: addr as u64, + flags, + reserved: 0, + } + } +} + +fn errno() -> i32 { + std::io::Error::last_os_error().raw_os_error().unwrap() +} + +unsafe fn futex2_wait( + addr: *const u32, + val: u64, + mask: u64, + flags: u32, + timeout: *const libc::timespec, + clockid: i32, +) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_WAIT, addr, val, mask, flags, timeout, clockid) } +} + +unsafe fn futex2_wake(addr: *const u32, mask: u64, nr: i32, flags: u32) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_WAKE, addr, mask, nr, flags) } +} + +unsafe fn futex2_waitv( + waiters: *const FutexWaitv, + nr: u32, + flags: u32, + timeout: *const libc::timespec, + clockid: i32, +) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_WAITV, waiters, nr, flags, timeout, clockid) } +} + +unsafe fn futex2_requeue( + waiters: *const FutexWaitv, + flags: u32, + nr_wake: i32, + nr_requeue: i32, +) -> i64 { + unsafe { libc::syscall(SYS_FUTEX_REQUEUE, waiters, flags, nr_wake, nr_requeue) } +} + +/// Absolute deadline `offset_ms` in the future on `clockid`. +fn abs_deadline(clockid: i32, offset_ms: i64) -> libc::timespec { + let mut ts: libc::timespec = unsafe { std::mem::zeroed() }; + unsafe { libc::clock_gettime(clockid, &mut ts) }; + + let nsec = ts.tv_nsec + (offset_ms % 1000) * 1_000_000; + ts.tv_sec += offset_ms / 1000 + nsec / 1_000_000_000; + ts.tv_nsec = nsec % 1_000_000_000; + ts +} + +fn test_futex2_basic() { + let futex_word: u32 = 0; + let addr = &futex_word as *const u32; + + unsafe { + // Wake with no waiters wakes nothing. + let ret = futex2_wake(addr, MATCH_ANY, 1, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("futex_wake with no waiters returned {ret}"); + } + + // Wait with a mismatched expected value fails immediately with EAGAIN. + let ret = futex2_wait( + addr, + 1, // actual value is 0 + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ); + if ret != -1 || errno() != libc::EAGAIN { + panic!("futex_wait value mismatch: ret {ret}, errno {}", errno()); + } + } +} + +register_test!(test_futex2_basic); + +fn test_futex2_wait_wake() { + let futex_word = Arc::new(AtomicU32::new(0)); + let futex_clone = futex_word.clone(); + + let t = thread::spawn(move || { + let addr = futex_clone.as_ptr() as *const u32; + let ret = unsafe { + futex2_wait( + addr, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("futex_wait returned {ret}, errno {}", errno()); + } + }); + + thread::sleep(Duration::from_millis(100)); + + let addr = futex_word.as_ptr() as *const u32; + unsafe { + let ret = futex2_wake(addr, MATCH_ANY, 1, FUTEX2_SIZE_U32); + if ret != 1 { + panic!("expected to wake 1 waiter, woke {ret}"); + } + } + + t.join().expect("waiter thread panicked"); +} + +register_test!(test_futex2_wait_wake); + +fn test_futex2_mask() { + let futex_word = Arc::new(AtomicU32::new(0)); + let futex_clone = futex_word.clone(); + + let t = thread::spawn(move || { + let addr = futex_clone.as_ptr() as *const u32; + let ret = unsafe { + futex2_wait( + addr, + 0, + 0x1, // waiter mask + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("masked futex_wait returned {ret}, errno {}", errno()); + } + }); + + thread::sleep(Duration::from_millis(100)); + + let addr = futex_word.as_ptr() as *const u32; + unsafe { + // Non-overlapping mask must not wake the waiter. + let ret = futex2_wake(addr, 0x2, 1, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("woke {ret} waiters despite non-overlapping mask"); + } + + // Overlapping mask wakes it. + let ret = futex2_wake(addr, 0x1, 1, FUTEX2_SIZE_U32); + if ret != 1 { + panic!("expected to wake 1 waiter with matching mask, woke {ret}"); + } + } + + t.join().expect("waiter thread panicked"); +} + +register_test!(test_futex2_mask); + +fn test_futex2_waitv() { + // Three futex words; the waiter sleeps on all of them and must report + // the index of the one that got woken. + let words = Arc::new([AtomicU32::new(0), AtomicU32::new(0), AtomicU32::new(0)]); + let words_clone = words.clone(); + + let flags = FUTEX2_SIZE_U32 | FUTEX2_PRIVATE; + + let t = thread::spawn(move || { + let waiters: Vec = words_clone + .iter() + .map(|w| FutexWaitv::new(w.as_ptr() as *const u32, 0, flags)) + .collect(); + + let ret = unsafe { + futex2_waitv( + waiters.as_ptr(), + waiters.len() as u32, + 0, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 1 { + panic!("futex_waitv returned {ret}, expected woken index 1"); + } + }); + + thread::sleep(Duration::from_millis(100)); + + unsafe { + let ret = futex2_wake(words[1].as_ptr() as *const u32, MATCH_ANY, 1, flags); + if ret != 1 { + panic!("expected to wake 1 waitv waiter, woke {ret}"); + } + } + + t.join().expect("waitv thread panicked"); +} + +register_test!(test_futex2_waitv); + +fn test_futex2_invalid_args() { + let word: u32 = 0; + let addr = &word as *const u32; + let valid = [FutexWaitv::new(addr, 0, FUTEX2_SIZE_U32)]; + + unsafe { + // waitv: one entry's expected value mismatches -> EAGAIN. + let pair = [AtomicU32::new(0), AtomicU32::new(7)]; + let waiters = [ + FutexWaitv::new(pair[0].as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(pair[1].as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + ]; + let ret = futex2_waitv(waiters.as_ptr(), 2, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EAGAIN { + panic!("waitv mismatch: ret {ret}, errno {}", errno()); + } + + // waitv: nr_futexes == 0 and > 128 are invalid. + for nr in [0u32, 129] { + let ret = futex2_waitv(valid.as_ptr(), nr, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv nr={nr}: ret {ret}, errno {}", errno()); + } + } + + // waitv: non-zero syscall-level flags are invalid. + let ret = futex2_waitv(valid.as_ptr(), 1, 1, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv flags=1: ret {ret}, errno {}", errno()); + } + + // waitv: reserved field must be zero. + let mut bad = valid; + bad[0].reserved = 1; + let ret = futex2_waitv(bad.as_ptr(), 1, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv reserved!=0: ret {ret}, errno {}", errno()); + } + + // waitv: entry flags must include FUTEX2_SIZE_U32. + let mut bad = valid; + bad[0].flags = 0; + let ret = futex2_waitv(bad.as_ptr(), 1, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("waitv no-size flags: ret {ret}, errno {}", errno()); + } + + // waitv: NULL waiter array faults. + let ret = futex2_waitv(std::ptr::null(), 1, 0, std::ptr::null(), 0); + if ret != -1 || errno() != libc::EFAULT { + panic!("waitv NULL array: ret {ret}, errno {}", errno()); + } + + // wait: bad clockid is only rejected when a timeout is supplied. + let ts = abs_deadline(libc::CLOCK_MONOTONIC, 50); + let ret = futex2_wait(addr, 1, MATCH_ANY, FUTEX2_SIZE_U32, &ts, 7); + if ret != -1 || errno() != libc::EINVAL { + panic!("wait bad clockid: ret {ret}, errno {}", errno()); + } + + // wait: zero mask can never be woken. + let ret = futex2_wait( + addr, + 0, + 0, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ); + if ret != -1 || errno() != libc::EINVAL { + panic!("wait mask=0: ret {ret}, errno {}", errno()); + } + + // wait: futex word must be 4-byte aligned. + let unaligned = (addr as usize + 2) as *const u32; + let ret = futex2_wait( + unaligned, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ); + if ret != -1 || errno() != libc::EINVAL { + panic!("wait unaligned: ret {ret}, errno {}", errno()); + } + + // requeue: negative counts are invalid. + let pair = [valid[0], valid[0]]; + let ret = futex2_requeue(pair.as_ptr(), 0, -1, 0); + if ret != -1 || errno() != libc::EINVAL { + panic!("requeue nr_wake=-1: ret {ret}, errno {}", errno()); + } + } +} + +register_test!(test_futex2_invalid_args); + +fn test_futex2_timeout() { + let word: u32 = 0; + let addr = &word as *const u32; + + for clockid in [libc::CLOCK_MONOTONIC, libc::CLOCK_REALTIME] { + let ts = abs_deadline(clockid, 50); + let start = Instant::now(); + + let ret = unsafe { futex2_wait(addr, 0, MATCH_ANY, FUTEX2_SIZE_U32, &ts, clockid) }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!( + "wait clockid={clockid}: ret {ret}, errno {}, expected ETIMEDOUT", + errno() + ); + } + if start.elapsed() < Duration::from_millis(50) { + panic!("wait on clockid={clockid} timed out early"); + } + } + + // A deadline that has already passed times out immediately (the zero + // point of both clocks is in the past). + let ts: libc::timespec = unsafe { std::mem::zeroed() }; + let ret = unsafe { + futex2_wait( + addr, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_MONOTONIC, + ) + }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!("wait past deadline: ret {ret}, errno {}", errno()); + } + + // ...but a value mismatch still takes precedence over the timeout. + let ret = unsafe { + futex2_wait( + addr, + 1, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_MONOTONIC, + ) + }; + if ret != -1 || errno() != libc::EAGAIN { + panic!("wait mismatch+past deadline: ret {ret}, errno {}", errno()); + } +} + +register_test!(test_futex2_timeout); + +fn test_futex2_requeue() { + const NR_THREADS: usize = 4; + + let f1 = Arc::new(AtomicU32::new(0)); + let f2 = Arc::new(AtomicU32::new(0)); + let woken = Arc::new(AtomicU32::new(0)); + + let threads: Vec<_> = (0..NR_THREADS) + .map(|_| { + let f1 = f1.clone(); + let woken = woken.clone(); + thread::spawn(move || { + let ret = unsafe { + futex2_wait( + f1.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("requeued futex_wait returned {ret}, errno {}", errno()); + } + woken.fetch_add(1, Ordering::SeqCst); + }) + }) + .collect(); + + // Let all threads park on f1. + thread::sleep(Duration::from_millis(150)); + + let pair = [ + FutexWaitv::new(f1.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(f2.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + ]; + + unsafe { + // Wake one waiter, move the rest to f2. + let ret = futex2_requeue(pair.as_ptr(), 0, 1, NR_THREADS as i32 - 1); + if ret != 1 { + panic!("futex_requeue woke {ret}, expected 1"); + } + } + + thread::sleep(Duration::from_millis(100)); + + let woken_now = woken.load(Ordering::SeqCst); + if woken_now != 1 { + panic!("{woken_now} threads ran after requeue, expected 1"); + } + + unsafe { + // The other three must now be waiting on f2, not f1. + let ret = futex2_wake(f1.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("woke {ret} waiters still on f1 after requeue"); + } + + let ret = futex2_wake(f2.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != NR_THREADS as i64 - 1 { + panic!("woke {ret} waiters on f2, expected {}", NR_THREADS - 1); + } + } + + for t in threads { + t.join().expect("waiter thread panicked"); + } + + if woken.load(Ordering::SeqCst) != NR_THREADS as u32 { + panic!("not all requeued threads completed"); + } +} + +register_test!(test_futex2_requeue); + +fn test_futex2_wake_n_of_m() { + // Six waiters park on one futex; waking 2 must wake exactly 2 (the two + // oldest, FIFO), leaving 4. A final wake-all drains the rest. + const NR_THREADS: usize = 6; + + let f = Arc::new(AtomicU32::new(0)); + let woken = Arc::new(AtomicU32::new(0)); + + let threads: Vec<_> = (0..NR_THREADS) + .map(|_| { + let f = f.clone(); + let woken = woken.clone(); + thread::spawn(move || { + let ret = unsafe { + futex2_wait( + f.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("futex_wait returned {ret}, errno {}", errno()); + } + woken.fetch_add(1, Ordering::SeqCst); + }) + }) + .collect(); + + thread::sleep(Duration::from_millis(150)); + + unsafe { + let ret = futex2_wake(f.as_ptr() as *const u32, MATCH_ANY, 2, FUTEX2_SIZE_U32); + if ret != 2 { + panic!("wake nr=2 woke {ret}, expected 2"); + } + } + + thread::sleep(Duration::from_millis(100)); + let woken_now = woken.load(Ordering::SeqCst); + if woken_now != 2 { + panic!("{woken_now} threads ran after wake nr=2, expected 2"); + } + + unsafe { + let ret = futex2_wake(f.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != NR_THREADS as i64 - 2 { + panic!("wake-all woke {ret}, expected {}", NR_THREADS - 2); + } + } + + for t in threads { + t.join().expect("waiter thread panicked"); + } +} + +register_test!(test_futex2_wake_n_of_m); + +fn test_futex2_wake_nr_zero() { + // Waking 0 waiters is a no-op that returns 0, never EFAULT, even when the + // address has no queue. Guards against computing the key (and faulting on + // a shared translation) before the nr<=0 short-circuit. + let word: u32 = 0; + let addr = &word as *const u32; + + unsafe { + let ret = futex2_wake(addr, MATCH_ANY, 0, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("wake nr=0 returned {ret}, errno {}", errno()); + } + + // Negative count is likewise a 0-wake no-op (not EINVAL). + let ret = futex2_wake(addr, MATCH_ANY, -1, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("wake nr=-1 returned {ret}, errno {}", errno()); + } + } +} + +register_test!(test_futex2_wake_nr_zero); + +fn test_futex2_wake_no_waiters() { + // Waking an address nobody has ever waited on returns 0. + let word: u32 = 0; + let addr = &word as *const u32; + + unsafe { + let ret = futex2_wake(addr, MATCH_ANY, 64, FUTEX2_SIZE_U32 | FUTEX2_PRIVATE); + if ret != 0 { + panic!("wake of un-waited address woke {ret}, expected 0"); + } + } +} + +register_test!(test_futex2_wake_no_waiters); + +fn test_futex2_requeue_to_self() { + // Requeueing a futex onto itself (uaddr1 == uaddr2) is rejected with + // EINVAL, matching Linux. moss detects key1 == key2 before requeueing + // (this also catches distinct virtual addresses aliasing one shared + // frame); this test guards against that reject regressing. + let f = Arc::new(AtomicU32::new(0)); + let woken = Arc::new(AtomicU32::new(0)); + + let f1 = f.clone(); + let woken1 = woken.clone(); + let t = thread::spawn(move || { + let ret = unsafe { + futex2_wait( + f1.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + std::ptr::null(), + libc::CLOCK_MONOTONIC, + ) + }; + if ret != 0 { + panic!("futex_wait returned {ret}, errno {}", errno()); + } + woken1.fetch_add(1, Ordering::SeqCst); + }); + + thread::sleep(Duration::from_millis(100)); + + let same = f.as_ptr() as *const u32; + let pair = [ + FutexWaitv::new(same, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(same, 0, FUTEX2_SIZE_U32), + ]; + + unsafe { + let ret = futex2_requeue(pair.as_ptr(), 0, 1, 1); + if ret != -1 || errno() != libc::EINVAL { + panic!( + "requeue-to-self: ret {ret}, errno {}, expected EINVAL", + errno() + ); + } + } + + // The waiter is still parked (requeue was rejected); wake it so the + // thread can exit cleanly. + unsafe { + futex2_wake(same, MATCH_ANY, 1, FUTEX2_SIZE_U32); + } + t.join().expect("waiter thread panicked"); +} + +register_test!(test_futex2_requeue_to_self); + +fn test_futex2_realtime_retarget() { + // A CLOCK_REALTIME wait with a far-future deadline must retarget when the + // wall clock is stepped forward past the deadline: the waiter should time + // out promptly rather than sleeping out the original (now-past) deadline. + let word = Arc::new(AtomicU32::new(0)); + let word_clone = word.clone(); + + // Snapshot the current realtime so we can restore it afterwards. + let mut saved: libc::timespec = unsafe { std::mem::zeroed() }; + unsafe { libc::clock_gettime(libc::CLOCK_REALTIME, &mut saved) }; + + let t = thread::spawn(move || { + // Deadline 10s out on the realtime clock. + let ts = abs_deadline(libc::CLOCK_REALTIME, 10_000); + let start = Instant::now(); + let ret = unsafe { + futex2_wait( + word_clone.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_REALTIME, + ) + }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!("realtime wait: ret {ret}, errno {}", errno()); + } + // Must have woken well before the original 10s deadline. + if start.elapsed() > Duration::from_secs(5) { + panic!("realtime wait did not retarget after clock step"); + } + }); + + // Let the waiter park, then step the wall clock 20s forward, past the + // waiter's deadline. + thread::sleep(Duration::from_millis(150)); + let mut stepped = saved; + stepped.tv_sec += 20; + let ret = unsafe { libc::clock_settime(libc::CLOCK_REALTIME, &stepped) }; + if ret != 0 { + // Restore and surface the failure. + unsafe { libc::clock_settime(libc::CLOCK_REALTIME, &saved) }; + panic!("clock_settime failed: errno {}", errno()); + } + + t.join().expect("realtime waiter thread panicked"); + + // Restore the clock so later tests see a sane wall time. + unsafe { libc::clock_settime(libc::CLOCK_REALTIME, &saved) }; +} + +register_test!(test_futex2_realtime_retarget); + +fn test_futex2_requeue_timeout_race() { + // Requeue waiters that are about to time out: their timeout-path + // unregistration must find them on the destination queue. + const NR_THREADS: usize = 4; + + let f1 = Arc::new(AtomicU32::new(0)); + let f2 = Arc::new(AtomicU32::new(0)); + + let threads: Vec<_> = (0..NR_THREADS) + .map(|_| { + let f1 = f1.clone(); + thread::spawn(move || { + let ts = abs_deadline(libc::CLOCK_MONOTONIC, 150); + let ret = unsafe { + futex2_wait( + f1.as_ptr() as *const u32, + 0, + MATCH_ANY, + FUTEX2_SIZE_U32, + &ts, + libc::CLOCK_MONOTONIC, + ) + }; + if ret != -1 || errno() != libc::ETIMEDOUT { + panic!("racing wait: ret {ret}, errno {}", errno()); + } + }) + }) + .collect(); + + thread::sleep(Duration::from_millis(50)); + + let pair = [ + FutexWaitv::new(f1.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + FutexWaitv::new(f2.as_ptr() as *const u32, 0, FUTEX2_SIZE_U32), + ]; + + unsafe { + // Move everyone to f2 without waking anybody. + let ret = futex2_requeue(pair.as_ptr(), 0, 0, NR_THREADS as i32); + if ret != 0 { + panic!("requeue woke {ret}, expected 0"); + } + } + + // All threads now time out while parked on f2. + for t in threads { + t.join().expect("waiter thread panicked"); + } + + unsafe { + // Every waiter unregistered itself from f2 on timeout. + let ret = futex2_wake(f2.as_ptr() as *const u32, MATCH_ANY, 64, FUTEX2_SIZE_U32); + if ret != 0 { + panic!("{ret} stale waiters left on f2"); + } + } +} + +register_test!(test_futex2_requeue_timeout_race); diff --git a/usertest/src/main.rs b/usertest/src/main.rs index cad9d145..e9fb6fe7 100644 --- a/usertest/src/main.rs +++ b/usertest/src/main.rs @@ -8,6 +8,7 @@ use std::{ mod epoll; mod fs; mod futex; +mod futex2; mod inotify; mod signalfd; mod signals;