From 591c4e5383ad7a2984968b7df283063f8ac5706e Mon Sep 17 00:00:00 2001 From: Emi Date: Thu, 16 Nov 2023 16:00:10 +0100 Subject: [PATCH] chore: make ProcessInChunks a nest service --- packages/backend/src/nest/common/utils.ts | 17 +++++- .../connections-manager.service.ts | 4 +- packages/backend/src/nest/const.ts | 2 + .../backend/src/nest/libp2p/libp2p.module.ts | 3 +- .../src/nest/libp2p/libp2p.service.spec.ts | 47 ++++++++++++++- .../backend/src/nest/libp2p/libp2p.service.ts | 60 +++++++------------ ...chunks.ts => process-in-chunks.service.ts} | 18 ++++-- .../src/nest/libp2p/process-in-chunks.spec.ts | 8 ++- 8 files changed, 107 insertions(+), 52 deletions(-) rename packages/backend/src/nest/libp2p/{process-in-chunks.ts => process-in-chunks.service.ts} (73%) diff --git a/packages/backend/src/nest/common/utils.ts b/packages/backend/src/nest/common/utils.ts index eeff2ad29c..6b1b445486 100644 --- a/packages/backend/src/nest/common/utils.ts +++ b/packages/backend/src/nest/common/utils.ts @@ -11,8 +11,9 @@ import { type PermsData } from '@quiet/types' import { TestConfig } from '../const' import logger from './logger' import { Libp2pNodeParams } from '../libp2p/libp2p.types' -import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common' +import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common' import { Libp2pService } from '../libp2p/libp2p.service' +import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity' const log = logger('test') @@ -153,6 +154,20 @@ export const getUsersAddresses = async (users: UserData[]): Promise => return await Promise.all(peers) } +export const getLibp2pAddressesFromCsrs = async (csrs: string[]): Promise => { + const addresses = await Promise.all( + csrs.map(async csr => { + const parsedCsr = await loadCSR(csr) + const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) + const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) + if (!peerId || !onionAddress) return + + return createLibp2pAddress(onionAddress, peerId) + }) + ) + return addresses.filter(isDefined) +} + /** * Compares given numbers * diff --git a/packages/backend/src/nest/connections-manager/connections-manager.service.ts b/packages/backend/src/nest/connections-manager/connections-manager.service.ts index 8555f8ad2c..bacdbc85b3 100644 --- a/packages/backend/src/nest/connections-manager/connections-manager.service.ts +++ b/packages/backend/src/nest/connections-manager/connections-manager.service.ts @@ -8,7 +8,7 @@ import { setEngine, CryptoEngine } from 'pkijs' import { EventEmitter } from 'events' import getPort from 'get-port' import PeerId from 'peer-id' -import { removeFilesFromDir } from '../common/utils' +import { getLibp2pAddressesFromCsrs, removeFilesFromDir } from '../common/utils' import { AskForMessagesPayload, @@ -606,7 +606,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI StorageEvents.REPLICATED_CSR, async (payload: { csrs: string[]; certificates: string[]; id: string }) => { console.log(`On ${StorageEvents.REPLICATED_CSR}`) - this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, payload.csrs) + this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs)) console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`) this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs }) this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload) diff --git a/packages/backend/src/nest/const.ts b/packages/backend/src/nest/const.ts index f4c86c3a35..80cd62656c 100644 --- a/packages/backend/src/nest/const.ts +++ b/packages/backend/src/nest/const.ts @@ -28,6 +28,8 @@ export const IPFS_REPO_PATCH = 'ipfsRepoPath' export const CONFIG_OPTIONS = 'configOptions' export const SERVER_IO_PROVIDER = 'serverIoProvider' +export const PROCESS_IN_CHUNKS_PROVIDER = 'processInChunksProvider' + export const EXPRESS_PROVIDER = 'expressProvider' export const LEVEL_DB = 'levelDb' diff --git a/packages/backend/src/nest/libp2p/libp2p.module.ts b/packages/backend/src/nest/libp2p/libp2p.module.ts index bdc20e965c..d75edc799e 100644 --- a/packages/backend/src/nest/libp2p/libp2p.module.ts +++ b/packages/backend/src/nest/libp2p/libp2p.module.ts @@ -1,10 +1,11 @@ import { Module } from '@nestjs/common' import { SocketModule } from '../socket/socket.module' import { Libp2pService } from './libp2p.service' +import { ProcessInChunksService } from './process-in-chunks.service' @Module({ imports: [SocketModule], - providers: [Libp2pService], + providers: [Libp2pService, ProcessInChunksService], exports: [Libp2pService], }) export class Libp2pModule {} diff --git a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts index 85f346d9b0..13ebb5f1ed 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.spec.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.spec.ts @@ -1,16 +1,20 @@ +import { jest } from '@jest/globals' import { Test, TestingModule } from '@nestjs/testing' import { TestModule } from '../common/test.module' -import { libp2pInstanceParams } from '../common/utils' +import { createPeerId, libp2pInstanceParams } from '../common/utils' import { Libp2pModule } from './libp2p.module' import { LIBP2P_PSK_METADATA, Libp2pService } from './libp2p.service' -import { Libp2pNodeParams } from './libp2p.types' +import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' import { toString as uint8ArrayToString } from 'uint8arrays/to-string' import validator from 'validator' +import waitForExpect from 'wait-for-expect' +import { ProcessInChunksService } from './process-in-chunks.service' describe('Libp2pService', () => { let module: TestingModule let libp2pService: Libp2pService let params: Libp2pNodeParams + let processInChunks: ProcessInChunksService beforeAll(async () => { module = await Test.createTestingModule({ @@ -18,6 +22,7 @@ describe('Libp2pService', () => { }).compile() libp2pService = await module.resolve(Libp2pService) + processInChunks = await module.resolve(ProcessInChunksService) params = await libp2pInstanceParams() }) @@ -38,7 +43,7 @@ describe('Libp2pService', () => { expect(libp2pService.libp2pInstance).toBeNull() }) - it('creates libp2p address with proper ws type (%s)', async () => { + it('creates libp2p address', async () => { const libp2pAddress = libp2pService.createLibp2pAddress(params.localAddress, params.peerId.toString()) expect(libp2pAddress).toStrictEqual(`/dns4/${params.localAddress}.onion/tcp/80/ws/p2p/${params.peerId.toString()}`) }) @@ -58,4 +63,40 @@ describe('Libp2pService', () => { const expectedFullKeyString = LIBP2P_PSK_METADATA + uint8ArrayToString(generatedPskBuffer, 'base16') expect(uint8ArrayToString(generatedKey.fullKey)).toEqual(expectedFullKeyString) }) + + it(`Starts dialing peers on '${Libp2pEvents.DIAL_PEERS}' event`, async () => { + const peerId1 = await createPeerId() + const peerId2 = await createPeerId() + const addresses = [ + libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()), + libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), + ] + await libp2pService.createInstance(params) + // @ts-expect-error processItem is private + const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem') + expect(libp2pService.libp2pInstance).not.toBeNull() + libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { + expect(spyOnProcessItem).toBeCalledTimes(addresses.length) + }) + }) + + it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => { + const peerId1 = await createPeerId() + const peerId2 = await createPeerId() + const alreadyDialedAddress = libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()) + libp2pService.dialedPeers.add(alreadyDialedAddress) + const addresses = [ + alreadyDialedAddress, + libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()), + ] + await libp2pService.createInstance(params) + expect(libp2pService.libp2pInstance).not.toBeNull() + // @ts-expect-error processItem is private + const dialPeerSpy = jest.spyOn(processInChunks, 'processItem') + libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses) + await waitForExpect(async () => { + expect(dialPeerSpy).toBeCalledTimes(1) + }) + }) }) diff --git a/packages/backend/src/nest/libp2p/libp2p.service.ts b/packages/backend/src/nest/libp2p/libp2p.service.ts index e03e30b6bf..80c7a6eaeb 100644 --- a/packages/backend/src/nest/libp2p/libp2p.service.ts +++ b/packages/backend/src/nest/libp2p/libp2p.service.ts @@ -9,7 +9,7 @@ import { createServer } from 'it-ws' import { DateTime } from 'luxon' import { EventEmitter } from 'events' import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types' -import { ProcessInChunks } from './process-in-chunks' +import { ProcessInChunksService } from './process-in-chunks.service' import { multiaddr } from '@multiformats/multiaddr' import { ConnectionProcessInfo, PeerId, SocketActionTypes } from '@quiet/types' import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const' @@ -33,17 +33,17 @@ export class Libp2pService extends EventEmitter { public libp2pInstance: Libp2p | null public connectedPeers: Map = new Map() public dialedPeers: Set = new Set() - // public knownPeers: Set = new Set() + // public processInChunksService: ProcessInChunks private readonly logger = Logger(Libp2pService.name) constructor( @Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes, - @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent + @Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent, + private readonly processInChunksService: ProcessInChunksService ) { super() } private dialPeer = async (peerAddress: string) => { - console.log('------ dialing peer ', peerAddress) if (this.dialedPeers.has(peerAddress)) { console.log(`Peer ${peerAddress} already dialed, not dialing`) // TODO: remove log return @@ -134,47 +134,32 @@ export class Libp2pService extends EventEmitter { return libp2p } - private async addPeersToPeerBook(addresses: string[]) { - for (const address of addresses) { - const peerId = multiaddr(address).getPeerId() - console.log('PEER ID', peerId) - if (!peerId) return - console.log('PEER ID adding to peer book', peerId) - // @ts-expect-error - await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)]) - console.log('ALL', await this.libp2pInstance?.peerStore.all()) - } - } + // private async addPeersToPeerBook(addresses: string[]) { + // for (const address of addresses) { + // const peerId = multiaddr(address).getPeerId() + // if (!peerId) return + // // @ts-expect-error + // await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)]) + // } + // } private async afterCreation(peers: string[], peerId: PeerId) { if (!this.libp2pInstance) { this.logger.error('libp2pInstance was not created') throw new Error('libp2pInstance was not created') } - this.logger(`Local peerId: ${peerId.toString()}`) - this.on(Libp2pEvents.DIAL_PEERS, async (csrs: string[]) => { - console.log('DIALING PEERS') - const csrsPeersPromises = csrs.map(async csr => { - const parsedCsr = await loadCSR(csr) - const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId) - const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName) - if (!peerId || !onionAddress) return - - const peerAddress = this.createLibp2pAddress(onionAddress, peerId) - if (this.dialedPeers.has(peerAddress)) { - return peerAddress - } - }) - const csrsPeers = await Promise.all(csrsPeersPromises) - const addresses = csrsPeers.filter(isDefined) - const dialInChunks = new ProcessInChunks(addresses, this.dialPeer) - await dialInChunks.process() + this.logger(`Local peerId: ${peerId.toString()}`) + this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => { + const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress)) + console.log('DIALING PEERS', nonDialedAddresses.length, 'addresses') + this.processInChunksService.updateData(nonDialedAddresses) + await this.processInChunksService.process() }) this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`) this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P) - const dialInChunks = new ProcessInChunks(peers, this.dialPeer) + this.processInChunksService.init(peers, this.dialPeer) this.libp2pInstance.addEventListener('peer:discovery', peer => { this.logger(`${peerId.toString()} discovered ${peer.detail.id}`) @@ -185,8 +170,8 @@ export class Libp2pService extends EventEmitter { const localPeerId = peerId.toString() this.logger(`${localPeerId} connected to ${remotePeerId}`) - //// Stop dialing as soon as we connect to a peer - //dialInChunks.stop() + // Stop dialing as soon as we connect to a peer + this.processInChunksService.stop() this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf()) this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`) @@ -229,7 +214,7 @@ export class Libp2pService extends EventEmitter { }) }) - await dialInChunks.process() + await this.processInChunksService.process() this.logger(`Initialized libp2p for peer ${peerId.toString()}`) } @@ -240,6 +225,5 @@ export class Libp2pService extends EventEmitter { this.libp2pInstance = null this.connectedPeers = new Map() this.dialedPeers = new Set() - // this.knownPeers = new Set() } } diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.ts b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts similarity index 73% rename from packages/backend/src/nest/libp2p/process-in-chunks.ts rename to packages/backend/src/nest/libp2p/process-in-chunks.service.ts index b3e11caf61..714ffa7c0c 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.service.ts @@ -1,18 +1,27 @@ +import { EventEmitter } from 'events' import Logger from '../common/logger' const DEFAULT_CHUNK_SIZE = 10 -export class ProcessInChunks { +export class ProcessInChunksService extends EventEmitter { private isActive: boolean private data: T[] private chunkSize: number private processItem: (arg: T) => Promise - private readonly logger = Logger(ProcessInChunks.name) - constructor(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { + private readonly logger = Logger(ProcessInChunksService.name) + constructor() { + super() + } + + public init(data: T[], processItem: (arg: T) => Promise, chunkSize: number = DEFAULT_CHUNK_SIZE) { this.data = data this.processItem = processItem this.chunkSize = chunkSize - this.isActive = true + } + + updateData(items: T[]) { + console.log('Updating data, previous', this.data, 'adding:', items) + this.data = [...new Set(this.data.concat(items))] } public async processOneItem() { @@ -32,6 +41,7 @@ export class ProcessInChunks { } public async process() { + this.isActive = true this.logger(`Processing ${Math.min(this.chunkSize, this.data.length)} items`) for (let i = 0; i < this.chunkSize; i++) { // Do not wait for this promise as items should be processed simultineously diff --git a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts index abd1770f67..343b278d8f 100644 --- a/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts +++ b/packages/backend/src/nest/libp2p/process-in-chunks.spec.ts @@ -1,5 +1,5 @@ import { jest, describe, it, expect } from '@jest/globals' -import { ProcessInChunks } from './process-in-chunks' +import { ProcessInChunksService } from './process-in-chunks.service' describe('ProcessInChunks', () => { it('processes data', async () => { @@ -9,14 +9,16 @@ describe('ProcessInChunks', () => { .mockRejectedValueOnce(new Error('Rejected 1')) .mockResolvedValueOnce() .mockRejectedValueOnce(new Error('Rejected 2')) - const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem) + const processInChunks = new ProcessInChunksService() + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) await processInChunks.process() expect(mockProcessItem).toBeCalledTimes(4) }) it('does not process more data if stopped', async () => { const mockProcessItem = jest.fn(async () => {}) - const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem) + const processInChunks = new ProcessInChunksService() + processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem) processInChunks.stop() await processInChunks.process() expect(mockProcessItem).not.toBeCalled()