diff --git a/conf/experimental/test_scenario/nixl_ep.toml b/conf/experimental/test_scenario/nixl_ep.toml new file mode 100644 index 000000000..6f5add01e --- /dev/null +++ b/conf/experimental/test_scenario/nixl_ep.toml @@ -0,0 +1,97 @@ +# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES +# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved. +# SPDX-License-Identifier: Apache-2.0 +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +name = "NIXL-EP-Example.Inter-node" + +[[Tests]] +id = "NIXL.EP.No-expansion" +num_nodes = 2 +time_limit = "00:10:00" + +name = "NIXL.EP.No-expansion" +description = "NIXL EP Example: No-expansion plan" +test_template_name = "NixlEP" + + [Tests.cmd_args] + docker_image_url = "" + plan = "[[0, 1]]" + num_processes_per_node = 1 + num_tokens = 256 + num_experts_per_rank = 4 + hidden_dim = 8192 + num_topk = 6 + disable_ll_nvlink = true + kineto = false + +[[Tests]] +id = "NIXL.EP.Single-expansion" +num_nodes = 2 +time_limit = "00:10:00" + +name = "NIXL.EP.Single-expansion" +description = "NIXL EP Example: Single-expansion plan" +test_template_name = "NixlEP" + + [Tests.cmd_args] + docker_image_url = "" + plan = "[[0, 1], [0, 1, 2, 3]]" + num_processes_per_node = 2 + num_tokens = 256 + num_experts_per_rank = 4 + hidden_dim = 8192 + num_topk = 6 + disable_ll_nvlink = true + kineto = false + +[[Tests]] +id = "NIXL.EP.Double-expansion" +num_nodes = 2 +time_limit = "00:10:00" + +name = "NIXL.EP.Double-expansion" +description = "NIXL EP Example: Double-expansion plan" +test_template_name = "NixlEP" + + [Tests.cmd_args] + docker_image_url = "" + plan = "[[0], [0, 1], [0, 1, 2, 3]]" + num_processes_per_node = 2 + num_tokens = 256 + num_experts_per_rank = 4 + hidden_dim = 8192 + num_topk = 6 + disable_ll_nvlink = true + kineto = false + +[[Tests]] +id = "NIXL.EP.Expansion-contraction" +num_nodes = 2 +time_limit = "00:10:00" + +name = "NIXL.EP.Expansion-contraction" +description = "NIXL EP Example: Expansion-contraction plan (elasticity testing)" +test_template_name = "NixlEP" + + [Tests.cmd_args] + docker_image_url = "" + plan = "[[0, 1], [0, 1, 2, 3], [0, -2, 3], [0, 1, 2, 3]]" + num_processes_per_node = 3 + num_tokens = 256 + num_experts_per_rank = 4 + hidden_dim = 8192 + num_topk = 6 + disable_ll_nvlink = true + kineto = false diff --git a/src/cloudai/workloads/nixl_ep/nixl_ep.py b/src/cloudai/workloads/nixl_ep/nixl_ep.py index a99020a9e..7afacda5a 100644 --- a/src/cloudai/workloads/nixl_ep/nixl_ep.py +++ b/src/cloudai/workloads/nixl_ep/nixl_ep.py @@ -27,6 +27,11 @@ from .log_parsing import parse_nixl_ep_bandwidth_samples GENERATED_PLAN_FILE_NAME = "nixl-ep-plan.json" +LAUNCHER_EXIT_MARKER = "NIXL EP launcher exiting with rc=" +LAUNCHER_START_MARKERS = ( + "Starting initial NIXL EP stage", + "Starting NIXL EP on the master node", +) class NixlEPCmdArgs(CmdArgs): @@ -149,10 +154,28 @@ def _primary_launch_exit_error_message(content: str) -> str | None: return f"The primary NIXL EP launch exited before phase {phase} completed." + @staticmethod + def _looks_like_planned_srun_termination(content: str) -> bool: + allowed_patterns = ( + re.compile(r"^srun: error: .+: task \d+: Terminated$"), + re.compile(r"^srun: Terminating StepId=\S+$"), + re.compile(r"^srun: Force Terminated StepId=\S+$"), + ) + lines = [line.strip() for line in content.splitlines() if line.strip()] + return bool(lines) and all(any(pattern.match(line) for pattern in allowed_patterns) for line in lines) + + def _has_planned_rank_removal(self) -> bool: + plans = self.cmd_args.plan if isinstance(self.cmd_args.plan, list) else [self.cmd_args.plan] + return any(rank < 0 for plan in plans for phase in NixlEPCmdArgs._parse_plan(plan) for rank in phase) + def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None: if not path.is_file(): return None + content = path.read_text(encoding="utf-8", errors="ignore") + if self._has_planned_rank_removal() and self._looks_like_planned_srun_termination(content): + return None + launcher_failure_patterns = ( ("python3: can't open file", "The benchmark entrypoint could not be opened."), ("Traceback (most recent call last):", "The benchmark launcher raised a Python traceback."), @@ -164,7 +187,6 @@ def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None: ("srun: error:", "Slurm reported an srun failure."), ("Exited with exit code", "A Slurm step exited with a non-zero status."), ) - content = path.read_text(encoding="utf-8", errors="ignore") primary_launch_error = self._primary_launch_exit_error_message(content) if primary_launch_error is not None: tail = self._tail(path) @@ -200,6 +222,25 @@ def _check_benchmark_output(self, expected_node_logs: list[Path]) -> JobStatusRe return JobStatusResult(is_successful=False, error_message=error_message) + def _check_launcher_terminal_verdict(self, stdout_path: Path) -> JobStatusResult | None: + if not stdout_path.is_file(): + return None + + content = stdout_path.read_text(encoding="utf-8", errors="ignore") + if LAUNCHER_EXIT_MARKER in content: + return None + if not any(marker in content for marker in LAUNCHER_START_MARKERS): + return None + + tail = self._tail(stdout_path) + error_message = ( + "The NIXL EP launcher started but exited before printing its terminal verdict. " + f"Expected '{LAUNCHER_EXIT_MARKER}' in {stdout_path}." + ) + if tail: + error_message += f"\n{tail}" + return JobStatusResult(is_successful=False, error_message=error_message) + def was_run_successful(self, tr: TestRun) -> JobStatusResult: output_path = tr.output_path expected_node_logs = [tr.output_path / f"nixl-ep-node-{node_idx}.log" for node_idx in range(tr.nnodes)] @@ -225,4 +266,8 @@ def was_run_successful(self, tr: TestRun) -> JobStatusResult: if benchmark_output_result is not None: return benchmark_output_result + launcher_verdict_result = self._check_launcher_terminal_verdict(output_path / "stdout.txt") + if launcher_verdict_result is not None: + return launcher_verdict_result + return JobStatusResult(is_successful=True) diff --git a/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py b/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py index e60c8dfab..0905032ac 100644 --- a/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/nixl_ep/slurm_command_gen_strategy.py @@ -35,7 +35,10 @@ class NixlEPLaunch: node_idx: int num_processes: int include_tcp_server: bool + step_name: str = "" append_output: bool = False + rank_ids: tuple[int, ...] = () + expects_planned_removal: bool = False @dataclass(frozen=True) @@ -101,15 +104,19 @@ def phase_transition_timeout_seconds(self) -> int: num_plan_phases = max(len(self.tdef.cmd_args.parse_plan()), 1) return max(phase_timeout // num_plan_phases, 1) - def _new_process_counts_by_phase(self) -> list[int]: - counts: list[int] = [] + def _new_rank_ids_by_phase(self) -> list[tuple[int, ...]]: + rank_ids_by_phase: list[tuple[int, ...]] = [] previous_positive_ranks: set[int] = set() for positive_ranks in [{rank for rank in phase if rank >= 0} for phase in self.tdef.cmd_args.parse_plan()]: - counts.append(len(positive_ranks - previous_positive_ranks)) + rank_ids_by_phase.append(tuple(sorted(positive_ranks - previous_positive_ranks))) previous_positive_ranks = positive_ranks + counts = [len(rank_ids) for rank_ids in rank_ids_by_phase] self._validate_requested_processes(counts) - return counts + return rank_ids_by_phase + + def _planned_removal_ranks_after_phase(self, phase_idx: int) -> set[int]: + return {abs(rank) for phase in self.tdef.cmd_args.parse_plan()[phase_idx + 1 :] for rank in phase if rank < 0} def _validate_requested_processes(self, new_process_counts: list[int]) -> None: total_requested_processes = sum(new_process_counts) @@ -132,52 +139,57 @@ def _validate_requested_processes(self, new_process_counts: list[int]) -> None: ) def _allocate_stage_launches( - self, phase_idx: int, new_process_count: int, remaining_capacity: list[int] + self, phase_idx: int, new_rank_ids: tuple[int, ...], remaining_capacity: list[int] ) -> tuple[NixlEPLaunch, ...]: - if new_process_count == 0: + if not new_rank_ids: return () launches: list[NixlEPLaunch] = [] - remaining_phase_processes = new_process_count + remaining_phase_rank_ids = list(new_rank_ids) is_initial_phase = phase_idx == 0 + planned_removal_ranks = self._planned_removal_ranks_after_phase(phase_idx) for node_idx, node_capacity in enumerate(remaining_capacity): - if remaining_phase_processes == 0: + if not remaining_phase_rank_ids: break - assignable = min(node_capacity, remaining_phase_processes) + assignable = min(node_capacity, len(remaining_phase_rank_ids)) if assignable == 0: continue + rank_ids = tuple(remaining_phase_rank_ids[:assignable]) + remaining_phase_rank_ids = remaining_phase_rank_ids[assignable:] remaining_capacity[node_idx] -= assignable - remaining_phase_processes -= assignable launches.append( NixlEPLaunch( node_idx=node_idx, num_processes=assignable, include_tcp_server=(not is_initial_phase) or node_idx != 0, + step_name=f"nixl-ep-p{phase_idx}-l{len(launches)}", append_output=not is_initial_phase, + rank_ids=rank_ids, + expects_planned_removal=any(rank in planned_removal_ranks for rank in rank_ids), ) ) - if remaining_phase_processes != 0: + if remaining_phase_rank_ids: num_nodes, _ = self.get_cached_nodes_spec() raise ValueError( "For multi-node NIXL EP runs, the plan-derived launches cannot be packed onto " f"{num_nodes} nodes with per-node capacity {self.num_processes_per_node}. " - f"Remaining phase size: {remaining_phase_processes}." + f"Remaining phase size: {len(remaining_phase_rank_ids)}." ) return tuple(launches) @property def plan_stages(self) -> tuple[NixlEPStage, ...]: - new_process_counts = self._new_process_counts_by_phase() + new_rank_ids_by_phase = self._new_rank_ids_by_phase() num_nodes, _ = self.get_cached_nodes_spec() remaining_capacity = [self.num_processes_per_node] * num_nodes stages: list[NixlEPStage] = [] - for phase_idx, new_process_count in enumerate(new_process_counts): - launches = self._allocate_stage_launches(phase_idx, new_process_count, remaining_capacity) + for phase_idx, new_rank_ids in enumerate(new_rank_ids_by_phase): + launches = self._allocate_stage_launches(phase_idx, new_rank_ids, remaining_capacity) stages.append(NixlEPStage(idx=phase_idx, launches=launches)) return tuple(stages) @@ -226,11 +238,13 @@ def generate_wait_for_master_services_function(self) -> str: return 1 }}""" - def _launch_srun_prefix(self, node_idx: int) -> str: + def _launch_srun_prefix(self, launch: NixlEPLaunch) -> str: + node_idx = launch.node_idx target_arg = f'--nodelist="${{nodes_array[{node_idx}]}}"' parts = [ *self.gen_srun_prefix(with_num_nodes=False), "--overlap", + f"--job-name={launch.step_name}", target_arg, "--ntasks-per-node=1", "--ntasks=1", @@ -245,7 +259,7 @@ def _render_launch(self, launch: NixlEPLaunch) -> str: open_mode_arg = " --open-mode=append" if launch.append_output else "" script = f"source {shlex.quote(str(env_file))}; {command}".replace('"', '\\"') return ( - f"{self._launch_srun_prefix(launch.node_idx)}{open_mode_arg} " + f"{self._launch_srun_prefix(launch)}{open_mode_arg} " f'--output={log_file} --error={log_file} bash -c "{script}"' ) @@ -289,7 +303,18 @@ def _write_env_vars_file(self) -> None: def _background_launches_lines(self, launches: tuple[NixlEPLaunch, ...]) -> list[str]: lines: list[str] = [] for launch in launches: - lines.extend([self._render_launch(launch) + " &", "active_srun_count=$((active_srun_count + 1))"]) + expected_removal = "1" if launch.expects_planned_removal else "0" + lines.extend( + [ + f'launch_step_names+=( "{launch.step_name}" )', + self._render_launch(launch) + " &", + 'launch_pids+=( "$!" )', + f'launch_step_id="$(wait_for_launch_step_id "{launch.step_name}" || true)"', + f'echo "NIXL EP launch {launch.step_name} step ID: ${{launch_step_id:-unknown}}"', + 'launch_step_ids+=( "$launch_step_id" )', + f'launch_expected_removal+=( "{expected_removal}" )', + ] + ) return lines @staticmethod @@ -299,23 +324,43 @@ def _finish_with_rc_lines() -> list[str]: ' echo "All NIXL EP launches completed successfully"', "fi", "", - "exit $rc", + 'echo "NIXL EP launcher exiting with rc=$rc"', + 'exit "$rc"', ] @classmethod def _wait_for_workers_lines(cls) -> list[str]: return [ + "", + "pending_term=0", + "ignored_expected_removal=0", + "on_nixl_ep_deferred_term() {", + " pending_term=1", + ' echo "Deferring TERM while collecting NIXL EP launch statuses"', + "}", + "trap on_nixl_ep_deferred_term TERM", "", "rc=0", - 'while [ "$active_srun_count" -gt 0 ]; do', - " wait -n", + 'for idx in "${!launch_pids[@]}"; do', + ' wait "${launch_pids[$idx]}"', " wait_rc=$?", - " active_srun_count=$((active_srun_count - 1))", - ' if [ "$wait_rc" -ne 0 ] && [ "$rc" -eq 0 ]; then', + ' if [ "$wait_rc" -eq 0 ]; then', + " continue", + " fi", + ' if [ "${launch_expected_removal[$idx]}" = "1" ] && [ "$wait_rc" -eq 143 ]; then', + (' echo "Ignoring expected NIXL EP planned-rank-removal exit from launch PID ${launch_pids[$idx]}"'), + " ignored_expected_removal=1", + ' elif [ "$pending_term" -ne 0 ] && [ "$wait_rc" -eq 143 ]; then', + ' echo "Ignoring deferred TERM wait interruption for launch PID ${launch_pids[$idx]}"', + ' elif [ "$rc" -eq 0 ]; then', " rc=$wait_rc", " fi", "done", "", + 'if [ "$pending_term" -ne 0 ] && [ "$ignored_expected_removal" -eq 0 ] && [ "$rc" -eq 0 ]; then', + " rc=143", + "fi", + "", *cls._finish_with_rc_lines(), ] @@ -357,14 +402,23 @@ def _initial_follower_launch_lines(self, stage: NixlEPStage) -> list[str]: def _initial_stage_lines(self, stage: NixlEPStage, has_followers: bool) -> list[str]: primary_launch = stage.launches[0] + expected_removal = "1" if primary_launch.expects_planned_removal else "0" master_service_lines = self._wait_for_master_services_lines() if has_followers else [] header_lines = [ - "active_srun_count=0", + "launch_pids=()", + "launch_step_names=()", + "launch_step_ids=()", + "launch_expected_removal=()", "", 'echo "Starting initial NIXL EP stage on the master node..."', + f'launch_step_names+=( "{primary_launch.step_name}" )', self._render_launch(primary_launch) + " &", "primary_pid=$!", - "active_srun_count=$((active_srun_count + 1))", + 'launch_pids+=( "$primary_pid" )', + f'launch_step_id="$(wait_for_launch_step_id "{primary_launch.step_name}" || true)"', + f'echo "NIXL EP launch {primary_launch.step_name} step ID: ${{launch_step_id:-unknown}}"', + 'launch_step_ids+=( "$launch_step_id" )', + f'launch_expected_removal+=( "{expected_removal}" )', ] return header_lines + master_service_lines + self._initial_follower_launch_lines(stage) @@ -413,6 +467,37 @@ def _launcher_prologue_lines(self) -> list[str]: @staticmethod def _cleanup_function_lines() -> list[str]: return [ + "wait_for_launch_step_id() {", + ' local step_name="$1"', + " local step_id", + " for _ in {1..10}; do", + ' step_id=$(squeue --noheader --steps --job "$SLURM_JOB_ID" --format="%i %j" ' + + "| awk -v step_name=\"$step_name\" '$2 == step_name { print $1; exit }')", + ' if [ -n "$step_id" ]; then', + ' echo "$step_id"', + " return 0", + " fi", + " sleep 1", + " done", + " return 1", + "}", + "", + "signal_nixl_ep_steps() {", + ' local signal="$1"', + " local idx", + " local step_id", + ' for idx in "${!launch_step_names[@]}"; do', + ' step_id="${launch_step_ids[$idx]:-}"', + ' if [ -z "$step_id" ]; then', + ' step_id="$(wait_for_launch_step_id "${launch_step_names[$idx]}" || true)"', + ' launch_step_ids[$idx]="$step_id"', + " fi", + ' if [ -n "$step_id" ]; then', + ' scancel --signal="$signal" "$step_id" >/dev/null 2>&1 || true', + " fi", + " done", + "}", + "", "cleanup_nixl_ep() {", " local pids", ' pids="$(jobs -pr)"', @@ -420,18 +505,20 @@ def _cleanup_function_lines() -> list[str]: " return 0", " fi", ' echo "Cleaning up NIXL EP background launches..."', - " kill -TERM $pids >/dev/null 2>&1 || true", + ' signal_nixl_ep_steps "TERM"', " sleep 2", ' pids="$(jobs -pr)"', ' if [ -n "$pids" ]; then', - " kill -KILL $pids >/dev/null 2>&1 || true", + ' signal_nixl_ep_steps "KILL"', " fi", " wait >/dev/null 2>&1 || true", "}", "", "on_nixl_ep_signal() {", ' local rc="$1"', + ' echo "NIXL EP launcher received signal, rc=$rc"', " cleanup_nixl_ep", + ' echo "NIXL EP launcher exiting with rc=$rc"', ' exit "$rc"', "}", "", diff --git a/tests/ref_data/nixl-ep-launch.sh b/tests/ref_data/nixl-ep-launch.sh index 2dec420a8..3cc0e7786 100644 --- a/tests/ref_data/nixl-ep-launch.sh +++ b/tests/ref_data/nixl-ep-launch.sh @@ -12,6 +12,36 @@ echo "Num Nodes: ${#nodes_array[@]}" echo "Master Node: $master_node" echo "Master IP: $master_ip" +wait_for_launch_step_id() { + local step_name="$1" + local step_id + for _ in {1..10}; do + step_id=$(squeue --noheader --steps --job "$SLURM_JOB_ID" --format="%i %j" | awk -v step_name="$step_name" '$2 == step_name { print $1; exit }') + if [ -n "$step_id" ]; then + echo "$step_id" + return 0 + fi + sleep 1 + done + return 1 +} + +signal_nixl_ep_steps() { + local signal="$1" + local idx + local step_id + for idx in "${!launch_step_names[@]}"; do + step_id="${launch_step_ids[$idx]:-}" + if [ -z "$step_id" ]; then + step_id="$(wait_for_launch_step_id "${launch_step_names[$idx]}" || true)" + launch_step_ids[$idx]="$step_id" + fi + if [ -n "$step_id" ]; then + scancel --signal="$signal" "$step_id" >/dev/null 2>&1 || true + fi + done +} + cleanup_nixl_ep() { local pids pids="$(jobs -pr)" @@ -19,18 +49,20 @@ cleanup_nixl_ep() { return 0 fi echo "Cleaning up NIXL EP background launches..." - kill -TERM $pids >/dev/null 2>&1 || true + signal_nixl_ep_steps "TERM" sleep 2 pids="$(jobs -pr)" if [ -n "$pids" ]; then - kill -KILL $pids >/dev/null 2>&1 || true + signal_nixl_ep_steps "KILL" fi wait >/dev/null 2>&1 || true } on_nixl_ep_signal() { local rc="$1" + echo "NIXL EP launcher received signal, rc=$rc" cleanup_nixl_ep + echo "NIXL EP launcher exiting with rc=$rc" exit "$rc" } @@ -83,12 +115,20 @@ wait_for_phase_completion() { return 1 } -active_srun_count=0 +launch_pids=() +launch_step_names=() +launch_step_ids=() +launch_expected_removal=() echo "Starting initial NIXL EP stage on the master node..." -srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --nodelist="${nodes_array[0]}" --ntasks-per-node=1 --ntasks=1 -N1 --output=__OUTPUT_DIR__/output/nixl-ep-node-0.log --error=__OUTPUT_DIR__/output/nixl-ep-node-0.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 4 --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & +launch_step_names+=( "nixl-ep-p0-l0" ) +srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --job-name=nixl-ep-p0-l0 --nodelist="${nodes_array[0]}" --ntasks-per-node=1 --ntasks=1 -N1 --output=__OUTPUT_DIR__/output/nixl-ep-node-0.log --error=__OUTPUT_DIR__/output/nixl-ep-node-0.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 4 --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & primary_pid=$! -active_srun_count=$((active_srun_count + 1)) +launch_pids+=( "$primary_pid" ) +launch_step_id="$(wait_for_launch_step_id "nixl-ep-p0-l0" || true)" +echo "NIXL EP launch nixl-ep-p0-l0 step ID: ${launch_step_id:-unknown}" +launch_step_ids+=( "$launch_step_id" ) +launch_expected_removal+=( "0" ) echo "Waiting for NIXL EP master services..." wait_for_master_services || exit 1 @@ -97,28 +137,58 @@ echo "Waiting for phase 0 before starting phase 1..." wait_for_phase_completion "0" "__OUTPUT_DIR__/output/nixl-ep-node-0.log" "$primary_pid" || exit 1 echo "Starting launches for phase 1..." -srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --nodelist="${nodes_array[1]}" --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-1.log --error=__OUTPUT_DIR__/output/nixl-ep-node-1.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 4 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & -active_srun_count=$((active_srun_count + 1)) +launch_step_names+=( "nixl-ep-p1-l0" ) +srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --job-name=nixl-ep-p1-l0 --nodelist="${nodes_array[1]}" --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-1.log --error=__OUTPUT_DIR__/output/nixl-ep-node-1.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 4 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & +launch_pids+=( "$!" ) +launch_step_id="$(wait_for_launch_step_id "nixl-ep-p1-l0" || true)" +echo "NIXL EP launch nixl-ep-p1-l0 step ID: ${launch_step_id:-unknown}" +launch_step_ids+=( "$launch_step_id" ) +launch_expected_removal+=( "1" ) echo "Waiting for phase 2 before starting phase 3..." wait_for_phase_completion "2" "__OUTPUT_DIR__/output/nixl-ep-node-0.log" "$primary_pid" || exit 1 echo "Starting launches for phase 3..." -srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --nodelist="${nodes_array[2]}" --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-2.log --error=__OUTPUT_DIR__/output/nixl-ep-node-2.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 2 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & -active_srun_count=$((active_srun_count + 1)) +launch_step_names+=( "nixl-ep-p3-l0" ) +srun --export=ALL --mpi=pmix --container-image=docker.io/nvidia/nixl-ep:latest --container-mounts=__OUTPUT_DIR__/output:/cloudai_run_results,__INSTALL_DIR__:/cloudai_install,__OUTPUT_DIR__/output --overlap --job-name=nixl-ep-p3-l0 --nodelist="${nodes_array[2]}" --ntasks-per-node=1 --ntasks=1 -N1 --open-mode=append --output=__OUTPUT_DIR__/output/nixl-ep-node-2.log --error=__OUTPUT_DIR__/output/nixl-ep-node-2.log bash -c "source __OUTPUT_DIR__/output/env_vars.sh; python3 /workspace/nixl/examples/device/ep/tests/elastic/elastic.py --plan __OUTPUT_DIR__/output/nixl-ep-plan.json --num-processes 2 --tcp-server $master_ip --disable-ll-nvlink --hidden-dim 8192 --kineto --num-experts-per-rank 4 --num-tokens 256 --num-topk 6" & +launch_pids+=( "$!" ) +launch_step_id="$(wait_for_launch_step_id "nixl-ep-p3-l0" || true)" +echo "NIXL EP launch nixl-ep-p3-l0 step ID: ${launch_step_id:-unknown}" +launch_step_ids+=( "$launch_step_id" ) +launch_expected_removal+=( "0" ) + +pending_term=0 +ignored_expected_removal=0 +on_nixl_ep_deferred_term() { + pending_term=1 + echo "Deferring TERM while collecting NIXL EP launch statuses" +} +trap on_nixl_ep_deferred_term TERM rc=0 -while [ "$active_srun_count" -gt 0 ]; do - wait -n +for idx in "${!launch_pids[@]}"; do + wait "${launch_pids[$idx]}" wait_rc=$? - active_srun_count=$((active_srun_count - 1)) - if [ "$wait_rc" -ne 0 ] && [ "$rc" -eq 0 ]; then + if [ "$wait_rc" -eq 0 ]; then + continue + fi + if [ "${launch_expected_removal[$idx]}" = "1" ] && [ "$wait_rc" -eq 143 ]; then + echo "Ignoring expected NIXL EP planned-rank-removal exit from launch PID ${launch_pids[$idx]}" + ignored_expected_removal=1 + elif [ "$pending_term" -ne 0 ] && [ "$wait_rc" -eq 143 ]; then + echo "Ignoring deferred TERM wait interruption for launch PID ${launch_pids[$idx]}" + elif [ "$rc" -eq 0 ]; then rc=$wait_rc fi done +if [ "$pending_term" -ne 0 ] && [ "$ignored_expected_removal" -eq 0 ] && [ "$rc" -eq 0 ]; then + rc=143 +fi + if [ "$rc" -eq 0 ]; then echo "All NIXL EP launches completed successfully" fi -exit $rc +echo "NIXL EP launcher exiting with rc=$rc" +exit "$rc" diff --git a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py index 663aa88e4..ce2bf0cc1 100644 --- a/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py +++ b/tests/workloads/nixl_ep/test_command_gen_strategy_slurm.py @@ -130,6 +130,10 @@ def normalize_stages(strategy: NixlEPSlurmCommandGenStrategy) -> list[tuple[int, return normalized_stages +def normalize_stage_rank_ids(strategy: NixlEPSlurmCommandGenStrategy) -> list[tuple[int, tuple[tuple[int, ...], ...]]]: + return [(stage.idx, tuple(launch.rank_ids for launch in stage.launches)) for stage in strategy.plan_stages] + + def read_launcher_script(strategy: NixlEPSlurmCommandGenStrategy) -> str: srun_command = strategy.gen_srun_command() assert srun_command == f"bash {strategy.launcher_script_path.absolute()}" @@ -507,6 +511,19 @@ def test_gen_srun_command_single_node(nixl_ep: NixlEPTestDefinition, slurm_syste assert_launcher_sruns_are_explicitly_routed(launcher_script) assert "trap cleanup_nixl_ep EXIT" in launcher_script assert "jobs -pr" in launcher_script + assert 'squeue --noheader --steps --job "$SLURM_JOB_ID"' in launcher_script + assert "--job-name=nixl-ep-p0-l0" in launcher_script + assert 'launch_step_names+=( "nixl-ep-p0-l0" )' in launcher_script + assert 'launch_step_ids+=( "$launch_step_id" )' in launcher_script + assert 'signal_nixl_ep_steps "TERM"' in launcher_script + assert 'signal_nixl_ep_steps "KILL"' in launcher_script + assert 'scancel --signal="$signal" "$step_id"' in launcher_script + assert 'scancel --signal=TERM "$SLURM_JOB_ID"' not in launcher_script + assert 'scancel --signal=KILL "$SLURM_JOB_ID"' not in launcher_script + assert "NIXL EP launcher exiting with rc=$rc" in launcher_script + assert "NIXL EP launcher received signal, rc=$rc" in launcher_script + assert "kill -TERM" not in launcher_script + assert "kill -KILL" not in launcher_script def test_gen_srun_command_single_node_static_plan(nixl_ep: NixlEPTestDefinition, slurm_system: SlurmSystem) -> None: @@ -664,6 +681,48 @@ def test_multi_node_stages_match_public_two_node_single_expansion( assert normalize_stages(strategy) == [(0, (4, 0)), (1, (0, 4))] +def test_multi_node_stages_mark_launches_with_planned_rank_removal( + slurm_system: SlurmSystem, +) -> None: + tdef = NixlEPTestDefinition( + name="nixl_ep", + description="NIXL Elastic EP benchmark", + test_template_name="NixlEP", + cmd_args=NixlEPCmdArgs( + docker_image_url="docker.io/nvidia/nixl-ep:latest", + plan=json.dumps([[0, 1], [0, 1, 2, 3], [0, -2, 3], [0, 1, 2, 3]]), + num_processes_per_node=3, + ), + ) + test_run = TestRun( + name="nixl-ep", + num_nodes=2, + nodes=[], + test=tdef, + output_path=slurm_system.output_path, + ) + strategy = NixlEPSlurmCommandGenStrategy(slurm_system, test_run) + + assert normalize_stages(strategy) == [(0, (2, 0)), (1, (1, 1)), (2, (0, 0)), (3, (0, 2))] + assert normalize_stage_rank_ids(strategy) == [(0, ((0, 1),)), (1, ((2,), (3,))), (2, ()), (3, ((1, 2),))] + assert [[launch.expects_planned_removal for launch in stage.launches] for stage in strategy.plan_stages] == [ + [False], + [True, False], + [], + [False], + ] + + launcher_script = read_launcher_script(strategy) + + assert launcher_script.count('launch_expected_removal+=( "1" )') == 1 + assert "--job-name=nixl-ep-p1-l0" in launcher_script + assert "--job-name=nixl-ep-p1-l1" in launcher_script + assert "--job-name=nixl-ep-p3-l0" in launcher_script + assert "Deferring TERM while collecting NIXL EP launch statuses" in launcher_script + assert "Ignoring expected NIXL EP planned-rank-removal exit" in launcher_script + assert "wait -n" not in launcher_script + + def test_multi_node_single_stage_plan_splits_initial_launches_across_nodes( slurm_system: SlurmSystem, ) -> None: @@ -825,7 +884,8 @@ def test_gen_srun_command_single_launch_reports_success( assert "master_ip=$(" in launcher_script assert 'echo "All NIXL EP launches completed successfully"' in launcher_script assert 'if [ "$rc" -eq 0 ]; then' in launcher_script - assert "exit $rc" in launcher_script + assert 'echo "NIXL EP launcher exiting with rc=$rc"' in launcher_script + assert 'exit "$rc"' in launcher_script def test_gen_exec_command_matches_reference(nixl_ep_tr: TestRun, slurm_system: SlurmSystem) -> None: diff --git a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py index de0dcab6c..8ed463576 100644 --- a/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py +++ b/tests/workloads/nixl_ep/test_job_status_retrieval_strategy.py @@ -69,6 +69,80 @@ def test_successful_job(self, nixl_ep_tr: TestRun) -> None: assert result.is_successful assert result.error_message == "" + def test_successful_job_with_launcher_terminal_verdict(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "stdout.txt").write_text( + "Starting initial NIXL EP stage on the master node...\n" + "All NIXL EP launches completed successfully\n" + "NIXL EP launcher exiting with rc=0\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert result.is_successful + + def test_started_launcher_without_terminal_verdict_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, encoding="utf-8" + ) + (nixl_ep_tr.output_path / "stdout.txt").write_text( + "Starting initial NIXL EP stage on the master node...\nStarting launches for phase 3...\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "exited before printing its terminal verdict" in result.error_message + + def test_planned_srun_termination_is_ignored_when_benchmark_output_exists(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "stderr.txt").write_text( + "\n".join( + [ + "srun: error: node001: task 0: Terminated", + "srun: Terminating StepId=123.4", + "srun: Force Terminated StepId=123.4", + ] + ) + + "\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert result.is_successful + + def test_unplanned_srun_termination_is_reported(self, nixl_ep_tr: TestRun) -> None: + nixl_ep_tr.test.cmd_args.plan = "[[0, 1, 2]]" + nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) + for node_idx in range(num_nodes(nixl_ep_tr)): + (nixl_ep_tr.output_path / f"nixl-ep-node-{node_idx}.log").write_text( + SUCCESSFUL_BANDWIDTH_LINE, + encoding="utf-8", + ) + (nixl_ep_tr.output_path / "stderr.txt").write_text( + "srun: error: node001: task 0: Terminated\n", + encoding="utf-8", + ) + + result = nixl_ep_tr.test.was_run_successful(nixl_ep_tr) + + assert not result.is_successful + assert "srun failure" in result.error_message + def test_launcher_path_error_is_reported(self, nixl_ep_tr: TestRun) -> None: nixl_ep_tr.output_path.mkdir(parents=True, exist_ok=True) (nixl_ep_tr.output_path / "nixl-ep-node-0.log").write_text(