From 67b5c274de3232514025cfcab121e7d9d939990f Mon Sep 17 00:00:00 2001 From: Isla Koenigsknecht Date: Thu, 11 Apr 2024 22:16:35 -0400 Subject: [PATCH 1/2] Better peer sorting and updated initial diallng (#2427) * Better peer sorting and updated initial diallng * Update tests --- packages/backend/package-lock.json | 33 +++++++++ packages/backend/package.json | 1 + .../connections-manager.service.tor.spec.ts | 7 +- .../connections-manager.service.ts | 2 +- .../src/nest/libp2p/libp2p.service.spec.ts | 8 ++- .../nest/libp2p/process-in-chunks.service.ts | 69 +++++++++++++------ .../src/nest/libp2p/process-in-chunks.spec.ts | 6 +- .../src/nest/local-db/local-db.service.ts | 20 +++++- packages/common/src/sortPeers.ts | 35 ++++++---- 9 files changed, 134 insertions(+), 47 deletions(-) diff --git a/packages/backend/package-lock.json b/packages/backend/package-lock.json index 72b65f7167..2697f66df2 100644 --- a/packages/backend/package-lock.json +++ b/packages/backend/package-lock.json @@ -26,6 +26,7 @@ "dotenv": "8.2.0", "events": "^3.2.0", "express": "^4.17.1", + "fastq": "^1.17.1", "get-port": "^5.1.1", "go-ipfs": "npm:mocked-go-ipfs@0.17.0", "http-server": "^0.12.3", @@ -10816,6 +10817,23 @@ "node": ">= 4.9.1" } }, + "node_modules/fastq": { + "version": "1.17.1", + "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.17.1.tgz", + "integrity": "sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==", + "dependencies": { + "reusify": "^1.0.4" + } + }, + "node_modules/fastq/node_modules/reusify": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", + "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==", + "engines": { + "iojs": ">=1.0.0", + "node": ">=0.10.0" + } + }, "node_modules/fb-watchman": { "version": "2.0.1", "license": "Apache-2.0", @@ -30783,6 +30801,21 @@ "version": "1.0.16", "dev": true }, + "fastq": { + "version": "1.17.1", + "resolved": "https://registry.npmjs.org/fastq/-/fastq-1.17.1.tgz", + "integrity": "sha512-sRVD3lWVIXWg6By68ZN7vho9a1pQcN/WBFaAAsDDFzlJjvoGx0P8z7V1t72grFJfJhu3YPZBuu25f7Kaw2jN1w==", + "requires": { + "reusify": "^1.0.4" + }, + "dependencies": { + "reusify": { + "version": "1.0.4", + "resolved": "https://registry.npmjs.org/reusify/-/reusify-1.0.4.tgz", + "integrity": "sha512-U9nH88a3fc/ekCF1l0/UP1IosiuIjyTh7hBvXVMHYgVcfGvt897Xguj2UOLDeI5BG2m7/uwyaLVT6fbtCwTyzw==" + } + } + }, "fb-watchman": { "version": "2.0.1", "requires": { diff --git a/packages/backend/package.json b/packages/backend/package.json index 58e01cc76f..87312a5a6e 100644 --- a/packages/backend/package.json +++ b/packages/backend/package.json @@ -108,6 +108,7 @@ "dotenv": "8.2.0", "events": "^3.2.0", "express": "^4.17.1", + "fastq": "^1.17.1", "get-port": "^5.1.1", "go-ipfs": "npm:mocked-go-ipfs@0.17.0", "http-server": "^0.12.3", diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts index fb41bf4aa2..b56b6d121a 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts @@ -31,6 +31,7 @@ import waitForExpect from 'wait-for-expect' import { Libp2pEvents } from '../libp2p/libp2p.types' import { sleep } from '../common/sleep' import { createLibp2pAddress } from '@quiet/common' +import { lib } from 'crypto-js' jest.setTimeout(100_000) @@ -121,7 +122,7 @@ afterEach(async () => { }) describe('Connections manager', () => { - it('saves peer stats when peer has been disconnected', async () => { + it.only('saves peer stats when peer has been disconnected', async () => { class RemotePeerEventDetail { peerId: string @@ -205,8 +206,8 @@ describe('Connections manager', () => { await sleep(5000) // It looks LibP2P dials peers initially when it's started and // then IPFS service dials peers again when started, thus - // peersCount * 2 - expect(spyOnDial).toHaveBeenCalledTimes(peersCount * 2) + // peersCount-1 * 2 because we don't dial ourself (the first peer in the list) + expect(spyOnDial).toHaveBeenCalledTimes((peersCount - 1) * 2) // Temporary fix for hanging test - websocketOverTor doesn't have abortController await sleep(5000) }) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 07353b6fa0..f52de0f6bb 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -517,7 +517,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI agent: this.socksProxyAgent, localAddress: this.libp2pService.createLibp2pAddress(onionAddress, peerId.toString()), targetPort: this.ports.libp2pHiddenService, - peers: peers ?? [], + peers: peers ? peers.slice(1) : [], psk: Libp2pService.generateLibp2pPSK(community.psk).fullKey, } await this.libp2pService.createInstance(params) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 13ebb5f1ed..91d9d177c4 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -8,7 +8,7 @@ import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import validator from 'validator' import waitForExpect from 'wait-for-expect' -import { ProcessInChunksService } from './process-in-chunks.service' +import { DEFAULT_NUM_TRIES, ProcessInChunksService } from './process-in-chunks.service' describe('Libp2pService', () => { let module: TestingModule @@ -93,10 +93,12 @@ describe('Libp2pService', () => { await libp2pService.createInstance(params) expect(libp2pService.libp2pInstance).not.toBeNull() // @ts-expect-error processItem is private - const dialPeerSpy = jest.spyOn(processInChunks, 'processItem') + const processItemSpy = jest.spyOn(processInChunks, 'processItem') + const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial') libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) await waitForExpect(async () => { - expect(dialPeerSpy).toBeCalledTimes(1) + expect(processItemSpy).toBeCalledTimes(2 * DEFAULT_NUM_TRIES) + expect(dialSpy).toBeCalledTimes(1) }) }) }) diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index 2e7b30588b..e57e681227 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -1,12 +1,22 @@ import { EventEmitter } from 'events' +import fastq from 'fastq' +import type { queue, done } from 'fastq' + import Logger from '../common/logger' const DEFAULT_CHUNK_SIZE = 10 +export const DEFAULT_NUM_TRIES = 2 + +type ProcessTask = { + data: T + tries: number +} export class ProcessInChunksService extends EventEmitter { private isActive: boolean - private data: T[] + private data: Set = new Set() private chunkSize: number + private taskQueue: queue> private processItem: (arg: T) => Promise private readonly logger = Logger(ProcessInChunksService.name) constructor() { @@ -14,43 +24,62 @@ export class ProcessInChunksService extends EventEmitter { } public init(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { - this.data = data + this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`) this.processItem = processItem this.chunkSize = chunkSize + this.taskQueue = fastq(this, this.processOneItem, this.chunkSize) + this.updateData(data) + this.addToTaskQueue() } - updateData(items: T[]) { + public updateData(items: T[]) { this.logger(`Updating data with ${items.length} items`) - this.data = [...new Set(this.data.concat(items))] + this.taskQueue.pause() + items.forEach(item => this.data.add(item)) + this.addToTaskQueue() } - public async processOneItem() { - const toProcess = this.data.shift() - if (toProcess) { - try { - await this.processItem(toProcess) - } catch (e) { - this.logger(`Processing ${toProcess} failed, message:`, e.message) - } finally { - process.nextTick(async () => { - await this.processOneItem() - }) + private addToTaskQueue() { + this.logger(`Adding ${this.data.size} items to the task queue`) + for (const item of this.data) { + if (item) { + this.logger(`Adding data ${item} to the task queue`) + this.data.delete(item) + try { + this.taskQueue.push({ data: item, tries: 0 } as ProcessTask) + } catch (e) { + this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e) + this.data.add(item) + } } } } - public async process() { - this.logger(`Processing ${this.data.length} items`) - for (let i = 0; i < this.chunkSize; i++) { - // Do not wait for this promise as items should be processed simultineously - void this.processOneItem() + public async processOneItem(task: ProcessTask) { + try { + this.logger(`Processing task with data ${task.data}`) + await this.processItem(task.data) + } catch (e) { + this.logger.error(`Processing task with data ${task.data} failed`, e) + if (task.tries + 1 < DEFAULT_NUM_TRIES) { + this.logger(`Will try to re-attempt task with data ${task.data}`) + this.taskQueue.push({ ...task, tries: task.tries + 1 }) + } + } finally { + this.logger(`Done attempting to process task with data ${task.data}`) } } + public async process() { + this.logger(`Processing ${this.taskQueue.length} items`) + this.taskQueue.resume() + } + public stop() { if (this.isActive) { this.logger('Stopping initial dial') this.isActive = false + this.taskQueue.pause() } } } diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index 38979c1cf9..ce751afdf3 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -27,7 +27,7 @@ describe('ProcessInChunks', () => { processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) await processInChunks.process() await waitForExpect(() => { - expect(mockProcessItem).toBeCalledTimes(4) + expect(mockProcessItem).toBeCalledTimes(6) }) }) @@ -43,7 +43,7 @@ describe('ProcessInChunks', () => { processInChunks.updateData(['e', 'f']) await processInChunks.process() await waitForExpect(() => { - expect(mockProcessItem).toBeCalledTimes(4) + expect(mockProcessItem).toBeCalledTimes(5) }) }) @@ -60,7 +60,7 @@ describe('ProcessInChunks', () => { processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize) await processInChunks.process() await waitForExpect(() => { - expect(mockProcessItem).toBeCalledTimes(4) + expect(mockProcessItem).toBeCalledTimes(2) }) }) diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 6c85552ac8..fd36ed80d8 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -95,7 +95,21 @@ export class LocalDbService { } } - public async getSortedPeers(peers: string[] = []): Promise { + public async getSortedPeers( + peers?: string[] | undefined, + includeLocalPeerAddress: boolean = true + ): Promise { + if (!peers) { + const currentCommunity = await this.getCurrentCommunity() + if (!currentCommunity) { + throw new Error('No peers were provided and no community was found to extract peers from') + } + peers = currentCommunity.peerList + if (!peers) { + throw new Error('No peers provided and no peers found on current stored community') + } + } + const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} const stats: NetworkStats[] = Object.values(peersStats) const network = await this.getNetworkInfo() @@ -103,9 +117,9 @@ export class LocalDbService { if (network) { const localPeerAddress = createLibp2pAddress(network.hiddenService.onionAddress, network.peerId.id) this.logger('Local peer', localPeerAddress) - return filterAndSortPeers(peers, stats, localPeerAddress) + return filterAndSortPeers(peers, stats, localPeerAddress, includeLocalPeerAddress) } else { - return filterAndSortPeers(peers, stats) + return filterAndSortPeers(peers, stats, undefined, includeLocalPeerAddress) } } diff --git a/packages/common/src/sortPeers.ts b/packages/common/src/sortPeers.ts index 682111c1f2..634a79cfaf 100644 --- a/packages/common/src/sortPeers.ts +++ b/packages/common/src/sortPeers.ts @@ -1,5 +1,4 @@ import { type NetworkStats } from '@quiet/types' -import { isDefined } from './helpers' import { filterValidAddresses } from './libp2p' /** @@ -14,9 +13,11 @@ This is the very simple algorithm for evaluating the most wanted peers. export const filterAndSortPeers = ( peersAddresses: string[], stats: NetworkStats[], - localPeerAddress?: string + localPeerAddress?: string, + includeLocalPeerAddress: boolean = true ): string[] => { peersAddresses = filterValidAddresses(peersAddresses) + const currentlyConnected = [...stats].filter(peer => peer.connectionTime === 0) const lastSeenSorted = [...stats].sort((a, b) => { return b.lastSeen - a.lastSeen }) @@ -24,7 +25,7 @@ export const filterAndSortPeers = ( return b.connectionTime - a.connectionTime }) - const mostWantedPeers: NetworkStats[] = [] + const mostWantedPeers: NetworkStats[] = currentlyConnected for (let i = 0; i < stats.length; i++) { const peerOne = lastSeenSorted[i] @@ -39,22 +40,28 @@ export const filterAndSortPeers = ( } } - const peerList = mostWantedPeers.map(peerId => { - return peersAddresses.find(peerAddress => { + const peerSet: Set = new Set() + if (includeLocalPeerAddress && localPeerAddress) { + peerSet.add(localPeerAddress) + } + + mostWantedPeers.forEach(peer => { + const found = peersAddresses.find(peerAddress => { const id = peerAddress.split('/')[7] - if (id === peerId.peerId) { + if (id === peer.peerId) { peersAddresses.splice(peersAddresses.indexOf(peerAddress), 1) return true } }) + if (found && found !== '') { + peerSet.add(found) + } + }) + peersAddresses.forEach(peerAddress => { + if (!peerSet.has(peerAddress)) { + peerSet.add(peerAddress) + } }) - return [ - ...new Set([ - localPeerAddress, // Set local peer as first - ...peerList.concat(peersAddresses), - ]), - ] - .filter(address => address !== null && address !== '') - .filter(isDefined) + return [...peerSet] } From c18e97cae50c5f6cfe25820b33a80c739cc77431 Mon Sep 17 00:00:00 2001 From: Isla Koenigsknecht Date: Fri, 12 Apr 2024 08:19:43 -0400 Subject: [PATCH 2/2] Remove only (#2429) --- .../connections-manager/connections-manager.service.tor.spec.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts index b56b6d121a..98d717762b 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.tor.spec.ts @@ -122,7 +122,7 @@ afterEach(async () => { }) describe('Connections manager', () => { - it.only('saves peer stats when peer has been disconnected', async () => { + it('saves peer stats when peer has been disconnected', async () => { class RemotePeerEventDetail { peerId: string