Skip to content
Open
24 changes: 24 additions & 0 deletions libs/internal/include/launchdarkly/async/promise.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -534,6 +534,30 @@ Future<std::size_t> WhenAny(Future<Ts>... futures) {
return result;
}

// Vector overload of WhenAny. Returns a Future<std::size_t> that resolves with
// the 0-based index of whichever input future resolves first. If the vector is
// empty, the returned future never resolves.
template <typename T>
Future<std::size_t> WhenAny(std::vector<Future<T>> const& futures) {
Promise<std::size_t> promise;
Future<std::size_t> result = promise.GetFuture();

auto shared_promise =
std::make_shared<Promise<std::size_t>>(std::move(promise));

for (std::size_t i = 0; i < futures.size(); ++i) {
Future<T> future = futures[i];
future.Then(
[shared_promise, i](T const&) -> std::monostate {
shared_promise->Resolve(i);
return std::monostate{};
},
[](Continuation<void()> f) { f(); });
}

return result;
}

// MakeFuture returns an already-resolved Future<T>. Useful in flattening Then
// continuations where some branches produce a result immediately and others
// return a Future, requiring a uniform Future<T> return type across all
Expand Down
28 changes: 28 additions & 0 deletions libs/internal/tests/promise_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -416,6 +416,34 @@ TEST(WhenAny, MixedTypesFirstWins) {
EXPECT_EQ(std::get<std::string>(*result.GetResult()), "hello");
}

// Verifies that the vector overload of WhenAny resolves with the index of
// the first future to resolve.
TEST(WhenAny, VectorFirstResolved) {
Promise<int> p0;
Promise<int> p1;
Promise<int> p2;

std::vector<Future<int>> futures{p0.GetFuture(), p1.GetFuture(),
p2.GetFuture()};
Future<std::size_t> result = WhenAny(futures);

EXPECT_FALSE(result.IsFinished());
p2.Resolve(99);

auto r = result.WaitForResult(std::chrono::seconds(5));
ASSERT_TRUE(r.has_value());
EXPECT_EQ(*r, 2u);
}

// Verifies that an empty vector produces a future that never resolves.
TEST(WhenAny, VectorEmptyNeverResolves) {
std::vector<Future<int>> futures;
Future<std::size_t> result = WhenAny(futures);

auto r = result.WaitForResult(std::chrono::milliseconds(50));
EXPECT_FALSE(r.has_value());
}

// Verifies that WhenAny resolves immediately if a future is already resolved.
TEST(WhenAny, AlreadyResolved) {
Promise<int> p0;
Expand Down
4 changes: 4 additions & 0 deletions libs/server-sdk/src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,10 @@ target_sources(${LIBNAME}
data_systems/fdv2/polling_synchronizer.cpp
data_systems/fdv2/streaming_synchronizer.hpp
data_systems/fdv2/streaming_synchronizer.cpp
data_systems/fdv2/conditions.hpp
data_systems/fdv2/conditions.cpp
data_systems/fdv2/source_manager.hpp
data_systems/fdv2/source_manager.cpp
data_systems/fdv2/fdv2_data_system.hpp
data_systems/fdv2/fdv2_data_system.cpp
data_systems/background_sync/sources/streaming/streaming_data_source.hpp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,17 +56,8 @@ struct FDv2SourceResult {
bool fdv1_fallback;
};

/**
* Next() returned because the timeout expired before a result arrived.
*/
struct Timeout {};

using Value = std::variant<ChangeSet,
Interrupted,
TerminalError,
Shutdown,
Goodbye,
Timeout>;
using Value =
std::variant<ChangeSet, Interrupted, TerminalError, Shutdown, Goodbye>;

Value value;
};
Expand Down
107 changes: 107 additions & 0 deletions libs/server-sdk/src/data_interfaces/source/ifdv2_condition.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
#pragma once

#include "fdv2_source_result.hpp"

#include <launchdarkly/async/promise.hpp>

#include <memory>

namespace launchdarkly::server_side::data_interfaces {

/**
* A condition observes the orchestrator's stream of synchronizer results and
* fires when criteria for a synchronizer transition are met.
*
* Each condition plays one of two roles, identified by Type():
* - kFallback: when fired, the orchestrator stops the active synchronizer
* and starts the next-preferred one.
* - kRecovery: when fired, the orchestrator stops the active fallback
* synchronizer and returns to the most-preferred synchronizer.
*
* Conditions are stateful: the orchestrator pushes results into a condition
* via Inform() so the condition can update its internal state (typically a
* timer). When the condition's criteria are satisfied, the future returned
* by Execute() resolves with the condition's Type.
*
* Conditions are single-use: once fired, they are not re-armed. The
* orchestrator builds a fresh condition for each synchronizer activation via
* an IFDv2ConditionFactory.
*
* Close() cancels any pending internal work (e.g., a timer) and resolves the
* future with kCancelled.
*
* Implementations must be thread-safe: Execute, Inform, Close, and GetType
* may be called from any thread.
*/
class IFDv2Condition {
public:
enum class Type {
/** Stop the active synchronizer and start the next-preferred one. */
kFallback,
/** Return to the most-preferred synchronizer. */
kRecovery,
/** The condition was Close()d before firing; orchestrator ignores. */
kCancelled,
};

/**
* Returns a Future that resolves with the condition's Type once the
* condition's criteria are satisfied. May be called multiple times; each
* call returns a Future referring to the same underlying state.
*/
[[nodiscard]] virtual async::Future<Type> Execute() = 0;

/**
* Pushes a synchronizer result into the condition so it can update any
* internal state (e.g., arm or cancel a timer).
*/
virtual void Inform(FDv2SourceResult const& result) = 0;

/**
* Cancels any pending internal work and resolves the future returned by
* Execute() with kCancelled if it has not already resolved. Idempotent.
*/
virtual void Close() = 0;

/**
* Returns the condition's role in the orchestrator.
*/
[[nodiscard]] virtual Type GetType() const = 0;

virtual ~IFDv2Condition() = default;
IFDv2Condition(IFDv2Condition const&) = delete;
IFDv2Condition(IFDv2Condition&&) = delete;
IFDv2Condition& operator=(IFDv2Condition const&) = delete;
IFDv2Condition& operator=(IFDv2Condition&&) = delete;

protected:
IFDv2Condition() = default;
};

/**
* Builds new IFDv2Condition instances on demand. Each call to Build() produces
* a fresh condition with no prior state.
*
* Implementations must be thread-safe: Build and GetType may be called from
* any thread.
*/
class IFDv2ConditionFactory {
public:
[[nodiscard]] virtual std::unique_ptr<IFDv2Condition> Build() = 0;

/**
* Returns the type of conditions this factory builds.
*/
[[nodiscard]] virtual IFDv2Condition::Type GetType() const = 0;

virtual ~IFDv2ConditionFactory() = default;
IFDv2ConditionFactory(IFDv2ConditionFactory const&) = delete;
IFDv2ConditionFactory(IFDv2ConditionFactory&&) = delete;
IFDv2ConditionFactory& operator=(IFDv2ConditionFactory const&) = delete;
IFDv2ConditionFactory& operator=(IFDv2ConditionFactory&&) = delete;

protected:
IFDv2ConditionFactory() = default;
};

} // namespace launchdarkly::server_side::data_interfaces
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <launchdarkly/async/promise.hpp>
#include <launchdarkly/data_model/selector.hpp>

#include <chrono>
#include <string>

namespace launchdarkly::server_side::data_interfaces {
Expand All @@ -20,25 +19,19 @@ namespace launchdarkly::server_side::data_interfaces {
class IFDv2Synchronizer {
public:
/**
* Returns a Future that resolves with the next result once it is available
* or the timeout expires.
* Returns a Future that resolves with the next result once it is
* available.
*
* On the first call, the synchronizer starts its underlying connection.
* Subsequent calls continue reading from the same connection.
*
* If the timeout expires before a result arrives, the future resolves with
* FDv2SourceResult::Timeout. The orchestrator uses this to evaluate
* fallback conditions.
*
* Close() may be called from another thread to unblock Next(), in which
* case the future resolves with FDv2SourceResult::Shutdown.
*
* @param timeout Maximum time to wait for the next result.
* @param selector The selector to send with the request, reflecting any
* changesets applied since the previous call.
*/
virtual async::Future<FDv2SourceResult> Next(
std::chrono::milliseconds timeout,
data_model::Selector selector) = 0;

/**
Expand Down
Loading
Loading