diff --git a/Cargo.lock b/Cargo.lock index d6f7778192be5..5217ad61576bc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -31,7 +31,7 @@ checksum = "b169f7a6d4742236a0a00c541b845991d0ac43e546831af1249753ab4c3aa3a0" dependencies = [ "cfg-if", "cipher", - "cpufeatures", + "cpufeatures 0.2.17", ] [[package]] @@ -1579,7 +1579,7 @@ dependencies = [ "bitflags 2.11.0", "cexpr", "clang-sys", - "itertools 0.10.5", + "itertools 0.13.0", "log", "prettyplease", "proc-macro2", @@ -1939,6 +1939,17 @@ version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" +[[package]] +name = "chacha20" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6f8d983286843e49675a4b7a2d174efe136dc93a18d69130dd18198a6c167601" +dependencies = [ + "cfg-if", + "cpufeatures 0.3.0", + "rand_core 0.10.1", +] + [[package]] name = "chrono" version = "0.4.41" @@ -2006,6 +2017,12 @@ dependencies = [ "half 1.6.0", ] +[[package]] +name = "cidr" +version = "0.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579504560394e388085d0c080ea587dfa5c15f7e251b4d5247d1e1a61d1d6928" + [[package]] name = "cipher" version = "0.4.4" @@ -2349,6 +2366,15 @@ dependencies = [ "libc", ] +[[package]] +name = "cpufeatures" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b2a41393f66f16b0823bb79094d54ac5fbd34ab292ddafb9a0456ac9f87d201" +dependencies = [ + "libc", +] + [[package]] name = "crc" version = "3.3.0" @@ -3029,7 +3055,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -3411,7 +3437,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "778e2ac28f6c47af28e4907f13ffd1e1ddbd400980a9abd7c8df189bf578a5ad" dependencies = [ "libc", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -3443,17 +3469,18 @@ dependencies = [ [[package]] name = "eventsource-client" -version = "0.16.0" +version = "0.17.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c26451361cde19fe2322835b6e684f4825902edf3139ce9d6a70e6542566a0b2" +checksum = "96df8cfa11d3c8e4e1a48b81c9c600ecbe8c11d1326f41e1524cc4d42f7bf6d9" dependencies = [ "base64 0.22.1", + "bytes", "futures", - "hyper 0.14.32", - "hyper-timeout 0.4.1", + "http 1.4.2", + "launchdarkly-sdk-transport", "log", "pin-project", - "rand 0.8.5", + "rand 0.10.1", "tokio", ] @@ -3998,11 +4025,25 @@ dependencies = [ "cfg-if", "js-sys", "libc", - "r-efi", + "r-efi 5.3.0", "wasi 0.14.2+wasi-0.2.4", "wasm-bindgen", ] +[[package]] +name = "getrandom" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0de51e6874e94e7bf76d726fc5d13ba782deca734ff60d5bb2fb2607c7406555" +dependencies = [ + "cfg-if", + "libc", + "r-efi 6.0.0", + "rand_core 0.10.1", + "wasip2", + "wasip3", +] + [[package]] name = "gimli" version = "0.32.3" @@ -4425,6 +4466,26 @@ dependencies = [ "want", ] +[[package]] +name = "hyper-http-proxy" +version = "1.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7ad4b0a1e37510028bc4ba81d0e38d239c39671b0f0ce9e02dfa93a8133f7c08" +dependencies = [ + "bytes", + "futures-util", + "headers", + "http 1.4.2", + "hyper 1.9.0", + "hyper-rustls", + "hyper-util", + "pin-project-lite", + "rustls-native-certs 0.7.3", + "tokio", + "tokio-rustls", + "tower-service", +] + [[package]] name = "hyper-openssl" version = "0.10.2" @@ -4455,25 +4516,13 @@ dependencies = [ "hyper 1.9.0", "hyper-util", "rustls", - "rustls-native-certs", + "rustls-native-certs 0.8.3", "rustls-pki-types", "tokio", "tokio-rustls", "tower-service", ] -[[package]] -name = "hyper-timeout" -version = "0.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bbb958482e8c7be4bc3cf272a766a2b0bf1a6755e7a6ae777f017a31d11b13b1" -dependencies = [ - "hyper 0.14.32", - "pin-project-lite", - "tokio", - "tokio-io-timeout", -] - [[package]] name = "hyper-timeout" version = "0.5.1" @@ -4533,7 +4582,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.4", "system-configuration", "tokio", "tower-service", @@ -4933,7 +4982,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -5129,7 +5178,7 @@ dependencies = [ "http-body-util", "hyper 1.9.0", "hyper-openssl", - "hyper-timeout 0.5.1", + "hyper-timeout", "hyper-util", "jiff", "jsonpath-rust", @@ -5210,20 +5259,43 @@ dependencies = [ "tracing", ] +[[package]] +name = "launchdarkly-sdk-transport" +version = "0.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a048341aa360a768a7d91ebf6232a574f355ada0724acff2df7c5d8dc1e04c98" +dependencies = [ + "bytes", + "futures", + "http 1.4.2", + "http-body-util", + "hyper 1.9.0", + "hyper-http-proxy", + "hyper-rustls", + "hyper-timeout", + "hyper-util", + "log", + "no-proxy", + "tower 0.4.13", +] + [[package]] name = "launchdarkly-server-sdk" -version = "2.6.2" -source = "git+https://github.com/MaterializeInc/rust-server-sdk?rev=3e0a0b98b09a2970f292577a07e1c9382b65b5da#3e0a0b98b09a2970f292577a07e1c9382b65b5da" +version = "3.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0440c21aaa0670948071d9cbd8142e9d057a4b40feb3e6293e0efc6de9879b69" dependencies = [ "aws-lc-rs", + "bitflags 2.11.0", + "bytes", "chrono", "crossbeam-channel", "data-encoding", "eventsource-client", "futures", - "hyper 0.14.32", + "http 1.4.2", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk-evaluation", - "lazy_static", "log", "lru", "moka", @@ -5239,9 +5311,9 @@ dependencies = [ [[package]] name = "launchdarkly-server-sdk-evaluation" -version = "2.0.1" +version = "2.1.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "63706c23ee67f699563e5c52c7542361eccc966cba0430b6a7862c0ecaee9432" +checksum = "4bc97a52681b9860197ad81a2c1a34d96bb647459bcdf067ac1fae42c9fe8c30" dependencies = [ "base16ct", "chrono", @@ -5266,6 +5338,12 @@ dependencies = [ "spin 0.5.2", ] +[[package]] +name = "leb128fmt" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09edd9e8b54e49e587e4f6295a7d29c3ea94d469cb40ab8ca70b288248a81db2" + [[package]] name = "lexical-core" version = "1.0.5" @@ -5844,10 +5922,10 @@ dependencies = [ "hex", "http 1.4.2", "humantime", - "hyper-tls 0.5.0", "imbl", "ipnet", "itertools 0.14.0", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk", "maplit", "mz-adapter-types", @@ -6136,6 +6214,7 @@ dependencies = [ "prometheus", "proxy-header", "reqwest 0.12.28", + "rustls", "semver", "tempfile", "tokio", @@ -6423,6 +6502,7 @@ dependencies = [ "mz-txn-wal", "nix 0.31.3", "num_cpus", + "rustls", "serde", "tokio", "tower 0.5.3", @@ -6746,7 +6826,7 @@ version = "0.0.0" dependencies = [ "anyhow", "humantime", - "hyper-tls 0.5.0", + "launchdarkly-sdk-transport", "launchdarkly-server-sdk", "mz-build-info", "mz-dyncfg", @@ -6872,6 +6952,7 @@ dependencies = [ "regex", "reqwest 0.12.28", "rlimit", + "rustls", "semver", "sentry-tracing", "serde", @@ -8410,6 +8491,7 @@ dependencies = [ "postgres-protocol", "regex", "reqwest 0.12.28", + "rustls", "serde_json", "shell-words", "tempfile", @@ -8823,6 +8905,7 @@ dependencies = [ "rdkafka", "regex", "reqwest 0.12.28", + "rustls", "semver", "serde", "serde_json", @@ -9032,6 +9115,15 @@ dependencies = [ "libc", ] +[[package]] +name = "no-proxy" +version = "0.3.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9f79c902b31ceac6856e262af5dbaffef75390cf4647c9fef7b55da69a4b912e" +dependencies = [ + "cidr", +] + [[package]] name = "nom" version = "7.1.3" @@ -9792,6 +9884,7 @@ dependencies = [ "num_cpus", "num_enum 0.7.6", "prometheus", + "rustls", "serde", "serde_json", "timely", @@ -10408,7 +10501,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "03da047801ff44bb6a4d407d4860c05fd70bb81714e6b2f3812603d5b145b042" dependencies = [ "heck", - "itertools 0.10.5", + "itertools 0.14.0", "log", "multimap", "petgraph", @@ -10429,7 +10522,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b570b25f7617e43d59005d0990ccb79e950a423952cea19671b7a876da390adf" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.14.0", "proc-macro2", "quote", "syn 2.0.117", @@ -10654,6 +10747,12 @@ version = "5.3.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" +[[package]] +name = "r-efi" +version = "6.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8dcc9c7d52a811697d2151c701e0d08956f92b0e24136cf4cf27b57a6a0d9bf" + [[package]] name = "radium" version = "0.7.0" @@ -10695,6 +10794,17 @@ dependencies = [ "rand_core 0.9.3", ] +[[package]] +name = "rand" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d2e8e8bcc7961af1fdac401278c6a831614941f6164ee3bf4ce61b7edb162207" +dependencies = [ + "chacha20", + "getrandom 0.4.2", + "rand_core 0.10.1", +] + [[package]] name = "rand_chacha" version = "0.2.2" @@ -10753,6 +10863,12 @@ dependencies = [ "getrandom 0.3.3", ] +[[package]] +name = "rand_core" +version = "0.10.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "63b8176103e19a2643978565ca18b50549f6101881c443590420e4dc998a3c69" + [[package]] name = "rand_distr" version = "0.5.1" @@ -11345,7 +11461,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.4.15", - "windows-sys 0.52.0", + "windows-sys 0.59.0", ] [[package]] @@ -11358,7 +11474,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.12.1", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -11370,12 +11486,26 @@ dependencies = [ "aws-lc-rs", "log", "once_cell", + "ring", "rustls-pki-types", "rustls-webpki", "subtle", "zeroize", ] +[[package]] +name = "rustls-native-certs" +version = "0.7.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e5bfb394eeed242e909609f56089eecfe5fda225042e8b171791b9c95f5931e5" +dependencies = [ + "openssl-probe 0.1.6", + "rustls-pemfile", + "rustls-pki-types", + "schannel", + "security-framework 2.11.1", +] + [[package]] name = "rustls-native-certs" version = "0.8.3" @@ -11388,6 +11518,15 @@ dependencies = [ "security-framework 3.7.0", ] +[[package]] +name = "rustls-pemfile" +version = "2.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dce314e5fee3f39953d46bb63bb8a46d40c2f8fb7cc5a3b6cab2bde9721d6e50" +dependencies = [ + "rustls-pki-types", +] + [[package]] name = "rustls-pki-types" version = "1.14.0" @@ -11936,7 +12075,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e3bf829a2d51ab4a5ddf1352d8470c140cadc8301b2ae1789db023f01cedd6ba" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", ] @@ -11953,7 +12092,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" dependencies = [ "cfg-if", - "cpufeatures", + "cpufeatures 0.2.17", "digest", "sha2-asm", ] @@ -12480,7 +12619,7 @@ dependencies = [ "getrandom 0.3.3", "once_cell", "rustix 1.1.4", - "windows-sys 0.52.0", + "windows-sys 0.61.1", ] [[package]] @@ -12499,7 +12638,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "230a1b821ccbd75b185820a1f1ff7b14d21da1e442e22c0863ea5f08771a8874" dependencies = [ "rustix 1.1.4", - "windows-sys 0.59.0", + "windows-sys 0.61.1", ] [[package]] @@ -12777,16 +12916,6 @@ dependencies = [ "windows-sys 0.61.1", ] -[[package]] -name = "tokio-io-timeout" -version = "1.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "90c49f106be240de154571dd31fbe48acb10ba6c6dd6f6517ad603abffa42de9" -dependencies = [ - "pin-project-lite", - "tokio", -] - [[package]] name = "tokio-io-utility" version = "0.7.6" @@ -12994,7 +13123,7 @@ dependencies = [ "http-body 1.0.1", "http-body-util", "hyper 1.9.0", - "hyper-timeout 0.5.1", + "hyper-timeout", "hyper-util", "percent-encoding", "pin-project", @@ -13059,6 +13188,7 @@ dependencies = [ "pin-project-lite", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -13574,6 +13704,12 @@ version = "0.2.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4ac048d71ede7ee76d585517add45da530660ef4390e49b098733c6e897f254" +[[package]] +name = "unicode-xid" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebc1c04c71510c7f702b52b7c350734c9ff1295c464a03335b00bb84fc54f853" + [[package]] name = "unit-prefix" version = "0.5.2" @@ -13806,6 +13942,24 @@ dependencies = [ "wit-bindgen-rt", ] +[[package]] +name = "wasip2" +version = "1.0.4+wasi-0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b67efb37e106e55ce722a510d6b5f9c17f083e5fc79afc2badeb12cc313d9487" +dependencies = [ + "wit-bindgen 0.57.1", +] + +[[package]] +name = "wasip3" +version = "0.4.0+wasi-0.3.0-rc-2026-01-06" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5428f8bf88ea5ddc08faddef2ac4a67e390b88186c703ce6dbd955e1c145aca5" +dependencies = [ + "wit-bindgen 0.51.0", +] + [[package]] name = "wasite" version = "0.1.0" @@ -13867,6 +14021,28 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "wasm-encoder" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "990065f2fe63003fe337b932cfb5e3b80e0b4d0f5ff650e6985b1048f62c8319" +dependencies = [ + "leb128fmt", + "wasmparser", +] + +[[package]] +name = "wasm-metadata" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bb0e353e6a2fbdc176932bbaab493762eb1255a7900fe0fea1a2f96c296cc909" +dependencies = [ + "anyhow", + "indexmap 2.11.4", + "wasm-encoder", + "wasmparser", +] + [[package]] name = "wasm-streams" version = "0.4.1" @@ -13880,6 +14056,18 @@ dependencies = [ "web-sys", ] +[[package]] +name = "wasmparser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47b807c72e1bac69382b3a6fb3dbe8ea4c0ed87ff5629b8685ae6b9a611028fe" +dependencies = [ + "bitflags 2.11.0", + "hashbrown 0.15.3", + "indexmap 2.11.4", + "semver", +] + [[package]] name = "wasmtimer" version = "0.4.3" @@ -14209,6 +14397,32 @@ dependencies = [ "memchr", ] +[[package]] +name = "wit-bindgen" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d7249219f66ced02969388cf2bb044a09756a083d0fab1e566056b04d9fbcaa5" +dependencies = [ + "wit-bindgen-rust-macro", +] + +[[package]] +name = "wit-bindgen" +version = "0.57.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1ebf944e87a7c253233ad6766e082e3cd714b5d03812acc24c318f549614536e" + +[[package]] +name = "wit-bindgen-core" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea61de684c3ea68cb082b7a88508a8b27fcc8b797d738bfc99a82facf1d752dc" +dependencies = [ + "anyhow", + "heck", + "wit-parser", +] + [[package]] name = "wit-bindgen-rt" version = "0.39.0" @@ -14218,6 +14432,74 @@ dependencies = [ "bitflags 2.11.0", ] +[[package]] +name = "wit-bindgen-rust" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b7c566e0f4b284dd6561c786d9cb0142da491f46a9fbed79ea69cdad5db17f21" +dependencies = [ + "anyhow", + "heck", + "indexmap 2.11.4", + "prettyplease", + "syn 2.0.117", + "wasm-metadata", + "wit-bindgen-core", + "wit-component", +] + +[[package]] +name = "wit-bindgen-rust-macro" +version = "0.51.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c0f9bfd77e6a48eccf51359e3ae77140a7f50b1e2ebfe62422d8afdaffab17a" +dependencies = [ + "anyhow", + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.117", + "wit-bindgen-core", + "wit-bindgen-rust", +] + +[[package]] +name = "wit-component" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9d66ea20e9553b30172b5e831994e35fbde2d165325bec84fc43dbf6f4eb9cb2" +dependencies = [ + "anyhow", + "bitflags 2.11.0", + "indexmap 2.11.4", + "log", + "serde", + "serde_derive", + "serde_json", + "wasm-encoder", + "wasm-metadata", + "wasmparser", + "wit-parser", +] + +[[package]] +name = "wit-parser" +version = "0.244.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ecc8ac4bc1dc3381b7f59c34f00b67e18f910c2c0f50015669dde7def656a736" +dependencies = [ + "anyhow", + "id-arena", + "indexmap 2.11.4", + "log", + "semver", + "serde", + "serde_derive", + "serde_json", + "unicode-xid", + "wasmparser", +] + [[package]] name = "wyz" version = "0.5.1" diff --git a/Cargo.toml b/Cargo.toml index fcdb0ea4abffe..ab7c1ced6543c 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -400,7 +400,8 @@ junit-report = "0.8.3" k8s-controller = "0.11.0" k8s-openapi = { version = "0.27.0", features = ["schemars", "v1_32"] } kube = { version = "3.1.0", default-features = false, features = ["client", "derive", "openssl-tls", "runtime", "ws"] } -launchdarkly-server-sdk = { version = "2.6.2", default-features = false } +launchdarkly-server-sdk = { version = "3.1.1", default-features = false, features = ["hyper-rustls-native-roots", "crypto-aws-lc-rs"] } +launchdarkly-sdk-transport = "0.1" lgalloc = "0.6.0" libc = "0.2.186" lru = "0.16.3" @@ -646,9 +647,6 @@ postgres_array = { git = "https://github.com/MaterializeInc/rust-postgres-array" # Waiting on https://github.com/MaterializeInc/serde-value/pull/35. serde-value = { git = "https://github.com/MaterializeInc/serde-value.git" } -# Waiting for resolution of https://github.com/launchdarkly/rust-server-sdk/issues/116 -launchdarkly-server-sdk = { git = "https://github.com/MaterializeInc/rust-server-sdk", rev = "3e0a0b98b09a2970f292577a07e1c9382b65b5da" } - # Waiting on https://github.com/edenhill/librdkafka/pull/4051. rdkafka = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" } rdkafka-sys = { git = "https://github.com/MaterializeInc/rust-rdkafka.git" } diff --git a/deny.toml b/deny.toml index ca75af3011106..81034ea2ba2e2 100644 --- a/deny.toml +++ b/deny.toml @@ -150,6 +150,15 @@ skip = [ { name = "hashlink", version = "0.9.1" }, # held back by owo-colors 4.3 (mz-deploy terminal styling) { name = "supports-color", version = "2.1.0" }, + + # Pulled by launchdarkly-server-sdk 3.x via launchdarkly-sdk-transport / + # eventsource-client (proxy/timeout/rustls stack and the SDK's RNG path). + # NB: tower 0.4.13 is already skipped above (mz-deploy). + { name = "rustls-native-certs", version = "0.7.3" }, + { name = "rand", version = "0.10.1" }, + { name = "rand_core", version = "0.10.1" }, + { name = "getrandom", version = "0.4.2" }, + { name = "cpufeatures", version = "0.3.0" }, ] [[bans.deny]] @@ -206,6 +215,7 @@ wrappers = [ "globset", "launchdarkly-server-sdk", "launchdarkly-server-sdk-evaluation", + "launchdarkly-sdk-transport", "native-tls", "opendal", "os_info", diff --git a/src/adapter/Cargo.toml b/src/adapter/Cargo.toml index 74da334662554..32c6eeed02f92 100644 --- a/src/adapter/Cargo.toml +++ b/src/adapter/Cargo.toml @@ -29,10 +29,10 @@ hex.workspace = true humantime.workspace = true imbl.workspace = true http.workspace = true -hyper-tls = "0.5.0" ipnet.workspace = true itertools.workspace = true launchdarkly-server-sdk.workspace = true +launchdarkly-sdk-transport.workspace = true maplit.workspace = true mz-adapter-types = { path = "../adapter-types" } mz-audit-log = { path = "../audit-log" } diff --git a/src/adapter/src/config/frontend.rs b/src/adapter/src/config/frontend.rs index bc65fb7a293bd..3a942d9602870 100644 --- a/src/adapter/src/config/frontend.rs +++ b/src/adapter/src/config/frontend.rs @@ -10,16 +10,18 @@ use std::collections::BTreeMap; use std::fs; use std::path::PathBuf; -use std::sync::Arc; use std::time::Duration; +use bytes::Bytes; use derivative::Derivative; -use hyper_tls::HttpsConnector; +use futures::TryStreamExt; +use launchdarkly_sdk_transport::{ByteStream, HttpTransport, ResponseFuture}; use launchdarkly_server_sdk as ld; use mz_build_info::BuildInfo; use mz_cloud_provider::CloudProvider; use mz_cluster_client::ReplicaId; use mz_controller_types::ClusterId; +use mz_ore::metrics::UIntGauge; use mz_ore::now::NowFn; use mz_sql::catalog::EnvironmentId; use serde_json::Value as JsonValue; @@ -71,7 +73,7 @@ impl SystemParameterFrontend { /// Create a new [SystemParameterFrontend] initialize. /// /// This will create and initialize an [ld::Client] instance. The - /// [ld::Client::initialized_async] call will be attempted in a loop with an + /// [ld::Client::wait_for_initialization] call will be attempted in a loop with an /// exponential backoff with power `2s` and max duration `60s`. pub async fn from(sync_config: &SystemParameterSyncConfig) -> Result { match &sync_config.backend_config { @@ -347,25 +349,68 @@ pub struct ClusterEvalContext { pub cluster: ClusterScopeContext, } -fn ld_config(api_key: &str, metrics: &Metrics) -> ld::Config { +/// An [`HttpTransport`] wrapper that records timestamps on successful HTTP +/// responses. Used to populate Prometheus metrics that track LaunchDarkly +/// connectivity health. +/// +/// Two instances are created — one for the event processor (CSE metric, tracks +/// outbound event sends) and one for the streaming data source (SSE metric, +/// tracks inbound SSE events). +#[derive(Clone)] +struct MetricsTransport { + inner: T, + last_success_gauge: UIntGauge, + now_fn: NowFn, +} + +impl HttpTransport for MetricsTransport { + fn request(&self, request: http::Request>) -> ResponseFuture { + let inner_fut = self.inner.request(request); + let gauge = self.last_success_gauge.clone(); + let now_fn = self.now_fn.clone(); + Box::pin(async move { + let resp = inner_fut.await?; + if resp.status().is_success() { + gauge.set(now_fn() / 1000); + let (parts, body) = resp.into_parts(); + let wrapped: ByteStream = Box::pin(body.inspect_ok(move |_| { + gauge.set(now_fn() / 1000); + })); + Ok(http::Response::from_parts(parts, wrapped)) + } else { + Ok(resp) + } + }) + } +} + +fn ld_config(api_key: &str, metrics: &Metrics, now_fn: &NowFn) -> ld::Config { + let transport = launchdarkly_sdk_transport::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(300)) + .build_https() + .expect("failed to create HTTPS transport"); + + let cse_transport = MetricsTransport { + inner: transport.clone(), + last_success_gauge: metrics.last_cse_time_seconds.clone(), + now_fn: now_fn.clone(), + }; + let data_source_transport = MetricsTransport { + inner: transport, + last_success_gauge: metrics.last_sse_time_seconds.clone(), + now_fn: now_fn.clone(), + }; + + let mut event_processor = ld::EventProcessorBuilder::new(); + event_processor.transport(cse_transport); + + let mut data_source = ld::StreamingDataSourceBuilder::new(); + data_source.transport(data_source_transport); + ld::ConfigBuilder::new(api_key) - .event_processor( - ld::EventProcessorBuilder::new() - .https_connector(HttpsConnector::new()) - .on_success({ - let last_cse_time_seconds = metrics.last_cse_time_seconds.clone(); - Arc::new(move |result| { - if let Ok(ts) = u64::try_from(result.time_from_server / 1000) { - last_cse_time_seconds.set(ts); - } else { - tracing::warn!( - "Cannot convert time_from_server / 1000 from u128 to u64" - ); - } - }) - }), - ) - .data_source(ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new())) + .event_processor(&event_processor) + .data_source(&data_source) .build() .expect("valid config") } @@ -375,19 +420,9 @@ async fn ld_client( metrics: &Metrics, now_fn: &NowFn, ) -> Result { - let ld_client = ld::Client::build(ld_config(api_key, metrics))?; + let ld_client = ld::Client::build(ld_config(api_key, metrics, now_fn))?; tracing::info!("waiting for SystemParameterFrontend to initialize"); - // Start and initialize LD client for the frontend. The callback passed - // will export the last time when an SSE event from the LD server was - // received in a Prometheus metric. - ld_client.start_with_default_executor_and_callback({ - let last_sse_time_seconds = metrics.last_sse_time_seconds.clone(); - let now_fn = now_fn.clone(); - Arc::new(move |_ev| { - let ts = now_fn() / 1000; - last_sse_time_seconds.set(ts); - }) - }); + ld_client.start_with_default_executor(); let max_backoff = Duration::from_secs(60); let mut backoff = Duration::from_secs(5); @@ -579,7 +614,13 @@ fn ld_ctx( #[cfg(test)] mod tests { + use std::sync::Arc; + use std::sync::atomic::{AtomicU64, Ordering}; + + use futures::StreamExt; + use launchdarkly_sdk_transport::{ByteStream, TransportError}; use mz_build_info::DUMMY_BUILD_INFO; + use mz_ore::metrics::MetricsRegistry; use super::*; @@ -626,4 +667,212 @@ mod tests { fn environment_wide_context_is_unscoped() { ld_ctx(&env_id(), &DUMMY_BUILD_INFO, None, None).expect("environment-wide context builds"); } + + /// A fake transport that simulates a long-lived SSE streaming connection: + /// returns 200 OK immediately, then delivers multiple SSE events as body + /// chunks (exactly how LaunchDarkly's streaming data source works). + #[derive(Clone)] + struct FakeSseTransport; + + impl HttpTransport for FakeSseTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + let body: ByteStream = Box::pin(futures::stream::iter(vec![ + Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")), + Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag1\"}\n\n")), + Ok(Bytes::from("event: patch\ndata: {\"key\":\"flag2\"}\n\n")), + ])); + Box::pin(async move { + http::Response::builder() + .status(200) + .body(body) + .map_err(|e| TransportError::new(std::io::Error::other(e))) + }) + } + } + + /// A fake transport that returns an error, simulating a failed connection. + #[derive(Clone)] + struct FailingTransport; + + impl HttpTransport for FailingTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + Box::pin(async move { + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::ConnectionRefused, + "connection refused", + ))) + }) + } + } + + /// A fake transport that returns 200 OK, delivers one event, then errors + /// mid-stream with a timeout: the non-Eof stream error a dropped long-lived + /// SSE connection surfaces. + #[derive(Clone)] + struct MidStreamFailureTransport; + + impl HttpTransport for MidStreamFailureTransport { + fn request(&self, _request: http::Request>) -> ResponseFuture { + let body: ByteStream = Box::pin(futures::stream::iter(vec![ + Ok(Bytes::from("event: put\ndata: {\"flags\":{}}\n\n")), + Err(TransportError::new(std::io::Error::new( + std::io::ErrorKind::TimedOut, + "body timed out", + ))), + ])); + Box::pin(async move { + http::Response::builder() + .status(200) + .body(body) + .map_err(|e| TransportError::new(std::io::Error::other(e))) + }) + } + } + + fn test_gauge(registry: &MetricsRegistry, name: &str) -> UIntGauge { + registry.register(mz_ore::metric!( + name: name, + help: "test gauge", + )) + } + + /// Verifies that MetricsTransport updates the gauge on each body chunk, + /// not just on the initial HTTP 200 response head. This matters for + /// long-lived streaming connections where SSE events arrive as body chunks. + #[mz_ore::test(tokio::test)] + async fn test_metric_updated_on_body_chunks() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_sse_gauge"); + + let transport = MetricsTransport { + inner: FakeSseTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + assert_eq!(gauge.get(), 0); + + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let response = transport.request(request).await?; + + assert_eq!(gauge.get(), 1000); + + time.store(2_800_000, Ordering::SeqCst); + + let mut body = response.into_body(); + let mut event_count = 0; + while let Some(Ok(_chunk)) = body.next().await { + event_count += 1; + } + assert_eq!(event_count, 3); + + assert_eq!(gauge.get(), 2800); + Ok(()) + } + + #[mz_ore::test(tokio::test)] + async fn test_cse_metric_updates_correctly_per_request() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_cse_gauge"); + + let transport = MetricsTransport { + inner: FakeSseTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + let req = || -> Result>, http::Error> { + http::Request::builder() + .uri("https://events.launchdarkly.com/bulk") + .body(None) + }; + + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 1000); + + time.store(2_000_000, Ordering::SeqCst); + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 2000); + + time.store(3_000_000, Ordering::SeqCst); + let _ = transport.request(req()?).await?; + assert_eq!(gauge.get(), 3000); + Ok(()) + } + + #[mz_ore::test(tokio::test)] + async fn test_metric_not_updated_on_failed_request() -> Result<(), anyhow::Error> { + let now_fn = NowFn::from(|| 5_000_000u64); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_fail_gauge"); + + let transport = MetricsTransport { + inner: FailingTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let result = transport.request(request).await; + assert!(result.is_err()); + assert_eq!(gauge.get(), 0, "gauge must not update on transport error"); + Ok(()) + } + + /// Verifies that when an SSE connection returns 200 OK and then dies + /// mid-stream, `last_sse_time_seconds` advances only for the events that + /// arrived and then freezes — the frozen timestamp is what lets the + /// staleness alert detect a stuck data source. + #[mz_ore::test(tokio::test)] + async fn test_metric_frozen_on_midstream_error() -> Result<(), anyhow::Error> { + let time = Arc::new(AtomicU64::new(1_000_000)); + let time_clone = Arc::clone(&time); + let now_fn = NowFn::from(move || time_clone.load(Ordering::SeqCst)); + + let registry = MetricsRegistry::new(); + let gauge = test_gauge(®istry, "test_midstream_gauge"); + + let transport = MetricsTransport { + inner: MidStreamFailureTransport, + last_success_gauge: gauge.clone(), + now_fn, + }; + + // The 200 OK response head updates the gauge. + let request = http::Request::builder() + .uri("https://stream.launchdarkly.com/all") + .body(None)?; + let response = transport.request(request).await?; + assert_eq!(gauge.get(), 1000); + + // The first event arrives and advances the gauge. + time.store(2_000_000, Ordering::SeqCst); + let mut body = response.into_body(); + assert!(matches!(body.next().await, Some(Ok(_)))); + assert_eq!(gauge.get(), 2000); + + // The stream then errors mid-flight. Time has moved forward, but the + // gauge must stay frozen at the last successful event. + time.store(9_000_000, Ordering::SeqCst); + assert!(matches!(body.next().await, Some(Err(_)))); + assert_eq!( + gauge.get(), + 2000, + "gauge must freeze on mid-stream error so the staleness alert can fire" + ); + Ok(()) + } } diff --git a/src/balancerd/Cargo.toml b/src/balancerd/Cargo.toml index 8cccb6d2cdcdd..d4b5cb266058c 100644 --- a/src/balancerd/Cargo.toml +++ b/src/balancerd/Cargo.toml @@ -42,6 +42,7 @@ num_cpus.workspace = true openssl.workspace = true prometheus.workspace = true proxy-header.workspace = true +rustls.workspace = true semver.workspace = true tokio.workspace = true tokio-openssl.workspace = true diff --git a/src/balancerd/src/bin/balancerd.rs b/src/balancerd/src/bin/balancerd.rs index 28104b2e63c42..ae2784865bee0 100644 --- a/src/balancerd/src/bin/balancerd.rs +++ b/src/balancerd/src/bin/balancerd.rs @@ -170,6 +170,10 @@ pub struct ServiceArgs { } fn main() { + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig::default()); // Mirror the tokio Runtime configuration in our production binaries. diff --git a/src/clusterd/Cargo.toml b/src/clusterd/Cargo.toml index 83ad4410a6be6..1e0426711c99f 100644 --- a/src/clusterd/Cargo.toml +++ b/src/clusterd/Cargo.toml @@ -40,6 +40,7 @@ mz-timely-util = { path = "../timely-util" } mz-txn-wal = { path = "../txn-wal" } nix.workspace = true num_cpus.workspace = true +rustls.workspace = true serde.workspace = true tokio.workspace = true tower.workspace = true diff --git a/src/clusterd/src/lib.rs b/src/clusterd/src/lib.rs index a3048a21a1348..0a9765adf4153 100644 --- a/src/clusterd/src/lib.rs +++ b/src/clusterd/src/lib.rs @@ -177,6 +177,10 @@ struct Args { pub fn main() { mz_ore::panic::install_enhanced_handler(); + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args = cli::parse_args(CliConfig { env_prefix: Some("CLUSTERD_"), enable_version_flag: true, diff --git a/src/dyncfg-launchdarkly/Cargo.toml b/src/dyncfg-launchdarkly/Cargo.toml index 42ff83496db0e..b1efcbece2523 100644 --- a/src/dyncfg-launchdarkly/Cargo.toml +++ b/src/dyncfg-launchdarkly/Cargo.toml @@ -13,8 +13,8 @@ workspace = true [dependencies] anyhow.workspace = true humantime.workspace = true -hyper-tls = "0.5.0" launchdarkly-server-sdk.workspace = true +launchdarkly-sdk-transport.workspace = true mz-build-info = { path = "../build-info" } mz-dyncfg = { path = "../dyncfg" } mz-ore = { path = "../ore", default-features = false, features = ["async"] } diff --git a/src/dyncfg-launchdarkly/src/lib.rs b/src/dyncfg-launchdarkly/src/lib.rs index 3e25eb5b41bb4..50f412f777763 100644 --- a/src/dyncfg-launchdarkly/src/lib.rs +++ b/src/dyncfg-launchdarkly/src/lib.rs @@ -11,7 +11,6 @@ use std::time::Duration; -use hyper_tls::HttpsConnector; use launchdarkly_server_sdk as ld; use mz_build_info::BuildInfo; use mz_dyncfg::{ConfigSet, ConfigUpdates, ConfigVal}; @@ -50,13 +49,21 @@ where let _ = dyn_into_flag(entry.val())?; } let ld_client = if let Some(key) = launchdarkly_sdk_key { + let transport = launchdarkly_sdk_transport::HyperTransport::builder() + .connect_timeout(Duration::from_secs(10)) + .read_timeout(Duration::from_secs(300)) + .build_https() + .expect("failed to create HTTPS transport"); + + let mut data_source = ld::StreamingDataSourceBuilder::new(); + data_source.transport(transport.clone()); + + let mut event_processor = ld::EventProcessorBuilder::new(); + event_processor.transport(transport); + let config = ld::ConfigBuilder::new(key) - .event_processor( - ld::EventProcessorBuilder::new().https_connector(HttpsConnector::new()), - ) - .data_source( - ld::StreamingDataSourceBuilder::new().https_connector(HttpsConnector::new()), - ) + .data_source(&data_source) + .event_processor(&event_processor) .build() .expect("valid config"); let client = ld::Client::build(config)?; diff --git a/src/environmentd/Cargo.toml b/src/environmentd/Cargo.toml index 57c30178342ae..4410b700142f5 100644 --- a/src/environmentd/Cargo.toml +++ b/src/environmentd/Cargo.toml @@ -96,6 +96,7 @@ rand.workspace = true regex = { workspace = true, optional = true } reqwest.workspace = true rlimit.workspace = true +rustls.workspace = true semver.workspace = true sentry-tracing.workspace = true serde.workspace = true diff --git a/src/environmentd/src/environmentd/main.rs b/src/environmentd/src/environmentd/main.rs index 489b63464507e..359408c4e81e7 100644 --- a/src/environmentd/src/environmentd/main.rs +++ b/src/environmentd/src/environmentd/main.rs @@ -657,6 +657,11 @@ pub fn main() { fn run(mut args: Args) -> Result<(), anyhow::Error> { mz_ore::panic::install_enhanced_handler(); + + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let envd_start = Instant::now(); // Configure signal handling as soon as possible. We want signals to be diff --git a/src/persist-cli/Cargo.toml b/src/persist-cli/Cargo.toml index 760004453edb5..e4c847f3acaaf 100644 --- a/src/persist-cli/Cargo.toml +++ b/src/persist-cli/Cargo.toml @@ -39,6 +39,7 @@ mz-txn-wal = { path = "../txn-wal" } num_cpus.workspace = true num_enum.workspace = true prometheus.workspace = true +rustls.workspace = true serde = { workspace = true, features = ["rc"] } serde_json.workspace = true timely.workspace = true diff --git a/src/persist-cli/src/main.rs b/src/persist-cli/src/main.rs index 1df4898124f6e..5f6ba45542cb5 100644 --- a/src/persist-cli/src/main.rs +++ b/src/persist-cli/src/main.rs @@ -47,6 +47,10 @@ enum Command { } fn main() { + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig::default()); // Mirror the tokio Runtime configuration in our production binaries. diff --git a/src/sqllogictest/Cargo.toml b/src/sqllogictest/Cargo.toml index c5276285203cc..da6264f1dca92 100644 --- a/src/sqllogictest/Cargo.toml +++ b/src/sqllogictest/Cargo.toml @@ -46,6 +46,7 @@ mz-tracing = { path = "../tracing" } postgres-protocol.workspace = true regex.workspace = true reqwest.workspace = true +rustls.workspace = true shell-words.workspace = true serde_json.workspace = true tempfile.workspace = true diff --git a/src/sqllogictest/src/bin/sqllogictest.rs b/src/sqllogictest/src/bin/sqllogictest.rs index f610dada2d21e..25b9712dcb656 100644 --- a/src/sqllogictest/src/bin/sqllogictest.rs +++ b/src/sqllogictest/src/bin/sqllogictest.rs @@ -114,6 +114,10 @@ struct Args { async fn main() -> ExitCode { mz_ore::panic::install_enhanced_handler(); + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig { env_prefix: Some("MZ_"), enable_version_flag: false, diff --git a/src/testdrive/Cargo.toml b/src/testdrive/Cargo.toml index 4a168603856f3..813f6ebacf132 100644 --- a/src/testdrive/Cargo.toml +++ b/src/testdrive/Cargo.toml @@ -66,6 +66,7 @@ rand.workspace = true rdkafka.workspace = true regex.workspace = true reqwest.workspace = true +rustls.workspace = true semver.workspace = true serde.workspace = true serde_json = { workspace = true, features = ["raw_value"] } diff --git a/src/testdrive/src/bin/testdrive.rs b/src/testdrive/src/bin/testdrive.rs index 4acbb284c2dfa..92b76f9b64784 100644 --- a/src/testdrive/src/bin/testdrive.rs +++ b/src/testdrive/src/bin/testdrive.rs @@ -298,6 +298,10 @@ struct Args { #[tokio::main] async fn main() { + // Both the aws-lc-rs and ring rustls backends are linked, so rustls can't + // auto-select a provider and panics on first TLS use unless one is installed. + let _ = rustls::crypto::aws_lc_rs::default_provider().install_default(); + let args: Args = cli::parse_args(CliConfig::default()); tracing_subscriber::fmt()