Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 10 additions & 35 deletions packages/payload/src/queues/config/types/taskTypes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,25 +15,13 @@ export type TaskInputOutput = {
}
export type TaskHandlerResult<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
> =
| {
/**
* @deprecated Returning `state: 'failed'` is deprecated. Throw an error instead.
*/
errorMessage?: string
/**
* @deprecated Returning `state: 'failed'` is deprecated. Throw an error instead.
*/
state: 'failed'
}
| {
output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output']
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
? TTaskSlugOrInputOutput['output']
: never
state?: 'succeeded'
}
> = {
output: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks']
? TypedJobs['tasks'][TTaskSlugOrInputOutput]['output']
: TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type
? TTaskSlugOrInputOutput['output']
: never
}

export type TaskHandlerArgs<
TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput,
Expand Down Expand Up @@ -125,22 +113,9 @@ export type RunInlineTaskFunction = <TTaskInput extends object, TTaskOutput exte
job: Job<any>
req: PayloadRequest
tasks: RunTaskFunctions
}) => MaybePromise<
| {
/**
* @deprecated Returning `state: 'failed'` is deprecated. Throw an error instead.
*/
errorMessage?: string
/**
* @deprecated Returning `state: 'failed'` is deprecated. Throw an error instead.
*/
state: 'failed'
}
| {
output: TTaskOutput
state?: 'succeeded'
}
>
}) => MaybePromise<{
output: TTaskOutput
}>
},
) => Promise<TTaskOutput>

Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import ObjectIdImport from 'bson-objectid'

import type { Job } from '../../../../index.js'
import type { JsonObject, PayloadRequest } from '../../../../types/index.js'
import type { PayloadRequest } from '../../../../types/index.js'
import type {
RetryConfig,
RunInlineTaskFunction,
Expand Down Expand Up @@ -128,23 +128,24 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
})
}

let taskHandlerResult: TaskHandlerResult<string>
let output: JsonObject | undefined = {}
let output: TaskHandlerResult<string>['output']

try {
taskHandlerResult = await runner({
inlineTask: getRunTaskFunction(job, workflowConfig, req, true, updateJob, {
taskID,
taskSlug,
}),
input,
job: job as unknown as Job<WorkflowTypes>,
req,
tasks: getRunTaskFunction(job, workflowConfig, req, false, updateJob, {
taskID,
taskSlug,
}),
})
output = (
await runner({
inlineTask: getRunTaskFunction(job, workflowConfig, req, true, updateJob, {
taskID,
taskSlug,
}),
input,
job: job as unknown as Job<WorkflowTypes>,
req,
tasks: getRunTaskFunction(job, workflowConfig, req, false, updateJob, {
taskID,
taskSlug,
}),
})
)?.output
} catch (err: any) {
if (err instanceof JobCancelledError) {
// Re-throw JobCancelledError to be handled by the top-level error handler
Expand All @@ -166,25 +167,6 @@ export const getRunTaskFunction = <TIsInline extends boolean>(
})
}

if (taskHandlerResult.state === 'failed') {
throw new TaskError({
executedAt,
input: input!,
job,
message: taskHandlerResult.errorMessage ?? 'Task handler returned a failed state',
output,
parent,
retriesConfig: finalRetriesConfig,
taskConfig,
taskID,
taskSlug,
taskStatus,
workflowConfig,
})
} else {
output = taskHandlerResult.output
}

if (taskConfig?.onSuccess) {
await taskConfig.onSuccess({
input,
Expand Down
4 changes: 0 additions & 4 deletions test/queues/getConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import { CreateSimpleTask } from './tasks/CreateSimpleTask.js'
import { CreateSimpleWithDuplicateMessageTask } from './tasks/CreateSimpleWithDuplicateMessageTask.js'
import { DoNothingTask } from './tasks/DoNothingTask.js'
import { ExternalTask } from './tasks/ExternalTask.js'
import { ReturnCustomErrorTask } from './tasks/ReturnCustomErrorTask.js'
import { ReturnErrorTask } from './tasks/ReturnErrorTask.js'
import { SelfCancelTask } from './tasks/SelfCancelTask.js'
import { ThrowErrorTask } from './tasks/ThrowErrorTask.js'
import { UpdatePostStep2Task } from './tasks/UpdatePostStep2Task.js'
Expand Down Expand Up @@ -148,8 +146,6 @@ export const getConfig: () => Partial<Config> = () => ({
CreateSimpleWithDuplicateMessageTask,
ExternalTask,
ThrowErrorTask,
ReturnErrorTask,
ReturnCustomErrorTask,
DoNothingTask,
SelfCancelTask,
],
Expand Down
46 changes: 1 addition & 45 deletions test/queues/int.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import { afterAll, afterEach, beforeAll, beforeEach, describe, expect, it } from

import type { NextRESTClient } from '../__helpers/shared/NextRESTClient.js'

import { devUser } from '../credentials.js'
import { initPayloadInt } from '../__helpers/shared/initPayloadInt.js'
import { devUser } from '../credentials.js'
import { clearAndSeedEverything } from './seed.js'
import { waitUntilAutorunIsDone } from './utilities.js'

Expand Down Expand Up @@ -1770,50 +1770,6 @@ describe('Queues - Payload', () => {
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})

it('can tasks return error', async () => {
payload.config.jobs.deleteJobOnComplete = false

const job = await payload.jobs.queue({
task: 'ReturnError',
input: {},
})

await payload.jobs.run({ silent: true })

const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})

expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('Task handler returned a failed state')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})

it('can tasks return error with custom error message', async () => {
payload.config.jobs.deleteJobOnComplete = false

const job = await payload.jobs.queue({
task: 'ReturnCustomError',
input: {
errorMessage: 'custom error message',
},
})

await payload.jobs.run({ silent: true })

const jobAfterRun = await payload.findByID({
collection: 'payload-jobs',
id: job.id,
})

expect(jobAfterRun.hasError).toBe(true)
expect(jobAfterRun.log?.length).toBe(1)
expect(jobAfterRun?.log?.[0]?.error?.message).toBe('custom error message')
expect(jobAfterRun?.log?.[0]?.state).toBe('failed')
})

it('can reliably run workflows with parallel tasks', async () => {
if (process.env.PAYLOAD_DATABASE === 'supabase') {
// TODO: This test is flaky on supabase in CI, so we skip it for now
Expand Down
29 changes: 3 additions & 26 deletions test/queues/payload-types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -105,8 +105,6 @@ export interface Config {
CreateSimpleWithDuplicateMessage: TaskCreateSimpleWithDuplicateMessage;
ExternalTask: TaskExternalTask;
ThrowError: TaskThrowError;
ReturnError: TaskReturnError;
ReturnCustomError: TaskReturnCustomError;
DoNothingTask: TaskDoNothingTask;
SelfCancel: TaskSelfCancel;
inline: {
Expand Down Expand Up @@ -302,8 +300,6 @@ export interface PayloadJob {
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask'
| 'ThrowError'
| 'ReturnError'
| 'ReturnCustomError'
| 'DoNothingTask'
| 'SelfCancel';
taskID: string;
Expand Down Expand Up @@ -378,8 +374,6 @@ export interface PayloadJob {
| 'CreateSimpleWithDuplicateMessage'
| 'ExternalTask'
| 'ThrowError'
| 'ReturnError'
| 'ReturnCustomError'
| 'DoNothingTask'
| 'SelfCancel'
)
Expand Down Expand Up @@ -667,24 +661,7 @@ export interface TaskThrowError {
input?: unknown;
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskReturnError".
*/
export interface TaskReturnError {
input?: unknown;
output?: unknown;
}
/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskReturnCustomError".
*/
export interface TaskReturnCustomError {
input: {
errorMessage: string;
};
output?: unknown;
}

/**
* This interface was referenced by `Config`'s JSON-Schema
* via the `definition` "TaskDoNothingTask".
Expand Down Expand Up @@ -942,6 +919,6 @@ export interface Auth {


declare module 'payload' {
// @ts-ignore
// @ts-ignore
export interface GeneratedTypes extends Config {}
}
}
12 changes: 2 additions & 10 deletions test/queues/runners/updatePost.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,7 @@ export const updatePostStep1: TaskHandler<'UpdatePost'> = async ({ req, input })
typeof input.post === 'string' || typeof input.post === 'number' ? input.post : input.post.id

if (!postID) {
return {
state: 'failed',
output: null,
}
throw new Error('No post ID provided')
}

await req.payload.update({
Expand All @@ -21,7 +18,6 @@ export const updatePostStep1: TaskHandler<'UpdatePost'> = async ({ req, input })
})

return {
state: 'succeeded',
output: {
messageTwice: input.message + input.message,
},
Expand All @@ -33,10 +29,7 @@ export const updatePostStep2: TaskHandler<'UpdatePostStep2'> = async ({ req, inp
typeof input.post === 'string' || typeof input.post === 'number' ? input.post : input.post.id

if (!postID) {
return {
state: 'failed',
output: null,
}
throw new Error('No post ID provided')
}

await req.payload.update({
Expand All @@ -49,7 +42,6 @@ export const updatePostStep2: TaskHandler<'UpdatePostStep2'> = async ({ req, inp
})

return {
state: 'succeeded',
output: null,
}
}
1 change: 0 additions & 1 deletion test/queues/tasks/DoNothingTask.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ export const DoNothingTask: TaskConfig<'DoNothingTask'> = {
outputSchema: [],
handler: async ({ input }) => {
return {
state: 'succeeded',
output: {
message: input.message,
},
Expand Down
20 changes: 0 additions & 20 deletions test/queues/tasks/ReturnCustomErrorTask.ts

This file was deleted.

13 changes: 0 additions & 13 deletions test/queues/tasks/ReturnErrorTask.ts

This file was deleted.

Loading