diff --git a/Cargo.lock b/Cargo.lock index 1824b4514eb22..95da04963bef8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6022,6 +6022,7 @@ dependencies = [ "futures", "hyper 1.9.0", "hyper-util", + "libc", "mz-alloc", "mz-alloc-default", "mz-build-info", diff --git a/src/clusterd/Cargo.toml b/src/clusterd/Cargo.toml index f964fdc9a0767..7970a7f54fec0 100644 --- a/src/clusterd/Cargo.toml +++ b/src/clusterd/Cargo.toml @@ -17,6 +17,7 @@ fail.workspace = true futures.workspace = true hyper.workspace = true hyper-util.workspace = true +libc.workspace = true mz-alloc = { path = "../alloc" } mz-alloc-default = { path = "../alloc-default", optional = true } mz-build-info = { path = "../build-info" } @@ -38,7 +39,7 @@ mz-storage-client = { path = "../storage-client" } mz-storage-types = { path = "../storage-types" } mz-timely-util = { path = "../timely-util" } mz-txn-wal = { path = "../txn-wal" } -nix.workspace = true +nix = { workspace = true, features = ["hostname", "signal"] } num_cpus.workspace = true serde.workspace = true tokio.workspace = true diff --git a/src/clusterd/src/lib.rs b/src/clusterd/src/lib.rs index 39b6b45b04d7b..5da958c1657f3 100644 --- a/src/clusterd/src/lib.rs +++ b/src/clusterd/src/lib.rs @@ -48,6 +48,51 @@ mod usage_metrics; const BUILD_INFO: BuildInfo = build_info!(); +/// Note: `getaddrinfo` is a blocking call with no timeout. If DNS is +/// unavailable at pod startup (e.g., CoreDNS restart), this will block +/// the main thread indefinitely. In practice this is rare since CoreDNS +/// runs as a DaemonSet and is available before user pods start. 
/// Looks up the canonical name for `short_hostname` using `getaddrinfo`
/// with the `AI_CANONNAME` flag and returns it as an owned `String`.
///
/// The short hostname is returned unchanged when:
/// - it contains an interior NUL byte and cannot be turned into a C string,
/// - `getaddrinfo` reports an error or yields no result (a warning is
///   written to stderr in this case), or
/// - the first result carries no canonical name.
///
/// `getaddrinfo` is a blocking call; this function does not apply a timeout.
fn resolve_fqdn(short_hostname: &str) -> String {
    use std::ffi::{CStr, CString};
    use std::ptr;

    // Every failure path degrades to the caller-supplied name.
    let fallback = || short_hostname.to_string();

    let c_host = match CString::new(short_hostname) {
        Ok(name) => name,
        Err(_) => return fallback(),
    };

    // SAFETY: `addrinfo` is a C struct made of integers and raw pointers, so
    // the all-zero bit pattern is a valid value for it.
    let mut hints: libc::addrinfo = unsafe { std::mem::zeroed() };
    hints.ai_family = libc::AF_UNSPEC;
    hints.ai_flags = libc::AI_CANONNAME;

    let mut head: *mut libc::addrinfo = ptr::null_mut();

    // SAFETY: `c_host` is a valid NUL-terminated string that outlives the
    // call, `hints` is fully initialized, and `head` is a valid out-pointer.
    let status = unsafe { libc::getaddrinfo(c_host.as_ptr(), ptr::null(), &hints, &mut head) };

    if status != 0 || head.is_null() {
        eprintln!(
            "warning: getaddrinfo failed for {:?} (rc={}); falling back to short hostname. \
             GRPC host validation may not work correctly.",
            short_hostname, status
        );
        return fallback();
    }

    // SAFETY: `head` was checked to be non-null above and was produced by a
    // successful `getaddrinfo` call, so it points to a live `addrinfo`.
    let canonical = unsafe { (*head).ai_canonname };

    let resolved = if canonical.is_null() {
        fallback()
    } else {
        // SAFETY: a non-null `ai_canonname` is a NUL-terminated C string owned
        // by the result list, which has not been freed yet.
        unsafe { CStr::from_ptr(canonical) }
            .to_string_lossy()
            .into_owned()
    };

    // SAFETY: `head` came from `getaddrinfo` and is released exactly once;
    // `resolved` owns its data, so nothing borrows from the list anymore.
    unsafe { libc::freeaddrinfo(head) };

    resolved
}
+fn install_termination_signal_handlers() { + use nix::sys::signal; + + extern "C" fn handle_signal(signum: i32) { + let action = signal::SigAction::new( + signal::SigHandler::SigDfl, + signal::SaFlags::SA_NODEFER | signal::SaFlags::SA_ONSTACK, + signal::SigSet::empty(), + ); + unsafe { signal::sigaction(signum.try_into().unwrap(), &action) } + .unwrap_or_else(|_| panic!("failed to uninstall handler for {}", signum)); + let ret = unsafe { libc::raise(signum) }; + if ret == -1 { + panic!("failed to re-raise signal {}", signum); + } + } + + let action = signal::SigAction::new( + signal::SigHandler::Handler(handle_signal), + signal::SaFlags::SA_NODEFER | signal::SaFlags::SA_ONSTACK, + signal::SigSet::empty(), + ); + for signum in &[ + signal::SIGHUP, + signal::SIGINT, + signal::SIGALRM, + signal::SIGTERM, + ] { + unsafe { signal::sigaction(*signum, &action) } + .unwrap_or_else(|e| panic!("failed to install handler for {}: {}", signum, e)); + } +} + pub fn main() { + install_termination_signal_handlers(); + mz_ore::panic::install_enhanced_handler(); + // SAFETY: Called before any threads are spawned. + // `install_enhanced_handler` above only registers a panic hook; it does + // not spawn threads. The hook spawns a thread only if a panic fires, + // which cannot happen between here and the first `unsafe` call below. + if std::env::var("KUBERNETES_SERVICE_HOST").is_ok() { + if std::env::var("CLUSTERD_GRPC_HOST").is_err() { + if let Ok(hostname) = nix::unistd::gethostname() { + if let Some(short) = hostname.to_str() { + let fqdn = resolve_fqdn(short); + unsafe { std::env::set_var("CLUSTERD_GRPC_HOST", &fqdn) }; + } + } + } + if std::env::var("CLUSTERD_PROCESS").is_err() { + // Extract the ordinal index from the last segment of the + // StatefulSet hostname (e.g., "mz5ncn-cluster-s1-replica-s1-gen-1-0" + // → "0"). This matches orchestrator-kubernetes which also uses + // split('-').next_back() to extract the process ID from pod names. 
+ if let Ok(hostname) = std::env::var("HOSTNAME") { + if let Some(ordinal) = hostname.rsplit('-').next() { + unsafe { std::env::set_var("CLUSTERD_PROCESS", ordinal) }; + } + } + } + } + let args = cli::parse_args(CliConfig { env_prefix: Some("CLUSTERD_"), enable_version_flag: true,