From c317ffb1d8b431daf71c0dc5a5f4acde3895a49a Mon Sep 17 00:00:00 2001 From: Isla Koenigsknecht Date: Mon, 13 May 2024 18:22:07 -0400 Subject: [PATCH] Fix queue processing (#2525) * Process queue fixes and updates * Remove unnecessary log * Log when we push to queue --- .../connections-manager.service.ts | 28 ++-- .../backend/src/nest/libp2p/libp2p.service.ts | 23 +++- .../nest/libp2p/process-in-chunks.service.ts | 124 ++++++++++++------ .../src/nest/libp2p/process-in-chunks.spec.ts | 30 +++-- 4 files changed, 138 insertions(+), 67 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index a030206be7..20ebdc3c21 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -263,20 +263,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI public async resume() { this.logger('Resuming!') await this.openSocket() - this.logger('Attempting to redial peers!') + const peersToDial = await this.getPeersOnResume() + this.libp2pService?.resume(peersToDial) + } + + public async getPeersOnResume(): Promise { + this.logger('Getting peers to redial') if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) { - this.logger('Dialing peers with info from pause: ', this.peerInfo) - await this.libp2pService?.redialPeers([...this.peerInfo.connected, ...this.peerInfo.dialed]) - } else { - this.logger('Dialing peers from stored community (if exists)') - const community = await this.localDbService.getCurrentCommunity() - if (!community) { - this.logger(`No community launched, can't redial`) - return - } - const sortedPeers = await this.localDbService.getSortedPeers(community.peerList ?? []) - await this.libp2pService?.redialPeers(sortedPeers) + this.logger('Found peer info from pause: ', this.peerInfo) + return [...this.peerInfo.connected, ...this.peerInfo.dialed] + } + + this.logger('Getting peers from stored community (if exists)') + const community = await this.localDbService.getCurrentCommunity() + if (!community) { + this.logger(`No community launched, no peers found`) + return [] } + return await this.localDbService.getSortedPeers(community.peerList ?? []) } // This method is only used on iOS through rn-bridge for reacting on lifecycle changes diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index dcec245cd4..47a90e1487 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -62,9 +62,21 @@ export class Libp2pService extends EventEmitter { await this.hangUpPeers(peerInfo.dialed) this.dialedPeers.clear() this.connectedPeers.clear() + this.processInChunksService.pause() return peerInfo } + public resume = async (peersToDial: string[]): Promise => { + this.processInChunksService.resume() + if (peersToDial.length === 0) { + this.logger('No peers to redial!') + return + } + + this.logger(`Redialing ${peersToDial.length} peers`) + await this.redialPeers(peersToDial) + } + public readonly createLibp2pAddress = (address: string, peerId: string): string => { return createLibp2pAddress(address, peerId) } @@ -138,8 +150,7 @@ export class Libp2pService extends EventEmitter { // TODO: Sort peers await this.hangUpPeers(dialed) - this.processInChunksService.updateData(toDial) - await this.processInChunksService.process() + this.processInChunksService.updateQueue(toDial) } public async createInstance(params: Libp2pNodeParams): Promise { @@ -208,13 +219,12 @@ export class Libp2pService extends EventEmitter { this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => { const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress)) this.logger('Dialing', nonDialedAddresses.length, 'addresses') - this.processInChunksService.updateData(nonDialedAddresses) - await this.processInChunksService.process() + this.processInChunksService.updateQueue(nonDialedAddresses) }) this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) - this.processInChunksService.init(peers, this.dialPeer) + this.processInChunksService.init([], this.dialPeer) this.libp2pInstance.addEventListener('peer:discovery', peer => { this.logger(`${peerId.toString()} discovered ${peer.detail.id}`) @@ -268,7 +278,7 @@ export class Libp2pService extends EventEmitter { this.emit(Libp2pEvents.PEER_DISCONNECTED, peerStat) }) - await this.processInChunksService.process() + this.processInChunksService.updateQueue(peers) this.logger(`Initialized libp2p for peer ${peerId.toString()}`) } @@ -276,6 +286,7 @@ export class Libp2pService extends EventEmitter { public async close(): Promise { this.logger('Closing libp2p service') await this.libp2pInstance?.stop() + this.processInChunksService.pause() this.libp2pInstance = null this.connectedPeers = new Map() this.dialedPeers = new Set() diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index bad92b873d..f83fd8d218 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -1,8 +1,8 @@ import { EventEmitter } from 'events' -import fastq from 'fastq' -import type { queue, done } from 'fastq' +import fastq, { queueAsPromised } from 'fastq' import Logger from '../common/logger' +import { randomUUID } from 'crypto' const DEFAULT_CHUNK_SIZE = 10 export const DEFAULT_NUM_TRIES = 2 @@ -10,13 +10,14 @@ export const DEFAULT_NUM_TRIES = 2 type ProcessTask = { data: T tries: number + taskId: string } export class ProcessInChunksService extends EventEmitter { private isActive: boolean - private data: Set = new Set() private chunkSize: number - private taskQueue: queue> + private taskQueue: queueAsPromised> + private deadLetterQueue: ProcessTask[] = [] private processItem: (arg: T) => Promise private readonly logger = Logger(ProcessInChunksService.name) constructor() { @@ -27,59 +28,106 @@ export class ProcessInChunksService extends EventEmitter { this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`) this.processItem = processItem this.chunkSize = chunkSize - this.taskQueue = fastq(this, this.processOneItem, this.chunkSize) - this.updateData(data) - this.addToTaskQueue() + this.taskQueue = fastq.promise(this, this.processOneItem, this.chunkSize) + this.isActive = true + this.updateQueue(data) } - public updateData(items: T[]) { - this.logger(`Updating data with ${items.length} items`) - this.taskQueue.pause() - items.forEach(item => this.data.add(item)) - this.addToTaskQueue() + public updateQueue(items: T[]) { + this.logger(`Adding ${items.length} items to the task queue`) + items.forEach(item => this.addToTaskQueue(item)) } - private addToTaskQueue() { - this.logger(`Adding ${this.data.size} items to the task queue`) - for (const item of this.data) { - if (item) { - this.logger(`Adding data ${item} to the task queue`) - this.data.delete(item) - try { - this.taskQueue.push({ data: item, tries: 0 } as ProcessTask) - } catch (e) { - this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e) - this.data.add(item) - } + private async addToTaskQueue(task: ProcessTask): Promise + private async addToTaskQueue(item: T): Promise + private async addToTaskQueue(itemOrTask: T | ProcessTask): Promise { + if (!itemOrTask) { + this.logger.error('Item/task is null or undefined, skipping!') + return + } + + let task: ProcessTask + if ((itemOrTask as ProcessTask).taskId != null) { + task = itemOrTask as ProcessTask + } else { + this.logger(`Creating new task for ${itemOrTask}`) + task = { data: itemOrTask as T, tries: 0, taskId: randomUUID() } + } + + if (!this.isActive) { + this.logger( + 'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!' + ) + this.deadLetterQueue.push(task) + this.logger(`There are now ${this.deadLetterQueue.length} items in the dead letter queue`) + return + } + + this.logger(`Adding task ${task.taskId} with data ${task.data} to the task queue`) + try { + const success = await this.pushToQueueAndRun(task) + if (!success) { + this.logger(`Will try to re-attempt task ${task.taskId} with data ${task.data}`) + await this.pushToQueueAndRun({ ...task, tries: task.tries + 1 }) } + } catch (e) { + this.logger.error(`Error occurred while adding new task ${task.taskId} with data ${task.data} to the queue`, e) } } - public async processOneItem(task: ProcessTask) { + public async processOneItem(task: ProcessTask): Promise { + let success: boolean = false try { - this.logger(`Processing task with data ${task.data}`) + this.logger(`Processing task ${task.taskId} with data ${task.data}`) await this.processItem(task.data) + success = true } catch (e) { - this.logger.error(`Processing task with data ${task.data} failed`, e) - if (task.tries + 1 < DEFAULT_NUM_TRIES) { - this.logger(`Will try to re-attempt task with data ${task.data}`) - this.taskQueue.push({ ...task, tries: task.tries + 1 }) - } + this.logger.error(`Processing task ${task.taskId} with data ${task.data} failed`, e) } finally { this.logger(`Done attempting to process task with data ${task.data}`) } + return success } - public async process() { - this.logger(`Processing ${this.taskQueue.length()} items`) - this.taskQueue.resume() + private async pushToQueueAndRun(task: ProcessTask): Promise { + this.logger( + `Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue` + ) + const success = await this.taskQueue.push(task) + if (success) { + this.logger(`Task ${task.taskId} completed successfully`) + } else { + this.logger(`Task ${task.taskId} failed`) + } + return success } - public stop() { + public resume() { if (this.isActive) { - this.logger('Stopping initial dial') - this.isActive = false - this.taskQueue.pause() + this.logger('ProcessInChunksService is already active') + return + } + + this.logger('Resuming ProcessInChunksService') + this.isActive = true + this.taskQueue.resume() + if (this.deadLetterQueue) { + this.logger(`Adding ${this.deadLetterQueue.length} tasks from the dead letter queue to the task queue`) + this.deadLetterQueue.forEach(task => this.addToTaskQueue(task)) + this.deadLetterQueue = [] } } + + public pause() { + if (!this.isActive) { + this.logger('ProcessInChunksService is already paused') + return + } + + this.logger('Pausing ProcessInChunksService') + this.isActive = false + this.deadLetterQueue = this.taskQueue.getQueue() + this.taskQueue.kill() + this.taskQueue.pause() + } } diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index ce751afdf3..24d05dd42f 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -3,6 +3,7 @@ import { ProcessInChunksService } from './process-in-chunks.service' import waitForExpect from 'wait-for-expect' import { TestModule } from '../common/test.module' import { Test, TestingModule } from '@nestjs/testing' +import { sleep } from '../common/sleep' describe('ProcessInChunks', () => { let module: TestingModule let processInChunks: ProcessInChunksService @@ -25,7 +26,6 @@ describe('ProcessInChunks', () => { .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 2')) processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) - await processInChunks.process() await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(6) }) @@ -39,9 +39,7 @@ describe('ProcessInChunks', () => { .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 1')) processInChunks.init(['a', 'b'], mockProcessItem) - await processInChunks.process() - processInChunks.updateData(['e', 'f']) - await processInChunks.process() + processInChunks.updateQueue(['e', 'f']) await waitForExpect(() => { expect(mockProcessItem).toBeCalledTimes(5) }) @@ -58,18 +56,28 @@ describe('ProcessInChunks', () => { .mockRejectedValueOnce(new Error('Rejected 2')) const chunkSize = 2 processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize) - await processInChunks.process() + await sleep(10000) await waitForExpect(() => { - expect(mockProcessItem).toBeCalledTimes(2) + expect(mockProcessItem).toBeCalledTimes(6) }) }) - it.skip('does not process more data if stopped', async () => { + it('does not process more data if stopped', async () => { const mockProcessItem = jest.fn(async () => {}) - const processInChunks = new ProcessInChunksService() - processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) - processInChunks.stop() - await processInChunks.process() + processInChunks.init([], mockProcessItem) + processInChunks.pause() + processInChunks.updateQueue(['a', 'b', 'c', 'd']) expect(mockProcessItem).not.toBeCalled() }) + + it('processes tasks after resuming from pause', async () => { + const mockProcessItem = jest.fn(async () => {}) + processInChunks.init([], mockProcessItem) + processInChunks.pause() + processInChunks.updateQueue(['a', 'b', 'c', 'd']) + processInChunks.resume() + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) + }) })