Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 97 additions & 0 deletions conf/experimental/test_scenario/nixl_ep.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
# SPDX-FileCopyrightText: NVIDIA CORPORATION & AFFILIATES
# Copyright (c) 2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

name = "NIXL-EP-Example.Inter-node"

[[Tests]]
id = "NIXL.EP.No-expansion"
num_nodes = 2
time_limit = "00:10:00"

name = "NIXL.EP.No-expansion"
description = "NIXL EP Example: No-expansion plan"
test_template_name = "NixlEP"

[Tests.cmd_args]
docker_image_url = ""
plan = "[[0, 1]]"
num_processes_per_node = 1
num_tokens = 256
num_experts_per_rank = 4
hidden_dim = 8192
num_topk = 6
disable_ll_nvlink = true
kineto = false

[[Tests]]
id = "NIXL.EP.Single-expansion"
num_nodes = 2
time_limit = "00:10:00"

name = "NIXL.EP.Single-expansion"
description = "NIXL EP Example: Single-expansion plan"
test_template_name = "NixlEP"

[Tests.cmd_args]
docker_image_url = ""
plan = "[[0, 1], [0, 1, 2, 3]]"
num_processes_per_node = 2
num_tokens = 256
num_experts_per_rank = 4
hidden_dim = 8192
num_topk = 6
disable_ll_nvlink = true
kineto = false

[[Tests]]
id = "NIXL.EP.Double-expansion"
num_nodes = 2
time_limit = "00:10:00"

name = "NIXL.EP.Double-expansion"
description = "NIXL EP Example: Double-expansion plan"
test_template_name = "NixlEP"

[Tests.cmd_args]
docker_image_url = ""
plan = "[[0], [0, 1], [0, 1, 2, 3]]"
num_processes_per_node = 2
num_tokens = 256
num_experts_per_rank = 4
hidden_dim = 8192
num_topk = 6
disable_ll_nvlink = true
kineto = false

[[Tests]]
id = "NIXL.EP.Expansion-contraction"
num_nodes = 2
time_limit = "00:10:00"

name = "NIXL.EP.Expansion-contraction"
description = "NIXL EP Example: Expansion-contraction plan (elasticity testing)"
test_template_name = "NixlEP"

[Tests.cmd_args]
docker_image_url = ""
plan = "[[0, 1], [0, 1, 2, 3], [0, -2, 3], [0, 1, 2, 3]]"
num_processes_per_node = 3
num_tokens = 256
num_experts_per_rank = 4
hidden_dim = 8192
num_topk = 6
disable_ll_nvlink = true
kineto = false
47 changes: 46 additions & 1 deletion src/cloudai/workloads/nixl_ep/nixl_ep.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@
from .log_parsing import parse_nixl_ep_bandwidth_samples

GENERATED_PLAN_FILE_NAME = "nixl-ep-plan.json"
# Substring searched for in the launcher's stdout; the launcher is expected to
# print it followed by its return code. Its absence after a start marker is
# reported as a failure by the terminal-verdict check.
LAUNCHER_EXIT_MARKER = "NIXL EP launcher exiting with rc="
# Substrings that show the launcher actually started; finding any one of them
# in stdout is sufficient.
LAUNCHER_START_MARKERS = (
"Starting initial NIXL EP stage",
"Starting NIXL EP on the master node",
)


class NixlEPCmdArgs(CmdArgs):
Expand Down Expand Up @@ -149,10 +154,28 @@ def _primary_launch_exit_error_message(content: str) -> str | None:

return f"The primary NIXL EP launch exited before phase {phase} completed."

@staticmethod
def _looks_like_planned_srun_termination(content: str) -> bool:
allowed_patterns = (
re.compile(r"^srun: error: .+: task \d+: Terminated$"),
re.compile(r"^srun: Terminating StepId=\S+$"),
re.compile(r"^srun: Force Terminated StepId=\S+$"),
)
lines = [line.strip() for line in content.splitlines() if line.strip()]
return bool(lines) and all(any(pattern.match(line) for pattern in allowed_patterns) for line in lines)

def _has_planned_rank_removal(self) -> bool:
plans = self.cmd_args.plan if isinstance(self.cmd_args.plan, list) else [self.cmd_args.plan]
return any(rank < 0 for plan in plans for phase in NixlEPCmdArgs._parse_plan(plan) for rank in phase)

def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None:
if not path.is_file():
return None

content = path.read_text(encoding="utf-8", errors="ignore")
if self._has_planned_rank_removal() and self._looks_like_planned_srun_termination(content):
return None

launcher_failure_patterns = (
("python3: can't open file", "The benchmark entrypoint could not be opened."),
("Traceback (most recent call last):", "The benchmark launcher raised a Python traceback."),
Expand All @@ -164,7 +187,6 @@ def _scan_log_for_failures(self, path: Path) -> JobStatusResult | None:
("srun: error:", "Slurm reported an srun failure."),
("Exited with exit code", "A Slurm step exited with a non-zero status."),
)
content = path.read_text(encoding="utf-8", errors="ignore")
primary_launch_error = self._primary_launch_exit_error_message(content)
if primary_launch_error is not None:
tail = self._tail(path)
Expand Down Expand Up @@ -200,6 +222,25 @@ def _check_benchmark_output(self, expected_node_logs: list[Path]) -> JobStatusRe

return JobStatusResult(is_successful=False, error_message=error_message)

def _check_launcher_terminal_verdict(self, stdout_path: Path) -> JobStatusResult | None:
    """Detect a launcher that started but never printed its exit marker.

    Args:
        stdout_path: Path of the launcher's stdout capture.

    Returns:
        None when the file does not exist, when it contains
        ``LAUNCHER_EXIT_MARKER``, or when it contains none of
        ``LAUNCHER_START_MARKERS``. Otherwise a failed ``JobStatusResult``
        whose message names the expected marker and the file, followed by
        the tail of the file when that tail is non-empty.
    """
    if not stdout_path.is_file():
        return None

    stdout_text = stdout_path.read_text(encoding="utf-8", errors="ignore")
    launcher_started = any(marker in stdout_text for marker in LAUNCHER_START_MARKERS)
    # A printed exit marker, or no sign the launcher ever started, means
    # there is nothing for this check to report.
    if LAUNCHER_EXIT_MARKER in stdout_text or not launcher_started:
        return None

    log_tail = self._tail(stdout_path)
    tail_suffix = f"\n{log_tail}" if log_tail else ""
    headline = (
        "The NIXL EP launcher started but exited before printing its terminal verdict. "
        f"Expected '{LAUNCHER_EXIT_MARKER}<rc>' in {stdout_path}."
    )
    return JobStatusResult(is_successful=False, error_message=headline + tail_suffix)

def was_run_successful(self, tr: TestRun) -> JobStatusResult:
output_path = tr.output_path
expected_node_logs = [tr.output_path / f"nixl-ep-node-{node_idx}.log" for node_idx in range(tr.nnodes)]
Expand All @@ -225,4 +266,8 @@ def was_run_successful(self, tr: TestRun) -> JobStatusResult:
if benchmark_output_result is not None:
return benchmark_output_result

launcher_verdict_result = self._check_launcher_terminal_verdict(output_path / "stdout.txt")
if launcher_verdict_result is not None:
return launcher_verdict_result

return JobStatusResult(is_successful=True)
Loading
Loading