Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix queue processing #2525

Merged
merged 3 commits into from
May 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -263,20 +263,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
public async resume() {
this.logger('Resuming!')
await this.openSocket()
this.logger('Attempting to redial peers!')
const peersToDial = await this.getPeersOnResume()
this.libp2pService?.resume(peersToDial)
}

public async getPeersOnResume(): Promise<string[]> {
this.logger('Getting peers to redial')
if (this.peerInfo && (this.peerInfo?.connected.length !== 0 || this.peerInfo?.dialed.length !== 0)) {
this.logger('Dialing peers with info from pause: ', this.peerInfo)
await this.libp2pService?.redialPeers([...this.peerInfo.connected, ...this.peerInfo.dialed])
} else {
this.logger('Dialing peers from stored community (if exists)')
const community = await this.localDbService.getCurrentCommunity()
if (!community) {
this.logger(`No community launched, can't redial`)
return
}
const sortedPeers = await this.localDbService.getSortedPeers(community.peerList ?? [])
await this.libp2pService?.redialPeers(sortedPeers)
this.logger('Found peer info from pause: ', this.peerInfo)
return [...this.peerInfo.connected, ...this.peerInfo.dialed]
}

this.logger('Getting peers from stored community (if exists)')
const community = await this.localDbService.getCurrentCommunity()
if (!community) {
this.logger(`No community launched, no peers found`)
return []
}
return await this.localDbService.getSortedPeers(community.peerList ?? [])
}

// This method is only used on iOS through rn-bridge for reacting on lifecycle changes
Expand Down
23 changes: 17 additions & 6 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,21 @@ export class Libp2pService extends EventEmitter {
await this.hangUpPeers(peerInfo.dialed)
this.dialedPeers.clear()
this.connectedPeers.clear()
this.processInChunksService.pause()
return peerInfo
}

public resume = async (peersToDial: string[]): Promise<void> => {
this.processInChunksService.resume()
if (peersToDial.length === 0) {
this.logger('No peers to redial!')
return
}

this.logger(`Redialing ${peersToDial.length} peers`)
await this.redialPeers(peersToDial)
}

public readonly createLibp2pAddress = (address: string, peerId: string): string => {
return createLibp2pAddress(address, peerId)
}
Expand Down Expand Up @@ -138,8 +150,7 @@ export class Libp2pService extends EventEmitter {
// TODO: Sort peers
await this.hangUpPeers(dialed)

this.processInChunksService.updateData(toDial)
await this.processInChunksService.process()
this.processInChunksService.updateQueue(toDial)
}

public async createInstance(params: Libp2pNodeParams): Promise<Libp2p> {
Expand Down Expand Up @@ -208,13 +219,12 @@ export class Libp2pService extends EventEmitter {
this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => {
const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress))
this.logger('Dialing', nonDialedAddresses.length, 'addresses')
this.processInChunksService.updateData(nonDialedAddresses)
await this.processInChunksService.process()
this.processInChunksService.updateQueue(nonDialedAddresses)
})

this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`)
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P)
this.processInChunksService.init(peers, this.dialPeer)
this.processInChunksService.init([], this.dialPeer)

this.libp2pInstance.addEventListener('peer:discovery', peer => {
this.logger(`${peerId.toString()} discovered ${peer.detail.id}`)
Expand Down Expand Up @@ -268,14 +278,15 @@ export class Libp2pService extends EventEmitter {
this.emit(Libp2pEvents.PEER_DISCONNECTED, peerStat)
})

await this.processInChunksService.process()
this.processInChunksService.updateQueue(peers)

this.logger(`Initialized libp2p for peer ${peerId.toString()}`)
}

public async close(): Promise<void> {
this.logger('Closing libp2p service')
await this.libp2pInstance?.stop()
this.processInChunksService.pause()
this.libp2pInstance = null
this.connectedPeers = new Map()
this.dialedPeers = new Set()
Expand Down
124 changes: 86 additions & 38 deletions packages/backend/src/nest/libp2p/process-in-chunks.service.ts
Original file line number Diff line number Diff line change
@@ -1,22 +1,23 @@
import { EventEmitter } from 'events'
import fastq from 'fastq'
import type { queue, done } from 'fastq'
import fastq, { queueAsPromised } from 'fastq'

import Logger from '../common/logger'
import { randomUUID } from 'crypto'

const DEFAULT_CHUNK_SIZE = 10
export const DEFAULT_NUM_TRIES = 2

type ProcessTask<T> = {
data: T
tries: number
taskId: string
}

export class ProcessInChunksService<T> extends EventEmitter {
private isActive: boolean
private data: Set<T> = new Set()
private chunkSize: number
private taskQueue: queue<ProcessTask<T>>
private taskQueue: queueAsPromised<ProcessTask<T>>
private deadLetterQueue: ProcessTask<T>[] = []
private processItem: (arg: T) => Promise<any>
private readonly logger = Logger(ProcessInChunksService.name)
constructor() {
Expand All @@ -27,59 +28,106 @@ export class ProcessInChunksService<T> extends EventEmitter {
this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`)
this.processItem = processItem
this.chunkSize = chunkSize
this.taskQueue = fastq(this, this.processOneItem, this.chunkSize)
this.updateData(data)
this.addToTaskQueue()
this.taskQueue = fastq.promise(this, this.processOneItem, this.chunkSize)
this.isActive = true
this.updateQueue(data)
}

public updateData(items: T[]) {
this.logger(`Updating data with ${items.length} items`)
this.taskQueue.pause()
items.forEach(item => this.data.add(item))
this.addToTaskQueue()
public updateQueue(items: T[]) {
this.logger(`Adding ${items.length} items to the task queue`)
items.forEach(item => this.addToTaskQueue(item))
islathehut marked this conversation as resolved.
Show resolved Hide resolved
}

private addToTaskQueue() {
this.logger(`Adding ${this.data.size} items to the task queue`)
for (const item of this.data) {
if (item) {
this.logger(`Adding data ${item} to the task queue`)
this.data.delete(item)
try {
this.taskQueue.push({ data: item, tries: 0 } as ProcessTask<T>)
} catch (e) {
this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e)
this.data.add(item)
}
private async addToTaskQueue(task: ProcessTask<T>): Promise<void>
private async addToTaskQueue(item: T): Promise<void>
private async addToTaskQueue(itemOrTask: T | ProcessTask<T>): Promise<void> {
islathehut marked this conversation as resolved.
Show resolved Hide resolved
if (!itemOrTask) {
this.logger.error('Item/task is null or undefined, skipping!')
return
}

let task: ProcessTask<T>
if ((itemOrTask as ProcessTask<T>).taskId != null) {
task = itemOrTask as ProcessTask<T>
} else {
this.logger(`Creating new task for ${itemOrTask}`)
task = { data: itemOrTask as T, tries: 0, taskId: randomUUID() }
}

if (!this.isActive) {
this.logger(
'ProcessInChunksService is not active, adding tasks to the dead letter queue!\n\nWARNING: You must call "resume" on the ProcessInChunksService to process the dead letter queue!!!'
)
this.deadLetterQueue.push(task)
this.logger(`There are now ${this.deadLetterQueue.length} items in the dead letter queue`)
return
}

this.logger(`Adding task ${task.taskId} with data ${task.data} to the task queue`)
try {
const success = await this.pushToQueueAndRun(task)
if (!success) {
this.logger(`Will try to re-attempt task ${task.taskId} with data ${task.data}`)
await this.pushToQueueAndRun({ ...task, tries: task.tries + 1 })
}
} catch (e) {
this.logger.error(`Error occurred while adding new task ${task.taskId} with data ${task.data} to the queue`, e)
}
}

public async processOneItem(task: ProcessTask<T>) {
public async processOneItem(task: ProcessTask<T>): Promise<boolean> {
let success: boolean = false
try {
this.logger(`Processing task with data ${task.data}`)
this.logger(`Processing task ${task.taskId} with data ${task.data}`)
await this.processItem(task.data)
success = true
} catch (e) {
this.logger.error(`Processing task with data ${task.data} failed`, e)
if (task.tries + 1 < DEFAULT_NUM_TRIES) {
this.logger(`Will try to re-attempt task with data ${task.data}`)
this.taskQueue.push({ ...task, tries: task.tries + 1 })
}
this.logger.error(`Processing task ${task.taskId} with data ${task.data} failed`, e)
} finally {
this.logger(`Done attempting to process task with data ${task.data}`)
}
return success
}

public async process() {
this.logger(`Processing ${this.taskQueue.length()} items`)
this.taskQueue.resume()
private async pushToQueueAndRun(task: ProcessTask<T>): Promise<boolean> {
this.logger(
`Pushing task ${task.taskId} to queue, there will now be ${this.taskQueue.length() + 1} items in the queue`
)
const success = await this.taskQueue.push(task)
if (success) {
this.logger(`Task ${task.taskId} completed successfully`)
} else {
this.logger(`Task ${task.taskId} failed`)
}
return success
}

public stop() {
public resume() {
if (this.isActive) {
this.logger('Stopping initial dial')
this.isActive = false
this.taskQueue.pause()
this.logger('ProcessInChunksService is already active')
return
}

this.logger('Resuming ProcessInChunksService')
this.isActive = true
this.taskQueue.resume()
if (this.deadLetterQueue) {
this.logger(`Adding ${this.deadLetterQueue.length} tasks from the dead letter queue to the task queue`)
this.deadLetterQueue.forEach(task => this.addToTaskQueue(task))
this.deadLetterQueue = []
}
}

public pause() {
if (!this.isActive) {
this.logger('ProcessInChunksService is already paused')
return
}

this.logger('Pausing ProcessInChunksService')
this.isActive = false
this.deadLetterQueue = this.taskQueue.getQueue()
this.taskQueue.kill()
this.taskQueue.pause()
}
}
30 changes: 19 additions & 11 deletions packages/backend/src/nest/libp2p/process-in-chunks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import { ProcessInChunksService } from './process-in-chunks.service'
import waitForExpect from 'wait-for-expect'
import { TestModule } from '../common/test.module'
import { Test, TestingModule } from '@nestjs/testing'
import { sleep } from '../common/sleep'
describe('ProcessInChunks', () => {
let module: TestingModule
let processInChunks: ProcessInChunksService<string>
Expand All @@ -25,7 +26,6 @@ describe('ProcessInChunks', () => {
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 2'))
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
await processInChunks.process()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(6)
})
Expand All @@ -39,9 +39,7 @@ describe('ProcessInChunks', () => {
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 1'))
processInChunks.init(['a', 'b'], mockProcessItem)
await processInChunks.process()
processInChunks.updateData(['e', 'f'])
await processInChunks.process()
processInChunks.updateQueue(['e', 'f'])
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(5)
})
Expand All @@ -58,18 +56,28 @@ describe('ProcessInChunks', () => {
.mockRejectedValueOnce(new Error('Rejected 2'))
const chunkSize = 2
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize)
await processInChunks.process()
await sleep(10000)
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(2)
expect(mockProcessItem).toBeCalledTimes(6)
})
})

it.skip('does not process more data if stopped', async () => {
it('does not process more data if stopped', async () => {
const mockProcessItem = jest.fn(async () => {})
const processInChunks = new ProcessInChunksService()
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
processInChunks.stop()
await processInChunks.process()
processInChunks.init([], mockProcessItem)
processInChunks.pause()
processInChunks.updateQueue(['a', 'b', 'c', 'd'])
expect(mockProcessItem).not.toBeCalled()
})

it('processes tasks after resuming from pause', async () => {
const mockProcessItem = jest.fn(async () => {})
processInChunks.init([], mockProcessItem)
processInChunks.pause()
processInChunks.updateQueue(['a', 'b', 'c', 'd'])
processInChunks.resume()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(4)
})
})
})
Loading