diff --git a/README.md b/README.md index 243c3e71..b4d4b596 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ This fork addresses critical bugs, adds utilities for high-throughput and async - **Completion transitions** (`completion`) -- post-event transitions with origin event propagation - **Coroutine state machines** (`co_sm`) -- C++20 coroutine-driven SM with configurable scheduler and allocator policies +- **`thread_pool_scheduler`** -- opt-in C++20 multi-consumer work pool with a lifetime-safe, deadlock-free fork/join latch for action-level inter-op parallelism (warm workers, fixed task ring, no allocation on dispatch) - **`sm_pool`** -- pool-based container for managing thousands of SM instances with indexed and batch dispatch - **`dispatch_table`** -- compile-time dispatch table for ID-based event routing at zero runtime overhead diff --git a/include/boost/sml/utility/thread_pool_scheduler.hpp b/include/boost/sml/utility/thread_pool_scheduler.hpp new file mode 100644 index 00000000..6aea85e5 --- /dev/null +++ b/include/boost/sml/utility/thread_pool_scheduler.hpp @@ -0,0 +1,545 @@ +// +// Copyright (c) 2026 stateforward +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#ifndef BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP + +#include "boost/sml.hpp" + +#if defined(_MSVC_LANG) +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG _MSVC_LANG +#else +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG __cplusplus +#endif + +// Opt-in module: unlike the header-only, dependency-free SML core, the thread +// pool scheduler requires C++20 (std::counting_semaphore) and a hosted +// threading runtime. It compiles to nothing when those are unavailable so the +// freestanding core stays thread-free by default. 
+// Nest the __has_include probe under defined(__has_include) so toolchains +// without the extension (or below C++20) never have to parse it. +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG >= 202002L +#if defined(__has_include) +#if __has_include() +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED 1 +#endif +#endif +#endif +#ifndef BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED 0 +#endif + +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(_MSC_VER) && !defined(__clang__) +#include // _mm_pause / __yield +#endif + +BOOST_SML_NAMESPACE_BEGIN + +namespace utility { +namespace policy { + +// Architecture-appropriate spin hint for short busy-waits (lock-free join / +// queue retries). Reduces contention and power on hyperthreads without yielding +// the scheduler. +inline void cpu_relax() noexcept { +#if defined(_MSC_VER) && !defined(__clang__) +#if defined(_M_X64) || defined(_M_IX86) + _mm_pause(); +#elif defined(_M_ARM64) || defined(_M_ARM) + __yield(); +#else + std::atomic_signal_fence(std::memory_order_seq_cst); +#endif +#elif defined(__x86_64__) || defined(__i386__) + __builtin_ia32_pause(); +#elif defined(__aarch64__) || defined(__arm__) + __asm__ __volatile__("yield" ::: "memory"); +#else + std::atomic_signal_fence(std::memory_order_seq_cst); +#endif +} + +// Pool of worker_count threads that drain a fixed ring of capacity task slots; +// each task's callable is stored inline in its slot. Workers are started by the +// constructor and joined by the destructor; the type is neither copyable nor +// movable. +template +class thread_pool_scheduler { + public: + static_assert(worker_count > 0, "thread_pool_scheduler worker count must be non-zero"); + static_assert(capacity > 1, "thread_pool_scheduler capacity must be greater than one"); + static_assert((capacity & (capacity - 1)) == 0, "thread_pool_scheduler capacity must be a power of two"); + static_assert(inline_task_bytes > 0, "thread_pool_scheduler inline storage must be non-zero"); + + static constexpr bool guarantees_fifo = false; + static constexpr bool single_consumer = false; + static 
constexpr bool multi_consumer = true; + static constexpr bool owns_workers = true; + static constexpr bool run_to_completion = false; + static constexpr std::size_t static_worker_count = worker_count; + static constexpr std::size_t static_capacity = capacity; + + // std::thread may allocate OS/runtime resources here; dispatch uses only the + // fixed task ring below. + thread_pool_scheduler() { start_workers(); } + + ~thread_pool_scheduler() { stop_workers(); } + + thread_pool_scheduler(const thread_pool_scheduler&) = delete; + thread_pool_scheduler& operator=(const thread_pool_scheduler&) = delete; + thread_pool_scheduler(thread_pool_scheduler&&) = delete; + thread_pool_scheduler& operator=(thread_pool_scheduler&&) = delete; + + // Runs fn_in on the calling thread and returns true only when no task is + // queued or running and no other inline run is active; otherwise returns + // false without invoking fn_in. + template + bool try_run_immediate(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + if (queued_or_running_.load(std::memory_order_acquire) != 0u) { + return false; + } + + bool expected = false; + if (!inline_active_.compare_exchange_strong(expected, true, std::memory_order_acq_rel, std::memory_order_acquire)) { + return false; + } + + struct reset_inline { + std::atomic& active; + ~reset_inline() noexcept { active.store(false, std::memory_order_release); } + } reset{inline_active_}; + + // Re-check now that the inline slot is claimed: a submit may have landed + // between the first check and the claim. + if (queued_or_running_.load(std::memory_order_acquire) != 0u) { + return false; + } + + std::forward(fn_in)(); + immediate_run_count_.fetch_add(1u, std::memory_order_relaxed); + return true; + } + + // Enqueues fn_in with no completion callback. Returns false when the pool is + // stopping or the ring has no free slot. + template + bool try_submit(fn&& fn_in) noexcept { + return try_submit_with_completion(std::forward(fn_in), nullptr, nullptr); + } + + // Detached hard-contract wrapper for call sites that have already proven + // scheduler lifetime and queue capacity. Actor-facing RTC paths must use + // thread_pool_scheduler_ref::schedule or run_or_schedule_and_wait. 
+ template + void submit(fn&& fn_in) noexcept { + if (!try_submit(std::forward(fn_in))) { + std::terminate(); + } + } + + // Unconditionally noexcept (like fifo_scheduler::schedule): when this falls to + // the scheduled path the task runs through a noexcept worker thunk, which + // cannot propagate an exception back to the waiter, so the API is consistently + // no-throw rather than propagate-or-terminate by pool load. Submit noexcept + // work (the RTC actor contract); a throwing task calls std::terminate. + // Returns true once fn_in has run (inline or on a worker); returns false when + // called from one of this pool's own workers or when the task cannot be + // enqueued. + template + bool run_or_schedule_and_wait(fn&& fn_in) noexcept { + if (try_run_immediate(std::forward(fn_in))) { + return true; + } + if (running_on_this_worker()) { + return false; + } + + // Spin-join on a local flag rather than blocking on a semaphore: the worker + // sets done last, so the waiter returns only after the worker's final write + // and can safely let the flag go out of scope (no destroy-during-notify + // fault). The wait is a bounded RTC join over one already-submitted task. 
+ std::atomic done{false}; + const bool scheduled = try_submit_and_signal([&fn_in]() noexcept { fn_in(); }, done); + if (!scheduled) { + return false; + } + while (!done.load(std::memory_order_acquire)) { + cpu_relax(); + } + return true; + } + + std::uint64_t immediate_run_count() const noexcept { return immediate_run_count_.load(std::memory_order_relaxed); } + + std::uint64_t scheduled_run_count() const noexcept { return scheduled_run_count_.load(std::memory_order_relaxed); } + + std::uint64_t worker_run_count() const noexcept { return worker_run_count_.load(std::memory_order_relaxed); } + + bool is_current_thread_worker() const noexcept { return running_on_this_worker(); } + + template + bool try_submit_with_completion(fn&& fn_in, void* completion_ctx, void (*completion_fn)(void*) noexcept) noexcept { + if (stopping_.load(std::memory_order_acquire)) { + return false; + } + + queued_or_running_.fetch_add(1u, std::memory_order_acq_rel); + const bool enqueued = enqueue(std::forward(fn_in), completion_ctx, completion_fn); + if (!enqueued) { + queued_or_running_.fetch_sub(1u, std::memory_order_acq_rel); + return false; + } + + scheduled_run_count_.fetch_add(1u, std::memory_order_relaxed); + ready_.release(); + return true; + } + + private: + struct task_slot { + using invoke_fn = void (*)(void*) noexcept; + using destroy_fn = void (*)(void*) noexcept; + + alignas(std::max_align_t) std::array storage{}; + std::atomic sequence = 0u; + invoke_fn invoke = nullptr; + destroy_fn destroy = nullptr; + void* completion_ctx = nullptr; + void (*completion_fn)(void*) noexcept = nullptr; + + template + void set(fn&& fn_in, void* completion_ctx_in, void (*completion_fn_in)(void*) noexcept) noexcept { + using fn_type = std::decay_t; + static_assert(sizeof(fn_type) <= inline_task_bytes, "scheduled task exceeds inline storage capacity"); + static_assert(alignof(fn_type) <= alignof(std::max_align_t), + "scheduled task alignment exceeds scheduler storage alignment"); + + new 
(storage.data()) fn_type(std::forward(fn_in)); + invoke = [](void* ptr) noexcept { (*static_cast(ptr))(); }; + destroy = [](void* ptr) noexcept { static_cast(ptr)->~fn_type(); }; + completion_ctx = completion_ctx_in; + completion_fn = completion_fn_in; + } + + void run() noexcept { + invoke(storage.data()); + destroy(storage.data()); + invoke = nullptr; + destroy = nullptr; + } + + void reset() noexcept { + if (destroy != nullptr) { + destroy(storage.data()); + } + invoke = nullptr; + destroy = nullptr; + completion_ctx = nullptr; + completion_fn = nullptr; + } + }; + + static constexpr std::size_t index_mask = capacity - 1u; + + void start_workers() { + for (std::size_t i = 0; i < capacity; ++i) { + tasks_[i].sequence.store(i, std::memory_order_relaxed); + } +#if BOOST_SML_DISABLE_EXCEPTIONS + // No-exceptions builds cannot observe a failed std::thread spawn (it calls + // std::terminate), so there is nothing to roll back; spawn directly. + for (std::size_t started = 0u; started < worker_count; ++started) { + workers_[started] = std::thread([this]() noexcept { worker_loop(); }); + } +#else + std::size_t started = 0u; + try { + for (; started < worker_count; ++started) { + workers_[started] = std::thread([this]() noexcept { worker_loop(); }); + } + } catch (...) 
{ + stopping_.store(true, std::memory_order_release); + for (std::size_t i = 0; i < started; ++i) { + ready_.release(); + } + for (std::size_t i = 0; i < started; ++i) { + if (workers_[i].joinable()) { + workers_[i].join(); + } + } + throw; + } +#endif + } + + void stop_workers() noexcept { + const bool was_stopping = stopping_.exchange(true, std::memory_order_acq_rel); + if (was_stopping) { + return; + } + + for (std::size_t i = 0; i < worker_count; ++i) { + ready_.release(); + } + + for (auto& worker : workers_) { + if (worker.joinable()) { + worker.join(); + } + } + + clear_unrun_tasks(); + } + + template + bool try_submit_and_signal(fn&& fn_in, std::atomic& done) noexcept { + return try_submit_with_completion(std::forward(fn_in), &done, signal_done_flag); + } + + template + bool enqueue(fn&& fn_in, void* completion_ctx, void (*completion_fn)(void*) noexcept) noexcept { + task_slot* slot = nullptr; + std::size_t pos = enqueue_pos_.load(std::memory_order_relaxed); + for (;;) { + slot = &tasks_[pos & index_mask]; + const std::size_t seq = slot->sequence.load(std::memory_order_acquire); + const auto diff = static_cast(seq) - static_cast(pos); + if (diff == 0) { + if (enqueue_pos_.compare_exchange_weak(pos, pos + 1u, std::memory_order_relaxed, std::memory_order_relaxed)) { + break; + } + } else if (diff < 0) { + return false; + } else { + pos = enqueue_pos_.load(std::memory_order_relaxed); + } + } + + slot->set(std::forward(fn_in), completion_ctx, completion_fn); + slot->sequence.store(pos + 1u, std::memory_order_release); + return true; + } + + bool try_dequeue_and_run() noexcept { + task_slot* slot = nullptr; + std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed); + for (;;) { + slot = &tasks_[pos & index_mask]; + const std::size_t seq = slot->sequence.load(std::memory_order_acquire); + const auto diff = static_cast(seq) - static_cast(pos + 1u); + if (diff == 0) { + if (dequeue_pos_.compare_exchange_weak(pos, pos + 1u, std::memory_order_relaxed, 
std::memory_order_relaxed)) { + break; + } + } else if (diff < 0) { + return false; + } else { + pos = dequeue_pos_.load(std::memory_order_relaxed); + } + } + + slot->run(); + void* completion_ctx = slot->completion_ctx; + void (*completion_fn)(void*) noexcept = slot->completion_fn; + slot->completion_ctx = nullptr; + slot->completion_fn = nullptr; + worker_run_count_.fetch_add(1u, std::memory_order_relaxed); + queued_or_running_.fetch_sub(1u, std::memory_order_acq_rel); + slot->sequence.store(pos + capacity, std::memory_order_release); + if (completion_fn != nullptr) { + completion_fn(completion_ctx); + } + return true; + } + + static void signal_done_flag(void* ctx) noexcept { + static_cast*>(ctx)->store(true, std::memory_order_release); + } + + void worker_loop() noexcept { + struct worker_scope { + const thread_pool_scheduler* previous; + explicit worker_scope(const thread_pool_scheduler* current) noexcept : previous(active_worker_scheduler_) { + active_worker_scheduler_ = current; + } + ~worker_scope() noexcept { active_worker_scheduler_ = previous; } + } scope{this}; + + for (;;) { + // Claim exactly one wake permit before dequeuing, preserving the + // permit-per-task invariant. Spin-claim first so back-to-back fork/joins + // (e.g. a decode burst) keep the worker warm and skip resleep/wakeup + // latency between rounds, the same warm-polling strategy optimized native + // threadpools use; fall back to a blocking acquire once genuinely idle so + // a quiescent pool does not burn a core. + bool claimed = false; + for (std::size_t spin = 0; spin < k_idle_spin_budget; ++spin) { + if (ready_.try_acquire()) { + claimed = true; + break; + } + if (stopping_.load(std::memory_order_acquire)) { + return; + } + cpu_relax(); + } + if (!claimed) { + ready_.acquire(); + } + // The claimed permit promises a published task or a stop signal. 
The task + // may not be visible at dequeue_pos for a few cycles, so retry rather than + // drop the permit (which would strand the task); re-check stop to exit. + while (!try_dequeue_and_run()) { + if (stopping_.load(std::memory_order_acquire)) { + return; + } + cpu_relax(); + } + } + } + + // Number of try_acquire attempts a worker makes on ready_ before it falls back + // to a blocking acquire. + static constexpr std::size_t k_idle_spin_budget = 2048; + + // True when the calling thread is currently inside this scheduler's + // worker_loop. + bool running_on_this_worker() const noexcept { return active_worker_scheduler_ == this; } + + // Runs every task still published in the ring on the calling thread, then + // resets all slots so any callable left in storage is destroyed. + void clear_unrun_tasks() noexcept { + while (try_dequeue_and_run()) { + } + for (auto& task : tasks_) { + task.reset(); + } + } + + std::array tasks_{}; + std::array workers_{}; + // Wake permits for workers: one released per accepted task, plus worker_count + // released at shutdown. + std::counting_semaphore<> ready_{0}; + std::atomic enqueue_pos_ = 0u; + std::atomic dequeue_pos_ = 0u; + std::atomic queued_or_running_ = 0u; + std::atomic inline_active_ = false; + std::atomic stopping_ = false; + std::atomic immediate_run_count_ = 0u; + std::atomic scheduled_run_count_ = 0u; + std::atomic worker_run_count_ = 0u; + // Set by worker_loop on each worker thread for the duration of the loop. + inline static thread_local const thread_pool_scheduler* active_worker_scheduler_ = nullptr; +}; + +// Handle that forwards to a thread_pool_scheduler through a stored pointer and +// adds join_group fork/join submission. It does not own the scheduler, which +// must remain alive while the handle is in use. +template +class thread_pool_scheduler_ref { + public: + static constexpr bool guarantees_fifo = scheduler::guarantees_fifo; + static constexpr bool single_consumer = scheduler::single_consumer; + static constexpr bool multi_consumer = scheduler::multi_consumer; + static constexpr bool owns_workers = false; + static constexpr bool run_to_completion = true; + static constexpr std::size_t static_worker_count = scheduler::static_worker_count; + static constexpr std::size_t static_capacity = scheduler::static_capacity; + + thread_pool_scheduler_ref() = delete; + explicit thread_pool_scheduler_ref(scheduler& scheduler_in) noexcept : scheduler_(&scheduler_in) {} + + // Tracks the lanes submitted through try_submit: pending_ counts lanes not yet + // completed and accepted_ records whether any lane was rejected. + class join_group { + public: + join_group() = default; + ~join_group() = default; + + join_group(const join_group&) = delete; + join_group& operator=(const join_group&) = delete; + join_group(join_group&&) = delete; + join_group& operator=(join_group&&) = delete; + + 
bool wait() noexcept { + // Spin-join on pending_ rather than blocking on a per-group semaphore. + // The group is caller-owned and typically stack-reused across fork/joins, + // so a notify-based wakeup is unsafe: the waiter could observe completion, + // return, and destroy the group before the last completer finishes its + // release()/notify, faulting on freed semaphore state. With a plain spin, + // a completer's final touch of the group is its pending_ decrement, and + // wait() returns only after observing pending_ == 0 (all decrements done), + // so nothing accesses the group after the caller may destroy it. The wait + // is a bounded RTC fork/join over already-submitted lanes, so the producer + // core would otherwise be idle; spinning gives the lowest join latency. + while (pending_.load(std::memory_order_acquire) != 0u) { + cpu_relax(); + } + // pending_ == 0 means every completer is done touching the group, so the + // owning thread can now read and clear accepted_ with no contention. The + // clear lets a stack-reused join_group start the next round clean: without + // it, a single rejected lane would make wait() return false on every later + // round even when all lanes succeed. 
+ const bool accepted = accepted_.load(std::memory_order_acquire); + accepted_.store(true, std::memory_order_release); + return accepted; + } + + private: + friend class thread_pool_scheduler_ref; + + void start_one() noexcept { pending_.fetch_add(1u, std::memory_order_acq_rel); } + + void reject_one() noexcept { + accepted_.store(false, std::memory_order_release); + complete_one(); + } + + void reject() noexcept { accepted_.store(false, std::memory_order_release); } + + void complete_one() noexcept { pending_.fetch_sub(1u, std::memory_order_acq_rel); } + + static void complete_one(void* ctx) noexcept { static_cast(ctx)->complete_one(); } + + std::atomic pending_ = 0u; + std::atomic accepted_ = true; + }; + + template + bool try_run_immediate(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + return scheduler_->try_run_immediate(std::forward(fn_in)); + } + + template + bool try_submit(join_group& group, fn&& fn_in) noexcept { + if (scheduler_->is_current_thread_worker()) { + group.reject(); + return false; + } + + group.start_one(); + const bool submitted = scheduler_->try_submit_with_completion(std::forward(fn_in), &group, join_group::complete_one); + if (!submitted) { + group.reject_one(); + return false; + } + return true; + } + + template + void schedule(fn&& fn_in) noexcept { + if (!scheduler_->run_or_schedule_and_wait(std::forward(fn_in))) { + std::terminate(); + } + } + + template + bool run_or_schedule_and_wait(fn&& fn_in) noexcept { + return scheduler_->run_or_schedule_and_wait(std::forward(fn_in)); + } + + std::uint64_t immediate_run_count() const noexcept { return scheduler_->immediate_run_count(); } + + std::uint64_t scheduled_run_count() const noexcept { return scheduler_->scheduled_run_count(); } + + std::uint64_t worker_run_count() const noexcept { return scheduler_->worker_run_count(); } + + private: + scheduler* scheduler_ = nullptr; +}; + +} // namespace policy +} // namespace utility + +BOOST_SML_NAMESPACE_END +#endif // 
BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED + +#endif // BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP diff --git a/include/stateforward/sml/utility/thread_pool_scheduler.hpp b/include/stateforward/sml/utility/thread_pool_scheduler.hpp new file mode 100644 index 00000000..594fed23 --- /dev/null +++ b/include/stateforward/sml/utility/thread_pool_scheduler.hpp @@ -0,0 +1,7 @@ +#ifndef STATEFORWARD_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP +#define STATEFORWARD_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP + +#include +#include + +#endif diff --git a/test/ft/CMakeLists.txt b/test/ft/CMakeLists.txt index 7df0c8a0..5ac48a60 100644 --- a/test/ft/CMakeLists.txt +++ b/test/ft/CMakeLists.txt @@ -19,6 +19,10 @@ add_test(test_composite test_composite) add_executable(test_co_sm co_sm.cpp) add_test(test_co_sm test_co_sm) +add_executable(test_thread_pool_scheduler thread_pool_scheduler.cpp) +add_test(test_thread_pool_scheduler test_thread_pool_scheduler) +target_link_libraries(test_thread_pool_scheduler -lpthread) + add_executable(test_constexpr constexpr.cpp) add_test(test_constexpr test_constexpr) diff --git a/test/ft/thread_pool_scheduler.cpp b/test/ft/thread_pool_scheduler.cpp new file mode 100644 index 00000000..0f1c98d2 --- /dev/null +++ b/test/ft/thread_pool_scheduler.cpp @@ -0,0 +1,106 @@ +// +// Copyright (c) 2026 stateforward +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#include +#include +#include +#include + +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED +namespace policy = boost::sml::utility::policy; + +using pool_t = policy::thread_pool_scheduler<8>; +using pool_ref_t = policy::thread_pool_scheduler_ref; + +// A non-noexcept callable: the scheduled wait/schedule paths must stay +// unconditionally noexcept (their worker thunk cannot propagate), while the +// pure-inline try_run_immediate stays conditionally noexcept and may propagate. 
+struct may_throw_callable { + void operator()() const {} +}; +static_assert(noexcept(std::declval().run_or_schedule_and_wait(may_throw_callable{})), + "run_or_schedule_and_wait must be unconditionally noexcept"); +static_assert(noexcept(std::declval().schedule(may_throw_callable{})), + "schedule must be unconditionally noexcept"); +static_assert(!noexcept(std::declval().try_run_immediate(may_throw_callable{})), + "try_run_immediate stays conditionally noexcept (inline path propagates)"); + +test thread_pool_scheduler_runs_task_inline_when_idle = [] { + pool_t pool{}; + pool_ref_t scheduler{pool}; + int calls = 0; + const bool ran = scheduler.run_or_schedule_and_wait([&calls] { ++calls; }); + expect(ran); + expect(1 == calls); + // An idle pool runs the work on the calling thread rather than a worker. + expect(1u == scheduler.immediate_run_count()); + expect(0u == scheduler.worker_run_count()); +}; + +// Fork/join smoke test: eight lanes submitted through one join_group must all +// be accepted, and all eight increments must be visible once wait() returns. +test thread_pool_scheduler_fork_join_runs_every_lane = [] { + pool_t pool{}; + pool_ref_t scheduler{pool}; + std::atomic calls{0}; + pool_ref_t::join_group group{}; + for (std::size_t lane = 0; lane < 8u; ++lane) { + scheduler.try_submit(group, [&calls] { calls.fetch_add(1, std::memory_order_relaxed); }); + } + const bool accepted = group.wait(); + expect(accepted); + expect(8 == calls.load(std::memory_order_acquire)); +}; + +// A stack-reused join_group must start each round clean: a rejection in one round +// must not make a later all-success round report failure. (try_submit from inside +// a worker is rejected, since a worker cannot fork into its own pool.) +test thread_pool_scheduler_reused_join_group_clears_prior_rejection = [] { + pool_t pool{}; + pool_ref_t scheduler{pool}; + pool_ref_t::join_group group{}; + + // Round 1: drive a worker that tries to submit into `group` -> rejected. 
+ std::atomic rejected_on_worker{false}; + pool_ref_t::join_group driver{}; + scheduler.try_submit(driver, + [&] { rejected_on_worker.store(!scheduler.try_submit(group, [] {}), std::memory_order_release); }); + driver.wait(); + expect(rejected_on_worker.load(std::memory_order_acquire)); + expect(!group.wait()); // the rejection is observed once... + + // Round 2: same group, all lanes succeed -> must report accepted again. + std::atomic calls{0}; + for (std::size_t lane = 0; lane < 4u; ++lane) { + scheduler.try_submit(group, [&calls] { calls.fetch_add(1, std::memory_order_relaxed); }); + } + expect(group.wait()); // ...and must not stick to false on the next round + expect(4 == calls.load(std::memory_order_acquire)); +}; + +// Regression guard for the join-latch deadlock: under rapid, repeated fork/join +// with lane_count == worker_count, a Dekker-race close/complete handshake plus a +// destroy-during-release semaphore could strand a wakeup. The lifetime-safe +// spin-join must complete every round without hanging. (Reproduced reliably with +// thousands of rounds; kept here as a standing guard.) +test thread_pool_scheduler_ref_fork_join_survives_rapid_repeated_rounds = [] { + constexpr std::size_t lanes = 8u; + constexpr int rounds = 20000; + pool_t pool{}; + pool_ref_t scheduler{pool}; + std::atomic calls{0}; + for (int round = 0; round < rounds; ++round) { + pool_ref_t::join_group group{}; + for (std::size_t lane = 0; lane < lanes; ++lane) { + scheduler.try_submit(group, [&calls] { calls.fetch_add(1, std::memory_order_relaxed); }); + } + expect(group.wait()); + } + expect(static_cast(lanes) * rounds == calls.load(std::memory_order_acquire)); +}; + +#else +test thread_pool_scheduler_disabled_without_cxx20_and_semaphore = [] { expect(true); }; +#endif