Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,8 @@

import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;

import lombok.extern.slf4j.Slf4j;
Expand All @@ -55,7 +54,6 @@
import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse;
import com.aliyun.emr_serverless_spark20230808.models.Tag;
import com.aliyun.teaopenapi.models.Config;
import com.aliyun.teautil.models.RuntimeOptions;

@Slf4j
public class AliyunServerlessSparkTask extends AbstractRemoteTask {
Expand Down Expand Up @@ -86,6 +84,8 @@ public class AliyunServerlessSparkTask extends AbstractRemoteTask {

private String endpoint;

private RetryUtils.RetryPolicy retryPolicy = new RetryUtils.RetryPolicy(10, 1000L);

protected AliyunServerlessSparkTask(TaskExecutionContext taskExecutionContext) {
super(taskExecutionContext);
this.taskExecutionContext = taskExecutionContext;
Expand Down Expand Up @@ -126,60 +126,66 @@ public void init() {

@Override
public void handle(TaskCallBack taskCallBack) throws TaskException {
try {
GetTemplateResponse getTemplateResponse = aliyunServerlessSparkClient.getTemplate(
aliyunServerlessSparkParameters.getWorkspaceId(),
buildGetTemplateRequest());

if (getTemplateResponse != null) {
templateConf = getTemplateResponse.getBody()
.getData()
.getSparkConf()
.stream()
.map(item -> "--conf " + item.getKey() + "=" + item.getValue())
.collect(Collectors.joining(" "));

templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
templateFusion = getTemplateResponse.getBody().getData().getFusion();
GetTemplateResponse getTemplateResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient.getTemplate(
aliyunServerlessSparkParameters.getWorkspaceId(),
buildGetTemplateRequest());
} catch (Exception e) {
throw new TaskException("Failed to get template info", e);
}
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get serverless spark template!");
}, retryPolicy);

if (getTemplateResponse != null) {
templateConf = getTemplateResponse.getBody()
.getData()
.getSparkConf()
.stream()
.map(item -> "--conf " + item.getKey() + "=" + item.getValue())
.collect(Collectors.joining(" "));

templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion();
templateFusion = getTemplateResponse.getBody().getData().getFusion();
}

try {
StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
RuntimeOptions runtime = new RuntimeOptions();
Map<String, String> headers = new HashMap<>();
StartJobRunResponse startJobRunResponse = aliyunServerlessSparkClient.startJobRunWithOptions(
aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest, headers, runtime);
jobRunId = startJobRunResponse.getBody().getJobRunId();
setAppIds(jobRunId);
log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);

while (!RunState.isFinal(currentState)) {
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();

GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
}
}, new RetryUtils.RetryPolicy(10, 1000L));

currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
log.info("job - {} state - {}", jobRunId, currentState);
Thread.sleep(10 * 1000L);
StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters);
StartJobRunResponse startJobRunResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient.startJobRun(
aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to start job run!");
Comment on lines +151 to +157
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a timeout issue here: if the HTTP request times out, the client will retry, but the server side might already have handled the previous request, which would cause the request to be handled twice. I'm unsure whether the service side has implemented idempotency handling. Because a new token is passed each time, the server side cannot tell that the second request is a retry.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There seems to be a timeout issue here: if the HTTP request times out, the client will retry, but the server side might already have handled the previous request, which would cause the request to be handled twice. I'm unsure whether the service side has implemented idempotency handling. Because a new token is passed each time, the server side cannot tell that the second request is a retry.

@ruanwenjun @abzymeatsjtu Looks like the token is generated and set when the request is initialized (line #257); therefore, I assume idempotency is handled correctly here?

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, I missed that. If the server side is capable of implementing idempotency handling via tokens, that would be a great solution.

}
}, retryPolicy);

setExitStatusCode(mapFinalStateToExitCode(currentState));
jobRunId = startJobRunResponse.getBody().getJobRunId();
setAppIds(jobRunId);
log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId);

} catch (Exception e) {
log.error("Serverless spark job failed!", e);
throw new AliyunServerlessSparkTaskException("Serverless spark job failed!");
while (!RunState.isFinal(currentState)) {
GetJobRunRequest getJobRunRequest = buildGetJobRunRequest();

GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> {
try {
return aliyunServerlessSparkClient
.getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
getJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to get job run!", e);
}
}, retryPolicy);

currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState());
log.info("job - {} state - {}", jobRunId, currentState);

try {
Thread.sleep(10 * 1000L);
} catch (InterruptedException e) {
break;
}
}

setExitStatusCode(mapFinalStateToExitCode(currentState));
}

@Override
Expand Down Expand Up @@ -211,12 +217,16 @@ public AbstractParameters getParameters() {
@Override
public void cancelApplication() throws TaskException {
CancelJobRunRequest cancelJobRunRequest = buildCancelJobRunRequest();
try {
aliyunServerlessSparkClient.cancelJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
cancelJobRunRequest);
} catch (Exception e) {
log.error("Failed to cancel serverless spark job run", e);
}
RetryUtils.retryFunction(
() -> {
try {
return aliyunServerlessSparkClient.cancelJobRun(
aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId,
cancelJobRunRequest);
} catch (Exception e) {
throw new AliyunServerlessSparkTaskException("Failed to cancel job run!");
}
}, retryPolicy);
}

@Override
Expand Down Expand Up @@ -244,6 +254,7 @@ protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParame
}

StartJobRunRequest startJobRunRequest = new StartJobRunRequest();
startJobRunRequest.setClientToken(genereteClientToken());
startJobRunRequest.setRegionId(regionId);
startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId());
startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType());
Expand Down Expand Up @@ -296,10 +307,15 @@ protected CancelJobRunRequest buildCancelJobRunRequest() {
protected GetTemplateRequest buildGetTemplateRequest() {
GetTemplateRequest getTemplateRequest = new GetTemplateRequest();

if (aliyunServerlessSparkParameters.getTemplateId() != null) {
if (aliyunServerlessSparkParameters.getTemplateId() != null
&& !aliyunServerlessSparkParameters.getTemplateId().isEmpty()) {
getTemplateRequest.setTemplateBizId(aliyunServerlessSparkParameters.getTemplateId());
}

return getTemplateRequest;
}

// Builds a fresh client token for a StartJobRun request: the task instance id
// joined with a random UUID, so each submission attempt carries a unique token.
// NOTE(review): presumably the server uses this token to deduplicate retried
// requests (see the idempotency discussion in the PR comments) — confirm with
// the EMR Serverless Spark API docs.
// NOTE(review): method name is misspelled ("generete" -> "generate"); renaming
// would also require updating the caller in buildStartJobRunRequest.
protected String genereteClientToken() {
return taskExecutionContext.getTaskInstanceId() + "-" + UUID.randomUUID();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ public void testHandle() {
doReturn(startJobRunResponseBody).when(mockStartJobRunResponse).getBody();
Assertions.assertDoesNotThrow(
() -> doReturn(mockStartJobRunResponse).when(mockAliyunServerlessSparkClient)
.startJobRunWithOptions(any(), any(), any(), any()));
.startJobRun(any(), any()));

doReturn(mockGetJobRunRequest).when(aliyunServerlessSparkTask).buildGetJobRunRequest();
GetJobRunResponseBody getJobRunResponseBody = new GetJobRunResponseBody();
Expand Down
Loading