From 8b9df3d88c42f70a0bbe2c5b89d8799c24293a0d Mon Sep 17 00:00:00 2001 From: Jason Hernandez <7144515+jasonhernandez@users.noreply.github.com> Date: Fri, 12 Jun 2026 16:46:10 -0700 Subject: [PATCH 1/2] adapter: migrate LaunchDarkly SDK off fork to upstream 3.1.1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Move launchdarkly-server-sdk from the MaterializeInc/rust-server-sdk fork back to upstream crates.io 3.1.1, restoring the launchdarkly-sdk-transport + MetricsTransport setup and dropping the [patch.crates-io] override. The fork existed for launchdarkly/rust-server-sdk#116: a StreamingDataSource/eventsource StreamClosed bug where a non-Eof stream error left the data source stuck with no reconnect, silently breaking LD sync. A prior upgrade to upstream 3.0.1 had to be reverted (incident-984) because that bug was still unfixed upstream. The fixes have since landed — rust-server-sdk#168 and rust-eventsource-client#134/#135 — and 3.1.1 resolves eventsource-client to 0.17.5, which carries them. Use the rustls + aws-lc-rs features (hyper-rustls-native-roots, crypto-aws-lc-rs), now the upstream defaults, instead of the prior attempt's native-tls/crypto-openssl, avoiding the OpenSSL path. The transport build_https() call is identical either way. deny.toml gains skips for the duplicate versions the transport stack pulls (older tower/rustls-native-certs; newer rand/rand_core/getrandom/cpufeatures) and re-adds the launchdarkly-sdk-transport wrapper. Adds a test, test_metric_frozen_on_midstream_error, modeling the exact incident-984 failure mode (200 OK then a mid-stream timeout): it asserts the last_sse_time_seconds gauge freezes so the staleness alert can detect a stuck data source. 
Co-Authored-By: Claude Fable 5 --- Cargo.lock | 388 +++++++++++++++++++--- Cargo.toml | 6 +- deny.toml | 10 + src/adapter/Cargo.toml | 2 +- src/adapter/src/config/frontend.rs | 315 ++++++++++++++++-- src/balancerd/Cargo.toml | 1 + src/balancerd/src/bin/balancerd.rs | 4 + src/clusterd/Cargo.toml | 1 + src/clusterd/src/lib.rs | 4 + src/dyncfg-launchdarkly/Cargo.toml | 2 +- src/dyncfg-launchdarkly/src/lib.rs | 21 +- src/environmentd/Cargo.toml | 1 + src/environmentd/src/environmentd/main.rs | 5 + src/persist-cli/Cargo.toml | 1 + src/persist-cli/src/main.rs | 4 + src/sqllogictest/Cargo.toml | 1 + src/sqllogictest/src/bin/sqllogictest.rs | 4 + src/testdrive/Cargo.toml | 1 + src/testdrive/src/bin/testdrive.rs | 4 + 19 files changed, 676 insertions(+), 99 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d6f7778192be5..5217ad61576bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", "cipher", - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -1579,7 +1579,7 @@ dependencies = [ "bitflags 2.11.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1939,6 +1939,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.41" @@ -2006,6 +2017,12 @@ dependencies = [ "half 1.6.0", ] +[[package]] +name = "cidr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579504560394e388085d0c080ea587dfa5c15f7e251b4d5247d1e1a61d1d6928" + 
[[package]] name = "cipher" version = "0.4.4" @@ -2349,6 +2366,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.3.0" @@ -3029,7 +3055,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -3411,7 +3437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3443,17 +3469,18 @@ dependencies = [ [[package]] name = "eventsource-client" -version = "0.16.0" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c26451361cde19fe2322835b6e684f4825902edf3139ce9d6a70e6542566a0b2" +checksum = "96df8cfa11d3c8e4e1a48b81c9c600ecbe8c11d1326f41e1524cc4d42f7bf6d9" dependencies = [ "base64 0.22.1", + "bytes", "futures", - "hyper 0.14.32", - "hyper-timeout 0.4.1", + "http 1.4.2", + "launchdarkly-sdk-transport", "log", "pin-project", - "rand 0.8.5", + "rand 0.10.1", "tokio", ] @@ -3998,11 +4025,25 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasi 0.14.2+wasi-0.2.4", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.1", + "wasip2", + "wasip3", +] + [[package]] name = "gimli" version = "0.32.3" @@ -4425,6 +4466,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"7ad4b0a1e37510028bc4ba81d0e38d239c39671b0f0ce9e02dfa93a8133f7c08" +dependencies = [ + "bytes", + "futures-util", + "headers", + "http 1.4.2", + "hyper 1.9.0", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls-native-certs 0.7.3", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-openssl" version = "0.10.2" @@ -4455,25 +4516,13 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "rustls", - "rustls-native-certs", + "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", ] -[[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper 0.14.32", - "pin-project-lite", - "tokio", - "tokio-io-timeout", -] - [[package]] name = "hyper-timeout" version = "0.5.1" @@ -4533,7 +4582,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "system-configuration", "tokio", "tower-service", @@ -4933,7 +4982,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -5129,7 +5178,7 @@ dependencies = [ "http-body-util", "hyper 1.9.0", "hyper-openssl", - "hyper-timeout 0.5.1", + "hyper-timeout", "hyper-util", "jiff", "jsonpath-rust", @@ -5210,20 +5259,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "launchdarkly-sdk-transport" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a048341aa360a768a7d91ebf6232a574f355ada0724acff2df7c5d8dc1e04c98" +dependencies = [ + "bytes", + "futures", + "http 1.4.2", + "http-body-util", + "hyper 1.9.0", + "hyper-http-proxy", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "log", + "no-proxy", + "tower 0.4.13", +] + [[package]] name = "launchdarkly-server-sdk" -version = "2.6.2" -source = 
"git+https://github.com/MaterializeInc/rust-server-sdk?rev=3e0a0b98b09a2970f292577a07e1c9382b65b5da#3e0a0b98b09a2970f292577a07e1c9382b65b5da" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0440c21aaa0670948071d9cbd8142e9d057a4b40feb3e6293e0efc6de9879b69" dependencies = [ "aws-lc-rs", + "bitflags 2.11.0", + "bytes", "chrono", "crossbeam-channel", "data-encoding", "eventsource-client", "futures", - "hyper 0.14.32", + "http 1.4.2", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk-evaluation", - "lazy_static", "log", "lru", "moka", @@ -5239,9 +5311,9 @@ dependencies = [ [[package]] name = "launchdarkly-server-sdk-evaluation" -version = "2.0.1" +version = "2.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63706c23ee67f699563e5c52c7542361eccc966cba0430b6a7862c0ecaee9432" +checksum = "4bc97a52681b9860197ad81a2c1a34d96bb647459bcdf067ac1fae42c9fe8c30" dependencies = [ "base16ct", "chrono", @@ -5266,6 +5338,12 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "lexical-core" version = "1.0.5" @@ -5844,10 +5922,10 @@ dependencies = [ "hex", "http 1.4.2", "humantime", - "hyper-tls 0.5.0", "imbl", "ipnet", "itertools 0.14.0", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk", "maplit", "mz-adapter-types", @@ -6136,6 +6214,7 @@ dependencies = [ "prometheus", "proxy-header", "reqwest 0.12.28", + "rustls", "semver", "tempfile", "tokio", @@ -6423,6 +6502,7 @@ dependencies = [ "mz-txn-wal", "nix 0.31.3", "num_cpus", + "rustls", "serde", "tokio", "tower 0.5.3", @@ -6746,7 +6826,7 @@ version = "0.0.0" dependencies = [ "anyhow", "humantime", - "hyper-tls 0.5.0", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk", "mz-build-info", "mz-dyncfg", @@ -6872,6 +6952,7 @@ dependencies = 
[ "regex", "reqwest 0.12.28", "rlimit", + "rustls", "semver", "sentry-tracing", "serde", @@ -8410,6 +8491,7 @@ dependencies = [ "postgres-protocol", "regex", "reqwest 0.12.28", + "rustls", "serde_json", "shell-words", "tempfile", @@ -8823,6 +8905,7 @@ dependencies = [ "rdkafka", "regex", "reqwest 0.12.28", + "rustls", "semver", "serde", "serde_json", @@ -9032,6 +9115,15 @@ dependencies = [ "libc", ] +[[package]] +name = "no-proxy" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f79c902b31ceac6856e262af5dbaffef75390cf4647c9fef7b55da69a4b912e" +dependencies = [ + "cidr", +] + [[package]] name = "nom" version = "7.1.3" @@ -9792,6 +9884,7 @@ dependencies = [ "num_cpus", "num_enum 0.7.6", "prometheus", + "rustls", "serde", "serde_json", "timely", @@ -10408,7 +10501,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", - "itertools 0.10.5", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -10429,7 +10522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -10654,6 +10747,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "radium" version = "0.7.0" @@ -10695,6 +10794,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -10753,6 +10863,12 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_distr" version = "0.5.1" @@ -11345,7 +11461,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -11358,7 +11474,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -11370,12 +11486,26 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe 0.1.6", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.3" @@ -11388,6 +11518,15 @@ dependencies = [ "security-framework 3.7.0", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -11936,7 +12075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -11953,7 +12092,7 @@ source 
= "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", "sha2-asm", ] @@ -12480,7 +12619,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -12499,7 +12638,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -12777,16 +12916,6 @@ dependencies = [ "windows-sys 0.61.1", ] -[[package]] -name = "tokio-io-timeout" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-io-utility" version = "0.7.6" @@ -12994,7 +13123,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.9.0", - "hyper-timeout 0.5.1", + "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", @@ -13059,6 +13188,7 @@ dependencies = [ "pin-project-lite", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -13574,6 +13704,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unit-prefix" version = "0.5.2" @@ -13806,6 +13942,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.4+wasi-0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"b67efb37e106e55ce722a510d6b5f9c17f083e5fc79afc2badeb12cc313d9487" +dependencies = [ + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + [[package]] name = "wasite" version = "0.1.0" @@ -13867,6 +14021,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.11.4", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.1" @@ -13880,6 +14056,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.11.0", + "hashbrown 0.15.3", + "indexmap 2.11.4", + "semver", +] + [[package]] name = "wasmtimer" version = "0.4.3" @@ -14209,6 +14397,32 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + 
+[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -14218,6 +14432,74 @@ dependencies = [ "bitflags 2.11.0", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap 2.11.4", + "prettyplease", + "syn 2.0.117", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.117", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.11.0", + "indexmap 2.11.4", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.11.4", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index fcdb0ea4abffe..ab7c1ced6543c 100644 
--- a/Cargo.toml +++ b/Cargo.toml @@ -400,7 +400,8 @@ junit-report = "0.8.3" k8s-controller = "0.11.0" k8s-openapi = { version = "0.27.0", features = ["schemars", "v1_32"] } kube = { version = "3.1.0", default-features = false, features = ["client", "derive", "openssl-tls", "runtime", "ws"] } -launchdarkly-server-sdk = { version = "2.6.2", default-features = false } +launchdarkly-server-sdk = { version = "3.1.1", default-features = false, features = ["hyper-rustls-native-roots", "crypto-aws-lc-rs"] } +launchdarkly-sdk-transport = "0.1" lgalloc = "0.6.0" libc = "0.2.186" lru = "0.16.3" @@ -646,9 +647,6 @@ postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" # Waiting on https://github.com/MaterializeInc/serde-value/pull/35. serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } -# Waiting for resolution of https://github.com/launchdarkly/rust-server-sdk/issues/116 -launchdarkly-server-sdk = { git = "https://github.com/MaterializeInc/rust-server-sdk", rev = "3e0a0b98b09a2970f292577a07e1c9382b65b5da" } - # Waiting on https://github.com/edenhill/librdkafka/pull/4051. rdkafka = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" } rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" } diff --git a/deny.toml b/deny.toml index ca75af3011106..81034ea2ba2e2 100644 --- a/deny.toml +++ b/deny.toml @@ -150,6 +150,15 @@ skip = [ { name = "hashlink", version = "0.9.1" }, # held back by owo-colors 4.3 (mz-deploy terminal styling) { name = "supports-color", version = "2.1.0" }, + + # Pulled by launchdarkly-server-sdk 3.x via launchdarkly-sdk-transport / + # eventsource-client (proxy/timeout/rustls stack and the SDK's RNG path). + # NB: tower 0.4.13 is already skipped above (mz-deploy). 
+ { name = "rustls-native-certs", version = "0.7.3" }, + { name = "rand", version = "0.10.1" }, + { name = "rand_core", version = "0.10.1" }, + { name = "getrandom", version = "0.4.2" }, + { name = "cpufeatures", version = "0.3.0" }, ] [[bans.deny]] @@ -206,6 +215,7 @@ wrappers = [ "globset", "launchdarkly-server-sdk", "launchdarkly-server-sdk-evaluation", + "launchdarkly-sdk-transport", "native-tls", "opendal", "os_info", diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index 74da334662554..32c6eeed02f92 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -29,10 +29,10 @@ hex.workspace = true humantime.workspace = true imbl.workspace = true http.workspace = true -hyper-tls = "0.5.0" ipnet.workspace = true itertools.workspace = true launchdarkly-server-sdk.workspace = true +launchdarkly-sdk-transport.workspace = true maplit.workspace = true mz-adapter-types = { path = "../adapter-types" } mz-audit-log = { path = "../audit-log" } diff --git a/src/adapter/src/config/frontend.rs b/src/adapter/src/config/frontend.rs index bc65fb7a293bd..3a942d9602870 100644 --- a/src/adapter/src/config/frontend.rs +++ b/src/adapter/src/config/frontend.rs @@ -10,16 +10,18 @@ use std::collections::BTreeMap; use std::fs; use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use derivative::Derivative; -use hyper_tls::HttpsConnector; +use futures::TryStreamExt; +use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture}; use launchdarkly_server_sdk as ld; use mz_build_info::BuildInfo; use mz_cloud_provider::CloudProvider; use mz_cluster_client::ReplicaId; use mz_controller_types::ClusterId; +use mz_ore::metrics::UIntGauge; use mz_ore::now::NowFn; use mz_sql::catalog::EnvironmentId; use serde_json::Value as JsonValue; @@ -71,7 +73,7 @@ impl SystemParameterFrontend { /// Create a new [SystemParameterFrontend] initialize. /// /// This will create and initialize an [ld::Client] instance. 
The - /// [ld::Client::initialized_async] call will be attempted in a loop with an + /// [ld::Client::wait_for_initialization] call will be attempted in a loop with an /// exponential backoff with power `2s` and max duration `60s`. pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result { match &sync_config.backend_config { @@ -347,25 +349,68 @@ pub struct ClusterEvalContext { pub cluster: ClusterScopeContext, } -fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { +/// An [`HttpTransport`] wrapper that records timestamps on successful HTTP +/// responses. Used to populate Prometheus metrics that track LaunchDarkly +/// connectivity health. +/// +/// Two instances are created — one for the event processor (CSE metric, tracks +/// outbound event sends) and one for the streaming data source (SSE metric, +/// tracks inbound SSE events). +#[derive(Clone)] +struct MetricsTransport { + inner: T, + last_success_gauge: UIntGauge, + now_fn: NowFn, +} + +impl HttpTransport for MetricsTransport { + fn request(&self, request: http::Request>) -> ResponseFuture { + let inner_fut = self.inner.request(request); + let gauge = self.last_success_gauge.clone(); + let now_fn = self.now_fn.clone(); + Box::pin(async move { + let resp = inner_fut.await?; + if resp.status().is_success() { + gauge.set(now_fn() / 1000); + let (parts, body) = resp.into_parts(); + let wrapped: ByteStream = Box::pin(body.inspect_ok(move |_| { + gauge.set(now_fn() / 1000); + })); + Ok(http::Response::from_parts(parts, wrapped)) + } else { + Ok(resp) + } + }) + } +} + +fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config { + let transport = launchdarkly_sdk_transport::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(300)) + .build_https() + .expect("failed to create HTTPS transport"); + + let cse_transport = MetricsTransport { + inner: transport.clone(), + last_success_gauge: 
metrics.last_cse_time_seconds.clone(), + now_fn: now_fn.clone(), + }; + let data_source_transport = MetricsTransport { + inner: transport, + last_success_gauge: metrics.last_sse_time_seconds.clone(), + now_fn: now_fn.clone(), + }; + + let mut event_processor = ld::EventProcessorBuilder::new(); + event_processor.transport(cse_transport); + + let mut data_source = ld::StreamingDataSourceBuilder::new(); + data_source.transport(data_source_transport); + ld::ConfigBuilder::new(api_key) - .event_processor( - ld::EventProcessorBuilder::new() - .https_connector(HttpsConnector::new()) - .on_success({ - let last_cse_time_seconds = metrics.last_cse_time_seconds.clone(); - Arc::new(move |result| { - if let Ok(ts) = u64::try_from(result.time_from_server / 1000) { - last_cse_time_seconds.set(ts); - } else { - tracing::warn!( - "Cannot convert time_from_server / 1000 from u128 to u64" - ); - } - }) - }), - ) - .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())) + .event_processor(&event_processor) + .data_source(&data_source) .build() .expect("valid config") } @@ -375,19 +420,9 @@ async fn ld_client( metrics: &Metrics, now_fn: &NowFn, ) -> Result { - let ld_client = ld::Client::build(ld_config(api_key, metrics))?; + let ld_client = ld::Client::build(ld_config(api_key, metrics, now_fn))?; tracing::info!("waiting for SystemParameterFrontend to initialize"); - // Start and initialize LD client for the frontend. The callback passed - // will export the last time when an SSE event from the LD server was - // received in a Prometheus metric. 
- ld_client.start_with_default_executor_and_callback({ - let last_sse_time_seconds = metrics.last_sse_time_seconds.clone(); - let now_fn = now_fn.clone(); - Arc::new(move |_ev| { - let ts = now_fn() / 1000; - last_sse_time_seconds.set(ts); - }) - }); + ld_client.start_with_default_executor(); let max_backoff = Duration::from_secs(60); let mut backoff = Duration::from_secs(5); @@ -579,7 +614,13 @@ fn ld_ctx( #[cfg(test)] mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + + use futures::StreamExt; + use launchdarkly_sdk_transport::{ByteStream, TransportError}; use mz_build_info::DUMMY_BUILD_INFO; + use mz_ore::metrics::MetricsRegistry; use super::*; @@ -626,4 +667,212 @@ mod tests { fn environment_wide_context_is_unscoped() { ld_ctx(&env_id(), &DUMMY_BUILD_INFO, None, None).expect("environment-wide context builds"); } + + /// A fake transport that simulates a long-lived SSE streaming connection: + /// returns 200 OK immediately, then delivers multiple SSE events as body + /// chunks (exactly how LaunchDarkly's streaming data source works). + #[derive(Clone)] + struct FakeSseTransport; + + impl HttpTransport for FakeSseTransport { + fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture { + let body: ByteStream = Box::pin(futures::stream::iter(vec![ + Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")), + Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag1\"}\n\n")), + Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag2\"}\n\n")), + ])); + Box::pin(async move { + http::Response::builder() + .status(200) + .body(body) + .map_err(|e| TransportError::new(std::io::Error::other(e))) + }) + } + } + + /// A fake transport that returns an error, simulating a failed connection. 
+ #[derive(Clone)] + struct FailingTransport; + + impl HttpTransport for FailingTransport { + fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture { + Box::pin(async move { + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "connection refused", + ))) + }) + } + } + + /// A fake transport that returns 200 OK, delivers one event, then errors + /// mid-stream with a timeout: the non-Eof stream error a dropped long-lived + /// SSE connection surfaces. + #[derive(Clone)] + struct MidStreamFailureTransport; + + impl HttpTransport for MidStreamFailureTransport { + fn request(&self, _request: http::Request<Option<Bytes>>) -> ResponseFuture { + let body: ByteStream = Box::pin(futures::stream::iter(vec![ + Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")), + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "body timed out", + ))), + ])); + Box::pin(async move { + http::Response::builder() + .status(200) + .body(body) + .map_err(|e| TransportError::new(std::io::Error::other(e))) + }) + } + } + + fn test_gauge(registry: &MetricsRegistry, name: &str) -> UIntGauge { + registry.register(mz_ore::metric!( + name: name, + help: "test gauge", + )) + } + + /// Verifies that MetricsTransport updates the gauge on each body chunk, + /// not just on the initial HTTP 200 response head. This matters for + /// long-lived streaming connections where SSE events arrive as body chunks. 
+ #[mz_ore::test(tokio::test)] + async fn test_metric_updated_on_body_chunks() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(&registry, "test_sse_gauge"); + + let transport = MetricsTransport { + inner: FakeSseTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + assert_eq!(gauge.get(), 0); + + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let response = transport.request(request).await?; + + assert_eq!(gauge.get(), 1000); + + time.store(2_800_000, Ordering::SeqCst); + + let mut body = response.into_body(); + let mut event_count = 0; + while let Some(Ok(_chunk)) = body.next().await { + event_count += 1; + } + assert_eq!(event_count, 3); + + assert_eq!(gauge.get(), 2800); + Ok(()) + } + + #[mz_ore::test(tokio::test)] + async fn test_cse_metric_updates_correctly_per_request() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(&registry, "test_cse_gauge"); + + let transport = MetricsTransport { + inner: FakeSseTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + let req = || -> Result<http::Request<Option<Bytes>>, http::Error> { + http::Request::builder() + .uri("https://events.launchdarkly.com/bulk") + .body(None) + }; + + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 1000); + + time.store(2_000_000, Ordering::SeqCst); + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 2000); + + time.store(3_000_000, Ordering::SeqCst); + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 3000); + Ok(()) + } + + #[mz_ore::test(tokio::test)] + async fn 
test_metric_not_updated_on_failed_request() -> Result<(), anyhow::Error> { + let now_fn = NowFn::from(|| 5_000_000u64); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(&registry, "test_fail_gauge"); + + let transport = MetricsTransport { + inner: FailingTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let result = transport.request(request).await; + assert!(result.is_err()); + assert_eq!(gauge.get(), 0, "gauge must not update on transport error"); + Ok(()) + } + + /// Verifies that when an SSE connection returns 200 OK and then dies + /// mid-stream, `last_sse_time_seconds` advances only for the events that + /// arrived and then freezes — the frozen timestamp is what lets the + /// staleness alert detect a stuck data source. + #[mz_ore::test(tokio::test)] + async fn test_metric_frozen_on_midstream_error() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(&registry, "test_midstream_gauge"); + + let transport = MetricsTransport { + inner: MidStreamFailureTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + // The 200 OK response head updates the gauge. + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let response = transport.request(request).await?; + assert_eq!(gauge.get(), 1000); + + // The first event arrives and advances the gauge. + time.store(2_000_000, Ordering::SeqCst); + let mut body = response.into_body(); + assert!(matches!(body.next().await, Some(Ok(_)))); + assert_eq!(gauge.get(), 2000); + + // The stream then errors mid-flight. Time has moved forward, but the + // gauge must stay frozen at the last successful event.
+ time.store(9_000_000, Ordering::SeqCst); + assert!(matches!(body.next().await, Some(Err(_)))); + assert_eq!( + gauge.get(), + 2000, + "gauge must freeze on mid-stream error so the staleness alert can fire" + ); + Ok(()) + } } diff --git a/src/balancerd/Cargo.toml b/src/balancerd/Cargo.toml index 8cccb6d2cdcdd..d4b5cb266058c 100644 --- a/src/balancerd/Cargo.toml +++ b/src/balancerd/Cargo.toml @@ -42,6 +42,7 @@ num_cpus.workspace = true openssl.workspace = true prometheus.workspace = true proxy-header.workspace = true +rustls.workspace = true semver.workspace = true tokio.workspace = true tokio-openssl.workspace = true diff --git a/src/balancerd/src/bin/balancerd.rs b/src/balancerd/src/bin/balancerd.rs index 28104b2e63c42..ae2784865bee0 100644 --- a/src/balancerd/src/bin/balancerd.rs +++ b/src/balancerd/src/bin/balancerd.rs @@ -170,6 +170,10 @@ pub struct ServiceArgs { } fn main() { + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig::default()); // Mirror the tokio Runtime configuration in our production binaries. 
diff --git a/src/clusterd/Cargo.toml b/src/clusterd/Cargo.toml index 83ad4410a6be6..1e0426711c99f 100644 --- a/src/clusterd/Cargo.toml +++ b/src/clusterd/Cargo.toml @@ -40,6 +40,7 @@ mz-timely-util = { path = "../timely-util" } mz-txn-wal = { path = "../txn-wal" } nix.workspace = true num_cpus.workspace = true +rustls.workspace = true serde.workspace = true tokio.workspace = true tower.workspace = true diff --git a/src/clusterd/src/lib.rs b/src/clusterd/src/lib.rs index a3048a21a1348..0a9765adf4153 100644 --- a/src/clusterd/src/lib.rs +++ b/src/clusterd/src/lib.rs @@ -177,6 +177,10 @@ struct Args { pub fn main() { mz_ore::panic::install_enhanced_handler(); + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args = cli::parse_args(CliConfig { env_prefix: Some("CLUSTERD_"), enable_version_flag: true, diff --git a/src/dyncfg-launchdarkly/Cargo.toml b/src/dyncfg-launchdarkly/Cargo.toml index 42ff83496db0e..b1efcbece2523 100644 --- a/src/dyncfg-launchdarkly/Cargo.toml +++ b/src/dyncfg-launchdarkly/Cargo.toml @@ -13,8 +13,8 @@ workspace = true [dependencies] anyhow.workspace = true humantime.workspace = true -hyper-tls = "0.5.0" launchdarkly-server-sdk.workspace = true +launchdarkly-sdk-transport.workspace = true mz-build-info = { path = "../build-info" } mz-dyncfg = { path = "../dyncfg" } mz-ore = { path = "../ore", default-features = false, features = ["async"] } diff --git a/src/dyncfg-launchdarkly/src/lib.rs b/src/dyncfg-launchdarkly/src/lib.rs index 3e25eb5b41bb4..50f412f777763 100644 --- a/src/dyncfg-launchdarkly/src/lib.rs +++ b/src/dyncfg-launchdarkly/src/lib.rs @@ -11,7 +11,6 @@ use std::time::Duration; -use hyper_tls::HttpsConnector; use launchdarkly_server_sdk as ld; use mz_build_info::BuildInfo; use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal}; @@ -50,13 +49,21 @@ where 
let _ = dyn_into_flag(entry.val())?; } let ld_client = if let Some(key) = launchdarkly_sdk_key { + let transport = launchdarkly_sdk_transport::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(300)) + .build_https() + .expect("failed to create HTTPS transport"); + + let mut data_source = ld::StreamingDataSourceBuilder::new(); + data_source.transport(transport.clone()); + + let mut event_processor = ld::EventProcessorBuilder::new(); + event_processor.transport(transport); + let config = ld::ConfigBuilder::new(key) - .event_processor( - ld::EventProcessorBuilder::new().https_connector(HttpsConnector::new()), - ) - .data_source( - ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()), - ) + .data_source(&data_source) + .event_processor(&event_processor) .build() .expect("valid config"); let client = ld::Client::build(config)?; diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index 57c30178342ae..4410b700142f5 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -96,6 +96,7 @@ rand.workspace = true regex = { workspace = true, optional = true } reqwest.workspace = true rlimit.workspace = true +rustls.workspace = true semver.workspace = true sentry-tracing.workspace = true serde.workspace = true diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index 489b63464507e..359408c4e81e7 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -657,6 +657,11 @@ pub fn main() { fn run(mut args: Args) -> Result<(), anyhow::Error> { mz_ore::panic::install_enhanced_handler(); + + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. 
+ let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let envd_start = Instant::now(); // Configure signal handling as soon as possible. We want signals to be diff --git a/src/persist-cli/Cargo.toml b/src/persist-cli/Cargo.toml index 760004453edb5..e4c847f3acaaf 100644 --- a/src/persist-cli/Cargo.toml +++ b/src/persist-cli/Cargo.toml @@ -39,6 +39,7 @@ mz-txn-wal = { path = "../txn-wal" } num_cpus.workspace = true num_enum.workspace = true prometheus.workspace = true +rustls.workspace = true serde = { workspace = true, features = ["rc"] } serde_json.workspace = true timely.workspace = true diff --git a/src/persist-cli/src/main.rs b/src/persist-cli/src/main.rs index 1df4898124f6e..5f6ba45542cb5 100644 --- a/src/persist-cli/src/main.rs +++ b/src/persist-cli/src/main.rs @@ -47,6 +47,10 @@ enum Command { } fn main() { + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig::default()); // Mirror the tokio Runtime configuration in our production binaries. 
diff --git a/src/sqllogictest/Cargo.toml b/src/sqllogictest/Cargo.toml index c5276285203cc..da6264f1dca92 100644 --- a/src/sqllogictest/Cargo.toml +++ b/src/sqllogictest/Cargo.toml @@ -46,6 +46,7 @@ mz-tracing = { path = "../tracing" } postgres-protocol.workspace = true regex.workspace = true reqwest.workspace = true +rustls.workspace = true shell-words.workspace = true serde_json.workspace = true tempfile.workspace = true diff --git a/src/sqllogictest/src/bin/sqllogictest.rs b/src/sqllogictest/src/bin/sqllogictest.rs index f610dada2d21e..25b9712dcb656 100644 --- a/src/sqllogictest/src/bin/sqllogictest.rs +++ b/src/sqllogictest/src/bin/sqllogictest.rs @@ -114,6 +114,10 @@ struct Args { async fn main() -> ExitCode { mz_ore::panic::install_enhanced_handler(); + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig { env_prefix: Some("MZ_"), enable_version_flag: false, diff --git a/src/testdrive/Cargo.toml b/src/testdrive/Cargo.toml index 4a168603856f3..813f6ebacf132 100644 --- a/src/testdrive/Cargo.toml +++ b/src/testdrive/Cargo.toml @@ -66,6 +66,7 @@ rand.workspace = true rdkafka.workspace = true regex.workspace = true reqwest.workspace = true +rustls.workspace = true semver.workspace = true serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } diff --git a/src/testdrive/src/bin/testdrive.rs b/src/testdrive/src/bin/testdrive.rs index 4acbb284c2dfa..92b76f9b64784 100644 --- a/src/testdrive/src/bin/testdrive.rs +++ b/src/testdrive/src/bin/testdrive.rs @@ -298,6 +298,10 @@ struct Args { #[tokio::main] async fn main() { + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. 
+ let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig::default()); tracing_subscriber::fmt() From 414a4f790baf128799411245b55b1005ddf833b9 Mon Sep 17 00:00:00 2001 From: Jason Hernandez <7144515+jasonhernandez@users.noreply.github.com> Date: Fri, 12 Jun 2026 17:13:22 -0700 Subject: [PATCH 2/2] adapter: add LaunchDarkly reconnect integration test Add an integration test that reproduces incident-984: the LaunchDarkly data source must reconnect after its streaming connection drops with a non-Eof error, so flag updates keep syncing. To make this testable against a controlled server, add a `--launchdarkly-base-uri` flag (env `MZ_LAUNCHDARKLY_BASE_URI`) that overrides the SDK's streaming/polling/events endpoints with a single base URL via the SDK's relay-proxy support. This is also generally useful for pointing at a LaunchDarkly relay proxy. The test (test/launchdarkly-reconnect) runs a mock LaunchDarkly streaming server that serves an initial flag value, resets the first streaming connection mid-stream with a TCP RST (a non-Eof transport error, as in the incident), and serves an updated value to every reconnecting client. environmentd is pointed at the mock, so reaching the updated value proves the data source reconnected; a regressed SDK stays stuck on the initial value. Unlike test/launchdarkly, this needs no real LaunchDarkly credentials, and runs in the nightly pipeline.
Co-Authored-By: Claude Fable 5 --- ci/nightly/pipeline.template.yml | 11 ++ src/adapter/src/config.rs | 5 + src/adapter/src/config/frontend.rs | 31 +++-- src/environmentd/src/environmentd/main.rs | 6 + src/environmentd/src/lib.rs | 4 + src/environmentd/src/test_util.rs | 1 + src/sqllogictest/src/runner.rs | 1 + test/launchdarkly-reconnect/mock_ld.py | 149 ++++++++++++++++++++++ test/launchdarkly-reconnect/mzcompose.py | 80 ++++++++++++ 9 files changed, 280 insertions(+), 8 deletions(-) create mode 100644 test/launchdarkly-reconnect/mock_ld.py create mode 100644 test/launchdarkly-reconnect/mzcompose.py diff --git a/ci/nightly/pipeline.template.yml b/ci/nightly/pipeline.template.yml index ad2b69637b4b3..0e3e705e35547 100644 --- a/ci/nightly/pipeline.template.yml +++ b/ci/nightly/pipeline.template.yml @@ -1492,6 +1492,17 @@ steps: composition: launchdarkly-flag-consistency branches: "main" + - id: launchdarkly-reconnect + label: "LaunchDarkly reconnect" + depends_on: build-aarch64 + timeout_in_minutes: 30 + agents: + # Uses a mock LaunchDarkly server; needs no real credentials. + queue: hetzner-aarch64-4cpu-8gb + plugins: + - ./ci/plugins/mzcompose: + composition: launchdarkly-reconnect + - group: E2E key: e2e steps: diff --git a/src/adapter/src/config.rs b/src/adapter/src/config.rs index 46ee79ce1ed69..82fdfe03a5709 100644 --- a/src/adapter/src/config.rs +++ b/src/adapter/src/config.rs @@ -109,6 +109,11 @@ pub enum SystemParameterSyncClientConfig { LaunchDarkly { /// The LaunchDarkly SDK key sdk_key: String, + /// Overrides the LaunchDarkly streaming, polling, and events endpoints + /// with a single base URL (as for a relay proxy). `None` uses + /// LaunchDarkly's default endpoints. Primarily for pointing the SDK at + /// a mock server in tests. + base_uri: Option, /// Function to return the current time. 
now_fn: NowFn, }, diff --git a/src/adapter/src/config/frontend.rs b/src/adapter/src/config/frontend.rs index 3a942d9602870..62a63c687fab3 100644 --- a/src/adapter/src/config/frontend.rs +++ b/src/adapter/src/config/frontend.rs @@ -84,9 +84,14 @@ impl SystemParameterFrontend { build_info: sync_config.build_info, metrics: sync_config.metrics.clone(), }), - SystemParameterSyncClientConfig::LaunchDarkly { sdk_key, now_fn } => Ok(Self { + SystemParameterSyncClientConfig::LaunchDarkly { + sdk_key, + base_uri, + now_fn, + } => Ok(Self { client: SystemParameterFrontendClient::LaunchDarkly { - client: ld_client(sdk_key, &sync_config.metrics, now_fn).await?, + client: ld_client(sdk_key, base_uri.as_deref(), &sync_config.metrics, now_fn) + .await?, // The environment-wide context carries no cluster/replica // scope. Scoped evaluation passes a `cluster` or `replica` // context per pass via [`ld_ctx`]. @@ -384,7 +389,12 @@ impl HttpTransport for MetricsTransport { } } -fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config { +fn ld_config( + api_key: &str, + base_uri: Option<&str>, + metrics: &Metrics, + now_fn: &NowFn, +) -> ld::Config { let transport = launchdarkly_sdk_transport::HyperTransport::builder() .connect_timeout(Duration::from_secs(10)) .read_timeout(Duration::from_secs(300)) @@ -408,19 +418,24 @@ fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config { let mut data_source = ld::StreamingDataSourceBuilder::new(); data_source.transport(data_source_transport); - ld::ConfigBuilder::new(api_key) + let mut config = ld::ConfigBuilder::new(api_key) .event_processor(&event_processor) - .data_source(&data_source) - .build() - .expect("valid config") + .data_source(&data_source); + if let Some(base_uri) = base_uri { + let mut endpoints = ld::ServiceEndpointsBuilder::new(); + endpoints.relay_proxy(base_uri); + config = config.service_endpoints(&endpoints); + } + config.build().expect("valid config") } async fn ld_client( api_key: 
&str, + base_uri: Option<&str>, metrics: &Metrics, now_fn: &NowFn, ) -> Result { - let ld_client = ld::Client::build(ld_config(api_key, metrics, now_fn))?; + let ld_client = ld::Client::build(ld_config(api_key, base_uri, metrics, now_fn))?; tracing::info!("waiting for SystemParameterFrontend to initialize"); ld_client.start_with_default_executor(); diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index 359408c4e81e7..4ce31ddedda91 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -434,6 +434,11 @@ pub struct Args { /// configuration parameters. #[clap(long, env = "LAUNCHDARKLY_SDK_KEY")] launchdarkly_sdk_key: Option, + /// Overrides the LaunchDarkly streaming, polling, and events endpoints with + /// a single base URL, as for a relay proxy. Primarily intended for pointing + /// the SDK at a mock LaunchDarkly server in tests. + #[clap(long, env = "LAUNCHDARKLY_BASE_URI", value_name = "URL")] + launchdarkly_base_uri: Option, /// A list of PARAM_NAME=KEY_NAME pairs from system parameter names to /// LaunchDarkly feature keys. /// @@ -1120,6 +1125,7 @@ fn run(mut args: Args) -> Result<(), anyhow::Error> { segment_client_side: args.segment_client_side, test_only_dummy_segment_client: args.test_only_dummy_segment_client, launchdarkly_sdk_key: args.launchdarkly_sdk_key, + launchdarkly_base_uri: args.launchdarkly_base_uri, launchdarkly_key_map: args .launchdarkly_key_map .into_iter() diff --git a/src/environmentd/src/lib.rs b/src/environmentd/src/lib.rs index d04e8487c0202..246ed69497c75 100644 --- a/src/environmentd/src/lib.rs +++ b/src/environmentd/src/lib.rs @@ -166,6 +166,9 @@ pub struct Config { /// An SDK key for LaunchDarkly. Enables system parameter synchronization /// with LaunchDarkly. pub launchdarkly_sdk_key: Option, + /// Overrides the LaunchDarkly service endpoints with a single base URL, as + /// for a relay proxy or a mock server in tests. 
+ pub launchdarkly_base_uri: Option, /// An invertible map from system parameter names to LaunchDarkly feature /// keys to use when propagating values from the latter to the former. pub launchdarkly_key_map: BTreeMap, @@ -480,6 +483,7 @@ impl Listeners { config.launchdarkly_key_map, SystemParameterSyncClientConfig::LaunchDarkly { sdk_key: key, + base_uri: config.launchdarkly_base_uri, now_fn: config.now.clone(), }, )), diff --git a/src/environmentd/src/test_util.rs b/src/environmentd/src/test_util.rs index ec878b33b206e..b29bd530d32c3 100644 --- a/src/environmentd/src/test_util.rs +++ b/src/environmentd/src/test_util.rs @@ -903,6 +903,7 @@ impl Listeners { aws_account_id: None, aws_privatelink_availability_zones: None, launchdarkly_sdk_key: None, + launchdarkly_base_uri: None, launchdarkly_key_map: Default::default(), config_sync_file_path: None, config_sync_timeout: Duration::from_secs(30), diff --git a/src/sqllogictest/src/runner.rs b/src/sqllogictest/src/runner.rs index 9033442ffccdd..a86b2caf09e29 100644 --- a/src/sqllogictest/src/runner.rs +++ b/src/sqllogictest/src/runner.rs @@ -1281,6 +1281,7 @@ impl<'a> RunnerInner<'a> { aws_account_id: None, aws_privatelink_availability_zones: None, launchdarkly_sdk_key: None, + launchdarkly_base_uri: None, launchdarkly_key_map: Default::default(), config_sync_file_path: None, config_sync_timeout: Duration::from_secs(30), diff --git a/test/launchdarkly-reconnect/mock_ld.py b/test/launchdarkly-reconnect/mock_ld.py new file mode 100644 index 0000000000000..f5a726bc0f16d --- /dev/null +++ b/test/launchdarkly-reconnect/mock_ld.py @@ -0,0 +1,149 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. 
+ +""" +A minimal mock of the LaunchDarkly streaming API, used to exercise the SDK's +reconnect behavior (see mzcompose.py). + +The first streaming client receives an initial flag value and then has its +connection reset mid-stream -- a non-Eof transport error, reproducing the +failure mode of incident-984. Every subsequent (reconnecting) client receives +an updated value. The test therefore asserts the updated value, which can +only be reached if the data source reconnected after the reset. +""" + +import json +import socket +import struct +import sys +import threading +import time +from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer + +PORT = 8080 +FLAG_KEY = "reconnect-test" + +# Number variations served before and after the forced reconnect. environmentd +# maps these onto the `max_result_size` system parameter (2 GiB and 3 GiB). +INITIAL_VALUE = 2147483648 +RECONNECT_VALUE = 3221225472 + +# Seconds to hold the first connection open before resetting it, leaving time +# for the SDK to apply the initial value.
+HOLD_BEFORE_RESET = 3.0 + +_lock = threading.Lock() +_stream_connections = 0 + + +def put_event(value: int, version: int) -> bytes: + """Serialize a LaunchDarkly streaming `put` event carrying a single flag + that, with targeting off, evaluates to `value`.""" + flag = { + "key": FLAG_KEY, + "version": version, + "on": False, + "targets": [], + "rules": [], + "prerequisites": [], + "fallthrough": {"variation": 0}, + "offVariation": 0, + "variations": [value], + "salt": "reconnect-test-salt", + "clientSideAvailability": { + "usingMobileKey": False, + "usingEnvironmentId": False, + }, + } + payload = {"path": "/", "data": {"flags": {FLAG_KEY: flag}, "segments": {}}} + return f"event: put\ndata: {json.dumps(payload)}\n\n".encode() + + +class Handler(BaseHTTPRequestHandler): + protocol_version = "HTTP/1.1" + + def log_message(self, format: str, *args: object) -> None: + sys.stderr.write("mock-ld: " + (format % args) + "\n") + + def _reset_connection(self) -> None: + # Force a TCP RST instead of a clean FIN, so the SDK sees a non-Eof + # transport error (as in incident-984) rather than the Eof it has + # always recovered from. + self.connection.setsockopt( + socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0) + ) + self.connection.close() + + def _send_stream_headers(self) -> None: + self.send_response(200) + self.send_header("Content-Type", "text/event-stream") + self.send_header("Cache-Control", "no-cache") + self.end_headers() + + def do_GET(self) -> None: + if self.path == "/health": + self.send_response(200) + self.send_header("Content-Length", "0") + self.end_headers() + return + + if self.path != "/all": + self.send_response(404) + self.send_header("Content-Length", "0") + self.end_headers() + return + + global _stream_connections + with _lock: + _stream_connections += 1 + n = _stream_connections + + self._send_stream_headers() + + if n == 1: + # First client: send the initial value, then reset mid-stream. 
+ self.wfile.write(put_event(INITIAL_VALUE, 1)) + self.wfile.flush() + time.sleep(HOLD_BEFORE_RESET) + self.log_message("resetting first streaming connection") + self._reset_connection() + self.close_connection = True + return + + # Reconnecting client: send the updated value, then hold the connection + # open with periodic heartbeats so the SDK stays connected. + self.wfile.write(put_event(RECONNECT_VALUE, 2)) + self.wfile.flush() + try: + while True: + time.sleep(5) + self.wfile.write(b":heartbeat\n\n") + self.wfile.flush() + except (BrokenPipeError, ConnectionResetError, OSError): + return + + def do_POST(self) -> None: + # The event processor POSTs analytics events to `/bulk`; accept and + # discard them so it doesn't log errors. + length = int(self.headers.get("Content-Length", 0)) + if length: + self.rfile.read(length) + self.send_response(202) + self.send_header("Content-Length", "0") + self.end_headers() + + +def main() -> None: + server = ThreadingHTTPServer(("0.0.0.0", PORT), Handler) + sys.stderr.write(f"mock-ld: listening on {PORT}\n") + server.serve_forever() + + +if __name__ == "__main__": + main() diff --git a/test/launchdarkly-reconnect/mzcompose.py b/test/launchdarkly-reconnect/mzcompose.py new file mode 100644 index 0000000000000..e26fb6851d93a --- /dev/null +++ b/test/launchdarkly-reconnect/mzcompose.py @@ -0,0 +1,80 @@ +# Copyright Materialize, Inc. and contributors. All rights reserved. +# +# Use of this software is governed by the Business Source License +# included in the LICENSE file at the root of this repository. +# +# As of the Change Date specified in that file, in accordance with +# the Business Source License, use of this software will be governed +# by the Apache License, Version 2.0. + +""" +Regression test for incident-984: the LaunchDarkly data source must reconnect +after its streaming connection drops with a non-Eof error, so that flag updates +keep syncing. 
+ +A mock LaunchDarkly server (mock_ld.py) serves an initial flag value, resets the +first streaming connection mid-stream (reproducing the incident's non-Eof +transport error), and serves an updated value to every reconnecting client. +environmentd is pointed at the mock via MZ_LAUNCHDARKLY_BASE_URI, so the updated +value can only reach it if the data source reconnected after the reset. A +regressed SDK gets stuck on the initial value and the assertion below times out. + +Unlike test/launchdarkly, this needs no real LaunchDarkly credentials. +""" + +from materialize.mzcompose.composition import Composition, Service +from materialize.mzcompose.service import Service as DockerService +from materialize.mzcompose.services.materialized import Materialized +from materialize.mzcompose.services.testdrive import Testdrive + +FLAG_KEY = "reconnect-test" +MOCK_HOST = "mock-launchdarkly" +MOCK_PORT = 8080 + +SERVICES = [ + DockerService( + name=MOCK_HOST, + config={ + "image": "python:3.12-slim", + "volumes": ["./mock_ld.py:/app/mock_ld.py"], + "command": ["python3", "-u", "/app/mock_ld.py"], + "ports": [MOCK_PORT], + "healthcheck": { + "test": [ + "CMD", + "python3", + "-c", + "import urllib.request; urllib.request.urlopen('http://localhost:8080/health')", + ], + "interval": "1s", + "start_period": "30s", + }, + }, + ), + Materialized( + environment_extra=[ + "MZ_LAUNCHDARKLY_SDK_KEY=sdk-mock-key", + f"MZ_LAUNCHDARKLY_BASE_URI=http://{MOCK_HOST}:{MOCK_PORT}", + f"MZ_LAUNCHDARKLY_KEY_MAP=max_result_size={FLAG_KEY}", + "MZ_CONFIG_SYNC_LOOP_INTERVAL=1s", + ], + additional_system_parameter_defaults={ + "log_filter": "mz_adapter::config=debug,launchdarkly_server_sdk=debug,info", + }, + external_metadata_store=True, + ), + # The reconnect (eventsource backoff) plus the 1s sync loop means the + # updated value can take several seconds to land; give it ample room. 
+ Testdrive(no_reset=True, seed=1, default_timeout="120s"), +] + + +def workflow_default(c: Composition) -> None: + c.up(MOCK_HOST, "materialized", Service("testdrive", idle=True)) + + # The mock serves 2 GiB on the first streaming connection, resets it + # mid-stream, then serves 3 GiB to every reconnecting client. Reaching + # 3 GiB therefore proves the data source reconnected after a non-Eof error; + # a regressed SDK stays stuck at 2 GiB and this assertion times out. We + # don't assert the transient 2 GiB value, as the reset can race startup. + c.testdrive("\n".join(["> SHOW max_result_size", "3GB"]))