Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions src/clusterd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,9 +174,40 @@ struct Args {
enable_storage_introspection_logs: bool,
}

/// The process ordinal for a StatefulSet pod, taken from the trailing
/// `-`-delimited segment of its hostname (e.g.
/// "mz5ncn-cluster-s1-replica-s1-gen-1-0" → "0"). This mirrors how
/// orchestrator-kubernetes recovers the process id from pod names.
///
/// Returns `None` when the trailing segment is not a non-negative integer, so
/// an unexpected hostname leaves `CLUSTERD_PROCESS` unset rather than set to a
/// value that fails to parse as the process index.
fn process_ordinal_from_hostname(hostname: &str) -> Option<&str> {
let ordinal = hostname.rsplit('-').next()?;
ordinal.parse::<usize>().ok().map(|_| ordinal)
}

pub fn main() {
mz_ore::panic::install_enhanced_handler();

// Derive `CLUSTERD_PROCESS` (the process ordinal) from the pod hostname
// when running under Kubernetes and it was not set explicitly. The
// distroless image has no shell entrypoint to do this, so clusterd does it
// itself.
if std::env::var("KUBERNETES_SERVICE_HOST").is_ok()
&& std::env::var("CLUSTERD_PROCESS").is_err()
{
if let Ok(hostname) = std::env::var("HOSTNAME") {
if let Some(ordinal) = process_ordinal_from_hostname(&hostname) {
// SAFETY: `set_var` is called before any threads are spawned.
// `install_enhanced_handler` above only registers a panic hook.
// That hook spawns a thread only on panic, which cannot happen
// before this call.
unsafe { std::env::set_var("CLUSTERD_PROCESS", ordinal) };
}
}
}

let args = cli::parse_args(CliConfig {
env_prefix: Some("CLUSTERD_"),
enable_version_flag: true,
Expand Down Expand Up @@ -477,3 +508,31 @@ fn is_connection_error(e: &std::io::Error) -> bool {
| std::io::ErrorKind::ConnectionReset
)
}

#[cfg(test)]
mod tests {
use super::*;

#[mz_ore::test]
fn test_process_ordinal_from_hostname() {
// A StatefulSet pod name ends in the process ordinal.
assert_eq!(
process_ordinal_from_hostname("mz5ncn-cluster-s1-replica-s1-gen-1-0"),
Some("0")
);
assert_eq!(
process_ordinal_from_hostname("mz5ncn-cluster-s1-replica-s1-gen-1-11"),
Some("11")
);
// A bare numeric hostname is its own ordinal.
assert_eq!(process_ordinal_from_hostname("7"), Some("7"));

// A trailing segment that is not a non-negative integer yields `None`,
// so `CLUSTERD_PROCESS` stays unset rather than being set to a value
// that fails to parse as the process index.
assert_eq!(process_ordinal_from_hostname("clusterd"), None);
assert_eq!(process_ordinal_from_hostname("replica-abc"), None);
assert_eq!(process_ordinal_from_hostname("replica-"), None);
assert_eq!(process_ordinal_from_hostname(""), None);
}
}
Loading