diff --git a/apps/code/src/renderer/features/sessions/service/service.test.ts b/apps/code/src/renderer/features/sessions/service/service.test.ts index 95a3ebfd2..1b7f411b4 100644 --- a/apps/code/src/renderer/features/sessions/service/service.test.ts +++ b/apps/code/src/renderer/features/sessions/service/service.test.ts @@ -2512,6 +2512,112 @@ describe("SessionService", () => { // unbounded growth on long-running cloud runs. expect(mockSessionStoreSetters.appendEvents).not.toHaveBeenCalled(); }); + + const setupReconcileLoopTest = (logContent: string) => { + const service = getSessionService(); + const existingSession = createMockSession({ + taskRunId: "run-123", + taskId: "task-123", + status: "connected", + isCloud: true, + logUrl: "https://logs.example.com/run-123", + processedLineCount: 5, + events: [], + }); + mockSessionStoreSetters.getSessionByTaskId.mockReturnValue( + existingSession, + ); + mockSessionStoreSetters.getSessions.mockReturnValue({ + "run-123": existingSession, + }); + mockTrpcLogs.readLocalLogs.query.mockResolvedValue(logContent); + mockTrpcLogs.fetchS3Logs.query.mockResolvedValue(logContent); + service.watchCloudTask( + "task-123", + "run-123", + "https://api.anthropic.com", + 123, + ); + const subscribeOptions = mockTrpcCloudTask.onUpdate.subscribe.mock + .calls[0][1] as { + onData: (update: unknown) => void; + }; + return { subscribeOptions }; + }; + + const newEntry = { + type: "notification", + timestamp: "2024-01-01T00:00:01Z", + notification: { method: "session/update" }, + }; + const validLine = JSON.stringify({ + type: "notification", + timestamp: "2024-01-01T00:00:00Z", + notification: { method: "session/update" }, + }); + + it("breaks the reconcile loop on first observation when parse failures are present", async () => { + const { subscribeOptions } = setupReconcileLoopTest( + [ + ...Array.from({ length: 8 }, () => validLine), + "}}not-json{{", + "{broken", + ].join("\n"), + ); + + subscribeOptions.onData({ + kind: "logs", + taskId: 
"task-123", + runId: "run-123", + totalEntryCount: 20, + newEntries: [newEntry], + }); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ processedLineCount: 20 }), + ); + }); + }); + + it("breaks the reconcile loop after a repeated stable deficiency", async () => { + const { subscribeOptions } = setupReconcileLoopTest( + Array.from({ length: 8 }, () => validLine).join("\n"), + ); + + subscribeOptions.onData({ + kind: "logs", + taskId: "task-123", + runId: "run-123", + totalEntryCount: 14, + newEntries: [newEntry], + }); + await vi.waitFor(() => { + expect(mockTrpcLogs.fetchS3Logs.query).toHaveBeenCalledTimes(1); + }); + + expect(mockSessionStoreSetters.updateSession).not.toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ processedLineCount: 14 }), + ); + + subscribeOptions.onData({ + kind: "logs", + taskId: "task-123", + runId: "run-123", + totalEntryCount: 14, + newEntries: [newEntry], + }); + + await vi.waitFor(() => { + expect(mockSessionStoreSetters.updateSession).toHaveBeenCalledWith( + "run-123", + expect.objectContaining({ processedLineCount: 14 }), + ); + }); + }); + it("flips status to connected on _posthog/run_started", async () => { const service = getSessionService(); const hydratedSession = createMockSession({ diff --git a/apps/code/src/renderer/features/sessions/service/service.ts b/apps/code/src/renderer/features/sessions/service/service.ts index 930e5f0dc..b0b38d4b5 100644 --- a/apps/code/src/renderer/features/sessions/service/service.ts +++ b/apps/code/src/renderer/features/sessions/service/service.ts @@ -208,6 +208,7 @@ interface CloudLogGapReconcileRequest { interface ParsedSessionLogs { rawEntries: StoredLogEntry[]; totalLineCount: number; + parseFailureCount: number; sessionId?: string; adapter?: Adapter; } @@ -216,6 +217,11 @@ interface CloudLogGapReconcileState { pendingRequest?: CloudLogGapReconcileRequest; } +interface 
CloudLogReconcileDeficiency { + expectedCount: number; + observedLineCount: number; +} + export interface ConnectParams { task: Task; repoPath: string; @@ -303,6 +309,11 @@ export class SessionService { } >(); private cloudLogGapReconciles = new Map(); + /** Last observed reconcile deficit (expectedCount vs. fetched totalLineCount) per taskRunId; reconcileCloudLogGapOnce compares against it to detect a repeated, unchanged deficit. */ + private cloudLogReconcileDeficiency = new Map< + string, + CloudLogReconcileDeficiency + >(); /** Maps toolCallId → cloud requestId for routing permission responses */ private cloudPermissionRequestIds = new Map(); private idleKilledSubscription: { unsubscribe: () => void } | null = null; @@ -739,6 +750,7 @@ export class SessionService { this.unsubscribeFromChannel(taskRunId); sessionStoreSetters.removeSession(taskRunId); this.cloudRunIdleTracker.delete(taskRunId); + this.cloudLogReconcileDeficiency.delete(taskRunId); if (session) { this.localRepoPaths.delete(session.taskId); this.localRecoveryAttempts.delete(session.taskId); @@ -1118,6 +1130,7 @@ export class SessionService { this.localRecoveryAttempts.clear(); this.cloudPermissionRequestIds.clear(); this.cloudLogGapReconciles.clear(); + this.cloudLogReconcileDeficiency.clear(); this.dispatchingCloudQueues.clear(); this.scheduledCloudQueueFlushes.clear(); this.cloudRunIdleTracker.clear(); @@ -2998,6 +3011,7 @@ export class SessionService { watcher.subscription.unsubscribe(); this.cloudTaskWatchers.delete(taskId); + this.cloudLogReconcileDeficiency.delete(watcher.runId); } async preflightToLocal(taskId: string, repoPath: string) { @@ -3658,6 +3672,7 @@ export class SessionService { const rawEntries: StoredLogEntry[] = []; let sessionId: string | undefined; let adapter: Adapter | undefined; + let parseFailureCount = 0; const lines = content.trim().split("\n"); for (const line of lines) { @@ -3679,11 +3694,18 @@ export class SessionService { if (params?.adapter) adapter = params.adapter; } } catch { + parseFailureCount += 1; log.warn("Failed to parse log entry", { line }); } } - 
return { rawEntries, totalLineCount: lines.length, sessionId, adapter }; + return { + rawEntries, + totalLineCount: lines.length, + parseFailureCount, + sessionId, + adapter, + }; } private async fetchSessionLogs( @@ -3691,7 +3713,11 @@ export class SessionService { taskRunId?: string, options: { minEntryCount?: number } = {}, ): Promise { - const empty: ParsedSessionLogs = { rawEntries: [], totalLineCount: 0 }; + const empty: ParsedSessionLogs = { + rawEntries: [], + totalLineCount: 0, + parseFailureCount: 0, + }; if (!logUrl && !taskRunId) return empty; let localResult: ParsedSessionLogs | undefined; @@ -3811,11 +3837,10 @@ export class SessionService { newEntries, logUrl, }: CloudLogGapReconcileRequest): Promise { - const { rawEntries, totalLineCount } = await this.fetchSessionLogs( - logUrl, - taskRunId, - { minEntryCount: expectedCount }, - ); + const { rawEntries, totalLineCount, parseFailureCount } = + await this.fetchSessionLogs(logUrl, taskRunId, { + minEntryCount: expectedCount, + }); const session = sessionStoreSetters.getSessions()[taskRunId]; if (!session || session.taskId !== taskId) { return; @@ -3823,6 +3848,7 @@ export class SessionService { const latestCount = session.processedLineCount ?? 0; if (latestCount >= expectedCount) { + this.cloudLogReconcileDeficiency.delete(taskRunId); return; } @@ -3832,6 +3858,7 @@ export class SessionService { sessionStoreSetters.clearTailOptimisticItems(taskRunId); } this.cloudRunIdleTracker.delete(taskRunId); + this.cloudLogReconcileDeficiency.delete(taskRunId); sessionStoreSetters.updateSession(taskRunId, { events, isCloud: true, @@ -3842,16 +3869,48 @@ export class SessionService { return; } - // The fetched logs lag behind expectedCount and `newEntries` is the latest - // tail slice of the snapshot — appending it here would create duplicates - // and gaps in `session.events` (and bump processedLineCount past entries - // we don't actually have). 
Skip; the next snapshot/log update will retry - // once the source has caught up. + // Break the reconcile loop on proven corruption (parseFailureCount > 0) or on a stable repeat of the same deficit (same expectedCount and fetched line count as the previous attempt): commit the fetched entries and advance processedLineCount to expectedCount. + // Otherwise record the deficit and wait — likely lag. NOTE(review): the commit replaces session events with the fetched entries only and does not use newEntries — confirm that cannot drop events the session already holds. + const previous = this.cloudLogReconcileDeficiency.get(taskRunId); + const sameDeficiencyAsBefore = + previous?.expectedCount === expectedCount && + previous?.observedLineCount === totalLineCount; + + if (parseFailureCount > 0 || sameDeficiencyAsBefore) { + log.warn("Cloud task log gap unrecoverable; committing best-effort", { + taskRunId, + expectedCount, + observedLineCount: totalLineCount, + parseFailureCount, + fetchedEntries: rawEntries.length, + reason: parseFailureCount > 0 ? "parse-failure" : "stable-deficit", + }); + const events = convertStoredEntriesToEvents(rawEntries); + if (hasSessionPromptEvent(events)) { + sessionStoreSetters.clearTailOptimisticItems(taskRunId); + } + this.cloudRunIdleTracker.delete(taskRunId); + this.cloudLogReconcileDeficiency.delete(taskRunId); + sessionStoreSetters.updateSession(taskRunId, { + events, + isCloud: true, + logUrl: logUrl ?? session.logUrl, + processedLineCount: expectedCount, + }); + this.updatePromptStateFromEvents(taskRunId, events); + return; + } + + this.cloudLogReconcileDeficiency.set(taskRunId, { + expectedCount, + observedLineCount: totalLineCount, + }); log.warn("Cloud task log count inconsistency", { taskRunId, currentCount, expectedCount, fetchedCount: rawEntries.length, + parseFailureCount, entriesReceived: newEntries.length, }); }