diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java index fa6df28cc3c4..6e891298dd33 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java @@ -20,6 +20,7 @@ import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING; import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE; +import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_HARD_KILL; import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL; import org.apache.dolphinscheduler.common.thread.ThreadUtils; @@ -190,13 +191,13 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder, result.setExitStatusCode(this.process.exitValue()); } else { - log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...", - taskRequest.getTaskTimeout()); + log.error("process has failure due to timeout kill, timeout value is:{}, timeoutStrategy is:{}", + taskRequest.getTaskTimeout(), taskRequest.getTaskTimeoutStrategy()); result.setExitStatusCode(EXIT_CODE_FAILURE); - cancelApplication(); } int exitCode = this.process.exitValue(); - String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited."; + String exitLogMessage = (EXIT_CODE_KILL == exitCode || EXIT_CODE_HARD_KILL == exitCode) ? "process has killed." 
+ : "process has exited."; log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}", exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode); return result; diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java index 3b47004330a8..2937a4a0d25d 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java @@ -44,6 +44,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -113,15 +114,7 @@ public static boolean kill(@NonNull TaskExecutionContext request) { } // Get all child processes - String pids = getPidsStr(processId); - String[] pidArray = PID_PATTERN.split(pids); - if (pidArray.length == 0) { - log.warn("No valid PIDs found for process: {}", processId); - return true; - } - - // Convert PID string to list of integers - List pidList = Arrays.stream(pidArray).map(Integer::parseInt).collect(Collectors.toList()); + List pidList = getPidList(processId); // 1. Try to terminate gracefully (SIGINT) boolean gracefulKillSuccess = sendKillSignal("SIGINT", pidList, request.getTenantCode()); @@ -251,26 +244,69 @@ private static boolean isProcessAlive(int pid, String tenantCode) { } /** - * get pids str. + * Get all descendant process IDs (including the given process) using pstree. 
* - * @param processId process id - * @return pids pid String - * @throws Exception exception + * @param processId the root process ID + * @return list of process IDs; returns an empty list if no PIDs are found (e.g., the process does not exist) + * @throws IllegalArgumentException if any PID is invalid (blank, non-numeric, or non-positive) + * @throws Exception if command execution fails unexpectedly (e.g., command not found) */ - public static String getPidsStr(int processId) throws Exception { - + public static List getPidList(int processId) throws Exception { String rawPidStr; - // pstree pid get sub pids - if (SystemUtils.IS_OS_MAC) { - rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId)); - } else if (SystemUtils.IS_OS_LINUX) { - rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); - } else { - rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); + try { + if (SystemUtils.IS_OS_MAC) { + rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId)); + } else if (SystemUtils.IS_OS_LINUX) { + rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); + } else { + log.warn("Unsupported OS for pstree: {}. 
Attempting generic command.", SystemUtils.OS_NAME); + rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId)); + } + } catch (Exception ex) { + log.error("Failed to execute 'pstree' command for process ID: {}", processId, ex); + throw ex; + } + + String pidsStr = parsePidStr(rawPidStr); + if (StringUtils.isBlank(pidsStr)) { + log.warn("No PIDs found for process: {}", processId); + return Collections.emptyList(); + } + + String[] pidArray = PID_PATTERN.split(pidsStr.trim()); + if (pidArray.length == 0) { + log.warn("No PIDs parsed for process: {}", processId); + return Collections.emptyList(); + } + + List pidList = new ArrayList<>(); + for (String pidStr : pidArray) { + pidStr = pidStr.trim(); + + if (StringUtils.isBlank(pidStr)) { + log.error("Empty or blank PID found in output for process: {}, full PIDs string: {}", processId, + pidsStr); + throw new IllegalArgumentException("Empty or blank PID found in output from process: " + processId); + } + + try { + int pid = Integer.parseInt(pidStr); + if (pid <= 0) { + log.error("Invalid PID value (must be positive): {} for process: {}, full PIDs string: {}", + pidStr, processId, pidsStr); + throw new IllegalArgumentException("Invalid PID value (must be positive): " + pid); + } + pidList.add(pid); + } catch (NumberFormatException e) { + log.error("Invalid PID format in output: {} for process: {}, full PIDs string: {}", + pidStr, processId, pidsStr, e); + throw new IllegalArgumentException( + "Invalid PID format in output: '" + pidStr + "' (from process " + processId + ")", e); + } } - return parsePidStr(rawPidStr); + return pidList; } public static String parsePidStr(String rawPidStr) { diff --git a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java index 
526dc1a7f37e..0af5738ccb22 100644 --- a/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java +++ b/dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java @@ -23,6 +23,10 @@ import org.apache.commons.lang3.SystemUtils; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; @@ -46,11 +50,11 @@ void tearDown() { } } @Test - public void testGetPidsStr() throws Exception { + public void testGetPidList() throws Exception { // first String pids = "sudo(6279)---558_1497.sh(6282)---sleep(6354)"; int processId = 6279; - String exceptPidsStr = "6279 6282 6354"; + List exceptPidList = new ArrayList<>(Arrays.asList(6279, 6282, 6354)); String command; if (SystemUtils.IS_OS_MAC) { pids = "-+= 6279 sudo -+- 6282 558_1497.sh --- 6354 sleep"; @@ -61,13 +65,13 @@ public void testGetPidsStr() throws Exception { command = String.format("%s -p %d", TaskConstants.PSTREE, processId); } mockedOSUtils.when(() -> OSUtils.exeCmd(command)).thenReturn(pids); - String actualPidsStr = ProcessUtils.getPidsStr(processId); - Assertions.assertEquals(exceptPidsStr, actualPidsStr); + List actualPidList = ProcessUtils.getPidList(processId); + Assertions.assertEquals(exceptPidList, actualPidList); // second String pids2 = "apache2(2000)---222332-apache2-submit_task.py(2100)---apache2(2101)"; int processId2 = 2000; - String exceptPidsStr2 = "2000 2100 2101"; + List exceptPidList2 = new ArrayList<>(Arrays.asList(2000, 2100, 2101)); String command2; if (SystemUtils.IS_OS_MAC) { pids2 = "-+= 2000 apache2 -+- 2100 222332-apache2-submit_task.py --- 2101 apache2"; @@ -78,13 +82,13 @@ public void testGetPidsStr() throws Exception { command2 = String.format("%s -p %d", TaskConstants.PSTREE, 
processId2); } mockedOSUtils.when(() -> OSUtils.exeCmd(command2)).thenReturn(pids2); - String actualPidsStr2 = ProcessUtils.getPidsStr(processId2); - Assertions.assertEquals(exceptPidsStr2, actualPidsStr2); + List actualPidList2 = ProcessUtils.getPidList(processId2); + Assertions.assertEquals(exceptPidList2, actualPidList2); // Third String pids3 = "sshd(5000)---sshd(6000)---bash(7000)---python(7100)"; int processId3 = 5000; - String exceptPidsStr3 = "5000 6000 7000 7100"; + List exceptPidList3 = new ArrayList<>(Arrays.asList(5000, 6000, 7000, 7100)); String command3; if (SystemUtils.IS_OS_MAC) { pids3 = "-+= 5000 sshd -+- 6000 sshd --= 7000 bash --- 7100 python"; @@ -95,12 +99,12 @@ public void testGetPidsStr() throws Exception { command3 = String.format("%s -p %d", TaskConstants.PSTREE, processId3); } mockedOSUtils.when(() -> OSUtils.exeCmd(command3)).thenReturn(pids3); - String actualPidsStr3 = ProcessUtils.getPidsStr(processId3); - Assertions.assertEquals(exceptPidsStr3, actualPidsStr3); + List actualPidList3 = ProcessUtils.getPidList(processId3); + Assertions.assertEquals(exceptPidList3, actualPidList3); } @Test - public void tetRemoveK8sClientCache() { + public void testRemoveK8sClientCache() { Assertions.assertDoesNotThrow(() -> { ProcessUtils.removeK8sClientCache("a"); });