Skip to content

fix safety issue and add asr speaker_id parameter#237

Closed
songguocola wants to merge 2 commits into
dashscope:mainfrom
songguocola:dev/audio_0701
Closed

fix safety issue and add asr speaker_id parameter#237
songguocola wants to merge 2 commits into
dashscope:mainfrom
songguocola:dev/audio_0701

Conversation

@songguocola

Copy link
Copy Markdown
Contributor

No description provided.

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces speaker identification to the audio recognition Sentence model and significantly improves the lifecycle management of MultiModalDialog and OkHttpWebSocketClient by adding graceful shutdown timeouts, immediate force-close capabilities, and proper disposal of upstream streaming subscriptions. The review feedback highlights critical concurrency and race condition issues, specifically regarding the non-thread-safe reassignment of stopLatch in MultiModalDialog, potential subscription leaks during asynchronous stream requests in both WebSocket clients, and redundant synchronization when sending the finish task message.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

sendFinishTaskMessage();
}

CountDownLatch latch = stopLatch.get();

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a concurrency issue with how stopLatch is managed. stopLatch is declared as a non-volatile, non-final AtomicReference<CountDownLatch>:

private AtomicReference<CountDownLatch> stopLatch = new AtomicReference<>(null);

However, in the start() method (line 213), it is reassigned to a completely new AtomicReference instance:

stopLatch = new AtomicReference<>(new CountDownLatch(1));

Because stopLatch is neither final nor volatile, this reassignment is not thread-safe and can lead to other threads (calling stop() or close()) reading a stale or partially published reference.

Recommendation

To fix this, make stopLatch final and use stopLatch.set(new CountDownLatch(1)) in the start() method instead of reassigning the reference.

Comment on lines +632 to +695
streamingDataDisposable =
streamingData.subscribe(
data -> {
try {
if (data instanceof String) {
JsonObject continueData =
req.getContinueMessage((String) data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof byte[]) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((byte[]) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof ByteBuffer) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((ByteBuffer) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else {
JsonObject continueData = req.getContinueMessage(data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
} catch (Throwable ex) {
log.error(
StringUtils.format(
"sendStreamData exception: %s", ex.getMessage()));
responseEmitter.onError(ex);
}
},
err -> {
log.error(StringUtils.format("Get stream data error!"));
responseEmitter.onError(err);
},
new Action() {
@Override
public void run() throws Exception {
log.debug(StringUtils.format("Stream data send completed!"));
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(req.getFinishedTaskMessage(taskId)),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a race condition between cancel() and the asynchronous subscription in sendStreamRequest(). Because sendStreamRequest() runs asynchronously via CompletableFuture.runAsync(), cancel() might be called before the subscription is established and assigned to streamingDataDisposable. If that happens, cancel() sees streamingDataDisposable as null and does not dispose it, while the background thread subsequently subscribes and leaks the active subscription.

Recommendation

To prevent this leak, assign the subscription to a local variable first, then assign it to streamingDataDisposable, and finally check isClosed.get(). If isClosed.get() is true, dispose the subscription immediately.

                Disposable d =
                    streamingData.subscribe(
                        data -> {
                          try {
                            if (data instanceof String) {
                              JsonObject continueData =
                                  req.getContinueMessage((String) data, taskId);
                              sendTextWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  JsonUtils.toJson(continueData),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            } else if (data instanceof byte[]) {
                              sendBinaryWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  ByteString.of((byte[]) data),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            } else if (data instanceof ByteBuffer) {
                              sendBinaryWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  ByteString.of((ByteBuffer) data),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            } else {
                              JsonObject continueData = req.getContinueMessage(data, taskId);
                              sendTextWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  JsonUtils.toJson(continueData),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            }
                          } catch (Throwable ex) {
                            log.error(
                                StringUtils.format(
                                    "sendStreamData exception: %s", ex.getMessage()));
                            responseEmitter.onError(ex);
                          }
                        },
                        err -> {
                          log.error(StringUtils.format("Get stream data error!"));
                          responseEmitter.onError(err);
                        },
                        new Action() {
                          @Override
                          public void run() throws Exception {
                            log.debug(StringUtils.format("Stream data send completed!"));
                            sendTextWithRetry(
                                req.getApiKey(),
                                req.isSecurityCheck(),
                                JsonUtils.toJson(req.getFinishedTaskMessage(taskId)),
                                req.getWorkspace(),
                                req.getHeaders(),
                                req.getBaseWebSocketUrl());
                          }
                        });
                streamingDataDisposable = d;
                if (isClosed.get()) {
                  d.dispose();
                }

Comment on lines +69 to +132
streamingDataDisposable =
streamingData.subscribe(
data -> {
try {
if (data instanceof String) {
JsonObject continueData =
req.getContinueMessage((String) data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof byte[]) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((byte[]) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else if (data instanceof ByteBuffer) {
sendBinaryWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
ByteString.of((ByteBuffer) data),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
} else {
JsonObject continueData = req.getContinueMessage(data, taskId);
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(continueData),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
} catch (Throwable ex) {
log.error(
StringUtils.format(
"sendStreamData exception: %s", ex.getMessage()));
responseEmitter.onError(ex);
}
},
err -> {
log.error(StringUtils.format("Get stream data error!"));
responseEmitter.onError(err);
},
new Action() {
@Override
public void run() throws Exception {
log.debug(StringUtils.format("Stream data send completed!"));
sendTextWithRetry(
req.getApiKey(),
req.isSecurityCheck(),
JsonUtils.toJson(req.getFinishedTaskMessage(taskId)),
req.getWorkspace(),
req.getHeaders(),
req.getBaseWebSocketUrl());
}
});

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

There is a race condition between cancel() and the asynchronous subscription in sendStreamRequest(). Because sendStreamRequest() runs asynchronously via CompletableFuture.runAsync(), cancel() might be called before the subscription is established and assigned to streamingDataDisposable. If that happens, cancel() sees streamingDataDisposable as null and does not dispose it, while the background thread subsequently subscribes and leaks the active subscription.

Recommendation

To prevent this leak, assign the subscription to a local variable first, then assign it to streamingDataDisposable, and finally check isClosed.get(). If isClosed.get() is true, dispose the subscription immediately.

                Disposable d =
                    streamingData.subscribe(
                        data -> {
                          try {
                            if (data instanceof String) {
                              JsonObject continueData =
                                  req.getContinueMessage((String) data, taskId);
                              sendTextWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  JsonUtils.toJson(continueData),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            } else if (data instanceof byte[]) {
                              sendBinaryWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  ByteString.of((byte[]) data),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            } else if (data instanceof ByteBuffer) {
                              sendBinaryWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  ByteString.of((ByteBuffer) data),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            } else {
                              JsonObject continueData = req.getContinueMessage(data, taskId);
                              sendTextWithRetry(
                                  req.getApiKey(),
                                  req.isSecurityCheck(),
                                  JsonUtils.toJson(continueData),
                                  req.getWorkspace(),
                                  req.getHeaders(),
                                  req.getBaseWebSocketUrl());
                            }
                          } catch (Throwable ex) { 
                            log.error(
                                StringUtils.format(
                                    "sendStreamData exception: %s", ex.getMessage()));
                            responseEmitter.onError(ex);
                          }
                        },
                        err -> {
                          log.error(StringUtils.format("Get stream data error!"));
                          responseEmitter.onError(err);
                        },
                        new Action() {
                          @Override
                          public void run() throws Exception {
                            log.debug(StringUtils.format("Stream data send completed!"));
                            sendTextWithRetry(
                                req.getApiKey(),
                                req.isSecurityCheck(),
                                JsonUtils.toJson(req.getFinishedTaskMessage(taskId)),
                                req.getWorkspace(),
                                req.getHeaders(),
                                req.getBaseWebSocketUrl());
                          }
                        });
                streamingDataDisposable = d;
                if (isClosed.get()) {
                  d.dispose();
                }

Comment on lines +454 to +460
boolean sent;
synchronized (MultiModalDialog.this) {
sent = (conversationEmitter != null);
}
if (sent) {
sendFinishTaskMessage();
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The synchronization and null-check on conversationEmitter here is redundant. The sendFinishTaskMessage() method already synchronizes on MultiModalDialog.this and safely checks if conversationEmitter is non-null before calling onComplete(). We can simplify this block to a single call to sendFinishTaskMessage().

Suggested change
boolean sent;
synchronized (MultiModalDialog.this) {
sent = (conversationEmitter != null);
}
if (sent) {
sendFinishTaskMessage();
}
sendFinishTaskMessage();

@songguocola songguocola closed this Jul 1, 2026
@songguocola songguocola deleted the dev/audio_0701 branch July 1, 2026 11:49
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant