Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 54 additions & 4 deletions src/main/java/com/alibaba/dashscope/api/AsynchronousApi.java
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

import com.alibaba.dashscope.base.HalfDuplexParamBase;
import com.alibaba.dashscope.common.DashScopeResult;
import com.alibaba.dashscope.common.Status;
import com.alibaba.dashscope.common.TaskStatus;
import com.alibaba.dashscope.exception.ApiException;
import com.alibaba.dashscope.exception.NoApiKeyException;
Expand Down Expand Up @@ -76,12 +77,17 @@ public DashScopeResult asyncCall(ParamT param, ServiceOption serviceOption)
* @param apiKey The api-key.
* @param baseUrl The base http url.
* @param customHeaders The custom headers.
* @param timeoutSeconds The maximum time to wait in seconds.
* @return The task result.
* @throws NoApiKeyException Can not find api key
* @throws ApiException The request failed, possibly due to a network or data error.
* @throws ApiException The request failed, possibly due to a network or data error, or timeout.
*/
public DashScopeResult wait(
String taskId, String apiKey, String baseUrl, Map<String, String> customHeaders)
String taskId,
String apiKey,
String baseUrl,
Map<String, String> customHeaders,
long timeoutSeconds)
throws ApiException, NoApiKeyException {
AsyncTaskOption serviceOption =
AsyncTaskOption.builder()
Expand All @@ -98,7 +104,23 @@ public DashScopeResult wait(
int maxWaitMilliseconds = 5 * 1000;
int incrementSteps = 3;
int step = 0;
long startTime = System.currentTimeMillis();
long timeoutMillis = timeoutSeconds > 0 ? timeoutSeconds * 1000L : -1L;
while (true) {
if (timeoutMillis > 0) {
long elapsed = System.currentTimeMillis() - startTime;
if (elapsed >= timeoutMillis) {
throw new ApiException(
Status.builder()
.statusCode(HttpURLConnection.HTTP_CLIENT_TIMEOUT)
.code("TaskWaitTimeout")
.message(
StringUtils.format(
"Waiting for task [%s] timed out after %d ms (timeoutSeconds=%d).",
taskId, elapsed, timeoutSeconds))
.build());
}
}
try {
DashScopeResult taskResult = client.send(req);
JsonObject output = (JsonObject) taskResult.getOutput();
Expand All @@ -121,9 +143,20 @@ public DashScopeResult wait(
? maxWaitMilliseconds
: waitMilliseconds * 2;
}
long sleepMs = waitMilliseconds;
if (timeoutMillis > 0) {
long remaining = timeoutMillis - (System.currentTimeMillis() - startTime);
if (remaining <= 0) {
continue;
}
if (remaining < sleepMs) {
sleepMs = remaining;
}
}
try {
Thread.sleep(waitMilliseconds);
} catch (InterruptedException ignored) {
Thread.sleep(sleepMs);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
Comment on lines 156 to 160

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When Thread.sleep(sleepMs) is interrupted, it throws an InterruptedException and clears the thread's interrupted status. In the catch block, calling Thread.currentThread().interrupt() sets the interrupted status back to true.

However, because this is inside a while (true) loop and there is no break, return, or exception thrown to exit the loop, the loop will continue to the next iteration. In the next iteration, when Thread.sleep is called again, it will immediately throw InterruptedException because the interrupted status is set. This creates an infinite loop that spins rapidly, consuming 100% CPU and repeatedly calling client.send(req) without any delay, effectively spamming the server.

To fix this, you should propagate the interruption by throwing an ApiException (which is already declared in the method signature) or breaking out of the loop.

          try {
            Thread.sleep(sleepMs);
          } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new ApiException(e);
          }

}
} catch (ApiException e) {
Expand All @@ -135,6 +168,23 @@ public DashScopeResult wait(
}
}

/**
* Wait for async task completed and return task result.
*
* @param taskId The async task id.
* @param apiKey The api-key.
* @param baseUrl The base http url.
* @param customHeaders The custom headers.
* @return The task result.
* @throws NoApiKeyException Can not find api key
* @throws ApiException The request failed, possibly due to a network or data error.
*/
public DashScopeResult wait(
String taskId, String apiKey, String baseUrl, Map<String, String> customHeaders)
throws ApiException, NoApiKeyException {
return wait(taskId, apiKey, baseUrl, customHeaders, -1);
}

/**
* Wait for async task completed and return task result.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,9 @@ public static RecognitionParamWithStream FromRecognitionParam(
if (param.getPhraseId() != null && !param.getPhraseId().isEmpty()) {
recognitionParamWithStream.setPhraseId(param.getPhraseId());
}
if (param.getInputs() != null && !param.getInputs().isEmpty()) {
recognitionParamWithStream.setInput(param.getInputs());
}

return recognitionParamWithStream;
}
Expand All @@ -103,6 +106,10 @@ public Recognition() {
duplexApi = new SynchronizeFullDuplexApi<>(serviceOption);
}

public void setWebsocketUrl(String websocketUrl) {
serviceOption.setBaseWebSocketUrl(websocketUrl);
}

public Flowable<RecognitionResult> streamCall(
RecognitionParam param, Flowable<ByteBuffer> audioFrame)
throws ApiException, NoApiKeyException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class RecognitionParam extends FullDuplexServiceParam {

private String vocabularyId;

private Map<String, Object> input;

@Override
public Map<String, Object> getParameters() {
Map<String, Object> params = new HashMap<>();
Expand All @@ -45,6 +47,11 @@ public Map<String, Object> getParameters() {
return params;
}

@Override
public Map<String, Object> getInputs() {
return input;
}

@Override
public Object getResources() {
if (phraseId == null || phraseId.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ public TranscriptionResult asyncCall(TranscriptionParam param) {
}

public TranscriptionResult wait(TranscriptionQueryParam queryParam) {
return wait(queryParam, -1);
}

public TranscriptionResult wait(TranscriptionQueryParam queryParam, long timeoutSeconds) {
try {
return TranscriptionResult.fromDashScopeResult(
asyncApi.wait(
queryParam.getTaskId(),
queryParam.getApiKey(),
baseUrl,
queryParam.getCustomHeaders()));
queryParam.getCustomHeaders(),
timeoutSeconds));
} catch (NoApiKeyException e) {
throw new ApiException(e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,13 +45,18 @@ public QwenTranscriptionResult asyncCall(QwenTranscriptionParam param) {
}

public QwenTranscriptionResult wait(QwenTranscriptionQueryParam queryParam) {
return wait(queryParam, -1);
}

public QwenTranscriptionResult wait(QwenTranscriptionQueryParam queryParam, long timeoutSeconds) {
try {
return QwenTranscriptionResult.fromDashScopeResult(
asyncApi.wait(
queryParam.getTaskId(),
queryParam.getApiKey(),
baseUrl,
queryParam.getCustomHeaders()));
queryParam.getCustomHeaders(),
timeoutSeconds));
} catch (NoApiKeyException e) {
throw new ApiException(e);
}
Expand Down
Loading