From 566a65d65e2f1f989e2d54fd299e77f8d60adc81 Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Fri, 1 Mar 2024 11:40:50 -0600 Subject: [PATCH 1/2] feat: improve pg write queue Signed-off-by: bestmike007 --- src/datastore/helpers.ts | 18 +++++++++++++++--- 1 file changed, 15 insertions(+), 3 deletions(-) diff --git a/src/datastore/helpers.ts b/src/datastore/helpers.ts index fba14fdaa8..734228af30 100644 --- a/src/datastore/helpers.ts +++ b/src/datastore/helpers.ts @@ -1343,14 +1343,26 @@ export function newReOrgUpdatedEntities(): ReOrgUpdatedEntities { */ export class PgWriteQueue { readonly queue: PQueue; + private tasks: Promise[]; constructor() { const concurrency = Math.max(1, getUintEnvOrDefault('STACKS_BLOCK_DATA_INSERT_CONCURRENCY', 4)); this.queue = new PQueue({ concurrency, autoStart: true }); + this.tasks = []; } enqueue(task: Parameters[0]): void { - void this.queue.add(task); + const p = this.queue.add(task); + p.catch(void 0); + this.tasks.push(p); } - done(): Promise { - return this.queue.onIdle(); + async done(): Promise { + // https://medium.com/@alkor_shikyaro/transactions-and-promises-in-node-js-ca5a3aeb6b74 + const results = await Promise.allSettled(this.tasks); + this.tasks = []; + const firstRejected = results.find(v => v.status === 'rejected') as + | PromiseRejectedResult + | undefined; + if (firstRejected != null) { + throw firstRejected.reason; + } } } From fdb956df3b5e4575383a915fa7f673c1d6925f0e Mon Sep 17 00:00:00 2001 From: bestmike007 Date: Tue, 5 Mar 2024 21:46:13 -0600 Subject: [PATCH 2/2] chore: log PgWriteQueue error Signed-off-by: bestmike007 --- src/datastore/helpers.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/datastore/helpers.ts b/src/datastore/helpers.ts index 734228af30..6825546415 100644 --- a/src/datastore/helpers.ts +++ b/src/datastore/helpers.ts @@ -1351,7 +1351,7 @@ export class PgWriteQueue { } enqueue(task: Parameters[0]): void { const p = this.queue.add(task); - p.catch(void 0); + p.catch(e => logger.error(e, 'PgWriteQueue task failed')); this.tasks.push(p); } async done(): Promise {