|
37 | 37 |
|
38 | 38 | import java.util.Arrays; |
39 | 39 | import java.util.Collections; |
40 | | -import java.util.HashMap; |
41 | 40 | import java.util.List; |
42 | | -import java.util.Map; |
| 41 | +import java.util.UUID; |
43 | 42 | import java.util.stream.Collectors; |
44 | 43 |
|
45 | 44 | import lombok.extern.slf4j.Slf4j; |
|
55 | 54 | import com.aliyun.emr_serverless_spark20230808.models.StartJobRunResponse; |
56 | 55 | import com.aliyun.emr_serverless_spark20230808.models.Tag; |
57 | 56 | import com.aliyun.teaopenapi.models.Config; |
58 | | -import com.aliyun.teautil.models.RuntimeOptions; |
59 | 57 |
|
60 | 58 | @Slf4j |
61 | 59 | public class AliyunServerlessSparkTask extends AbstractRemoteTask { |
@@ -86,6 +84,8 @@ public class AliyunServerlessSparkTask extends AbstractRemoteTask { |
86 | 84 |
|
87 | 85 | private String endpoint; |
88 | 86 |
|
| 87 | + private RetryUtils.RetryPolicy retryPolicy = new RetryUtils.RetryPolicy(10, 1000L); |
| 88 | + |
89 | 89 | protected AliyunServerlessSparkTask(TaskExecutionContext taskExecutionContext) { |
90 | 90 | super(taskExecutionContext); |
91 | 91 | this.taskExecutionContext = taskExecutionContext; |
@@ -126,60 +126,66 @@ public void init() { |
126 | 126 |
|
127 | 127 | @Override |
128 | 128 | public void handle(TaskCallBack taskCallBack) throws TaskException { |
129 | | - try { |
130 | | - GetTemplateResponse getTemplateResponse = aliyunServerlessSparkClient.getTemplate( |
131 | | - aliyunServerlessSparkParameters.getWorkspaceId(), |
132 | | - buildGetTemplateRequest()); |
133 | | - |
134 | | - if (getTemplateResponse != null) { |
135 | | - templateConf = getTemplateResponse.getBody() |
136 | | - .getData() |
137 | | - .getSparkConf() |
138 | | - .stream() |
139 | | - .map(item -> "--conf " + item.getKey() + "=" + item.getValue()) |
140 | | - .collect(Collectors.joining(" ")); |
141 | | - |
142 | | - templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion(); |
143 | | - templateFusion = getTemplateResponse.getBody().getData().getFusion(); |
| 129 | + GetTemplateResponse getTemplateResponse = RetryUtils.retryFunction(() -> { |
| 130 | + try { |
| 131 | + return aliyunServerlessSparkClient.getTemplate( |
| 132 | + aliyunServerlessSparkParameters.getWorkspaceId(), |
| 133 | + buildGetTemplateRequest()); |
| 134 | + } catch (Exception e) { |
| 135 | + throw new TaskException("Failed to get template info", e); |
144 | 136 | } |
145 | | - } catch (Exception e) { |
146 | | - throw new AliyunServerlessSparkTaskException("Failed to get serverless spark template!"); |
| 137 | + }, retryPolicy); |
| 138 | + |
| 139 | + if (getTemplateResponse != null) { |
| 140 | + templateConf = getTemplateResponse.getBody() |
| 141 | + .getData() |
| 142 | + .getSparkConf() |
| 143 | + .stream() |
| 144 | + .map(item -> "--conf " + item.getKey() + "=" + item.getValue()) |
| 145 | + .collect(Collectors.joining(" ")); |
| 146 | + |
| 147 | + templateDisplayReleaseVersion = getTemplateResponse.getBody().getData().getDisplaySparkVersion(); |
| 148 | + templateFusion = getTemplateResponse.getBody().getData().getFusion(); |
147 | 149 | } |
148 | 150 |
|
149 | | - try { |
150 | | - StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters); |
151 | | - RuntimeOptions runtime = new RuntimeOptions(); |
152 | | - Map<String, String> headers = new HashMap<>(); |
153 | | - StartJobRunResponse startJobRunResponse = aliyunServerlessSparkClient.startJobRunWithOptions( |
154 | | - aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest, headers, runtime); |
155 | | - jobRunId = startJobRunResponse.getBody().getJobRunId(); |
156 | | - setAppIds(jobRunId); |
157 | | - log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId); |
158 | | - |
159 | | - while (!RunState.isFinal(currentState)) { |
160 | | - GetJobRunRequest getJobRunRequest = buildGetJobRunRequest(); |
161 | | - |
162 | | - GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> { |
163 | | - try { |
164 | | - return aliyunServerlessSparkClient |
165 | | - .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, |
166 | | - getJobRunRequest); |
167 | | - } catch (Exception e) { |
168 | | - throw new AliyunServerlessSparkTaskException("Failed to get job run!", e); |
169 | | - } |
170 | | - }, new RetryUtils.RetryPolicy(10, 1000L)); |
171 | | - |
172 | | - currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState()); |
173 | | - log.info("job - {} state - {}", jobRunId, currentState); |
174 | | - Thread.sleep(10 * 1000L); |
| 151 | + StartJobRunRequest startJobRunRequest = buildStartJobRunRequest(aliyunServerlessSparkParameters); |
| 152 | + StartJobRunResponse startJobRunResponse = RetryUtils.retryFunction(() -> { |
| 153 | + try { |
| 154 | + return aliyunServerlessSparkClient.startJobRun( |
| 155 | + aliyunServerlessSparkParameters.getWorkspaceId(), startJobRunRequest); |
| 156 | + } catch (Exception e) { |
| 157 | + throw new AliyunServerlessSparkTaskException("Failed to start job run!", e); |
175 | 158 | } |
| 159 | + }, retryPolicy); |
176 | 160 |
|
177 | | - setExitStatusCode(mapFinalStateToExitCode(currentState)); |
| 161 | + jobRunId = startJobRunResponse.getBody().getJobRunId(); |
| 162 | + setAppIds(jobRunId); |
| 163 | + log.info("Successfully submitted serverless spark job, jobRunId - {}", jobRunId); |
178 | 164 |
|
179 | | - } catch (Exception e) { |
180 | | - log.error("Serverless spark job failed!", e); |
181 | | - throw new AliyunServerlessSparkTaskException("Serverless spark job failed!"); |
| 165 | + while (!RunState.isFinal(currentState)) { |
| 166 | + GetJobRunRequest getJobRunRequest = buildGetJobRunRequest(); |
| 167 | + |
| 168 | + GetJobRunResponse getJobRunResponse = RetryUtils.retryFunction(() -> { |
| 169 | + try { |
| 170 | + return aliyunServerlessSparkClient |
| 171 | + .getJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, |
| 172 | + getJobRunRequest); |
| 173 | + } catch (Exception e) { |
| 174 | + throw new AliyunServerlessSparkTaskException("Failed to get job run!", e); |
| 175 | + } |
| 176 | + }, retryPolicy); |
| 177 | + |
| 178 | + currentState = RunState.valueOf(getJobRunResponse.getBody().getJobRun().getState()); |
| 179 | + log.info("job - {} state - {}", jobRunId, currentState); |
| 180 | + |
| 181 | + try { |
| 182 | + Thread.sleep(10 * 1000L); |
| 183 | + } catch (InterruptedException e) { |
| 184 | + Thread.currentThread().interrupt(); break; |
| 185 | + } |
182 | 186 | } |
| 187 | + |
| 188 | + setExitStatusCode(mapFinalStateToExitCode(currentState)); |
183 | 189 | } |
184 | 190 |
|
185 | 191 | @Override |
@@ -211,12 +217,16 @@ public AbstractParameters getParameters() { |
211 | 217 | @Override |
212 | 218 | public void cancelApplication() throws TaskException { |
213 | 219 | CancelJobRunRequest cancelJobRunRequest = buildCancelJobRunRequest(); |
214 | | - try { |
215 | | - aliyunServerlessSparkClient.cancelJobRun(aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, |
216 | | - cancelJobRunRequest); |
217 | | - } catch (Exception e) { |
218 | | - log.error("Failed to cancel serverless spark job run", e); |
219 | | - } |
| 220 | + RetryUtils.retryFunction( |
| 221 | + () -> { |
| 222 | + try { |
| 223 | + return aliyunServerlessSparkClient.cancelJobRun( |
| 224 | + aliyunServerlessSparkParameters.getWorkspaceId(), jobRunId, |
| 225 | + cancelJobRunRequest); |
| 226 | + } catch (Exception e) { |
| 227 | + throw new AliyunServerlessSparkTaskException("Failed to cancel job run!", e); |
| 228 | + } |
| 229 | + }, retryPolicy); |
220 | 230 | } |
221 | 231 |
|
222 | 232 | @Override |
@@ -244,6 +254,7 @@ protected StartJobRunRequest buildStartJobRunRequest(AliyunServerlessSparkParame |
244 | 254 | } |
245 | 255 |
|
246 | 256 | StartJobRunRequest startJobRunRequest = new StartJobRunRequest(); |
| 257 | + startJobRunRequest.setClientToken(genereteClientToken()); |
247 | 258 | startJobRunRequest.setRegionId(regionId); |
248 | 259 | startJobRunRequest.setResourceQueueId(aliyunServerlessSparkParameters.getResourceQueueId()); |
249 | 260 | startJobRunRequest.setCodeType(aliyunServerlessSparkParameters.getCodeType()); |
@@ -296,10 +307,15 @@ protected CancelJobRunRequest buildCancelJobRunRequest() { |
296 | 307 | protected GetTemplateRequest buildGetTemplateRequest() { |
297 | 308 | GetTemplateRequest getTemplateRequest = new GetTemplateRequest(); |
298 | 309 |
|
299 | | - if (aliyunServerlessSparkParameters.getTemplateId() != null) { |
| 310 | + if (aliyunServerlessSparkParameters.getTemplateId() != null |
| 311 | + && !aliyunServerlessSparkParameters.getTemplateId().isEmpty()) { |
300 | 312 | getTemplateRequest.setTemplateBizId(aliyunServerlessSparkParameters.getTemplateId()); |
301 | 313 | } |
302 | 314 |
|
303 | 315 | return getTemplateRequest; |
304 | 316 | } |
| 317 | + |
| 318 | + protected String genereteClientToken() { |
| 319 | + return taskExecutionContext.getTaskInstanceId() + "-" + UUID.randomUUID(); |
| 320 | + } |
305 | 321 | } |
0 commit comments