From d6be8c250113b875ae16fe91e57bb63083892027 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Mon, 11 May 2026 13:25:47 -0700 Subject: [PATCH 1/8] feat: add server FDv2 heuristic fallback and recovery --- .../include/launchdarkly/async/promise.hpp | 24 ++ libs/internal/tests/promise_test.cpp | 28 +++ libs/server-sdk/src/CMakeLists.txt | 2 + .../source/fdv2_source_result.hpp | 13 +- .../source/ifdv2_condition.hpp | 97 ++++++++ .../source/ifdv2_synchronizer.hpp | 11 +- .../src/data_systems/fdv2/conditions.cpp | 156 ++++++++++++ .../src/data_systems/fdv2/conditions.hpp | 166 +++++++++++++ .../data_systems/fdv2/fdv2_data_system.cpp | 93 +++++-- .../data_systems/fdv2/fdv2_data_system.hpp | 25 ++ .../fdv2/polling_synchronizer.cpp | 39 +-- .../fdv2/polling_synchronizer.hpp | 13 +- .../fdv2/streaming_synchronizer.cpp | 22 +- .../fdv2/streaming_synchronizer.hpp | 9 - libs/server-sdk/tests/conditions_test.cpp | 228 ++++++++++++++++++ .../tests/fdv2_data_system_test.cpp | 176 +++++++++----- .../fdv2_streaming_synchronizer_test.cpp | 77 ++---- 17 files changed, 949 insertions(+), 230 deletions(-) create mode 100644 libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/conditions.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/conditions.hpp create mode 100644 libs/server-sdk/tests/conditions_test.cpp diff --git a/libs/internal/include/launchdarkly/async/promise.hpp b/libs/internal/include/launchdarkly/async/promise.hpp index 030f94680..ca9370334 100644 --- a/libs/internal/include/launchdarkly/async/promise.hpp +++ b/libs/internal/include/launchdarkly/async/promise.hpp @@ -534,6 +534,30 @@ Future WhenAny(Future... futures) { return result; } +// Vector overload of WhenAny. Returns a Future that resolves with +// the 0-based index of whichever input future resolves first. If the vector is +// empty, the returned future never resolves. 
+template +Future WhenAny(std::vector> const& futures) { + Promise promise; + Future result = promise.GetFuture(); + + auto shared_promise = + std::make_shared>(std::move(promise)); + + for (std::size_t i = 0; i < futures.size(); ++i) { + Future future = futures[i]; + future.Then( + [shared_promise, i](T const&) -> std::monostate { + shared_promise->Resolve(i); + return std::monostate{}; + }, + [](Continuation f) { f(); }); + } + + return result; +} + // MakeFuture returns an already-resolved Future. Useful in flattening Then // continuations where some branches produce a result immediately and others // return a Future, requiring a uniform Future return type across all diff --git a/libs/internal/tests/promise_test.cpp b/libs/internal/tests/promise_test.cpp index 8d5ecf08c..52867acf1 100644 --- a/libs/internal/tests/promise_test.cpp +++ b/libs/internal/tests/promise_test.cpp @@ -416,6 +416,34 @@ TEST(WhenAny, MixedTypesFirstWins) { EXPECT_EQ(std::get(*result.GetResult()), "hello"); } +// Verifies that the vector overload of WhenAny resolves with the index of +// the first future to resolve. +TEST(WhenAny, VectorFirstResolved) { + Promise p0; + Promise p1; + Promise p2; + + std::vector> futures{p0.GetFuture(), p1.GetFuture(), + p2.GetFuture()}; + Future result = WhenAny(futures); + + EXPECT_FALSE(result.IsFinished()); + p2.Resolve(99); + + auto r = result.WaitForResult(std::chrono::seconds(5)); + ASSERT_TRUE(r.has_value()); + EXPECT_EQ(*r, 2u); +} + +// Verifies that an empty vector produces a future that never resolves. +TEST(WhenAny, VectorEmptyNeverResolves) { + std::vector> futures; + Future result = WhenAny(futures); + + auto r = result.WaitForResult(std::chrono::milliseconds(50)); + EXPECT_FALSE(r.has_value()); +} + // Verifies that WhenAny resolves immediately if a future is already resolved. 
TEST(WhenAny, AlreadyResolved) { Promise p0; diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index bf3177473..0b6c67b01 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -60,6 +60,8 @@ target_sources(${LIBNAME} data_systems/fdv2/polling_synchronizer.cpp data_systems/fdv2/streaming_synchronizer.hpp data_systems/fdv2/streaming_synchronizer.cpp + data_systems/fdv2/conditions.hpp + data_systems/fdv2/conditions.cpp data_systems/fdv2/fdv2_data_system.hpp data_systems/fdv2/fdv2_data_system.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp diff --git a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp index a2a7589e3..63c807705 100644 --- a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp +++ b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp @@ -56,17 +56,8 @@ struct FDv2SourceResult { bool fdv1_fallback; }; - /** - * Next() returned because the timeout expired before a result arrived. - */ - struct Timeout {}; - - using Value = std::variant; + using Value = + std::variant; Value value; }; diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp new file mode 100644 index 000000000..63a078aa3 --- /dev/null +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp @@ -0,0 +1,97 @@ +#pragma once + +#include "fdv2_source_result.hpp" + +#include + +#include + +namespace launchdarkly::server_side::data_interfaces { + +/** + * A condition observes the orchestrator's stream of synchronizer results and + * fires when criteria for a synchronizer transition are met. + * + * Each condition plays one of two roles, identified by GetType(): + * - kFallback: when fired, the orchestrator stops the active synchronizer + * and starts the next-preferred one. 
+ * - kRecovery: when fired, the orchestrator stops the active fallback + * synchronizer and returns to the most-preferred synchronizer. + * + * Conditions are stateful: the orchestrator pushes results into a condition + * via Inform() so the condition can update its internal state (typically a + * timer). When the condition's criteria are satisfied, the future returned + * by Execute() resolves with the condition's Type. + * + * Conditions are single-use: once fired, they are not re-armed. The + * orchestrator builds a fresh condition for each synchronizer activation via + * an IFDv2ConditionFactory. + * + * Close() cancels any pending internal work (e.g., a timer) without resolving + * the future. After Close() returns the condition's future will not resolve. + */ +class IFDv2Condition { + public: + enum class Type { + kFallback, + kRecovery, + }; + + /** + * Returns a Future that resolves with the condition's Type once the + * condition's criteria are satisfied. May be called multiple times; each + * call returns a Future referring to the same underlying state. + */ + [[nodiscard]] virtual async::Future Execute() = 0; + + /** + * Pushes a synchronizer result into the condition so it can update any + * internal state (e.g., arm or cancel a timer). + */ + virtual void Inform(FDv2SourceResult const& result) = 0; + + /** + * Cancels any pending internal work and ensures the future will not + * resolve. Idempotent. + */ + virtual void Close() = 0; + + /** + * Returns the condition's role in the orchestrator. + */ + [[nodiscard]] virtual Type GetType() const = 0; + + virtual ~IFDv2Condition() = default; + IFDv2Condition(IFDv2Condition const&) = delete; + IFDv2Condition(IFDv2Condition&&) = delete; + IFDv2Condition& operator=(IFDv2Condition const&) = delete; + IFDv2Condition& operator=(IFDv2Condition&&) = delete; + + protected: + IFDv2Condition() = default; +}; + +/** + * Builds new IFDv2Condition instances on demand. 
Each call to Build() produces + * a fresh condition with no prior state. + */ +class IFDv2ConditionFactory { + public: + [[nodiscard]] virtual std::unique_ptr Build() = 0; + + /** + * Returns the type of conditions this factory builds. + */ + [[nodiscard]] virtual IFDv2Condition::Type GetType() const = 0; + + virtual ~IFDv2ConditionFactory() = default; + IFDv2ConditionFactory(IFDv2ConditionFactory const&) = delete; + IFDv2ConditionFactory(IFDv2ConditionFactory&&) = delete; + IFDv2ConditionFactory& operator=(IFDv2ConditionFactory const&) = delete; + IFDv2ConditionFactory& operator=(IFDv2ConditionFactory&&) = delete; + + protected: + IFDv2ConditionFactory() = default; +}; + +} // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp index 96bfc54c0..182994821 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer.hpp @@ -5,7 +5,6 @@ #include #include -#include #include namespace launchdarkly::server_side::data_interfaces { @@ -20,25 +19,19 @@ namespace launchdarkly::server_side::data_interfaces { class IFDv2Synchronizer { public: /** - * Returns a Future that resolves with the next result once it is available - * or the timeout expires. + * Returns a Future that resolves with the next result once it is + * available. * * On the first call, the synchronizer starts its underlying connection. * Subsequent calls continue reading from the same connection. * - * If the timeout expires before a result arrives, the future resolves with - * FDv2SourceResult::Timeout. The orchestrator uses this to evaluate - * fallback conditions. - * * Close() may be called from another thread to unblock Next(), in which * case the future resolves with FDv2SourceResult::Shutdown. * - * @param timeout Maximum time to wait for the next result. 
* @param selector The selector to send with the request, reflecting any * changesets applied since the previous call. */ virtual async::Future Next( - std::chrono::milliseconds timeout, data_model::Selector selector) = 0; /** diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp new file mode 100644 index 000000000..71640f378 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp @@ -0,0 +1,156 @@ +#include "conditions.hpp" + +#include + +#include +#include + +namespace launchdarkly::server_side::data_systems { + +using data_interfaces::FDv2SourceResult; +using data_interfaces::IFDv2Condition; + +TimedCondition::TimedCondition(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout) + : executor_(std::move(executor)), + timeout_(timeout), + state_(std::make_shared()) {} + +TimedCondition::~TimedCondition() { + Close(); +} + +async::Future TimedCondition::Execute() { + std::lock_guard lock(state_->mutex); + return state_->promise.GetFuture(); +} + +void TimedCondition::Close() { + std::lock_guard lock(state_->mutex); + if (state_->closed) { + return; + } + state_->closed = true; + if (state_->timer_cancel) { + state_->timer_cancel->Cancel(); + state_->timer_cancel.reset(); + } +} + +void TimedCondition::ArmTimer() { + Type const type = GetType(); + auto state = state_; + + std::lock_guard lock(state->mutex); + if (state->closed || state->timer_cancel.has_value()) { + return; + } + state->timer_cancel.emplace(); + async::Delay(executor_, timeout_, state->timer_cancel->GetToken()) + .Then( + [state, type](bool const& fired_normally) -> std::monostate { + if (fired_normally) { + state->promise.Resolve(type); + } + return {}; + }, + [](async::Continuation work) { work(); }); +} + +void TimedCondition::CancelTimer() { + std::lock_guard lock(state_->mutex); + if (state_->timer_cancel) { + state_->timer_cancel->Cancel(); + state_->timer_cancel.reset(); + } +} + 
+FallbackCondition::FallbackCondition(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout) + : TimedCondition(std::move(executor), timeout) {} + +void FallbackCondition::Inform(FDv2SourceResult const& result) { + if (std::get_if(&result.value)) { + CancelTimer(); + } else if (std::get_if(&result.value)) { + ArmTimer(); + } +} + +RecoveryCondition::RecoveryCondition(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout) + : TimedCondition(std::move(executor), timeout) { + ArmTimer(); +} + +void RecoveryCondition::Inform(FDv2SourceResult const&) {} + +FallbackConditionFactory::FallbackConditionFactory( + boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout) + : executor_(std::move(executor)), timeout_(timeout) {} + +std::unique_ptr FallbackConditionFactory::Build() { + return std::make_unique(executor_, timeout_); +} + +IFDv2Condition::Type FallbackConditionFactory::GetType() const { + return IFDv2Condition::Type::kFallback; +} + +RecoveryConditionFactory::RecoveryConditionFactory( + boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout) + : executor_(std::move(executor)), timeout_(timeout) {} + +std::unique_ptr RecoveryConditionFactory::Build() { + return std::make_unique(executor_, timeout_); +} + +IFDv2Condition::Type RecoveryConditionFactory::GetType() const { + return IFDv2Condition::Type::kRecovery; +} + +namespace { + +async::Future MakeAggregateFuture( + std::vector> const& conditions) { + std::vector> futures; + futures.reserve(conditions.size()); + for (auto const& condition : conditions) { + futures.push_back(condition->Execute()); + } + return async::WhenAny(futures).Then( + [futures](std::size_t const& idx) -> IFDv2Condition::Type { + return *futures[idx].GetResult(); + }, + async::kInlineExecutor); +} + +} // namespace + +Conditions::Conditions(std::vector> conditions) + : conditions_(std::move(conditions)), + future_(MakeAggregateFuture(conditions_)) {} + 
+Conditions::~Conditions() { + Close(); +} + +async::Future Conditions::GetFuture() const { + return future_; +} + +void Conditions::Inform(FDv2SourceResult const& result) { + for (auto const& condition : conditions_) { + condition->Inform(result); + } +} + +void Conditions::Close() { + for (auto const& condition : conditions_) { + condition->Close(); + } +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp new file mode 100644 index 000000000..71e3d0a93 --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp @@ -0,0 +1,166 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_condition.hpp" + +#include +#include + +#include + +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * Base class for conditions that fire after a duration elapses on the + * orchestrator's executor. Owns the result promise, the cancellation handle + * for the active timer (if any), and the state required to safely arm, + * cancel, and resolve the timer across threads. + * + * Derived classes implement Inform() to translate orchestrator events into + * arm/cancel actions on the timer. Subclasses also implement GetType() to + * report whether they are a fallback or recovery condition. + */ +class TimedCondition : public data_interfaces::IFDv2Condition { + public: + TimedCondition(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout); + + ~TimedCondition() override; + + [[nodiscard]] async::Future Execute() override; + + void Close() override; + + protected: + /** + * Arms the timer if not already armed. When the timer fires, the + * condition's future resolves with GetType(). Safe to call concurrently; + * a no-op if a timer is already armed or the condition is closed. 
+ */ + void ArmTimer(); + + /** + * Cancels the active timer if armed, leaving the condition unresolved. + * Safe to call when no timer is armed. + */ + void CancelTimer(); + + private: + struct State { + std::mutex mutex; + bool closed = false; + async::Promise promise; + std::optional timer_cancel; + }; + + boost::asio::any_io_executor const executor_; + std::chrono::milliseconds const timeout_; + std::shared_ptr const state_; +}; + +/** + * Fires after the active synchronizer has been continuously interrupted for + * the configured timeout. Each CHANGE_SET result cancels any pending timer; + * the next Interrupted status re-arms it. + */ +class FallbackCondition final : public TimedCondition { + public: + FallbackCondition(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout); + + void Inform(data_interfaces::FDv2SourceResult const& result) override; + + [[nodiscard]] Type GetType() const override { return Type::kFallback; } +}; + +/** + * Fires after the active synchronizer has been running for the configured + * timeout, regardless of result content. The timer is started at + * construction; Inform() is a no-op. + */ +class RecoveryCondition final : public TimedCondition { + public: + RecoveryCondition(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout); + + void Inform(data_interfaces::FDv2SourceResult const& result) override; + + [[nodiscard]] Type GetType() const override { return Type::kRecovery; } +}; + +/** + * Builds fresh FallbackCondition instances on demand. 
+ */ +class FallbackConditionFactory final + : public data_interfaces::IFDv2ConditionFactory { + public: + FallbackConditionFactory(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout); + + [[nodiscard]] std::unique_ptr Build() + override; + + [[nodiscard]] data_interfaces::IFDv2Condition::Type GetType() + const override; + + private: + boost::asio::any_io_executor const executor_; + std::chrono::milliseconds const timeout_; +}; + +/** + * Builds fresh RecoveryCondition instances on demand. + */ +class RecoveryConditionFactory final + : public data_interfaces::IFDv2ConditionFactory { + public: + RecoveryConditionFactory(boost::asio::any_io_executor executor, + std::chrono::milliseconds timeout); + + [[nodiscard]] std::unique_ptr Build() + override; + + [[nodiscard]] data_interfaces::IFDv2Condition::Type GetType() + const override; + + private: + boost::asio::any_io_executor const executor_; + std::chrono::milliseconds const timeout_; +}; + +/** + * Aggregates a set of conditions into a single Future that resolves with the + * type of the first condition to fire. Inform() and Close() forward to every + * underlying condition. If constructed with no conditions, GetFuture() + * returns a Future that never resolves. 
+ */ +class Conditions final { + public: + explicit Conditions( + std::vector> + conditions); + + ~Conditions(); + + Conditions(Conditions const&) = delete; + Conditions(Conditions&&) = delete; + Conditions& operator=(Conditions const&) = delete; + Conditions& operator=(Conditions&&) = delete; + + [[nodiscard]] async::Future + GetFuture() const; + + void Inform(data_interfaces::FDv2SourceResult const& result); + + void Close(); + + private: + std::vector> conditions_; + async::Future future_; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index edbd55e4e..496934798 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -5,7 +5,6 @@ #include #include -#include #include #include @@ -21,9 +20,6 @@ struct overloaded : Ts... { template overloaded(Ts...) -> overloaded; -// Default until fallback/recovery is implemented. 
-constexpr std::chrono::hours kSynchronizerNextTimeout{24}; - } // namespace FDv2DataSystem::FDv2DataSystem( @@ -31,6 +27,10 @@ FDv2DataSystem::FDv2DataSystem( initializer_factories, std::vector> synchronizer_factories, + std::unique_ptr + fallback_condition_factory, + std::unique_ptr + recovery_condition_factory, boost::asio::any_io_executor ioc, data_components::DataSourceStatusManager* status_manager, Logger const& logger) @@ -38,6 +38,8 @@ FDv2DataSystem::FDv2DataSystem( ioc_(std::move(ioc)), initializer_factories_(std::move(initializer_factories)), synchronizer_factories_(std::move(synchronizer_factories)), + fallback_condition_factory_(std::move(fallback_condition_factory)), + recovery_condition_factory_(std::move(recovery_condition_factory)), status_manager_(status_manager), store_(), change_notifier_(store_, store_), @@ -47,7 +49,8 @@ FDv2DataSystem::FDv2DataSystem( initializer_index_(0), synchronizer_index_(0), active_initializer_(nullptr), - active_synchronizer_(nullptr) {} + active_synchronizer_(nullptr), + active_conditions_(nullptr) {} FDv2DataSystem::~FDv2DataSystem() { Close(); @@ -62,6 +65,9 @@ void FDv2DataSystem::Close() { if (active_synchronizer_) { active_synchronizer_->Close(); } + if (active_conditions_) { + active_conditions_->Close(); + } status_manager_->SetState(DataSourceStatus::DataSourceState::kOff); } @@ -170,10 +176,6 @@ void FDv2DataSystem::OnInitializerResult( LD_LOG(logger_, LogLevel::kDebug) << Identity() << ": ignoring goodbye from initializer"; }, - [&](Result::Timeout const&) { - LD_LOG(logger_, LogLevel::kDebug) - << Identity() << ": ignoring timeout from initializer"; - }, }, result.value); @@ -204,8 +206,11 @@ void FDv2DataSystem::StartSynchronizers() { exhausted = true; cycled_synchronizers = synchronizer_index_ > 0; } else { - auto& factory = synchronizer_factories_[synchronizer_index_++]; + auto& factory = synchronizer_factories_[synchronizer_index_]; active_synchronizer_ = factory->Build(); + active_conditions_ = + 
BuildConditionsForSynchronizer(synchronizer_index_); + ++synchronizer_index_; } } @@ -234,11 +239,17 @@ void FDv2DataSystem::RunSynchronizerNext() { if (closed_ || !active_synchronizer_) { return; } - active_synchronizer_->Next(kSynchronizerNextTimeout, selector_) + auto next_future = active_synchronizer_->Next(selector_); + auto cond_future = active_conditions_->GetFuture(); + async::WhenAny(cond_future, next_future) .Then( - [this](data_interfaces::FDv2SourceResult const& result) - -> std::monostate { - OnSynchronizerResult(result); + [this, cond_future, + next_future](std::size_t const& idx) -> std::monostate { + if (idx == 0) { + OnConditionFired(*cond_future.GetResult()); + } else { + OnSynchronizerResult(*next_future.GetResult()); + } return {}; }, [ioc = ioc_](async::Continuation work) { @@ -246,10 +257,58 @@ void FDv2DataSystem::RunSynchronizerNext() { }); } +void FDv2DataSystem::OnConditionFired( + data_interfaces::IFDv2Condition::Type type) { + using Type = data_interfaces::IFDv2Condition::Type; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + // Destructors close the active synchronizer and conditions. 
+ active_synchronizer_.reset(); + active_conditions_.reset(); + if (type == Type::kRecovery) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": recovery condition met"; + synchronizer_index_ = 0; + } else { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": fallback condition met"; + } + } + StartSynchronizers(); +} + +std::unique_ptr FDv2DataSystem::BuildConditionsForSynchronizer( + std::size_t synchronizer_position) const { + std::vector> conditions; + if (synchronizer_factories_.size() <= 1) { + return std::make_unique(std::move(conditions)); + } + if (fallback_condition_factory_) { + conditions.push_back(fallback_condition_factory_->Build()); + } + if (synchronizer_position > 0 && recovery_condition_factory_) { + conditions.push_back(recovery_condition_factory_->Build()); + } + return std::make_unique(std::move(conditions)); +} + void FDv2DataSystem::OnSynchronizerResult( data_interfaces::FDv2SourceResult result) { using Result = data_interfaces::FDv2SourceResult; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + if (active_conditions_) { + active_conditions_->Inform(result); + } + } + bool got_shutdown = false; bool advance = false; @@ -283,10 +342,6 @@ void FDv2DataSystem::OnSynchronizerResult( << Identity() << ": ignoring goodbye from synchronizer" << (gb.reason ? 
": " + *gb.reason : ""); }, - [&](Result::Timeout const&) { - LD_LOG(logger_, LogLevel::kDebug) - << Identity() << ": synchronizer timed out; retrying"; - }, }, result.value); @@ -294,10 +349,12 @@ void FDv2DataSystem::OnSynchronizerResult( std::lock_guard lock(mutex_); if (closed_ || got_shutdown) { active_synchronizer_.reset(); + active_conditions_.reset(); return; } if (advance) { active_synchronizer_.reset(); + active_conditions_.reset(); } } diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp index b359945f9..fb1ed46e3 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -3,9 +3,11 @@ #include "../../data_components/change_notifier/change_notifier.hpp" #include "../../data_components/memory_store/memory_store.hpp" #include "../../data_components/status_notifications/data_source_status_manager.hpp" +#include "../../data_interfaces/source/ifdv2_condition.hpp" #include "../../data_interfaces/source/ifdv2_initializer_factory.hpp" #include "../../data_interfaces/source/ifdv2_synchronizer_factory.hpp" #include "../../data_interfaces/system/idata_system.hpp" +#include "conditions.hpp" #include #include @@ -141,6 +143,13 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { * order during the initialization phase. * @param synchronizer_factories Factories that build synchronizers, used * in order for ongoing updates after initialization. + * @param fallback_condition_factory Factory for the per-synchronizer + * fallback condition. May be null. If null, fallback transitions + * are never triggered; synchronizers only rotate on terminal errors. + * @param recovery_condition_factory Factory for the per-synchronizer + * recovery condition. May be null. If null, the orchestrator does + * not return to a more-preferred synchronizer once it has fallen + * back. 
* @param ioc Executor on which orchestration callbacks run. * @param status_manager Non-owning. Must outlive this object; the caller * is responsible for ensuring this. Used to publish data-source @@ -153,6 +162,10 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { initializer_factories, std::vector> synchronizer_factories, + std::unique_ptr + fallback_condition_factory, + std::unique_ptr + recovery_condition_factory, boost::asio::any_io_executor ioc, data_components::DataSourceStatusManager* status_manager, Logger const& logger); @@ -233,6 +246,13 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { void StartSynchronizers(); void RunSynchronizerNext(); void OnSynchronizerResult(data_interfaces::FDv2SourceResult result); + void OnConditionFired(data_interfaces::IFDv2Condition::Type type); + + // Builds the conditions to apply to a synchronizer at the given chain + // position. Reads only const-after-construction state, so no + // synchronization is required. + std::unique_ptr BuildConditionsForSynchronizer( + std::size_t synchronizer_position) const; // Applies a typed FDv2 changeset to the in-memory store and updates the // tracked selector if the changeset's selector is non-empty. @@ -249,6 +269,10 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { std::vector< std::unique_ptr> const synchronizer_factories_; + std::unique_ptr const + fallback_condition_factory_; + std::unique_ptr const + recovery_condition_factory_; // Non-owning. Lifetime guaranteed by the caller (see constructor doc). 
data_components::DataSourceStatusManager* const status_manager_; @@ -269,6 +293,7 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { std::size_t synchronizer_index_; std::unique_ptr active_initializer_; std::unique_ptr active_synchronizer_; + std::unique_ptr active_conditions_; }; } // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp index c052d4827..84df5e0f3 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.cpp @@ -103,10 +103,8 @@ FDv2PollingSynchronizer::~FDv2PollingSynchronizer() { } async::Future FDv2PollingSynchronizer::Next( - std::chrono::milliseconds timeout, data_model::Selector selector) { - return DoNext(state_, close_promise_.GetFuture(), timeout, - std::move(selector)); + return DoNext(state_, close_promise_.GetFuture(), std::move(selector)); } void FDv2PollingSynchronizer::Close() { @@ -121,7 +119,6 @@ std::string const& FDv2PollingSynchronizer::Identity() const { /* static */ async::Future FDv2PollingSynchronizer::DoNext( std::shared_ptr state, async::Future closed, - std::chrono::milliseconds timeout, data_model::Selector selector) { if (closed.IsFinished()) { return async::MakeFuture( @@ -129,18 +126,12 @@ std::string const& FDv2PollingSynchronizer::Identity() const { } async::CancellationSource cancel; - auto now = std::chrono::steady_clock::now(); - auto timeout_deadline = now + timeout; - auto timeout_future = state->Delay(timeout, cancel.GetToken()); - - // Figure out how much to delay before starting. 
auto delay_future = state->CreatePollDelayFuture(cancel.GetToken()); - return async::WhenAny(closed, std::move(timeout_future), - std::move(delay_future)) + return async::WhenAny(closed, std::move(delay_future)) .Then( [state = std::move(state), closed = std::move(closed), - timeout_deadline, selector = std::move(selector), + selector = std::move(selector), cancel = std::move(cancel)](std::size_t const& idx) mutable -> async::Future { cancel.Cancel(); @@ -148,12 +139,7 @@ std::string const& FDv2PollingSynchronizer::Identity() const { return async::MakeFuture( FDv2SourceResult{FDv2SourceResult::Shutdown{}}); } - if (idx == 1) { - return async::MakeFuture( - FDv2SourceResult{FDv2SourceResult::Timeout{}}); - } - return DoPoll(std::move(state), std::move(closed), - timeout_deadline, selector); + return DoPoll(std::move(state), std::move(closed), selector); }, async::kInlineExecutor); } @@ -161,7 +147,6 @@ std::string const& FDv2PollingSynchronizer::Identity() const { /* static */ async::Future FDv2PollingSynchronizer::DoPoll( std::shared_ptr state, async::Future closed, - std::chrono::time_point timeout_deadline, data_model::Selector const& selector) { if (closed.IsFinished()) { return async::MakeFuture( @@ -170,27 +155,15 @@ std::string const& FDv2PollingSynchronizer::Identity() const { state->RecordPollStarted(); - async::CancellationSource cancel; - auto now = std::chrono::steady_clock::now(); - auto timeout_future = - state->Delay(timeout_deadline - now, cancel.GetToken()); - - // TODO: pass cancel.GetToken() to Request() once HTTP requests support it. 
auto http_future = state->Request(selector); - return async::WhenAny(std::move(closed), std::move(timeout_future), - http_future) + return async::WhenAny(std::move(closed), http_future) .Then( - [state = std::move(state), http_future = std::move(http_future), - cancel = std::move(cancel)]( + [state = std::move(state), http_future = std::move(http_future)]( std::size_t const& idx) mutable -> FDv2SourceResult { - cancel.Cancel(); if (idx == 0) { return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; } - if (idx == 1) { - return FDv2SourceResult{FDv2SourceResult::Timeout{}}; - } return state->HandlePollResult(*http_future.GetResult()); }, async::kInlineExecutor); diff --git a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp index 2a8bd3231..954acbc9d 100644 --- a/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/polling_synchronizer.hpp @@ -47,7 +47,6 @@ class FDv2PollingSynchronizer final ~FDv2PollingSynchronizer() override; async::Future Next( - std::chrono::milliseconds timeout, data_model::Selector selector) override; void Close() override; @@ -110,25 +109,21 @@ class FDv2PollingSynchronizer final }; /** - * Waits for the poll interval, then delegates to DoPoll. - * Resolves with Shutdown if closed, or Timeout if the timeout expires - * first. + * Waits for the poll interval, then delegates to DoPoll. Resolves with + * Shutdown if closed before the next poll begins. */ static async::Future DoNext( std::shared_ptr state, async::Future closed, - std::chrono::milliseconds timeout, data_model::Selector selector); /** - * Issues a single HTTP poll request and returns the result. - * Resolves with Shutdown if closed, or Timeout if timeout_deadline passes - * first. + * Issues a single HTTP poll request and returns the result. Resolves + * with Shutdown if closed before the request completes. 
*/ static async::Future DoPoll( std::shared_ptr state, async::Future closed, - std::chrono::time_point timeout_deadline, data_model::Selector const& selector); // Resolved by Close() or on destruction, cancelling any outstanding Next() diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index cc2c861a2..e4a5f0a6f 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -322,12 +322,6 @@ void FDv2StreamingSynchronizer::State::Shutdown() { } } -async::Future FDv2StreamingSynchronizer::State::Delay( - std::chrono::milliseconds duration, - async::CancellationToken token) { - return async::Delay(executor_, duration, std::move(token)); -} - FDv2StreamingSynchronizer::FDv2StreamingSynchronizer( boost::asio::any_io_executor const& executor, Logger const& logger, @@ -347,7 +341,6 @@ FDv2StreamingSynchronizer::~FDv2StreamingSynchronizer() { } async::Future FDv2StreamingSynchronizer::Next( - std::chrono::milliseconds timeout, data_model::Selector selector) { auto closed = close_promise_.GetFuture(); if (closed.IsFinished()) { @@ -360,25 +353,14 @@ async::Future FDv2StreamingSynchronizer::Next( return result_future; } - async::CancellationSource cancel; - auto timeout_future = state_->Delay(timeout, cancel.GetToken()); - - return async::WhenAny(closed, timeout_future, result_future) + return async::WhenAny(closed, result_future) .Then( - [state = state_, cancel = std::move(cancel), result_future]( + [state = state_, result_future]( std::size_t const& idx) mutable -> FDv2SourceResult { - cancel.Cancel(); if (idx == 0) { state->ClearPendingPromise(); return FDv2SourceResult{FDv2SourceResult::Shutdown{}}; } - if (idx == 1) { - state->ClearPendingPromise(); - if (result_future.IsFinished()) { - return *result_future.GetResult(); - } - return FDv2SourceResult{FDv2SourceResult::Timeout{}}; - } 
return *result_future.GetResult(); }, async::kInlineExecutor); diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp index e5686f9bb..95974ca87 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp @@ -54,7 +54,6 @@ class FDv2StreamingSynchronizer final ~FDv2StreamingSynchronizer() override; async::Future Next( - std::chrono::milliseconds timeout, data_model::Selector selector) override; void Close() override; @@ -104,14 +103,6 @@ class FDv2StreamingSynchronizer final */ void Shutdown(); - /** - * Returns a Future that resolves after the given duration. Resolves - * early with false if the token is cancelled before the duration - * elapses. - */ - async::Future Delay(std::chrono::milliseconds duration, - async::CancellationToken token = {}); - private: using HttpRequest = boost::beast::http::request; diff --git a/libs/server-sdk/tests/conditions_test.cpp b/libs/server-sdk/tests/conditions_test.cpp new file mode 100644 index 000000000..722b8e759 --- /dev/null +++ b/libs/server-sdk/tests/conditions_test.cpp @@ -0,0 +1,228 @@ +#include + +#include + +#include +#include + +#include +#include + +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; +using namespace std::chrono_literals; + +namespace { + +// Holds an io_context running on a worker thread; the executor produced by +// GetExecutor() is what the conditions schedule timer work on. Using a real +// running executor exercises the full async::Delay path including timer +// cancellation and thread-handoff of the resolution callback. 
+class RunningIoContext { + public: + RunningIoContext() + : work_guard_(boost::asio::make_work_guard(ioc_)), + thread_([this] { ioc_.run(); }) {} + + ~RunningIoContext() { + work_guard_.reset(); + ioc_.stop(); + thread_.join(); + } + + boost::asio::any_io_executor GetExecutor() { return ioc_.get_executor(); } + + private: + boost::asio::io_context ioc_; + boost::asio::executor_work_guard + work_guard_; + std::thread thread_; +}; + +} // namespace + +// ============================================================================ +// FallbackCondition +// ============================================================================ + +TEST(FallbackConditionTest, InterruptedArmsTimerWhichFiresAfterTimeout) { + RunningIoContext ioc; + FallbackCondition condition(ioc.GetExecutor(), 100ms); + auto future = condition.Execute(); + + condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", + std::chrono::system_clock::now()}, + false, + }}); + + auto result = future.WaitForResult(1s); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kFallback, *result); +} + +TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) { + RunningIoContext ioc; + FallbackCondition condition(ioc.GetExecutor(), 100ms); + auto future = condition.Execute(); + + // Arm the timer with Interrupted, then cancel via ChangeSet before it + // fires. + condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", + std::chrono::system_clock::now()}, + false, + }}); + condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + launchdarkly::data_model::ChangeSet{ + launchdarkly::data_model::ChangeSetType::kFull, + {}, + launchdarkly::data_model::Selector{}, + }, + false, + }}); + + // Wait well past the 100ms threshold; future should remain unresolved. 
+ std::this_thread::sleep_for(300ms); + EXPECT_FALSE(future.IsFinished()); +} + +TEST(FallbackConditionTest, CloseCancelsActiveTimer) { + RunningIoContext ioc; + FallbackCondition condition(ioc.GetExecutor(), 100ms); + auto future = condition.Execute(); + + condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", + std::chrono::system_clock::now()}, + false, + }}); + condition.Close(); + + std::this_thread::sleep_for(200ms); + EXPECT_FALSE(future.IsFinished()); +} + +// ============================================================================ +// RecoveryCondition +// ============================================================================ + +TEST(RecoveryConditionTest, TimerArmedAtConstructionFiresAfterTimeout) { + RunningIoContext ioc; + RecoveryCondition condition(ioc.GetExecutor(), 100ms); + + auto result = condition.Execute().WaitForResult(1s); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result); +} + +TEST(RecoveryConditionTest, InformDoesNotAffectTimer) { + RunningIoContext ioc; + RecoveryCondition condition(ioc.GetExecutor(), 100ms); + auto future = condition.Execute(); + + // Recovery is purely time-based; results from the synchronizer should not + // disturb the timer in either direction. 
+ condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", + std::chrono::system_clock::now()}, + false, + }}); + condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + launchdarkly::data_model::ChangeSet{ + launchdarkly::data_model::ChangeSetType::kFull, + {}, + launchdarkly::data_model::Selector{}, + }, + false, + }}); + + auto result = future.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result); +} + +TEST(RecoveryConditionTest, CloseCancelsActiveTimer) { + RunningIoContext ioc; + RecoveryCondition condition(ioc.GetExecutor(), 100ms); + auto future = condition.Execute(); + + condition.Close(); + + std::this_thread::sleep_for(200ms); + EXPECT_FALSE(future.IsFinished()); +} + +// ============================================================================ +// Conditions +// ============================================================================ + +TEST(ConditionsTest, EmptyAggregateNeverResolves) { + Conditions conditions({}); + + auto result = conditions.GetFuture().WaitForResult(50ms); + + EXPECT_FALSE(result.has_value()); +} + +TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) { + RunningIoContext ioc; + + // Recovery's timer is much shorter, so it should win the race. + std::vector> conds; + conds.push_back(std::make_unique(ioc.GetExecutor(), 1s)); + conds.push_back( + std::make_unique(ioc.GetExecutor(), 100ms)); + Conditions conditions(std::move(conds)); + + auto result = conditions.GetFuture().WaitForResult(1s); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result); +} + +TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) { + RunningIoContext ioc; + + // Fallback's timer is shorter than recovery's; informing Interrupted arms + // the fallback timer, which will then beat recovery. 
+ std::vector> conds; + conds.push_back( + std::make_unique(ioc.GetExecutor(), 100ms)); + conds.push_back(std::make_unique(ioc.GetExecutor(), 1s)); + Conditions conditions(std::move(conds)); + + conditions.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", + std::chrono::system_clock::now()}, + false, + }}); + + auto result = conditions.GetFuture().WaitForResult(1s); + + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kFallback, *result); +} + +TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) { + RunningIoContext ioc; + + std::vector> conds; + conds.push_back( + std::make_unique(ioc.GetExecutor(), 100ms)); + conds.push_back( + std::make_unique(ioc.GetExecutor(), 100ms)); + Conditions conditions(std::move(conds)); + + conditions.Close(); + + auto result = conditions.GetFuture().WaitForResult(200ms); + EXPECT_FALSE(result.has_value()); +} diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index 20db235f1..c113390c6 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -64,27 +64,31 @@ class MockInitializer : public IFDv2Initializer { }; // Synchronizer that resolves successive Next() calls from a queue of results. -// Once the queue is exhausted, returns Shutdown to terminate orchestration. +// Once the queue is exhausted, returns Shutdown to terminate orchestration — +// unless `stall_after_results` is set, in which case the next Future never +// resolves (useful for tests where a condition's timer must win the race). 
class MockSynchronizer : public IFDv2Synchronizer { public: - using NextCall = std::pair; - MockSynchronizer(std::vector results, bool* closed_flag = nullptr, - std::vector* next_calls = nullptr) + std::vector* next_calls = nullptr, + bool stall_after_results = false) : results_(std::move(results)), closed_flag_(closed_flag), - next_calls_(next_calls) {} + next_calls_(next_calls), + stall_after_results_(stall_after_results) {} async::Future Next( - std::chrono::milliseconds timeout, data_model::Selector selector) override { if (next_calls_) { - next_calls_->push_back({timeout, selector}); + next_calls_->push_back(selector); } if (call_index_ < results_.size()) { return async::MakeFuture(std::move(results_[call_index_++])); } + if (stall_after_results_) { + return stall_promise_.GetFuture(); + } return async::MakeFuture( FDv2SourceResult{FDv2SourceResult::Shutdown{}}); } @@ -104,7 +108,9 @@ class MockSynchronizer : public IFDv2Synchronizer { std::vector results_; std::size_t call_index_ = 0; bool* closed_flag_; - std::vector* next_calls_; + std::vector* next_calls_; + bool stall_after_results_; + async::Promise stall_promise_; }; // One-shot factory: returns a pre-supplied source on its first Build() call. @@ -142,7 +148,8 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory { // destruction with orchestration in flight. 
class StalledInitializer : public IFDv2Initializer { public: - explicit StalledInitializer(bool* closed_flag) : closed_flag_(closed_flag) {} + explicit StalledInitializer(bool* closed_flag) + : closed_flag_(closed_flag) {} async::Future Run() override { return promise_.GetFuture(); @@ -171,9 +178,7 @@ class StalledSynchronizer : public IFDv2Synchronizer { explicit StalledSynchronizer(bool* closed_flag) : closed_flag_(closed_flag) {} - async::Future Next( - std::chrono::milliseconds, - data_model::Selector) override { + async::Future Next(data_model::Selector) override { return promise_.GetFuture(); } @@ -221,7 +226,8 @@ TEST(FDv2DataSystemTest, OfflineMode_NoFactories_StatusValid) { boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; - FDv2DataSystem ds({}, {}, ioc.get_executor(), &status_manager, logger); + FDv2DataSystem ds({}, {}, nullptr, nullptr, ioc.get_executor(), + &status_manager, logger); // Initialize with no sources; orchestration should not be posted. ds.Initialize(); @@ -238,7 +244,8 @@ TEST(FDv2DataSystemTest, Destructor_TransitionsStatusToOff) { data_components::DataSourceStatusManager status_manager; { - FDv2DataSystem ds({}, {}, ioc.get_executor(), &status_manager, logger); + FDv2DataSystem ds({}, {}, nullptr, nullptr, ioc.get_executor(), + &status_manager, logger); ds.Initialize(); ASSERT_EQ(status_manager.Status().State(), DataSourceStatus::DataSourceState::kValid); @@ -273,8 +280,8 @@ TEST(FDv2DataSystemTest, InitializerWithBasis_AppliesAndStatusValid) { initializers.push_back( std::make_unique(std::move(initializer))); - FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); // Run the initializer to completion. 
ds.Initialize(); @@ -322,8 +329,8 @@ TEST(FDv2DataSystemTest, InitializerInterrupted_AdvancesToNextInitializer) { initializers.push_back(std::move(first_factory)); initializers.push_back(std::move(second_factory)); - FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); // Run; first initializer fails, orchestrator should fall through to // the second. @@ -385,8 +392,8 @@ TEST(FDv2DataSystemTest, initializers.push_back(std::move(first_factory)); initializers.push_back(std::move(second_factory)); - FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ioc.run(); @@ -423,8 +430,8 @@ TEST(FDv2DataSystemTest, initializers.push_back(std::move(first_factory)); initializers.push_back(std::move(second_factory)); - FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ioc.run(); @@ -452,8 +459,8 @@ TEST(FDv2DataSystemTest, InitializerOnly_AllFail_TransitionsToOff) { initializers.push_back( std::make_unique(std::move(init))); - FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); // Run: initializer fails and there are no synchronizers to fall through to. 
ds.Initialize(); @@ -490,8 +497,8 @@ TEST(FDv2DataSystemTest, SynchronizerChangeSet_AppliesAndStatusValid) { synchronizers.push_back( std::make_unique(std::move(sync))); - FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); // No initializers; orchestrator should hand directly to the synchronizer. ds.Initialize(); @@ -531,8 +538,8 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_StaysOnSameSynchronizer) { synchronizers.push_back(std::move(first_factory)); synchronizers.push_back(std::move(second_factory)); - FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ioc.run(); @@ -568,8 +575,8 @@ TEST(FDv2DataSystemTest, SynchronizerInterrupted_RetriesSameSynchronizer) { auto* factory_ptr = factory.get(); synchronizers.push_back(std::move(factory)); - FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ioc.run(); @@ -596,7 +603,7 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { // Synchronizer first returns a partial changeset with a NEW selector, // then exhausts (Shutdown) on the next call. 
- std::vector next_calls; + std::vector next_calls; std::vector results; results.push_back(FDv2SourceResult{FDv2SourceResult::ChangeSet{ data_model::ChangeSet{ @@ -614,7 +621,8 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { std::make_unique(std::move(sync))); FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), - ioc.get_executor(), &status_manager, logger); + nullptr, nullptr, ioc.get_executor(), &status_manager, + logger); ds.Initialize(); ioc.run(); @@ -622,12 +630,12 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { // Two Next calls: first with the initializer's selector, second with the // selector updated by the partial changeset. ASSERT_EQ(2u, next_calls.size()); - ASSERT_TRUE(next_calls[0].second.value.has_value()); - EXPECT_EQ(1, next_calls[0].second.value->version); - EXPECT_EQ("state-1", next_calls[0].second.value->state); - ASSERT_TRUE(next_calls[1].second.value.has_value()); - EXPECT_EQ(2, next_calls[1].second.value->version); - EXPECT_EQ("state-2", next_calls[1].second.value->state); + ASSERT_TRUE(next_calls[0].value.has_value()); + EXPECT_EQ(1, next_calls[0].value->version); + EXPECT_EQ("state-1", next_calls[0].value->state); + ASSERT_TRUE(next_calls[1].value.has_value()); + EXPECT_EQ(2, next_calls[1].value->version); + EXPECT_EQ("state-2", next_calls[1].value->state); } TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { @@ -651,7 +659,7 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { // preservation, the SDK would reconnect with stale or empty payload // state on every Goodbye, forcing the server into expensive xfer-full // responses instead of efficient xfer-changes. 
- std::vector next_calls; + std::vector next_calls; std::vector results; results.push_back(FDv2SourceResult{FDv2SourceResult::ChangeSet{ data_model::ChangeSet{ @@ -671,7 +679,8 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { std::make_unique(std::move(sync))); FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), - ioc.get_executor(), &status_manager, logger); + nullptr, nullptr, ioc.get_executor(), &status_manager, + logger); ds.Initialize(); ioc.run(); @@ -681,17 +690,17 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { // regress the selector. ASSERT_EQ(3u, next_calls.size()); - ASSERT_TRUE(next_calls[0].second.value.has_value()); - EXPECT_EQ(1, next_calls[0].second.value->version); - EXPECT_EQ("state-1", next_calls[0].second.value->state); + ASSERT_TRUE(next_calls[0].value.has_value()); + EXPECT_EQ(1, next_calls[0].value->version); + EXPECT_EQ("state-1", next_calls[0].value->state); - ASSERT_TRUE(next_calls[1].second.value.has_value()); - EXPECT_EQ(2, next_calls[1].second.value->version); - EXPECT_EQ("state-2", next_calls[1].second.value->state); + ASSERT_TRUE(next_calls[1].value.has_value()); + EXPECT_EQ(2, next_calls[1].value->version); + EXPECT_EQ("state-2", next_calls[1].value->state); - ASSERT_TRUE(next_calls[2].second.value.has_value()); - EXPECT_EQ(2, next_calls[2].second.value->version); - EXPECT_EQ("state-2", next_calls[2].second.value->state); + ASSERT_TRUE(next_calls[2].value.has_value()); + EXPECT_EQ(2, next_calls[2].value->version); + EXPECT_EQ("state-2", next_calls[2].value->state); } TEST(FDv2DataSystemTest, @@ -725,8 +734,8 @@ TEST(FDv2DataSystemTest, synchronizers.push_back(std::move(first_factory)); synchronizers.push_back(std::move(second_factory)); - FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); 
ds.Initialize(); ioc.run(); @@ -762,8 +771,8 @@ TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) { synchronizers.push_back( std::make_unique(std::move(sync))); - FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); // Synchronizer fails terminally; no more factories to try. ds.Initialize(); @@ -774,6 +783,55 @@ TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) { DataSourceStatus::DataSourceState::kOff); } +// ============================================================================ +// Fallback and recovery +// ============================================================================ + +TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Primary returns Interrupted once and then stalls, leaving the fallback + // condition's timer free to win the race. + auto primary_sync = std::make_unique( + std::vector{ + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, + "boom", std::chrono::system_clock::now()}, + false, + }}, + }, + nullptr, nullptr, /*stall_after_results=*/true); + auto primary_factory = + std::make_unique(std::move(primary_sync)); + + // Secondary returns Shutdown on first call, ending orchestration cleanly. 
+ auto secondary_sync = + std::make_unique(std::vector{}); + auto secondary_factory = + std::make_unique(std::move(secondary_sync)); + auto* secondary_factory_ptr = secondary_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(primary_factory)); + synchronizers.push_back(std::move(secondary_factory)); + + auto fallback_factory = + std::make_unique(ioc.get_executor(), 50ms); + + FDv2DataSystem ds({}, std::move(synchronizers), std::move(fallback_factory), + nullptr, ioc.get_executor(), &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Fallback condition fired after Interrupted, so the orchestrator + // advanced to the secondary. + EXPECT_EQ(1, secondary_factory_ptr->build_count_); +} + // ============================================================================ // Destruction protocol: in-flight orchestration // ============================================================================ @@ -800,8 +858,8 @@ TEST(FDv2DataSystemTest, std::make_unique(std::move(initializer))); { - FDv2DataSystem ds(std::move(initializers), {}, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); // RunNextInitializer runs, builds the source, calls Run().Then(...). 
// Run() returns an unresolved Future; the orchestrator's continuation @@ -826,12 +884,12 @@ TEST(FDv2DataSystemTest, auto synchronizer = std::make_unique(&synchronizer_closed); std::vector> synchronizers; - synchronizers.push_back(std::make_unique( - std::move(synchronizer))); + synchronizers.push_back( + std::make_unique(std::move(synchronizer))); { - FDv2DataSystem ds({}, std::move(synchronizers), ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); // No initializers -> RunNextInitializer immediately exhausts -> // StartSynchronizers builds the synchronizer and calls Next(). diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp index 9ae12f0df..5a0f166f5 100644 --- a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -149,7 +149,7 @@ TEST(FDv2StreamingSynchronizerTest, NextBadEndpointUrlReturnsTerminalError) { // Act: trigger setup with a malformed streaming URL. URL parsing happens // inside EnsureStarted on the first Next call. - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: TerminalError tells the orchestrator not to retry, which is the @@ -173,7 +173,7 @@ TEST(FDv2StreamingSynchronizerTest, CloseBeforeNextReturnsShutdown) { synchronizer.Close(); // Act: call Next on an already-closed synchronizer. 
- auto future = synchronizer.Next(5s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: Shutdown is delivered immediately (the outer Next short-circuits @@ -197,7 +197,7 @@ TEST(FDv2StreamingSynchronizerTest, CloseDuringPendingNextResolvesShutdown) { FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); // Act: start a pending Next, then Close while it is still pending. - auto future = synchronizer.Next(5s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); synchronizer.Close(); auto result = future.WaitForResult(2s); @@ -208,27 +208,6 @@ TEST(FDv2StreamingSynchronizerTest, CloseDuringPendingNextResolvesShutdown) { std::holds_alternative(result->value)); } -TEST(FDv2StreamingSynchronizerTest, NextTimeoutReturnsTimeout) { - auto logger = MakeNullLogger(); - IoContextRunner runner; - - FDv2StreamingSynchronizer synchronizer( - runner.context().get_executor(), logger, - MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, - 1s); - - FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); - - // Act: call Next with a short timeout and never deliver any event. - auto future = synchronizer.Next(100ms, data_model::Selector{}); - auto result = future.WaitForResult(2s); - - // Assert: the timeout future wins the race and Next resolves with Timeout. 
- ASSERT_TRUE(result.has_value()); - EXPECT_TRUE( - std::holds_alternative(result->value)); -} - // ============================================================================ // on_connect hook — URL/target construction // ============================================================================ @@ -407,7 +386,7 @@ TEST(FDv2StreamingSynchronizerTest, FullChangesetEventsReturnsChangeSet) { FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, put_object); FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, payload_transferred); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: a ChangeSet result is delivered, and the translated payload @@ -432,7 +411,7 @@ TEST(FDv2StreamingSynchronizerTest, GoodbyeEventReturnsGoodbye) { // Act: deliver a goodbye event through OnEvent and read the queued result. FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: a Goodbye result with the wire reason is delivered, signaling @@ -461,7 +440,7 @@ TEST(FDv2StreamingSynchronizerTest, GoodbyeEventTriggersAsyncRestart) { // Act: deliver a goodbye event and drain the resulting Goodbye result. 
FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); ASSERT_TRUE(result.has_value()); @@ -493,8 +472,7 @@ TEST(FDv2StreamingSynchronizerTest, R"({"version":1,"kind":"flag","key":"abandoned","object":)" R"({"key":"abandoned","on":true,"fallthrough":{"variation":0},)" R"("variations":[true,false],"version":1}})"); - FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, - server_intent_one); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_intent_one); FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, abandoned_put); // Goodbye arrives mid-payload; expect a Goodbye result and the partial @@ -502,7 +480,7 @@ TEST(FDv2StreamingSynchronizerTest, sse::Event goodbye("goodbye", R"({"reason":"bye"})"); FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, goodbye); auto goodbye_result = - synchronizer.Next(2s, data_model::Selector{}).WaitForResult(2s); + synchronizer.Next(data_model::Selector{}).WaitForResult(2s); ASSERT_TRUE(goodbye_result.has_value()); ASSERT_NE(std::get_if(&goodbye_result->value), nullptr); @@ -520,13 +498,12 @@ TEST(FDv2StreamingSynchronizerTest, R"("variations":[true,false],"version":2}})"); sse::Event payload_transferred("payload-transferred", R"({"state":"abc","version":2})"); - FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, - server_intent_two); + FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_intent_two); FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, fresh_put); FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, payload_transferred); auto changeset_result = - synchronizer.Next(2s, data_model::Selector{}).WaitForResult(2s); + synchronizer.Next(data_model::Selector{}).WaitForResult(2s); ASSERT_TRUE(changeset_result.has_value()); auto* cs = std::get_if(&changeset_result->value); @@ -551,7 +528,7 @@ 
TEST(FDv2StreamingSynchronizerTest, ServerErrorEventReturnsInterrupted) { // Act: deliver an FDv2 server-side error event. FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, server_error); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: the error is reported as Interrupted{kErrorResponse}, with the @@ -583,7 +560,7 @@ TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { // Act: deliver an event whose data field cannot be parsed as JSON. FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, bad_event); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: the synchronizer reports Interrupted{kInvalidData} so the @@ -596,30 +573,6 @@ TEST(FDv2StreamingSynchronizerTest, MalformedJsonEventReturnsInterrupted) { FDv2SourceResult::ErrorInfo::ErrorKind::kInvalidData); } -TEST(FDv2StreamingSynchronizerTest, HeartbeatEventNoResultDelivered) { - auto logger = MakeNullLogger(); - IoContextRunner runner; - - FDv2StreamingSynchronizer synchronizer( - runner.context().get_executor(), logger, - MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, - 1s); - FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); - - sse::Event heartbeat("heartbeat", R"({})"); - - // Act: deliver a heartbeat event, then call Next with a short timeout. - FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, heartbeat); - auto future = synchronizer.Next(100ms, data_model::Selector{}); - auto result = future.WaitForResult(2s); - - // Assert: the heartbeat does not produce any FDv2SourceResult, so Next - // resolves with Timeout instead. 
- ASSERT_TRUE(result.has_value()); - EXPECT_TRUE( - std::holds_alternative(result->value)); -} - TEST(FDv2StreamingSynchronizerTest, TranslationFailureReturnsInterrupted) { auto logger = MakeNullLogger(); IoContextRunner runner; @@ -649,7 +602,7 @@ TEST(FDv2StreamingSynchronizerTest, TranslationFailureReturnsInterrupted) { FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, put_object); FDv2StreamingSynchronizerTestPeer::OnEvent(synchronizer, payload_transferred); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: the changeset is rejected with Interrupted{kInvalidData}. @@ -681,7 +634,7 @@ TEST(FDv2StreamingSynchronizerTest, // Act: deliver an unrecoverable HTTP 500 error from the SSE client. FDv2StreamingSynchronizerTestPeer::OnError(synchronizer, error); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: the synchronizer reports TerminalError{kErrorResponse} carrying @@ -709,7 +662,7 @@ TEST(FDv2StreamingSynchronizerTest, RecoverableReadTimeoutReturnsInterrupted) { // Act: deliver a recoverable read-timeout error from the SSE client. 
FDv2StreamingSynchronizerTestPeer::OnError(synchronizer, error); - auto future = synchronizer.Next(2s, data_model::Selector{}); + auto future = synchronizer.Next(data_model::Selector{}); auto result = future.WaitForResult(2s); // Assert: the synchronizer reports Interrupted{kNetworkError} so the From 0208900224a2cd20bda3c9fa6cb925484f4037fa Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Mon, 11 May 2026 14:41:56 -0700 Subject: [PATCH 2/8] fix: resolve StalledSynchronizer promise on Close to break leak cycle --- libs/server-sdk/tests/conditions_test.cpp | 50 +++++----- .../tests/fdv2_data_system_test.cpp | 91 +++++++++++++------ 2 files changed, 88 insertions(+), 53 deletions(-) diff --git a/libs/server-sdk/tests/conditions_test.cpp b/libs/server-sdk/tests/conditions_test.cpp index 722b8e759..031c17ef5 100644 --- a/libs/server-sdk/tests/conditions_test.cpp +++ b/libs/server-sdk/tests/conditions_test.cpp @@ -47,13 +47,13 @@ class RunningIoContext { TEST(FallbackConditionTest, InterruptedArmsTimerWhichFiresAfterTimeout) { RunningIoContext ioc; - FallbackCondition condition(ioc.GetExecutor(), 100ms); + FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ FDv2SourceResult::ErrorInfo{ - FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", - std::chrono::system_clock::now()}, + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", std::chrono::system_clock::now()}, false, }}); @@ -65,15 +65,15 @@ TEST(FallbackConditionTest, InterruptedArmsTimerWhichFiresAfterTimeout) { TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) { RunningIoContext ioc; - FallbackCondition condition(ioc.GetExecutor(), 100ms); + FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); // Arm the timer with Interrupted, then cancel via ChangeSet before it // fires. 
condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ FDv2SourceResult::ErrorInfo{ - FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", - std::chrono::system_clock::now()}, + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", std::chrono::system_clock::now()}, false, }}); condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{ @@ -92,13 +92,13 @@ TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) { TEST(FallbackConditionTest, CloseCancelsActiveTimer) { RunningIoContext ioc; - FallbackCondition condition(ioc.GetExecutor(), 100ms); + FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ FDv2SourceResult::ErrorInfo{ - FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", - std::chrono::system_clock::now()}, + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", std::chrono::system_clock::now()}, false, }}); condition.Close(); @@ -113,7 +113,7 @@ TEST(FallbackConditionTest, CloseCancelsActiveTimer) { TEST(RecoveryConditionTest, TimerArmedAtConstructionFiresAfterTimeout) { RunningIoContext ioc; - RecoveryCondition condition(ioc.GetExecutor(), 100ms); + RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto result = condition.Execute().WaitForResult(1s); @@ -123,15 +123,15 @@ TEST(RecoveryConditionTest, TimerArmedAtConstructionFiresAfterTimeout) { TEST(RecoveryConditionTest, InformDoesNotAffectTimer) { RunningIoContext ioc; - RecoveryCondition condition(ioc.GetExecutor(), 100ms); + RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); // Recovery is purely time-based; results from the synchronizer should not // disturb the timer in either direction. 
condition.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ FDv2SourceResult::ErrorInfo{ - FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", - std::chrono::system_clock::now()}, + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", std::chrono::system_clock::now()}, false, }}); condition.Inform(FDv2SourceResult{FDv2SourceResult::ChangeSet{ @@ -150,7 +150,7 @@ TEST(RecoveryConditionTest, InformDoesNotAffectTimer) { TEST(RecoveryConditionTest, CloseCancelsActiveTimer) { RunningIoContext ioc; - RecoveryCondition condition(ioc.GetExecutor(), 100ms); + RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); condition.Close(); @@ -176,9 +176,10 @@ TEST(ConditionsTest, AggregateResolvesWithTypeOfFirstFiringCondition) { // Recovery's timer is much shorter, so it should win the race. std::vector> conds; - conds.push_back(std::make_unique(ioc.GetExecutor(), 1s)); conds.push_back( - std::make_unique(ioc.GetExecutor(), 100ms)); + std::make_unique(ioc.GetExecutor(), /*timeout=*/1s)); + conds.push_back(std::make_unique(ioc.GetExecutor(), + /*timeout=*/100ms)); Conditions conditions(std::move(conds)); auto result = conditions.GetFuture().WaitForResult(1s); @@ -193,15 +194,16 @@ TEST(ConditionsTest, InformForwardsToAllUnderlyingConditions) { // Fallback's timer is shorter than recovery's; informing Interrupted arms // the fallback timer, which will then beat recovery. 
std::vector> conds; + conds.push_back(std::make_unique(ioc.GetExecutor(), + /*timeout=*/100ms)); conds.push_back( - std::make_unique(ioc.GetExecutor(), 100ms)); - conds.push_back(std::make_unique(ioc.GetExecutor(), 1s)); + std::make_unique(ioc.GetExecutor(), /*timeout=*/1s)); Conditions conditions(std::move(conds)); conditions.Inform(FDv2SourceResult{FDv2SourceResult::Interrupted{ FDv2SourceResult::ErrorInfo{ - FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, "boom", - std::chrono::system_clock::now()}, + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", std::chrono::system_clock::now()}, false, }}); @@ -215,10 +217,10 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) { RunningIoContext ioc; std::vector> conds; - conds.push_back( - std::make_unique(ioc.GetExecutor(), 100ms)); - conds.push_back( - std::make_unique(ioc.GetExecutor(), 100ms)); + conds.push_back(std::make_unique(ioc.GetExecutor(), + /*timeout=*/100ms)); + conds.push_back(std::make_unique(ioc.GetExecutor(), + /*timeout=*/100ms)); Conditions conditions(std::move(conds)); conditions.Close(); diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index c113390c6..2695245ab 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -183,6 +183,7 @@ class StalledSynchronizer : public IFDv2Synchronizer { } void Close() override { + promise_.Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}}); if (closed_flag_) { *closed_flag_ = true; } @@ -226,8 +227,9 @@ TEST(FDv2DataSystemTest, OfflineMode_NoFactories_StatusValid) { boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; - FDv2DataSystem ds({}, {}, nullptr, nullptr, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, {}, /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), 
&status_manager, logger); // Initialize with no sources; orchestration should not be posted. ds.Initialize(); @@ -244,8 +246,9 @@ TEST(FDv2DataSystemTest, Destructor_TransitionsStatusToOff) { data_components::DataSourceStatusManager status_manager; { - FDv2DataSystem ds({}, {}, nullptr, nullptr, ioc.get_executor(), - &status_manager, logger); + FDv2DataSystem ds({}, {}, /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ASSERT_EQ(status_manager.Status().State(), DataSourceStatus::DataSourceState::kValid); @@ -280,7 +283,9 @@ TEST(FDv2DataSystemTest, InitializerWithBasis_AppliesAndStatusValid) { initializers.push_back( std::make_unique(std::move(initializer))); - FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + FDv2DataSystem ds(std::move(initializers), {}, + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); // Run the initializer to completion. 
@@ -329,7 +334,9 @@ TEST(FDv2DataSystemTest, InitializerInterrupted_AdvancesToNextInitializer) { initializers.push_back(std::move(first_factory)); initializers.push_back(std::move(second_factory)); - FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + FDv2DataSystem ds(std::move(initializers), {}, + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); // Run; first initializer fails, orchestrator should fall through to @@ -392,7 +399,9 @@ TEST(FDv2DataSystemTest, initializers.push_back(std::move(first_factory)); initializers.push_back(std::move(second_factory)); - FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + FDv2DataSystem ds(std::move(initializers), {}, + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); @@ -430,7 +439,9 @@ TEST(FDv2DataSystemTest, initializers.push_back(std::move(first_factory)); initializers.push_back(std::move(second_factory)); - FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + FDv2DataSystem ds(std::move(initializers), {}, + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); @@ -459,7 +470,9 @@ TEST(FDv2DataSystemTest, InitializerOnly_AllFail_TransitionsToOff) { initializers.push_back( std::make_unique(std::move(init))); - FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + FDv2DataSystem ds(std::move(initializers), {}, + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); // Run: initializer fails and there are no synchronizers to fall through to. 
@@ -497,7 +510,9 @@ TEST(FDv2DataSystemTest, SynchronizerChangeSet_AppliesAndStatusValid) { synchronizers.push_back( std::make_unique(std::move(sync))); - FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); // No initializers; orchestrator should hand directly to the synchronizer. @@ -538,7 +553,9 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_StaysOnSameSynchronizer) { synchronizers.push_back(std::move(first_factory)); synchronizers.push_back(std::move(second_factory)); - FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); @@ -575,7 +592,9 @@ TEST(FDv2DataSystemTest, SynchronizerInterrupted_RetriesSameSynchronizer) { auto* factory_ptr = factory.get(); synchronizers.push_back(std::move(factory)); - FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); @@ -613,16 +632,17 @@ TEST(FDv2DataSystemTest, SynchronizerNext_ReceivesUpdatedSelector) { }, false, }}); - auto sync = std::make_unique(std::move(results), nullptr, - &next_calls); + auto sync = std::make_unique( + std::move(results), /*closed_flag=*/nullptr, &next_calls); std::vector> synchronizers; synchronizers.push_back( std::make_unique(std::move(sync))); FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), - nullptr, nullptr, ioc.get_executor(), &status_manager, - logger); + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), 
&status_manager, logger); ds.Initialize(); ioc.run(); @@ -671,16 +691,17 @@ TEST(FDv2DataSystemTest, SynchronizerGoodbye_PreservesSelectorOnNextCall) { }}); results.push_back( FDv2SourceResult{FDv2SourceResult::Goodbye{std::nullopt, false}}); - auto sync = std::make_unique(std::move(results), nullptr, - &next_calls); + auto sync = std::make_unique( + std::move(results), /*closed_flag=*/nullptr, &next_calls); std::vector> synchronizers; synchronizers.push_back( std::make_unique(std::move(sync))); FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), - nullptr, nullptr, ioc.get_executor(), &status_manager, - logger); + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ioc.run(); @@ -734,7 +755,9 @@ TEST(FDv2DataSystemTest, synchronizers.push_back(std::move(first_factory)); synchronizers.push_back(std::move(second_factory)); - FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); @@ -771,7 +794,9 @@ TEST(FDv2DataSystemTest, SynchronizerCycledExhaustion_TransitionsToOff) { synchronizers.push_back( std::make_unique(std::move(sync))); - FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); // Synchronizer fails terminally; no more factories to try. 
@@ -798,12 +823,14 @@ TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) { std::vector{ FDv2SourceResult{FDv2SourceResult::Interrupted{ FDv2SourceResult::ErrorInfo{ - FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, 0, - "boom", std::chrono::system_clock::now()}, + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", + std::chrono::system_clock::now()}, false, }}, }, - nullptr, nullptr, /*stall_after_results=*/true); + /*closed_flag=*/nullptr, /*next_calls=*/nullptr, + /*stall_after_results=*/true); auto primary_factory = std::make_unique(std::move(primary_sync)); @@ -819,10 +846,12 @@ TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) { synchronizers.push_back(std::move(secondary_factory)); auto fallback_factory = - std::make_unique(ioc.get_executor(), 50ms); + std::make_unique(ioc.get_executor(), + /*timeout=*/50ms); FDv2DataSystem ds({}, std::move(synchronizers), std::move(fallback_factory), - nullptr, ioc.get_executor(), &status_manager, logger); + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); ds.Initialize(); ioc.run(); @@ -858,7 +887,9 @@ TEST(FDv2DataSystemTest, std::make_unique(std::move(initializer))); { - FDv2DataSystem ds(std::move(initializers), {}, nullptr, nullptr, + FDv2DataSystem ds(std::move(initializers), {}, + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); // RunNextInitializer runs, builds the source, calls Run().Then(...). 
@@ -888,7 +919,9 @@ TEST(FDv2DataSystemTest, std::make_unique(std::move(synchronizer))); { - FDv2DataSystem ds({}, std::move(synchronizers), nullptr, nullptr, + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, ioc.get_executor(), &status_manager, logger); ds.Initialize(); // No initializers -> RunNextInitializer immediately exhausts -> From df918a1c8ca3db8f5443c45d5e42530076d2f84f Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Mon, 11 May 2026 15:33:54 -0700 Subject: [PATCH 3/8] fix: resolve condition promise on Close to break leak cycles --- .../data_interfaces/source/ifdv2_condition.hpp | 4 ++++ .../src/data_systems/fdv2/conditions.cpp | 1 + .../src/data_systems/fdv2/fdv2_data_system.cpp | 3 +++ libs/server-sdk/tests/conditions_test.cpp | 17 ++++++++++------- 4 files changed, 18 insertions(+), 7 deletions(-) diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp index 63a078aa3..867b65cf2 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp @@ -33,8 +33,12 @@ namespace launchdarkly::server_side::data_interfaces { class IFDv2Condition { public: enum class Type { + /** Stop the active synchronizer and start the next-preferred one. */ kFallback, + /** Return to the most-preferred synchronizer. */ kRecovery, + /** The condition was Close()d before firing; orchestrator ignores. 
*/ + kCancelled, }; /** diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp index 71640f378..ad3d158ad 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp @@ -35,6 +35,7 @@ void TimedCondition::Close() { state_->timer_cancel->Cancel(); state_->timer_cancel.reset(); } + state_->promise.Resolve(IFDv2Condition::Type::kCancelled); } void TimedCondition::ArmTimer() { diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index 496934798..5fb925799 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -260,6 +260,9 @@ void FDv2DataSystem::RunSynchronizerNext() { void FDv2DataSystem::OnConditionFired( data_interfaces::IFDv2Condition::Type type) { using Type = data_interfaces::IFDv2Condition::Type; + if (type == Type::kCancelled) { + return; + } { std::lock_guard lock(mutex_); if (closed_) { diff --git a/libs/server-sdk/tests/conditions_test.cpp b/libs/server-sdk/tests/conditions_test.cpp index 031c17ef5..d32ca8522 100644 --- a/libs/server-sdk/tests/conditions_test.cpp +++ b/libs/server-sdk/tests/conditions_test.cpp @@ -90,7 +90,7 @@ TEST(FallbackConditionTest, ChangeSetCancelsActiveTimer) { EXPECT_FALSE(future.IsFinished()); } -TEST(FallbackConditionTest, CloseCancelsActiveTimer) { +TEST(FallbackConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) { RunningIoContext ioc; FallbackCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); @@ -103,8 +103,9 @@ TEST(FallbackConditionTest, CloseCancelsActiveTimer) { }}); condition.Close(); - std::this_thread::sleep_for(200ms); - EXPECT_FALSE(future.IsFinished()); + auto result = future.WaitForResult(200ms); + ASSERT_TRUE(result.has_value()); + 
EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result); } // ============================================================================ @@ -148,15 +149,16 @@ TEST(RecoveryConditionTest, InformDoesNotAffectTimer) { EXPECT_EQ(IFDv2Condition::Type::kRecovery, *result); } -TEST(RecoveryConditionTest, CloseCancelsActiveTimer) { +TEST(RecoveryConditionTest, CloseCancelsActiveTimerAndResolvesWithCancelled) { RunningIoContext ioc; RecoveryCondition condition(ioc.GetExecutor(), /*timeout=*/100ms); auto future = condition.Execute(); condition.Close(); - std::this_thread::sleep_for(200ms); - EXPECT_FALSE(future.IsFinished()); + auto result = future.WaitForResult(200ms); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result); } // ============================================================================ @@ -226,5 +228,6 @@ TEST(ConditionsTest, CloseForwardsToAllUnderlyingConditions) { conditions.Close(); auto result = conditions.GetFuture().WaitForResult(200ms); - EXPECT_FALSE(result.has_value()); + ASSERT_TRUE(result.has_value()); + EXPECT_EQ(IFDv2Condition::Type::kCancelled, *result); } From 810f0d3e1caf9713be03ac9ccea29a6b9b641343 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 12 May 2026 13:25:53 -0700 Subject: [PATCH 4/8] feat: cyclic synchronizer rotation with block/unblock state --- libs/server-sdk/src/CMakeLists.txt | 2 + .../data_systems/fdv2/fdv2_data_system.cpp | 42 +-- .../data_systems/fdv2/fdv2_data_system.hpp | 28 +- .../src/data_systems/fdv2/source_manager.cpp | 75 ++++++ .../src/data_systems/fdv2/source_manager.hpp | 104 ++++++++ .../tests/fdv2_data_system_test.cpp | 245 ++++++++++++++++++ libs/server-sdk/tests/source_manager_test.cpp | 187 +++++++++++++ 7 files changed, 647 insertions(+), 36 deletions(-) create mode 100644 libs/server-sdk/src/data_systems/fdv2/source_manager.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/source_manager.hpp create mode 100644 libs/server-sdk/tests/source_manager_test.cpp 
diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 0b6c67b01..49630fbb3 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -62,6 +62,8 @@ target_sources(${LIBNAME} data_systems/fdv2/streaming_synchronizer.cpp data_systems/fdv2/conditions.hpp data_systems/fdv2/conditions.cpp + data_systems/fdv2/source_manager.hpp + data_systems/fdv2/source_manager.cpp data_systems/fdv2/fdv2_data_system.hpp data_systems/fdv2/fdv2_data_system.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index 5fb925799..b7b5b2fc6 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -37,7 +37,6 @@ FDv2DataSystem::FDv2DataSystem( : logger_(logger), ioc_(std::move(ioc)), initializer_factories_(std::move(initializer_factories)), - synchronizer_factories_(std::move(synchronizer_factories)), fallback_condition_factory_(std::move(fallback_condition_factory)), recovery_condition_factory_(std::move(recovery_condition_factory)), status_manager_(status_manager), @@ -47,7 +46,7 @@ FDv2DataSystem::FDv2DataSystem( closed_(false), selector_(), initializer_index_(0), - synchronizer_index_(0), + source_manager_(std::move(synchronizer_factories)), active_initializer_(nullptr), active_synchronizer_(nullptr), active_conditions_(nullptr) {} @@ -101,7 +100,8 @@ void FDv2DataSystem::Initialize() { assert(!already_called && "Initialize() must be called at most once"); LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": starting"; - if (initializer_factories_.empty() && synchronizer_factories_.empty()) { + if (initializer_factories_.empty() && + source_manager_.SynchronizerCount() == 0) { // Offline mode: empty store is the canonical state. 
status_manager_->SetState(DataSourceStatus::DataSourceState::kValid); return; @@ -196,30 +196,27 @@ void FDv2DataSystem::OnInitializerResult( void FDv2DataSystem::StartSynchronizers() { bool exhausted = false; - bool cycled_synchronizers = false; + // True if at least one synchronizer factory was configured at construction. + bool any_synchronizers_configured = false; { std::lock_guard lock(mutex_); if (closed_) { return; } - if (synchronizer_index_ >= synchronizer_factories_.size()) { - exhausted = true; - cycled_synchronizers = synchronizer_index_ > 0; + active_synchronizer_ = source_manager_.NextSynchronizer(); + if (active_synchronizer_) { + active_conditions_ = BuildActiveConditions(); } else { - auto& factory = synchronizer_factories_[synchronizer_index_]; - active_synchronizer_ = factory->Build(); - active_conditions_ = - BuildConditionsForSynchronizer(synchronizer_index_); - ++synchronizer_index_; + exhausted = true; + any_synchronizers_configured = + source_manager_.SynchronizerCount() > 0; } } if (exhausted) { - // kOff when we can't continue updating; init-only with data stays - // kValid. - if (cycled_synchronizers || !store_.Initialized()) { + if (any_synchronizers_configured || !store_.Initialized()) { std::string const message = - cycled_synchronizers + any_synchronizers_configured ? 
"all data source acquisition methods have been exhausted" : "all initializers exhausted and no synchronizers " "configured"; @@ -274,7 +271,7 @@ void FDv2DataSystem::OnConditionFired( if (type == Type::kRecovery) { LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": recovery condition met"; - synchronizer_index_ = 0; + source_manager_.ResetSourceIndex(); } else { LD_LOG(logger_, LogLevel::kInfo) << Identity() << ": fallback condition met"; @@ -283,16 +280,18 @@ void FDv2DataSystem::OnConditionFired( StartSynchronizers(); } -std::unique_ptr FDv2DataSystem::BuildConditionsForSynchronizer( - std::size_t synchronizer_position) const { +std::unique_ptr FDv2DataSystem::BuildActiveConditions() const { std::vector> conditions; - if (synchronizer_factories_.size() <= 1) { + // With only one synchronizer available there's nothing to fall back to + // or recover from, so leave the conditions empty. + if (source_manager_.AvailableSynchronizerCount() == 1) { return std::make_unique(std::move(conditions)); } if (fallback_condition_factory_) { conditions.push_back(fallback_condition_factory_->Build()); } - if (synchronizer_position > 0 && recovery_condition_factory_) { + // The prime synchronizer has nothing more-preferred to recover to. 
+ if (!source_manager_.IsPrimeSynchronizer() && recovery_condition_factory_) { conditions.push_back(recovery_condition_factory_->Build()); } return std::make_unique(std::move(conditions)); @@ -356,6 +355,7 @@ void FDv2DataSystem::OnSynchronizerResult( return; } if (advance) { + source_manager_.BlockCurrentSynchronizer(); active_synchronizer_.reset(); active_conditions_.reset(); } diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp index fb1ed46e3..02b059077 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -8,6 +8,7 @@ #include "../../data_interfaces/source/ifdv2_synchronizer_factory.hpp" #include "../../data_interfaces/system/idata_system.hpp" #include "conditions.hpp" +#include "source_manager.hpp" #include #include @@ -104,16 +105,18 @@ namespace launchdarkly::server_side::data_systems { * | * | (N exhausted, or basis received) * v - * +-------------------+ synchronizer #M's Next returns: + * +-------------------+ active synchronizer's Next returns: * | Synchronizer | ChangeSet -> apply, loop * | phase | Interrupted -> loop (source self-retries) - * | M = 0, 1, 2, ... 
| Timeout -> loop - * | | Goodbye -> loop (source self-restarts) - * | | TerminalError -> M += 1 - * | | Shutdown -> [Closed] + * | (cyclic; | Goodbye -> loop (source self-restarts) + * | blocked sources | TerminalError -> block, advance + * | are skipped) | Shutdown -> [Closed] * +-------------------+ + * ^ | fallback condition -> advance (with wrap) + * | | recovery condition -> reset to first available + * +---+ * | - * | (M exhausted) + * | (all synchronizers blocked) * v * [Done; final status preserved] * @@ -248,11 +251,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { void OnSynchronizerResult(data_interfaces::FDv2SourceResult result); void OnConditionFired(data_interfaces::IFDv2Condition::Type type); - // Builds the conditions to apply to a synchronizer at the given chain - // position. Reads only const-after-construction state, so no - // synchronization is required. - std::unique_ptr BuildConditionsForSynchronizer( - std::size_t synchronizer_position) const; + // Builds the conditions to apply to the currently active synchronizer. + // Must be called with mutex_ held; reads source_manager_ state. + std::unique_ptr BuildActiveConditions() const; // Applies a typed FDv2 changeset to the in-memory store and updates the // tracked selector if the changeset's selector is non-empty. 
@@ -266,9 +267,6 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { boost::asio::any_io_executor const ioc_; std::vector> const initializer_factories_; - std::vector< - std::unique_ptr> const - synchronizer_factories_; std::unique_ptr const fallback_condition_factory_; std::unique_ptr const @@ -290,7 +288,7 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { bool closed_; data_model::Selector selector_; std::size_t initializer_index_; - std::size_t synchronizer_index_; + SourceManager source_manager_; std::unique_ptr active_initializer_; std::unique_ptr active_synchronizer_; std::unique_ptr active_conditions_; diff --git a/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp b/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp new file mode 100644 index 000000000..362f13efd --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp @@ -0,0 +1,75 @@ +#include "source_manager.hpp" + +#include + +namespace launchdarkly::server_side::data_systems { + +using data_interfaces::IFDv2Synchronizer; +using data_interfaces::IFDv2SynchronizerFactory; + +SourceManager::SourceManager( + std::vector> factories) { + synchronizers_.reserve(factories.size()); + for (auto& factory : factories) { + synchronizers_.push_back( + SynchronizerFactoryWithState{std::move(factory), State::kAvailable, + /*is_fdv1_fallback=*/false}); + } +} + +std::unique_ptr SourceManager::NextSynchronizer() { + if (synchronizers_.empty()) { + current_factory_index_ = -1; + return nullptr; + } + for (std::size_t visited = 0; visited < synchronizers_.size(); ++visited) { + synchronizer_index_ = + (synchronizer_index_ + 1) % static_cast(synchronizers_.size()); + if (synchronizers_[synchronizer_index_].state == State::kAvailable) { + current_factory_index_ = synchronizer_index_; + return synchronizers_[synchronizer_index_].factory->Build(); + } + } + current_factory_index_ = -1; + return nullptr; +} + +void SourceManager::BlockCurrentSynchronizer() { 
+ if (current_factory_index_ >= 0) { + synchronizers_[current_factory_index_].state = State::kBlocked; + } +} + +void SourceManager::ResetSourceIndex() { + synchronizer_index_ = -1; +} + +bool SourceManager::IsPrimeSynchronizer() const { + for (std::size_t i = 0; i < synchronizers_.size(); ++i) { + if (synchronizers_[i].state == State::kAvailable) { + return synchronizer_index_ == static_cast(i); + } + } + return false; +} + +std::size_t SourceManager::AvailableSynchronizerCount() const { + std::size_t count = 0; + for (auto const& s : synchronizers_) { + if (s.state == State::kAvailable) { + ++count; + } + } + return count; +} + +std::size_t SourceManager::SynchronizerCount() const { + return synchronizers_.size(); +} + +bool SourceManager::IsCurrentSynchronizerFDv1Fallback() const { + return current_factory_index_ >= 0 && + synchronizers_[current_factory_index_].is_fdv1_fallback; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp b/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp new file mode 100644 index 000000000..1f310bfed --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp @@ -0,0 +1,104 @@ +#pragma once + +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" +#include "../../data_interfaces/source/ifdv2_synchronizer_factory.hpp" + +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * Manages a list of synchronizer factories together with per-factory state + * (Available / Blocked) and the index of the currently active factory. + * + * Iteration is cyclic: NextSynchronizer advances past the end and wraps to + * the beginning, skipping any factory in the Blocked state. A factory enters + * the Blocked state when BlockCurrentSynchronizer is called (typically on a + * terminal error). Once blocked, a factory is not eligible to be built again + * until unblocked. 
+ * + * ResetSourceIndex causes the next call to start iteration at index 0 — used + * by recovery, which wants to fall back to the most-preferred Available + * synchronizer. + * + * Each factory also carries an is_fdv1_fallback flag, currently always + * false. TODO: populate when the FDv1 fallback directive is implemented. + * + * Not thread-safe. The caller is responsible for serializing all calls. + */ +class SourceManager { + public: + explicit SourceManager( + std::vector> + factories); + + /** + * Advances to the next Available synchronizer factory (wrapping past the + * end), builds a synchronizer instance from it, and records that factory + * as the current one for subsequent queries. Returns nullptr if no + * Available factory exists. + */ + std::unique_ptr NextSynchronizer(); + + /** + * Marks the currently tracked factory as Blocked. No-op if no factory is + * currently tracked. + */ + void BlockCurrentSynchronizer(); + + /** + * Resets the iteration cursor so that the next call to NextSynchronizer + * begins searching from index 0. + */ + void ResetSourceIndex(); + + /** + * Returns true if the currently tracked factory is the first Available + * factory in the list. Returns false if no factory is currently tracked. + */ + [[nodiscard]] bool IsPrimeSynchronizer() const; + + /** + * Returns the count of factories not in the Blocked state. + */ + [[nodiscard]] std::size_t AvailableSynchronizerCount() const; + + /** + * Returns the total number of factories configured at construction + * (including any currently in the Blocked state). Constant for the + * lifetime of the SourceManager. + */ + [[nodiscard]] std::size_t SynchronizerCount() const; + + /** + * Returns true if the currently tracked factory was configured as the + * FDv1 fallback synchronizer. Always false until the FDv1 fallback + * directive is implemented. 
+ */ + [[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const; + + SourceManager(SourceManager const&) = delete; + SourceManager(SourceManager&&) = delete; + SourceManager& operator=(SourceManager const&) = delete; + SourceManager& operator=(SourceManager&&) = delete; + ~SourceManager() = default; + + private: + enum class State { kAvailable, kBlocked }; + + struct SynchronizerFactoryWithState { + std::unique_ptr factory; + State state = State::kAvailable; + bool is_fdv1_fallback = false; + }; + + std::vector synchronizers_; + // Iteration cursor; -1 means "start at 0 on next call". + int synchronizer_index_ = -1; + // Index of the most recently returned factory; -1 if none. + int current_factory_index_ = -1; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index 2695245ab..df885993c 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -144,6 +144,29 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory { std::unique_ptr source_; }; +// Returns each pre-supplied source in order on successive Build() calls. +// Returns nullptr once the supply is exhausted. Used in tests that exercise +// wrap-around or recovery, where the same factory is built more than once. +class MultiShotSynchronizerFactory : public IFDv2SynchronizerFactory { + public: + explicit MultiShotSynchronizerFactory( + std::vector> sources) + : sources_(std::move(sources)) {} + + std::unique_ptr Build() override { + ++build_count_; + if (build_count_ <= static_cast(sources_.size())) { + return std::move(sources_[build_count_ - 1]); + } + return nullptr; + } + + int build_count_ = 0; + + private: + std::vector> sources_; +}; + // Initializer whose Run() returns an unresolved Future. Used to exercise // destruction with orchestration in flight. 
class StalledInitializer : public IFDv2Initializer { @@ -861,6 +884,228 @@ TEST(FDv2DataSystemTest, FallbackConditionFires_AdvancesToNextSynchronizer) { EXPECT_EQ(1, secondary_factory_ptr->build_count_); } +TEST(FDv2DataSystemTest, FallbackConditionOnLastSynchronizerWrapsToPrimary) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Two synchronizers, both Interrupted-then-stall on the first build. + // The primary's second build returns Shutdown, which ends orchestration + // cleanly once the wrap brings us back to the primary. + std::vector> primary_sources; + primary_sources.push_back(std::make_unique( + std::vector{ + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", + std::chrono::system_clock::now()}, + false, + }}, + }, + /*closed_flag=*/nullptr, /*next_calls=*/nullptr, + /*stall_after_results=*/true)); + primary_sources.push_back( + std::make_unique(std::vector{})); + auto primary_factory = std::make_unique( + std::move(primary_sources)); + auto* primary_factory_ptr = primary_factory.get(); + + auto secondary_sync = std::make_unique( + std::vector{ + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", + std::chrono::system_clock::now()}, + false, + }}, + }, + /*closed_flag=*/nullptr, /*next_calls=*/nullptr, + /*stall_after_results=*/true); + auto secondary_factory = + std::make_unique(std::move(secondary_sync)); + auto* secondary_factory_ptr = secondary_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(primary_factory)); + synchronizers.push_back(std::move(secondary_factory)); + + auto fallback_factory = + std::make_unique(ioc.get_executor(), + /*timeout=*/5ms); + + FDv2DataSystem ds({}, std::move(synchronizers), 
std::move(fallback_factory), + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Primary built twice: once at startup, once after the wrap. + // Secondary built once: between the two primary builds. + EXPECT_EQ(2, primary_factory_ptr->build_count_); + EXPECT_EQ(1, secondary_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, RecoveryConditionResetsToFirstAvailable) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Two synchronizers, both Interrupted-then-stall. Recovery fires before + // fallback on the secondary, so the orchestrator resets to the primary. + // The primary's second build returns Shutdown to end the test cleanly. + std::vector> primary_sources; + primary_sources.push_back(std::make_unique( + std::vector{ + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", + std::chrono::system_clock::now()}, + false, + }}, + }, + /*closed_flag=*/nullptr, /*next_calls=*/nullptr, + /*stall_after_results=*/true)); + primary_sources.push_back( + std::make_unique(std::vector{})); + auto primary_factory = std::make_unique( + std::move(primary_sources)); + auto* primary_factory_ptr = primary_factory.get(); + + auto secondary_sync = std::make_unique( + std::vector{ + FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", + std::chrono::system_clock::now()}, + false, + }}, + }, + /*closed_flag=*/nullptr, /*next_calls=*/nullptr, + /*stall_after_results=*/true); + auto secondary_factory = + std::make_unique(std::move(secondary_sync)); + auto* secondary_factory_ptr = secondary_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(primary_factory)); + 
synchronizers.push_back(std::move(secondary_factory)); + + // Recovery is shorter than fallback so it wins the race on the secondary. + auto fallback_factory = + std::make_unique(ioc.get_executor(), + /*timeout=*/20ms); + auto recovery_factory = + std::make_unique(ioc.get_executor(), + /*timeout=*/5ms); + + FDv2DataSystem ds({}, std::move(synchronizers), std::move(fallback_factory), + std::move(recovery_factory), ioc.get_executor(), + &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + // Primary built twice: once at startup, once after recovery reset the + // index. Secondary built once in between. + EXPECT_EQ(2, primary_factory_ptr->build_count_); + EXPECT_EQ(1, secondary_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, TerminalErrorsOnEverySynchronizerExhaustToOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Two synchronizers, each returning TerminalError on first Next. Block + // marks each as unavailable; once both are blocked, NextSynchronizer + // returns null and the orchestrator exhausts. 
+ auto primary_sync = + std::make_unique(std::vector{ + FDv2SourceResult{FDv2SourceResult::TerminalError{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, + "unauthorized", std::chrono::system_clock::now()}, + false, + }}}); + auto primary_factory = + std::make_unique(std::move(primary_sync)); + + auto secondary_sync = + std::make_unique(std::vector{ + FDv2SourceResult{FDv2SourceResult::TerminalError{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, 401, + "unauthorized", std::chrono::system_clock::now()}, + false, + }}}); + auto secondary_factory = + std::make_unique(std::move(secondary_sync)); + + std::vector> synchronizers; + synchronizers.push_back(std::move(primary_factory)); + synchronizers.push_back(std::move(secondary_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(status_manager.Status().State(), + DataSourceStatus::DataSourceState::kOff); +} + +TEST(FDv2DataSystemTest, SingleSynchronizerHasNoFallbackArmed) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Single synchronizer: applies a ChangeSet (status -> Valid), then + // returns Interrupted (status -> Interrupted), then stalls. A fallback + // factory is configured, but with only one synchronizer Available the + // orchestrator should arm no conditions, so the fallback never fires + // and the orchestrator stays put rather than exhausting to kOff. 
+ std::vector results; + results.push_back( + MakeFullChangeSetResult(ChangeSetData{}, MakeSelector(1, "state-1"))); + results.push_back(FDv2SourceResult{FDv2SourceResult::Interrupted{ + FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kNetworkError, + /*status_code=*/0, "boom", std::chrono::system_clock::now()}, + false, + }}); + auto sync = std::make_unique( + std::move(results), /*closed_flag=*/nullptr, /*next_calls=*/nullptr, + /*stall_after_results=*/true); + auto factory = + std::make_unique(std::move(sync)); + + std::vector> synchronizers; + synchronizers.push_back(std::move(factory)); + + auto fallback_factory = + std::make_unique(ioc.get_executor(), + /*timeout=*/5ms); + + FDv2DataSystem ds({}, std::move(synchronizers), std::move(fallback_factory), + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + + // run_for is an upper bound; ioc returns as soon as work drains. + ds.Initialize(); + ioc.run_for(500ms); + + EXPECT_EQ(DataSourceStatus::DataSourceState::kInterrupted, + status_manager.Status().State()); +} + // ============================================================================ // Destruction protocol: in-flight orchestration // ============================================================================ diff --git a/libs/server-sdk/tests/source_manager_test.cpp b/libs/server-sdk/tests/source_manager_test.cpp new file mode 100644 index 000000000..244ca6400 --- /dev/null +++ b/libs/server-sdk/tests/source_manager_test.cpp @@ -0,0 +1,187 @@ +#include + +#include +#include +#include + +#include +#include +#include +#include + +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; + +namespace { + +// Stub synchronizer; SourceManager only cares that Build() returns one. 
+class StubSynchronizer : public IFDv2Synchronizer { + public: + launchdarkly::async::Future Next( + launchdarkly::data_model::Selector) override { + return launchdarkly::async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + + void Close() override {} + + std::string const& Identity() const override { + static std::string const id = "stub"; + return id; + } +}; + +// Counts Build() calls for assertion. Tests don't run the returned +// synchronizer, so a fresh stub each time is fine. +class CountingFactory : public IFDv2SynchronizerFactory { + public: + std::unique_ptr Build() override { + ++build_count; + return std::make_unique(); + } + + int build_count = 0; +}; + +} // namespace + +TEST(SourceManagerTest, EmptyManagerReportsZeroAvailable) { + SourceManager mgr({}); + + EXPECT_EQ(0u, mgr.AvailableSynchronizerCount()); + EXPECT_EQ(nullptr, mgr.NextSynchronizer()); + EXPECT_FALSE(mgr.IsPrimeSynchronizer()); + EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); +} + +TEST(SourceManagerTest, NextSynchronizerReturnsFirstThenWrapsAround) { + auto f0 = std::make_unique(); + auto f1 = std::make_unique(); + auto* f0_ptr = f0.get(); + auto* f1_ptr = f1.get(); + std::vector> factories; + factories.push_back(std::move(f0)); + factories.push_back(std::move(f1)); + SourceManager mgr(std::move(factories)); + + EXPECT_NE(nullptr, mgr.NextSynchronizer()); + EXPECT_TRUE(mgr.IsPrimeSynchronizer()); + EXPECT_NE(nullptr, mgr.NextSynchronizer()); + EXPECT_FALSE(mgr.IsPrimeSynchronizer()); + // Wraps back to the first factory. 
+ EXPECT_NE(nullptr, mgr.NextSynchronizer()); + EXPECT_TRUE(mgr.IsPrimeSynchronizer()); + + EXPECT_EQ(2, f0_ptr->build_count); + EXPECT_EQ(1, f1_ptr->build_count); +} + +TEST(SourceManagerTest, BlockCurrentSynchronizerRemovesItFromRotation) { + auto f0 = std::make_unique(); + auto f1 = std::make_unique(); + auto f2 = std::make_unique(); + auto* f0_ptr = f0.get(); + auto* f1_ptr = f1.get(); + auto* f2_ptr = f2.get(); + std::vector> factories; + factories.push_back(std::move(f0)); + factories.push_back(std::move(f1)); + factories.push_back(std::move(f2)); + SourceManager mgr(std::move(factories)); + + // Advance to index 1 and block it. + mgr.NextSynchronizer(); // 0 + mgr.NextSynchronizer(); // 1 + mgr.BlockCurrentSynchronizer(); + + EXPECT_EQ(2u, mgr.AvailableSynchronizerCount()); + + // From index 1, next should skip to 2. + mgr.NextSynchronizer(); + EXPECT_FALSE(mgr.IsPrimeSynchronizer()); + + // Then wrap to 0 (skipping blocked 1). + mgr.NextSynchronizer(); + EXPECT_TRUE(mgr.IsPrimeSynchronizer()); + + EXPECT_EQ(2, f0_ptr->build_count); + EXPECT_EQ(1, f1_ptr->build_count); + EXPECT_EQ(1, f2_ptr->build_count); +} + +TEST(SourceManagerTest, AllBlockedReturnsNullAndZeroCount) { + auto f0 = std::make_unique(); + auto f1 = std::make_unique(); + std::vector> factories; + factories.push_back(std::move(f0)); + factories.push_back(std::move(f1)); + SourceManager mgr(std::move(factories)); + + mgr.NextSynchronizer(); + mgr.BlockCurrentSynchronizer(); + mgr.NextSynchronizer(); + mgr.BlockCurrentSynchronizer(); + + EXPECT_EQ(0u, mgr.AvailableSynchronizerCount()); + EXPECT_EQ(nullptr, mgr.NextSynchronizer()); + EXPECT_FALSE(mgr.IsPrimeSynchronizer()); +} + +TEST(SourceManagerTest, ResetSourceIndexSendsNextCallToTheFirstAvailable) { + auto f0 = std::make_unique(); + auto f1 = std::make_unique(); + auto f2 = std::make_unique(); + auto* f0_ptr = f0.get(); + auto* f2_ptr = f2.get(); + std::vector> factories; + factories.push_back(std::move(f0)); + 
factories.push_back(std::move(f1)); + factories.push_back(std::move(f2)); + SourceManager mgr(std::move(factories)); + + // Walk to index 2. + mgr.NextSynchronizer(); + mgr.NextSynchronizer(); + mgr.NextSynchronizer(); + EXPECT_EQ(1, f2_ptr->build_count); + + // Reset; next call should hit index 0 again. + mgr.ResetSourceIndex(); + mgr.NextSynchronizer(); + EXPECT_TRUE(mgr.IsPrimeSynchronizer()); + EXPECT_EQ(2, f0_ptr->build_count); +} + +TEST(SourceManagerTest, ResetSourceIndexSkipsBlockedFirstFactory) { + auto f0 = std::make_unique(); + auto f1 = std::make_unique(); + auto* f0_ptr = f0.get(); + auto* f1_ptr = f1.get(); + std::vector> factories; + factories.push_back(std::move(f0)); + factories.push_back(std::move(f1)); + SourceManager mgr(std::move(factories)); + + // Pick and block index 0. + mgr.NextSynchronizer(); + mgr.BlockCurrentSynchronizer(); + + // After reset, the next call should land on index 1 (first available), + // and IsPrimeSynchronizer should treat index 1 as the prime. 
+ mgr.ResetSourceIndex(); + mgr.NextSynchronizer(); + EXPECT_TRUE(mgr.IsPrimeSynchronizer()); + + EXPECT_EQ(1, f0_ptr->build_count); + EXPECT_EQ(1, f1_ptr->build_count); +} + +TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) { + auto f0 = std::make_unique(); + std::vector> factories; + factories.push_back(std::move(f0)); + SourceManager mgr(std::move(factories)); + + mgr.NextSynchronizer(); + EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); +} From 82115eb9aa7a0c7a6d3de8fabd86205ca28cee81 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 12 May 2026 13:49:51 -0700 Subject: [PATCH 5/8] fix: resolve MockSynchronizer stall promise on Close --- libs/server-sdk/tests/fdv2_data_system_test.cpp | 1 + 1 file changed, 1 insertion(+) diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index df885993c..e11fc7a15 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -94,6 +94,7 @@ class MockSynchronizer : public IFDv2Synchronizer { } void Close() override { + stall_promise_.Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}}); if (closed_flag_) { *closed_flag_ = true; } From ec9d49f8cc5b76a32bd9727d3f49be23be73d68b Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 12 May 2026 14:13:22 -0700 Subject: [PATCH 6/8] docs: state thread-safety contracts on condition classes --- .../src/data_interfaces/source/ifdv2_condition.hpp | 6 ++++++ libs/server-sdk/src/data_systems/fdv2/conditions.hpp | 3 +++ 2 files changed, 9 insertions(+) diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp index 867b65cf2..b693dcd97 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp @@ -29,6 +29,9 @@ namespace launchdarkly::server_side::data_interfaces { * * Close() cancels 
any pending internal work (e.g., a timer) without resolving * the future. After Close() returns the condition's future will not resolve. + * + * Implementations must be thread-safe: Execute, Inform, Close, and GetType + * may be called from any thread. */ class IFDv2Condition { public: @@ -78,6 +81,9 @@ class IFDv2Condition { /** * Builds new IFDv2Condition instances on demand. Each call to Build() produces * a fresh condition with no prior state. + * + * Implementations must be thread-safe: Build and GetType may be called from + * any thread. */ class IFDv2ConditionFactory { public: diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp index 71e3d0a93..ffe1ff1e0 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp @@ -137,6 +137,9 @@ class RecoveryConditionFactory final * type of the first condition to fire. Inform() and Close() forward to every * underlying condition. If constructed with no conditions, GetFuture() * returns a Future that never resolves. + * + * Not thread-safe: the caller is responsible for serializing all method + * calls. 
*/ class Conditions final { public: From ce63d8dacdcdbe0006d9d6cb0f426ff6e836cba9 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 12 May 2026 15:26:52 -0700 Subject: [PATCH 7/8] docs: fix stale condition and orchestrator doc comments --- .../source/ifdv2_condition.hpp | 8 +-- .../src/data_systems/fdv2/conditions.hpp | 3 +- .../data_systems/fdv2/fdv2_data_system.cpp | 59 +++++++++---------- .../data_systems/fdv2/fdv2_data_system.hpp | 2 +- 4 files changed, 33 insertions(+), 39 deletions(-) diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp index b693dcd97..ea19f0520 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp @@ -27,8 +27,8 @@ namespace launchdarkly::server_side::data_interfaces { * orchestrator builds a fresh condition for each synchronizer activation via * an IFDv2ConditionFactory. * - * Close() cancels any pending internal work (e.g., a timer) without resolving - * the future. After Close() returns the condition's future will not resolve. + * Close() cancels any pending internal work (e.g., a timer) and resolves the + * future with kCancelled. * * Implementations must be thread-safe: Execute, Inform, Close, and GetType * may be called from any thread. @@ -58,8 +58,8 @@ class IFDv2Condition { virtual void Inform(FDv2SourceResult const& result) = 0; /** - * Cancels any pending internal work and ensures the future will not - * resolve. Idempotent. + * Cancels any pending internal work and resolves the future returned by + * Execute() with kCancelled if it has not already resolved. Idempotent. 
*/ virtual void Close() = 0; diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp index ffe1ff1e0..f230e9a07 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.hpp @@ -138,8 +138,7 @@ class RecoveryConditionFactory final * underlying condition. If constructed with no conditions, GetFuture() * returns a Future that never resolves. * - * Not thread-safe: the caller is responsible for serializing all method - * calls. + * Thread-safe: GetFuture, Inform, and Close may be called from any thread. */ class Conditions final { public: diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index b7b5b2fc6..b3caa5aba 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -314,38 +314,33 @@ void FDv2DataSystem::OnSynchronizerResult( bool got_shutdown = false; bool advance = false; - std::visit( - overloaded{ - [&](Result::ChangeSet& cs) { - ApplyChangeSet(std::move(cs.change_set)); - }, - [&](Result::Shutdown&) { got_shutdown = true; }, - [&](Result::Interrupted const& iv) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() - << ": synchronizer interrupted: " << iv.error.Message(); - status_manager_->SetState( - DataSourceStatus::DataSourceState::kInterrupted, - iv.error.Kind(), iv.error.Message()); - }, - [&](Result::TerminalError const& te) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() - << ": synchronizer terminal error: " << te.error.Message(); - status_manager_->SetState( - DataSourceStatus::DataSourceState::kInterrupted, - te.error.Kind(), te.error.Message()); - advance = true; - }, - [&](Result::Goodbye const& gb) { - // The synchronizer handles goodbye internally (reconnects); - // the orchestrator just loops on the same source. 
- LD_LOG(logger_, LogLevel::kDebug) - << Identity() << ": ignoring goodbye from synchronizer" - << (gb.reason ? ": " + *gb.reason : ""); - }, - }, - result.value); + std::visit(overloaded{ + [&](Result::ChangeSet& cs) { + ApplyChangeSet(std::move(cs.change_set)); + }, + [&](Result::Shutdown&) { got_shutdown = true; }, + [&](Result::Interrupted const& iv) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() << ": synchronizer interrupted: " + << iv.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + iv.error.Kind(), iv.error.Message()); + }, + [&](Result::TerminalError const& te) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() << ": synchronizer terminal error: " + << te.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + te.error.Kind(), te.error.Message()); + advance = true; + }, + [&](Result::Goodbye const&) { + // The synchronizer handles this internally. + }, + }, + result.value); { std::lock_guard lock(mutex_); diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp index 02b059077..7957de429 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -99,7 +99,7 @@ namespace launchdarkly::server_side::data_systems { * | Initializer phase| ChangeSet(no selector) -> stay, N += 1 * | N = 0, 1, 2, ... 
| ChangeSet(selector) -> go to Sync * | | Interrupted/Terminal -> stay, N += 1 - * | | Goodbye/Timeout -> stay, N += 1 + * | | Goodbye -> stay, N += 1 * | | Shutdown -> [Closed] * +-------------------+ * | From 0bb7c75540316b95f570492155ab560a7651f4ef Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 12 May 2026 16:55:52 -0700 Subject: [PATCH 8/8] fix: resolve promise outside state mutex in TimedCondition::Close --- .../src/data_systems/fdv2/conditions.cpp | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp index ad3d158ad..2d013c497 100644 --- a/libs/server-sdk/src/data_systems/fdv2/conditions.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/conditions.cpp @@ -26,14 +26,16 @@ async::Future TimedCondition::Execute() { } void TimedCondition::Close() { - std::lock_guard lock(state_->mutex); - if (state_->closed) { - return; - } - state_->closed = true; - if (state_->timer_cancel) { - state_->timer_cancel->Cancel(); - state_->timer_cancel.reset(); + { + std::lock_guard lock(state_->mutex); + if (state_->closed) { + return; + } + state_->closed = true; + if (state_->timer_cancel) { + state_->timer_cancel->Cancel(); + state_->timer_cancel.reset(); + } } state_->promise.Resolve(IFDv2Condition::Type::kCancelled); }