diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index 6be1859259..f83fd8d218 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -59,15 +59,16 @@ export class ProcessInChunksService extends EventEmitter { 'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!' ) this.deadLetterQueue.push(task) + this.logger(`There are now ${this.deadLetterQueue.length} items in the dead letter queue`) return } this.logger(`Adding task ${task.taskId} with data ${task.data} to the task queue`) try { - const success = await this.taskQueue.push(task) + const success = await this.pushToQueueAndRun(task) if (!success) { this.logger(`Will try to re-attempt task ${task.taskId} with data ${task.data}`) - await this.taskQueue.push({ ...task, tries: task.tries + 1 }) + await this.pushToQueueAndRun({ ...task, tries: task.tries + 1 }) } } catch (e) { this.logger.error(`Error occurred while adding new task ${task.taskId} with data ${task.data} to the queue`, e) @@ -88,6 +89,19 @@ export class ProcessInChunksService extends EventEmitter { return success } + private async pushToQueueAndRun(task: ProcessTask): Promise { + this.logger( + `Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue` + ) + const success = await this.taskQueue.push(task) + if (success) { + this.logger(`Task ${task.taskId} completed successfully`) + } else { + this.logger(`Task ${task.taskId} failed`) + } + return success + } + public resume() { if (this.isActive) { this.logger('ProcessInChunksService is already active')