Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
63 changes: 63 additions & 0 deletions apps/code/src/main/services/cloud-task/service.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -604,6 +604,69 @@ describe("CloudTaskService", () => {
).toBe(true);
});

it("fails the watcher after exhausting the cumulative reconnect budget on clean-EOF loops", async () => {
vi.useFakeTimers();

const updates: unknown[] = [];
service.on(CloudTaskEvent.Update, (payload) => updates.push(payload));

const makeInProgressRun = () =>
createJsonResponse({
id: "run-1",
status: "in_progress",
stage: null,
output: null,
error_message: null,
branch: "main",
updated_at: "2026-01-01T00:00:00Z",
});

mockNetFetch.mockImplementation((input: string | Request) => {
const url = typeof input === "string" ? input : input.url;
if (url.includes("/session_logs/")) {
return Promise.resolve(
createJsonResponse([], 200, { "X-Has-More": "false" }),
);
}
return Promise.resolve(makeInProgressRun());
});

mockStreamFetch.mockImplementation(() =>
Promise.resolve(createSseResponse("")),
);

service.watch({
taskId: "task-1",
runId: "run-1",
apiHost: "https://app.example.com",
teamId: 2,
});

await waitFor(() => mockStreamFetch.mock.calls.length === 1);
await vi.advanceTimersByTimeAsync(60 * 60_000);

await waitFor(
() =>
updates.some(
(u) =>
typeof u === "object" &&
u !== null &&
(u as { kind?: string }).kind === "error",
),
10_000,
);

expect(updates).toContainEqual({
taskId: "task-1",
runId: "run-1",
kind: "error",
errorTitle: "Cloud run unreachable",
errorMessage:
"Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.",
retryable: true,
});
});

it("emits a retryable cloud error after repeated stream failures", async () => {
vi.useFakeTimers();

Expand Down
28 changes: 25 additions & 3 deletions apps/code/src/main/services/cloud-task/service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import { type SseEvent, SseEventParser } from "./sse-parser";
const log = logger.scope("cloud-task");

const MAX_SSE_RECONNECT_ATTEMPTS = 5;
const MAX_CUMULATIVE_RECONNECT_ATTEMPTS = 30;
const SSE_RECONNECT_BASE_DELAY_MS = 2_000;
const SSE_RECONNECT_MAX_DELAY_MS = 30_000;
const SSE_HEALTHY_CONNECTION_MS = 60_000;
Expand Down Expand Up @@ -91,6 +92,7 @@ interface WatcherState {
totalEntryCount: number;
reconnectAttempts: number;
streamErrorAttempts: number;
cumulativeReconnectAttempts: number;
lastEventId: string | null;
lastStatus: TaskRunStatus | null;
lastStage: string | null;
Expand Down Expand Up @@ -283,6 +285,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

watcher.reconnectAttempts = 0;
watcher.streamErrorAttempts = 0;
watcher.cumulativeReconnectAttempts = 0;
watcher.failed = false;
watcher.pendingLogEntries = [];
watcher.bufferedLogBatches = [];
Expand Down Expand Up @@ -406,6 +409,7 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
totalEntryCount: 0,
reconnectAttempts: 0,
streamErrorAttempts: 0,
cumulativeReconnectAttempts: 0,
lastEventId: null,
lastStatus: null,
lastStage: null,
Expand Down Expand Up @@ -760,8 +764,9 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
}

// A real data event proves the stream materialized; clear the backend-error
// budget too.
// and cumulative budgets too.
watcher.streamErrorAttempts = 0;
watcher.cumulativeReconnectAttempts = 0;

if (isTaskRunStateEvent(event.data)) {
if (this.applyTaskRunState(watcher, event.data)) {
Expand Down Expand Up @@ -1016,12 +1021,26 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {
clearTimeout(watcher.reconnectTimeoutId);
}

// Cumulative counter bounds runaway loops that clean-EOF (countAttempt=false)
// and would otherwise dodge `reconnectAttempts`.
watcher.cumulativeReconnectAttempts += 1;
const countAttempt = options.countAttempt ?? true;
if (countAttempt) {
watcher.reconnectAttempts += 1;
} else {
watcher.reconnectAttempts = 0;
}

if (
watcher.cumulativeReconnectAttempts > MAX_CUMULATIVE_RECONNECT_ATTEMPTS
) {
this.failWatcher(key, {
title: "Cloud run unreachable",
message:
"Could not maintain a connection to the cloud run after many attempts. Click retry once the issue is resolved.",
retryable: true,
});
return;
}

// The watcher fails once either budget is exhausted: transport reconnect
// failures or backend stream-error frames.
const attemptCount = Math.max(
Expand Down Expand Up @@ -1112,6 +1131,9 @@ export class CloudTaskService extends TypedEventEmitter<CloudTaskEvents> {

if (!isTerminalStatus(watcher.lastStatus) && reconnectIfNonTerminal) {
if (stateChanged) {
// Polled progress proves the run is alive — reset both budgets.
watcher.reconnectAttempts = 0;
watcher.cumulativeReconnectAttempts = 0;
Comment on lines 1133 to +1136
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 When the poll confirms the run is making progress (stateChanged=true), only cumulativeReconnectAttempts is reset while reconnectAttempts retains its prior error count. A watcher that has accumulated 5 error reconnects, then sees a poll-confirmed state change, then gets just 1 more error will immediately fail with "Cloud stream disconnected" — even though the run was confirmed alive. Reset reconnectAttempts here too so both budgets are aligned when real progress is observed.

Suggested change
if (stateChanged) {
// Polled progress proves the run is alive — don't burn cumulative budget.
watcher.cumulativeReconnectAttempts = 0;
if (stateChanged) {
// Polled progress proves the run is alive — don't burn cumulative budget.
watcher.cumulativeReconnectAttempts = 0;
watcher.reconnectAttempts = 0;
Prompt To Fix With AI
This is a comment left during a code review.
Path: apps/code/src/main/services/cloud-task/service.ts
Line: 1077-1079

Comment:
When the poll confirms the run is making progress (`stateChanged=true`), only `cumulativeReconnectAttempts` is reset while `reconnectAttempts` retains its prior error count. A watcher that has accumulated 5 error reconnects, then sees a poll-confirmed state change, then gets just 1 more error will immediately fail with "Cloud stream disconnected" — even though the run was confirmed alive. Reset `reconnectAttempts` here too so both budgets are aligned when real progress is observed.

```suggestion
      if (stateChanged) {
        // Polled progress proves the run is alive — don't burn cumulative budget.
        watcher.cumulativeReconnectAttempts = 0;
        watcher.reconnectAttempts = 0;
```

How can I resolve this? If you propose a fix, please make it concise.

this.emit(CloudTaskEvent.Update, {
taskId: watcher.taskId,
runId: watcher.runId,
Expand Down
Loading