Skip to content

Commit

Permalink
backend-tasks: refactor TaskWorker retry test
Browse files Browse the repository at this point in the history
Signed-off-by: Patrik Oldsberg <[email protected]>
  • Loading branch information
Rugvip committed Mar 25, 2022
1 parent e2cfd53 commit 447e870
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 28 deletions.
21 changes: 6 additions & 15 deletions packages/backend-tasks/src/tasks/TaskWorker.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import { migrateBackendTasks } from '../database/migrateBackendTasks';
import { DbTasksRow, DB_TASKS_TABLE } from '../database/tables';
import { TaskWorker } from './TaskWorker';
import { TaskSettingsV2 } from './types';
import { AbortSignal } from 'node-abort-controller';

describe('TaskWorker', () => {
const logger = getVoidLogger();
Expand Down Expand Up @@ -125,28 +124,20 @@ describe('TaskWorker', () => {
const knex = await databases.init(databaseId);
await migrateBackendTasks(knex);

const fn = jest.fn();
const promise = new Promise<void>(resolve => {
fn.mockImplementation(() => {
if (fn.mock.calls.length === 3) {
resolve();
}
throw new Error('failed');
});
});
const fn = jest.fn().mockRejectedValue(new Error('failed'));
const settings: TaskSettingsV2 = {
version: 2,
initialDelayDuration: undefined,
cadence: '* * * * * *',
timeoutAfterDuration: Duration.fromMillis(60000).toISO(),
};

const worker = new TaskWorker('task1', fn, knex, logger);
const checkFrequency = Duration.fromObject({ milliseconds: 100 });
const worker = new TaskWorker('task1', fn, knex, logger, checkFrequency);
worker.start(settings);

await promise;
expect(fn).toBeCalledTimes(3);
expect(fn).toBeCalledWith(expect.any(AbortSignal));
await waitForExpect(() => {
expect(fn).toBeCalledTimes(3);
});
},
60_000,
);
Expand Down
22 changes: 9 additions & 13 deletions packages/backend-tasks/src/tasks/TaskWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,25 +24,21 @@ import { TaskFunction, TaskSettingsV2, taskSettingsV2Schema } from './types';
import { delegateAbortController, nowPlus, sleep } from './util';
import { CronTime } from 'cron';

const WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });
const DEFAULT_WORK_CHECK_FREQUENCY = Duration.fromObject({ seconds: 5 });

/**
* Performs the actual work of a task.
*
* @private
*/
export class TaskWorker {
private readonly taskId: string;
private readonly fn: TaskFunction;
private readonly knex: Knex;
private readonly logger: Logger;

constructor(taskId: string, fn: TaskFunction, knex: Knex, logger: Logger) {
this.taskId = taskId;
this.fn = fn;
this.knex = knex;
this.logger = logger;
}
constructor(
private readonly taskId: string,
private readonly fn: TaskFunction,
private readonly knex: Knex,
private readonly logger: Logger,
private readonly workCheckFrequency: Duration = DEFAULT_WORK_CHECK_FREQUENCY,
) {}

async start(settings: TaskSettingsV2, options?: { signal?: AbortSignal }) {
try {
Expand All @@ -63,7 +59,7 @@ export class TaskWorker {
break;
}

await sleep(WORK_CHECK_FREQUENCY, options?.signal);
await sleep(this.workCheckFrequency, options?.signal);
}
this.logger.info(`Task worker finished: ${this.taskId}`);
} catch (e) {
Expand Down

0 comments on commit 447e870

Please sign in to comment.