diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index ad2b69637b4b3..0e3e705e35547 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1492,6 +1492,17 @@ steps: composition: launchdarkly-flag-consistency branches: "main" + - id: launchdarkly-reconnect + label: "LaunchDarkly reconnect" + depends_on: build-aarch64 + timeout_in_minutes: 30 + agents: + # Uses a mock LaunchDarkly server; needs no real credentials. + queue: hetzner-aarch64-4cpu-8gb + plugins: + - ./ci/plugins/mzcompose: + composition: launchdarkly-reconnect + - group: E2E key: e2e steps: diff --git a/src/adapter/src/config.rs b/src/adapter/src/config.rs index 46ee79ce1ed69..82fdfe03a5709 100644 --- a/src/adapter/src/config.rs +++ b/src/adapter/src/config.rs @@ -109,6 +109,11 @@ pub enum SystemParameterSyncClientConfig { LaunchDarkly { /// The LaunchDarkly SDK key sdk_key: String, + /// Overrides the LaunchDarkly streaming, polling, and events endpoints + /// with a single base URL (as for a relay proxy). `None` uses + /// LaunchDarkly's default endpoints. Primarily for pointing the SDK at + /// a mock server in tests. + base_uri: Option, /// Function to return the current time. now_fn: NowFn, }, diff --git a/src/adapter/src/config/frontend.rs b/src/adapter/src/config/frontend.rs index bc65fb7a293bd..c0db692e6ac2e 100644 --- a/src/adapter/src/config/frontend.rs +++ b/src/adapter/src/config/frontend.rs @@ -82,9 +82,14 @@ impl SystemParameterFrontend { build_info: sync_config.build_info, metrics: sync_config.metrics.clone(), }), - SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self { + SystemParameterSyncClientConfig::LaunchDarkly { + sdk_key, + base_uri, + now_fn, + } => Ok(Self { client: SystemParameterFrontendClient::LaunchDarkly { - client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?, + client: ld_client(sdk_key, base_uri.as_deref(), &sync_config.metrics, now_fn) + .await?, // The environment-wide context carries no cluster/replica // scope. Scoped evaluation passes a `cluster` or `replica` // context per pass via [`ld_ctx`]. @@ -347,8 +352,8 @@ pub struct ClusterEvalContext { pub cluster: ClusterScopeContext, } -fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { - ld::ConfigBuilder::new(api_key) +fn ld_config(api_key: &str, base_uri: Option<&str>, metrics: &Metrics) -> ld::Config { + let mut config = ld::ConfigBuilder::new(api_key) .event_processor( ld::EventProcessorBuilder::new() .https_connector(HttpsConnector::new()) @@ -365,17 +370,22 @@ fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { }) }), ) - .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())) - .build() - .expect("valid config") + .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())); + if let Some(base_uri) = base_uri { + let mut endpoints = ld::ServiceEndpointsBuilder::new(); + endpoints.relay_proxy(base_uri); + config = config.service_endpoints(&endpoints); + } + config.build().expect("valid config") } async fn ld_client( api_key: &str, + base_uri: Option<&str>, metrics: &Metrics, now_fn: &NowFn, ) -> Result { - let ld_client = ld::Client::build(ld_config(api_key, metrics))?; + let ld_client = ld::Client::build(ld_config(api_key, base_uri, metrics))?; tracing::info!("waiting for SystemParameterFrontend to initialize"); // Start and initialize LD client for the frontend. The callback passed // will export the last time when an SSE event from the LD server was diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index 489b63464507e..da01d01aff3ca 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -434,6 +434,11 @@ pub struct Args { /// configuration parameters. #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")] launchdarkly_sdk_key: Option, + /// Overrides the LaunchDarkly streaming, polling, and events endpoints with + /// a single base URL, as for a relay proxy. Primarily intended for pointing + /// the SDK at a mock LaunchDarkly server in tests. + #[clap(long, env = "LAUNCHDARKLY_BASE_URI", value_name = "URL")] + launchdarkly_base_uri: Option, /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to /// LaunchDarkly feature keys. /// @@ -1115,6 +1120,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { segment_client_side: args.segment_client_side, test_only_dummy_segment_client: args.test_only_dummy_segment_client, launchdarkly_sdk_key: args.launchdarkly_sdk_key, + launchdarkly_base_uri: args.launchdarkly_base_uri, launchdarkly_key_map: args .launchdarkly_key_map .into_iter() diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index d04e8487c0202..246ed69497c75 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -166,6 +166,9 @@ pub struct Config { /// An SDK key for LaunchDarkly. Enables system parameter synchronization /// with LaunchDarkly. pub launchdarkly_sdk_key: Option, + /// Overrides the LaunchDarkly service endpoints with a single base URL, as + /// for a relay proxy or a mock server in tests. + pub launchdarkly_base_uri: Option, /// An invertible map from system parameter names to LaunchDarkly feature /// keys to use when propagating values from the latter to the former. pub launchdarkly_key_map: BTreeMap, @@ -480,6 +483,7 @@ impl Listeners { config.launchdarkly_key_map, SystemParameterSyncClientConfig::LaunchDarkly { sdk_key: key, + base_uri: config.launchdarkly_base_uri, now_fn: config.now.clone(), }, )), diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs index ec878b33b206e..b29bd530d32c3 100644 --- a/src/environmentd/src/test_util.rs +++ b/src/environmentd/src/test_util.rs @@ -903,6 +903,7 @@ impl Listeners { aws_account_id: None, aws_privatelink_availability_zones: None, launchdarkly_sdk_key: None, + launchdarkly_base_uri: None, launchdarkly_key_map: Default::default(), config_sync_file_path: None, config_sync_timeout: Duration::from_secs(30), diff --git a/src/sqllogictest/src/runner.rs b/src/sqllogictest/src/runner.rs index 9033442ffccdd..a86b2caf09e29 100644 --- a/src/sqllogictest/src/runner.rs +++ b/src/sqllogictest/src/runner.rs @@ -1281,6 +1281,7 @@ impl<'a> RunnerInner<'a> { aws_account_id: None, aws_privatelink_availability_zones: None, launchdarkly_sdk_key: None, + launchdarkly_base_uri: None, launchdarkly_key_map: Default::default(), config_sync_file_path: None, config_sync_timeout: Duration::from_secs(30), diff --git a/test/launchdarkly-reconnect/mock_ld.py b/test/launchdarkly-reconnect/mock_ld.py new file mode 100644 index 0000000000000..f5a726bc0f16d --- /dev/null +++ b/test/launchdarkly-reconnect/mock_ld.py @@ -0,0 +1,149 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +""" +A minimal mock of the LaunchDarkly streaming API, used to exercise the SDK's +reconnect behavior (see mzcompose.py). + +The first streaming client receives an initial flag value and then has its +connection reset mid-stream -- a non-Eof transport error, reproducing the +failure mode of incident-984. Every subsequent (reconnecting) client receives +an updated value. The test therefore asserts that the value changed, which can +only happen if the data source reconnected after the reset. +""" + +import json +import socket +import struct +import sys +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +PORT = 8080 +FLAG_KEY = "reconnect-test" + +# Number variations served before and after the forced reconnect. The SDK maps +# these onto the `max_result_size` system parameter (2 GiB and 3 GiB). +INITIAL_VALUE = 2147483648 +RECONNECT_VALUE = 3221225472 + +# Seconds to hold the first connection open before resetting it, leaving time +# for the SDK to apply the initial value. +HOLD_BEFORE_RESET = 3.0 + +_lock = threading.Lock() +_stream_connections = 0 + + +def put_event(value: int, version: int) -> bytes: + """Serialize a LaunchDarkly streaming `put` event carrying a single flag + that, with targeting off, evaluates to `value`.""" + flag = { + "key": FLAG_KEY, + "version": version, + "on": False, + "targets": [], + "rules": [], + "prerequisites": [], + "fallthrough": {"variation": 0}, + "offVariation": 0, + "variations": [value], + "salt": "reconnect-test-salt", + "clientSideAvailability": { + "usingMobileKey": False, + "usingEnvironmentId": False, + }, + } + payload = {"path": "/", "data": {"flags": {FLAG_KEY: flag}, "segments": {}}} + return f"event: put\ndata: {json.dumps(payload)}\n\n".encode() + + +class Handler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def log_message(self, format: str, *args: object) -> None: + sys.stderr.write("mock-ld: " + (format % args) + "\n") + + def _reset_connection(self) -> None: + # Force a TCP RST instead of a clean FIN, so the SDK sees a non-Eof + # transport error (as in incident-984) rather than the Eof it has + # always recovered from. + self.connection.setsockopt( + socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0) + ) + self.connection.close() + + def _send_stream_headers(self) -> None: + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.end_headers() + + def do_GET(self) -> None: + if self.path == "/health": + self.send_response(200) + self.send_header("Content-Length", "0") + self.end_headers() + return + + if self.path != "/all": + self.send_response(404) + self.send_header("Content-Length", "0") + self.end_headers() + return + + global _stream_connections + with _lock: + _stream_connections += 1 + n = _stream_connections + + self._send_stream_headers() + + if n == 1: + # First client: send the initial value, then reset mid-stream. + self.wfile.write(put_event(INITIAL_VALUE, 1)) + self.wfile.flush() + time.sleep(HOLD_BEFORE_RESET) + self.log_message("resetting first streaming connection") + self._reset_connection() + self.close_connection = True + return + + # Reconnecting client: send the updated value, then hold the connection + # open with periodic heartbeats so the SDK stays connected. + self.wfile.write(put_event(RECONNECT_VALUE, 2)) + self.wfile.flush() + try: + while True: + time.sleep(5) + self.wfile.write(b":heartbeat\n\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + return + + def do_POST(self) -> None: + # The event processor POSTs analytics events to `/bulk`; accept and + # discard them so it doesn't log errors. + length = int(self.headers.get("Content-Length", 0)) + if length: + self.rfile.read(length) + self.send_response(202) + self.send_header("Content-Length", "0") + self.end_headers() + + +def main() -> None: + server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler) + sys.stderr.write(f"mock-ld: listening on {PORT}\n") + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/test/launchdarkly-reconnect/mzcompose.py b/test/launchdarkly-reconnect/mzcompose.py new file mode 100644 index 0000000000000..e26fb6851d93a --- /dev/null +++ b/test/launchdarkly-reconnect/mzcompose.py @@ -0,0 +1,80 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +""" +Regression test for incident-984: the LaunchDarkly data source must reconnect +after its streaming connection drops with a non-Eof error, so that flag updates +keep syncing. + +A mock LaunchDarkly server (mock_ld.py) serves an initial flag value, resets the +first streaming connection mid-stream (reproducing the incident's non-Eof +transport error), and serves an updated value to every reconnecting client. +environmentd is pointed at the mock via MZ_LAUNCHDARKLY_BASE_URI, so the updated +value can only reach it if the data source reconnected after the reset. A +regressed SDK gets stuck on the initial value and the assertion below times out. + +Unlike test/launchdarkly, this needs no real LaunchDarkly credentials. +""" + +from materialize.mzcompose.composition import Composition, Service +from materialize.mzcompose.service import Service as DockerService +from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.testdrive import Testdrive + +FLAG_KEY = "reconnect-test" +MOCK_HOST = "mock-launchdarkly" +MOCK_PORT = 8080 + +SERVICES = [ + DockerService( + name=MOCK_HOST, + config={ + "image": "python:3.12-slim", + "volumes": ["./mock_ld.py:/app/mock_ld.py"], + "command": ["python3", "-u", "/app/mock_ld.py"], + "ports": [MOCK_PORT], + "healthcheck": { + "test": [ + "CMD", + "python3", + "-c", + "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')", + ], + "interval": "1s", + "start_period": "30s", + }, + }, + ), + Materialized( + environment_extra=[ + "MZ_LAUNCHDARKLY_SDK_KEY=sdk-mock-key", + f"MZ_LAUNCHDARKLY_BASE_URI=http://{MOCK_HOST}:{MOCK_PORT}", + f"MZ_LAUNCHDARKLY_KEY_MAP=max_result_size={FLAG_KEY}", + "MZ_CONFIG_SYNC_LOOP_INTERVAL=1s", + ], + additional_system_parameter_defaults={ + "log_filter": "mz_adapter::config=debug,launchdarkly_server_sdk=debug,info", + }, + external_metadata_store=True, + ), + # The reconnect (eventsource backoff) plus the 1s sync loop means the + # updated value can take several seconds to land; give it ample room. + Testdrive(no_reset=True, seed=1, default_timeout="120s"), +] + + +def workflow_default(c: Composition) -> None: + c.up(MOCK_HOST, "materialized", Service("testdrive", idle=True)) + + # The mock serves 2 GiB on the first streaming connection, resets it + # mid-stream, then serves 3 GiB to every reconnecting client. Reaching + # 3 GiB therefore proves the data source reconnected after a non-Eof error; + # a regressed SDK stays stuck at 2 GiB and this assertion times out. We + # don't assert the transient 2 GiB value, as the reset can race startup. + c.testdrive("\n".join(["> SHOW max_result_size", "3GB"]))