From c40444922c393d03237ab1bcdbc6db9aae4a1535 Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 6 Nov 2023 15:10:31 +0100 Subject: [PATCH 01/14] fix: getting list of sorted peers from localdb --- .../nest/connections-manager/connections-manager.service.ts | 2 ++ packages/backend/src/nest/local-db/local-db.service.ts | 5 ++--- packages/common/src/sortPeers.ts | 3 +++ 3 files changed, 7 insertions(+), 3 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index dcef3bdc41..4e4682214e 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -158,6 +158,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI console.log('launchCommunityFromStorage - community', community) if (community) { const sortedPeers = await this.localDbService.getSortedPeers(community.peers) + console.log('launchCommunityFromStorage - sorted peers', sortedPeers) if (sortedPeers.length > 0) { community.peers = sortedPeers } @@ -347,6 +348,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI if (!peers || peers.length === 0) { peers = [this.libp2pService.createLibp2pAddress(onionAddress, _peerId.toString())] } + console.log(`Launching community ${payload.id}, peers for libp2p: ${peers}`) const params: Libp2pNodeParams = { peerId: _peerId, diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 96e6402f81..2488dcda0a 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -73,9 +73,8 @@ export class LocalDbService { public async getSortedPeers(peers: string[] = []): Promise { const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} - const peersAddresses: string[] = [...new Set(Object.keys(peersStats).concat(peers))] + // const peersAddresses: string[] = [...new Set(Object.keys(peersStats).concat(peers))] const stats: NetworkStats[] = Object.values(peersStats) - const sortedPeers = sortPeers(peersAddresses, stats) - return sortedPeers + return sortPeers(peers, stats) } } diff --git a/packages/common/src/sortPeers.ts b/packages/common/src/sortPeers.ts index 33d98c06c7..df687d0a40 100644 --- a/packages/common/src/sortPeers.ts +++ b/packages/common/src/sortPeers.ts @@ -9,6 +9,9 @@ This is the very simple algorithm for evaluating the most wanted peers. 4. We end up with mix of last seen and most uptime descending array of peers, the it is enchanced to libp2p address. */ export const sortPeers = (peersAddresses: string[], stats: NetworkStats[]): string[] => { + peersAddresses = peersAddresses.filter(add => + add.match(/^\/dns4\/[a-z0-9]{56}.onion\/tcp\/80\/ws\/p2p\/[a-zA-Z0-9]{46}$/g) + ) const lastSeenSorted = [...stats].sort((a, b) => { return b.lastSeen - a.lastSeen }) From 6279c4173cc3404830d76363b801a21fdc7500a1 Mon Sep 17 00:00:00 2001 From: Emi Date: Thu, 9 Nov 2023 13:54:37 +0100 Subject: [PATCH 02/14] feat: dial peers after replicating csrs; update peersList in localDB --- .../connections-manager.service.ts | 5 ++-- .../backend/src/nest/libp2p/libp2p.service.ts | 28 ++++++++++++++++++- .../backend/src/nest/libp2p/libp2p.types.ts | 1 + .../src/nest/storage/storage.service.ts | 14 ++++++++-- 4 files changed, 43 insertions(+), 5 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 4e4682214e..d1352a81c3 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -155,7 +155,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.logger('launchCommunityFromStorage') const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) - console.log('launchCommunityFromStorage - community', community) + console.log('launchCommunityFromStorage - community peers', community?.peers) if (community) { const sortedPeers = await this.localDbService.getSortedPeers(community.peers) console.log('launchCommunityFromStorage - sorted peers', sortedPeers) @@ -366,7 +366,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.serverIoProvider.io.emit(SocketActionTypes.PEER_CONNECTED, payload) }) this.libp2pService.on(Libp2pEvents.PEER_DISCONNECTED, async (payload: NetworkDataPayload) => { - console.log(' this.libp2pService.on(Libp2pEvents.PEER_DISCONNECTED') + console.log(' this.libp2pService.on(Libp2pEvents.PEER_DISCONNECTED', payload.peer) const peerPrevStats = await this.localDbService.find(LocalDBKeys.PEERS, payload.peer) const prev = peerPrevStats?.connectionTime || 0 @@ -573,6 +573,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI StorageEvents.REPLICATED_CSR, async (payload: { csrs: string[]; certificates: string[]; id: string }) => { console.log(`On ${StorageEvents.REPLICATED_CSR}`) + this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, payload.csrs) this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs }) this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload) } diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 311b8f2b47..59eacdbe01 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -18,7 +18,8 @@ import { ServerIoProviderTypes } from '../types' import Logger from '../common/logger' import { webSockets } from '../websocketOverTor' import { all } from '../websocketOverTor/filters' -import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' +import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common' +import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity' @Injectable() export class Libp2pService extends EventEmitter { @@ -33,6 +34,7 @@ export class Libp2pService extends EventEmitter { } private dialPeer = async (peerAddress: string) => { + console.log('------ dialing peer ', peerAddress) await this.libp2pInstance?.dial(multiaddr(peerAddress)) } @@ -101,8 +103,26 @@ export class Libp2pService extends EventEmitter { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') } + this.on(Libp2pEvents.DIAL_PEERS, async (csrs: string[]) => { + console.log('DIALING PEERS, csrs', csrs) + const csrsPeersPromises = csrs.map(async csr => { + const parsedCsr = await loadCSR(csr) + const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) + const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) + + if (peerId && onionAddress && !this.connectedPeers.get(peerId)) { + return this.createLibp2pAddress(onionAddress, peerId) + } + }) + + const csrsPeers = await Promise.all(csrsPeersPromises) + console.log('DIALING PEER, addresses', csrsPeers) + const dialInChunks = new ProcessInChunks(csrsPeers.filter(isDefined), this.dialPeer) + await dialInChunks.process() + }) this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) + console.log('BOOTSTRAPPING WITH', peers) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) const dialInChunks = new ProcessInChunks(peers, this.dialPeer) @@ -123,6 +143,12 @@ export class Libp2pService extends EventEmitter { this.emit(Libp2pEvents.PEER_CONNECTED, { peers: [remotePeerId], }) + const latency = await this.libp2pInstance?.ping(peer.detail.remoteAddr) + console.log(`- - - - peer ${remotePeerId} latency: ${latency}`) + console.log( + 'PEER STORE', + this.libp2pInstance?.peerStore.forEach(p => console.log('metadata for ', p.id, p.addresses, p.metadata)) + ) }) this.libp2pInstance.addEventListener('peer:disconnect', async peer => { diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index d32cc0230f..5d764aa85a 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -5,6 +5,7 @@ export enum Libp2pEvents { PEER_CONNECTED = 'peerConnected', PEER_DISCONNECTED = 'peerDisconnected', NETWORK_STATS = 'networkStats', + DIAL_PEERS = 'dialPeers', } export interface Libp2pNodeParams { diff --git a/packages/backend/src/nest/storage/storage.service.ts b/packages/backend/src/nest/storage/storage.service.ts index 9a46b8ee23..a17b961a9d 100644 --- a/packages/backend/src/nest/storage/storage.service.ts +++ b/packages/backend/src/nest/storage/storage.service.ts @@ -28,6 +28,7 @@ import { ConnectionProcessInfo, DeleteFilesFromChannelSocketPayload, FileMetadata, + InitCommunityPayload, NoCryptoEngineError, PublicChannel, PushNotificationPayload, @@ -337,8 +338,17 @@ export class StorageService extends EventEmitter { const registeredUsers = this.getAllRegisteredUsers() const peers = [...new Set(await getUsersAddresses(allUsers.concat(registeredUsers)))] console.log('updatePeersList, peers count:', peers.length) - const community = await this.localDbService.get(LocalDBKeys.COMMUNITY) - this.emit(StorageEvents.UPDATE_PEERS_LIST, { communityId: community.id, peerList: peers }) + console.log('STORATE peers', peers) + + const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) + const sortedPeers = await this.localDbService.getSortedPeers(peers) + console.log('STORAGE sortedPeers', sortedPeers) + if (sortedPeers.length > 0) { + community.peers = sortedPeers + await this.localDbService.put(LocalDBKeys.COMMUNITY, community) + } + + this.emit(StorageEvents.UPDATE_PEERS_LIST, { communityId: community.id, peerList: sortedPeers }) } public async loadAllCertificates() { From d3e9009c6f262f399b876adb4d9eadfc90ded5f2 Mon Sep 17 00:00:00 2001 From: Emi Date: Fri, 10 Nov 2023 12:19:24 +0100 Subject: [PATCH 03/14] chore: remove unused code; move libp2p closing logic to libp2p service --- .../connections-manager.service.ts | 16 +++++++++------- .../src/nest/libp2p/libp2p.service.spec.ts | 4 ++-- .../backend/src/nest/libp2p/libp2p.service.ts | 14 ++++---------- 3 files changed, 15 insertions(+), 19 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index d1352a81c3..a809ad5176 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -192,9 +192,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.logger('Closing local storage') await this.localDbService.close() } - if (this.libp2pService?.libp2pInstance) { + if (this.libp2pService) { this.logger('Stopping libp2p') - await this.libp2pService.libp2pInstance.stop() + await this.libp2pService.close() } } @@ -209,18 +209,20 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI public async leaveCommunity() { this.tor.resetHiddenServices() - this.serverIoProvider.io.close() + this.closeSocket() await this.localDbService.purge() await this.closeAllServices({ saveTor: true }) await this.purgeData() + await this.resetState() + await this.localDbService.open() + await this.socketService.init() + } + + async resetState() { this.communityId = '' this.ports = { ...this.ports, libp2pHiddenService: await getPort() } - this.libp2pService.libp2pInstance = null - this.libp2pService.connectedPeers = new Map() this.communityState = ServiceState.DEFAULT this.registrarState = ServiceState.DEFAULT - await this.localDbService.open() - await this.socketService.init() } public async purgeData() { diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 6775f9a2e2..d8482ca9d2 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -30,9 +30,9 @@ describe('Libp2pService', () => { expect(libp2pService?.libp2pInstance?.peerId).toBe(params.peerId) }) - it('destory instance libp2p', async () => { + it('close libp2p service', async () => { await libp2pService.createInstance(params) - await libp2pService.destroyInstance() + await libp2pService.close() expect(libp2pService.libp2pInstance).toBeNull() }) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 59eacdbe01..b2fcdc2918 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -185,16 +185,10 @@ export class Libp2pService extends EventEmitter { this.logger(`Initialized libp2p for peer ${peerId.toString()}`) } - public async destroyInstance(): Promise { - this.libp2pInstance?.removeEventListener('peer:discovery') - this.libp2pInstance?.removeEventListener('peer:connect') - this.libp2pInstance?.removeEventListener('peer:disconnect') - try { - await this.libp2pInstance?.stop() - } catch (error) { - this.logger.error(error) - } - + public async close(): Promise { + this.logger('Closing libp2p service') + await this.libp2pInstance?.stop() this.libp2pInstance = null + this.connectedPeers = new Map() } } From 1b5aae87cd0862603cd0a74fce3dc434666bb169 Mon Sep 17 00:00:00 2001 From: Emi Date: Fri, 10 Nov 2023 13:51:09 +0100 Subject: [PATCH 04/14] fix: dial only peers that haven't been dialed before --- .../connections-manager.service.ts | 2 +- .../backend/src/nest/libp2p/libp2p.service.ts | 23 ++++++++++--------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index a809ad5176..3c4e9b3f65 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -226,7 +226,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI } public async purgeData() { - console.log('removing data') + this.logger('Purging community data') const dirsToRemove = fs .readdirSync(this.quietDir) .filter( diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index b2fcdc2918..62ad9a9d48 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -25,6 +25,7 @@ import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity' export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null public connectedPeers: Map = new Map() + public dialedPeers: Set = new Set() private readonly logger = Logger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, @@ -35,6 +36,11 @@ export class Libp2pService extends EventEmitter { private dialPeer = async (peerAddress: string) => { console.log('------ dialing peer ', peerAddress) + if (this.dialedPeers.has(peerAddress)) { + console.log(`Peer ${peerAddress} already dialed, not dialing`) // TODO: remove log + return + } + this.dialedPeers.add(peerAddress) await this.libp2pInstance?.dial(multiaddr(peerAddress)) } @@ -104,25 +110,25 @@ export class Libp2pService extends EventEmitter { throw new Error('libp2pInstance was not created') } this.on(Libp2pEvents.DIAL_PEERS, async (csrs: string[]) => { - console.log('DIALING PEERS, csrs', csrs) + console.log('DIALING PEERS') const csrsPeersPromises = csrs.map(async csr => { const parsedCsr = await loadCSR(csr) const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) + if (!peerId || !onionAddress) return - if (peerId && onionAddress && !this.connectedPeers.get(peerId)) { - return this.createLibp2pAddress(onionAddress, peerId) + const peerAddress = this.createLibp2pAddress(onionAddress, peerId) + if (this.dialedPeers.has(peerAddress)) { + return peerAddress } }) const csrsPeers = await Promise.all(csrsPeersPromises) - console.log('DIALING PEER, addresses', csrsPeers) const dialInChunks = new ProcessInChunks(csrsPeers.filter(isDefined), this.dialPeer) await dialInChunks.process() }) this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) - console.log('BOOTSTRAPPING WITH', peers) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) const dialInChunks = new ProcessInChunks(peers, this.dialPeer) @@ -143,12 +149,6 @@ export class Libp2pService extends EventEmitter { this.emit(Libp2pEvents.PEER_CONNECTED, { peers: [remotePeerId], }) - const latency = await this.libp2pInstance?.ping(peer.detail.remoteAddr) - console.log(`- - - - peer ${remotePeerId} latency: ${latency}`) - console.log( - 'PEER STORE', - this.libp2pInstance?.peerStore.forEach(p => console.log('metadata for ', p.id, p.addresses, p.metadata)) - ) }) this.libp2pInstance.addEventListener('peer:disconnect', async peer => { @@ -190,5 +190,6 @@ export class Libp2pService extends EventEmitter { await this.libp2pInstance?.stop() this.libp2pInstance = null this.connectedPeers = new Map() + this.dialedPeers = new Set() } } From 038a5799772b9b8d8fd91e7e87f3c24175182cc5 Mon Sep 17 00:00:00 2001 From: Emi Date: Thu, 16 Nov 2023 11:45:15 +0100 Subject: [PATCH 05/14] chore: add more logs to libp2p service --- .../connections-manager.service.ts | 1 + .../backend/src/nest/libp2p/libp2p.service.ts | 43 ++++++++++++++----- .../backend/src/nest/libp2p/libp2p.types.ts | 1 + .../src/nest/local-db/local-db.service.ts | 1 - 4 files changed, 35 insertions(+), 11 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 05973223e7..8555f8ad2c 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -589,6 +589,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_FETCH_ALL_DIRECT_MESSAGES, payload) }) this.storageService.on(StorageEvents.UPDATE_PEERS_LIST, (payload: StorePeerListPayload) => { + this.libp2pService.emit(Libp2pEvents.UPDATE_KNOWN_PEERS_LIST, payload.peerList) this.serverIoProvider.io.emit(SocketActionTypes.PEER_LIST, payload) }) this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index dccaba21be..e03e30b6bf 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -23,6 +23,7 @@ import { preSharedKey } from 'libp2p/pnet' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import crypto from 'crypto' +import { peerIdFromString } from '@libp2p/peer-id' const KEY_LENGTH = 32 export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' @@ -32,6 +33,7 @@ export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null public connectedPeers: Map = new Map() public dialedPeers: Set = new Set() + // public knownPeers: Set = new Set() private readonly logger = Logger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, @@ -87,10 +89,11 @@ export class Libp2pService extends EventEmitter { libp2p = await createLibp2p({ start: false, connectionManager: { - minConnections: 3, - maxConnections: 8, + minConnections: 3, // TODO: increase? + maxConnections: 8, // TODO: increase? dialTimeout: 120_000, maxParallelDials: 10, + autoDial: true, // It's a default but let's set it to have explicit information }, peerId: params.peerId, addresses: { @@ -131,11 +134,24 @@ export class Libp2pService extends EventEmitter { return libp2p } + private async addPeersToPeerBook(addresses: string[]) { + for (const address of addresses) { + const peerId = multiaddr(address).getPeerId() + console.log('PEER ID', peerId) + if (!peerId) return + console.log('PEER ID adding to peer book', peerId) + // @ts-expect-error + await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)]) + console.log('ALL', await this.libp2pInstance?.peerStore.all()) + } + } + private async afterCreation(peers: string[], peerId: PeerId) { if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') } + this.logger(`Local peerId: ${peerId.toString()}`) this.on(Libp2pEvents.DIAL_PEERS, async (csrs: string[]) => { console.log('DIALING PEERS') const csrsPeersPromises = csrs.map(async csr => { @@ -151,7 +167,8 @@ export class Libp2pService extends EventEmitter { }) const csrsPeers = await Promise.all(csrsPeersPromises) - const dialInChunks = new ProcessInChunks(csrsPeers.filter(isDefined), this.dialPeer) + const addresses = csrsPeers.filter(isDefined) + const dialInChunks = new ProcessInChunks(addresses, this.dialPeer) await dialInChunks.process() }) @@ -165,27 +182,32 @@ export class Libp2pService extends EventEmitter { this.libp2pInstance.addEventListener('peer:connect', async peer => { const remotePeerId = peer.detail.remotePeer.toString() - this.logger(`${peerId.toString()} connected to ${remotePeerId}`) + const localPeerId = peerId.toString() + this.logger(`${localPeerId} connected to ${remotePeerId}`) - // Stop dialing as soon as we connect to a peer - dialInChunks.stop() + //// Stop dialing as soon as we connect to a peer + //dialInChunks.stop() this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) - this.logger(`${this.connectedPeers.size} connected peers`) + this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) + this.logger(`${localPeerId} has ${this.libp2pInstance?.getConnections().length} open connections`) this.emit(Libp2pEvents.PEER_CONNECTED, { peers: [remotePeerId], }) + const latency = await this.libp2pInstance?.ping(peer.detail.remoteAddr) + this.logger(`${localPeerId} ping to ${remotePeerId}. Latency: ${latency}`) }) this.libp2pInstance.addEventListener('peer:disconnect', async peer => { const remotePeerId = peer.detail.remotePeer.toString() - this.logger(`${peerId.toString()} disconnected from ${remotePeerId}`) + const localPeerId = peerId.toString() + this.logger(`${localPeerId} disconnected from ${remotePeerId}`) if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') } - this.logger(`${this.libp2pInstance.getConnections().length} open connections`) + this.logger(`${localPeerId} has ${this.libp2pInstance.getConnections().length} open connections`) const connectionStartTime = this.connectedPeers.get(remotePeerId) if (!connectionStartTime) { @@ -198,7 +220,7 @@ export class Libp2pService extends EventEmitter { const connectionDuration: number = connectionEndTime - connectionStartTime this.connectedPeers.delete(remotePeerId) - this.logger(`${this.connectedPeers.size} connected peers`) + this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) this.emit(Libp2pEvents.PEER_DISCONNECTED, { peer: remotePeerId, @@ -218,5 +240,6 @@ export class Libp2pService extends EventEmitter { this.libp2pInstance = null this.connectedPeers = new Map() this.dialedPeers = new Set() + // this.knownPeers = new Set() } } diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 1f9d8c9fb7..22205386f5 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -6,6 +6,7 @@ export enum Libp2pEvents { PEER_DISCONNECTED = 'peerDisconnected', NETWORK_STATS = 'networkStats', DIAL_PEERS = 'dialPeers', + UPDATE_KNOWN_PEERS_LIST = 'updateKnownPeersList', } export interface Libp2pNodeParams { diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 2488dcda0a..35ac08fcfb 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -73,7 +73,6 @@ export class LocalDbService { public async getSortedPeers(peers: string[] = []): Promise { const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} - // const peersAddresses: string[] = [...new Set(Object.keys(peersStats).concat(peers))] const stats: NetworkStats[] = Object.values(peersStats) return sortPeers(peers, stats) } From 591c4e5383ad7a2984968b7df283063f8ac5706e Mon Sep 17 00:00:00 2001 From: Emi Date: Thu, 16 Nov 2023 16:00:10 +0100 Subject: [PATCH 06/14] chore: make ProcessInChunks a nest service --- packages/backend/src/nest/common/utils.ts | 17 +++++- .../connections-manager.service.ts | 4 +- packages/backend/src/nest/const.ts | 2 + .../backend/src/nest/libp2p/libp2p.module.ts | 3 +- .../src/nest/libp2p/libp2p.service.spec.ts | 47 ++++++++++++++- .../backend/src/nest/libp2p/libp2p.service.ts | 60 +++++++------------ ...chunks.ts => process-in-chunks.service.ts} | 18 ++++-- .../src/nest/libp2p/process-in-chunks.spec.ts | 8 ++- 8 files changed, 107 insertions(+), 52 deletions(-) rename packages/backend/src/nest/libp2p/{process-in-chunks.ts => process-in-chunks.service.ts} (73%) diff --git a/packages/backend/src/nest/common/utils.ts b/packages/backend/src/nest/common/utils.ts index eeff2ad29c..6b1b445486 100644 --- a/packages/backend/src/nest/common/utils.ts +++ b/packages/backend/src/nest/common/utils.ts @@ -11,8 +11,9 @@ import { type PermsData } from '@quiet/types' import { TestConfig } from '../const' import logger from './logger' import { Libp2pNodeParams } from '../libp2p/libp2p.types' -import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' +import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common' import { Libp2pService } from '../libp2p/libp2p.service' +import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity' const log = logger('test') @@ -153,6 +154,20 @@ export const getUsersAddresses = async (users: UserData[]): Promise => return await Promise.all(peers) } +export const getLibp2pAddressesFromCsrs = async (csrs: string[]): Promise => { + const addresses = await Promise.all( + csrs.map(async csr => { + const parsedCsr = await loadCSR(csr) + const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) + const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) + if (!peerId || !onionAddress) return + + return createLibp2pAddress(onionAddress, peerId) + }) + ) + return addresses.filter(isDefined) +} + /** * Compares given numbers * diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 8555f8ad2c..bacdbc85b3 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -8,7 +8,7 @@ import { setEngine, CryptoEngine } from 'pkijs' import { EventEmitter } from 'events' import getPort from 'get-port' import PeerId from 'peer-id' -import { removeFilesFromDir } from '../common/utils' +import { getLibp2pAddressesFromCsrs, removeFilesFromDir } from '../common/utils' import { AskForMessagesPayload, @@ -606,7 +606,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI StorageEvents.REPLICATED_CSR, async (payload: { csrs: string[]; certificates: string[]; id: string }) => { console.log(`On ${StorageEvents.REPLICATED_CSR}`) - this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, payload.csrs) + this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs)) console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`) this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs }) this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload) diff --git a/packages/backend/src/nest/const.ts b/packages/backend/src/nest/const.ts index f4c86c3a35..80cd62656c 100644 --- a/packages/backend/src/nest/const.ts +++ b/packages/backend/src/nest/const.ts @@ -28,6 +28,8 @@ export const IPFS_REPO_PATCH = 'ipfsRepoPath' export const CONFIG_OPTIONS = 'configOptions' export const SERVER_IO_PROVIDER = 'serverIoProvider' +export const PROCESS_IN_CHUNKS_PROVIDER = 'processInChunksProvider' + export const EXPRESS_PROVIDER = 'expressProvider' export const LEVEL_DB = 'levelDb' diff --git a/packages/backend/src/nest/libp2p/libp2p.module.ts b/packages/backend/src/nest/libp2p/libp2p.module.ts index bdc20e965c..d75edc799e 100644 --- a/packages/backend/src/nest/libp2p/libp2p.module.ts +++ b/packages/backend/src/nest/libp2p/libp2p.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common' import { SocketModule } from '../socket/socket.module' import { Libp2pService } from './libp2p.service' +import { ProcessInChunksService } from './process-in-chunks.service' @Module({ imports: [SocketModule], - providers: [Libp2pService], + providers: [Libp2pService, ProcessInChunksService], exports: [Libp2pService], }) export class Libp2pModule {} diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 85f346d9b0..13ebb5f1ed 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -1,16 +1,20 @@ +import { jest } from '@jest/globals' import { Test, TestingModule } from '@nestjs/testing' import { TestModule } from '../common/test.module' -import { libp2pInstanceParams } from '../common/utils' +import { createPeerId, libp2pInstanceParams } from '../common/utils' import { Libp2pModule } from './libp2p.module' import { LIBP2P_PSK_METADATA, Libp2pService } from './libp2p.service' -import { Libp2pNodeParams } from './libp2p.types' +import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import validator from 'validator' +import waitForExpect from 'wait-for-expect' +import { ProcessInChunksService } from './process-in-chunks.service' describe('Libp2pService', () => { let module: TestingModule let libp2pService: Libp2pService let params: Libp2pNodeParams + let processInChunks: ProcessInChunksService beforeAll(async () => { module = await Test.createTestingModule({ @@ -18,6 +22,7 @@ describe('Libp2pService', () => { }).compile() libp2pService = await module.resolve(Libp2pService) + processInChunks = await module.resolve(ProcessInChunksService) params = await libp2pInstanceParams() }) @@ -38,7 +43,7 @@ describe('Libp2pService', () => { expect(libp2pService.libp2pInstance).toBeNull() }) - it('creates libp2p address with proper ws type (%s)', async () => { + it('creates libp2p address', async () => { const libp2pAddress = libp2pService.createLibp2pAddress(params.localAddress, params.peerId.toString()) expect(libp2pAddress).toStrictEqual(`/dns4/${params.localAddress}.onion/tcp/80/ws/p2p/${params.peerId.toString()}`) }) @@ -58,4 +63,40 @@ describe('Libp2pService', () => { const expectedFullKeyString = LIBP2P_PSK_METADATA + uint8ArrayToString(generatedPskBuffer, 'base16') expect(uint8ArrayToString(generatedKey.fullKey)).toEqual(expectedFullKeyString) }) + + it(`Starts dialing peers on '${Libp2pEvents.DIAL_PEERS}' event`, async () => { + const peerId1 = await createPeerId() + const peerId2 = await createPeerId() + const addresses = [ + libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()), + libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), + ] + await libp2pService.createInstance(params) + // @ts-expect-error processItem is private + const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem') + expect(libp2pService.libp2pInstance).not.toBeNull() + libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { + expect(spyOnProcessItem).toBeCalledTimes(addresses.length) + }) + }) + + it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => { + const peerId1 = await createPeerId() + const peerId2 = await createPeerId() + const alreadyDialedAddress = libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()) + libp2pService.dialedPeers.add(alreadyDialedAddress) + const addresses = [ + alreadyDialedAddress, + libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), + ] + await libp2pService.createInstance(params) + expect(libp2pService.libp2pInstance).not.toBeNull() + // @ts-expect-error processItem is private + const dialPeerSpy = jest.spyOn(processInChunks, 'processItem') + libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { + expect(dialPeerSpy).toBeCalledTimes(1) + }) + }) }) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index e03e30b6bf..80c7a6eaeb 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -9,7 +9,7 @@ import { createServer } from 'it-ws' import { DateTime } from 'luxon' import { EventEmitter } from 'events' import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' -import { ProcessInChunks } from './process-in-chunks' +import { ProcessInChunksService } from './process-in-chunks.service' import { multiaddr } from '@multiformats/multiaddr' import { ConnectionProcessInfo, PeerId, SocketActionTypes } from '@quiet/types' import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const' @@ -33,17 +33,17 @@ export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null public connectedPeers: Map = new Map() public dialedPeers: Set = new Set() - // public knownPeers: Set = new Set() + // public processInChunksService: ProcessInChunks private readonly logger = Logger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, - @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent + @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent, + private readonly processInChunksService: ProcessInChunksService ) { super() } private dialPeer = async (peerAddress: string) => { - console.log('------ dialing peer ', peerAddress) if (this.dialedPeers.has(peerAddress)) { console.log(`Peer ${peerAddress} already dialed, not dialing`) // TODO: remove log return @@ -134,47 +134,32 @@ export class Libp2pService extends EventEmitter { return libp2p } - private async addPeersToPeerBook(addresses: string[]) { - for (const address of addresses) { - const peerId = multiaddr(address).getPeerId() - console.log('PEER ID', peerId) - if (!peerId) return - console.log('PEER ID adding to peer book', peerId) - // @ts-expect-error - await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)]) - console.log('ALL', await this.libp2pInstance?.peerStore.all()) - } - } + // private async addPeersToPeerBook(addresses: string[]) { + // for (const address of addresses) { + // const peerId = multiaddr(address).getPeerId() + // if (!peerId) return + // // @ts-expect-error + // await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)]) + // } + // } private async afterCreation(peers: string[], peerId: PeerId) { if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') } - this.logger(`Local peerId: ${peerId.toString()}`) - this.on(Libp2pEvents.DIAL_PEERS, async (csrs: string[]) => { - console.log('DIALING PEERS') - const csrsPeersPromises = csrs.map(async csr => { - const parsedCsr = await loadCSR(csr) - const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) - const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) - if (!peerId || !onionAddress) return - - const peerAddress = this.createLibp2pAddress(onionAddress, peerId) - if (this.dialedPeers.has(peerAddress)) { - return peerAddress - } - }) - const csrsPeers = await Promise.all(csrsPeersPromises) - const addresses = csrsPeers.filter(isDefined) - const dialInChunks = new ProcessInChunks(addresses, this.dialPeer) - await dialInChunks.process() + this.logger(`Local peerId: ${peerId.toString()}`) + this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => { + const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress)) + console.log('DIALING PEERS', nonDialedAddresses.length, 'addresses') + this.processInChunksService.updateData(nonDialedAddresses) + await this.processInChunksService.process() }) this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) - const dialInChunks = new ProcessInChunks(peers, this.dialPeer) + this.processInChunksService.init(peers, this.dialPeer) this.libp2pInstance.addEventListener('peer:discovery', peer => { this.logger(`${peerId.toString()} discovered ${peer.detail.id}`) @@ -185,8 +170,8 @@ export class Libp2pService extends EventEmitter { const localPeerId = peerId.toString() this.logger(`${localPeerId} connected to ${remotePeerId}`) - //// Stop dialing as soon as we connect to a peer - //dialInChunks.stop() + // Stop dialing as soon as we connect to a peer + this.processInChunksService.stop() this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) @@ -229,7 +214,7 @@ export class Libp2pService extends EventEmitter { }) }) - await dialInChunks.process() + await this.processInChunksService.process() this.logger(`Initialized libp2p for peer ${peerId.toString()}`) } @@ -240,6 +225,5 @@ export class Libp2pService extends EventEmitter { this.libp2pInstance = null this.connectedPeers = new Map() this.dialedPeers = new Set() - // this.knownPeers = new Set() } } diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts similarity index 73% rename from packages/backend/src/nest/libp2p/process-in-chunks.ts rename to packages/backend/src/nest/libp2p/process-in-chunks.service.ts index b3e11caf61..714ffa7c0c 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -1,18 +1,27 @@ +import { EventEmitter } from 'events' import Logger from '../common/logger' const DEFAULT_CHUNK_SIZE = 10 -export class ProcessInChunks { +export class ProcessInChunksService extends EventEmitter { private isActive: boolean private data: T[] private chunkSize: number private processItem: (arg: T) => Promise - private readonly logger = Logger(ProcessInChunks.name) - constructor(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { + private readonly logger = Logger(ProcessInChunksService.name) + constructor() { + super() + } + + public init(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { this.data = data this.processItem = processItem this.chunkSize = chunkSize - this.isActive = true + } + + updateData(items: T[]) { + console.log('Updating data, previous', this.data, 'adding:', items) + this.data = [...new Set(this.data.concat(items))] } public async processOneItem() { @@ -32,6 +41,7 @@ export class ProcessInChunks { } public async process() { + this.isActive = true this.logger(`Processing ${Math.min(this.chunkSize, this.data.length)} items`) for (let i = 0; i < this.chunkSize; i++) { // Do not wait for this promise as items should be processed simultineously diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index abd1770f67..343b278d8f 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -1,5 +1,5 @@ import { jest, describe, it, expect } from '@jest/globals' -import { ProcessInChunks } from './process-in-chunks' +import { ProcessInChunksService } from './process-in-chunks.service' describe('ProcessInChunks', () => { it('processes data', async () => { @@ -9,14 +9,16 @@ describe('ProcessInChunks', () => { .mockRejectedValueOnce(new Error('Rejected 1')) .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 2')) - const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem) + const processInChunks = new ProcessInChunksService() + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) await processInChunks.process() expect(mockProcessItem).toBeCalledTimes(4) }) it('does not process more data if stopped', async () => { const mockProcessItem = jest.fn(async () => {}) - const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem) + const processInChunks = new ProcessInChunksService() + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) processInChunks.stop() await processInChunks.process() expect(mockProcessItem).not.toBeCalled() From 8b3097b165da30c1aa6e03fff0408b0a83a5e7cd Mon Sep 17 00:00:00 2001 From: Emi Date: Thu, 16 Nov 2023 17:09:20 +0100 Subject: [PATCH 07/14] fix: do not stop dial on first connection --- .../src/nest/connections-manager/connections-manager.service.ts | 2 +- packages/backend/src/nest/libp2p/libp2p.service.ts | 2 +- packages/backend/src/nest/libp2p/libp2p.types.ts | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index bacdbc85b3..608a31a149 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -589,7 +589,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_FETCH_ALL_DIRECT_MESSAGES, payload) }) this.storageService.on(StorageEvents.UPDATE_PEERS_LIST, (payload: StorePeerListPayload) => { - this.libp2pService.emit(Libp2pEvents.UPDATE_KNOWN_PEERS_LIST, payload.peerList) + // this.libp2pService.emit(Libp2pEvents.UPDATE_KNOWN_PEERS_LIST, payload.peerList) this.serverIoProvider.io.emit(SocketActionTypes.PEER_LIST, payload) }) this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 80c7a6eaeb..5a7e72e1e6 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -171,7 +171,7 @@ export class Libp2pService extends EventEmitter { this.logger(`${localPeerId} connected to ${remotePeerId}`) // Stop dialing as soon as we connect to a peer - this.processInChunksService.stop() + // this.processInChunksService.stop() this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 22205386f5..166be7ad94 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -6,7 +6,7 @@ export enum Libp2pEvents { PEER_DISCONNECTED = 'peerDisconnected', NETWORK_STATS = 'networkStats', DIAL_PEERS = 'dialPeers', - UPDATE_KNOWN_PEERS_LIST = 'updateKnownPeersList', + // UPDATE_KNOWN_PEERS_LIST = 'updateKnownPeersList', } export interface Libp2pNodeParams { From 9940bfd850fa2704f0d9c4c51826525dd1623cc1 Mon Sep 17 00:00:00 2001 From: Emi Date: Fri, 17 Nov 2023 14:51:07 +0100 Subject: [PATCH 08/14] test: fix --- .../connections-manager.service.spec.ts | 49 ++++----------- .../nest/libp2p/process-in-chunks.service.ts | 7 ++- .../src/nest/libp2p/process-in-chunks.spec.ts | 59 +++++++++++++++++-- .../nest/local-db/local-db.service.spec.ts | 36 ++++++----- .../src/nest/local-db/local-db.service.ts | 1 + packages/common/src/sortPeers.ts | 2 +- packages/desktop/package.json | 2 +- 7 files changed, 92 insertions(+), 64 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts index 4952b669a6..0736326698 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts @@ -18,6 +18,7 @@ import { RegistrationService } from '../registration/registration.service' import { SocketModule } from '../socket/socket.module' import { ConnectionsManagerModule } from './connections-manager.module' import { ConnectionsManagerService } from './connections-manager.service' +import { createLibp2pAddress } from '@quiet/common' describe('ConnectionsManagerService', () => { let module: TestingModule @@ -101,7 +102,12 @@ describe('ConnectionsManagerService', () => { key: userIdentity.userCsr?.userKey, CA: [communityRootCa], }, - peers: community.peerList, + peers: [ + createLibp2pAddress( + 'y7yczmugl2tekami7sbdz5pfaemvx7bahwthrdvcbzw5vex2crsr26qd', + 'QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE' + ), + ], } await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload) @@ -113,41 +119,6 @@ describe('ConnectionsManagerService', () => { expect(launchCommunitySpy).toHaveBeenCalledWith(launchCommunityPayload) }) - it('launches community on init if their data exists in local db', async () => { - const launchCommunityPayload: InitCommunityPayload = { - id: community.id, - peerId: userIdentity.peerId, - hiddenService: userIdentity.hiddenService, - certs: { - // @ts-expect-error - certificate: userIdentity.userCertificate, - // @ts-expect-error - key: userIdentity.userCsr?.userKey, - CA: [communityRootCa], - }, - peers: community.peerList, - } - - await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload) - - const peerAddress = '/dns4/test.onion/tcp/80/ws/p2p/peerid' - await localDbService.put(LocalDBKeys.PEERS, { - [peerAddress]: { - peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', - connectionTime: 50, - lastSeen: 1000, - }, - }) - - await connectionsManagerService.closeAllServices() - - const launchCommunitySpy = jest.spyOn(connectionsManagerService, 'launchCommunity').mockResolvedValue() - - await connectionsManagerService.init() - - expect(launchCommunitySpy).toHaveBeenCalledWith(Object.assign(launchCommunityPayload, { peers: [peerAddress] })) - }) - it('does not launch community on init if its data does not exist in local db', async () => { await connectionsManagerService.closeAllServices() await connectionsManagerService.init() @@ -200,10 +171,10 @@ describe('ConnectionsManagerService', () => { // await connectionsManager.init() await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload) - const peerAddress = '/dns4/test.onion/tcp/80/ws/p2p/peerid' + const peerid = 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix' await localDbService.put(LocalDBKeys.PEERS, { - [peerAddress]: { - peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', + [peerid]: { + peerId: peerid, connectionTime: 50, lastSeen: 1000, }, diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index 714ffa7c0c..1b2f424efc 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -17,6 +17,7 @@ export class ProcessInChunksService extends EventEmitter { this.data = data this.processItem = processItem this.chunkSize = chunkSize + // this.isActive = true } updateData(items: T[]) { @@ -25,7 +26,7 @@ export class ProcessInChunksService extends EventEmitter { } public async processOneItem() { - if (!this.isActive) return + // if (!this.isActive) return const toProcess = this.data.shift() if (toProcess) { try { @@ -41,8 +42,8 @@ export class ProcessInChunksService extends EventEmitter { } public async process() { - this.isActive = true - this.logger(`Processing ${Math.min(this.chunkSize, this.data.length)} items`) + // this.isActive = true + this.logger(`Processing ${this.data.length} items`) for (let i = 0; i < this.chunkSize; i++) { // Do not wait for this promise as items should be processed simultineously void this.processOneItem() diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index 343b278d8f..38979c1cf9 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -1,21 +1,70 @@ import { jest, describe, it, expect } from '@jest/globals' import { ProcessInChunksService } from './process-in-chunks.service' - +import waitForExpect from 'wait-for-expect' +import { TestModule } from '../common/test.module' +import { Test, TestingModule } from '@nestjs/testing' describe('ProcessInChunks', () => { + let module: TestingModule + let processInChunks: ProcessInChunksService + + beforeEach(async () => { + module = await Test.createTestingModule({ + imports: [TestModule, ProcessInChunksService], + }).compile() + + processInChunks = await module.resolve(ProcessInChunksService) + }) + it('processes data', async () => { const mockProcessItem = jest - .fn(async () => {}) + .fn(async a => { + console.log('processing', a) + }) .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 1')) .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 2')) - const processInChunks = new ProcessInChunksService() processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) await processInChunks.process() - expect(mockProcessItem).toBeCalledTimes(4) + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) + }) + + it('processes new data', async () => { + const mockProcessItem = jest + .fn(async a => { + console.log('processing', a) + }) + .mockResolvedValueOnce() + .mockRejectedValueOnce(new Error('Rejected 1')) + processInChunks.init(['a', 'b'], mockProcessItem) + await processInChunks.process() + processInChunks.updateData(['e', 'f']) + await processInChunks.process() + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) + }) + + it('processes data in chunks', async () => { + const mockProcessItem = jest + .fn(async a => { + console.log('processing', a) + }) + .mockResolvedValueOnce() + .mockRejectedValueOnce(new Error('Rejected 1')) + .mockResolvedValueOnce() + .mockRejectedValueOnce(new Error('Rejected 2')) + const chunkSize = 2 + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize) + await processInChunks.process() + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) }) - it('does not process more data if stopped', async () => { + it.skip('does not process more data if stopped', async () => { const mockProcessItem = jest.fn(async () => {}) const processInChunks = new ProcessInChunksService() processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) diff --git a/packages/backend/src/nest/local-db/local-db.service.spec.ts b/packages/backend/src/nest/local-db/local-db.service.spec.ts index 0cf4af79cf..6d394c9382 100644 --- a/packages/backend/src/nest/local-db/local-db.service.spec.ts +++ b/packages/backend/src/nest/local-db/local-db.service.spec.ts @@ -4,14 +4,12 @@ import { TestModule } from '../common/test.module' import { LocalDbModule } from './local-db.module' import { LocalDbService } from './local-db.service' import { LocalDBKeys } from './local-db.types' +import { createLibp2pAddress } from '@quiet/common' describe('LocalDbService', () => { let module: TestingModule let localDbService: LocalDbService - - let peer1Address: string let peer1Stats: Record = {} - let peer2Address: string let peer2Stats: Record = {} beforeAll(async () => { @@ -21,19 +19,15 @@ describe('LocalDbService', () => { localDbService = await module.resolve(LocalDbService) - peer1Address = - '/dns4/mxtsfs4kzxzuisrw4tumdmycbyerqwakx37kj6om6azcjdaasifxmoqd.onion/tcp/443/wss/p2p/QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix' peer1Stats = { - [peer1Address]: { + ['QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix']: { peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', connectionTime: 50, lastSeen: 1000, }, } - peer2Address = - '/dns4/hxr74a76b4lerhov75a6ha6yprruvow3wfu4qmmeoc6ajs7m7323lyid.onion/tcp/443/wss/p2p/QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ' peer2Stats = { - [peer2Address]: { + ['QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ']: { peerId: 'QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ', connectionTime: 500, lastSeen: 500, @@ -71,16 +65,28 @@ describe('LocalDbService', () => { expect(localDbService.getStatus()).toEqual('closed') }) + it('get sorted peers returns peers list if no stats in db', async () => { + const peers = [ + createLibp2pAddress( + 'zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion', + 'QmPGdGDUV1PXaJky4V53KSvFszdqEcM7KCoDpF2uFPf5w6' + ), + ] + const sortedPeers = await localDbService.getSortedPeers(peers) + expect(sortedPeers).toEqual(peers) + }) + it('get sorted peers', async () => { - const extraPeers = [ - '/dns4/zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion/tcp/443/ws/p2p/QmPGdGDUV1PXaJky4V53KSvFszdqEcM7KCoDpF2uFPf5w6', + const peers = [ + createLibp2pAddress('nqnw4kc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion', Object.keys(peer2Stats)[0]), + createLibp2pAddress('zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion', Object.keys(peer1Stats)[0]), ] await localDbService.put(LocalDBKeys.PEERS, { ...peer1Stats, ...peer2Stats, }) - const sortedPeers = await localDbService.getSortedPeers(extraPeers) - expect(sortedPeers).toEqual([peer1Address, peer2Address, extraPeers[0]]) + const sortedPeers = await localDbService.getSortedPeers(peers.reverse()) + expect(sortedPeers).toEqual(peers) }) it('updates nested object', async () => { @@ -100,13 +106,13 @@ describe('LocalDbService', () => { } await localDbService.update(LocalDBKeys.PEERS, { - [peer2Address]: peer2StatsUpdated, + [peer2StatsUpdated.peerId]: peer2StatsUpdated, }) const updatedPeersDBdata = await localDbService.get(LocalDBKeys.PEERS) expect(updatedPeersDBdata).toEqual({ ...peer1Stats, - [peer2Address]: peer2StatsUpdated, + [peer2StatsUpdated.peerId]: peer2StatsUpdated, }) }) }) diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 35ac08fcfb..3c309ca20f 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -72,6 +72,7 @@ export class LocalDbService { } public async getSortedPeers(peers: string[] = []): Promise { + console.log('getSortedPeers peers got', peers) const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} const stats: NetworkStats[] = Object.values(peersStats) return sortPeers(peers, stats) diff --git a/packages/common/src/sortPeers.ts b/packages/common/src/sortPeers.ts index df687d0a40..0ac9efcf6a 100644 --- a/packages/common/src/sortPeers.ts +++ b/packages/common/src/sortPeers.ts @@ -10,7 +10,7 @@ This is the very simple algorithm for evaluating the most wanted peers. */ export const sortPeers = (peersAddresses: string[], stats: NetworkStats[]): string[] => { peersAddresses = peersAddresses.filter(add => - add.match(/^\/dns4\/[a-z0-9]{56}.onion\/tcp\/80\/ws\/p2p\/[a-zA-Z0-9]{46}$/g) + add.match(/^\/dns4\/[a-z0-9]{56}.onion\/tcp\/(443|80)\/ws\/p2p\/[a-zA-Z0-9]{46}$/g) ) const lastSeenSorted = [...stats].sort((a, b) => { return b.lastSeen - a.lastSeen diff --git a/packages/desktop/package.json b/packages/desktop/package.json index 42dd73f5ff..404fdabd9d 100644 --- a/packages/desktop/package.json +++ b/packages/desktop/package.json @@ -111,7 +111,7 @@ "build:renderer:prod": "webpack --config webpack/webpack.config.renderer.prod.js", "postBuild": "node scripts/postBuild.js", "prestart": "npm run build:main", - "start": "cross-env DEBUG='backend*,quiet*,state-manager*,desktop*,utils*,libp2p:websockets:listener:backend' npm run start:renderer", + "start": "cross-env DEBUG='backend*,quiet*,state-manager*,desktop*,utils*,libp2p:websockets:listener:backend,libp2p:connection-manager:auto-dialler' npm run start:renderer", "start:main": "cross-env NODE_ENV=development electron .", "start:renderer": "cross-env NODE_ENV=development webpack-dev-server --config webpack/webpack.config.renderer.dev.js", "storybook": "export NODE_OPTIONS=--openssl-legacy-provider && start-storybook -p 6006", From 108bf302af7f070dbc9cf252d5562828e689d111 Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 20 Nov 2023 11:49:45 +0100 Subject: [PATCH 09/14] chore: cleanup --- .../backend/src/nest/libp2p/libp2p.service.ts | 50 +++++++------------ .../nest/libp2p/process-in-chunks.service.ts | 2 +- 2 files changed, 18 insertions(+), 34 deletions(-) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 5a7e72e1e6..588f72a632 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -1,29 +1,27 @@ -import { Inject, Injectable } from '@nestjs/common' -import { Agent } from 'https' -import { createLibp2p, Libp2p } from 'libp2p' -import { noise } from '@chainsafe/libp2p-noise' import { gossipsub } from '@chainsafe/libp2p-gossipsub' -import { mplex } from '@libp2p/mplex' +import { noise } from '@chainsafe/libp2p-noise' import { kadDHT } from '@libp2p/kad-dht' -import { createServer } from 'it-ws' -import { DateTime } from 'luxon' -import { EventEmitter } from 'events' -import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' -import { ProcessInChunksService } from './process-in-chunks.service' +import { mplex } from '@libp2p/mplex' import { multiaddr } from '@multiformats/multiaddr' +import { Inject, Injectable } from '@nestjs/common' +import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' import { ConnectionProcessInfo, PeerId, SocketActionTypes } from '@quiet/types' +import crypto from 'crypto' +import { EventEmitter } from 'events' +import { Agent } from 'https' +import { createServer } from 'it-ws' +import { Libp2p, createLibp2p } from 'libp2p' +import { preSharedKey } from 'libp2p/pnet' +import { DateTime } from 'luxon' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import Logger from '../common/logger' import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const' import { ServerIoProviderTypes } from '../types' -import Logger from '../common/logger' import { webSockets } from '../websocketOverTor' import { all } from '../websocketOverTor/filters' -import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common' -import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity' -import { preSharedKey } from 'libp2p/pnet' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import crypto from 'crypto' -import { peerIdFromString } from '@libp2p/peer-id' +import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' +import { ProcessInChunksService } from './process-in-chunks.service' const KEY_LENGTH = 32 export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' @@ -33,7 +31,6 @@ export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null public connectedPeers: Map = new Map() public dialedPeers: Set = new Set() - // public processInChunksService: ProcessInChunks private readonly logger = Logger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, @@ -45,7 +42,6 @@ export class Libp2pService extends EventEmitter { private dialPeer = async (peerAddress: string) => { if (this.dialedPeers.has(peerAddress)) { - console.log(`Peer ${peerAddress} already dialed, not dialing`) // TODO: remove log return } this.dialedPeers.add(peerAddress) @@ -134,15 +130,6 @@ export class Libp2pService extends EventEmitter { return libp2p } - // private async addPeersToPeerBook(addresses: string[]) { - // for (const address of addresses) { - // const peerId = multiaddr(address).getPeerId() - // if (!peerId) return - // // @ts-expect-error - // await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)]) - // } - // } - private async afterCreation(peers: string[], peerId: PeerId) { if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') @@ -152,7 +139,7 @@ export class Libp2pService extends EventEmitter { this.logger(`Local peerId: ${peerId.toString()}`) this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => { const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress)) - console.log('DIALING PEERS', nonDialedAddresses.length, 'addresses') + this.logger('Dialing', nonDialedAddresses.length, 'addresses') this.processInChunksService.updateData(nonDialedAddresses) await this.processInChunksService.process() }) @@ -170,9 +157,6 @@ export class Libp2pService extends EventEmitter { const localPeerId = peerId.toString() this.logger(`${localPeerId} connected to ${remotePeerId}`) - // Stop dialing as soon as we connect to a peer - // this.processInChunksService.stop() - this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) this.logger(`${localPeerId} has ${this.libp2pInstance?.getConnections().length} open connections`) diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index 1b2f424efc..bf05640c90 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -21,7 +21,7 @@ export class ProcessInChunksService extends EventEmitter { } updateData(items: T[]) { - console.log('Updating data, previous', this.data, 'adding:', items) + this.logger(`Updating data with ${items.length} items`) this.data = [...new Set(this.data.concat(items))] } From 2a5f8017a405465551469b2242c6d36bf4e22652 Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 20 Nov 2023 12:38:26 +0100 Subject: [PATCH 10/14] test: fix --- .../nest/local-db/local-db.service.spec.ts | 19 +++++++++++-------- 1 file changed, 11 insertions(+), 8 deletions(-) diff --git a/packages/backend/src/nest/local-db/local-db.service.spec.ts b/packages/backend/src/nest/local-db/local-db.service.spec.ts index 6d394c9382..ee9a42b8c1 100644 --- a/packages/backend/src/nest/local-db/local-db.service.spec.ts +++ b/packages/backend/src/nest/local-db/local-db.service.spec.ts @@ -10,7 +10,9 @@ describe('LocalDbService', () => { let module: TestingModule let localDbService: LocalDbService let peer1Stats: Record = {} + let peer1ID: string let peer2Stats: Record = {} + let peer2ID: string beforeAll(async () => { module = await Test.createTestingModule({ @@ -18,17 +20,18 @@ describe('LocalDbService', () => { }).compile() localDbService = await module.resolve(LocalDbService) - + peer1ID = 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix' peer1Stats = { - ['QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix']: { - peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', + [peer1ID]: { + peerId: peer1ID, connectionTime: 50, lastSeen: 1000, }, } + peer2ID = 'QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ' peer2Stats = { - ['QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ']: { - peerId: 'QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ', + [peer2ID]: { + peerId: peer2ID, connectionTime: 500, lastSeen: 500, }, @@ -78,8 +81,8 @@ describe('LocalDbService', () => { it('get sorted peers', async () => { const peers = [ - createLibp2pAddress('nqnw4kc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion', Object.keys(peer2Stats)[0]), - createLibp2pAddress('zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion', Object.keys(peer1Stats)[0]), + createLibp2pAddress('nqnw4kc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion', peer2ID), + createLibp2pAddress('zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion', peer1ID), ] await localDbService.put(LocalDBKeys.PEERS, { ...peer1Stats, @@ -100,7 +103,7 @@ describe('LocalDbService', () => { }) const peer2StatsUpdated: NetworkStats = { - peerId: 'QmR7Qgd4tg2XrGD3kW647ZnYyazTwHQF3cqRBmSduhhusA', + peerId: peer2ID, connectionTime: 777, lastSeen: 678, } From 97a5652d971145c546e041b7019b782db875365a Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 20 Nov 2023 13:06:50 +0100 Subject: [PATCH 11/14] chore: rename sortPeers --- .../src/nest/local-db/local-db.service.ts | 4 ++-- packages/common/src/libp2p.test.ts | 20 +++++++++++++++++++ packages/common/src/libp2p.ts | 4 ++++ packages/common/src/sortPeers.ts | 8 ++++---- .../appConnection/connection.selectors.ts | 4 ++-- 5 files changed, 32 insertions(+), 8 deletions(-) create mode 100644 packages/common/src/libp2p.test.ts diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 3c309ca20f..fcbecc41c3 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable } from '@nestjs/common' import { Level } from 'level' import { NetworkStats } from '@quiet/types' -import { sortPeers } from '@quiet/common' +import { filterAndSortPeers } from '@quiet/common' import { LEVEL_DB } from '../const' import { LocalDBKeys, LocalDbStatus } from './local-db.types' import Logger from '../common/logger' @@ -75,6 +75,6 @@ export class LocalDbService { console.log('getSortedPeers peers got', peers) const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} const stats: NetworkStats[] = Object.values(peersStats) - return sortPeers(peers, stats) + return filterAndSortPeers(peers, stats) } } diff --git a/packages/common/src/libp2p.test.ts b/packages/common/src/libp2p.test.ts new file mode 100644 index 0000000000..5a623aadb7 --- /dev/null +++ b/packages/common/src/libp2p.test.ts @@ -0,0 +1,20 @@ +import { filterAndSortPeers } from './sortPeers' + +describe('filterValidAddresses', () => { + it('filters out invalid addresses', () => { + const valid = [ + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/443/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/80/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + ] + const addresses = [ + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/443/wss/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + ...valid, + 'invalidAddress', + '/dns4/somethingElse.onion/tcp/443/wss/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSA', + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/443/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbK', + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrj.onion/tcp/443/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + 'QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbK', + ] + expect(filterAndSortPeers(addresses, [])).toEqual(valid) + }) +}) diff --git a/packages/common/src/libp2p.ts b/packages/common/src/libp2p.ts index b2fb70e83f..4ada2cb6b9 100644 --- a/packages/common/src/libp2p.ts +++ b/packages/common/src/libp2p.ts @@ -17,3 +17,7 @@ export const isPSKcodeValid = (psk: string): boolean => { const _psk = psk.trim() return validator.isBase64(_psk) && _psk.length === PSK_LENGTH } + +export const filterValidAddresses = (addresses: string[]) => { + return addresses.filter(add => add.match(/^\/dns4\/[a-z0-9]{56}.onion\/tcp\/(443|80)\/ws\/p2p\/[a-zA-Z0-9]{46}$/g)) +} diff --git a/packages/common/src/sortPeers.ts b/packages/common/src/sortPeers.ts index 0ac9efcf6a..2b92cd1ea8 100644 --- a/packages/common/src/sortPeers.ts +++ b/packages/common/src/sortPeers.ts @@ -1,17 +1,17 @@ import { type NetworkStats } from '@quiet/types' import { isDefined } from './helpers' +import { filterValidAddresses } from './libp2p' /** This is the very simple algorithm for evaluating the most wanted peers. +0. Filters out invalid peers addresses 1. It takes the peers stats list that contains statistics for every peer our node was ever connected to. 2. Two sorted arrays are created - one sorted by last seen and other by most uptime shared. 3. Arrays are merged taking one element from list one and one element from the second list. Duplicates are ommited 4. We end up with mix of last seen and most uptime descending array of peers, the it is enchanced to libp2p address. */ -export const sortPeers = (peersAddresses: string[], stats: NetworkStats[]): string[] => { - peersAddresses = peersAddresses.filter(add => - add.match(/^\/dns4\/[a-z0-9]{56}.onion\/tcp\/(443|80)\/ws\/p2p\/[a-zA-Z0-9]{46}$/g) - ) +export const filterAndSortPeers = (peersAddresses: string[], stats: NetworkStats[]): string[] => { + peersAddresses = filterValidAddresses(peersAddresses) const lastSeenSorted = [...stats].sort((a, b) => { return b.lastSeen - a.lastSeen }) diff --git a/packages/state-manager/src/sagas/appConnection/connection.selectors.ts b/packages/state-manager/src/sagas/appConnection/connection.selectors.ts index b0f7606fcf..2d2bff0cdb 100644 --- a/packages/state-manager/src/sagas/appConnection/connection.selectors.ts +++ b/packages/state-manager/src/sagas/appConnection/connection.selectors.ts @@ -7,7 +7,7 @@ import { peersStatsAdapter } from './connection.adapter' import { connectedPeers } from '../network/network.selectors' import { type NetworkStats } from './connection.types' import { type User } from '../users/users.types' -import { sortPeers } from '@quiet/common' +import { filterAndSortPeers } from '@quiet/common' const connectionSlice: CreatedSelectors[StoreKeys.Connection] = (state: StoreState) => state[StoreKeys.Connection] @@ -33,7 +33,7 @@ export const peerList = createSelector( stats = peersStatsAdapter.getSelectors().selectAll(reducerState.peersStats) } - return sortPeers(arr, stats) + return filterAndSortPeers(arr, stats) } ) From 09076fbe159031d8f5243aa40c0d0a0d7ce59c93 Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 20 Nov 2023 13:15:44 +0100 Subject: [PATCH 12/14] chore: cleanup --- .../connections-manager/connections-manager.service.ts | 8 +++----- packages/backend/src/nest/libp2p/libp2p.types.ts | 1 - .../backend/src/nest/libp2p/process-in-chunks.service.ts | 3 --- packages/backend/src/nest/local-db/local-db.service.ts | 1 - packages/backend/src/nest/storage/storage.service.ts | 4 ---- 5 files changed, 3 insertions(+), 14 deletions(-) diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 608a31a149..a5983684e6 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -157,10 +157,10 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.logger('launchCommunityFromStorage') const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) - console.log('launchCommunityFromStorage - community peers', community?.peers) + this.logger('launchCommunityFromStorage - community peers', community?.peers) if (community) { const sortedPeers = await this.localDbService.getSortedPeers(community.peers) - console.log('launchCommunityFromStorage - sorted peers', sortedPeers) + this.logger('launchCommunityFromStorage - sorted peers', sortedPeers) if (sortedPeers.length > 0) { community.peers = sortedPeers } @@ -589,7 +589,6 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_FETCH_ALL_DIRECT_MESSAGES, payload) }) this.storageService.on(StorageEvents.UPDATE_PEERS_LIST, (payload: StorePeerListPayload) => { - // this.libp2pService.emit(Libp2pEvents.UPDATE_KNOWN_PEERS_LIST, payload.peerList) this.serverIoProvider.io.emit(SocketActionTypes.PEER_LIST, payload) }) this.storageService.on(StorageEvents.SEND_PUSH_NOTIFICATION, (payload: PushNotificationPayload) => { @@ -605,9 +604,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.storageService.on( StorageEvents.REPLICATED_CSR, async (payload: { csrs: string[]; certificates: string[]; id: string }) => { - console.log(`On ${StorageEvents.REPLICATED_CSR}`) + this.logger(`Storage - ${StorageEvents.REPLICATED_CSR}`) this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs)) - console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`) this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs }) this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload) } diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 166be7ad94..1f9d8c9fb7 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -6,7 +6,6 @@ export enum Libp2pEvents { PEER_DISCONNECTED = 'peerDisconnected', NETWORK_STATS = 'networkStats', DIAL_PEERS = 'dialPeers', - // UPDATE_KNOWN_PEERS_LIST = 'updateKnownPeersList', } export interface Libp2pNodeParams { diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts index bf05640c90..2e7b30588b 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.service.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -17,7 +17,6 @@ export class ProcessInChunksService extends EventEmitter { this.data = data this.processItem = processItem this.chunkSize = chunkSize - // this.isActive = true } updateData(items: T[]) { @@ -26,7 +25,6 @@ export class ProcessInChunksService extends EventEmitter { } public async processOneItem() { - // if (!this.isActive) return const toProcess = this.data.shift() if (toProcess) { try { @@ -42,7 +40,6 @@ export class ProcessInChunksService extends EventEmitter { } public async process() { - // this.isActive = true this.logger(`Processing ${this.data.length} items`) for (let i = 0; i < this.chunkSize; i++) { // Do not wait for this promise as items should be processed simultineously diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index fcbecc41c3..f5275ff77f 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -72,7 +72,6 @@ export class LocalDbService { } public async getSortedPeers(peers: string[] = []): Promise { - console.log('getSortedPeers peers got', peers) const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} const stats: NetworkStats[] = Object.values(peersStats) return filterAndSortPeers(peers, stats) diff --git a/packages/backend/src/nest/storage/storage.service.ts b/packages/backend/src/nest/storage/storage.service.ts index 8e90309128..ae3e39acb0 100644 --- a/packages/backend/src/nest/storage/storage.service.ts +++ b/packages/backend/src/nest/storage/storage.service.ts @@ -337,12 +337,8 @@ export class StorageService extends EventEmitter { const allUsers = this.getAllUsers() const registeredUsers = this.getAllRegisteredUsers() const peers = [...new Set(await getUsersAddresses(allUsers.concat(registeredUsers)))] - console.log('updatePeersList, peers count:', peers.length) - console.log('STORATE peers', peers) - const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) const sortedPeers = await this.localDbService.getSortedPeers(peers) - console.log('STORAGE sortedPeers', sortedPeers) if (sortedPeers.length > 0) { community.peers = sortedPeers await this.localDbService.put(LocalDBKeys.COMMUNITY, community) From c64c05bc9cfcf73b749cb8c98ca2ae54bb57323d Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 20 Nov 2023 13:29:39 +0100 Subject: [PATCH 13/14] fix: test --- .../connection.selectors.test.ts | 20 +++++++++---------- .../sagas/appConnection/connection.slice.ts | 4 ---- 2 files changed, 10 insertions(+), 14 deletions(-) diff --git a/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts b/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts index 4ab007f989..f91f894082 100644 --- a/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts +++ b/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts @@ -23,11 +23,11 @@ describe('communitiesSelectors', () => { it('select peers sorted by quality', async () => { community = await factory.create['payload']>('Community', { peerList: [ - '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/wss/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', - '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/wss/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', - '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/wss/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', - '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/wss/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', - '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/wss/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', + '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/ws/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', + '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/ws/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', + '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/ws/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', + '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/ws/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', + '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/ws/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', ], }) @@ -68,11 +68,11 @@ describe('communitiesSelectors', () => { ) const expectedArray = [ - '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/wss/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', - '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/wss/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', - '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/wss/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', - '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/wss/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', - '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/wss/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', + '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/ws/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', + '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/ws/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', + '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/ws/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', + '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/ws/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', + '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/ws/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', ] const peersList = connectionSelectors.peerList(store.getState()) diff --git a/packages/state-manager/src/sagas/appConnection/connection.slice.ts b/packages/state-manager/src/sagas/appConnection/connection.slice.ts index ea82144cb7..b72ec8b0a7 100644 --- a/packages/state-manager/src/sagas/appConnection/connection.slice.ts +++ b/packages/state-manager/src/sagas/appConnection/connection.slice.ts @@ -24,11 +24,7 @@ export const connectionSlice = createSlice({ }, updateNetworkData: (state, action: PayloadAction) => { const prev = state.peersStats?.entities[action.payload.peer]?.connectionTime || 0 - - console.log('prev peerStats', state.peersStats) const _peerStats = state.peersStats || peersStatsAdapter.getInitialState() - console.log('next peerStats', _peerStats) - peersStatsAdapter.upsertOne(_peerStats, { peerId: action.payload.peer, lastSeen: action.payload.lastSeen, From 1e459c60aaf59d25474080b49b09c7a1e1073b3c Mon Sep 17 00:00:00 2001 From: Emi Date: Mon, 20 Nov 2023 13:40:01 +0100 Subject: [PATCH 14/14] chore: update changelog --- CHANGELOG.md | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fab8d401e..f7e6a30ba1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +[unreleased] + +* Fix: filter out invalid peer addresses in peer list. Update peer list in localdb. + +* Fix: dial new peers on CSRs replication + [2.0.3-alpha.5] * Fix network data proceeding when using custom protocol multiple times #1847