diff --git a/CHANGELOG.md b/CHANGELOG.md index 9fab8d401e..f7e6a30ba1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,3 +1,9 @@ +[unreleased] + +* Fix: filter out invalid peer addresses in peer list. Update peer list in localdb. + +* Fix: dial new peers on CSRs replication + [2.0.3-alpha.5] * Fix network data proceeding when using custom protocol multiple times #1847 diff --git a/packages/backend/src/nest/common/utils.ts b/packages/backend/src/nest/common/utils.ts index eeff2ad29c..6b1b445486 100644 --- a/packages/backend/src/nest/common/utils.ts +++ b/packages/backend/src/nest/common/utils.ts @@ -11,8 +11,9 @@ import { type PermsData } from '@quiet/types' import { TestConfig } from '../const' import logger from './logger' import { Libp2pNodeParams } from '../libp2p/libp2p.types' -import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' +import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common' import { Libp2pService } from '../libp2p/libp2p.service' +import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity' const log = logger('test') @@ -153,6 +154,20 @@ export const getUsersAddresses = async (users: UserData[]): Promise => return await Promise.all(peers) } +export const getLibp2pAddressesFromCsrs = async (csrs: string[]): Promise => { + const addresses = await Promise.all( + csrs.map(async csr => { + const parsedCsr = await loadCSR(csr) + const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) + const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) + if (!peerId || !onionAddress) return + + return createLibp2pAddress(onionAddress, peerId) + }) + ) + return addresses.filter(isDefined) +} + /** * Compares given numbers * diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts index 4952b669a6..0736326698 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.spec.ts @@ -18,6 +18,7 @@ import { RegistrationService } from '../registration/registration.service' import { SocketModule } from '../socket/socket.module' import { ConnectionsManagerModule } from './connections-manager.module' import { ConnectionsManagerService } from './connections-manager.service' +import { createLibp2pAddress } from '@quiet/common' describe('ConnectionsManagerService', () => { let module: TestingModule @@ -101,7 +102,12 @@ describe('ConnectionsManagerService', () => { key: userIdentity.userCsr?.userKey, CA: [communityRootCa], }, - peers: community.peerList, + peers: [ + createLibp2pAddress( + 'y7yczmugl2tekami7sbdz5pfaemvx7bahwthrdvcbzw5vex2crsr26qd', + 'QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE' + ), + ], } await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload) @@ -113,41 +119,6 @@ describe('ConnectionsManagerService', () => { expect(launchCommunitySpy).toHaveBeenCalledWith(launchCommunityPayload) }) - it('launches community on init if their data exists in local db', async () => { - const launchCommunityPayload: InitCommunityPayload = { - id: community.id, - peerId: userIdentity.peerId, - hiddenService: userIdentity.hiddenService, - certs: { - // @ts-expect-error - certificate: userIdentity.userCertificate, - // @ts-expect-error - key: userIdentity.userCsr?.userKey, - CA: [communityRootCa], - }, - peers: community.peerList, - } - - await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload) - - const peerAddress = '/dns4/test.onion/tcp/80/ws/p2p/peerid' - await localDbService.put(LocalDBKeys.PEERS, { - [peerAddress]: { - peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', - connectionTime: 50, - lastSeen: 1000, - }, - }) - - await connectionsManagerService.closeAllServices() - - const launchCommunitySpy = jest.spyOn(connectionsManagerService, 'launchCommunity').mockResolvedValue() - - await connectionsManagerService.init() - - expect(launchCommunitySpy).toHaveBeenCalledWith(Object.assign(launchCommunityPayload, { peers: [peerAddress] })) - }) - it('does not launch community on init if its data does not exist in local db', async () => { await connectionsManagerService.closeAllServices() await connectionsManagerService.init() @@ -200,10 +171,10 @@ describe('ConnectionsManagerService', () => { // await connectionsManager.init() await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload) - const peerAddress = '/dns4/test.onion/tcp/80/ws/p2p/peerid' + const peerid = 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix' await localDbService.put(LocalDBKeys.PEERS, { - [peerAddress]: { - peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', + [peerid]: { + peerId: peerid, connectionTime: 50, lastSeen: 1000, }, diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 41f61c694d..a5983684e6 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -8,7 +8,7 @@ import { setEngine, CryptoEngine } from 'pkijs' import { EventEmitter } from 'events' import getPort from 'get-port' import PeerId from 'peer-id' -import { removeFilesFromDir } from '../common/utils' +import { getLibp2pAddressesFromCsrs, removeFilesFromDir } from '../common/utils' import { AskForMessagesPayload, @@ -157,9 +157,10 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.logger('launchCommunityFromStorage') const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) - this.logger('launchCommunityFromStorage - community:', community?.id) + this.logger('launchCommunityFromStorage - community peers', community?.peers) if (community) { const sortedPeers = await this.localDbService.getSortedPeers(community.peers) + this.logger('launchCommunityFromStorage - sorted peers', sortedPeers) if (sortedPeers.length > 0) { community.peers = sortedPeers } @@ -191,9 +192,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.logger('Closing local storage') await this.localDbService.close() } - if (this.libp2pService?.libp2pInstance) { + if (this.libp2pService) { this.logger('Stopping libp2p') - await this.libp2pService.libp2pInstance.stop() + await this.libp2pService.close() } } @@ -208,22 +209,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI public async leaveCommunity() { this.tor.resetHiddenServices() - this.serverIoProvider.io.close() + this.closeSocket() await this.localDbService.purge() await this.closeAllServices({ saveTor: true }) await this.purgeData() + await this.resetState() + await this.localDbService.open() + await this.socketService.init() + } + + async resetState() { this.communityId = '' this.ports = { ...this.ports, libp2pHiddenService: await getPort() } - this.libp2pService.libp2pInstance = null - this.libp2pService.connectedPeers = new Map() this.communityState = ServiceState.DEFAULT this.registrarState = ServiceState.DEFAULT - await this.localDbService.open() - await this.socketService.init() } public async purgeData() { - console.log('removing data') + this.logger('Purging community data') const dirsToRemove = fs .readdirSync(this.quietDir) .filter( @@ -601,7 +604,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI this.storageService.on( StorageEvents.REPLICATED_CSR, async (payload: { csrs: string[]; certificates: string[]; id: string }) => { - console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`) + this.logger(`Storage - ${StorageEvents.REPLICATED_CSR}`) + this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs)) this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs }) this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload) } diff --git a/packages/backend/src/nest/const.ts b/packages/backend/src/nest/const.ts index f4c86c3a35..80cd62656c 100644 --- a/packages/backend/src/nest/const.ts +++ b/packages/backend/src/nest/const.ts @@ -28,6 +28,8 @@ export const IPFS_REPO_PATCH = 'ipfsRepoPath' export const CONFIG_OPTIONS = 'configOptions' export const SERVER_IO_PROVIDER = 'serverIoProvider' +export const PROCESS_IN_CHUNKS_PROVIDER = 'processInChunksProvider' + export const EXPRESS_PROVIDER = 'expressProvider' export const LEVEL_DB = 'levelDb' diff --git a/packages/backend/src/nest/libp2p/libp2p.module.ts b/packages/backend/src/nest/libp2p/libp2p.module.ts index bdc20e965c..d75edc799e 100644 --- a/packages/backend/src/nest/libp2p/libp2p.module.ts +++ b/packages/backend/src/nest/libp2p/libp2p.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common' import { SocketModule } from '../socket/socket.module' import { Libp2pService } from './libp2p.service' +import { ProcessInChunksService } from './process-in-chunks.service' @Module({ imports: [SocketModule], - providers: [Libp2pService], + providers: [Libp2pService, ProcessInChunksService], exports: [Libp2pService], }) export class Libp2pModule {} diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 5fc4e370aa..13ebb5f1ed 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -1,16 +1,20 @@ +import { jest } from '@jest/globals' import { Test, TestingModule } from '@nestjs/testing' import { TestModule } from '../common/test.module' -import { libp2pInstanceParams } from '../common/utils' +import { createPeerId, libp2pInstanceParams } from '../common/utils' import { Libp2pModule } from './libp2p.module' import { LIBP2P_PSK_METADATA, Libp2pService } from './libp2p.service' -import { Libp2pNodeParams } from './libp2p.types' +import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import validator from 'validator' +import waitForExpect from 'wait-for-expect' +import { ProcessInChunksService } from './process-in-chunks.service' describe('Libp2pService', () => { let module: TestingModule let libp2pService: Libp2pService let params: Libp2pNodeParams + let processInChunks: ProcessInChunksService beforeAll(async () => { module = await Test.createTestingModule({ @@ -18,6 +22,7 @@ describe('Libp2pService', () => { }).compile() libp2pService = await module.resolve(Libp2pService) + processInChunks = await module.resolve(ProcessInChunksService) params = await libp2pInstanceParams() }) @@ -32,13 +37,13 @@ describe('Libp2pService', () => { expect(libp2pService?.libp2pInstance?.peerId).toBe(params.peerId) }) - it('destory instance libp2p', async () => { + it('close libp2p service', async () => { await libp2pService.createInstance(params) - await libp2pService.destroyInstance() + await libp2pService.close() expect(libp2pService.libp2pInstance).toBeNull() }) - it('creates libp2p address with proper ws type (%s)', async () => { + it('creates libp2p address', async () => { const libp2pAddress = libp2pService.createLibp2pAddress(params.localAddress, params.peerId.toString()) expect(libp2pAddress).toStrictEqual(`/dns4/${params.localAddress}.onion/tcp/80/ws/p2p/${params.peerId.toString()}`) }) @@ -58,4 +63,40 @@ describe('Libp2pService', () => { const expectedFullKeyString = LIBP2P_PSK_METADATA + uint8ArrayToString(generatedPskBuffer, 'base16') expect(uint8ArrayToString(generatedKey.fullKey)).toEqual(expectedFullKeyString) }) + + it(`Starts dialing peers on '${Libp2pEvents.DIAL_PEERS}' event`, async () => { + const peerId1 = await createPeerId() + const peerId2 = await createPeerId() + const addresses = [ + libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()), + libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), + ] + await libp2pService.createInstance(params) + // @ts-expect-error processItem is private + const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem') + expect(libp2pService.libp2pInstance).not.toBeNull() + libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { + expect(spyOnProcessItem).toBeCalledTimes(addresses.length) + }) + }) + + it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => { + const peerId1 = await createPeerId() + const peerId2 = await createPeerId() + const alreadyDialedAddress = libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()) + libp2pService.dialedPeers.add(alreadyDialedAddress) + const addresses = [ + alreadyDialedAddress, + libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), + ] + await libp2pService.createInstance(params) + expect(libp2pService.libp2pInstance).not.toBeNull() + // @ts-expect-error processItem is private + const dialPeerSpy = jest.spyOn(processInChunks, 'processItem') + libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { + expect(dialPeerSpy).toBeCalledTimes(1) + }) + }) }) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index 68a4fd6c7c..588f72a632 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -1,27 +1,27 @@ -import { Inject, Injectable } from '@nestjs/common' -import { Agent } from 'https' -import { createLibp2p, Libp2p } from 'libp2p' -import { noise } from '@chainsafe/libp2p-noise' import { gossipsub } from '@chainsafe/libp2p-gossipsub' -import { mplex } from '@libp2p/mplex' +import { noise } from '@chainsafe/libp2p-noise' import { kadDHT } from '@libp2p/kad-dht' -import { createServer } from 'it-ws' -import { DateTime } from 'luxon' -import { EventEmitter } from 'events' -import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' -import { ProcessInChunks } from './process-in-chunks' +import { mplex } from '@libp2p/mplex' import { multiaddr } from '@multiformats/multiaddr' +import { Inject, Injectable } from '@nestjs/common' +import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' import { ConnectionProcessInfo, PeerId, SocketActionTypes } from '@quiet/types' +import crypto from 'crypto' +import { EventEmitter } from 'events' +import { Agent } from 'https' +import { createServer } from 'it-ws' +import { Libp2p, createLibp2p } from 'libp2p' +import { preSharedKey } from 'libp2p/pnet' +import { DateTime } from 'luxon' +import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' +import { toString as uint8ArrayToString } from 'uint8arrays/to-string' +import Logger from '../common/logger' import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const' import { ServerIoProviderTypes } from '../types' -import Logger from '../common/logger' import { webSockets } from '../websocketOverTor' import { all } from '../websocketOverTor/filters' -import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' -import { preSharedKey } from 'libp2p/pnet' -import { toString as uint8ArrayToString } from 'uint8arrays/to-string' -import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' -import crypto from 'crypto' +import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' +import { ProcessInChunksService } from './process-in-chunks.service' const KEY_LENGTH = 32 export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' @@ -30,15 +30,21 @@ export const LIBP2P_PSK_METADATA = '/key/swarm/psk/1.0.0/\n/base16/\n' export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null public connectedPeers: Map = new Map() + public dialedPeers: Set = new Set() private readonly logger = Logger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, - @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent + @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent, + private readonly processInChunksService: ProcessInChunksService ) { super() } private dialPeer = async (peerAddress: string) => { + if (this.dialedPeers.has(peerAddress)) { + return + } + this.dialedPeers.add(peerAddress) await this.libp2pInstance?.dial(multiaddr(peerAddress)) } @@ -79,10 +85,11 @@ export class Libp2pService extends EventEmitter { libp2p = await createLibp2p({ start: false, connectionManager: { - minConnections: 3, - maxConnections: 8, + minConnections: 3, // TODO: increase? + maxConnections: 8, // TODO: increase? dialTimeout: 120_000, maxParallelDials: 10, + autoDial: true, // It's a default but let's set it to have explicit information }, peerId: params.peerId, addresses: { @@ -129,9 +136,17 @@ export class Libp2pService extends EventEmitter { throw new Error('libp2pInstance was not created') } + this.logger(`Local peerId: ${peerId.toString()}`) + this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => { + const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress)) + this.logger('Dialing', nonDialedAddresses.length, 'addresses') + this.processInChunksService.updateData(nonDialedAddresses) + await this.processInChunksService.process() + }) + this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) - const dialInChunks = new ProcessInChunks(peers, this.dialPeer) + this.processInChunksService.init(peers, this.dialPeer) this.libp2pInstance.addEventListener('peer:discovery', peer => { this.logger(`${peerId.toString()} discovered ${peer.detail.id}`) @@ -139,27 +154,29 @@ export class Libp2pService extends EventEmitter { this.libp2pInstance.addEventListener('peer:connect', async peer => { const remotePeerId = peer.detail.remotePeer.toString() - this.logger(`${peerId.toString()} connected to ${remotePeerId}`) - - // Stop dialing as soon as we connect to a peer - dialInChunks.stop() + const localPeerId = peerId.toString() + this.logger(`${localPeerId} connected to ${remotePeerId}`) this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) - this.logger(`${this.connectedPeers.size} connected peers`) + this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) + this.logger(`${localPeerId} has ${this.libp2pInstance?.getConnections().length} open connections`) this.emit(Libp2pEvents.PEER_CONNECTED, { peers: [remotePeerId], }) + const latency = await this.libp2pInstance?.ping(peer.detail.remoteAddr) + this.logger(`${localPeerId} ping to ${remotePeerId}. Latency: ${latency}`) }) this.libp2pInstance.addEventListener('peer:disconnect', async peer => { const remotePeerId = peer.detail.remotePeer.toString() - this.logger(`${peerId.toString()} disconnected from ${remotePeerId}`) + const localPeerId = peerId.toString() + this.logger(`${localPeerId} disconnected from ${remotePeerId}`) if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') } - this.logger(`${this.libp2pInstance.getConnections().length} open connections`) + this.logger(`${localPeerId} has ${this.libp2pInstance.getConnections().length} open connections`) const connectionStartTime = this.connectedPeers.get(remotePeerId) if (!connectionStartTime) { @@ -172,7 +189,7 @@ export class Libp2pService extends EventEmitter { const connectionDuration: number = connectionEndTime - connectionStartTime this.connectedPeers.delete(remotePeerId) - this.logger(`${this.connectedPeers.size} connected peers`) + this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) this.emit(Libp2pEvents.PEER_DISCONNECTED, { peer: remotePeerId, @@ -181,21 +198,16 @@ export class Libp2pService extends EventEmitter { }) }) - await dialInChunks.process() + await this.processInChunksService.process() this.logger(`Initialized libp2p for peer ${peerId.toString()}`) } - public async destroyInstance(): Promise { - this.libp2pInstance?.removeEventListener('peer:discovery') - this.libp2pInstance?.removeEventListener('peer:connect') - this.libp2pInstance?.removeEventListener('peer:disconnect') - try { - await this.libp2pInstance?.stop() - } catch (error) { - this.logger.error(error) - } - + public async close(): Promise { + this.logger('Closing libp2p service') + await this.libp2pInstance?.stop() this.libp2pInstance = null + this.connectedPeers = new Map() + this.dialedPeers = new Set() } } diff --git a/packages/backend/src/nest/libp2p/libp2p.types.ts b/packages/backend/src/nest/libp2p/libp2p.types.ts index 8b80845033..1f9d8c9fb7 100644 --- a/packages/backend/src/nest/libp2p/libp2p.types.ts +++ b/packages/backend/src/nest/libp2p/libp2p.types.ts @@ -5,6 +5,7 @@ export enum Libp2pEvents { PEER_CONNECTED = 'peerConnected', PEER_DISCONNECTED = 'peerDisconnected', NETWORK_STATS = 'networkStats', + DIAL_PEERS = 'dialPeers', } export interface Libp2pNodeParams { diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts similarity index 68% rename from packages/backend/src/nest/libp2p/process-in-chunks.ts rename to packages/backend/src/nest/libp2p/process-in-chunks.service.ts index b3e11caf61..2e7b30588b 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -1,22 +1,30 @@ +import { EventEmitter } from 'events' import Logger from '../common/logger' const DEFAULT_CHUNK_SIZE = 10 -export class ProcessInChunks { +export class ProcessInChunksService extends EventEmitter { private isActive: boolean private data: T[] private chunkSize: number private processItem: (arg: T) => Promise - private readonly logger = Logger(ProcessInChunks.name) - constructor(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { + private readonly logger = Logger(ProcessInChunksService.name) + constructor() { + super() + } + + public init(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { this.data = data this.processItem = processItem this.chunkSize = chunkSize - this.isActive = true + } + + updateData(items: T[]) { + this.logger(`Updating data with ${items.length} items`) + this.data = [...new Set(this.data.concat(items))] } public async processOneItem() { - if (!this.isActive) return const toProcess = this.data.shift() if (toProcess) { try { @@ -32,7 +40,7 @@ export class ProcessInChunks { } public async process() { - this.logger(`Processing ${Math.min(this.chunkSize, this.data.length)} items`) + this.logger(`Processing ${this.data.length} items`) for (let i = 0; i < this.chunkSize; i++) { // Do not wait for this promise as items should be processed simultineously void this.processOneItem() diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index abd1770f67..38979c1cf9 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -1,22 +1,73 @@ import { jest, describe, it, expect } from '@jest/globals' -import { ProcessInChunks } from './process-in-chunks' - +import { ProcessInChunksService } from './process-in-chunks.service' +import waitForExpect from 'wait-for-expect' +import { TestModule } from '../common/test.module' +import { Test, TestingModule } from '@nestjs/testing' describe('ProcessInChunks', () => { + let module: TestingModule + let processInChunks: ProcessInChunksService + + beforeEach(async () => { + module = await Test.createTestingModule({ + imports: [TestModule, ProcessInChunksService], + }).compile() + + processInChunks = await module.resolve(ProcessInChunksService) + }) + it('processes data', async () => { const mockProcessItem = jest - .fn(async () => {}) + .fn(async a => { + console.log('processing', a) + }) + .mockResolvedValueOnce() + .mockRejectedValueOnce(new Error('Rejected 1')) + .mockResolvedValueOnce() + .mockRejectedValueOnce(new Error('Rejected 2')) + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) + await processInChunks.process() + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) + }) + + it('processes new data', async () => { + const mockProcessItem = jest + .fn(async a => { + console.log('processing', a) + }) + .mockResolvedValueOnce() + .mockRejectedValueOnce(new Error('Rejected 1')) + processInChunks.init(['a', 'b'], mockProcessItem) + await processInChunks.process() + processInChunks.updateData(['e', 'f']) + await processInChunks.process() + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) + }) + + it('processes data in chunks', async () => { + const mockProcessItem = jest + .fn(async a => { + console.log('processing', a) + }) .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 1')) .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 2')) - const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem) + const chunkSize = 2 + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize) await processInChunks.process() - expect(mockProcessItem).toBeCalledTimes(4) + await waitForExpect(() => { + expect(mockProcessItem).toBeCalledTimes(4) + }) }) - it('does not process more data if stopped', async () => { + it.skip('does not process more data if stopped', async () => { const mockProcessItem = jest.fn(async () => {}) - const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem) + const processInChunks = new ProcessInChunksService() + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) processInChunks.stop() await processInChunks.process() expect(mockProcessItem).not.toBeCalled() diff --git a/packages/backend/src/nest/local-db/local-db.service.spec.ts b/packages/backend/src/nest/local-db/local-db.service.spec.ts index 0cf4af79cf..ee9a42b8c1 100644 --- a/packages/backend/src/nest/local-db/local-db.service.spec.ts +++ b/packages/backend/src/nest/local-db/local-db.service.spec.ts @@ -4,15 +4,15 @@ import { TestModule } from '../common/test.module' import { LocalDbModule } from './local-db.module' import { LocalDbService } from './local-db.service' import { LocalDBKeys } from './local-db.types' +import { createLibp2pAddress } from '@quiet/common' describe('LocalDbService', () => { let module: TestingModule let localDbService: LocalDbService - - let peer1Address: string let peer1Stats: Record = {} - let peer2Address: string + let peer1ID: string let peer2Stats: Record = {} + let peer2ID: string beforeAll(async () => { module = await Test.createTestingModule({ @@ -20,21 +20,18 @@ describe('LocalDbService', () => { }).compile() localDbService = await module.resolve(LocalDbService) - - peer1Address = - '/dns4/mxtsfs4kzxzuisrw4tumdmycbyerqwakx37kj6om6azcjdaasifxmoqd.onion/tcp/443/wss/p2p/QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix' + peer1ID = 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix' peer1Stats = { - [peer1Address]: { - peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix', + [peer1ID]: { + peerId: peer1ID, connectionTime: 50, lastSeen: 1000, }, } - peer2Address = - '/dns4/hxr74a76b4lerhov75a6ha6yprruvow3wfu4qmmeoc6ajs7m7323lyid.onion/tcp/443/wss/p2p/QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ' + peer2ID = 'QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ' peer2Stats = { - [peer2Address]: { - peerId: 'QmZB6pVafcvAQfy5R5LxvDXvB8xcDifD39Lp3XGDM9XDuQ', + [peer2ID]: { + peerId: peer2ID, connectionTime: 500, lastSeen: 500, }, @@ -71,16 +68,28 @@ describe('LocalDbService', () => { expect(localDbService.getStatus()).toEqual('closed') }) + it('get sorted peers returns peers list if no stats in db', async () => { + const peers = [ + createLibp2pAddress( + 'zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion', + 'QmPGdGDUV1PXaJky4V53KSvFszdqEcM7KCoDpF2uFPf5w6' + ), + ] + const sortedPeers = await localDbService.getSortedPeers(peers) + expect(sortedPeers).toEqual(peers) + }) + it('get sorted peers', async () => { - const extraPeers = [ - '/dns4/zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion/tcp/443/ws/p2p/QmPGdGDUV1PXaJky4V53KSvFszdqEcM7KCoDpF2uFPf5w6', + const peers = [ + createLibp2pAddress('nqnw4kc4c77fb47lk52m5l57h4tcxceo7ymxekfn7yh5m66t4jv2olad.onion', peer2ID), + createLibp2pAddress('zl37gnntp64dhnisddftypxbt5cqx6cum65vdv6oeaffrbqmemwc52ad.onion', peer1ID), ] await localDbService.put(LocalDBKeys.PEERS, { ...peer1Stats, ...peer2Stats, }) - const sortedPeers = await localDbService.getSortedPeers(extraPeers) - expect(sortedPeers).toEqual([peer1Address, peer2Address, extraPeers[0]]) + const sortedPeers = await localDbService.getSortedPeers(peers.reverse()) + expect(sortedPeers).toEqual(peers) }) it('updates nested object', async () => { @@ -94,19 +103,19 @@ describe('LocalDbService', () => { }) const peer2StatsUpdated: NetworkStats = { - peerId: 'QmR7Qgd4tg2XrGD3kW647ZnYyazTwHQF3cqRBmSduhhusA', + peerId: peer2ID, connectionTime: 777, lastSeen: 678, } await localDbService.update(LocalDBKeys.PEERS, { - [peer2Address]: peer2StatsUpdated, + [peer2StatsUpdated.peerId]: peer2StatsUpdated, }) const updatedPeersDBdata = await localDbService.get(LocalDBKeys.PEERS) expect(updatedPeersDBdata).toEqual({ ...peer1Stats, - [peer2Address]: peer2StatsUpdated, + [peer2StatsUpdated.peerId]: peer2StatsUpdated, }) }) }) diff --git a/packages/backend/src/nest/local-db/local-db.service.ts b/packages/backend/src/nest/local-db/local-db.service.ts index 96e6402f81..f5275ff77f 100644 --- a/packages/backend/src/nest/local-db/local-db.service.ts +++ b/packages/backend/src/nest/local-db/local-db.service.ts @@ -1,7 +1,7 @@ import { Inject, Injectable } from '@nestjs/common' import { Level } from 'level' import { NetworkStats } from '@quiet/types' -import { sortPeers } from '@quiet/common' +import { filterAndSortPeers } from '@quiet/common' import { LEVEL_DB } from '../const' import { LocalDBKeys, LocalDbStatus } from './local-db.types' import Logger from '../common/logger' @@ -73,9 +73,7 @@ export class LocalDbService { public async getSortedPeers(peers: string[] = []): Promise { const peersStats = (await this.get(LocalDBKeys.PEERS)) || {} - const peersAddresses: string[] = [...new Set(Object.keys(peersStats).concat(peers))] const stats: NetworkStats[] = Object.values(peersStats) - const sortedPeers = sortPeers(peersAddresses, stats) - return sortedPeers + return filterAndSortPeers(peers, stats) } } diff --git a/packages/backend/src/nest/storage/storage.service.ts b/packages/backend/src/nest/storage/storage.service.ts index fdef474a25..ae3e39acb0 100644 --- a/packages/backend/src/nest/storage/storage.service.ts +++ b/packages/backend/src/nest/storage/storage.service.ts @@ -28,6 +28,7 @@ import { ConnectionProcessInfo, DeleteFilesFromChannelSocketPayload, FileMetadata, + InitCommunityPayload, NoCryptoEngineError, PublicChannel, PushNotificationPayload, @@ -336,9 +337,14 @@ export class StorageService extends EventEmitter { const allUsers = this.getAllUsers() const registeredUsers = this.getAllRegisteredUsers() const peers = [...new Set(await getUsersAddresses(allUsers.concat(registeredUsers)))] - console.log('updatePeersList, peers count:', peers.length) - const community = await this.localDbService.get(LocalDBKeys.COMMUNITY) - this.emit(StorageEvents.UPDATE_PEERS_LIST, { communityId: community.id, peerList: peers }) + const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY) + const sortedPeers = await this.localDbService.getSortedPeers(peers) + if (sortedPeers.length > 0) { + community.peers = sortedPeers + await this.localDbService.put(LocalDBKeys.COMMUNITY, community) + } + + this.emit(StorageEvents.UPDATE_PEERS_LIST, { communityId: community.id, peerList: sortedPeers }) } public async loadAllCertificates() { diff --git a/packages/common/src/libp2p.test.ts b/packages/common/src/libp2p.test.ts new file mode 100644 index 0000000000..5a623aadb7 --- /dev/null +++ b/packages/common/src/libp2p.test.ts @@ -0,0 +1,20 @@ +import { filterAndSortPeers } from './sortPeers' + +describe('filterValidAddresses', () => { + it('filters out invalid addresses', () => { + const valid = [ + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/443/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/80/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + ] + const addresses = [ + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/443/wss/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + ...valid, + 'invalidAddress', + '/dns4/somethingElse.onion/tcp/443/wss/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSA', + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrjad.onion/tcp/443/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbK', + '/dns4/gloao6h5plwjy4tdlze24zzgcxll6upq2ex2fmu2ohhyu4gtys4nrj.onion/tcp/443/ws/p2p/QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE', + 'QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbK', + ] + expect(filterAndSortPeers(addresses, [])).toEqual(valid) + }) +}) diff --git a/packages/common/src/libp2p.ts b/packages/common/src/libp2p.ts index b2fb70e83f..4ada2cb6b9 100644 --- a/packages/common/src/libp2p.ts +++ b/packages/common/src/libp2p.ts @@ -17,3 +17,7 @@ export const isPSKcodeValid = (psk: string): boolean => { const _psk = psk.trim() return validator.isBase64(_psk) && _psk.length === PSK_LENGTH } + +export const filterValidAddresses = (addresses: string[]) => { + return addresses.filter(add => add.match(/^\/dns4\/[a-z0-9]{56}.onion\/tcp\/(443|80)\/ws\/p2p\/[a-zA-Z0-9]{46}$/g)) +} diff --git a/packages/common/src/sortPeers.ts b/packages/common/src/sortPeers.ts index 33d98c06c7..2b92cd1ea8 100644 --- a/packages/common/src/sortPeers.ts +++ b/packages/common/src/sortPeers.ts @@ -1,14 +1,17 @@ import { type NetworkStats } from '@quiet/types' import { isDefined } from './helpers' +import { filterValidAddresses } from './libp2p' /** This is the very simple algorithm for evaluating the most wanted peers. +0. Filters out invalid peers addresses 1. It takes the peers stats list that contains statistics for every peer our node was ever connected to. 2. Two sorted arrays are created - one sorted by last seen and other by most uptime shared. 3. Arrays are merged taking one element from list one and one element from the second list. Duplicates are ommited 4. We end up with mix of last seen and most uptime descending array of peers, the it is enchanced to libp2p address. */ -export const sortPeers = (peersAddresses: string[], stats: NetworkStats[]): string[] => { +export const filterAndSortPeers = (peersAddresses: string[], stats: NetworkStats[]): string[] => { + peersAddresses = filterValidAddresses(peersAddresses) const lastSeenSorted = [...stats].sort((a, b) => { return b.lastSeen - a.lastSeen }) diff --git a/packages/desktop/package.json b/packages/desktop/package.json index 42dd73f5ff..404fdabd9d 100644 --- a/packages/desktop/package.json +++ b/packages/desktop/package.json @@ -111,7 +111,7 @@ "build:renderer:prod": "webpack --config webpack/webpack.config.renderer.prod.js", "postBuild": "node scripts/postBuild.js", "prestart": "npm run build:main", - "start": "cross-env DEBUG='backend*,quiet*,state-manager*,desktop*,utils*,libp2p:websockets:listener:backend' npm run start:renderer", + "start": "cross-env DEBUG='backend*,quiet*,state-manager*,desktop*,utils*,libp2p:websockets:listener:backend,libp2p:connection-manager:auto-dialler' npm run start:renderer", "start:main": "cross-env NODE_ENV=development electron .", "start:renderer": "cross-env NODE_ENV=development webpack-dev-server --config webpack/webpack.config.renderer.dev.js", "storybook": "export NODE_OPTIONS=--openssl-legacy-provider && start-storybook -p 6006", diff --git a/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts b/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts index 4ab007f989..f91f894082 100644 --- a/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts +++ b/packages/state-manager/src/sagas/appConnection/connection.selectors.test.ts @@ -23,11 +23,11 @@ describe('communitiesSelectors', () => { it('select peers sorted by quality', async () => { community = await factory.create['payload']>('Community', { peerList: [ - '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/wss/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', - '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/wss/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', - '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/wss/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', - '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/wss/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', - '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/wss/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', + '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/ws/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', + '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/ws/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', + '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/ws/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', + '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/ws/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', + '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/ws/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', ], }) @@ -68,11 +68,11 @@ describe('communitiesSelectors', () => { ) const expectedArray = [ - '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/wss/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', - '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/wss/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', - '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/wss/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', - '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/wss/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', - '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/wss/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', + '/dns4/f3lupwnhaqplbn4djaut5rtipwmlotlb57flfvjzgexek2yezlpjddid.onion/tcp/443/ws/p2p/Qmd35TsAvtskei8zWY3A65ifNWcY4x4SdqkQDHMkH5xPF9', + '/dns4/ubapl2lfxci5cc35oegshdsjhlt656xo6vbmztpb2ndb6ftqjjuv5myd.onion/tcp/443/ws/p2p/QmQEk68gnPTRhfBvRAPWXjjXjPydV1MvvZGGJF7W7w2Sv5', + '/dns4/rjdhzqgrl3bzu4v5cwfla3tafjtdeuzeapk34qvf7mvfhc3hih5fmnqd.onion/tcp/443/ws/p2p/QmbrDuN2oCb8G2e1ajRzpfnALGbeFDYFSoVCBhUYGLSeRD', + '/dns4/hricycxramxkn4v46b3pllnozfop6fkl7xdfk2htboe3zakhq3ephjid.onion/tcp/443/ws/p2p/QmTjQLMxJq74yXWBabh1VM8hZsRNhci4wfbVz6vFhLH5am', + '/dns4/kkzkv2u53aehfjz7mqgnt3mp2hemcr2h74vtmxpxuh4a5yna7kltsiqd.onion/tcp/443/ws/p2p/QmeiKGmfD64o3sTt6T4PVyvp1hnJ42Zki754rQgYr6fSnN', ] const peersList = connectionSelectors.peerList(store.getState()) diff --git a/packages/state-manager/src/sagas/appConnection/connection.selectors.ts b/packages/state-manager/src/sagas/appConnection/connection.selectors.ts index b0f7606fcf..2d2bff0cdb 100644 --- a/packages/state-manager/src/sagas/appConnection/connection.selectors.ts +++ b/packages/state-manager/src/sagas/appConnection/connection.selectors.ts @@ -7,7 +7,7 @@ import { peersStatsAdapter } from './connection.adapter' import { connectedPeers } from '../network/network.selectors' import { type NetworkStats } from './connection.types' import { type User } from '../users/users.types' -import { sortPeers } from '@quiet/common' +import { filterAndSortPeers } from '@quiet/common' const connectionSlice: CreatedSelectors[StoreKeys.Connection] = (state: StoreState) => state[StoreKeys.Connection] @@ -33,7 +33,7 @@ export const peerList = createSelector( stats = peersStatsAdapter.getSelectors().selectAll(reducerState.peersStats) } - return sortPeers(arr, stats) + return filterAndSortPeers(arr, stats) } ) diff --git a/packages/state-manager/src/sagas/appConnection/connection.slice.ts b/packages/state-manager/src/sagas/appConnection/connection.slice.ts index ea82144cb7..b72ec8b0a7 100644 --- a/packages/state-manager/src/sagas/appConnection/connection.slice.ts +++ b/packages/state-manager/src/sagas/appConnection/connection.slice.ts @@ -24,11 +24,7 @@ export const connectionSlice = createSlice({ }, updateNetworkData: (state, action: PayloadAction) => { const prev = state.peersStats?.entities[action.payload.peer]?.connectionTime || 0 - - console.log('prev peerStats', state.peersStats) const _peerStats = state.peersStats || peersStatsAdapter.getInitialState() - console.log('next peerStats', _peerStats) - peersStatsAdapter.upsertOne(_peerStats, { peerId: action.payload.peer, lastSeen: action.payload.lastSeen,