Skip to content

Commit 4ec7c4b

Browse files
abzymeatsjtu, sunyifan.syf, EricGao888
authored
[Improvement-16994][TaskPlugin] support retry for every api call for serverless spark (#17476)
* [Improvement-16994][TaskPlugin] support retry for every api call for serverless spark

---------

Co-authored-by: sunyifan.syf <sunyifan.syf@alibaba-inc.com>
Co-authored-by: Eric Gao <ericgao.apache@gmail.com>
1 parent 90a8cdc commit 4ec7c4b

2 files changed

Lines changed: 74 additions & 58 deletions

File tree

dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/main/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTask.java

Lines changed: 73 additions & 57 deletions
Original file line number | Diff line number | Diff line change
@@ -37,9 +37,8 @@
3737

3838
import java.util.Arrays;
3939
import java.util.Collections;
40-
import java.util.HashMap;
4140
import java.util.List;
42-
import java.util.Map;
41+
import java.util.UUID;
4342
import java.util.stream.Collectors;
4443

4544
import lombok.extern.slf4j.Slf4j;
@@ -55,7 +54,6 @@
5554
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
5655
import com.aliyun.emr_serverless_spark20230808.models.Tag;
5756
import com.aliyun.teaopenapi.models.Config;
58-
import com.aliyun.teautil.models.RuntimeOptions;
5957

6058
@Slf4j
6159
public class AliyunServerlessSparkTask extends AbstractRemoteTask {
@@ -86,6 +84,8 @@ public class AliyunServerlessSparkTask extends AbstractRemoteTask {
8684

8785
private String endpoint;
8886

87+
private RetryUtils.RetryPolicy retryPolicy = new RetryUtils.RetryPolicy(10, 1000L);
88+
8989
protected AliyunServerlessSparkTask(TaskExecutionContext taskExecutionContext) {
9090
super(taskExecutionContext);
9191
this.taskExecutionContext = taskExecutionContext;
@@ -126,60 +126,66 @@ public void init() {
126126

127127
@Override
128128
public void handle(TaskCallBack taskCallBack) throws TaskException {
129-
try {
130-
GetTemplateResponse getTemplateResponse = aliyunServerlessSparkClient.getTemplate(
131-
aliyunServerlessSparkParameters.getWorkspaceId(),
132-
buildGetTemplateRequest());
133-
134-
if (getTemplateResponse != null) {
135-
templateConf = getTemplateResponse.getBody()
136-
.getData()
137-
.getSparkConf()
138-
.stream()
139-
.map(item -> "--conf " + item.getKey() + "=" + item.getValue())
140-
.collect(Collectors.joining(" "));
141-
142-
templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
143-
templateFusion = getTemplateResponse.getBody().getData().getFusion();
129+
GetTemplateResponse getTemplateResponse = RetryUtils.retryFunction(() -> {
130+
try {
131+
return aliyunServerlessSparkClient.getTemplate(
132+
aliyunServerlessSparkParameters.getWorkspaceId(),
133+
buildGetTemplateRequest());
134+
} catch (Exception e) {
135+
throw new TaskException("Failed to get template info", e);
144136
}
145-
} catch (Exception e) {
146-
throw new AliyunServerlessSparkTaskException("Failed to get serverless spark template!");
137+
}, retryPolicy);
138+
139+
if (getTemplateResponse != null) {
140+
templateConf = getTemplateResponse.getBody()
141+
.getData()
142+
.getSparkConf()
143+
.stream()
144+
.map(item -> "--conf " + item.getKey() + "=" + item.getValue())
145+
.collect(Collectors.joining(" "));
146+
147+
templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
148+
templateFusion = getTemplateResponse.getBody().getData().getFusion();
147149
}
148150

149-
try {
150-
StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
151-
RuntimeOptions runtime = new RuntimeOptions();
152-
Map<String, String> headers = new HashMap<>();
153-
StartJobRunResponse startJobRunResponse = aliyunServerlessSparkClient.startJobRunWithOptions(
154-
aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest, headers, runtime);
155-
jobRunId = startJobRunResponse.getBody().getJobRunId();
156-
setAppIds(jobRunId);
157-
log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);
158-
159-
while (!RunState.isFinal(currentState)) {
160-
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
161-
162-
GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
163-
try {
164-
return aliyunServerlessSparkClient
165-
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
166-
getJobRunRequest);
167-
} catch (Exception e) {
168-
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
169-
}
170-
}, new RetryUtils.RetryPolicy(10, 1000L));
171-
172-
currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
173-
log.info("job - {} state - {}", jobRunId, currentState);
174-
Thread.sleep(10 * 1000L);
151+
StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
152+
StartJobRunResponse startJobRunResponse = RetryUtils.retryFunction(() -> {
153+
try {
154+
return aliyunServerlessSparkClient.startJobRun(
155+
aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest);
156+
} catch (Exception e) {
157+
throw new AliyunServerlessSparkTaskException("Failed to start job run!");
175158
}
159+
}, retryPolicy);
176160

177-
setExitStatusCode(mapFinalStateToExitCode(currentState));
161+
jobRunId = startJobRunResponse.getBody().getJobRunId();
162+
setAppIds(jobRunId);
163+
log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);
178164

179-
} catch (Exception e) {
180-
log.error("Serverless spark job failed!", e);
181-
throw new AliyunServerlessSparkTaskException("Serverless spark job failed!");
165+
while (!RunState.isFinal(currentState)) {
166+
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();
167+
168+
GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
169+
try {
170+
return aliyunServerlessSparkClient
171+
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
172+
getJobRunRequest);
173+
} catch (Exception e) {
174+
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
175+
}
176+
}, retryPolicy);
177+
178+
currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
179+
log.info("job - {} state - {}", jobRunId, currentState);
180+
181+
try {
182+
Thread.sleep(10 * 1000L);
183+
} catch (InterruptedException e) {
184+
break;
185+
}
182186
}
187+
188+
setExitStatusCode(mapFinalStateToExitCode(currentState));
183189
}
184190

185191
@Override
@@ -211,12 +217,16 @@ public AbstractParameters getParameters() {
211217
@Override
212218
public void cancelApplication() throws TaskException {
213219
CancelJobRunRequest cancelJobRunRequest = buildCancelJobRunRequest();
214-
try {
215-
aliyunServerlessSparkClient.cancelJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
216-
cancelJobRunRequest);
217-
} catch (Exception e) {
218-
log.error("Failed to cancel serverless spark job run", e);
219-
}
220+
RetryUtils.retryFunction(
221+
() -> {
222+
try {
223+
return aliyunServerlessSparkClient.cancelJobRun(
224+
aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
225+
cancelJobRunRequest);
226+
} catch (Exception e) {
227+
throw new AliyunServerlessSparkTaskException("Failed to cancel job run!");
228+
}
229+
}, retryPolicy);
220230
}
221231

222232
@Override
@@ -244,6 +254,7 @@ protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParame
244254
}
245255

246256
StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
257+
startJobRunRequest.setClientToken(genereteClientToken());
247258
startJobRunRequest.setRegionId(regionId);
248259
startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
249260
startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
@@ -296,10 +307,15 @@ protected CancelJobRunRequest buildCancelJobRunRequest() {
296307
protected GetTemplateRequest buildGetTemplateRequest() {
297308
GetTemplateRequest getTemplateRequest = new GetTemplateRequest();
298309

299-
if (aliyunServerlessSparkParameters.getTemplateId() != null) {
310+
if (aliyunServerlessSparkParameters.getTemplateId() != null
311+
&& !aliyunServerlessSparkParameters.getTemplateId().isEmpty()) {
300312
getTemplateRequest.setTemplateBizId(aliyunServerlessSparkParameters.getTemplateId());
301313
}
302314

303315
return getTemplateRequest;
304316
}
317+
318+
protected String genereteClientToken() {
319+
return taskExecutionContext.getTaskInstanceId() + "-" + UUID.randomUUID();
320+
}
305321
}

dolphinscheduler-task-plugin/dolphinscheduler-task-aliyunserverlessspark/src/test/java/org/apache/dolphinscheduler/plugin/task/aliyunserverlessspark/AliyunServerlessSparkTaskTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -164,7 +164,7 @@ public void testHandle() {
164164
doReturn(startJobRunResponseBody).when(mockStartJobRunResponse).getBody();
165165
Assertions.assertDoesNotThrow(
166166
() -> doReturn(mockStartJobRunResponse).when(mockAliyunServerlessSparkClient)
167-
.startJobRunWithOptions(any(), any(), any(), any()));
167+
.startJobRun(any(), any()));
168168

169169
doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest();
170170
GetJobRunResponseBody getJobRunResponseBody = new GetJobRunResponseBody();

0 commit comments

Comments
 (0)