Skip to content

Commit

Permalink
fix: retry after rate limit
Browse files Browse the repository at this point in the history
  • Loading branch information
rafaelcr committed Aug 30, 2024
1 parent a8fb329 commit 90b78af
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 33 deletions.
2 changes: 1 addition & 1 deletion src/env.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ const schema = Type.Object({
* next request that is sent to it (seconds). This value will be overridden by the `Retry-After`
* header returned by the domain, if any.
*/
METADATA_RATE_LIMITED_HOST_RETRY_AFTER: Type.Number({ default: 3600 }), // 1 hour
METADATA_RATE_LIMITED_HOST_RETRY_AFTER: Type.Number({ default: 60 }), // 1 minute
/**
* Maximum number of HTTP redirections to follow when fetching metadata. Defaults to 5.
*/
Expand Down
23 changes: 18 additions & 5 deletions src/token-processor/queue/job/job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ import { logger, resolveOrTimeout, stopwatch } from '@hirosystems/api-toolkit';
import { ENV } from '../../../env';
import { PgStore } from '../../../pg/pg-store';
import { DbJob, DbJobInvalidReason, DbJobStatus } from '../../../pg/types';
import { getUserErrorInvalidReason, UserError } from '../../util/errors';
import { getUserErrorInvalidReason, TooManyRequestsHttpError, UserError } from '../../util/errors';
import { RetryableJobError } from '../errors';
import { getJobQueueProcessingMode, JobQueueProcessingMode } from '../helpers';

Expand Down Expand Up @@ -52,10 +52,16 @@ export abstract class Job {
}
} catch (error) {
if (error instanceof RetryableJobError) {
const retries = await this.db.increaseJobRetryCount({
id: this.job.id,
retry_after: ENV.JOB_QUEUE_RETRY_AFTER_MS,
});
let retry_after = ENV.JOB_QUEUE_RETRY_AFTER_MS;
// If we got rate limited, save this host so we can skip further calls even from jobs for
// other tokens.
if (error.cause instanceof TooManyRequestsHttpError) {
await this.saveRateLimitedHost(error.cause);
if (error.cause.retryAfter) {
retry_after = error.cause.retryAfter * 1_000;
}
}
const retries = await this.db.increaseJobRetryCount({ id: this.job.id, retry_after });
if (
getJobQueueProcessingMode() === JobQueueProcessingMode.strict ||
retries <= ENV.JOB_QUEUE_MAX_RETRIES
Expand Down Expand Up @@ -98,4 +104,11 @@ export abstract class Job {
return false;
}
}

private async saveRateLimitedHost(error: TooManyRequestsHttpError) {
const hostname = error.url.hostname;
const retryAfter = error.retryAfter ?? ENV.METADATA_RATE_LIMITED_HOST_RETRY_AFTER;
logger.info(`Job saving rate limited host ${hostname}, retry after ${retryAfter}s`);
await this.db.insertRateLimitedHost({ values: { hostname, retry_after: retryAfter } });
}
}
36 changes: 10 additions & 26 deletions src/token-processor/queue/job/process-token-job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,25 +54,16 @@ export class ProcessTokenJob extends Job {
contractPrincipal: contract.principal,
});
logger.info(`ProcessTokenJob processing ${this.description()}`);
try {
switch (token.type) {
case DbTokenType.ft:
await this.handleFt(client, token, contract);
break;
case DbTokenType.nft:
await this.handleNft(client, token, contract);
break;
case DbTokenType.sft:
await this.handleSft(client, token, contract);
break;
}
} catch (error) {
// If we got rate limited, save this host so we can skip further calls even from jobs for
// other tokens.
if (error instanceof RetryableJobError && error.cause instanceof TooManyRequestsHttpError) {
await this.saveRateLimitedHost(error.cause);
}
throw error;
switch (token.type) {
case DbTokenType.ft:
await this.handleFt(client, token, contract);
break;
case DbTokenType.nft:
await this.handleNft(client, token, contract);
break;
case DbTokenType.sft:
await this.handleSft(client, token, contract);
break;
}
}

Expand Down Expand Up @@ -191,13 +182,6 @@ export class ProcessTokenJob extends Job {
await this.db.updateProcessedTokenWithMetadata({ id: token.id, values: tokenValues });
}

private async saveRateLimitedHost(error: TooManyRequestsHttpError) {
const hostname = error.url.hostname;
const retryAfter = error.retryAfter ?? ENV.METADATA_RATE_LIMITED_HOST_RETRY_AFTER;
logger.info(`ProcessTokenJob saving rate limited host ${hostname}, retry after ${retryAfter}s`);
await this.db.insertRateLimitedHost({ values: { hostname, retry_after: retryAfter } });
}

private async getTokenUri(
client: StacksNodeRpcClient,
tokenNumber?: bigint
Expand Down
2 changes: 1 addition & 1 deletion tests/token-queue/process-token-job.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -851,7 +851,7 @@ describe('ProcessTokenJob', () => {
})
.reply(429, { error: 'nope' }, { headers: { 'retry-after': '999' } });
try {
await new ProcessTokenJob({ db, job: tokenJob }).handler();
await new ProcessTokenJob({ db, job: tokenJob }).work();
} catch (error) {
expect(error).toBeInstanceOf(RetryableJobError);
const err = error as RetryableJobError;
Expand Down

0 comments on commit 90b78af

Please sign in to comment.