From 753bbb4c15154034e98fb36dc7028faaa89c8a4a Mon Sep 17 00:00:00 2001 From: Jason Hernandez <7144515+jasonhernandez@users.noreply.github.com> Date: Tue, 30 Jun 2026 13:03:19 -0700 Subject: [PATCH] adapter: add LaunchDarkly reconnect integration test Add an integration test that reproduces incident-984: the LaunchDarkly data source must reconnect after its streaming connection drops with a non-Eof error, so flag updates keep syncing. This lands ahead of the upstream SDK bump so the behavior is characterized against the current (fork) SDK first, and the bump must keep the test green. To make this testable against a controlled server, add a `--launchdarkly-base-uri` flag (env `LAUNCHDARKLY_BASE_URI`) that overrides the SDK's streaming, polling, and events endpoints with a single base URL via the SDK's relay-proxy support. This is also generally useful for pointing at a LaunchDarkly relay proxy. It is threaded through `SystemParameterSyncClientConfig::LaunchDarkly` into `ld_config`. The test (test/launchdarkly-reconnect) runs a mock LaunchDarkly streaming server that serves an initial flag value, resets the first streaming connection mid-stream with a TCP RST (a non-Eof transport error, as in the incident), and serves an updated value to every reconnecting client. environmentd is pointed at the mock, so reaching the updated value proves the data source reconnected; a regressed SDK stays stuck on the initial value. Unlike test/launchdarkly, this needs no real LaunchDarkly credentials, and runs in the nightly pipeline. 
Co-Authored-By: Claude Opus 4.8 (1M context) --- ci/nightly/pipeline.template.yml | 11 ++ src/adapter/src/config.rs | 5 + src/adapter/src/config/frontend.rs | 26 ++-- src/environmentd/src/environmentd/main.rs | 6 + src/environmentd/src/lib.rs | 4 + src/environmentd/src/test_util.rs | 1 + src/sqllogictest/src/runner.rs | 1 + test/launchdarkly-reconnect/mock_ld.py | 149 ++++++++++++++++++++++ test/launchdarkly-reconnect/mzcompose.py | 80 ++++++++++++ 9 files changed, 275 insertions(+), 8 deletions(-) create mode 100644 test/launchdarkly-reconnect/mock_ld.py create mode 100644 test/launchdarkly-reconnect/mzcompose.py diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index ad2b69637b4b3..0e3e705e35547 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1492,6 +1492,17 @@ steps: composition: launchdarkly-flag-consistency branches: "main" + - id: launchdarkly-reconnect + label: "LaunchDarkly reconnect" + depends_on: build-aarch64 + timeout_in_minutes: 30 + agents: + # Uses a mock LaunchDarkly server; needs no real credentials. + queue: hetzner-aarch64-4cpu-8gb + plugins: + - ./ci/plugins/mzcompose: + composition: launchdarkly-reconnect + - group: E2E key: e2e steps: diff --git a/src/adapter/src/config.rs b/src/adapter/src/config.rs index 46ee79ce1ed69..82fdfe03a5709 100644 --- a/src/adapter/src/config.rs +++ b/src/adapter/src/config.rs @@ -109,6 +109,11 @@ pub enum SystemParameterSyncClientConfig { LaunchDarkly { /// The LaunchDarkly SDK key sdk_key: String, + /// Overrides the LaunchDarkly streaming, polling, and events endpoints + /// with a single base URL (as for a relay proxy). `None` uses + /// LaunchDarkly's default endpoints. Primarily for pointing the SDK at + /// a mock server in tests. + base_uri: Option, /// Function to return the current time. 
now_fn: NowFn, }, diff --git a/src/adapter/src/config/frontend.rs b/src/adapter/src/config/frontend.rs index bc65fb7a293bd..c0db692e6ac2e 100644 --- a/src/adapter/src/config/frontend.rs +++ b/src/adapter/src/config/frontend.rs @@ -82,9 +82,14 @@ impl SystemParameterFrontend { build_info: sync_config.build_info, metrics: sync_config.metrics.clone(), }), - SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self { + SystemParameterSyncClientConfig::LaunchDarkly { + sdk_key, + base_uri, + now_fn, + } => Ok(Self { client: SystemParameterFrontendClient::LaunchDarkly { - client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?, + client: ld_client(sdk_key, base_uri.as_deref(), &sync_config.metrics, now_fn) + .await?, // The environment-wide context carries no cluster/replica // scope. Scoped evaluation passes a `cluster` or `replica` // context per pass via [`ld_ctx`]. @@ -347,8 +352,8 @@ pub struct ClusterEvalContext { pub cluster: ClusterScopeContext, } -fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { - ld::ConfigBuilder::new(api_key) +fn ld_config(api_key: &str, base_uri: Option<&str>, metrics: &Metrics) -> ld::Config { + let mut config = ld::ConfigBuilder::new(api_key) .event_processor( ld::EventProcessorBuilder::new() .https_connector(HttpsConnector::new()) @@ -365,17 +370,22 @@ fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { }) }), ) - .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())) - .build() - .expect("valid config") + .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())); + if let Some(base_uri) = base_uri { + let mut endpoints = ld::ServiceEndpointsBuilder::new(); + endpoints.relay_proxy(base_uri); + config = config.service_endpoints(&endpoints); + } + config.build().expect("valid config") } async fn ld_client( api_key: &str, + base_uri: Option<&str>, metrics: &Metrics, now_fn: &NowFn, ) -> Result { - let ld_client = 
ld::Client::build(ld_config(api_key, metrics))?; + let ld_client = ld::Client::build(ld_config(api_key, base_uri, metrics))?; tracing::info!("waiting for SystemParameterFrontend to initialize"); // Start and initialize LD client for the frontend. The callback passed // will export the last time when an SSE event from the LD server was diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index 489b63464507e..da01d01aff3ca 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -434,6 +434,11 @@ pub struct Args { /// configuration parameters. #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")] launchdarkly_sdk_key: Option, + /// Overrides the LaunchDarkly streaming, polling, and events endpoints with + /// a single base URL, as for a relay proxy. Primarily intended for pointing + /// the SDK at a mock LaunchDarkly server in tests. + #[clap(long, env = "LAUNCHDARKLY_BASE_URI", value_name = "URL")] + launchdarkly_base_uri: Option, /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to /// LaunchDarkly feature keys. /// @@ -1115,6 +1120,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { segment_client_side: args.segment_client_side, test_only_dummy_segment_client: args.test_only_dummy_segment_client, launchdarkly_sdk_key: args.launchdarkly_sdk_key, + launchdarkly_base_uri: args.launchdarkly_base_uri, launchdarkly_key_map: args .launchdarkly_key_map .into_iter() diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index d04e8487c0202..246ed69497c75 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -166,6 +166,9 @@ pub struct Config { /// An SDK key for LaunchDarkly. Enables system parameter synchronization /// with LaunchDarkly. pub launchdarkly_sdk_key: Option, + /// Overrides the LaunchDarkly service endpoints with a single base URL, as + /// for a relay proxy or a mock server in tests. 
+ pub launchdarkly_base_uri: Option, /// An invertible map from system parameter names to LaunchDarkly feature /// keys to use when propagating values from the latter to the former. pub launchdarkly_key_map: BTreeMap, @@ -480,6 +483,7 @@ impl Listeners { config.launchdarkly_key_map, SystemParameterSyncClientConfig::LaunchDarkly { sdk_key: key, + base_uri: config.launchdarkly_base_uri, now_fn: config.now.clone(), }, )), diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs index ec878b33b206e..b29bd530d32c3 100644 --- a/src/environmentd/src/test_util.rs +++ b/src/environmentd/src/test_util.rs @@ -903,6 +903,7 @@ impl Listeners { aws_account_id: None, aws_privatelink_availability_zones: None, launchdarkly_sdk_key: None, + launchdarkly_base_uri: None, launchdarkly_key_map: Default::default(), config_sync_file_path: None, config_sync_timeout: Duration::from_secs(30), diff --git a/src/sqllogictest/src/runner.rs b/src/sqllogictest/src/runner.rs index 9033442ffccdd..a86b2caf09e29 100644 --- a/src/sqllogictest/src/runner.rs +++ b/src/sqllogictest/src/runner.rs @@ -1281,6 +1281,7 @@ impl<'a> RunnerInner<'a> { aws_account_id: None, aws_privatelink_availability_zones: None, launchdarkly_sdk_key: None, + launchdarkly_base_uri: None, launchdarkly_key_map: Default::default(), config_sync_file_path: None, config_sync_timeout: Duration::from_secs(30), diff --git a/test/launchdarkly-reconnect/mock_ld.py b/test/launchdarkly-reconnect/mock_ld.py new file mode 100644 index 0000000000000..f5a726bc0f16d --- /dev/null +++ b/test/launchdarkly-reconnect/mock_ld.py @@ -0,0 +1,149 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +""" +A minimal mock of the LaunchDarkly streaming API, used to exercise the SDK's +reconnect behavior (see mzcompose.py). + +The first streaming client receives an initial flag value and then has its +connection reset mid-stream -- a non-Eof transport error, reproducing the +failure mode of incident-984. Every subsequent (reconnecting) client receives +an updated value. The test therefore asserts that the updated value is reached, which can +only happen if the data source reconnected after the reset. +""" + +import json +import socket +import struct +import sys +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +PORT = 8080 +FLAG_KEY = "reconnect-test" + +# Number variations served before and after the forced reconnect. environmentd maps +# these onto the `max_result_size` system parameter (2 GiB and 3 GiB). +INITIAL_VALUE = 2147483648 +RECONNECT_VALUE = 3221225472 + +# Seconds to hold the first connection open before resetting it, leaving time +# for the SDK to apply the initial value. 
+HOLD_BEFORE_RESET = 3.0 + +_lock = threading.Lock() +_stream_connections = 0 + + +def put_event(value: int, version: int) -> bytes: + """Serialize a LaunchDarkly streaming `put` event carrying a single flag + that, with targeting off, evaluates to `value`.""" + flag = { + "key": FLAG_KEY, + "version": version, + "on": False, + "targets": [], + "rules": [], + "prerequisites": [], + "fallthrough": {"variation": 0}, + "offVariation": 0, + "variations": [value], + "salt": "reconnect-test-salt", + "clientSideAvailability": { + "usingMobileKey": False, + "usingEnvironmentId": False, + }, + } + payload = {"path": "/", "data": {"flags": {FLAG_KEY: flag}, "segments": {}}} + return f"event: put\ndata: {json.dumps(payload)}\n\n".encode() + + +class Handler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def log_message(self, format: str, *args: object) -> None: + sys.stderr.write("mock-ld: " + (format % args) + "\n") + + def _reset_connection(self) -> None: + # Force a TCP RST instead of a clean FIN, so the SDK sees a non-Eof + # transport error (as in incident-984) rather than the Eof it has + # always recovered from. + self.connection.setsockopt( + socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0) + ) + self.connection.close() + + def _send_stream_headers(self) -> None: + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.end_headers() + + def do_GET(self) -> None: + if self.path == "/health": + self.send_response(200) + self.send_header("Content-Length", "0") + self.end_headers() + return + + if self.path != "/all": + self.send_response(404) + self.send_header("Content-Length", "0") + self.end_headers() + return + + global _stream_connections + with _lock: + _stream_connections += 1 + n = _stream_connections + + self._send_stream_headers() + + if n == 1: + # First client: send the initial value, then reset mid-stream. 
+ self.wfile.write(put_event(INITIAL_VALUE, 1)) + self.wfile.flush() + time.sleep(HOLD_BEFORE_RESET) + self.log_message("resetting first streaming connection") + self._reset_connection() + self.close_connection = True + return + + # Reconnecting client: send the updated value, then hold the connection + # open with periodic heartbeats so the SDK stays connected. + self.wfile.write(put_event(RECONNECT_VALUE, 2)) + self.wfile.flush() + try: + while True: + time.sleep(5) + self.wfile.write(b":heartbeat\n\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + return + + def do_POST(self) -> None: + # The event processor POSTs analytics events to `/bulk`; accept and + # discard them so it doesn't log errors. + length = int(self.headers.get("Content-Length", 0)) + if length: + self.rfile.read(length) + self.send_response(202) + self.send_header("Content-Length", "0") + self.end_headers() + + +def main() -> None: + server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler) + sys.stderr.write(f"mock-ld: listening on {PORT}\n") + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/test/launchdarkly-reconnect/mzcompose.py b/test/launchdarkly-reconnect/mzcompose.py new file mode 100644 index 0000000000000..e26fb6851d93a --- /dev/null +++ b/test/launchdarkly-reconnect/mzcompose.py @@ -0,0 +1,80 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +""" +Regression test for incident-984: the LaunchDarkly data source must reconnect +after its streaming connection drops with a non-Eof error, so that flag updates +keep syncing. 
+ +A mock LaunchDarkly server (mock_ld.py) serves an initial flag value, resets the +first streaming connection mid-stream (reproducing the incident's non-Eof +transport error), and serves an updated value to every reconnecting client. +environmentd is pointed at the mock via MZ_LAUNCHDARKLY_BASE_URI, so the updated +value can only reach it if the data source reconnected after the reset. A +regressed SDK gets stuck on the initial value and the assertion below times out. + +Unlike test/launchdarkly, this needs no real LaunchDarkly credentials. +""" + +from materialize.mzcompose.composition import Composition, Service +from materialize.mzcompose.service import Service as DockerService +from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.testdrive import Testdrive + +FLAG_KEY = "reconnect-test" +MOCK_HOST = "mock-launchdarkly" +MOCK_PORT = 8080 + +SERVICES = [ + DockerService( + name=MOCK_HOST, + config={ + "image": "python:3.12-slim", + "volumes": ["./mock_ld.py:/app/mock_ld.py"], + "command": ["python3", "-u", "/app/mock_ld.py"], + "ports": [MOCK_PORT], + "healthcheck": { + "test": [ + "CMD", + "python3", + "-c", + "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')", + ], + "interval": "1s", + "start_period": "30s", + }, + }, + ), + Materialized( + environment_extra=[ + "MZ_LAUNCHDARKLY_SDK_KEY=sdk-mock-key", + f"MZ_LAUNCHDARKLY_BASE_URI=http://{MOCK_HOST}:{MOCK_PORT}", + f"MZ_LAUNCHDARKLY_KEY_MAP=max_result_size={FLAG_KEY}", + "MZ_CONFIG_SYNC_LOOP_INTERVAL=1s", + ], + additional_system_parameter_defaults={ + "log_filter": "mz_adapter::config=debug,launchdarkly_server_sdk=debug,info", + }, + external_metadata_store=True, + ), + # The reconnect (eventsource backoff) plus the 1s sync loop means the + # updated value can take several seconds to land; give it ample room. 
+ Testdrive(no_reset=True, seed=1, default_timeout="120s"), +] + + +def workflow_default(c: Composition) -> None: + c.up(MOCK_HOST, "materialized", Service("testdrive", idle=True)) + + # The mock serves 2 GiB on the first streaming connection, resets it + # mid-stream, then serves 3 GiB to every reconnecting client. Reaching + # 3 GiB therefore proves the data source reconnected after a non-Eof error; + # a regressed SDK stays stuck at 2 GiB and this assertion times out. We + # don't assert the transient 2 GiB value, as the reset can race startup. + c.testdrive("\n".join(["> SHOW max_result_size", "3GB"]))