diff --git a/packages/backend-tasks/src/tasks/TaskWorker.test.ts b/packages/backend-tasks/src/tasks/TaskWorker.test.ts index 4cdd3e973ee10..f235c58973d9b 100644 --- a/packages/backend-tasks/src/tasks/TaskWorker.test.ts +++ b/packages/backend-tasks/src/tasks/TaskWorker.test.ts @@ -22,7 +22,6 @@ import { migrateBackendTasks } from '../database/migrateBackendTasks'; import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables'; import { TaskWorker } from './TaskWorker'; import { TaskSettingsV2 } from './types'; -import { AbortSignal } from 'node-abort-controller'; describe('TaskWorker', () => { const logger = getVoidLogger(); @@ -125,28 +124,20 @@ describe('TaskWorker', () => { const knex = await databases.init(databaseId); await migrateBackendTasks(knex); - const fn = jest.fn(); - const promise = new Promise(resolve => { - fn.mockImplementation(() => { - if (fn.mock.calls.length === 3) { - resolve(); - } - throw new Error('failed'); - }); - }); + const fn = jest.fn().mockRejectedValue(new Error('failed')); const settings: TaskSettingsV2 = { version: 2, initialDelayDuration: undefined, cadence: '* * * * * *', timeoutAfterDuration: Duration.fromMillis(60000).toISO(), }; - - const worker = new TaskWorker('task1', fn, knex, logger); + const checkFrequency = Duration.fromObject({ milliseconds: 100 }); + const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency); worker.start(settings); - await promise; - expect(fn).toBeCalledTimes(3); - expect(fn).toBeCalledWith(expect.any(AbortSignal)); + await waitForExpect(() => { + expect(fn).toBeCalledTimes(3); + }); }, 60_000, ); diff --git a/packages/backend-tasks/src/tasks/TaskWorker.ts b/packages/backend-tasks/src/tasks/TaskWorker.ts index 84f45a7fe6b46..edec453a69a27 100644 --- a/packages/backend-tasks/src/tasks/TaskWorker.ts +++ b/packages/backend-tasks/src/tasks/TaskWorker.ts @@ -24,7 +24,7 @@ import { TaskFunction, TaskSettingsV2, taskSettingsV2Schema } from './types'; import { delegateAbortController, nowPlus, sleep } from './util'; import { CronTime } from 'cron'; -const WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 }); +const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 }); /** * Performs the actual work of a task. @@ -32,17 +32,13 @@ const WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 }); * @private */ export class TaskWorker { - private readonly taskId: string; - private readonly fn: TaskFunction; - private readonly knex: Knex; - private readonly logger: Logger; - - constructor(taskId: string, fn: TaskFunction, knex: Knex, logger: Logger) { - this.taskId = taskId; - this.fn = fn; - this.knex = knex; - this.logger = logger; - } + constructor( + private readonly taskId: string, + private readonly fn: TaskFunction, + private readonly knex: Knex, + private readonly logger: Logger, + private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY, + ) {} async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) { try { @@ -63,7 +59,7 @@ export class TaskWorker { break; } - await sleep(WORK_CHECK_FREQUENCY, options?.signal); + await sleep(this.workCheckFrequency, options?.signal); } this.logger.info(`Task worker finished: ${this.taskId}`); } catch (e) {