Skip to content

Commit

Permalink
Log when we push to queue
Browse files Browse the repository at this point in the history
  • Loading branch information
islathehut committed May 13, 2024
1 parent 50f3344 commit aaa2589
Showing 1 changed file with 16 additions and 2 deletions.
18 changes: 16 additions & 2 deletions packages/backend/src/nest/libp2p/process-in-chunks.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,16 @@ export class ProcessInChunksService<T> extends EventEmitter {
'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!'
)
this.deadLetterQueue.push(task)
this.logger(`There are now ${this.deadLetterQueue.length} items in the dead letter queue`)
return
}

this.logger(`Adding task ${task.taskId} with data ${task.data} to the task queue`)
try {
const success = await this.taskQueue.push(task)
const success = await this.pushToQueueAndRun(task)
if (!success) {
this.logger(`Will try to re-attempt task ${task.taskId} with data ${task.data}`)
await this.taskQueue.push({ ...task, tries: task.tries + 1 })
await this.pushToQueueAndRun({ ...task, tries: task.tries + 1 })
}
} catch (e) {
this.logger.error(`Error occurred while adding new task ${task.taskId} with data ${task.data} to the queue`, e)
Expand All @@ -88,6 +89,19 @@ export class ProcessInChunksService<T> extends EventEmitter {
return success
}

private async pushToQueueAndRun(task: ProcessTask<T>): Promise<boolean> {
this.logger(
`Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue`
)
const success = await this.taskQueue.push(task)
if (success) {
this.logger(`Task ${task.taskId} completed successfully`)
} else {
this.logger(`Task ${task.taskId} failed`)
}
return success
}

public resume() {
if (this.isActive) {
this.logger('ProcessInChunksService is already active')
Expand Down

0 comments on commit aaa2589

Please sign in to comment.