-
Notifications
You must be signed in to change notification settings - Fork 24
bug fix for multimodal-dialog agent and add parameter speaker_id in asr result #238
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -17,6 +17,7 @@ | |
| import io.reactivex.Flowable; | ||
| import io.reactivex.FlowableEmitter; | ||
| import io.reactivex.Observable; | ||
| import io.reactivex.disposables.Disposable; | ||
| import io.reactivex.functions.Action; | ||
| import java.io.IOException; | ||
| import java.nio.ByteBuffer; | ||
|
|
@@ -40,7 +41,7 @@ public class OkHttpWebSocketClient extends WebSocketListener | |
| private WebSocket webSocketClient; | ||
| // indicate the websocket connection is established. | ||
| private AtomicBoolean isOpen = new AtomicBoolean(false); | ||
| private AtomicBoolean isClosed = new AtomicBoolean(false); | ||
| protected AtomicBoolean isClosed = new AtomicBoolean(false); | ||
| // indicate the first response is received. | ||
| protected AtomicBoolean isFirstMessage = new AtomicBoolean(false); | ||
| // used for get request response | ||
|
|
@@ -51,6 +52,9 @@ public class OkHttpWebSocketClient extends WebSocketListener | |
|
|
||
| private AtomicBoolean passTaskStarted = new AtomicBoolean(false); | ||
|
|
||
| // Disposable for the streaming data subscription, used to cancel upstream when stopping | ||
| protected volatile Disposable streamingDataDisposable; | ||
|
|
||
| public OkHttpWebSocketClient(OkHttpClient client, boolean passTaskStarted) { | ||
| this.client = client; | ||
| this.passTaskStarted.set(passTaskStarted); | ||
|
|
@@ -97,6 +101,13 @@ public boolean close(int code, String reason) { | |
| } | ||
|
|
||
| public void cancel() { | ||
| // Set isClosed BEFORE cancel to suppress onFailure error propagation | ||
| isClosed.set(true); | ||
| // Dispose upstream subscription to stop sending data | ||
| Disposable d = streamingDataDisposable; | ||
| if (d != null && !d.isDisposed()) { | ||
| d.dispose(); | ||
| } | ||
| if (webSocketClient != null) { | ||
| webSocketClient.cancel(); | ||
| } | ||
|
|
@@ -111,6 +122,11 @@ private void establishWebSocketClient( | |
| int reconnectionTimes = 0; | ||
| String errorMessage = ""; | ||
| while (reconnectionTimes < MAX_CONNECTION_TIMES) { | ||
| // Bail out immediately if cancel() has been called | ||
| if (isClosed.get()) { | ||
| log.debug("Connection cancelled, stop reconnecting."); | ||
| return; | ||
| } | ||
| try { | ||
| Flowable<DashScopeResult> flowable = | ||
| Flowable.<DashScopeResult>create( | ||
|
|
@@ -144,9 +160,17 @@ private void establishWebSocketClient( | |
| } else if (errorMessage.contains(Constants.NO_API_KEY_ERROR)) { | ||
| throw ex; | ||
| } | ||
| // Check again before sleeping | ||
| if (isClosed.get()) { | ||
| log.debug("Connection cancelled during retry, stop reconnecting."); | ||
| return; | ||
| } | ||
| try { | ||
| Thread.sleep(10000); | ||
| } catch (InterruptedException e) {; | ||
| } catch (InterruptedException e) { | ||
| // Respect interruption - exit the loop | ||
| Thread.currentThread().interrupt(); | ||
| return; | ||
| } | ||
| } | ||
| } | ||
|
|
@@ -379,10 +403,18 @@ protected void sendTextWithRetry( | |
| String workspace, | ||
| Map<String, String> customHeaders, | ||
| String baseWebSocketUrl) { | ||
| // Guard: skip if already cancelled | ||
| if (isClosed.get()) { | ||
| log.debug("sendTextWithRetry skipped: connection already closed."); | ||
| return; | ||
| } | ||
| // simple retry with fixed delay, no strategy | ||
| if (!isOpen.get()) { | ||
| establishWebSocketClient(apiKey, isSecurityCheck, workspace, customHeaders, baseWebSocketUrl); | ||
| } | ||
| if (isClosed.get()) { | ||
| return; | ||
| } | ||
| int maxRetries = 3; | ||
| if (passTaskStarted.get()) { | ||
| // when pass througn task started, no need to retry. | ||
|
|
@@ -395,6 +427,9 @@ protected void sendTextWithRetry( | |
| } | ||
| int retryCount = 0; | ||
| while (retryCount < maxRetries) { | ||
| if (isClosed.get()) { | ||
| return; | ||
| } | ||
| log.debug("Sending message: " + message); | ||
|
Comment on lines
+430
to
433
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. If If Recommendation: if (isClosed.get()) {
return;
}
if (webSocketClient == null) {
log.warn("webSocketClient is null, cannot send message.");
return;
}
log.debug("Sending message: " + message); |
||
| Boolean isOk = webSocketClient.send(message); | ||
| if (isOk) { | ||
|
|
@@ -418,12 +453,22 @@ protected void sendBinaryWithRetry( | |
| String workspace, | ||
| Map<String, String> customHeaders, | ||
| String baseWebSocketUrl) { | ||
| // Guard: skip if already cancelled | ||
| if (isClosed.get()) { | ||
| return; | ||
| } | ||
| if (!isOpen.get()) { | ||
| establishWebSocketClient(apiKey, isSecurityCheck, workspace, customHeaders, baseWebSocketUrl); | ||
| } | ||
| if (isClosed.get()) { | ||
| return; | ||
| } | ||
| int maxRetries = 3; | ||
| int retryCount = 0; | ||
| while (retryCount < maxRetries) { | ||
| if (isClosed.get()) { | ||
| return; | ||
| } | ||
| Boolean isOk = webSocketClient.send(message); | ||
| if (isOk) { | ||
| break; | ||
|
|
@@ -584,67 +629,77 @@ protected CompletableFuture<Void> sendStreamRequest(FullDuplexRequest req) { | |
| req.getBaseWebSocketUrl()); | ||
|
|
||
| Flowable<Object> streamingData = req.getStreamingData(); | ||
| streamingData.subscribe( | ||
| data -> { | ||
| try { | ||
| if (data instanceof String) { | ||
| JsonObject continueData = req.getContinueMessage((String) data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof byte[]) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((byte[]) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof ByteBuffer) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((ByteBuffer) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else { | ||
| JsonObject continueData = req.getContinueMessage(data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| } catch (Throwable ex) { | ||
| log.error( | ||
| StringUtils.format("sendStreamData exception: %s", ex.getMessage())); | ||
| responseEmitter.onError(ex); | ||
| } | ||
| }, | ||
| err -> { | ||
| log.error(StringUtils.format("Get stream data error!")); | ||
| responseEmitter.onError(err); | ||
| }, | ||
| new Action() { | ||
| @Override | ||
| public void run() throws Exception { | ||
| log.debug(StringUtils.format("Stream data send completed!")); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| }); | ||
| Disposable d = | ||
| streamingData.subscribe( | ||
| data -> { | ||
| try { | ||
| if (data instanceof String) { | ||
| JsonObject continueData = | ||
| req.getContinueMessage((String) data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof byte[]) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((byte[]) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else if (data instanceof ByteBuffer) { | ||
| sendBinaryWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| ByteString.of((ByteBuffer) data), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } else { | ||
| JsonObject continueData = req.getContinueMessage(data, taskId); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(continueData), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| } catch (Throwable ex) { | ||
| log.error( | ||
| StringUtils.format( | ||
| "sendStreamData exception: %s", ex.getMessage())); | ||
| responseEmitter.onError(ex); | ||
| } | ||
| }, | ||
| err -> { | ||
| log.error(StringUtils.format("Get stream data error!")); | ||
| responseEmitter.onError(err); | ||
| }, | ||
| new Action() { | ||
| @Override | ||
| public void run() throws Exception { | ||
| log.debug(StringUtils.format("Stream data send completed!")); | ||
| sendTextWithRetry( | ||
| req.getApiKey(), | ||
| req.isSecurityCheck(), | ||
| JsonUtils.toJson(req.getFinishedTaskMessage(taskId)), | ||
| req.getWorkspace(), | ||
| req.getHeaders(), | ||
| req.getBaseWebSocketUrl()); | ||
| } | ||
| }); | ||
| // Publish the disposable, then check if cancel() raced ahead. | ||
| // If isClosed is already true, cancel() has already run and missed | ||
| // this disposable, so we must dispose it ourselves. | ||
| streamingDataDisposable = d; | ||
| if (isClosed.get()) { | ||
| d.dispose(); | ||
| } | ||
| } catch (Throwable ex) { | ||
| log.error(StringUtils.format("sendStreamData exception: %s", ex.getMessage())); | ||
| responseEmitter.onError(ex); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
isClosedflag is used to prevent reconnection loops and suppress error propagation when the client is cancelled or closed. However, inonClosed(WebSocket webSocket, int code, String reason),isClosedis reset tofalseviaisClosed.set(false);.If
cancel()is called, it setsisClosedtotrueand cancels the websocket, which eventually triggersonClosed(either directly or viaonFailurecallingclose()). WhenonClosedruns, it resetsisClosedback tofalse.If the reconnection loop in
establishWebSocketClientis currently sleeping or retrying, it checksisClosed.get(). Since it was reset tofalse, the loop will continue to retry and reconnect, defeating the purpose ofcancel().Recommendation:
Modify
onClosedso that it does not unconditionally resetisClosedtofalseif it was explicitly set totrueby a user-initiated close or cancel.