Skip to content

Commit

Permalink
Fix queue processing (#2525)
Browse files Browse the repository at this point in the history
* Process queue fixes and updates

* Remove unnecessary log

* Log when we push to queue
  • Loading branch information
Isla Koenigsknecht authored May 13, 2024
1 parent 134fbcb commit c317ffb
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async resume() {
this.logger('Resuming!')
await this.openSocket()
this.logger('Attempting to redial peers!')
const peersToDial = await this.getPeersOnResume()
this.libp2pService?.resume(peersToDial)
}

public async getPeersOnResume(): Promise<string[]> {
this.logger('Getting peers to redial')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
this.logger('Dialing peers with info from pause: ', this.peerInfo)
await this.libp2pService?.redialPeers([...this.peerInfo.connected, ...this.peerInfo.dialed])
} else {
this.logger('Dialing peers from stored community (if exists)')
const community = await this.localDbService.getCurrentCommunity()
if (!community) {
this.logger(`No community launched, can't redial`)
return
}
const sortedPeers = await this.localDbService.getSortedPeers(community.peerList ?? [])
await this.libp2pService?.redialPeers(sortedPeers)
this.logger('Found peer info from pause: ', this.peerInfo)
return [...this.peerInfo.connected, ...this.peerInfo.dialed]
}

this.logger('Getting peers from stored community (if exists)')
const community = await this.localDbService.getCurrentCommunity()
if (!community) {
this.logger(`No community launched, no peers found`)
return []
}
return await this.localDbService.getSortedPeers(community.peerList ?? [])
}

// This method is only used on iOS through rn-bridge for reacting on lifecycle changes
Expand Down
23 changes: 17 additions & 6 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,21 @@ export class Libp2pService extends EventEmitter {
await this.hangUpPeers(peerInfo.dialed)
this.dialedPeers.clear()
this.connectedPeers.clear()
this.processInChunksService.pause()
return peerInfo
}

public resume = async (peersToDial: string[]): Promise<void> => {
this.processInChunksService.resume()
if (peersToDial.length === 0) {
this.logger('No peers to redial!')
return
}

this.logger(`Redialing ${peersToDial.length} peers`)
await this.redialPeers(peersToDial)
}

public readonly createLibp2pAddress = (address: string, peerId: string): string => {
return createLibp2pAddress(address, peerId)
}
Expand Down Expand Up @@ -138,8 +150,7 @@ export class Libp2pService extends EventEmitter {
// TODO: Sort peers
await this.hangUpPeers(dialed)

this.processInChunksService.updateData(toDial)
await this.processInChunksService.process()
this.processInChunksService.updateQueue(toDial)
}

public async createInstance(params: Libp2pNodeParams): Promise<Libp2p> {
Expand Down Expand Up @@ -208,13 +219,12 @@ export class Libp2pService extends EventEmitter {
this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => {
const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress))
this.logger('Dialing', nonDialedAddresses.length, 'addresses')
this.processInChunksService.updateData(nonDialedAddresses)
await this.processInChunksService.process()
this.processInChunksService.updateQueue(nonDialedAddresses)
})

this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`)
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P)
this.processInChunksService.init(peers, this.dialPeer)
this.processInChunksService.init([], this.dialPeer)

this.libp2pInstance.addEventListener('peer:discovery', peer => {
this.logger(`${peerId.toString()} discovered ${peer.detail.id}`)
Expand Down Expand Up @@ -268,14 +278,15 @@ export class Libp2pService extends EventEmitter {
this.emit(Libp2pEvents.PEER_DISCONNECTED, peerStat)
})

await this.processInChunksService.process()
this.processInChunksService.updateQueue(peers)

this.logger(`Initialized libp2p for peer ${peerId.toString()}`)
}

public async close(): Promise<void> {
this.logger('Closing libp2p service')
await this.libp2pInstance?.stop()
this.processInChunksService.pause()
this.libp2pInstance = null
this.connectedPeers = new Map()
this.dialedPeers = new Set()
Expand Down
124 changes: 86 additions & 38 deletions packages/backend/src/nest/libp2p/process-in-chunks.service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { EventEmitter } from 'events'
import fastq from 'fastq'
import type { queue, done } from 'fastq'
import fastq, { queueAsPromised } from 'fastq'

import Logger from '../common/logger'
import { randomUUID } from 'crypto'

const DEFAULT_CHUNK_SIZE = 10
export const DEFAULT_NUM_TRIES = 2

type ProcessTask<T> = {
data: T
tries: number
taskId: string
}

export class ProcessInChunksService<T> extends EventEmitter {
private isActive: boolean
private data: Set<T> = new Set()
private chunkSize: number
private taskQueue: queue<ProcessTask<T>>
private taskQueue: queueAsPromised<ProcessTask<T>>
private deadLetterQueue: ProcessTask<T>[] = []
private processItem: (arg: T) => Promise<any>
private readonly logger = Logger(ProcessInChunksService.name)
constructor() {
Expand All @@ -27,59 +28,106 @@ export class ProcessInChunksService<T> extends EventEmitter {
this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`)
this.processItem = processItem
this.chunkSize = chunkSize
this.taskQueue = fastq(this, this.processOneItem, this.chunkSize)
this.updateData(data)
this.addToTaskQueue()
this.taskQueue = fastq.promise(this, this.processOneItem, this.chunkSize)
this.isActive = true
this.updateQueue(data)
}

public updateData(items: T[]) {
this.logger(`Updating data with ${items.length} items`)
this.taskQueue.pause()
items.forEach(item => this.data.add(item))
this.addToTaskQueue()
public updateQueue(items: T[]) {
this.logger(`Adding ${items.length} items to the task queue`)
items.forEach(item => this.addToTaskQueue(item))
}

private addToTaskQueue() {
this.logger(`Adding ${this.data.size} items to the task queue`)
for (const item of this.data) {
if (item) {
this.logger(`Adding data ${item} to the task queue`)
this.data.delete(item)
try {
this.taskQueue.push({ data: item, tries: 0 } as ProcessTask<T>)
} catch (e) {
this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e)
this.data.add(item)
}
private async addToTaskQueue(task: ProcessTask<T>): Promise<void>
private async addToTaskQueue(item: T): Promise<void>
private async addToTaskQueue(itemOrTask: T | ProcessTask<T>): Promise<void> {
if (!itemOrTask) {
this.logger.error('Item/task is null or undefined, skipping!')
return
}

let task: ProcessTask<T>
if ((itemOrTask as ProcessTask<T>).taskId != null) {
task = itemOrTask as ProcessTask<T>
} else {
this.logger(`Creating new task for ${itemOrTask}`)
task = { data: itemOrTask as T, tries: 0, taskId: randomUUID() }
}

if (!this.isActive) {
this.logger(
'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!'
)
this.deadLetterQueue.push(task)
this.logger(`There are now ${this.deadLetterQueue.length} items in the dead letter queue`)
return
}

this.logger(`Adding task ${task.taskId} with data ${task.data} to the task queue`)
try {
const success = await this.pushToQueueAndRun(task)
if (!success) {
this.logger(`Will try to re-attempt task ${task.taskId} with data ${task.data}`)
await this.pushToQueueAndRun({ ...task, tries: task.tries + 1 })
}
} catch (e) {
this.logger.error(`Error occurred while adding new task ${task.taskId} with data ${task.data} to the queue`, e)
}
}

public async processOneItem(task: ProcessTask<T>) {
public async processOneItem(task: ProcessTask<T>): Promise<boolean> {
let success: boolean = false
try {
this.logger(`Processing task with data ${task.data}`)
this.logger(`Processing task ${task.taskId} with data ${task.data}`)
await this.processItem(task.data)
success = true
} catch (e) {
this.logger.error(`Processing task with data ${task.data} failed`, e)
if (task.tries + 1 < DEFAULT_NUM_TRIES) {
this.logger(`Will try to re-attempt task with data ${task.data}`)
this.taskQueue.push({ ...task, tries: task.tries + 1 })
}
this.logger.error(`Processing task ${task.taskId} with data ${task.data} failed`, e)
} finally {
this.logger(`Done attempting to process task with data ${task.data}`)
}
return success
}

public async process() {
this.logger(`Processing ${this.taskQueue.length()} items`)
this.taskQueue.resume()
private async pushToQueueAndRun(task: ProcessTask<T>): Promise<boolean> {
this.logger(
`Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue`
)
const success = await this.taskQueue.push(task)
if (success) {
this.logger(`Task ${task.taskId} completed successfully`)
} else {
this.logger(`Task ${task.taskId} failed`)
}
return success
}

public stop() {
public resume() {
if (this.isActive) {
this.logger('Stopping initial dial')
this.isActive = false
this.taskQueue.pause()
this.logger('ProcessInChunksService is already active')
return
}

this.logger('Resuming ProcessInChunksService')
this.isActive = true
this.taskQueue.resume()
if (this.deadLetterQueue) {
this.logger(`Adding ${this.deadLetterQueue.length} tasks from the dead letter queue to the task queue`)
this.deadLetterQueue.forEach(task => this.addToTaskQueue(task))
this.deadLetterQueue = []
}
}

public pause() {
if (!this.isActive) {
this.logger('ProcessInChunksService is already paused')
return
}

this.logger('Pausing ProcessInChunksService')
this.isActive = false
this.deadLetterQueue = this.taskQueue.getQueue()
this.taskQueue.kill()
this.taskQueue.pause()
}
}
30 changes: 19 additions & 11 deletions packages/backend/src/nest/libp2p/process-in-chunks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { ProcessInChunksService } from './process-in-chunks.service'
import waitForExpect from 'wait-for-expect'
import { TestModule } from '../common/test.module'
import { Test, TestingModule } from '@nestjs/testing'
import { sleep } from '../common/sleep'
describe('ProcessInChunks', () => {
let module: TestingModule
let processInChunks: ProcessInChunksService<string>
Expand All @@ -25,7 +26,6 @@ describe('ProcessInChunks', () => {
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 2'))
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
await processInChunks.process()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(6)
})
Expand All @@ -39,9 +39,7 @@ describe('ProcessInChunks', () => {
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 1'))
processInChunks.init(['a', 'b'], mockProcessItem)
await processInChunks.process()
processInChunks.updateData(['e', 'f'])
await processInChunks.process()
processInChunks.updateQueue(['e', 'f'])
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(5)
})
Expand All @@ -58,18 +56,28 @@ describe('ProcessInChunks', () => {
.mockRejectedValueOnce(new Error('Rejected 2'))
const chunkSize = 2
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize)
await processInChunks.process()
await sleep(10000)
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(2)
expect(mockProcessItem).toBeCalledTimes(6)
})
})

it.skip('does not process more data if stopped', async () => {
it('does not process more data if stopped', async () => {
const mockProcessItem = jest.fn(async () => {})
const processInChunks = new ProcessInChunksService()
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
processInChunks.stop()
await processInChunks.process()
processInChunks.init([], mockProcessItem)
processInChunks.pause()
processInChunks.updateQueue(['a', 'b', 'c', 'd'])
expect(mockProcessItem).not.toBeCalled()
})

it('processes tasks after resuming from pause', async () => {
const mockProcessItem = jest.fn(async () => {})
processInChunks.init([], mockProcessItem)
processInChunks.pause()
processInChunks.updateQueue(['a', 'b', 'c', 'd'])
processInChunks.resume()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(4)
})
})
})

0 comments on commit c317ffb

Please sign in to comment.