diff --git a/migrations/1725039927416_job-retry-after.ts b/migrations/1725039927416_job-retry-after.ts
new file mode 100644
index 00000000..3adefafd
--- /dev/null
+++ b/migrations/1725039927416_job-retry-after.ts
@@ -0,0 +1,12 @@
+/* eslint-disable @typescript-eslint/naming-convention */
+import { MigrationBuilder, ColumnDefinitions } from 'node-pg-migrate';
+
+export const shorthands: ColumnDefinitions | undefined = undefined;
+
+export function up(pgm: MigrationBuilder): void {
+  pgm.addColumn('jobs', {
+    retry_after: {
+      type: 'timestamptz',
+    },
+  });
+}
diff --git a/src/env.ts b/src/env.ts
index 2dfb4857..decc840d 100644
--- a/src/env.ts
+++ b/src/env.ts
@@ -84,6 +84,8 @@ const schema = Type.Object({
   JOB_QUEUE_SIZE_LIMIT: Type.Number({ default: 200 }),
   /** Maximum time a job will run before marking it as failed. */
   JOB_QUEUE_TIMEOUT_MS: Type.Number({ default: 60_000 }),
+  /** Minimum time to wait before retrying a job after a failed execution. */
+  JOB_QUEUE_RETRY_AFTER_MS: Type.Number({ default: 5_000 }),
   /**
    * The max number of immediate attempts that will be made to retrieve metadata from external URIs
@@ -118,7 +120,7 @@ const schema = Type.Object({
    * next request that is sent to it (seconds). This value will be overridden by the `Retry-After`
    * header returned by the domain, if any.
    */
-  METADATA_RATE_LIMITED_HOST_RETRY_AFTER: Type.Number({ default: 3600 }), // 1 hour
+  METADATA_RATE_LIMITED_HOST_RETRY_AFTER: Type.Number({ default: 60 }), // 1 minute
   /**
    * Maximum number of HTTP redirections to follow when fetching metadata. Defaults to 5.
    */
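Taken together, the two settings above control how long a failed job is parked before it becomes eligible again: JOB_QUEUE_RETRY_AFTER_MS is the floor for ordinary retryable failures; when a 429 response carries a Retry-After header, that wait (in seconds) is used instead, and METADATA_RATE_LIMITED_HOST_RETRY_AFTER is the fallback wait recorded for the rate-limited host itself. A minimal sketch of the delay selection, mirroring the job.ts change further down; the helper name pickRetryDelayMs is ours, not part of the PR:

import { ENV } from './env';

// Sketch only: choose the retry delay (ms) for a job that just failed.
// `retryAfterSeconds` stands for a parsed Retry-After header from a 429 response.
function pickRetryDelayMs(retryAfterSeconds?: number): number {
  let delayMs = ENV.JOB_QUEUE_RETRY_AFTER_MS; // default floor, e.g. 5_000
  if (retryAfterSeconds) {
    // Honor the host's requested wait, converted to milliseconds.
    delayMs = retryAfterSeconds * 1_000;
  }
  return delayMs;
}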
diff --git a/src/pg/pg-store.ts b/src/pg/pg-store.ts
index 3223a2ab..88656dae 100644
--- a/src/pg/pg-store.ts
+++ b/src/pg/pg-store.ts
@@ -207,7 +207,16 @@ export class PgStore extends BasePgStore {
     await this.sql`
       UPDATE jobs
       SET status = ${args.status},
-        invalid_reason = ${args.invalidReason ? args.invalidReason : this.sql`NULL`},
+        invalid_reason = ${
+          args.status === DbJobStatus.invalid && args.invalidReason
+            ? args.invalidReason
+            : this.sql`NULL`
+        },
+        ${
+          args.status !== DbJobStatus.pending
+            ? this.sql`retry_count = 0, retry_after = NULL,`
+            : this.sql``
+        }
         updated_at = NOW()
       WHERE id = ${args.id}
     `;
@@ -216,15 +225,18 @@
   async retryAllFailedJobs(): Promise<void> {
     await this.sql`
       UPDATE jobs
-      SET status = ${DbJobStatus.pending}, retry_count = 0, updated_at = NOW()
+      SET status = ${DbJobStatus.pending}, retry_count = 0, updated_at = NOW(), retry_after = NULL
       WHERE status IN (${DbJobStatus.failed}, ${DbJobStatus.invalid})
     `;
   }
 
-  async increaseJobRetryCount(args: { id: number }): Promise<number> {
+  async increaseJobRetryCount(args: { id: number; retry_after: number }): Promise<number> {
+    const retryAfter = args.retry_after.toString();
     const result = await this.sql<{ retry_count: number }[]>`
       UPDATE jobs
-      SET retry_count = retry_count + 1, updated_at = NOW()
+      SET retry_count = retry_count + 1,
+        updated_at = NOW(),
+        retry_after = NOW() + INTERVAL '${this.sql(retryAfter)} ms'
       WHERE id = ${args.id}
       RETURNING retry_count
     `;
@@ -232,14 +244,14 @@
   }
 
   /**
-   * Retrieves a number of queued jobs so they can be processed immediately.
+   * Retrieves a number of pending jobs so they can be processed immediately.
    * @param limit - number of jobs to retrieve
    * @returns `DbJob[]`
    */
   async getPendingJobBatch(args: { limit: number }): Promise<DbJob[]> {
     return this.sql<DbJob[]>`
       SELECT ${this.sql(JOBS_COLUMNS)}
       FROM jobs
-      WHERE status = 'pending'
+      WHERE status = 'pending' AND (retry_after IS NULL OR retry_after < NOW())
       ORDER BY COALESCE(updated_at, created_at) ASC
       LIMIT ${args.limit}
     `;
diff --git a/src/pg/types.ts b/src/pg/types.ts
index 1853269c..85231cd9 100644
--- a/src/pg/types.ts
+++ b/src/pg/types.ts
@@ -106,6 +106,7 @@ export type DbJob = {
   retry_count: number;
   created_at: string;
   updated_at?: string;
+  retry_after?: string;
 };
 
 export type DbUpdateNotification = {
@@ -285,6 +286,7 @@ export const JOBS_COLUMNS = [
   'retry_count',
   'created_at',
   'updated_at',
+  'retry_after',
 ];
 
 export const METADATA_COLUMNS = [
diff --git a/src/token-processor/queue/job/job.ts b/src/token-processor/queue/job/job.ts
index 258a8ed5..ad272401 100644
--- a/src/token-processor/queue/job/job.ts
+++ b/src/token-processor/queue/job/job.ts
@@ -2,7 +2,7 @@ import { logger, resolveOrTimeout, stopwatch } from '@hirosystems/api-toolkit';
 import { ENV } from '../../../env';
 import { PgStore } from '../../../pg/pg-store';
 import { DbJob, DbJobInvalidReason, DbJobStatus } from '../../../pg/types';
-import { getUserErrorInvalidReason, UserError } from '../../util/errors';
+import { getUserErrorInvalidReason, TooManyRequestsHttpError, UserError } from '../../util/errors';
 import { RetryableJobError } from '../errors';
 import { getJobQueueProcessingMode, JobQueueProcessingMode } from '../helpers';
 
@@ -52,7 +52,16 @@
       }
     } catch (error) {
       if (error instanceof RetryableJobError) {
-        const retries = await this.db.increaseJobRetryCount({ id: this.job.id });
+        let retry_after = ENV.JOB_QUEUE_RETRY_AFTER_MS;
+        // If we got rate limited, save this host so we can skip further calls even from jobs for
+        // other tokens.
+        if (error.cause instanceof TooManyRequestsHttpError) {
+          await this.saveRateLimitedHost(error.cause);
+          if (error.cause.retryAfter) {
+            retry_after = error.cause.retryAfter * 1_000;
+          }
+        }
+        const retries = await this.db.increaseJobRetryCount({ id: this.job.id, retry_after });
         if (
           getJobQueueProcessingMode() === JobQueueProcessingMode.strict ||
           retries <= ENV.JOB_QUEUE_MAX_RETRIES
@@ -95,4 +104,11 @@
       return false;
     }
   }
+
+  private async saveRateLimitedHost(error: TooManyRequestsHttpError) {
+    const hostname = error.url.hostname;
+    const retryAfter = error.retryAfter ?? ENV.METADATA_RATE_LIMITED_HOST_RETRY_AFTER;
+    logger.info(`Job saving rate limited host ${hostname}, retry after ${retryAfter}s`);
+    await this.db.insertRateLimitedHost({ values: { hostname, retry_after: retryAfter } });
+  }
 }
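The three pieces above act as one mechanism: when work() catches a RetryableJobError, increaseJobRetryCount stamps retry_after as NOW() plus the chosen delay, and getPendingJobBatch filters on retry_after IS NULL OR retry_after < NOW(), so the job simply drops out of batches until the window has passed. A rough sketch of the resulting consumer flow; the method names come from the diff, but the loop itself is illustrative only:

import { PgStore } from './pg/pg-store';
import { ProcessTokenJob } from './token-processor/queue/job/process-token-job';

// Sketch only: drain one batch of currently eligible jobs.
async function runBatchOnce(db: PgStore): Promise<void> {
  // Jobs whose retry_after is still in the future never appear here;
  // the WHERE clause in getPendingJobBatch excludes them.
  const batch = await db.getPendingJobBatch({ limit: 10 });
  for (const dbJob of batch) {
    // work() swallows retryable errors: it bumps retry_count and pushes
    // retry_after forward instead of throwing.
    await new ProcessTokenJob({ db, job: dbJob }).work();
  }
}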
diff --git a/src/token-processor/queue/job/process-token-job.ts b/src/token-processor/queue/job/process-token-job.ts
index 7c919d42..a454f177 100644
--- a/src/token-processor/queue/job/process-token-job.ts
+++ b/src/token-processor/queue/job/process-token-job.ts
@@ -54,25 +54,16 @@
       contractPrincipal: contract.principal,
     });
     logger.info(`ProcessTokenJob processing ${this.description()}`);
-    try {
-      switch (token.type) {
-        case DbTokenType.ft:
-          await this.handleFt(client, token, contract);
-          break;
-        case DbTokenType.nft:
-          await this.handleNft(client, token, contract);
-          break;
-        case DbTokenType.sft:
-          await this.handleSft(client, token, contract);
-          break;
-      }
-    } catch (error) {
-      // If we got rate limited, save this host so we can skip further calls even from jobs for
-      // other tokens.
-      if (error instanceof RetryableJobError && error.cause instanceof TooManyRequestsHttpError) {
-        await this.saveRateLimitedHost(error.cause);
-      }
-      throw error;
+    switch (token.type) {
+      case DbTokenType.ft:
+        await this.handleFt(client, token, contract);
+        break;
+      case DbTokenType.nft:
+        await this.handleNft(client, token, contract);
+        break;
+      case DbTokenType.sft:
+        await this.handleSft(client, token, contract);
+        break;
     }
   }
@@ -191,13 +182,6 @@
     await this.db.updateProcessedTokenWithMetadata({ id: token.id, values: tokenValues });
   }
 
-  private async saveRateLimitedHost(error: TooManyRequestsHttpError) {
-    const hostname = error.url.hostname;
-    const retryAfter = error.retryAfter ?? ENV.METADATA_RATE_LIMITED_HOST_RETRY_AFTER;
-    logger.info(`ProcessTokenJob saving rate limited host ${hostname}, retry after ${retryAfter}s`);
-    await this.db.insertRateLimitedHost({ values: { hostname, retry_after: retryAfter } });
-  }
-
   private async getTokenUri(
     client: StacksNodeRpcClient,
     tokenNumber?: bigint
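The net effect of this file is a relocation: the 429 bookkeeping that used to live only in ProcessTokenJob now runs in the Job base class (previous hunk), so every job type records rate-limited hosts the same way. For reference, the error shape that logic depends on is assumed to look roughly like this; a simplified paraphrase inferred from the fields the diff touches, not the actual declaration in util/errors.ts:

// Assumed shape, inferred from the diff's use of error.url and error.retryAfter.
class TooManyRequestsHttpError extends Error {
  constructor(
    public url: URL, // URL that answered with HTTP 429
    public retryAfter?: number // parsed Retry-After header, in seconds
  ) {
    super(`rate limited by ${url.hostname}`);
  }
}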
diff --git a/tests/token-queue/job.test.ts b/tests/token-queue/job.test.ts
index 8d0316a4..72b929d3 100644
--- a/tests/token-queue/job.test.ts
+++ b/tests/token-queue/job.test.ts
@@ -1,4 +1,4 @@
-import { cycleMigrations } from '@hirosystems/api-toolkit';
+import { cycleMigrations, timeout } from '@hirosystems/api-toolkit';
 import { ENV } from '../../src/env';
 import { MIGRATIONS_DIR, PgStore } from '../../src/pg/pg-store';
 import { DbJob, DbSipNumber, DbSmartContractInsert } from '../../src/pg/types';
@@ -64,14 +64,14 @@ describe('Job', () => {
     const job = new TestRetryableJob({ db, job: dbJob });
 
     await expect(job.work()).resolves.not.toThrow();
-    const jobs1 = await db.getPendingJobBatch({ limit: 1 });
-    expect(jobs1[0].retry_count).toBe(1);
-    expect(jobs1[0].status).toBe('pending');
+    const jobs1 = await db.getJob({ id: 1 });
+    expect(jobs1?.retry_count).toBe(1);
+    expect(jobs1?.status).toBe('pending');
 
     await expect(job.work()).resolves.not.toThrow();
-    const jobs2 = await db.getPendingJobBatch({ limit: 1 });
-    expect(jobs2[0].retry_count).toBe(2);
-    expect(jobs2[0].status).toBe('pending');
+    const jobs2 = await db.getJob({ id: 1 });
+    expect(jobs2?.retry_count).toBe(2);
+    expect(jobs2?.status).toBe('pending');
   });
 
   test('user error marks job invalid', async () => {
@@ -98,6 +98,7 @@
   test('strict mode ignores retry_count limit', async () => {
     ENV.JOB_QUEUE_STRICT_MODE = true;
     ENV.JOB_QUEUE_MAX_RETRIES = 0;
+    ENV.JOB_QUEUE_RETRY_AFTER_MS = 0;
     const job = new TestRetryableJob({ db, job: dbJob });
 
     await expect(job.work()).resolves.not.toThrow();
@@ -106,6 +107,19 @@
     const jobs1 = await db.getPendingJobBatch({ limit: 1 });
     expect(jobs1[0].retry_count).toBe(1);
     expect(jobs1[0].status).toBe('pending');
   });
 
+  test('pending job batches consider retry_after', async () => {
+    ENV.JOB_QUEUE_RETRY_AFTER_MS = 200;
+    const job = new TestRetryableJob({ db, job: dbJob });
+
+    await expect(job.work()).resolves.not.toThrow();
+    const jobs1 = await db.getPendingJobBatch({ limit: 1 });
+    expect(jobs1).toHaveLength(0);
+
+    await timeout(300);
+    const jobs2 = await db.getPendingJobBatch({ limit: 1 });
+    expect(jobs2).toHaveLength(1);
+  });
+
   test('db errors are not re-thrown', async () => {
     await db.close();
     const job = new TestDbJob({ db, job: dbJob });
diff --git a/tests/token-queue/process-token-job.test.ts b/tests/token-queue/process-token-job.test.ts
index 5d79c93e..22843e18 100644
--- a/tests/token-queue/process-token-job.test.ts
+++ b/tests/token-queue/process-token-job.test.ts
@@ -851,7 +851,7 @@ describe('ProcessTokenJob', () => {
       })
       .reply(429, { error: 'nope' }, { headers: { 'retry-after': '999' } });
     try {
-      await new ProcessTokenJob({ db, job: tokenJob }).handler();
+      await new ProcessTokenJob({ db, job: tokenJob }).work();
     } catch (error) {
       expect(error).toBeInstanceOf(RetryableJobError);
       const err = error as RetryableJobError;