diff --git a/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java b/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java
index 3a721a0..d49e795 100644
--- a/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java
+++ b/src/main/java/com/alibaba/dashscope/audio/asr/recognition/timestamp/Sentence.java
@@ -56,6 +56,9 @@ public class Sentence {
@SerializedName("sentence_end")
boolean sentenceEnd;
+ @SerializedName("speaker_id")
+ String speakerId;
+
public static Sentence from(String message) {
return JsonUtils.fromJson(message, Sentence.class);
}
diff --git a/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java b/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java
index 64c463b..8bdf0dc 100644
--- a/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java
+++ b/src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java
@@ -21,6 +21,7 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Builder;
import lombok.Getter;
@@ -440,17 +441,67 @@ public void updateInfo(MultiModalRequestParam.UpdateParams updateParams) {
sendTextFrame("UpdateInfo");
}
- /** Stops the MultiModalDialog. */
+ /**
+ * Stops the MultiModalDialog gracefully. Sends a finish message to the server and waits for the
+ * server to acknowledge. If the server does not respond within the timeout (30s), falls back to
+ * force close.
+ *
+ *
This method is safe to call after onError — it will not deadlock.
+ */
public void stop() {
- sendFinishTaskMessage();
- if (stopLatch.get() != null) {
+ // If emitter is already null (e.g., close() was called from onError), skip sending and just
+ // ensure cleanup
+ boolean sent;
+ synchronized (MultiModalDialog.this) {
+ sent = (conversationEmitter != null);
+ }
+ if (sent) {
+ sendFinishTaskMessage();
+ }
+
+ CountDownLatch latch = stopLatch.get();
+ if (latch != null) {
try {
- stopLatch.get().await();
- } catch (InterruptedException ignored) {
+ // Use timeout to prevent deadlock: if server doesn't respond in 30s, force close
+ boolean completed = latch.await(30, TimeUnit.SECONDS);
+ if (!completed) {
+ log.warn("stop() timed out waiting for server acknowledgement, forcing close");
+ close();
+ }
+ } catch (InterruptedException e) {
+ close();
+ Thread.currentThread().interrupt();
}
}
}
+ /**
+ * Force closes the dialog immediately without sending any message to the server. Safe to call
+ * from any callback thread (including onError/onStopped). Does not block.
+ *
+ *
This method: - Nullifies emitter to prevent further data sending - Disposes upstream
+ * subscription to stop sendBinaryWithRetry - Forces WebSocket cancel (non-blocking) - Sets
+ * isClosed to short-circuit any reconnection loops in progress - Releases stopLatch to unblock
+ * any thread waiting in stop()
+ */
+ public void close() {
+ // Nullify emitter to prevent further data sending
+ synchronized (MultiModalDialog.this) {
+ conversationEmitter = null;
+ }
+ // Force cancel: disposes upstream subscription + cancels WebSocket (non-blocking)
+ // Also sets isClosed=true which breaks reconnection loops in
+ // establishWebSocketClient/sendTextWithRetry/sendBinaryWithRetry
+ duplexApi.cancel();
+ // Release stopLatch to prevent stop() from blocking forever.
+ // This is critical: if close() is called from onError callback, any thread waiting
+ // in stop() will be unblocked immediately instead of deadlocking.
+ CountDownLatch latch = stopLatch.get();
+ if (latch != null) {
+ latch.countDown();
+ }
+ }
+
/**
* Gets current dialogue state.
*
diff --git a/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java b/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java
index 157990b..87234c6 100644
--- a/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java
+++ b/src/main/java/com/alibaba/dashscope/protocol/okhttp/OkHttpWebSocketClient.java
@@ -17,6 +17,7 @@
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.Observable;
+import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Action;
import java.io.IOException;
import java.nio.ByteBuffer;
@@ -51,6 +52,9 @@ public class OkHttpWebSocketClient extends WebSocketListener
private AtomicBoolean passTaskStarted = new AtomicBoolean(false);
+ // Disposable for the streaming data subscription, used to cancel upstream when stopping
+ protected volatile Disposable streamingDataDisposable;
+
public OkHttpWebSocketClient(OkHttpClient client, boolean passTaskStarted) {
this.client = client;
this.passTaskStarted.set(passTaskStarted);
@@ -97,6 +101,13 @@ public boolean close(int code, String reason) {
}
public void cancel() {
+ // Set isClosed BEFORE cancel to suppress onFailure error propagation
+ isClosed.set(true);
+ // Dispose upstream subscription to stop sending data
+ Disposable d = streamingDataDisposable;
+ if (d != null && !d.isDisposed()) {
+ d.dispose();
+ }
if (webSocketClient != null) {
webSocketClient.cancel();
}
@@ -111,6 +122,11 @@ private void establishWebSocketClient(
int reconnectionTimes = 0;
String errorMessage = "";
while (reconnectionTimes < MAX_CONNECTION_TIMES) {
+ // Bail out immediately if cancel() has been called
+ if (isClosed.get()) {
+ log.debug("Connection cancelled, stop reconnecting.");
+ return;
+ }
try {
Flowable flowable =
Flowable.create(
@@ -144,9 +160,17 @@ private void establishWebSocketClient(
} else if (errorMessage.contains(Constants.NO_API_KEY_ERROR)) {
throw ex;
}
+ // Check again before sleeping
+ if (isClosed.get()) {
+ log.debug("Connection cancelled during retry, stop reconnecting.");
+ return;
+ }
try {
Thread.sleep(10000);
- } catch (InterruptedException e) {;
+ } catch (InterruptedException e) {
+ // Respect interruption - exit the loop
+ Thread.currentThread().interrupt();
+ return;
}
}
}
@@ -379,10 +403,18 @@ protected void sendTextWithRetry(
String workspace,
Map customHeaders,
String baseWebSocketUrl) {
+ // Guard: skip if already cancelled
+ if (isClosed.get()) {
+ log.debug("sendTextWithRetry skipped: connection already closed.");
+ return;
+ }
// simple retry with fixed delay, no strategy
if (!isOpen.get()) {
establishWebSocketClient(apiKey, isSecurityCheck, workspace, customHeaders, baseWebSocketUrl);
}
+ if (isClosed.get()) {
+ return;
+ }
int maxRetries = 3;
if (passTaskStarted.get()) {
// when pass througn task started, no need to retry.
@@ -395,6 +427,9 @@ protected void sendTextWithRetry(
}
int retryCount = 0;
while (retryCount < maxRetries) {
+ if (isClosed.get()) {
+ return;
+ }
log.debug("Sending message: " + message);
Boolean isOk = webSocketClient.send(message);
if (isOk) {
@@ -418,12 +453,22 @@ protected void sendBinaryWithRetry(
String workspace,
Map customHeaders,
String baseWebSocketUrl) {
+ // Guard: skip if already cancelled
+ if (isClosed.get()) {
+ return;
+ }
if (!isOpen.get()) {
establishWebSocketClient(apiKey, isSecurityCheck, workspace, customHeaders, baseWebSocketUrl);
}
+ if (isClosed.get()) {
+ return;
+ }
int maxRetries = 3;
int retryCount = 0;
while (retryCount < maxRetries) {
+ if (isClosed.get()) {
+ return;
+ }
Boolean isOk = webSocketClient.send(message);
if (isOk) {
break;
@@ -584,67 +629,70 @@ protected CompletableFuture sendStreamRequest(FullDuplexRequest req) {
req.getBaseWebSocketUrl());
Flowable