Skip to content

Commit

Permalink
chore: add shutdown config for the token metadata processor
Browse files Browse the repository at this point in the history
  • Loading branch information
zone117x committed Aug 20, 2021
1 parent 8bf8496 commit 0f0b046
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 8 deletions.
23 changes: 16 additions & 7 deletions src/event-stream/tokens-contract-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -707,22 +707,28 @@ export class TokensProcessorQueue {
/** The entries currently queued for processing in memory, keyed by the queue entry db id. */
readonly queuedEntries: Map<number, DbTokenMetadataQueueEntry> = new Map();

readonly onTokenMetadataUpdateQueued: (entry: DbTokenMetadataQueueEntry) => void;

constructor(db: DataStore, chainId: ChainID) {
this.db = db;
this.chainId = chainId;
this.queue = new PQueue({ concurrency: TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT });
this.db.on('tokenMetadataUpdateQueued', entry => {
// Only add to queue if it's not already backed up.
// if (this.queue.size < this.queue.concurrency && this.queue.pending < this.queue.concurrency) {
if (this.queuedEntries.size < this.queue.concurrency) {
this.queueHandler(entry);
}
});
this.onTokenMetadataUpdateQueued = entry => this.queueHandler(entry);
this.db.on('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
}

close() {
this.db.off('tokenMetadataUpdateQueued', this.onTokenMetadataUpdateQueued);
this.queue.pause();
this.queue.clear();
}

async drainDbQueue(): Promise<void> {
let entries: DbTokenMetadataQueueEntry[] = [];
do {
if (this.queue.isPaused) {
return;
}
const queuedEntries = [...this.queuedEntries.keys()];
entries = await this.db.getTokenMetadataQueue(
TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT,
Expand All @@ -737,6 +743,9 @@ export class TokensProcessorQueue {
}

async checkDbQueue(): Promise<void> {
if (this.queue.isPaused) {
return;
}
const queuedEntries = [...this.queuedEntries.keys()];
const limit = TOKEN_METADATA_PARSING_CONCURRENCY_LIMIT - this.queuedEntries.size;
if (limit > 0) {
Expand Down
6 changes: 5 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,8 +138,12 @@ async function init(): Promise<void> {
logger.error(`Error monitoring RPC connection: ${error}`, error);
});

// TODO: register an exit-handler for the tokenProcessorQueue instance
const tokenMetadataProcessor = new TokensProcessorQueue(db, configuredChainID);
registerShutdownConfig({
name: 'Token Metadata Processor',
handler: () => tokenMetadataProcessor.close(),
forceKillable: true,
});
// check if db has any non-processed token queues and await them all here
await tokenMetadataProcessor.drainDbQueue();
}
Expand Down

0 comments on commit 0f0b046

Please sign in to comment.