From 30142dc796a8e6703445afb38d11588d9bbb7b4c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=A5=E4=BB=99?= Date: Tue, 30 Jun 2026 17:20:44 +0800 Subject: [PATCH 1/2] feat(model/asr): support speak_id param --- .../dashscope/audio/asr/recognition/timestamp/Sentence.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java b/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java index 3a721a0..d49e795 100644 --- a/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java +++ b/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java @@ -56,6 +56,9 @@ public class Sentence { @SerializedName("sentence_end") boolean sentenceEnd; + @SerializedName("speaker_id") + String speakerId; + public static Sentence from(String message) { return JsonUtils.fromJson(message, Sentence.class); } From ac7c54286c9e1715617170d6c09813097dbf9839 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=81=A5=E4=BB=99?= Date: Tue, 30 Jun 2026 20:36:15 +0800 Subject: [PATCH 2/2] fix(agent/multimodal-dialog): fix safety issue and add close function --- .../multimodal/MultiModalDialog.java | 61 ++++++- .../okhttp/OkHttpWebSocketClient.java | 172 +++++++++++------- .../okhttp/OkHttpWebSocketClientForAudio.java | 125 ++++++------- 3 files changed, 230 insertions(+), 128 deletions(-) diff --git a/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java b/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java index 64c463b..8bdf0dc 100644 --- a/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java +++ b/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java @@ -21,6 +21,7 @@ import java.util.Queue; import java.util.UUID; import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import lombok.Builder; import lombok.Getter; @@ -440,17 +441,67 @@ public void updateInfo(MultiModalRequestParam.UpdateParams updateParams) { sendTextFrame("UpdateInfo"); } - /** Stops the MultiModalDialog. */ + /** + * Stops the MultiModalDialog gracefully. Sends a finish message to the server and waits for the + * server to acknowledge. If the server does not respond within the timeout (30s), falls back to + * force close. + * + *

This method is safe to call after onError — it will not deadlock. + */ public void stop() { - sendFinishTaskMessage(); - if (stopLatch.get() != null) { + // If emitter is already null (e.g., close() was called from onError), skip sending and just + // ensure cleanup + boolean sent; + synchronized (MultiModalDialog.this) { + sent = (conversationEmitter != null); + } + if (sent) { + sendFinishTaskMessage(); + } + + CountDownLatch latch = stopLatch.get(); + if (latch != null) { try { - stopLatch.get().await(); - } catch (InterruptedException ignored) { + // Use timeout to prevent deadlock: if server doesn't respond in 30s, force close + boolean completed = latch.await(30, TimeUnit.SECONDS); + if (!completed) { + log.warn("stop() timed out waiting for server acknowledgement, forcing close"); + close(); + } + } catch (InterruptedException e) { + close(); + Thread.currentThread().interrupt(); } } } + /** + * Force closes the dialog immediately without sending any message to the server. Safe to call + * from any callback thread (including onError/onStopped). Does not block. + * + *

This method: - Nullifies emitter to prevent further data sending - Disposes upstream + * subscription to stop sendBinaryWithRetry - Forces WebSocket cancel (non-blocking) - Sets + * isClosed to short-circuit any reconnection loops in progress - Releases stopLatch to unblock + * any thread waiting in stop() + */ + public void close() { + // Nullify emitter to prevent further data sending + synchronized (MultiModalDialog.this) { + conversationEmitter = null; + } + // Force cancel: disposes upstream subscription + cancels WebSocket (non-blocking) + // Also sets isClosed=true which breaks reconnection loops in + // establishWebSocketClient/sendTextWithRetry/sendBinaryWithRetry + duplexApi.cancel(); + // Release stopLatch to prevent stop() from blocking forever. + // This is critical: if close() is called from onError callback, any thread waiting + // in stop() will be unblocked immediately instead of deadlocking. + CountDownLatch latch = stopLatch.get(); + if (latch != null) { + latch.countDown(); + } + } + /** * Gets current dialogue state. * diff --git a/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java b/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java index 157990b..87234c6 100644 --- a/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java +++ b/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java @@ -17,6 +17,7 @@ import io.reactivex.Flowable; import io.reactivex.FlowableEmitter; import io.reactivex.Observable; +import io.reactivex.disposables.Disposable; import io.reactivex.functions.Action; import java.io.IOException; import java.nio.ByteBuffer; @@ -51,6 +52,9 @@ public class OkHttpWebSocketClient extends WebSocketListener private AtomicBoolean passTaskStarted = new AtomicBoolean(false); + // Disposable for the streaming data subscription, used to cancel upstream when stopping + protected volatile Disposable streamingDataDisposable; + public OkHttpWebSocketClient(OkHttpClient client, boolean passTaskStarted) { this.client = client; this.passTaskStarted.set(passTaskStarted); @@ -97,6 +101,13 @@ public boolean close(int code, String reason) { } public void cancel() { + // Set isClosed BEFORE cancel to suppress onFailure error propagation + isClosed.set(true); + // Dispose upstream subscription to stop sending data + Disposable d = streamingDataDisposable; + if (d != null && !d.isDisposed()) { + d.dispose(); + } if (webSocketClient != null) { webSocketClient.cancel(); } @@ -111,6 +122,11 @@ private void establishWebSocketClient( int reconnectionTimes = 0; String errorMessage = ""; while (reconnectionTimes < MAX_CONNECTION_TIMES) { + // Bail out immediately if cancel() has been called + if (isClosed.get()) { + log.debug("Connection cancelled, stop reconnecting."); + return; + } try { Flowable flowable = Flowable.create( @@ -144,9 +160,17 @@ private void establishWebSocketClient( } else if (errorMessage.contains(Constants.NO_API_KEY_ERROR)) { throw ex; } + // Check again before sleeping + if (isClosed.get()) { + log.debug("Connection cancelled during retry, stop reconnecting."); + return; + } try { Thread.sleep(10000); - } catch (InterruptedException e) {; + } catch (InterruptedException e) { + // Respect interruption - exit the loop + Thread.currentThread().interrupt(); + return; } } } @@ -379,10 +403,18 @@ protected void sendTextWithRetry( String workspace, Map customHeaders, String baseWebSocketUrl) { + // Guard: skip if already cancelled + if (isClosed.get()) { + log.debug("sendTextWithRetry skipped: connection already closed."); + return; + } // simple retry with fixed delay, no strategy if (!isOpen.get()) { establishWebSocketClient(apiKey, isSecurityCheck, workspace, customHeaders, baseWebSocketUrl); } + if (isClosed.get()) { + return; + } int maxRetries = 3; if (passTaskStarted.get()) { // when pass througn task started, no need to retry. @@ -395,6 +427,9 @@ protected void sendTextWithRetry( } int retryCount = 0; while (retryCount < maxRetries) { + if (isClosed.get()) { + return; + } log.debug("Sending message: " + message); Boolean isOk = webSocketClient.send(message); if (isOk) { @@ -418,12 +453,22 @@ protected void sendBinaryWithRetry( String workspace, Map customHeaders, String baseWebSocketUrl) { + // Guard: skip if already cancelled + if (isClosed.get()) { + return; + } if (!isOpen.get()) { establishWebSocketClient(apiKey, isSecurityCheck, workspace, customHeaders, baseWebSocketUrl); } + if (isClosed.get()) { + return; + } int maxRetries = 3; int retryCount = 0; while (retryCount < maxRetries) { + if (isClosed.get()) { + return; + } Boolean isOk = webSocketClient.send(message); if (isOk) { break; @@ -584,67 +629,70 @@ protected CompletableFuture sendStreamRequest(FullDuplexRequest req) { req.getBaseWebSocketUrl()); Flowable streamingData = req.getStreamingData(); - streamingData.subscribe( - data -> { - try { - if (data instanceof String) { - JsonObject continueData = req.getContinueMessage((String) data, taskId); - sendTextWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - JsonUtils.toJson(continueData), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } else if (data instanceof byte[]) { - sendBinaryWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - ByteString.of((byte[]) data), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } else if (data instanceof ByteBuffer) { - sendBinaryWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - ByteString.of((ByteBuffer) data), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } else { - JsonObject continueData = req.getContinueMessage(data, taskId); - sendTextWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - JsonUtils.toJson(continueData), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } - } catch (Throwable ex) { - log.error( - StringUtils.format("sendStreamData exception: %s", ex.getMessage())); - responseEmitter.onError(ex); - } - }, - err -> { - log.error(StringUtils.format("Get stream data error!")); - responseEmitter.onError(err); - }, - new Action() { - @Override - public void run() throws Exception { - log.debug(StringUtils.format("Stream data send completed!")); - sendTextWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } - }); + streamingDataDisposable = + streamingData.subscribe( + data -> { + try { + if (data instanceof String) { + JsonObject continueData = + req.getContinueMessage((String) data, taskId); + sendTextWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + JsonUtils.toJson(continueData), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } else if (data instanceof byte[]) { + sendBinaryWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + ByteString.of((byte[]) data), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } else if (data instanceof ByteBuffer) { + sendBinaryWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + ByteString.of((ByteBuffer) data), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } else { + JsonObject continueData = req.getContinueMessage(data, taskId); + sendTextWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + JsonUtils.toJson(continueData), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } + } catch (Throwable ex) { + log.error( + StringUtils.format( + "sendStreamData exception: %s", ex.getMessage())); + responseEmitter.onError(ex); + } + }, + err -> { + log.error(StringUtils.format("Get stream data error!")); + responseEmitter.onError(err); + }, + new Action() { + @Override + public void run() throws Exception { + log.debug(StringUtils.format("Stream data send completed!")); + sendTextWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } + }); } catch (Throwable ex) { log.error(StringUtils.format("sendStreamData exception: %s", ex.getMessage())); responseEmitter.onError(ex); diff --git a/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClientForAudio.java b/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClientForAudio.java index 13019f8..7b7f412 100644 --- a/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClientForAudio.java +++ b/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClientForAudio.java @@ -66,67 +66,70 @@ protected CompletableFuture sendStreamRequest(FullDuplexRequest req) { req.getBaseWebSocketUrl()); Flowable streamingData = req.getStreamingData(); - streamingData.subscribe( - data -> { - try { - if (data instanceof String) { - JsonObject continueData = req.getContinueMessage((String) data, taskId); - sendTextWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - JsonUtils.toJson(continueData), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } else if (data instanceof byte[]) { - sendBinaryWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - ByteString.of((byte[]) data), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } else if (data instanceof ByteBuffer) { - sendBinaryWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - ByteString.of((ByteBuffer) data), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } else { - JsonObject continueData = req.getContinueMessage(data, taskId); - sendTextWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - JsonUtils.toJson(continueData), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } - } catch (Throwable ex) { - log.error( - StringUtils.format("sendStreamData exception: %s", ex.getMessage())); - responseEmitter.onError(ex); - } - }, - err -> { - log.error(StringUtils.format("Get stream data error!")); - responseEmitter.onError(err); - }, - new Action() { - @Override - public void run() throws Exception { - log.debug(StringUtils.format("Stream data send completed!")); - sendTextWithRetry( - req.getApiKey(), - req.isSecurityCheck(), - JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), - req.getWorkspace(), - req.getHeaders(), - req.getBaseWebSocketUrl()); - } - }); + streamingDataDisposable = + streamingData.subscribe( + data -> { + try { + if (data instanceof String) { + JsonObject continueData = + req.getContinueMessage((String) data, taskId); + sendTextWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + JsonUtils.toJson(continueData), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } else if (data instanceof byte[]) { + sendBinaryWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + ByteString.of((byte[]) data), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } else if (data instanceof ByteBuffer) { + sendBinaryWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + ByteString.of((ByteBuffer) data), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } else { + JsonObject continueData = req.getContinueMessage(data, taskId); + sendTextWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + JsonUtils.toJson(continueData), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } + } catch (Throwable ex) { + log.error( + StringUtils.format( + "sendStreamData exception: %s", ex.getMessage())); + responseEmitter.onError(ex); + } + }, + err -> { + log.error(StringUtils.format("Get stream data error!")); + responseEmitter.onError(err); + }, + new Action() { + @Override + public void run() throws Exception { + log.debug(StringUtils.format("Stream data send completed!")); + sendTextWithRetry( + req.getApiKey(), + req.isSecurityCheck(), + JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), + req.getWorkspace(), + req.getHeaders(), + req.getBaseWebSocketUrl()); + } + }); } catch (Throwable ex) { log.error(StringUtils.format("sendStreamData exception: %s", ex.getMessage())); responseEmitter.onError(ex);