Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/young-crabs-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@workflow/core': patch
---

Fix premature workflow suspension while replay is still propagating hydrated step results across the VM boundary.
40 changes: 40 additions & 0 deletions packages/core/e2e/e2e.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2312,4 +2312,44 @@ describe('e2e', () => {
expect(returnValue).toBe(133);
}
);

/**
* Regression test for the scheduleWhenIdle premature-suspension bug.
*
* Stresses replay with setup steps followed by a moderately wide outer Promise.all
* where each item advances through multiple sequential waves. A few
* phase-one steps lag 2-3s behind the rest of the batch, while fast items
* continue through later waves.
*
* Expected (after fix): status === 'completed', completed === 12.
* Before fix: run can fail with WorkflowRuntimeError "Unconsumed event in
* event log" for one of the next-wave steps because
* scheduleWhenIdle fires WorkflowSuspension in the gap between fast
* hydrations completing and the next useStep callback registering.
*
* PR with full context: https://github.com/vercel/workflow/pull/1961/changes#top
*/
test.skipIf(
process.env.APP_NAME !== 'nextjs-turbopack' ||
!process.env.WORKFLOW_VERCEL_ENV
)(
'scheduleWhenIdle - concurrent multi-wave workflow completes without unconsumed event error',
{ timeout: 600_000 },
async () => {
const run = await start(
await getWorkflowMetadata(
deploymentUrl,
'workflows/96_many_steps.ts',
'concurrentMultiWaveWorkflow'
),
[]
);

const returnValue = await run.returnValue;
expect(returnValue).toEqual({ totalItems: 12, completed: 12 });

const { json } = await cliInspectJson(`runs ${run.runId}`);
expect(json.status).toBe('completed');
}
);
});
22 changes: 18 additions & 4 deletions packages/core/src/async-deserialization-ordering.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import * as nanoid from 'nanoid';
import { monotonicFactory } from 'ulid';
import { afterEach, describe, expect, it, vi } from 'vitest';
import { EventsConsumer } from './events-consumer.js';
import type { WorkflowOrchestratorContext } from './private.js';
import { isVmIdle, type WorkflowOrchestratorContext } from './private.js';
import { dehydrateStepReturnValue } from './serialization.js';
import { createUseStep } from './step.js';
import { createContext } from './vm/index.js';
Expand All @@ -28,23 +28,37 @@ function setupWorkflowContext(events: Event[]): WorkflowOrchestratorContext {
});
const ulid = monotonicFactory(() => context.globalThis.Math.random());
const workflowStartedAt = context.globalThis.Date.now();
return {
const promiseQueueHolder = { current: Promise.resolve() };
const ctx: WorkflowOrchestratorContext = {
runId: 'wrun_test',
encryptionKey: undefined,
globalThis: context.globalThis,
eventsConsumer: new EventsConsumer(events, {
onUnconsumedEvent: () => {},
getPromiseQueue: () => Promise.resolve(),
getPromiseQueue: () => promiseQueueHolder.current,
isVmIdle: () => isVmIdle(ctx),
onceVmIdle: (callback) => {
ctx.vmIdleObservers.add(callback);
return () => ctx.vmIdleObservers.delete(callback);
},
}),
invocationsQueue: new Map(),
generateUlid: () => ulid(workflowStartedAt),
generateNanoid: nanoid.customRandom(nanoid.urlAlphabet, 21, (size) =>
new Uint8Array(size).map(() => 256 * context.globalThis.Math.random())
),
onWorkflowError: vi.fn(),
promiseQueue: Promise.resolve(),
get promiseQueue() {
return promiseQueueHolder.current;
},
set promiseQueue(value: Promise<void>) {
promiseQueueHolder.current = value;
},
pendingDeliveries: 0,
pendingVmWork: 0,
vmIdleObservers: new Set<() => void>(),
};
return ctx;
}

describe('async deserialization ordering', () => {
Expand Down
Loading
Loading