bug fix for multimodal-dialog agent and add parameter speaker_id in asr result #238
bug fix for multimodal-dialog agent and add parameter speaker_id in asr result #238songguocola wants to merge 2 commits into
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces graceful shutdown and cancellation mechanisms to prevent deadlocks and connection leaks, including adding a timeout to stop() and a new close() method in MultiModalDialog. It also manages streaming subscriptions using Disposable and checks the isClosed state to prevent reconnection loops in OkHttpWebSocketClient and OkHttpWebSocketClientForAudio. The review feedback highlights critical issues to address: a potential reconnection loop bug due to isClosed being reset in onClosed, a potential NullPointerException in sendTextWithRetry if the client is null, and significant code duplication in sendStreamRequest across the websocket client classes that should be refactored into a shared helper method.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| // indicate the websocket connection is established. | ||
| private AtomicBoolean isOpen = new AtomicBoolean(false); | ||
| private AtomicBoolean isClosed = new AtomicBoolean(false); | ||
| protected AtomicBoolean isClosed = new AtomicBoolean(false); |
There was a problem hiding this comment.
The isClosed flag is used to prevent reconnection loops and suppress error propagation when the client is cancelled or closed. However, in onClosed(WebSocket webSocket, int code, String reason), isClosed is reset to false via isClosed.set(false);.
If cancel() is called, it sets isClosed to true and cancels the websocket, which eventually triggers onClosed (either directly or via onFailure calling close()). When onClosed runs, it resets isClosed back to false.
If the reconnection loop in establishWebSocketClient is currently sleeping or retrying, it checks isClosed.get(). Since it was reset to false, the loop will continue to retry and reconnect, defeating the purpose of cancel().
Recommendation:
Modify onClosed so that it does not unconditionally reset isClosed to false if it was explicitly set to true by a user-initiated close or cancel.
| if (isClosed.get()) { | ||
| return; | ||
| } | ||
| log.debug("Sending message: " + message); |
There was a problem hiding this comment.
If establishWebSocketClient is interrupted during retry sleep, it catches InterruptedException, restores the interrupt status, and returns early. In this case, webSocketClient remains null.
If isClosed.get() is still false, the execution in sendTextWithRetry continues to the retry loop and attempts to call webSocketClient.send(message), which will throw a NullPointerException.
Recommendation:
Add a null check for webSocketClient before calling send to prevent potential NullPointerException when the connection establishment is interrupted or fails silently.
if (isClosed.get()) {
return;
}
if (webSocketClient == null) {
log.warn("webSocketClient is null, cannot send message.");
return;
}
log.debug("Sending message: " + message);| streamingDataDisposable = d; | ||
| if (isClosed.get()) { | ||
| d.dispose(); | ||
| } |
There was a problem hiding this comment.
The sendStreamRequest method in OkHttpWebSocketClientForAudio is almost 100% identical to the one in OkHttpWebSocketClient, duplicating a complex ~100 line block of RxJava subscription, error handling, and thread-safety logic. The only differences are the logging of the start message and the use of STREAMING_REQUEST_EXECUTOR instead of the default executor.
Recommendation:
Refactor this by extracting the core logic of sendStreamRequest into a protected void executeStreamRequest(FullDuplexRequest req) helper method in OkHttpWebSocketClient. This eliminates the massive code duplication and simplifies both classes significantly.
Example refactoring:
// In OkHttpWebSocketClient.java
protected void executeStreamRequest(FullDuplexRequest req) {
// ... core subscription and sending logic ...
}
protected CompletableFuture<Void> sendStreamRequest(FullDuplexRequest req) {
return CompletableFuture.runAsync(() -> executeStreamRequest(req));
}
// In OkHttpWebSocketClientForAudio.java
@Override
protected CompletableFuture<Void> sendStreamRequest(FullDuplexRequest req) {
return CompletableFuture.runAsync(() -> executeStreamRequest(req), STREAMING_REQUEST_EXECUTOR);
}
No description provided.