From 699b0aa00849ef6f7448fcd9952e9957a9866553 Mon Sep 17 00:00:00 2001 From: gabewillen Date: Mon, 29 Jun 2026 02:31:47 +0000 Subject: [PATCH 1/4] Add thread_pool_scheduler opt-in utility policy A multi-consumer work pool with a lifetime-safe, deadlock-free fork/join latch, for action-level inter-op parallelism across SML actor dispatches. It sits alongside inline_scheduler / fifo_scheduler / coroutine_scheduler and exposes the same policy-introspection traits. Opt-in module: gated on C++20 and <semaphore> so the freestanding, dependency-free core stays thread-free by default. Uses a fixed MPMC task ring (no allocation on dispatch), warm-polling workers, and a spin-join that avoids destroy-during-notify on caller-owned join groups. Validated under clang 20 and g++ 13, clean under ASAN/UBSAN and TSAN, with a 20000-round x 8-lane fork/join regression guarding the join-latch deadlock (lane_count == worker_count). --- README.md | 1 + .../sml/utility/thread_pool_scheduler.hpp | 518 ++++++++++++++++++ test/ft/CMakeLists.txt | 4 + test/ft/thread_pool_scheduler.cpp | 66 +++ 4 files changed, 589 insertions(+) create mode 100644 include/boost/sml/utility/thread_pool_scheduler.hpp create mode 100644 test/ft/thread_pool_scheduler.cpp diff --git a/README.md b/README.md index 243c3e71..b4d4b596 100644 --- a/README.md +++ b/README.md @@ -29,6 +29,7 @@ This fork addresses critical bugs, adds utilities for high-throughput and async - **Completion transitions** (`completion`) -- post-event transitions with origin event propagation - **Coroutine state machines** (`co_sm`) -- C++20 coroutine-driven SM with configurable scheduler and allocator policies +- **`thread_pool_scheduler`** -- opt-in C++20 multi-consumer work pool with a lifetime-safe, deadlock-free fork/join latch for action-level inter-op parallelism (warm workers, fixed task ring, no allocation on dispatch) - **`sm_pool`** -- pool-based container for managing thousands of SM instances with indexed and batch dispatch - 
**`dispatch_table`** -- compile-time dispatch table for ID-based event routing at zero runtime overhead diff --git a/include/boost/sml/utility/thread_pool_scheduler.hpp b/include/boost/sml/utility/thread_pool_scheduler.hpp new file mode 100644 index 00000000..b6c93458 --- /dev/null +++ b/include/boost/sml/utility/thread_pool_scheduler.hpp @@ -0,0 +1,518 @@ +// +// Copyright (c) 2026 stateforward +// +// Distributed under the Boost Software License, Version 1.0. +// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#ifndef BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP + +#include "boost/sml.hpp" + +#if defined(_MSVC_LANG) +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG _MSVC_LANG +#else +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG __cplusplus +#endif + +// Opt-in module: unlike the header-only, dependency-free SML core, the thread +// pool scheduler requires C++20 (std::counting_semaphore) and a hosted +// threading runtime. It compiles to nothing when those are unavailable so the +// freestanding core stays thread-free by default. +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG >= 202002L && __has_include() +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED 1 +#else +#define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED 0 +#endif + +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#if defined(_MSC_VER) && !defined(__clang__) +#include // _mm_pause / __yield +#endif + +BOOST_SML_NAMESPACE_BEGIN + +namespace utility { +namespace policy { + +// Architecture-appropriate spin hint for short busy-waits (lock-free join / +// queue retries). Reduces contention and power on hyperthreads without yielding +// the scheduler. 
+inline void cpu_relax() noexcept { +#if defined(_MSC_VER) && !defined(__clang__) +#if defined(_M_X64) || defined(_M_IX86) + _mm_pause(); +#elif defined(_M_ARM64) || defined(_M_ARM) + __yield(); +#else + std::atomic_signal_fence(std::memory_order_seq_cst); +#endif +#elif defined(__x86_64__) || defined(__i386__) + __builtin_ia32_pause(); +#elif defined(__aarch64__) || defined(__arm__) + __asm__ __volatile__("yield" ::: "memory"); +#else + std::atomic_signal_fence(std::memory_order_seq_cst); +#endif +} + +template +class thread_pool_scheduler { + public: + static_assert(worker_count > 0, "thread_pool_scheduler worker count must be non-zero"); + static_assert(capacity > 1, "thread_pool_scheduler capacity must be greater than one"); + static_assert((capacity & (capacity - 1)) == 0, "thread_pool_scheduler capacity must be a power of two"); + static_assert(inline_task_bytes > 0, "thread_pool_scheduler inline storage must be non-zero"); + + static constexpr bool guarantees_fifo = false; + static constexpr bool single_consumer = false; + static constexpr bool multi_consumer = true; + static constexpr bool owns_workers = true; + static constexpr bool run_to_completion = false; + static constexpr std::size_t static_worker_count = worker_count; + static constexpr std::size_t static_capacity = capacity; + + // std::thread may allocate OS/runtime resources here; dispatch uses only the + // fixed task ring below. 
+ thread_pool_scheduler() { start_workers(); } + + ~thread_pool_scheduler() { stop_workers(); } + + thread_pool_scheduler(const thread_pool_scheduler&) = delete; + thread_pool_scheduler& operator=(const thread_pool_scheduler&) = delete; + thread_pool_scheduler(thread_pool_scheduler&&) = delete; + thread_pool_scheduler& operator=(thread_pool_scheduler&&) = delete; + + template + bool try_run_immediate(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + if (queued_or_running_.load(std::memory_order_acquire) != 0u) { + return false; + } + + bool expected = false; + if (!inline_active_.compare_exchange_strong(expected, true, std::memory_order_acq_rel, std::memory_order_acquire)) { + return false; + } + + struct reset_inline { + std::atomic& active; + ~reset_inline() noexcept { active.store(false, std::memory_order_release); } + } reset{inline_active_}; + + if (queued_or_running_.load(std::memory_order_acquire) != 0u) { + return false; + } + + std::forward(fn_in)(); + immediate_run_count_.fetch_add(1u, std::memory_order_relaxed); + return true; + } + + template + bool try_submit(fn&& fn_in) noexcept { + return try_submit_with_completion(std::forward(fn_in), nullptr, nullptr); + } + + // Detached hard-contract wrapper for call sites that have already proven + // scheduler lifetime and queue capacity. Actor-facing RTC paths must use + // thread_pool_scheduler_ref::schedule or run_or_schedule_and_wait. 
+ template + void submit(fn&& fn_in) noexcept { + if (!try_submit(std::forward(fn_in))) { + std::terminate(); + } + } + + template + bool run_or_schedule_and_wait(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + if (try_run_immediate(std::forward(fn_in))) { + return true; + } + if (running_on_this_worker()) { + return false; + } + + // Spin-join on a local flag rather than blocking on a semaphore: the worker + // sets done last, so the waiter returns only after the worker's final write + // and can safely let the flag go out of scope (no destroy-during-notify + // fault). The wait is a bounded RTC join over one already-submitted task. + std::atomic done{false}; + const bool scheduled = try_submit_and_signal([&fn_in]() noexcept(noexcept(fn_in())) { fn_in(); }, done); + if (!scheduled) { + return false; + } + while (!done.load(std::memory_order_acquire)) { + cpu_relax(); + } + return true; + } + + std::uint64_t immediate_run_count() const noexcept { return immediate_run_count_.load(std::memory_order_relaxed); } + + std::uint64_t scheduled_run_count() const noexcept { return scheduled_run_count_.load(std::memory_order_relaxed); } + + std::uint64_t worker_run_count() const noexcept { return worker_run_count_.load(std::memory_order_relaxed); } + + bool is_current_thread_worker() const noexcept { return running_on_this_worker(); } + + template + bool try_submit_with_completion(fn&& fn_in, void* completion_ctx, void (*completion_fn)(void*) noexcept) noexcept { + if (stopping_.load(std::memory_order_acquire)) { + return false; + } + + queued_or_running_.fetch_add(1u, std::memory_order_acq_rel); + const bool enqueued = enqueue(std::forward(fn_in), completion_ctx, completion_fn); + if (!enqueued) { + queued_or_running_.fetch_sub(1u, std::memory_order_acq_rel); + return false; + } + + scheduled_run_count_.fetch_add(1u, std::memory_order_relaxed); + ready_.release(); + return true; + } + + private: + struct task_slot { + using invoke_fn = void (*)(void*) noexcept; + 
using destroy_fn = void (*)(void*) noexcept; + + alignas(std::max_align_t) std::array storage{}; + std::atomic sequence = 0u; + invoke_fn invoke = nullptr; + destroy_fn destroy = nullptr; + void* completion_ctx = nullptr; + void (*completion_fn)(void*) noexcept = nullptr; + + template + void set(fn&& fn_in, void* completion_ctx_in, void (*completion_fn_in)(void*) noexcept) noexcept { + using fn_type = std::decay_t; + static_assert(sizeof(fn_type) <= inline_task_bytes, "scheduled task exceeds inline storage capacity"); + static_assert(alignof(fn_type) <= alignof(std::max_align_t), + "scheduled task alignment exceeds scheduler storage alignment"); + + new (storage.data()) fn_type(std::forward(fn_in)); + invoke = [](void* ptr) noexcept { (*static_cast(ptr))(); }; + destroy = [](void* ptr) noexcept { static_cast(ptr)->~fn_type(); }; + completion_ctx = completion_ctx_in; + completion_fn = completion_fn_in; + } + + void run() noexcept { + invoke(storage.data()); + destroy(storage.data()); + invoke = nullptr; + destroy = nullptr; + } + + void reset() noexcept { + if (destroy != nullptr) { + destroy(storage.data()); + } + invoke = nullptr; + destroy = nullptr; + completion_ctx = nullptr; + completion_fn = nullptr; + } + }; + + static constexpr std::size_t index_mask = capacity - 1u; + + void start_workers() { + for (std::size_t i = 0; i < capacity; ++i) { + tasks_[i].sequence.store(i, std::memory_order_relaxed); + } + std::size_t started = 0u; + try { + for (; started < worker_count; ++started) { + workers_[started] = std::thread([this]() noexcept { worker_loop(); }); + } + } catch (...) 
{ + stopping_.store(true, std::memory_order_release); + for (std::size_t i = 0; i < started; ++i) { + ready_.release(); + } + for (std::size_t i = 0; i < started; ++i) { + if (workers_[i].joinable()) { + workers_[i].join(); + } + } + throw; + } + } + + void stop_workers() noexcept { + const bool was_stopping = stopping_.exchange(true, std::memory_order_acq_rel); + if (was_stopping) { + return; + } + + for (std::size_t i = 0; i < worker_count; ++i) { + ready_.release(); + } + + for (auto& worker : workers_) { + if (worker.joinable()) { + worker.join(); + } + } + + clear_unrun_tasks(); + } + + template + bool try_submit_and_signal(fn&& fn_in, std::atomic& done) noexcept { + return try_submit_with_completion(std::forward(fn_in), &done, signal_done_flag); + } + + template + bool enqueue(fn&& fn_in, void* completion_ctx, void (*completion_fn)(void*) noexcept) noexcept { + task_slot* slot = nullptr; + std::size_t pos = enqueue_pos_.load(std::memory_order_relaxed); + for (;;) { + slot = &tasks_[pos & index_mask]; + const std::size_t seq = slot->sequence.load(std::memory_order_acquire); + const auto diff = static_cast(seq) - static_cast(pos); + if (diff == 0) { + if (enqueue_pos_.compare_exchange_weak(pos, pos + 1u, std::memory_order_relaxed, std::memory_order_relaxed)) { + break; + } + } else if (diff < 0) { + return false; + } else { + pos = enqueue_pos_.load(std::memory_order_relaxed); + } + } + + slot->set(std::forward(fn_in), completion_ctx, completion_fn); + slot->sequence.store(pos + 1u, std::memory_order_release); + return true; + } + + bool try_dequeue_and_run() noexcept { + task_slot* slot = nullptr; + std::size_t pos = dequeue_pos_.load(std::memory_order_relaxed); + for (;;) { + slot = &tasks_[pos & index_mask]; + const std::size_t seq = slot->sequence.load(std::memory_order_acquire); + const auto diff = static_cast(seq) - static_cast(pos + 1u); + if (diff == 0) { + if (dequeue_pos_.compare_exchange_weak(pos, pos + 1u, std::memory_order_relaxed, 
std::memory_order_relaxed)) { + break; + } + } else if (diff < 0) { + return false; + } else { + pos = dequeue_pos_.load(std::memory_order_relaxed); + } + } + + slot->run(); + void* completion_ctx = slot->completion_ctx; + void (*completion_fn)(void*) noexcept = slot->completion_fn; + slot->completion_ctx = nullptr; + slot->completion_fn = nullptr; + worker_run_count_.fetch_add(1u, std::memory_order_relaxed); + queued_or_running_.fetch_sub(1u, std::memory_order_acq_rel); + slot->sequence.store(pos + capacity, std::memory_order_release); + if (completion_fn != nullptr) { + completion_fn(completion_ctx); + } + return true; + } + + static void signal_done_flag(void* ctx) noexcept { + static_cast*>(ctx)->store(true, std::memory_order_release); + } + + void worker_loop() noexcept { + struct worker_scope { + const thread_pool_scheduler* previous; + explicit worker_scope(const thread_pool_scheduler* current) noexcept : previous(active_worker_scheduler_) { + active_worker_scheduler_ = current; + } + ~worker_scope() noexcept { active_worker_scheduler_ = previous; } + } scope{this}; + + for (;;) { + // Claim exactly one wake permit before dequeuing, preserving the + // permit-per-task invariant. Spin-claim first so back-to-back fork/joins + // (e.g. a decode burst) keep the worker warm and skip resleep/wakeup + // latency between rounds, the same warm-polling strategy optimized native + // threadpools use; fall back to a blocking acquire once genuinely idle so + // a quiescent pool does not burn a core. + bool claimed = false; + for (std::size_t spin = 0; spin < k_idle_spin_budget; ++spin) { + if (ready_.try_acquire()) { + claimed = true; + break; + } + if (stopping_.load(std::memory_order_acquire)) { + return; + } + cpu_relax(); + } + if (!claimed) { + ready_.acquire(); + } + // The claimed permit promises a published task or a stop signal. 
The task + // may not be visible at dequeue_pos for a few cycles, so retry rather than + // drop the permit (which would strand the task); re-check stop to exit. + while (!try_dequeue_and_run()) { + if (stopping_.load(std::memory_order_acquire)) { + return; + } + cpu_relax(); + } + } + } + + static constexpr std::size_t k_idle_spin_budget = 2048; + + bool running_on_this_worker() const noexcept { return active_worker_scheduler_ == this; } + + void clear_unrun_tasks() noexcept { + while (try_dequeue_and_run()) { + } + for (auto& task : tasks_) { + task.reset(); + } + } + + std::array tasks_{}; + std::array workers_{}; + std::counting_semaphore<> ready_{0}; + std::atomic enqueue_pos_ = 0u; + std::atomic dequeue_pos_ = 0u; + std::atomic queued_or_running_ = 0u; + std::atomic inline_active_ = false; + std::atomic stopping_ = false; + std::atomic immediate_run_count_ = 0u; + std::atomic scheduled_run_count_ = 0u; + std::atomic worker_run_count_ = 0u; + inline static thread_local const thread_pool_scheduler* active_worker_scheduler_ = nullptr; +}; + +template +class thread_pool_scheduler_ref { + public: + static constexpr bool guarantees_fifo = scheduler::guarantees_fifo; + static constexpr bool single_consumer = scheduler::single_consumer; + static constexpr bool multi_consumer = scheduler::multi_consumer; + static constexpr bool owns_workers = false; + static constexpr bool run_to_completion = true; + static constexpr std::size_t static_worker_count = scheduler::static_worker_count; + static constexpr std::size_t static_capacity = scheduler::static_capacity; + + thread_pool_scheduler_ref() = delete; + explicit thread_pool_scheduler_ref(scheduler& scheduler_in) noexcept : scheduler_(&scheduler_in) {} + + class join_group { + public: + join_group() = default; + ~join_group() = default; + + join_group(const join_group&) = delete; + join_group& operator=(const join_group&) = delete; + join_group(join_group&&) = delete; + join_group& operator=(join_group&&) = delete; + + 
bool wait() noexcept { + // Spin-join on pending_ rather than blocking on a per-group semaphore. + // The group is caller-owned and typically stack-reused across fork/joins, + // so a notify-based wakeup is unsafe: the waiter could observe completion, + // return, and destroy the group before the last completer finishes its + // release()/notify, faulting on freed semaphore state. With a plain spin, + // a completer's final touch of the group is its pending_ decrement, and + // wait() returns only after observing pending_ == 0 (all decrements done), + // so nothing accesses the group after the caller may destroy it. The wait + // is a bounded RTC fork/join over already-submitted lanes, so the producer + // core would otherwise be idle; spinning gives the lowest join latency. + while (pending_.load(std::memory_order_acquire) != 0u) { + cpu_relax(); + } + return accepted_.load(std::memory_order_acquire); + } + + private: + friend class thread_pool_scheduler_ref; + + void start_one() noexcept { pending_.fetch_add(1u, std::memory_order_acq_rel); } + + void reject_one() noexcept { + accepted_.store(false, std::memory_order_release); + complete_one(); + } + + void reject() noexcept { accepted_.store(false, std::memory_order_release); } + + void complete_one() noexcept { pending_.fetch_sub(1u, std::memory_order_acq_rel); } + + static void complete_one(void* ctx) noexcept { static_cast(ctx)->complete_one(); } + + std::atomic pending_ = 0u; + std::atomic accepted_ = true; + }; + + template + bool try_run_immediate(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + return scheduler_->try_run_immediate(std::forward(fn_in)); + } + + template + bool try_submit(join_group& group, fn&& fn_in) noexcept { + if (scheduler_->is_current_thread_worker()) { + group.reject(); + return false; + } + + group.start_one(); + const bool submitted = scheduler_->try_submit_with_completion(std::forward(fn_in), &group, join_group::complete_one); + if (!submitted) { + group.reject_one(); + 
return false; + } + return true; + } + + template + void schedule(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + if (!scheduler_->run_or_schedule_and_wait(std::forward(fn_in))) { + std::terminate(); + } + } + + template + bool run_or_schedule_and_wait(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + return scheduler_->run_or_schedule_and_wait(std::forward(fn_in)); + } + + std::uint64_t immediate_run_count() const noexcept { return scheduler_->immediate_run_count(); } + + std::uint64_t scheduled_run_count() const noexcept { return scheduler_->scheduled_run_count(); } + + std::uint64_t worker_run_count() const noexcept { return scheduler_->worker_run_count(); } + + private: + scheduler* scheduler_ = nullptr; +}; + +} // namespace policy +} // namespace utility + +BOOST_SML_NAMESPACE_END +#endif // BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED + +#endif // BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP diff --git a/test/ft/CMakeLists.txt b/test/ft/CMakeLists.txt index 7df0c8a0..5ac48a60 100644 --- a/test/ft/CMakeLists.txt +++ b/test/ft/CMakeLists.txt @@ -19,6 +19,10 @@ add_test(test_composite test_composite) add_executable(test_co_sm co_sm.cpp) add_test(test_co_sm test_co_sm) +add_executable(test_thread_pool_scheduler thread_pool_scheduler.cpp) +add_test(test_thread_pool_scheduler test_thread_pool_scheduler) +target_link_libraries(test_thread_pool_scheduler -lpthread) + add_executable(test_constexpr constexpr.cpp) add_test(test_constexpr test_constexpr) diff --git a/test/ft/thread_pool_scheduler.cpp b/test/ft/thread_pool_scheduler.cpp new file mode 100644 index 00000000..69e468d6 --- /dev/null +++ b/test/ft/thread_pool_scheduler.cpp @@ -0,0 +1,66 @@ +// +// Copyright (c) 2026 stateforward +// +// Distributed under the Boost Software License, Version 1.0. 
+// (See accompanying file LICENSE_1_0.txt or copy at +// http://www.boost.org/LICENSE_1_0.txt) +// +#include +#include +#include + +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED +namespace policy = boost::sml::utility::policy; + +using pool_t = policy::thread_pool_scheduler<8>; +using pool_ref_t = policy::thread_pool_scheduler_ref; + +test thread_pool_scheduler_runs_task_inline_when_idle = [] { + pool_t pool{}; + pool_ref_t scheduler{pool}; + int calls = 0; + const bool ran = scheduler.run_or_schedule_and_wait([&calls] { ++calls; }); + expect(ran); + expect(1 == calls); + // An idle pool runs the work on the calling thread rather than a worker. + expect(1u == scheduler.immediate_run_count()); + expect(0u == scheduler.worker_run_count()); +}; + +test thread_pool_scheduler_fork_join_runs_every_lane = [] { + pool_t pool{}; + pool_ref_t scheduler{pool}; + std::atomic calls{0}; + pool_ref_t::join_group group{}; + for (std::size_t lane = 0; lane < 8u; ++lane) { + scheduler.try_submit(group, [&calls] { calls.fetch_add(1, std::memory_order_relaxed); }); + } + const bool accepted = group.wait(); + expect(accepted); + expect(8 == calls.load(std::memory_order_acquire)); +}; + +// Regression guard for the join-latch deadlock: under rapid, repeated fork/join +// with lane_count == worker_count, a Dekker-race close/complete handshake plus a +// destroy-during-release semaphore could strand a wakeup. The lifetime-safe +// spin-join must complete every round without hanging. (Reproduced reliably with +// thousands of rounds; kept here as a standing guard.) 
+test thread_pool_scheduler_ref_fork_join_survives_rapid_repeated_rounds = [] { + constexpr std::size_t lanes = 8u; + constexpr int rounds = 20000; + pool_t pool{}; + pool_ref_t scheduler{pool}; + std::atomic calls{0}; + for (int round = 0; round < rounds; ++round) { + pool_ref_t::join_group group{}; + for (std::size_t lane = 0; lane < lanes; ++lane) { + scheduler.try_submit(group, [&calls] { calls.fetch_add(1, std::memory_order_relaxed); }); + } + expect(group.wait()); + } + expect(static_cast(lanes) * rounds == calls.load(std::memory_order_acquire)); +}; + +#else +test thread_pool_scheduler_disabled_without_cxx20_and_semaphore = [] { expect(true); }; +#endif From 624ea67efa9ca10e0d726b2bdb59a290878406e4 Mon Sep 17 00:00:00 2001 From: gabewillen Date: Mon, 29 Jun 2026 02:51:33 +0000 Subject: [PATCH 2/4] Address review: portability gate + join_group reuse - Gate __has_include(<semaphore>) behind defined(__has_include) so toolchains below C++20 or without the extension never have to parse it. - Clear join_group::accepted_ in wait() once pending_ has drained, so a stack-reused group starts each round clean and a prior rejection no longer sticks to later all-success rounds. Adds a regression test (worker-thread rejection followed by an all-success round) that fails without the reset. The MSVC cpu_relax pause-intrinsic fix landed in the previous commit. --- .../sml/utility/thread_pool_scheduler.hpp | 20 +++++++++++--- test/ft/thread_pool_scheduler.cpp | 26 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/include/boost/sml/utility/thread_pool_scheduler.hpp b/include/boost/sml/utility/thread_pool_scheduler.hpp index b6c93458..2dfcb90b 100644 --- a/include/boost/sml/utility/thread_pool_scheduler.hpp +++ b/include/boost/sml/utility/thread_pool_scheduler.hpp @@ -20,9 +20,16 @@ // pool scheduler requires C++20 (std::counting_semaphore) and a hosted // threading runtime. 
It compiles to nothing when those are unavailable so the // freestanding core stays thread-free by default. -#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG >= 202002L && __has_include() +// Nest the __has_include probe under defined(__has_include) so toolchains +// without the extension (or below C++20) never have to parse it. +#if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_LANG >= 202002L +#if defined(__has_include) +#if __has_include() #define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED 1 -#else +#endif +#endif +#endif +#ifndef BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED #define BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED 0 #endif @@ -443,7 +450,14 @@ class thread_pool_scheduler_ref { while (pending_.load(std::memory_order_acquire) != 0u) { cpu_relax(); } - return accepted_.load(std::memory_order_acquire); + // pending_ == 0 means every completer is done touching the group, so the + // owning thread can now read and clear accepted_ with no contention. The + // clear lets a stack-reused join_group start the next round clean: without + // it, a single rejected lane would make wait() return false on every later + // round even when all lanes succeed. + const bool accepted = accepted_.load(std::memory_order_acquire); + accepted_.store(true, std::memory_order_release); + return accepted; } private: diff --git a/test/ft/thread_pool_scheduler.cpp b/test/ft/thread_pool_scheduler.cpp index 69e468d6..3bd46da2 100644 --- a/test/ft/thread_pool_scheduler.cpp +++ b/test/ft/thread_pool_scheduler.cpp @@ -40,6 +40,32 @@ test thread_pool_scheduler_fork_join_runs_every_lane = [] { expect(8 == calls.load(std::memory_order_acquire)); }; +// A stack-reused join_group must start each round clean: a rejection in one round +// must not make a later all-success round report failure. (try_submit from inside +// a worker is rejected, since a worker cannot fork into its own pool.) 
+test thread_pool_scheduler_reused_join_group_clears_prior_rejection = [] { + pool_t pool{}; + pool_ref_t scheduler{pool}; + pool_ref_t::join_group group{}; + + // Round 1: drive a worker that tries to submit into `group` -> rejected. + std::atomic rejected_on_worker{false}; + pool_ref_t::join_group driver{}; + scheduler.try_submit(driver, + [&] { rejected_on_worker.store(!scheduler.try_submit(group, [] {}), std::memory_order_release); }); + driver.wait(); + expect(rejected_on_worker.load(std::memory_order_acquire)); + expect(!group.wait()); // the rejection is observed once... + + // Round 2: same group, all lanes succeed -> must report accepted again. + std::atomic calls{0}; + for (std::size_t lane = 0; lane < 4u; ++lane) { + scheduler.try_submit(group, [&calls] { calls.fetch_add(1, std::memory_order_relaxed); }); + } + expect(group.wait()); // ...and must not stick to false on the next round + expect(4 == calls.load(std::memory_order_acquire)); +}; + // Regression guard for the join-latch deadlock: under rapid, repeated fork/join // with lane_count == worker_count, a Dekker-race close/complete handshake plus a // destroy-during-release semaphore could strand a wakeup. The lifetime-safe From 4e5b799dc7c1ed80833e03faa5805e257014ed62 Mon Sep 17 00:00:00 2001 From: gabewillen Date: Mon, 29 Jun 2026 03:02:07 +0000 Subject: [PATCH 3/4] Address review: no-exceptions build + stateforward include path - Guard start_workers()'s try/catch/throw behind BOOST_SML_DISABLE_EXCEPTIONS so the opt-in header compiles under -fno-exceptions. A failed std::thread spawn there calls std::terminate, matching that configuration's contract. - Add include/stateforward/sml/utility/thread_pool_scheduler.hpp so the scheduler is reachable through the fork's public include path, consistent with co_sm / sm_pool / dispatch_table. 
--- include/boost/sml/utility/thread_pool_scheduler.hpp | 8 ++++++++ .../stateforward/sml/utility/thread_pool_scheduler.hpp | 7 +++++++ 2 files changed, 15 insertions(+) create mode 100644 include/stateforward/sml/utility/thread_pool_scheduler.hpp diff --git a/include/boost/sml/utility/thread_pool_scheduler.hpp b/include/boost/sml/utility/thread_pool_scheduler.hpp index 2dfcb90b..ee4f524e 100644 --- a/include/boost/sml/utility/thread_pool_scheduler.hpp +++ b/include/boost/sml/utility/thread_pool_scheduler.hpp @@ -241,6 +241,13 @@ class thread_pool_scheduler { for (std::size_t i = 0; i < capacity; ++i) { tasks_[i].sequence.store(i, std::memory_order_relaxed); } +#if BOOST_SML_DISABLE_EXCEPTIONS + // No-exceptions builds cannot observe a failed std::thread spawn (it calls + // std::terminate), so there is nothing to roll back; spawn directly. + for (std::size_t started = 0u; started < worker_count; ++started) { + workers_[started] = std::thread([this]() noexcept { worker_loop(); }); + } +#else std::size_t started = 0u; try { for (; started < worker_count; ++started) { @@ -258,6 +265,7 @@ class thread_pool_scheduler { } throw; } +#endif } void stop_workers() noexcept { diff --git a/include/stateforward/sml/utility/thread_pool_scheduler.hpp b/include/stateforward/sml/utility/thread_pool_scheduler.hpp new file mode 100644 index 00000000..594fed23 --- /dev/null +++ b/include/stateforward/sml/utility/thread_pool_scheduler.hpp @@ -0,0 +1,7 @@ +#ifndef STATEFORWARD_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP +#define STATEFORWARD_SML_UTILITY_THREAD_POOL_SCHEDULER_HPP + +#include +#include + +#endif From 4373d991c02b318f40de9832dd6a4101abdaf21f Mon Sep 17 00:00:00 2001 From: gabewillen Date: Mon, 29 Jun 2026 03:14:55 +0000 Subject: [PATCH 4/4] Address review: make the scheduled wait path consistently noexcept run_or_schedule_and_wait (and the ref's schedule / run_or_schedule_and_wait) may run work through a noexcept worker thunk that cannot propagate, so the conditional 
noexcept(noexcept(fn())) wrongly advertised propagation that only the inline branch could honor -- the same call would propagate or terminate by pool load. Mark the scheduled-capable entry points unconditionally noexcept (matching fifo_scheduler::schedule); pure-inline try_run_immediate stays conditional. Adds static_assert coverage of the contract. --- .../boost/sml/utility/thread_pool_scheduler.hpp | 13 +++++++++---- test/ft/thread_pool_scheduler.cpp | 14 ++++++++++++++ 2 files changed, 23 insertions(+), 4 deletions(-) diff --git a/include/boost/sml/utility/thread_pool_scheduler.hpp b/include/boost/sml/utility/thread_pool_scheduler.hpp index ee4f524e..6aea85e5 100644 --- a/include/boost/sml/utility/thread_pool_scheduler.hpp +++ b/include/boost/sml/utility/thread_pool_scheduler.hpp @@ -141,8 +141,13 @@ class thread_pool_scheduler { } } + // Unconditionally noexcept (like fifo_scheduler::schedule): when this falls to + // the scheduled path the task runs through a noexcept worker thunk, which + // cannot propagate an exception back to the waiter, so the API is consistently + // no-throw rather than propagate-or-terminate by pool load. Submit noexcept + // work (the RTC actor contract); a throwing task calls std::terminate. template - bool run_or_schedule_and_wait(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + bool run_or_schedule_and_wait(fn&& fn_in) noexcept { if (try_run_immediate(std::forward(fn_in))) { return true; } @@ -155,7 +160,7 @@ class thread_pool_scheduler { // and can safely let the flag go out of scope (no destroy-during-notify // fault). The wait is a bounded RTC join over one already-submitted task. 
std::atomic done{false}; - const bool scheduled = try_submit_and_signal([&fn_in]() noexcept(noexcept(fn_in())) { fn_in(); }, done); + const bool scheduled = try_submit_and_signal([&fn_in]() noexcept { fn_in(); }, done); if (!scheduled) { return false; } @@ -510,14 +515,14 @@ class thread_pool_scheduler_ref { } template - void schedule(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + void schedule(fn&& fn_in) noexcept { if (!scheduler_->run_or_schedule_and_wait(std::forward(fn_in))) { std::terminate(); } } template - bool run_or_schedule_and_wait(fn&& fn_in) noexcept(noexcept(std::forward(fn_in)())) { + bool run_or_schedule_and_wait(fn&& fn_in) noexcept { return scheduler_->run_or_schedule_and_wait(std::forward(fn_in)); } diff --git a/test/ft/thread_pool_scheduler.cpp b/test/ft/thread_pool_scheduler.cpp index 3bd46da2..0f1c98d2 100644 --- a/test/ft/thread_pool_scheduler.cpp +++ b/test/ft/thread_pool_scheduler.cpp @@ -8,6 +8,7 @@ #include #include #include +#include #if BOOST_SML_UTILITY_THREAD_POOL_SCHEDULER_ENABLED namespace policy = boost::sml::utility::policy; @@ -15,6 +16,19 @@ namespace policy = boost::sml::utility::policy; using pool_t = policy::thread_pool_scheduler<8>; using pool_ref_t = policy::thread_pool_scheduler_ref; +// A non-noexcept callable: the scheduled wait/schedule paths must stay +// unconditionally noexcept (their worker thunk cannot propagate), while the +// pure-inline try_run_immediate stays conditionally noexcept and may propagate. 
+struct may_throw_callable { + void operator()() const {} +}; +static_assert(noexcept(std::declval().run_or_schedule_and_wait(may_throw_callable{})), + "run_or_schedule_and_wait must be unconditionally noexcept"); +static_assert(noexcept(std::declval().schedule(may_throw_callable{})), + "schedule must be unconditionally noexcept"); +static_assert(!noexcept(std::declval().try_run_immediate(may_throw_callable{})), + "try_run_immediate stays conditionally noexcept (inline path propagates)"); + test thread_pool_scheduler_runs_task_inline_when_idle = [] { pool_t pool{}; pool_ref_t scheduler{pool};