Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions ci/nightly/pipeline.template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1492,6 +1492,17 @@ steps:
composition: launchdarkly-flag-consistency
branches: "main"

- id: launchdarkly-reconnect
label: "LaunchDarkly reconnect"
depends_on: build-aarch64
timeout_in_minutes: 30
agents:
# Uses a mock LaunchDarkly server; needs no real credentials.
queue: hetzner-aarch64-4cpu-8gb
plugins:
- ./ci/plugins/mzcompose:
composition: launchdarkly-reconnect

- group: E2E
key: e2e
steps:
Expand Down
5 changes: 5 additions & 0 deletions src/adapter/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,11 @@ pub enum SystemParameterSyncClientConfig {
LaunchDarkly {
/// The LaunchDarkly SDK key
sdk_key: String,
/// Overrides the LaunchDarkly streaming, polling, and events endpoints
/// with a single base URL (as for a relay proxy). `None` uses
/// LaunchDarkly's default endpoints. Primarily for pointing the SDK at
/// a mock server in tests.
base_uri: Option<String>,
/// Function to return the current time.
now_fn: NowFn,
},
Expand Down
26 changes: 18 additions & 8 deletions src/adapter/src/config/frontend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,9 +82,14 @@ impl SystemParameterFrontend {
build_info: sync_config.build_info,
metrics: sync_config.metrics.clone(),
}),
SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self {
SystemParameterSyncClientConfig::LaunchDarkly {
sdk_key,
base_uri,
now_fn,
} => Ok(Self {
client: SystemParameterFrontendClient::LaunchDarkly {
client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?,
client: ld_client(sdk_key, base_uri.as_deref(), &sync_config.metrics, now_fn)
.await?,
// The environment-wide context carries no cluster/replica
// scope. Scoped evaluation passes a `cluster` or `replica`
// context per pass via [`ld_ctx`].
Expand Down Expand Up @@ -347,8 +352,8 @@ pub struct ClusterEvalContext {
pub cluster: ClusterScopeContext,
}

fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config {
ld::ConfigBuilder::new(api_key)
fn ld_config(api_key: &str, base_uri: Option<&str>, metrics: &Metrics) -> ld::Config {
let mut config = ld::ConfigBuilder::new(api_key)
.event_processor(
ld::EventProcessorBuilder::new()
.https_connector(HttpsConnector::new())
Expand All @@ -365,17 +370,22 @@ fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config {
})
}),
)
.data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()))
.build()
.expect("valid config")
.data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()));
if let Some(base_uri) = base_uri {
let mut endpoints = ld::ServiceEndpointsBuilder::new();
endpoints.relay_proxy(base_uri);
config = config.service_endpoints(&endpoints);
}
config.build().expect("valid config")
}

async fn ld_client(
api_key: &str,
base_uri: Option<&str>,
metrics: &Metrics,
now_fn: &NowFn,
) -> Result<ld::Client, anyhow::Error> {
let ld_client = ld::Client::build(ld_config(api_key, metrics))?;
let ld_client = ld::Client::build(ld_config(api_key, base_uri, metrics))?;
tracing::info!("waiting for SystemParameterFrontend to initialize");
// Start and initialize LD client for the frontend. The callback passed
// will export the last time when an SSE event from the LD server was
Expand Down
6 changes: 6 additions & 0 deletions src/environmentd/src/environmentd/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,11 @@ pub struct Args {
/// configuration parameters.
#[clap(long, env = "LAUNCHDARKLY_SDK_KEY")]
launchdarkly_sdk_key: Option<String>,
/// Overrides the LaunchDarkly streaming, polling, and events endpoints with
/// a single base URL, as for a relay proxy. Primarily intended for pointing
/// the SDK at a mock LaunchDarkly server in tests.
#[clap(long, env = "LAUNCHDARKLY_BASE_URI", value_name = "URL")]
launchdarkly_base_uri: Option<String>,
/// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to
/// LaunchDarkly feature keys.
///
Expand Down Expand Up @@ -1115,6 +1120,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> {
segment_client_side: args.segment_client_side,
test_only_dummy_segment_client: args.test_only_dummy_segment_client,
launchdarkly_sdk_key: args.launchdarkly_sdk_key,
launchdarkly_base_uri: args.launchdarkly_base_uri,
launchdarkly_key_map: args
.launchdarkly_key_map
.into_iter()
Expand Down
4 changes: 4 additions & 0 deletions src/environmentd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,9 @@ pub struct Config {
/// An SDK key for LaunchDarkly. Enables system parameter synchronization
/// with LaunchDarkly.
pub launchdarkly_sdk_key: Option<String>,
/// Overrides the LaunchDarkly service endpoints with a single base URL, as
/// for a relay proxy or a mock server in tests.
pub launchdarkly_base_uri: Option<String>,
/// An invertible map from system parameter names to LaunchDarkly feature
/// keys to use when propagating values from the latter to the former.
pub launchdarkly_key_map: BTreeMap<String, String>,
Expand Down Expand Up @@ -480,6 +483,7 @@ impl Listeners {
config.launchdarkly_key_map,
SystemParameterSyncClientConfig::LaunchDarkly {
sdk_key: key,
base_uri: config.launchdarkly_base_uri,
now_fn: config.now.clone(),
},
)),
Expand Down
1 change: 1 addition & 0 deletions src/environmentd/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -903,6 +903,7 @@ impl Listeners {
aws_account_id: None,
aws_privatelink_availability_zones: None,
launchdarkly_sdk_key: None,
launchdarkly_base_uri: None,
launchdarkly_key_map: Default::default(),
config_sync_file_path: None,
config_sync_timeout: Duration::from_secs(30),
Expand Down
1 change: 1 addition & 0 deletions src/sqllogictest/src/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1281,6 +1281,7 @@ impl<'a> RunnerInner<'a> {
aws_account_id: None,
aws_privatelink_availability_zones: None,
launchdarkly_sdk_key: None,
launchdarkly_base_uri: None,
launchdarkly_key_map: Default::default(),
config_sync_file_path: None,
config_sync_timeout: Duration::from_secs(30),
Expand Down
149 changes: 149 additions & 0 deletions test/launchdarkly-reconnect/mock_ld.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,149 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
A minimal mock of the LaunchDarkly streaming API, used to exercise the SDK's
reconnect behavior (see mzcompose.py).

The first streaming client receives an initial flag value and then has its
connection reset mid-stream -- a non-Eof transport error, reproducing the
failure mode of incident-984. Every subsequent (reconnecting) client receives
an updated value. The test therefore asserts that the value changed, which can
only happen if the data source reconnected after the reset.
"""

import json
import socket
import struct
import sys
import threading
import time
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer

PORT = 8080
FLAG_KEY = "reconnect-test"

# Number variations served before and after the forced reconnect. The SDK maps
# these onto the `max_result_size` system parameter (2 GiB and 3 GiB).
INITIAL_VALUE = 2147483648
RECONNECT_VALUE = 3221225472

# Seconds to hold the first connection open before resetting it, leaving time
# for the SDK to apply the initial value.
HOLD_BEFORE_RESET = 3.0

_lock = threading.Lock()
_stream_connections = 0


def put_event(value: int, version: int) -> bytes:
"""Serialize a LaunchDarkly streaming `put` event carrying a single flag
that, with targeting off, evaluates to `value`."""
flag = {
"key": FLAG_KEY,
"version": version,
"on": False,
"targets": [],
"rules": [],
"prerequisites": [],
"fallthrough": {"variation": 0},
"offVariation": 0,
"variations": [value],
"salt": "reconnect-test-salt",
"clientSideAvailability": {
"usingMobileKey": False,
"usingEnvironmentId": False,
},
}
payload = {"path": "/", "data": {"flags": {FLAG_KEY: flag}, "segments": {}}}
return f"event: put\ndata: {json.dumps(payload)}\n\n".encode()


class Handler(BaseHTTPRequestHandler):
protocol_version = "HTTP/1.1"

def log_message(self, format: str, *args: object) -> None:
sys.stderr.write("mock-ld: " + (format % args) + "\n")

def _reset_connection(self) -> None:
# Force a TCP RST instead of a clean FIN, so the SDK sees a non-Eof
# transport error (as in incident-984) rather than the Eof it has
# always recovered from.
self.connection.setsockopt(
socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0)
)
self.connection.close()

def _send_stream_headers(self) -> None:
self.send_response(200)
self.send_header("Content-Type", "text/event-stream")
self.send_header("Cache-Control", "no-cache")
self.end_headers()

def do_GET(self) -> None:
if self.path == "/health":
self.send_response(200)
self.send_header("Content-Length", "0")
self.end_headers()
return

if self.path != "/all":
self.send_response(404)
self.send_header("Content-Length", "0")
self.end_headers()
return

global _stream_connections
with _lock:
_stream_connections += 1
n = _stream_connections

self._send_stream_headers()

if n == 1:
# First client: send the initial value, then reset mid-stream.
self.wfile.write(put_event(INITIAL_VALUE, 1))
self.wfile.flush()
time.sleep(HOLD_BEFORE_RESET)
self.log_message("resetting first streaming connection")
self._reset_connection()
self.close_connection = True
return

# Reconnecting client: send the updated value, then hold the connection
# open with periodic heartbeats so the SDK stays connected.
self.wfile.write(put_event(RECONNECT_VALUE, 2))
self.wfile.flush()
try:
while True:
time.sleep(5)
self.wfile.write(b":heartbeat\n\n")
self.wfile.flush()
except (BrokenPipeError, ConnectionResetError, OSError):
return

def do_POST(self) -> None:
# The event processor POSTs analytics events to `/bulk`; accept and
# discard them so it doesn't log errors.
length = int(self.headers.get("Content-Length", 0))
if length:
self.rfile.read(length)
self.send_response(202)
self.send_header("Content-Length", "0")
self.end_headers()


def main() -> None:
server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler)
sys.stderr.write(f"mock-ld: listening on {PORT}\n")
server.serve_forever()


if __name__ == "__main__":
main()
80 changes: 80 additions & 0 deletions test/launchdarkly-reconnect/mzcompose.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
# Copyright Materialize, Inc. and contributors. All rights reserved.
#
# Use of this software is governed by the Business Source License
# included in the LICENSE file at the root of this repository.
#
# As of the Change Date specified in that file, in accordance with
# the Business Source License, use of this software will be governed
# by the Apache License, Version 2.0.

"""
Regression test for incident-984: the LaunchDarkly data source must reconnect
after its streaming connection drops with a non-Eof error, so that flag updates
keep syncing.

A mock LaunchDarkly server (mock_ld.py) serves an initial flag value, resets the
first streaming connection mid-stream (reproducing the incident's non-Eof
transport error), and serves an updated value to every reconnecting client.
environmentd is pointed at the mock via MZ_LAUNCHDARKLY_BASE_URI, so the updated
value can only reach it if the data source reconnected after the reset. A
regressed SDK gets stuck on the initial value and the assertion below times out.

Unlike test/launchdarkly, this needs no real LaunchDarkly credentials.
"""

from materialize.mzcompose.composition import Composition, Service
from materialize.mzcompose.service import Service as DockerService
from materialize.mzcompose.services.materialized import Materialized
from materialize.mzcompose.services.testdrive import Testdrive

FLAG_KEY = "reconnect-test"
MOCK_HOST = "mock-launchdarkly"
MOCK_PORT = 8080

SERVICES = [
DockerService(
name=MOCK_HOST,
config={
"image": "python:3.12-slim",
"volumes": ["./mock_ld.py:/app/mock_ld.py"],
"command": ["python3", "-u", "/app/mock_ld.py"],
"ports": [MOCK_PORT],
"healthcheck": {
"test": [
"CMD",
"python3",
"-c",
"import urllib.request; urllib.request.urlopen('http://localhost:8080/health')",
],
"interval": "1s",
"start_period": "30s",
},
},
),
Materialized(
environment_extra=[
"MZ_LAUNCHDARKLY_SDK_KEY=sdk-mock-key",
f"MZ_LAUNCHDARKLY_BASE_URI=http://{MOCK_HOST}:{MOCK_PORT}",
f"MZ_LAUNCHDARKLY_KEY_MAP=max_result_size={FLAG_KEY}",
"MZ_CONFIG_SYNC_LOOP_INTERVAL=1s",
],
additional_system_parameter_defaults={
"log_filter": "mz_adapter::config=debug,launchdarkly_server_sdk=debug,info",
},
external_metadata_store=True,
),
# The reconnect (eventsource backoff) plus the 1s sync loop means the
# updated value can take several seconds to land; give it ample room.
Testdrive(no_reset=True, seed=1, default_timeout="120s"),
]


def workflow_default(c: Composition) -> None:
c.up(MOCK_HOST, "materialized", Service("testdrive", idle=True))

# The mock serves 2 GiB on the first streaming connection, resets it
# mid-stream, then serves 3 GiB to every reconnecting client. Reaching
# 3 GiB therefore proves the data source reconnected after a non-Eof error;
# a regressed SDK stays stuck at 2 GiB and this assertion times out. We
# don't assert the transient 2 GiB value, as the reset can race startup.
c.testdrive("\n".join(["> SHOW max_result_size", "3GB"]))
Loading