Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,9 @@ public class Sentence {
@SerializedName("sentence_end")
boolean sentenceEnd;

@SerializedName("speaker_id")
String speakerId;

public static Sentence from(String message) {
return JsonUtils.fromJson(message, Sentence.class);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.util.Queue;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Builder;
import lombok.Getter;
Expand Down Expand Up @@ -64,7 +66,9 @@ private static class AsyncCmdBuffer { // Asynchronous command buffer class

private final Queue<AsyncCmdBuffer> DialogBuffer = new LinkedList<>(); // Dialogue buffer queue

private AtomicReference<CountDownLatch> stopLatch =
private final AtomicBoolean closed = new AtomicBoolean(false); // Idempotent close guard

private final AtomicReference<CountDownLatch> stopLatch =
new AtomicReference<>(null); // Stop signal latch

@SuperBuilder
Expand Down Expand Up @@ -209,7 +213,7 @@ public void start() {
},
BackpressureStrategy.BUFFER);

stopLatch = new AtomicReference<>(new CountDownLatch(1)); // Initializes stop signal latch
stopLatch.set(new CountDownLatch(1)); // Initializes stop signal latch

String preTaskId =
requestParam.getTaskId() != null ? requestParam.getTaskId() : UUID.randomUUID().toString();
Expand Down Expand Up @@ -440,17 +444,68 @@ public void updateInfo(MultiModalRequestParam.UpdateParams updateParams) {
sendTextFrame("UpdateInfo");
}

/** Stops the MultiModalDialog. */
/**
* Stops the MultiModalDialog gracefully. Sends a finish message to the server and waits for the
* server to acknowledge. If the server does not respond within the timeout (30s), falls back to
* force close.
*
* <p>This method is safe to call after onError — it will not deadlock.
*/
public void stop() {
sendFinishTaskMessage();
if (stopLatch.get() != null) {
boolean sent = sendFinishTaskMessage();

if (!sent) {
// emitter was null — finish message not sent, no point waiting; force close directly
close();
return;
}

CountDownLatch latch = stopLatch.get();
if (latch != null) {
try {
stopLatch.get().await();
} catch (InterruptedException ignored) {
// Use timeout to prevent deadlock: if server doesn't respond in 30s, force close
boolean completed = latch.await(30, TimeUnit.SECONDS);
if (!completed) {
log.warn("stop() timed out waiting for server acknowledgement, forcing close");
close();
}
} catch (InterruptedException e) {
close();
Thread.currentThread().interrupt();
}
}
}

/**
* Force closes the dialog immediately without sending any message to the server. Safe to call
* from any callback thread (including onError/onStopped). Does not block.
*
* <p>This method: - Nullifies emitter to prevent further data sending - Disposes upstream
* subscription to stop sendBinaryWithRetry - Forces WebSocket cancel (non-blocking) - Sets
* isClosed to short-circuit any reconnection loops in progress - Releases stopLatch to unblock
* any thread waiting in stop()
*/
public void close() {
if (!closed.compareAndSet(false, true)) {
return; // already closed, skip duplicate work
}
// Nullify emitter to prevent further data sending
synchronized (MultiModalDialog.this) {
conversationEmitter = null;
}
Comment on lines +492 to +495

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

When close() is called, the dialog session is being terminated. Any pending audio or text frames currently buffered in DialogBuffer will never be sent. To prevent memory leaks and release references to potentially large ByteBuffer objects immediately, we should clear DialogBuffer inside the synchronized block of close().

Suggested change
// Nullify emitter to prevent further data sending
synchronized (MultiModalDialog.this) {
conversationEmitter = null;
}
// Nullify emitter and clear buffer to prevent further data sending and release memory
synchronized (MultiModalDialog.this) {
conversationEmitter = null;
DialogBuffer.clear();
}

// Force cancel: disposes upstream subscription + cancels WebSocket (non-blocking)
// Also sets isClosed=true which breaks reconnection loops in
// establishWebSocketClient/sendTextWithRetry/sendBinaryWithRetry
duplexApi.cancel();
// Release stopLatch to prevent stop() from blocking forever.
// This is critical: if close() is called from onError callback, any thread waiting
// in stop() will be unblocked immediately instead of deadlocking.
CountDownLatch latch = stopLatch.get();
if (latch != null) {
latch.countDown();
}
}

/**
* Gets current dialogue state.
*
Expand Down Expand Up @@ -532,12 +587,14 @@ private void sendTextFrame(
}
}

/** Sends stop message. */
private void sendFinishTaskMessage() { // Instruction type
/** Sends stop message. Returns true if the message was actually sent. */
private boolean sendFinishTaskMessage() { // Instruction type
synchronized (MultiModalDialog.this) { // Synchronized block ensures thread safety
if (conversationEmitter != null) {
conversationEmitter.onComplete(); // Ends data flow
return true;
}
return false;
}
}
}
Loading
Loading