diff --git a/docs/jobs-queue/tasks.mdx b/docs/jobs-queue/tasks.mdx index 23adfb117bf..a2a76beadb8 100644 --- a/docs/jobs-queue/tasks.mdx +++ b/docs/jobs-queue/tasks.mdx @@ -203,3 +203,49 @@ export default buildConfig({ } }) ``` + +## Nested tasks + +You can run sub-tasks within an existing task, by using the `tasks` or `ìnlineTask` arguments passed to the task `handler` function: + + +```ts +export default buildConfig({ + // ... + jobs: { + // It is recommended to set `addParentToTaskLog` to `true` when using nested tasks, so that the parent task is included in the task log + // This allows for better observability and debugging of the task execution + addParentToTaskLog: true, + tasks: [ + { + slug: 'parentTask', + inputSchema: [ + { + name: 'text', + type: 'text' + }, + ], + handler: async ({ input, req, tasks, inlineTask }) => { + + await inlineTask('Sub Task 1', { + task: () => { + // Do something + return { + output: {}, + } + }, + }) + + await tasks.CreateSimple('Sub Task 2', { + input: { message: 'hello' }, + }) + + return { + output: {}, + } + } + } as TaskConfig<'parentTask'>, + ] + } +}) +``` diff --git a/packages/payload/src/index.ts b/packages/payload/src/index.ts index 46ff4caf845..4c19d35f42c 100644 --- a/packages/payload/src/index.ts +++ b/packages/payload/src/index.ts @@ -1304,6 +1304,7 @@ export type { JobsConfig, RunJobAccess, RunJobAccessArgs } from './queues/config export type { RunInlineTaskFunction, RunTaskFunction, + RunTaskFunctions, TaskConfig, TaskHandler, TaskHandlerArgs, diff --git a/packages/payload/src/queues/config/jobsCollection.ts b/packages/payload/src/queues/config/jobsCollection.ts index 28bbee498f3..87fcd1ffe5f 100644 --- a/packages/payload/src/queues/config/jobsCollection.ts +++ b/packages/payload/src/queues/config/jobsCollection.ts @@ -30,6 +30,70 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu }) } + const logFields: Field[] = [ + { + name: 'executedAt', + type: 'date', + required: true, + }, + { + name: 'completedAt', + type: 'date', + required: true, + }, + { + name: 'taskSlug', + type: 'select', + options: [...taskSlugs], + required: true, + }, + { + name: 'taskID', + type: 'text', + required: true, + }, + { + name: 'input', + type: 'json', + }, + { + name: 'output', + type: 'json', + }, + { + name: 'state', + type: 'radio', + options: ['failed', 'succeeded'], + required: true, + }, + { + name: 'error', + type: 'json', + admin: { + condition: (_, data) => data.state === 'failed', + }, + required: true, + }, + ] + + if (config?.jobs?.addParentToTaskLog) { + logFields.push({ + name: 'parent', + type: 'group', + fields: [ + { + name: 'taskSlug', + type: 'select', + options: [...taskSlugs], + }, + { + name: 'taskID', + type: 'text', + }, + ], + }) + } + const jobsCollection: CollectionConfig = { slug: 'payload-jobs', admin: { @@ -89,51 +153,7 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu admin: { description: 'Task execution log', }, - fields: [ - { - name: 'executedAt', - type: 'date', - required: true, - }, - { - name: 'completedAt', - type: 'date', - required: true, - }, - { - name: 'taskSlug', - type: 'select', - options: [...taskSlugs], - required: true, - }, - { - name: 'taskID', - type: 'text', - required: true, - }, - { - name: 'input', - type: 'json', - }, - { - name: 'output', - type: 'json', - }, - { - name: 'state', - type: 'radio', - options: ['failed', 'succeeded'], - required: true, - }, - { - name: 'error', - type: 'json', - admin: { - condition: (_, data) => data.state === 'failed', - }, - required: true, - }, - ], + fields: logFields, }, ], label: 'Status', @@ -204,5 +224,6 @@ export const getDefaultJobsCollection: (config: Config) => CollectionConfig | nu }, lockDocuments: false, } + return jobsCollection } diff --git a/packages/payload/src/queues/config/types/index.ts b/packages/payload/src/queues/config/types/index.ts index 31e48f0dcb9..45d39025f49 100644 --- a/packages/payload/src/queues/config/types/index.ts +++ b/packages/payload/src/queues/config/types/index.ts @@ -19,6 +19,14 @@ export type JobsConfig = { */ run?: RunJobAccess } + /** + * Adds information about the parent job to the task log. This is useful for debugging and tracking the flow of tasks. + * + * In 4.0, this will default to `true`. + * + * @default false + */ + addParentToTaskLog?: boolean /** * Determine whether or not to delete a job after it has successfully completed. */ diff --git a/packages/payload/src/queues/config/types/taskTypes.ts b/packages/payload/src/queues/config/types/taskTypes.ts index ea9a1e6127d..e225bc8bcb6 100644 --- a/packages/payload/src/queues/config/types/taskTypes.ts +++ b/packages/payload/src/queues/config/types/taskTypes.ts @@ -20,6 +20,10 @@ export type TaskHandlerArgs< TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] | TaskInputOutput, TWorkflowSlug extends keyof TypedJobs['workflows'] = string, > = { + /** + * Use this function to run a sub-task from within another task. + */ + inlineTask: RunInlineTaskFunction input: TTaskSlugOrInputOutput extends keyof TypedJobs['tasks'] ? TypedJobs['tasks'][TTaskSlugOrInputOutput]['input'] : TTaskSlugOrInputOutput extends TaskInputOutput // Check if it's actually TaskInputOutput type @@ -27,6 +31,7 @@ export type TaskHandlerArgs< : never job: RunningJob req: PayloadRequest + tasks: RunTaskFunctions } /** @@ -92,7 +97,13 @@ export type RunInlineTaskFunction = ; req: PayloadRequest }) => + task: (args: { + inlineTask: RunInlineTaskFunction + input: TTaskInput + job: RunningJob + req: PayloadRequest + tasks: RunTaskFunctions + }) => | { output: TTaskOutput state?: 'failed' | 'succeeded' diff --git a/packages/payload/src/queues/config/types/workflowTypes.ts b/packages/payload/src/queues/config/types/workflowTypes.ts index e3f10722e0f..f4baad000b0 100644 --- a/packages/payload/src/queues/config/types/workflowTypes.ts +++ b/packages/payload/src/queues/config/types/workflowTypes.ts @@ -1,5 +1,6 @@ import type { Field } from '../../../fields/config/types.js' import type { PayloadRequest, StringKeyOf, TypedCollection, TypedJobs } from '../../../index.js' +import type { TaskParent } from '../../operations/runJobs/runJob/getRunTaskFunction.js' import type { RetryConfig, RunInlineTaskFunction, @@ -18,8 +19,12 @@ export type JobLog = { * ID added by the array field when the log is saved in the database */ id?: string - input?: any - output?: any + input?: Record + output?: Record + /** + * Sub-tasks (tasks that are run within a task) will have a parent task ID + */ + parent?: TaskParent state: 'failed' | 'succeeded' taskID: string taskSlug: string diff --git a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts index 8ef3bda7c44..2db3cfc61b2 100644 --- a/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts +++ b/packages/payload/src/queues/operations/runJobs/runJob/getRunTaskFunction.ts @@ -44,6 +44,7 @@ export async function handleTaskFailed({ job, maxRetries, output, + parent, req, retriesConfig, runnerOutput, @@ -60,6 +61,7 @@ export async function handleTaskFailed({ job: BaseJob maxRetries: number output: object + parent?: TaskParent req: PayloadRequest retriesConfig: number | RetryConfig runnerOutput?: TaskHandlerResult @@ -93,6 +95,7 @@ export async function handleTaskFailed({ executedAt: executedAt.toISOString(), input, output, + parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined, state: 'failed', taskID, taskSlug, @@ -142,6 +145,11 @@ export async function handleTaskFailed({ } } +export type TaskParent = { + taskID: string + taskSlug: string +} + export const getRunTaskFunction = ( state: RunTaskFunctionState, job: BaseJob, @@ -149,6 +157,7 @@ export const getRunTaskFunction = ( req: PayloadRequest, isInline: TIsInline, updateJob: UpdateJobFunction, + parent?: TaskParent, ): TIsInline extends true ? RunInlineTaskFunction : RunTaskFunctions => { const runTask: ( taskSlug: TTaskSlug, @@ -240,6 +249,7 @@ export const getRunTaskFunction = ( completedAt: new Date().toISOString(), error: errorMessage, executedAt: executedAt.toISOString(), + parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined, state: 'failed', taskID, taskSlug, @@ -269,9 +279,17 @@ export const getRunTaskFunction = ( try { const runnerOutput = await runner({ + inlineTask: getRunTaskFunction(state, job, workflowConfig, req, true, updateJob, { + taskID, + taskSlug, + }), input, job: job as unknown as RunningJob, // TODO: Type this better req, + tasks: getRunTaskFunction(state, job, workflowConfig, req, false, updateJob, { + taskID, + taskSlug, + }), }) if (runnerOutput.state === 'failed') { @@ -281,6 +299,7 @@ export const getRunTaskFunction = ( job, maxRetries, output, + parent, req, retriesConfig: finalRetriesConfig, runnerOutput, @@ -303,6 +322,7 @@ export const getRunTaskFunction = ( job, maxRetries, output, + parent, req, retriesConfig: finalRetriesConfig, state, @@ -327,6 +347,7 @@ export const getRunTaskFunction = ( executedAt: executedAt.toISOString(), input, output, + parent: req?.payload?.config?.jobs?.addParentToTaskLog ? parent : undefined, state: 'succeeded', taskID, taskSlug, diff --git a/test/queues/config.ts b/test/queues/config.ts index 1db0c9b813e..988d86ea6b5 100644 --- a/test/queues/config.ts +++ b/test/queues/config.ts @@ -846,6 +846,142 @@ export default buildConfigWithDefaults({ }) }, } as WorkflowConfig<'retriesBackoffTest'>, + { + slug: 'subTask', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + handler: async ({ job, inlineTask }) => { + await inlineTask('create two docs', { + task: async ({ input, inlineTask }) => { + const { newSimple } = await inlineTask('create doc 1', { + task: async ({ req }) => { + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + newSimple, + }, + } + }, + }) + + const { newSimple2 } = await inlineTask('create doc 2', { + task: async ({ req }) => { + const newSimple2 = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + return { + output: { + newSimple2, + }, + } + }, + }) + return { + output: { + simpleID1: newSimple.id, + simpleID2: newSimple2.id, + }, + } + }, + input: { + message: job.input.message, + }, + }) + }, + } as WorkflowConfig<'subTask'>, + { + slug: 'subTaskFails', + inputSchema: [ + { + name: 'message', + type: 'text', + required: true, + }, + ], + retries: 3, + handler: async ({ job, inlineTask }) => { + await inlineTask('create two docs', { + task: async ({ input, inlineTask }) => { + const { newSimple } = await inlineTask('create doc 1 - succeeds', { + task: async ({ req }) => { + const newSimple = await req.payload.create({ + collection: 'simple', + req, + data: { + title: input.message, + }, + }) + + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountTask1Retried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountTask1Retried !== undefined + ? // @ts-expect-error + job.input.amountTask1Retried + 1 + : 0, + }, + }, + id: job.id, + }) + return { + output: { + newSimple, + }, + } + }, + }) + + await inlineTask('create doc 2 - fails', { + task: async ({ req }) => { + await req.payload.update({ + collection: 'payload-jobs', + data: { + input: { + ...job.input, + amountTask2Retried: + // @ts-expect-error amountRetried is new arbitrary data and not in the type + job.input.amountTask2Retried !== undefined + ? // @ts-expect-error + job.input.amountTask2Retried + 1 + : 0, + }, + }, + id: job.id, + }) + throw new Error('Failed on purpose') + }, + }) + return { + output: { + simpleID1: newSimple.id, + }, + } + }, + input: { + message: job.input.message, + }, + }) + }, + } as WorkflowConfig<'subTaskFails'>, ], }, editor: lexicalEditor(), diff --git a/test/queues/int.spec.ts b/test/queues/int.spec.ts index c458fb1e56e..03e4e20ba2d 100644 --- a/test/queues/int.spec.ts +++ b/test/queues/int.spec.ts @@ -1051,4 +1051,80 @@ describe('Queues', () => { expect(allCompletedJobs.totalDocs).toBe(1) expect((allCompletedJobs.docs[0].input as any).message).toBe('from single task 2') }) + + it('can run sub-tasks', async () => { + payload.config.jobs.deleteJobOnComplete = false + const job = await payload.jobs.queue({ + workflow: 'subTask', + input: { + message: 'hello!', + }, + }) + + await payload.jobs.run() + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(2) + expect(allSimples.docs[0].title).toBe('hello!') + expect(allSimples.docs[1].title).toBe('hello!') + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + expect(jobAfterRun.log[0].taskID).toBe('create doc 1') + //expect(jobAfterRun.log[0].parent.taskID).toBe('create two docs') + // jobAfterRun.log[0].parent should not exist + expect(jobAfterRun.log[0].parent).toBeUndefined() + + expect(jobAfterRun.log[1].taskID).toBe('create doc 2') + //expect(jobAfterRun.log[1].parent.taskID).toBe('create two docs') + expect(jobAfterRun.log[1].parent).toBeUndefined() + + expect(jobAfterRun.log[2].taskID).toBe('create two docs') + }) + + it('ensure successful sub-tasks are not retried', async () => { + payload.config.jobs.deleteJobOnComplete = false + + const job = await payload.jobs.queue({ + workflow: 'subTaskFails', + input: { + message: 'hello!', + }, + }) + + let hasJobsRemaining = true + + while (hasJobsRemaining) { + const response = await payload.jobs.run() + + if (response.noJobsRemaining) { + hasJobsRemaining = false + } + } + + const allSimples = await payload.find({ + collection: 'simple', + limit: 100, + }) + + expect(allSimples.totalDocs).toBe(1) + expect(allSimples.docs[0].title).toBe('hello!') + + const jobAfterRun = await payload.findByID({ + collection: 'payload-jobs', + id: job.id, + }) + + // @ts-expect-error + expect(jobAfterRun.input.amountTask2Retried).toBe(3) + // @ts-expect-error + expect(jobAfterRun.input.amountTask1Retried).toBe(0) + }) }) diff --git a/test/queues/payload-types.ts b/test/queues/payload-types.ts index 68d2d7a771f..00fbb61fc5a 100644 --- a/test/queues/payload-types.ts +++ b/test/queues/payload-types.ts @@ -66,6 +66,8 @@ export interface Config { inlineTaskTest: WorkflowInlineTaskTest; externalWorkflow: WorkflowExternalWorkflow; retriesBackoffTest: WorkflowRetriesBackoffTest; + subTask: WorkflowSubTask; + subTaskFails: WorkflowSubTaskFails; }; }; } @@ -221,6 +223,21 @@ export interface PayloadJob { | number | boolean | null; + parent?: { + taskSlug?: + | ( + | 'inline' + | 'UpdatePost' + | 'UpdatePostStep2' + | 'CreateSimple' + | 'CreateSimpleRetriesUndefined' + | 'CreateSimpleRetries0' + | 'CreateSimpleWithDuplicateMessage' + | 'ExternalTask' + ) + | null; + taskID?: string | null; + }; state: 'failed' | 'succeeded'; error?: | { @@ -249,6 +266,8 @@ export interface PayloadJob { | 'inlineTaskTest' | 'externalWorkflow' | 'retriesBackoffTest' + | 'subTask' + | 'subTaskFails' ) | null; taskSlug?: @@ -390,6 +409,12 @@ export interface PayloadJobsSelect { taskID?: T; input?: T; output?: T; + parent?: + | T + | { + taskSlug?: T; + taskID?: T; + }; state?: T; error?: T; id?: T; @@ -641,6 +666,24 @@ export interface WorkflowRetriesBackoffTest { message: string; }; } +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowSubTask". + */ +export interface WorkflowSubTask { + input: { + message: string; + }; +} +/** + * This interface was referenced by `Config`'s JSON-Schema + * via the `definition` "WorkflowSubTaskFails". + */ +export interface WorkflowSubTaskFails { + input: { + message: string; + }; +} /** * This interface was referenced by `Config`'s JSON-Schema * via the `definition` "auth".