Skip to content

Commit e8f8a8c

Browse files
authored
[Fix-17316][Task-API] Add check process status after killing task (#17320)
1 parent ff3a9c0 commit e8f8a8c

10 files changed

Lines changed: 220 additions & 16 deletions

File tree

deploy/kubernetes/dolphinscheduler/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -155,6 +155,7 @@ Please refer to the [Quick Start in Kubernetes](../../../docs/docs/en/guide/inst
155155
| conf.common."resource.manager.httpaddress.port" | int | `8088` | resourcemanager port, the default value is 8088 if not specified |
156156
| conf.common."resource.storage.type" | string | `"S3"` | resource storage type: HDFS, S3, OSS, GCS, ABS, NONE |
157157
| conf.common."resource.storage.upload.base.path" | string | `"/dolphinscheduler"` | resource store on HDFS/S3 path, resource file will store to this base path, self configuration, please make sure the directory exists on hdfs and have read write permissions. "/dolphinscheduler" is recommended |
158+
| conf.common."shell.kill.wait.timeout" | int | `10` | If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it |
158159
| conf.common."sudo.enable" | bool | `true` | use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions |
159160
| conf.common."support.hive.oneSession" | bool | `false` | Whether hive SQL is executed in the same session |
160161
| conf.common."task.resource.limit.state" | bool | `false` | Task resource limit state |

deploy/kubernetes/dolphinscheduler/values.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -345,6 +345,9 @@ conf:
345345
# -- development state
346346
development.state: false
347347

348+
# -- If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it
349+
shell.kill.wait.timeout: 10
350+
348351
# -- set path of conda.sh
349352
conda.path: /opt/anaconda3/etc/profile.d/conda.sh
350353

dolphinscheduler-api-test/dolphinscheduler-api-test-case/src/test/resources/docker/file-manage/common.properties

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ sudo.enable=true
9999
# development state
100100
development.state=false
101101

102+
# If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it
103+
shell.kill.wait.timeout=10
104+
102105
# set path of conda.sh
103106
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
104107

@@ -111,4 +114,5 @@ ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
111114
ml.mlflow.preset_repository_version="main"
112115

113116
# way to collect applicationId: log(original regex match), aop
114-
appId.collect: log
117+
appId.collect=log
118+

dolphinscheduler-common/src/main/java/org/apache/dolphinscheduler/common/constants/Constants.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,11 @@ public final class Constants {
6969
*/
7070
public static final String DEVELOPMENT_STATE = "development.state";
7171

72+
/**
73+
* shell.kill.wait.timeout: this property defines the wait timeout in seconds before using SIGKILL.
74+
*/
75+
public static final String SHELL_KILL_WAIT_TIMEOUT = "shell.kill.wait.timeout";
76+
7277
/**
7378
* sudo enable
7479
*/

dolphinscheduler-common/src/main/resources/common.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,9 @@ dolphin.scheduler.network.interface.restrict=docker0
8484
# development state
8585
development.state=false
8686

87+
# If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it
88+
shell.kill.wait.timeout=10
89+
8790
# set path of conda.sh
8891
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
8992

dolphinscheduler-common/src/test/resources/common.properties

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -148,6 +148,9 @@ dolphin.scheduler.network.interface.restrict=docker0
148148
# development state
149149
development.state=false
150150

151+
# If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it
152+
shell.kill.wait.timeout=10
153+
151154
# set path of conda.sh
152155
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
153156

dolphinscheduler-e2e/dolphinscheduler-e2e-case/src/test/resources/docker/file-manage/common.properties

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,9 @@ sudo.enable=true
9999
# development state
100100
development.state=false
101101

102+
# If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it
103+
shell.kill.wait.timeout=10
104+
102105
# set path of conda.sh
103106
conda.path=/opt/anaconda3/etc/profile.d/conda.sh
104107

@@ -111,4 +114,4 @@ ml.mlflow.preset_repository=https://github.com/apache/dolphinscheduler-mlflow
111114
ml.mlflow.preset_repository_version="main"
112115

113116
# way to collect applicationId: log(original regex match), aop
114-
appId.collect: log
117+
appId.collect=log

dolphinscheduler-task-plugin/dolphinscheduler-task-api/src/main/java/org/apache/dolphinscheduler/plugin/task/api/utils/ProcessUtils.java

Lines changed: 103 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -17,11 +17,14 @@
1717

1818
package org.apache.dolphinscheduler.plugin.task.api.utils;
1919

20+
import static org.apache.dolphinscheduler.common.constants.Constants.SLEEP_TIME_MILLIS;
2021
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.APPID_COLLECT;
2122
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.COMMA;
2223
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.DEFAULT_COLLECT_WAY;
2324
import static org.apache.dolphinscheduler.plugin.task.api.TaskConstants.TASK_TYPE_SET_K8S;
2425

26+
import org.apache.dolphinscheduler.common.constants.Constants;
27+
import org.apache.dolphinscheduler.common.thread.ThreadUtils;
2528
import org.apache.dolphinscheduler.common.utils.OSUtils;
2629
import org.apache.dolphinscheduler.common.utils.PropertyUtils;
2730
import org.apache.dolphinscheduler.plugin.task.api.K8sTaskExecutionContext;
@@ -46,8 +49,10 @@
4649
import java.util.Map;
4750
import java.util.Objects;
4851
import java.util.ServiceLoader;
52+
import java.util.concurrent.TimeUnit;
4953
import java.util.regex.Matcher;
5054
import java.util.regex.Pattern;
55+
import java.util.stream.Collectors;
5156

5257
import lombok.NonNull;
5358
import lombok.extern.slf4j.Slf4j;
@@ -56,6 +61,10 @@
5661
@Slf4j
5762
public final class ProcessUtils {
5863

64+
// If the shell process is still active after this timeout value (in seconds), then will use kill -9 to kill it
65+
private static final Integer SHELL_KILL_WAIT_TIMEOUT =
66+
PropertyUtils.getInt(Constants.SHELL_KILL_WAIT_TIMEOUT, 10);
67+
5968
private ProcessUtils() {
6069
throw new IllegalStateException("Utility class");
6170
}
@@ -77,13 +86,18 @@ private ProcessUtils() {
7786
/**
7887
* Expression of PID recognition in Windows scene
7988
*/
80-
private static final Pattern WINDOWSPATTERN = Pattern.compile("(\\d+)");
89+
private static final Pattern WINDOWSPATTERN = Pattern.compile("\\((\\d+)\\)");
8190

8291
/**
8392
* Expression of PID recognition in Linux scene
8493
*/
8594
private static final Pattern LINUXPATTERN = Pattern.compile("\\((\\d+)\\)");
8695

96+
/**
97+
* PID recognition pattern
98+
*/
99+
private static final Pattern PID_PATTERN = Pattern.compile("\\s+");
100+
87101
/**
88102
* Terminate the task process, support multi-level signal processing and fallback strategy
89103
* @param request Task execution context
@@ -100,29 +114,32 @@ public static boolean kill(@NonNull TaskExecutionContext request) {
100114

101115
// Get all child processes
102116
String pids = getPidsStr(processId);
103-
String[] pidArray = pids.split("\\s+");
117+
String[] pidArray = PID_PATTERN.split(pids);
104118
if (pidArray.length == 0) {
105119
log.warn("No valid PIDs found for process: {}", processId);
106120
return true;
107121
}
108122

123+
// Convert PID string to list of integers
124+
List<Integer> pidList = Arrays.stream(pidArray).map(Integer::parseInt).collect(Collectors.toList());
125+
109126
// 1. Try to terminate gracefully (SIGINT)
110-
boolean gracefulKillSuccess = sendKillSignal("SIGINT", pids, request.getTenantCode());
127+
boolean gracefulKillSuccess = sendKillSignal("SIGINT", pidList, request.getTenantCode());
111128
if (gracefulKillSuccess) {
112129
log.info("Successfully killed process tree using SIGINT, processId: {}", processId);
113130
return true;
114131
}
115132

116133
// 2. Try to terminate forcefully (SIGTERM)
117-
boolean termKillSuccess = sendKillSignal("SIGTERM", pids, request.getTenantCode());
134+
boolean termKillSuccess = sendKillSignal("SIGTERM", pidList, request.getTenantCode());
118135
if (termKillSuccess) {
119136
log.info("Successfully killed process tree using SIGTERM, processId: {}", processId);
120137
return true;
121138
}
122139

123140
// 3. As a last resort, use `kill -9`
124141
log.warn("SIGINT & SIGTERM failed, using SIGKILL as a last resort for processId: {}", processId);
125-
boolean forceKillSuccess = sendKillSignal("SIGKILL", pids, request.getTenantCode());
142+
boolean forceKillSuccess = sendKillSignal("SIGKILL", pidList, request.getTenantCode());
126143
if (forceKillSuccess) {
127144
log.info("Successfully sent SIGKILL signal to process tree, processId: {}", processId);
128145
} else {
@@ -139,23 +156,100 @@ public static boolean kill(@NonNull TaskExecutionContext request) {
139156
/**
140157
* Send a kill signal to a process group
141158
* @param signal Signal type (SIGINT, SIGTERM, SIGKILL)
142-
* @param pids Process ID list
159+
* @param pidList Process ID list
143160
* @param tenantCode Tenant code
144161
*/
145-
private static boolean sendKillSignal(String signal, String pids, String tenantCode) {
162+
private static boolean sendKillSignal(String signal, List<Integer> pidList, String tenantCode) {
163+
if (pidList == null || pidList.isEmpty()) {
164+
log.info("No process needs to be killed.");
165+
return true;
166+
}
167+
168+
List<Integer> alivePidList = getAlivePidList(pidList, tenantCode);
169+
if (alivePidList.isEmpty()) {
170+
log.info("All processes already terminated.");
171+
return true;
172+
}
173+
174+
String pids = alivePidList.stream()
175+
.map(String::valueOf)
176+
.collect(Collectors.joining(" "));
177+
146178
try {
179+
// 1. Send the kill signal
147180
String killCmd = String.format("kill -s %s %s", signal, pids);
148181
killCmd = OSUtils.getSudoCmd(tenantCode, killCmd);
149182
log.info("Sending {} to process group: {}, command: {}", signal, pids, killCmd);
150183
OSUtils.exeCmd(killCmd);
151184

152-
return true;
185+
// 2. Wait for the processes to terminate with a timeout-based polling mechanism
186+
// Max wait time
187+
long timeoutMillis = TimeUnit.SECONDS.toMillis(SHELL_KILL_WAIT_TIMEOUT);
188+
189+
long startTime = System.currentTimeMillis();
190+
while (!alivePidList.isEmpty() && (System.currentTimeMillis() - startTime < timeoutMillis)) {
191+
// Remove if process is no longer alive
192+
alivePidList.removeIf(pid -> !isProcessAlive(pid, tenantCode));
193+
if (!alivePidList.isEmpty()) {
194+
// Wait for a short interval before checking process statuses again, to avoid excessive CPU usage
195+
// from tight-loop polling.
196+
ThreadUtils.sleep(SLEEP_TIME_MILLIS);
197+
}
198+
}
199+
200+
// 3. Return final result based on whether all processes were terminated
201+
if (alivePidList.isEmpty()) {
202+
// All processes have been successfully terminated
203+
log.debug("Kill command: {}, kill succeeded", killCmd);
204+
return true;
205+
} else {
206+
String remainingPids = alivePidList.stream()
207+
.map(String::valueOf)
208+
.collect(Collectors.joining(" "));
209+
log.info("Kill command: {}, timed out, still running PIDs: {}", killCmd, remainingPids);
210+
return false;
211+
}
153212
} catch (Exception e) {
154213
log.error("Error sending {} to process: {}", signal, pids, e);
155214
return false;
156215
}
157216
}
158217

218+
/**
219+
* Returns a list of process IDs that are still running.
220+
* This method filters the provided list of PIDs by checking whether each process is still active
221+
*
222+
* @param pidList the list of process IDs to check
223+
* @param tenantCode the tenant identifier used for permission control or logging context
224+
* @return a new list containing only the PIDs of processes that are still running;
225+
* returns an empty list if none are alive
226+
*/
227+
private static List<Integer> getAlivePidList(List<Integer> pidList, String tenantCode) {
228+
return pidList.stream()
229+
.filter(pid -> isProcessAlive(pid, tenantCode))
230+
.collect(Collectors.toList());
231+
}
232+
233+
/**
234+
* Check if a process with the specified PID is alive.
235+
*
236+
* @param pid the process ID to check
237+
* @return true if the process exists and is running, false otherwise
238+
*/
239+
private static boolean isProcessAlive(int pid, String tenantCode) {
240+
try {
241+
// Use kill -0 to check if the process exists; it does not actually send a signal
242+
String checkCmd = String.format("kill -0 %d", pid);
243+
checkCmd = OSUtils.getSudoCmd(tenantCode, checkCmd);
244+
OSUtils.exeCmd(checkCmd);
245+
// If the command executes successfully, the process exists
246+
return true;
247+
} catch (Exception e) {
248+
// If the command fails, the process does not exist
249+
return false;
250+
}
251+
}
252+
159253
/**
160254
* get pids str.
161255
*
@@ -249,6 +343,7 @@ public static void cancelApplication(TaskExecutionContext taskExecutionContext)
249343
}
250344
ApplicationManager applicationManager = applicationManagerMap.get(ResourceManagerType.YARN);
251345
applicationManager.killApplication(new YarnApplicationManagerContext(executePath, tenantCode, appIds));
346+
log.info("yarn application [{}] is killed or already finished", appIds);
252347
}
253348
} catch (Exception e) {
254349
log.error("Cancel application failed: {}", e.getMessage());

0 commit comments

Comments
 (0)