diff --git a/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml b/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml index decfead3d..fd23c9555 100644 --- a/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml +++ b/conf/experimental/ai_dynamo/test_scenario/vllm_slurm.toml @@ -49,6 +49,33 @@ time_limit = "00:10:00" concurrency = 4 request-count = 50 +[[Tests]] +id = "test.disagg.shared-node" +test_name = "vLLM" +num_nodes = 2 +time_limit = "00:10:00" + + [Tests.cmd_args] + + [Tests.cmd_args.dynamo.prefill_worker] + num-nodes = 2 + [Tests.cmd_args.dynamo.prefill_worker.args] + tensor-parallel-size = 4 + pipeline-parallel-size = 1 + + [Tests.cmd_args.dynamo.decode_worker] + num-nodes = 2 + [Tests.cmd_args.dynamo.decode_worker.args] + tensor-parallel-size = 4 + pipeline-parallel-size = 1 + + [[Tests.cmd_args.aiperf_phases]] + name = "shared_node_smoke" + [Tests.cmd_args.aiperf_phases.args] + concurrency = 2 + request-count = 50 + server-metrics = "auto" + [[Tests]] id = "test.disagg.multinode" test_name = "vLLM" diff --git a/doc/workloads/ai_dynamo.rst b/doc/workloads/ai_dynamo.rst index 560107090..ae963855c 100644 --- a/doc/workloads/ai_dynamo.rst +++ b/doc/workloads/ai_dynamo.rst @@ -51,13 +51,15 @@ AI Dynamo jobs use three distinct types of nodes: - **Prefill node(s)**: Handle the prefill stage of inference - **Decode node(s)**: Handle the decode stage of inference (optional, depending on model and setup) -The total number of required nodes must be: +By default, when ``num_nodes`` is omitted, CloudAI allocates separate nodes for prefill and decode workers: :: num_prefill_nodes + num_decode_nodes -If there is a mismatch in the number of nodes between the schema and the test scenario, CloudAI will use the number of nodes specified in the test schema, ignoring the value in the test scenario. +Set top-level ``num_nodes`` explicitly to control the Slurm allocation. 
A value lower than +``num_prefill_nodes + num_decode_nodes`` enables shared-node disaggregated inference, where prefill and decode roles +run on the same allocated node(s) with separate GPU slices. All node role assignments and orchestration are automatically managed by CloudAI. @@ -303,6 +305,15 @@ If AIPerf accuracy mode is enabled, CloudAI copies ``aiperf_accuracy_artifacts/a Navigate to ``./results///0/`` and open the CSV to examine performance metrics. +Shared-Node Disaggregated Runs +~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + +For Slurm, set top-level ``num_nodes`` lower than the sum of ``prefill_worker.num-nodes`` and +``decode_worker.num-nodes`` to run both roles on the same allocated node(s). For example, ``num_nodes = 1`` with +``prefill_worker.num-nodes = 1`` and ``decode_worker.num-nodes = 1`` runs one prefill worker and one decode worker on +the same node. On a shared node, CloudAI gives the first GPUs to the decode worker and the GPUs after them to the prefill worker, sizing each slice by that role's +``tensor-parallel-size * pipeline-parallel-size``. The sum of the decode and prefill GPU counts must not exceed the number of GPUs on a single node. 
+ Example ``aiperf_report.csv``: :: diff --git a/src/cloudai/_core/test_scenario.py b/src/cloudai/_core/test_scenario.py index 6c1d0ee3c..d37b4df89 100644 --- a/src/cloudai/_core/test_scenario.py +++ b/src/cloudai/_core/test_scenario.py @@ -96,6 +96,7 @@ class TestRun: post_test: Optional[TestScenario] = None reports: Set[Type[ReportGenerationStrategy]] = field(default_factory=set) extra_srun_args: str | None = None + num_nodes_explicit: bool = False def __hash__(self) -> int: return hash(self.name + self.test.name + str(self.iterations) + str(self.current_iteration)) diff --git a/src/cloudai/test_scenario_parser.py b/src/cloudai/test_scenario_parser.py index 9de6a744d..48e3acad5 100644 --- a/src/cloudai/test_scenario_parser.py +++ b/src/cloudai/test_scenario_parser.py @@ -210,6 +210,7 @@ def _create_test_run( reports=get_reporters(test_info, tdef), extra_srun_args=test_info.extra_srun_args, exclude_nodes=test_info.exclude_nodes, + num_nodes_explicit="num_nodes" in test_info.model_fields_set, ) return tr diff --git a/src/cloudai/workloads/ai_dynamo/ai_dynamo.py b/src/cloudai/workloads/ai_dynamo/ai_dynamo.py index 298d9a79d..1c92ff437 100644 --- a/src/cloudai/workloads/ai_dynamo/ai_dynamo.py +++ b/src/cloudai/workloads/ai_dynamo/ai_dynamo.py @@ -604,6 +604,23 @@ def constraint_check(self, tr: TestRun, system: Optional[System]) -> bool: return False logging.info("constraint_check passed for: tp_times_pp_le_gpus_per_node") + role_total_nodes = int(prefill_worker.num_nodes) + int(decode_worker.num_nodes) + prefill_nodes = set(prefill_worker.nodes.split(",")) if prefill_worker.nodes else set() + decode_nodes = set(decode_worker.nodes.split(",")) if decode_worker.nodes else set() + has_explicit_allocation = getattr(tr, "num_nodes_explicit", False) or bool(tr.nodes) + shared_node_disagg = bool(prefill_nodes & decode_nodes) or ( + has_explicit_allocation and tr.nnodes < role_total_nodes + ) + if ( + shared_node_disagg + and gpus_per_node > 0 + and 
self.constraints.tp_times_pp_le_gpus_per_node + and (prefill_tp * prefill_pp + decode_tp * decode_pp > gpus_per_node) + ): + logging.info("constraint_check failed for: shared_node_tp_pp_sum_le_gpus_per_node") + return False + logging.info("constraint_check passed for: shared_node_tp_pp_sum_le_gpus_per_node") + return True diff --git a/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh b/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh index 8b1f3475a..18ce737ee 100644 --- a/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh +++ b/src/cloudai/workloads/ai_dynamo/ai_dynamo.sh @@ -39,6 +39,7 @@ declare -A aiperf_accuracy_args declare -A aiperf_accuracy_config lmcache_controller_cmd="" +SHARED_NODE_DISAGG="false" declare -A dynamo_args dynamo_args["backend"]="vllm" @@ -88,6 +89,18 @@ _csv_index_of() { echo "-1" } +_csv_lists_overlap() { + local left="$1" + local right="$2" + local item + for item in $(echo "$left" | tr ',' ' '); do + if [[ ",${right}," == *",${item},"* ]]; then + return 0 + fi + done + return 1 +} + _gpus_per_node() { local n=$(echo "${CUDA_VISIBLE_DEVICES:-}" | tr ',' '\n' | grep -c . 
|| true) [[ "$n" -gt 0 ]] && echo "$n" || echo "1" @@ -238,7 +251,15 @@ _set_nodelists() fi if [[ -z "${prefill_config["node-list"]}" ]]; then - prefill_config["node-list"]=$(_populate_nodelist "${prefill_config["num-nodes"]}" "${decode_config["node-list"]}") + local allocated_nodes + local requested_role_nodes + allocated_nodes=$(_csv_len "$DYNAMO_NODELIST") + requested_role_nodes=$(( ${decode_config["num-nodes"]:-0} + ${prefill_config["num-nodes"]:-0} )) + if [[ "$allocated_nodes" -lt "$requested_role_nodes" ]]; then + prefill_config["node-list"]=$(_populate_nodelist "${prefill_config["num-nodes"]}" "") + else + prefill_config["node-list"]=$(_populate_nodelist "${prefill_config["num-nodes"]}" "${decode_config["node-list"]}") + fi fi # Prefill nodelist should match prefill node count (skip validation if num-nodes is 0) @@ -256,6 +277,12 @@ _set_nodelists() log "ERROR: number of nodes in decode nodelist (${decode_nodelist_count}) does not match decode node count (${decode_config["num-nodes"]})" exit 1 fi + + SHARED_NODE_DISAGG="false" + if _csv_lists_overlap "${prefill_config["node-list"]}" "${decode_config["node-list"]}"; then + SHARED_NODE_DISAGG="true" + log "Shared-node disaggregated mode: prefill and decode workers share node(s)" + fi } _has_connector() { @@ -322,18 +349,33 @@ _compute_worker_allocation_vllm() { exit 1 fi - if [[ "${prefill_config["multiple-workers-per-node"]}" != "true" ]]; then - prefill_config["gpus-per-worker"]=$num_gpus - fi + decode_config["gpu-offset"]=0 + prefill_config["gpu-offset"]=0 - if [[ "${decode_config["multiple-workers-per-node"]}" != "true" ]]; then - decode_config["gpus-per-worker"]=$num_gpus + if [[ "${SHARED_NODE_DISAGG}" == "true" ]]; then + local shared_gpus_needed=$(( prefill_config["gpus-per-worker"] + decode_config["gpus-per-worker"] )) + if [[ "$shared_gpus_needed" -gt "$num_gpus" ]]; then + log "ERROR: Not enough GPUs for shared-node disaggregated mode: need ${decode_config["gpus-per-worker"]} decode + 
${prefill_config["gpus-per-worker"]} prefill, but only have ${num_gpus}" + exit 1 + fi + decode_config["workers-per-node"]=1 + prefill_config["workers-per-node"]=1 + prefill_config["gpu-offset"]=${decode_config["gpus-per-worker"]} + else + if [[ "${prefill_config["multiple-workers-per-node"]}" != "true" ]]; then + prefill_config["gpus-per-worker"]=$num_gpus + fi + + if [[ "${decode_config["multiple-workers-per-node"]}" != "true" ]]; then + decode_config["gpus-per-worker"]=$num_gpus + fi + + prefill_config["workers-per-node"]=$(( num_gpus / prefill_config["gpus-per-worker"] )) + decode_config["workers-per-node"]=$(( num_gpus / decode_config["gpus-per-worker"] )) fi log "DECODE: num GPUs: $num_gpus, GPUs per worker: ${decode_config["gpus-per-worker"]}" - log "PREFILL: num GPUs: $num_gpus, GPUs per worker: ${prefill_config["gpus-per-worker"]}" - prefill_config["workers-per-node"]=$(( num_gpus / prefill_config["gpus-per-worker"] )) - decode_config["workers-per-node"]=$(( num_gpus / decode_config["gpus-per-worker"] )) + log "PREFILL: num GPUs: $num_gpus, GPUs per worker: ${prefill_config["gpus-per-worker"]}, GPU offset: ${prefill_config["gpu-offset"]}" log "DECODE: workers per node: ${decode_config["workers-per-node"]}" log "PREFILL: workers per node: ${prefill_config["workers-per-node"]}" @@ -495,6 +537,15 @@ _gpu_list_for_worker() { echo "$(echo $CUDA_VISIBLE_DEVICES | cut -d',' -f${start}-${end})" } +_gpu_list_for_worker_offset() { + local per_worker=$1 + local idx=$2 + local offset=${3:-0} + local start=$(( 1 + offset + (idx * per_worker) )) + local end=$(( start + per_worker - 1 )) + echo "$CUDA_VISIBLE_DEVICES" | cut -d',' -f"${start}-${end}" +} + _log_file_for_worker() { local role="$1" local idx="$2" @@ -891,13 +942,15 @@ function launch_decode() local base_kv_event_port=${DYN_VLLM_KV_EVENT_PORT:-20080} local base_kvbm_pub_port=${DYN_KVBM_LEADER_ZMQ_PUB_PORT:-56001} local base_kvbm_ack_port=${DYN_KVBM_LEADER_ZMQ_ACK_PORT:-56002} + local 
base_system_port=${DYN_SYSTEM_PORT:-9090} local kvbm_port_stride=2 local side_channel_host side_channel_host="$(_current_node_ip)" log "Launching $workers_per_node decode worker(s) with unique port ranges" for i in $(seq 0 $(( $workers_per_node - 1 ))); do - local gpu_list=$(_gpu_list_for_worker "${decode_config["gpus-per-worker"]}" "$i") + local gpu_list + gpu_list=$(_gpu_list_for_worker "${decode_config["gpus-per-worker"]}" "$i") local log_file=$(_log_file_for_worker "decode" "$i") # Each worker needs unique port ranges to avoid ZMQ conflicts: # - NIXL side channel: base_port + (worker_index * tp_size) for TP ranks @@ -907,6 +960,7 @@ function launch_decode() local kv_event_port=$((base_kv_event_port + i)) local kvbm_pub_port=$((base_kvbm_pub_port + (i * kvbm_port_stride))) local kvbm_ack_port=$((base_kvbm_ack_port + (i * kvbm_port_stride))) + local system_port=$((base_system_port + i)) # Build decode args as proper bash arrays to preserve # multi-word values (e.g. --cmd "genai-perf profile") through word splitting. 
@@ -918,6 +972,7 @@ function launch_decode() log "Launching decode worker $i on GPUs $gpu_list (NIXL host: $side_channel_host, NIXL port: $nixl_port, KV event port: $kv_event_port, KVBM pub/ack: $kvbm_pub_port/$kvbm_ack_port)" log "Decode cmd: ${decode_config["cmd"]} ${args_arr[*]} ${decode_config["extra-args"]}" CUDA_VISIBLE_DEVICES=$gpu_list \ + DYN_SYSTEM_PORT=$system_port \ VLLM_NIXL_SIDE_CHANNEL_HOST="$side_channel_host" \ VLLM_NIXL_SIDE_CHANNEL_PORT=$nixl_port \ DYN_VLLM_KV_EVENT_PORT=$kv_event_port \ @@ -948,13 +1003,28 @@ function launch_prefill() local base_kv_event_port=${DYN_VLLM_KV_EVENT_PORT:-20080} local base_kvbm_pub_port=${DYN_KVBM_LEADER_ZMQ_PUB_PORT:-56001} local base_kvbm_ack_port=${DYN_KVBM_LEADER_ZMQ_ACK_PORT:-56002} + local base_system_port=${DYN_SYSTEM_PORT:-9090} local kvbm_port_stride=2 + local gpu_offset=${prefill_config["gpu-offset"]:-0} local side_channel_host side_channel_host="$(_current_node_ip)" + + if [[ "${SHARED_NODE_DISAGG}" == "true" ]]; then + local decode_workers=${decode_config["workers-per-node"]} + local decode_tp=${decode_args["--tensor-parallel-size"]} + base_nixl_port=$((base_nixl_port + (decode_workers * decode_tp))) + base_kv_event_port=$((base_kv_event_port + decode_workers)) + base_kvbm_pub_port=$((base_kvbm_pub_port + (decode_workers * kvbm_port_stride))) + base_kvbm_ack_port=$((base_kvbm_ack_port + (decode_workers * kvbm_port_stride))) + base_system_port=$((base_system_port + decode_workers)) + log "Shared-node prefill offsets: GPU offset=$gpu_offset, NIXL base=$base_nixl_port, KV event base=$base_kv_event_port, KVBM pub/ack base=$base_kvbm_pub_port/$base_kvbm_ack_port, system base=$base_system_port" + fi + log "Launching $workers_per_node prefill worker(s) with unique port ranges" for i in $(seq 0 $(( $workers_per_node - 1 ))); do - local gpu_list=$(_gpu_list_for_worker "${prefill_config["gpus-per-worker"]}" "$i") + local gpu_list + gpu_list=$(_gpu_list_for_worker_offset "${prefill_config["gpus-per-worker"]}" "$i" 
"$gpu_offset") local log_file=$(_log_file_for_worker "prefill" "$i") # Each worker needs unique port ranges to avoid ZMQ conflicts: # - NIXL side channel: base_port + (worker_index * tp_size) for TP ranks @@ -964,6 +1034,7 @@ function launch_prefill() local kv_event_port=$((base_kv_event_port + i)) local kvbm_pub_port=$((base_kvbm_pub_port + (i * kvbm_port_stride))) local kvbm_ack_port=$((base_kvbm_ack_port + (i * kvbm_port_stride))) + local system_port=$((base_system_port + i)) # Build prefill args as proper bash arrays to preserve # multi-word values (e.g. --cmd "genai-perf profile") through word splitting. @@ -975,6 +1046,7 @@ function launch_prefill() log "Launching prefill worker $i on GPUs $gpu_list (NIXL host: $side_channel_host, NIXL port: $nixl_port, KV event port: $kv_event_port, KVBM pub/ack: $kvbm_pub_port/$kvbm_ack_port)" log "Prefill cmd: ${prefill_config["cmd"]} ${args_arr[*]} ${prefill_config["extra-args"]}" CUDA_VISIBLE_DEVICES=$gpu_list \ + DYN_SYSTEM_PORT=$system_port \ VLLM_NIXL_SIDE_CHANNEL_HOST="$side_channel_host" \ VLLM_NIXL_SIDE_CHANNEL_PORT=$nixl_port \ DYN_VLLM_KV_EVENT_PORT=$kv_event_port \ @@ -1031,19 +1103,24 @@ _resolve_aiperf_server_metrics_urls() { local base_system_port=${DYN_SYSTEM_PORT:-9090} local decode_workers_per_node=${decode_config["workers-per-node"]:-1} local prefill_workers_per_node=${prefill_config["workers-per-node"]:-1} + local prefill_system_port_offset=0 local IFS_SAVE="$IFS" local node i + if [[ "${SHARED_NODE_DISAGG}" == "true" ]]; then + prefill_system_port_offset=$decode_workers_per_node + fi + IFS=',' - for node in ${prefill_config["node-list"]:-}; do - for i in $(seq 0 $(( prefill_workers_per_node - 1 ))); do + for node in ${decode_config["node-list"]:-}; do + for i in $(seq 0 $(( decode_workers_per_node - 1 ))); do urls="${urls},http://${node}:$((base_system_port + i))/metrics" done done - for node in ${decode_config["node-list"]:-}; do - for i in $(seq 0 $(( decode_workers_per_node - 1 ))); do - 
urls="${urls},http://${node}:$((base_system_port + i))/metrics" + for node in ${prefill_config["node-list"]:-}; do + for i in $(seq 0 $(( prefill_workers_per_node - 1 ))); do + urls="${urls},http://${node}:$((base_system_port + prefill_system_port_offset + i))/metrics" done done diff --git a/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py b/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py index 439c0eda8..c7370b949 100644 --- a/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py +++ b/src/cloudai/workloads/ai_dynamo/slurm_command_gen_strategy.py @@ -115,8 +115,8 @@ def _get_toml_args(self, base_model: BaseModel, prefix: str, exclude: List[str] return args - def _get_nested_toml_args(self, base_model: BaseModel, prefix: str) -> List[str]: - result = self._get_toml_args(base_model, prefix, exclude=["args"]) + def _get_nested_toml_args(self, base_model: BaseModel, prefix: str, exclude: List[str] | None = None) -> List[str]: + result = self._get_toml_args(base_model, prefix, exclude=["args", *(exclude or [])]) if (nested_args := getattr(base_model, "args", None)) is not None: result.extend(self._get_toml_args(nested_args, prefix + "args-")) @@ -401,8 +401,12 @@ def _gen_script_args(self, td: AIDynamoTestDefinition) -> List[str]: args.append(f'--dynamo-dcgm-exporter-port "{td.cmd_args.dynamo.dcgm_exporter.port}"') if td.cmd_args.dynamo.prefill_worker: - args.extend(self._get_nested_toml_args(td.cmd_args.dynamo.prefill_worker, "--prefill-")) - args.extend(self._get_nested_toml_args(td.cmd_args.dynamo.decode_worker, "--decode-")) + args.extend(self._get_nested_toml_args(td.cmd_args.dynamo.prefill_worker, "--prefill-", exclude=["nodes"])) + if td.cmd_args.dynamo.prefill_worker.nodes: + args.append(f"--prefill-node-list {shlex.quote(td.cmd_args.dynamo.prefill_worker.nodes)}") + args.extend(self._get_nested_toml_args(td.cmd_args.dynamo.decode_worker, "--decode-", exclude=["nodes"])) + if td.cmd_args.dynamo.decode_worker.nodes: + 
args.append(f"--decode-node-list {shlex.quote(td.cmd_args.dynamo.decode_worker.nodes)}") args.extend(self._get_nested_toml_args(td.cmd_args.genai_perf, "--genai_perf-")) if aiperf_script: @@ -577,12 +581,18 @@ def _validate_worker_nodes( if not all(node in node_list for node in worker_node_list): raise ValueError(f"Some {worker_type} nodes are not in the allocated node list") - def _validate_node_overlap(self, prefill_nodes: str, decode_nodes: str) -> None: - """Validate that there is no overlap between prefill and decode nodes.""" - prefill_set = set(prefill_nodes.split(",")) - decode_set = set(decode_nodes.split(",")) - if prefill_set & decode_set: - raise ValueError("Overlap found between prefill and decode node lists") + @staticmethod + def _split_node_list(nodes: str | None) -> list[str]: + if not nodes: + return [] + return [node for node in nodes.split(",") if node] + + @staticmethod + def _unique_nodes(nodes: list[str]) -> list[str]: + return list(dict.fromkeys(nodes)) + + def _worker_nodes_overlap(self, prefill_nodes: str | None, decode_nodes: str | None) -> bool: + return bool(set(self._split_node_list(prefill_nodes)) & set(self._split_node_list(decode_nodes))) def get_cached_nodes_spec(self) -> tuple[int, list[str]]: cache_key = ":".join( @@ -608,36 +618,56 @@ def get_cached_nodes_spec(self) -> tuple[int, list[str]]: assert isinstance(prefill_n, int), "prefill_worker.num_nodes must be an integer" assert isinstance(decode_n, int), "decode_worker.num_nodes must be an integer" - if prefill_nodes and decode_nodes: - self.test_run.nodes = prefill_nodes.split(",") + decode_nodes.split(",") + self.test_run.nodes - self.test_run.num_nodes = len(self.test_run.nodes) - prefill_n = len(prefill_nodes.split(",")) - decode_n = len(decode_nodes.split(",")) - else: - self.test_run.num_nodes = prefill_n + decode_n - - total_nodes = prefill_n + decode_n + role_total_nodes = prefill_n + decode_n + role_nodes = self._unique_nodes( + self._split_node_list(prefill_nodes) + 
self._split_node_list(decode_nodes) + self.test_run.nodes + ) + has_explicit_allocation = getattr(self.test_run, "num_nodes_explicit", False) or bool(self.test_run.nodes) - logging.info("Setting num_nodes from %d to %d", self.test_run.num_nodes, total_nodes) + if prefill_nodes or decode_nodes: + self.test_run.nodes = role_nodes + self.test_run.num_nodes = len(role_nodes) + prefill_n = len(self._split_node_list(prefill_nodes)) if prefill_nodes else prefill_n + decode_n = len(self._split_node_list(decode_nodes)) if decode_nodes else decode_n + role_total_nodes = prefill_n + decode_n + elif not has_explicit_allocation: + self.test_run.num_nodes = role_total_nodes - self.test_run.num_nodes = total_nodes + logging.info("Using %d allocated node(s) for %d role node(s)", self.test_run.nnodes, role_total_nodes) requested_nodes, node_list = self.system.get_nodes_by_spec(self.test_run.nnodes, self.test_run.nodes) + shared_node_disagg = self._worker_nodes_overlap(prefill_nodes, decode_nodes) or ( + prefill_n > 0 and requested_nodes < role_total_nodes + ) + + if shared_node_disagg and requested_nodes < max(prefill_n, decode_n): + raise ValueError( + f"Not enough nodes requested for shared-node disaggregated run: need at least " + f"{max(prefill_n, decode_n)} node(s) to satisfy role node counts " + f"({prefill_n} prefill, {decode_n} decode), but only got {requested_nodes}" + ) if prefill_nodes or decode_nodes: self._validate_worker_nodes(node_list, prefill_nodes, prefill_n, "prefill") self._validate_worker_nodes(node_list, decode_nodes, decode_n, "decode") - if prefill_nodes and decode_nodes: - self._validate_node_overlap(prefill_nodes, decode_nodes) + if self._worker_nodes_overlap(prefill_nodes, decode_nodes): + unique_worker_nodes = self._unique_nodes( + self._split_node_list(prefill_nodes) + self._split_node_list(decode_nodes) + ) + if requested_nodes != len(unique_worker_nodes): + raise ValueError( + "Overlapping prefill/decode node lists require the allocated node count to 
match " + f"the unique worker node count ({len(unique_worker_nodes)}), but got {requested_nodes}" + ) - if total_nodes > requested_nodes: + if not shared_node_disagg and role_total_nodes > requested_nodes: raise ValueError( - f"Not enough nodes requested: need {total_nodes} total nodes " + f"Not enough nodes requested: need {role_total_nodes} total nodes " f"({prefill_n} prefill + {decode_n} decode), " f"but only got {requested_nodes}" ) - result = (total_nodes, node_list) + result = (requested_nodes, node_list) self._node_spec_cache[cache_key] = result return result diff --git a/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py b/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py index d5c832156..d3f7bb1a1 100644 --- a/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py +++ b/tests/workloads/ai_dynamo/test_command_gen_strategy_slurm.py @@ -351,6 +351,130 @@ def test_dcgm_exporter_adds_configured_docker_image_installable(cmd_args: AIDyna assert tdef.dcgm_exporter_image in tdef.installables +def test_shared_node_disagg_preserves_explicit_smaller_node_count( + slurm_system: SlurmSystem, tmp_path: Path, cmd_args: AIDynamoCmdArgs +) -> None: + tdef = AIDynamoTestDefinition( + name="test", + description="desc", + test_template_name="template", + cmd_args=cmd_args, + repo=GitRepo(url="https://github.com/ai-dynamo/dynamo.git", commit="main", installed_path=tmp_path), + ) + tr = TestRun( + name="run", + test=tdef, + nodes=["n0"], + num_nodes=1, + output_path=tmp_path, + num_nodes_explicit=True, + ) + strategy = AIDynamoSlurmCommandGenStrategy(slurm_system, tr) + + assert strategy.get_cached_nodes_spec() == (1, ["n0"]) + + srun = strategy._gen_srun_command() + assert "--nodes=1" in srun + assert "--nodelist=n0" in srun + + +def test_separate_node_disagg_keeps_role_sum_when_num_nodes_is_omitted( + strategy: AIDynamoSlurmCommandGenStrategy, +) -> None: + strategy.test_run.nodes = [] + strategy.test_run.num_nodes = 1 + 
strategy.test_run.num_nodes_explicit = False + + assert strategy.get_cached_nodes_spec()[0] == 2 + + +def test_explicit_overlapping_worker_nodes_are_allowed_for_shared_node( + slurm_system: SlurmSystem, tmp_path: Path, cmd_args: AIDynamoCmdArgs +) -> None: + cmd_args.dynamo.prefill_worker.nodes = "n0" + cmd_args.dynamo.decode_worker.nodes = "n0" + tdef = AIDynamoTestDefinition( + name="test", + description="desc", + test_template_name="template", + cmd_args=cmd_args, + repo=GitRepo(url="https://github.com/ai-dynamo/dynamo.git", commit="main", installed_path=tmp_path), + ) + tr = TestRun(name="run", test=tdef, nodes=[], num_nodes=1, output_path=tmp_path) + strategy = AIDynamoSlurmCommandGenStrategy(slurm_system, tr) + + assert strategy.get_cached_nodes_spec() == (1, ["n0"]) + args = strategy._gen_script_args(tdef) + assert "--prefill-node-list n0" in args + assert "--decode-node-list n0" in args + + +def test_explicit_overlapping_worker_nodes_reject_extra_allocated_nodes( + slurm_system: SlurmSystem, tmp_path: Path, cmd_args: AIDynamoCmdArgs +) -> None: + cmd_args.dynamo.prefill_worker.nodes = "n0" + cmd_args.dynamo.decode_worker.nodes = "n0" + tdef = AIDynamoTestDefinition( + name="test", + description="desc", + test_template_name="template", + cmd_args=cmd_args, + repo=GitRepo(url="https://github.com/ai-dynamo/dynamo.git", commit="main", installed_path=tmp_path), + ) + tr = TestRun( + name="run", + test=tdef, + nodes=["n0", "n1"], + num_nodes=2, + output_path=tmp_path, + num_nodes_explicit=True, + ) + strategy = AIDynamoSlurmCommandGenStrategy(slurm_system, tr) + + with pytest.raises(ValueError, match="Overlapping prefill/decode node lists"): + strategy.get_cached_nodes_spec() + + +def test_constraint_allows_shared_node_split_that_fits(slurm_system: SlurmSystem, test_run: TestRun) -> None: + slurm_system.gpus_per_node = 8 + td = cast(AIDynamoTestDefinition, test_run.test) + td.cmd_args.dynamo.prefill_worker.args.tensor_parallel_size = 4 + 
td.cmd_args.dynamo.decode_worker.args.tensor_parallel_size = 4 + test_run.num_nodes = 1 + test_run.nodes = ["n0"] + test_run.num_nodes_explicit = True + + assert td.constraint_check(test_run, slurm_system) + + +def test_constraint_rejects_shared_node_split_that_exceeds_node_gpus( + slurm_system: SlurmSystem, test_run: TestRun +) -> None: + slurm_system.gpus_per_node = 8 + td = cast(AIDynamoTestDefinition, test_run.test) + td.cmd_args.dynamo.prefill_worker.args.tensor_parallel_size = 4 + td.cmd_args.dynamo.decode_worker.args.tensor_parallel_size = 8 + test_run.num_nodes = 1 + test_run.nodes = ["n0"] + test_run.num_nodes_explicit = True + + assert not td.constraint_check(test_run, slurm_system) + + +def test_constraint_allows_separate_node_roles_using_all_node_gpus( + slurm_system: SlurmSystem, test_run: TestRun +) -> None: + slurm_system.gpus_per_node = 8 + td = cast(AIDynamoTestDefinition, test_run.test) + td.cmd_args.dynamo.prefill_worker.args.tensor_parallel_size = 8 + td.cmd_args.dynamo.decode_worker.args.tensor_parallel_size = 8 + test_run.num_nodes = 2 + test_run.nodes = ["n0", "n1"] + test_run.num_nodes_explicit = True + + assert td.constraint_check(test_run, slurm_system) + + def test_aiperf_phase_roundtrip_does_not_emit_default_report_name(strategy: AIDynamoSlurmCommandGenStrategy) -> None: td = cast(AIDynamoTestDefinition, strategy.test_run.test) td.cmd_args.workloads = "aiperf.sh"