Skip to content

Commit 383ba93

Browse files
njnu-seafishdavidzollo
authored andcommitted
[Fix-17436][Workflow]Task timeout kill throw exception (apache#17437)
1 parent 4cf8001 commit 383ba93

3 files changed

Lines changed: 79 additions & 38 deletions

File tree

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/AbstractCommandExecutor.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.apache.dolphinscheduler.common.constants.Constants.EMPTY_STRING;
2121
import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
2222
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_FAILURE;
23+
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_HARD_KILL;
2324
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.EXIT_CODE_KILL;
2425

2526
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
@@ -190,13 +191,13 @@ public TaskResponse run(IShellInterceptorBuilder iShellInterceptorBuilder,
190191
result.setExitStatusCode(this.process.exitValue());
191192

192193
} else {
193-
log.error("process has failure, the task timeout configuration value is:{}, ready to kill ...",
194-
taskRequest.getTaskTimeout());
194+
log.error("process has failure due to timeout kill, timeout value is:{}, timeoutStrategy is:{}",
195+
taskRequest.getTaskTimeout(), taskRequest.getTaskTimeoutStrategy());
195196
result.setExitStatusCode(EXIT_CODE_FAILURE);
196-
cancelApplication();
197197
}
198198
int exitCode = this.process.exitValue();
199-
String exitLogMessage = EXIT_CODE_KILL == exitCode ? "process has killed." : "process has exited.";
199+
String exitLogMessage = (EXIT_CODE_KILL == exitCode || EXIT_CODE_HARD_KILL == exitCode) ? "process has killed."
200+
: "process has exited.";
200201
log.info("{} execute path:{}, processId:{} ,exitStatusCode:{} ,processWaitForStatus:{} ,processExitValue:{}",
201202
exitLogMessage, taskRequest.getExecutePath(), processId, result.getExitStatusCode(), status, exitCode);
202203
return result;

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

Lines changed: 59 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444

4545
import java.util.ArrayList;
4646
import java.util.Arrays;
47+
import java.util.Collections;
4748
import java.util.HashMap;
4849
import java.util.List;
4950
import java.util.Map;
@@ -113,15 +114,7 @@ public static boolean kill(@NonNull TaskExecutionContext request) {
113114
}
114115

115116
// Get all child processes
116-
String pids = getPidsStr(processId);
117-
String[] pidArray = PID_PATTERN.split(pids);
118-
if (pidArray.length == 0) {
119-
log.warn("No valid PIDs found for process: {}", processId);
120-
return true;
121-
}
122-
123-
// Convert PID string to list of integers
124-
List<Integer> pidList = Arrays.stream(pidArray).map(Integer::parseInt).collect(Collectors.toList());
117+
List<Integer> pidList = getPidList(processId);
125118

126119
// 1. Try to terminate gracefully (SIGINT)
127120
boolean gracefulKillSuccess = sendKillSignal("SIGINT", pidList, request.getTenantCode());
@@ -251,26 +244,69 @@ private static boolean isProcessAlive(int pid, String tenantCode) {
251244
}
252245

253246
/**
254-
* get pids str.
247+
* Get all descendant process IDs (including the given process) using pstree.
255248
*
256-
* @param processId process id
257-
* @return pids pid String
258-
* @throws Exception exception
249+
* @param processId the root process ID
250+
* @return list of process IDs; returns empty list if no PIDs found (e.g., process not exists)
251+
* @throws IllegalArgumentException if any PID is invalid (blank, non-numeric, or non-positive)
252+
* @throws Exception if command execution fails unexpectedly (e.g., command not found)
259253
*/
260-
public static String getPidsStr(int processId) throws Exception {
261-
254+
public static List<Integer> getPidList(int processId) throws Exception {
262255
String rawPidStr;
263256

264-
// pstree pid get sub pids
265-
if (SystemUtils.IS_OS_MAC) {
266-
rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId));
267-
} else if (SystemUtils.IS_OS_LINUX) {
268-
rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
269-
} else {
270-
rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
257+
try {
258+
if (SystemUtils.IS_OS_MAC) {
259+
rawPidStr = OSUtils.exeCmd(String.format("%s -sp %d", TaskConstants.PSTREE, processId));
260+
} else if (SystemUtils.IS_OS_LINUX) {
261+
rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
262+
} else {
263+
log.warn("Unsupported OS for pstree: {}. Attempting generic command.", SystemUtils.OS_NAME);
264+
rawPidStr = OSUtils.exeCmd(String.format("%s -p %d", TaskConstants.PSTREE, processId));
265+
}
266+
} catch (Exception ex) {
267+
log.error("Failed to execute 'pstree' command for process ID: {}", processId, ex);
268+
throw ex;
269+
}
270+
271+
String pidsStr = parsePidStr(rawPidStr);
272+
if (StringUtils.isBlank(pidsStr)) {
273+
log.warn("No PIDs found for process: {}", processId);
274+
return Collections.emptyList();
275+
}
276+
277+
String[] pidArray = PID_PATTERN.split(pidsStr.trim());
278+
if (pidArray.length == 0) {
279+
log.warn("No PIDs parsed for process: {}", processId);
280+
return Collections.emptyList();
281+
}
282+
283+
List<Integer> pidList = new ArrayList<>();
284+
for (String pidStr : pidArray) {
285+
pidStr = pidStr.trim();
286+
287+
if (StringUtils.isBlank(pidStr)) {
288+
log.error("Empty or blank PID found in output for process: {}, full PIDs string: {}", processId,
289+
pidsStr);
290+
throw new IllegalArgumentException("Empty or blank PID found in output from process: " + processId);
291+
}
292+
293+
try {
294+
int pid = Integer.parseInt(pidStr);
295+
if (pid <= 0) {
296+
log.error("Invalid PID value (must be positive): {} for process: {}, full PIDs string: {}",
297+
pidStr, processId, pidsStr);
298+
throw new IllegalArgumentException("Invalid PID value (must be positive): " + pid);
299+
}
300+
pidList.add(pid);
301+
} catch (NumberFormatException e) {
302+
log.error("Invalid PID format in output: {} for process: {}, full PIDs string: {}",
303+
pidStr, processId, pidsStr, e);
304+
throw new IllegalArgumentException(
305+
"Invalid PID format in output: '" + pidStr + "' (from process " + processId + ")", e);
306+
}
271307
}
272308

273-
return parsePidStr(rawPidStr);
309+
return pidList;
274310
}
275311

276312
public static String parsePidStr(String rawPidStr) {

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/test/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtilsTest.java

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,10 @@
2323

2424
import org.apache.commons.lang3.SystemUtils;
2525

26+
import java.util.ArrayList;
27+
import java.util.Arrays;
28+
import java.util.List;
29+
2630
import org.junit.jupiter.api.AfterEach;
2731
import org.junit.jupiter.api.Assertions;
2832
import org.junit.jupiter.api.BeforeEach;
@@ -46,11 +50,11 @@ void tearDown() {
4650
}
4751
}
4852
@Test
49-
public void testGetPidsStr() throws Exception {
53+
public void testGetPidList() throws Exception {
5054
// first
5155
String pids = "sudo(6279)---558_1497.sh(6282)---sleep(6354)";
5256
int processId = 6279;
53-
String exceptPidsStr = "6279 6282 6354";
57+
List<Integer> exceptPidList = new ArrayList<>(Arrays.asList(6279, 6282, 6354));
5458
String command;
5559
if (SystemUtils.IS_OS_MAC) {
5660
pids = "-+= 6279 sudo -+- 6282 558_1497.sh --- 6354 sleep";
@@ -61,13 +65,13 @@ public void testGetPidsStr() throws Exception {
6165
command = String.format("%s -p %d", TaskConstants.PSTREE, processId);
6266
}
6367
mockedOSUtils.when(() -> OSUtils.exeCmd(command)).thenReturn(pids);
64-
String actualPidsStr = ProcessUtils.getPidsStr(processId);
65-
Assertions.assertEquals(exceptPidsStr, actualPidsStr);
68+
List<Integer> actualPidList = ProcessUtils.getPidList(processId);
69+
Assertions.assertEquals(exceptPidList, actualPidList);
6670

6771
// second
6872
String pids2 = "apache2(2000)---222332-apache2-submit_task.py(2100)---apache2(2101)";
6973
int processId2 = 2000;
70-
String exceptPidsStr2 = "2000 2100 2101";
74+
List<Integer> exceptPidList2 = new ArrayList<>(Arrays.asList(2000, 2100, 2101));
7175
String command2;
7276
if (SystemUtils.IS_OS_MAC) {
7377
pids2 = "-+= 2000 apache2 -+- 2100 222332-apache2-submit_task.py --- 2101 apache2";
@@ -78,13 +82,13 @@ public void testGetPidsStr() throws Exception {
7882
command2 = String.format("%s -p %d", TaskConstants.PSTREE, processId2);
7983
}
8084
mockedOSUtils.when(() -> OSUtils.exeCmd(command2)).thenReturn(pids2);
81-
String actualPidsStr2 = ProcessUtils.getPidsStr(processId2);
82-
Assertions.assertEquals(exceptPidsStr2, actualPidsStr2);
85+
List<Integer> actualPidList2 = ProcessUtils.getPidList(processId2);
86+
Assertions.assertEquals(exceptPidList2, actualPidList2);
8387

8488
// Third
8589
String pids3 = "sshd(5000)---sshd(6000)---bash(7000)---python(7100)";
8690
int processId3 = 5000;
87-
String exceptPidsStr3 = "5000 6000 7000 7100";
91+
List<Integer> exceptPidList3 = new ArrayList<>(Arrays.asList(5000, 6000, 7000, 7100));
8892
String command3;
8993
if (SystemUtils.IS_OS_MAC) {
9094
pids3 = "-+= 5000 sshd -+- 6000 sshd --= 7000 bash --- 7100 python";
@@ -95,12 +99,12 @@ public void testGetPidsStr() throws Exception {
9599
command3 = String.format("%s -p %d", TaskConstants.PSTREE, processId3);
96100
}
97101
mockedOSUtils.when(() -> OSUtils.exeCmd(command3)).thenReturn(pids3);
98-
String actualPidsStr3 = ProcessUtils.getPidsStr(processId3);
99-
Assertions.assertEquals(exceptPidsStr3, actualPidsStr3);
102+
List<Integer> actualPidList3 = ProcessUtils.getPidList(processId3);
103+
Assertions.assertEquals(exceptPidList3, actualPidList3);
100104
}
101105

102106
@Test
103-
public void tetRemoveK8sClientCache() {
107+
public void testRemoveK8sClientCache() {
104108
Assertions.assertDoesNotThrow(() -> {
105109
ProcessUtils.removeK8sClientCache("a");
106110
});

0 commit comments

Comments
 (0)