fix safety issue and add asr speaker_id parameter#237
Conversation
There was a problem hiding this comment.
Code Review
This pull request introduces speaker identification to the audio recognition Sentence model and significantly improves the lifecycle management of MultiModalDialog and OkHttpWebSocketClient by adding graceful shutdown timeouts, immediate force-close capabilities, and proper disposal of upstream streaming subscriptions. The review feedback highlights critical concurrency and race condition issues, specifically regarding the non-thread-safe reassignment of stopLatch in MultiModalDialog, potential subscription leaks during asynchronous stream requests in both WebSocket clients, and redundant synchronization when sending the finish task message.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| sendFinishTaskMessage(); | ||
| } | ||
|
|
||
| CountDownLatch latch = stopLatch.get(); |
There was a problem hiding this comment.
There is a concurrency issue with how stopLatch is managed. stopLatch is declared as a non-volatile, non-final AtomicReference<CountDownLatch>:
private AtomicReference<CountDownLatch> stopLatch = new AtomicReference<>(null);However, in the start() method (line 213), it is reassigned to a completely new AtomicReference instance:
stopLatch = new AtomicReference<>(new CountDownLatch(1));Because stopLatch is neither final nor volatile, this reassignment is not thread-safe and can lead to other threads (calling stop() or close()) reading a stale or partially published reference.
Recommendation
To fix this, make stopLatch final and use stopLatch.set(new CountDownLatch(1)) in the start() method instead of reassigning the reference.
| streamingDataDisposable = | ||
| streamingData.subscribe( | ||
| data -> { | ||
| try { | ||
| if (data instanceof String) { | ||
| JsonObject continueData = | ||
| req.getContinueMessage((String) data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof byte[]) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((byte[]) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof ByteBuffer) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((ByteBuffer) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else { | ||
| JsonObject continueData = req.getContinueMessage(data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| } catch (Throwable ex) { | ||
| log.error( | ||
| StringUtils.format( | ||
| "sendStreamData exception: %s", ex.getMessage())); | ||
| responseEmitter.onError(ex); | ||
| } | ||
| }, | ||
| err -> { | ||
| log.error(StringUtils.format("Get stream data error!")); | ||
| responseEmitter.onError(err); | ||
| }, | ||
| new Action() { | ||
| @Override | ||
| public void run() throws Exception { | ||
| log.debug(StringUtils.format("Stream data send completed!")); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| }); |
There was a problem hiding this comment.
There is a race condition between cancel() and the asynchronous subscription in sendStreamRequest(). Because sendStreamRequest() runs asynchronously via CompletableFuture.runAsync(), cancel() might be called before the subscription is established and assigned to streamingDataDisposable. If that happens, cancel() sees streamingDataDisposable as null and does not dispose it, while the background thread subsequently subscribes and leaks the active subscription.
Recommendation
To prevent this leak, assign the subscription to a local variable first, then assign it to streamingDataDisposable, and finally check isClosed.get(). If isClosed.get() is true, dispose the subscription immediately.
Disposable d =
streamingData.subscribe(
data -> {
try {
if (data instanceof String) {
JsonObject continueData =
req.getContinueMessage((String) data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof byte[]) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((byte[]) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof ByteBuffer) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((ByteBuffer) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else {
JsonObject continueData = req.getContinueMessage(data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
} catch (Throwable ex) {
log.error(
StringUtils.format(
"sendStreamData exception: %s", ex.getMessage()));
responseEmitter.onError(ex);
}
},
err -> {
log.error(StringUtils.format("Get stream data error!"));
responseEmitter.onError(err);
},
new Action() {
@Override
public void run() throws Exception {
log.debug(StringUtils.format("Stream data send completed!"));
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(req.getFinishedTaskMessage(taskId)),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
});
streamingDataDisposable = d;
if (isClosed.get()) {
d.dispose();
}| streamingDataDisposable = | ||
| streamingData.subscribe( | ||
| data -> { | ||
| try { | ||
| if (data instanceof String) { | ||
| JsonObject continueData = | ||
| req.getContinueMessage((String) data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof byte[]) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((byte[]) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof ByteBuffer) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((ByteBuffer) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else { | ||
| JsonObject continueData = req.getContinueMessage(data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| } catch (Throwable ex) { | ||
| log.error( | ||
| StringUtils.format( | ||
| "sendStreamData exception: %s", ex.getMessage())); | ||
| responseEmitter.onError(ex); | ||
| } | ||
| }, | ||
| err -> { | ||
| log.error(StringUtils.format("Get stream data error!")); | ||
| responseEmitter.onError(err); | ||
| }, | ||
| new Action() { | ||
| @Override | ||
| public void run() throws Exception { | ||
| log.debug(StringUtils.format("Stream data send completed!")); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| }); |
There was a problem hiding this comment.
There is a race condition between cancel() and the asynchronous subscription in sendStreamRequest(). Because sendStreamRequest() runs asynchronously via CompletableFuture.runAsync(), cancel() might be called before the subscription is established and assigned to streamingDataDisposable. If that happens, cancel() sees streamingDataDisposable as null and does not dispose it, while the background thread subsequently subscribes and leaks the active subscription.
Recommendation
To prevent this leak, assign the subscription to a local variable first, then assign it to streamingDataDisposable, and finally check isClosed.get(). If isClosed.get() is true, dispose the subscription immediately.
Disposable d =
streamingData.subscribe(
data -> {
try {
if (data instanceof String) {
JsonObject continueData =
req.getContinueMessage((String) data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof byte[]) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((byte[]) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof ByteBuffer) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((ByteBuffer) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else {
JsonObject continueData = req.getContinueMessage(data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
} catch (Throwable ex) {
log.error(
StringUtils.format(
"sendStreamData exception: %s", ex.getMessage()));
responseEmitter.onError(ex);
}
},
err -> {
log.error(StringUtils.format("Get stream data error!"));
responseEmitter.onError(err);
},
new Action() {
@Override
public void run() throws Exception {
log.debug(StringUtils.format("Stream data send completed!"));
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(req.getFinishedTaskMessage(taskId)),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
});
streamingDataDisposable = d;
if (isClosed.get()) {
d.dispose();
}| boolean sent; | ||
| synchronized (MultiModalDialog.this) { | ||
| sent = (conversationEmitter != null); | ||
| } | ||
| if (sent) { | ||
| sendFinishTaskMessage(); | ||
| } |
There was a problem hiding this comment.
The synchronization and null-check on conversationEmitter here is redundant. The sendFinishTaskMessage() method already synchronizes on MultiModalDialog.this and safely checks if conversationEmitter is non-null before calling onComplete(). We can simplify this block to a single call to sendFinishTaskMessage().
| boolean sent; | |
| synchronized (MultiModalDialog.this) { | |
| sent = (conversationEmitter != null); | |
| } | |
| if (sent) { | |
| sendFinishTaskMessage(); | |
| } | |
| sendFinishTaskMessage(); |
No description provided.