diff --git a/packages/backend/package-lock.json b/packages/backend/package-lock.json index 72b65f7167..2697f66df2 100644 --- a/packages/backend/package-lock.json +++ b/packages/backend/package-lock.json @@ -26,6 +26,7 @@ "dotenv": "8.2.0", "events": "^3.2.0", "express": "^4.17.1", + "fastq": "^1.17.1", "get-port": "^5.1.1", "go-ipfs": "npm:mocked-go-ipfs@0.17.0", "http-server": "^0.12.3", @@ -10816,6 +10817,23 @@ "node": ">= 4.9.1" } }, + "node_modules/fastq": { + "version": "1.17.1", + "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.17.1.tgz", + "integrity": "sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==", + "dependencies": { + "reusify": "^1.0.4" + } + }, + "node_modules/fastq/node_modules/reusify": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", + "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==", + "engines": { + "iojs": ">=1.0.0", + "node": ">=0.10.0" + } + }, "node_modules/fb-watchman": { "version": "2.0.1", "license": "Apache-2.0", @@ -30783,6 +30801,21 @@ "version": "1.0.16", "dev": true }, + "fastq": { + "version": "1.17.1", + "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.17.1.tgz", + "integrity": "sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==", + "requires": { + "reusify": "^1.0.4" + }, + "dependencies": { + "reusify": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", + "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==" + } + } + }, "fb-watchman": { "version": "2.0.1", "requires": { diff --git a/packages/backend/package.json b/packages/backend/package.json index 58e01cc76f..87312a5a6e 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -108,6 +108,7 @@ "dotenv": "8.2.0", "events": "^3.2.0", "express": "^4.17.1", + "fastq": "^1.17.1", "get-port": "^5.1.1", "go-ipfs": "npm:mocked-go-ipfs@0.17.0", "http-server": "^0.12.3", diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 07353b6fa0..f52de0f6bb 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -517,7 +517,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI agent: this.socksProxyAgent, localAddress: this.libp2pService.createLibp2pAddress(onionAddress, peerId.toString()), targetPort: this.ports.libp2pHiddenService, - peers: peers ?? [], + peers: peers ? peers.slice(1) : [], psk: Libp2pService.generateLibp2pPSK(community.psk).fullKey, } await this.libp2pService.createInstance(params) diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index 2e7b30588b..c5ed9be966 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -1,12 +1,22 @@ import { EventEmitter } from 'events' +import fastq from 'fastq' +import type { queue, done } from 'fastq' + import Logger from '../common/logger' const DEFAULT_CHUNK_SIZE = 10 +const DEFAULT_NUM_TRIES = 2 + +type ProcessTask = { + data: T + tries: number +} export class ProcessInChunksService extends EventEmitter { private isActive: boolean - private data: T[] + private data: Set = new Set() private chunkSize: number + private taskQueue: queue> private processItem: (arg: T) => Promise private readonly logger = Logger(ProcessInChunksService.name) constructor() { @@ -14,43 +24,65 @@ export class ProcessInChunksService extends EventEmitter { } public init(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { - this.data = data + this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`) this.processItem = processItem this.chunkSize = chunkSize + this.taskQueue = fastq(this, this.processOneItem, this.chunkSize) + this.updateData(data) + this.addToTaskQueue() } - updateData(items: T[]) { + public updateData(items: T[]) { this.logger(`Updating data with ${items.length} items`) - this.data = [...new Set(this.data.concat(items))] + this.taskQueue.pause() + items.forEach(item => this.data.add(item)) + this.addToTaskQueue() } - public async processOneItem() { - const toProcess = this.data.shift() - if (toProcess) { - try { - await this.processItem(toProcess) - } catch (e) { - this.logger(`Processing ${toProcess} failed, message:`, e.message) - } finally { - process.nextTick(async () => { - await this.processOneItem() - }) + private addToTaskQueue() { + const maxChunkSize = Math.min(this.data.size, this.chunkSize) + let count = 0 + this.logger(`Adding ${maxChunkSize} items to the task queue`) + for (const item of this.data) { + if (item && count < maxChunkSize) { + this.logger(`Adding data ${item} to the task queue`) + this.data.delete(item) + try { + this.taskQueue.push({ data: item, tries: 0 } as ProcessTask) + count++ + } catch (e) { + this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e) + this.data.add(item) + } } } } - public async process() { - this.logger(`Processing ${this.data.length} items`) - for (let i = 0; i < this.chunkSize; i++) { - // Do not wait for this promise as items should be processed simultineously - void this.processOneItem() + public async processOneItem(task: ProcessTask) { + try { + this.logger(`Processing task with data ${task.data}`) + await this.processItem(task.data) + } catch (e) { + this.logger(`Processing task with data ${task.data} failed, message:`, e.message) + if (task.tries + 1 < DEFAULT_NUM_TRIES) { + this.logger(`Will try to re-attempt task with data ${task.data}`) + this.taskQueue.push({ ...task, tries: task.tries + 1 }) + } + } finally { + this.logger(`Done attempting to process task with data ${task.data}`) } } + public async process() { + this.logger(`Processing ${this.taskQueue.length} items`) + this.taskQueue.resume() + } + public stop() { if (this.isActive) { this.logger('Stopping initial dial') this.isActive = false + this.taskQueue.pause() } } } diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 6c85552ac8..fd36ed80d8 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -95,7 +95,21 @@ export class LocalDbService { } } - public async getSortedPeers(peers: string[] = []): Promise { + public async getSortedPeers( + peers?: string[] | undefined, + includeLocalPeerAddress: boolean = true + ): Promise { + if (!peers) { + const currentCommunity = await this.getCurrentCommunity() + if (!currentCommunity) { + throw new Error('No peers were provided and no community was found to extract peers from') + } + peers = currentCommunity.peerList + if (!peers) { + throw new Error('No peers provided and no peers found on current stored community') + } + } + const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} const stats: NetworkStats[] = Object.values(peersStats) const network = await this.getNetworkInfo() @@ -103,9 +117,9 @@ export class LocalDbService { if (network) { const localPeerAddress = createLibp2pAddress(network.hiddenService.onionAddress, network.peerId.id) this.logger('Local peer', localPeerAddress) - return filterAndSortPeers(peers, stats, localPeerAddress) + return filterAndSortPeers(peers, stats, localPeerAddress, includeLocalPeerAddress) } else { - return filterAndSortPeers(peers, stats) + return filterAndSortPeers(peers, stats, undefined, includeLocalPeerAddress) } } diff --git a/packages/common/src/sortPeers.ts b/packages/common/src/sortPeers.ts index 682111c1f2..634a79cfaf 100644 --- a/packages/common/src/sortPeers.ts +++ b/packages/common/src/sortPeers.ts @@ -1,5 +1,4 @@ import { type NetworkStats } from '@quiet/types' -import { isDefined } from './helpers' import { filterValidAddresses } from './libp2p' /** @@ -14,9 +13,11 @@ This is the very simple algorithm for evaluating the most wanted peers. export const filterAndSortPeers = ( peersAddresses: string[], stats: NetworkStats[], - localPeerAddress?: string + localPeerAddress?: string, + includeLocalPeerAddress: boolean = true ): string[] => { peersAddresses = filterValidAddresses(peersAddresses) + const currentlyConnected = [...stats].filter(peer => peer.connectionTime === 0) const lastSeenSorted = [...stats].sort((a, b) => { return b.lastSeen - a.lastSeen }) @@ -24,7 +25,7 @@ export const filterAndSortPeers = ( return b.connectionTime - a.connectionTime }) - const mostWantedPeers: NetworkStats[] = [] + const mostWantedPeers: NetworkStats[] = currentlyConnected for (let i = 0; i < stats.length; i++) { const peerOne = lastSeenSorted[i] @@ -39,22 +40,28 @@ export const filterAndSortPeers = ( } } - const peerList = mostWantedPeers.map(peerId => { - return peersAddresses.find(peerAddress => { + const peerSet: Set = new Set() + if (includeLocalPeerAddress && localPeerAddress) { + peerSet.add(localPeerAddress) + } + + mostWantedPeers.forEach(peer => { + const found = peersAddresses.find(peerAddress => { const id = peerAddress.split('/')[7] - if (id === peerId.peerId) { + if (id === peer.peerId) { peersAddresses.splice(peersAddresses.indexOf(peerAddress), 1) return true } }) + if (found && found !== '') { + peerSet.add(found) + } + }) + peersAddresses.forEach(peerAddress => { + if (!peerSet.has(peerAddress)) { + peerSet.add(peerAddress) + } }) - return [ - ...new Set([ - localPeerAddress, // Set local peer as first - ...peerList.concat(peersAddresses), - ]), - ] - .filter(address => address !== null && address !== '') - .filter(isDefined) + return [...peerSet] }