Skip to content

Commit 4e338c3

Browse files
committed
fix(agent/multimodal-dialog): fix safety issue and add close function
1 parent 30142dc commit 4e338c3

3 files changed

Lines changed: 242 additions & 191 deletions

File tree

src/main/java/com/alibaba/dashscope/multimodal/MultiModalDialog.java

Lines changed: 68 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import java.util.Queue;
2222
import java.util.UUID;
2323
import java.util.concurrent.CountDownLatch;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicBoolean;
2426
import java.util.concurrent.atomic.AtomicReference;
2527
import lombok.Builder;
2628
import lombok.Getter;
@@ -64,7 +66,9 @@ private static class AsyncCmdBuffer { // Asynchronous command buffer class
6466

6567
private final Queue<AsyncCmdBuffer> DialogBuffer = new LinkedList<>(); // Dialogue buffer queue
6668

67-
private AtomicReference<CountDownLatch> stopLatch =
69+
private final AtomicBoolean closed = new AtomicBoolean(false); // Idempotent close guard
70+
71+
private final AtomicReference<CountDownLatch> stopLatch =
6872
new AtomicReference<>(null); // Stop signal latch
6973

7074
@SuperBuilder
@@ -209,7 +213,8 @@ public void start() {
209213
},
210214
BackpressureStrategy.BUFFER);
211215

212-
stopLatch = new AtomicReference<>(new CountDownLatch(1)); // Initializes stop signal latch
216+
closed.set(false); // Reset idempotent close guard for reuse across sessions
217+
stopLatch.set(new CountDownLatch(1)); // Initializes stop signal latch
213218

214219
String preTaskId =
215220
requestParam.getTaskId() != null ? requestParam.getTaskId() : UUID.randomUUID().toString();
@@ -440,17 +445,69 @@ public void updateInfo(MultiModalRequestParam.UpdateParams updateParams) {
440445
sendTextFrame("UpdateInfo");
441446
}
442447

443-
/** Stops the MultiModalDialog. */
448+
/**
449+
* Stops the MultiModalDialog gracefully. Sends a finish message to the server and waits for the
450+
* server to acknowledge. If the server does not respond within the timeout (30s), falls back to
451+
* force close.
452+
*
453+
* <p>This method is safe to call after onError — it will not deadlock.
454+
*/
444455
public void stop() {
445-
sendFinishTaskMessage();
446-
if (stopLatch.get() != null) {
456+
boolean sent = sendFinishTaskMessage();
457+
458+
if (!sent) {
459+
// emitter was null — finish message not sent, no point waiting; force close directly
460+
close();
461+
return;
462+
}
463+
464+
CountDownLatch latch = stopLatch.get();
465+
if (latch != null) {
447466
try {
448-
stopLatch.get().await();
449-
} catch (InterruptedException ignored) {
467+
// Use timeout to prevent deadlock: if server doesn't respond in 30s, force close
468+
boolean completed = latch.await(30, TimeUnit.SECONDS);
469+
if (!completed) {
470+
log.warn("stop() timed out waiting for server acknowledgement, forcing close");
471+
close();
472+
}
473+
} catch (InterruptedException e) {
474+
close();
475+
Thread.currentThread().interrupt();
450476
}
451477
}
452478
}
453479

480+
/**
481+
* Force closes the dialog immediately without sending any message to the server. Safe to call
482+
* from any callback thread (including onError/onStopped). Does not block.
483+
*
484+
* <p>This method: - Nullifies emitter to prevent further data sending - Disposes upstream
485+
* subscription to stop sendBinaryWithRetry - Forces WebSocket cancel (non-blocking) - Sets
486+
* isClosed to short-circuit any reconnection loops in progress - Releases stopLatch to unblock
487+
* any thread waiting in stop()
488+
*/
489+
public void close() {
490+
if (!closed.compareAndSet(false, true)) {
491+
return; // already closed, skip duplicate work
492+
}
493+
// Nullify emitter to prevent further data sending
494+
synchronized (MultiModalDialog.this) {
495+
conversationEmitter = null;
496+
DialogBuffer.clear();
497+
}
498+
// Force cancel: disposes upstream subscription + cancels WebSocket (non-blocking)
499+
// Also sets isClosed=true which breaks reconnection loops in
500+
// establishWebSocketClient/sendTextWithRetry/sendBinaryWithRetry
501+
duplexApi.cancel();
502+
// Release stopLatch to prevent stop() from blocking forever.
503+
// This is critical: if close() is called from onError callback, any thread waiting
504+
// in stop() will be unblocked immediately instead of deadlocking.
505+
CountDownLatch latch = stopLatch.get();
506+
if (latch != null) {
507+
latch.countDown();
508+
}
509+
}
510+
454511
/**
455512
* Gets current dialogue state.
456513
*
@@ -532,12 +589,14 @@ private void sendTextFrame(
532589
}
533590
}
534591

535-
/** Sends stop message. */
536-
private void sendFinishTaskMessage() { // Instruction type
592+
/** Sends stop message. Returns true if the message was actually sent. */
593+
private boolean sendFinishTaskMessage() { // Instruction type
537594
synchronized (MultiModalDialog.this) { // Synchronized block ensures thread safety
538595
if (conversationEmitter != null) {
539596
conversationEmitter.onComplete(); // Ends data flow
597+
return true;
540598
}
599+
return false;
541600
}
542601
}
543602
}

0 commit comments

Comments
 (0)