Skip to content

Commit

Permalink
chore: make ProcessInChunks a nest service
Browse files Browse the repository at this point in the history
  • Loading branch information
EmiM committed Nov 16, 2023
1 parent 038a579 commit 591c4e5
Show file tree
Hide file tree
Showing 8 changed files with 107 additions and 52 deletions.
17 changes: 16 additions & 1 deletion packages/backend/src/nest/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import { type PermsData } from '@quiet/types'
import { TestConfig } from '../const'
import logger from './logger'
import { Libp2pNodeParams } from '../libp2p/libp2p.types'
import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common'
import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common'
import { Libp2pService } from '../libp2p/libp2p.service'
import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity'

const log = logger('test')

Expand Down Expand Up @@ -153,6 +154,20 @@ export const getUsersAddresses = async (users: UserData[]): Promise<string[]> =>
return await Promise.all(peers)
}

export const getLibp2pAddressesFromCsrs = async (csrs: string[]): Promise<string[]> => {
const addresses = await Promise.all(
csrs.map(async csr => {
const parsedCsr = await loadCSR(csr)
const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId)
const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName)
if (!peerId || !onionAddress) return

return createLibp2pAddress(onionAddress, peerId)
})
)
return addresses.filter(isDefined)
}

/**
* Compares given numbers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { setEngine, CryptoEngine } from 'pkijs'
import { EventEmitter } from 'events'
import getPort from 'get-port'
import PeerId from 'peer-id'
import { removeFilesFromDir } from '../common/utils'
import { getLibp2pAddressesFromCsrs, removeFilesFromDir } from '../common/utils'

import {
AskForMessagesPayload,
Expand Down Expand Up @@ -606,7 +606,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
StorageEvents.REPLICATED_CSR,
async (payload: { csrs: string[]; certificates: string[]; id: string }) => {
console.log(`On ${StorageEvents.REPLICATED_CSR}`)
this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, payload.csrs)
this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs))
console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`)
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs })
this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload)
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/nest/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export const IPFS_REPO_PATCH = 'ipfsRepoPath'
export const CONFIG_OPTIONS = 'configOptions'
export const SERVER_IO_PROVIDER = 'serverIoProvider'

export const PROCESS_IN_CHUNKS_PROVIDER = 'processInChunksProvider'

export const EXPRESS_PROVIDER = 'expressProvider'

export const LEVEL_DB = 'levelDb'
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/nest/libp2p/libp2p.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Module } from '@nestjs/common'
import { SocketModule } from '../socket/socket.module'
import { Libp2pService } from './libp2p.service'
import { ProcessInChunksService } from './process-in-chunks.service'

@Module({
imports: [SocketModule],
providers: [Libp2pService],
providers: [Libp2pService, ProcessInChunksService],
exports: [Libp2pService],
})
export class Libp2pModule {}
47 changes: 44 additions & 3 deletions packages/backend/src/nest/libp2p/libp2p.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import { jest } from '@jest/globals'
import { Test, TestingModule } from '@nestjs/testing'
import { TestModule } from '../common/test.module'
import { libp2pInstanceParams } from '../common/utils'
import { createPeerId, libp2pInstanceParams } from '../common/utils'
import { Libp2pModule } from './libp2p.module'
import { LIBP2P_PSK_METADATA, Libp2pService } from './libp2p.service'
import { Libp2pNodeParams } from './libp2p.types'
import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import validator from 'validator'
import waitForExpect from 'wait-for-expect'
import { ProcessInChunksService } from './process-in-chunks.service'

describe('Libp2pService', () => {
let module: TestingModule
let libp2pService: Libp2pService
let params: Libp2pNodeParams
let processInChunks: ProcessInChunksService<string>

beforeAll(async () => {
module = await Test.createTestingModule({
imports: [TestModule, Libp2pModule],
}).compile()

libp2pService = await module.resolve(Libp2pService)
processInChunks = await module.resolve(ProcessInChunksService<string>)
params = await libp2pInstanceParams()
})

Expand All @@ -38,7 +43,7 @@ describe('Libp2pService', () => {
expect(libp2pService.libp2pInstance).toBeNull()
})

it('creates libp2p address with proper ws type (%s)', async () => {
it('creates libp2p address', async () => {
const libp2pAddress = libp2pService.createLibp2pAddress(params.localAddress, params.peerId.toString())
expect(libp2pAddress).toStrictEqual(`/dns4/${params.localAddress}.onion/tcp/80/ws/p2p/${params.peerId.toString()}`)
})
Expand All @@ -58,4 +63,40 @@ describe('Libp2pService', () => {
const expectedFullKeyString = LIBP2P_PSK_METADATA + uint8ArrayToString(generatedPskBuffer, 'base16')
expect(uint8ArrayToString(generatedKey.fullKey)).toEqual(expectedFullKeyString)
})

it(`Starts dialing peers on '${Libp2pEvents.DIAL_PEERS}' event`, async () => {
const peerId1 = await createPeerId()
const peerId2 = await createPeerId()
const addresses = [
libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()),
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
// @ts-expect-error processItem is private
const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem')
expect(libp2pService.libp2pInstance).not.toBeNull()
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(spyOnProcessItem).toBeCalledTimes(addresses.length)
})
})

it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => {
const peerId1 = await createPeerId()
const peerId2 = await createPeerId()
const alreadyDialedAddress = libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString())
libp2pService.dialedPeers.add(alreadyDialedAddress)
const addresses = [
alreadyDialedAddress,
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
expect(libp2pService.libp2pInstance).not.toBeNull()
// @ts-expect-error processItem is private
const dialPeerSpy = jest.spyOn(processInChunks, 'processItem')
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(dialPeerSpy).toBeCalledTimes(1)
})
})
})
60 changes: 22 additions & 38 deletions packages/backend/src/nest/libp2p/libp2p.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import { createServer } from 'it-ws'
import { DateTime } from 'luxon'
import { EventEmitter } from 'events'
import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { ProcessInChunks } from './process-in-chunks'
import { ProcessInChunksService } from './process-in-chunks.service'
import { multiaddr } from '@multiformats/multiaddr'
import { ConnectionProcessInfo, PeerId, SocketActionTypes } from '@quiet/types'
import { SERVER_IO_PROVIDER, SOCKS_PROXY_AGENT } from '../const'
Expand All @@ -33,17 +33,17 @@ export class Libp2pService extends EventEmitter {
public libp2pInstance: Libp2p | null
public connectedPeers: Map<string, number> = new Map()
public dialedPeers: Set<string> = new Set()
// public knownPeers: Set<string> = new Set()
// public processInChunksService: ProcessInChunks<string>
private readonly logger = Logger(Libp2pService.name)
constructor(
@Inject(SERVER_IO_PROVIDER) public readonly serverIoProvider: ServerIoProviderTypes,
@Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent
@Inject(SOCKS_PROXY_AGENT) public readonly socksProxyAgent: Agent,
private readonly processInChunksService: ProcessInChunksService<string>
) {
super()
}

private dialPeer = async (peerAddress: string) => {
console.log('------ dialing peer ', peerAddress)
if (this.dialedPeers.has(peerAddress)) {
console.log(`Peer ${peerAddress} already dialed, not dialing`) // TODO: remove log
return
Expand Down Expand Up @@ -134,47 +134,32 @@ export class Libp2pService extends EventEmitter {
return libp2p
}

private async addPeersToPeerBook(addresses: string[]) {
for (const address of addresses) {
const peerId = multiaddr(address).getPeerId()
console.log('PEER ID', peerId)
if (!peerId) return
console.log('PEER ID adding to peer book', peerId)
// @ts-expect-error
await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)])
console.log('ALL', await this.libp2pInstance?.peerStore.all())
}
}
// private async addPeersToPeerBook(addresses: string[]) {
// for (const address of addresses) {
// const peerId = multiaddr(address).getPeerId()
// if (!peerId) return
// // @ts-expect-error
// await this.libp2pInstance?.peerStore.addressBook.add(peerIdFromString(peerId), [multiaddr(address)])
// }
// }

private async afterCreation(peers: string[], peerId: PeerId) {
if (!this.libp2pInstance) {
this.logger.error('libp2pInstance was not created')
throw new Error('libp2pInstance was not created')
}
this.logger(`Local peerId: ${peerId.toString()}`)
this.on(Libp2pEvents.DIAL_PEERS, async (csrs: string[]) => {
console.log('DIALING PEERS')
const csrsPeersPromises = csrs.map(async csr => {
const parsedCsr = await loadCSR(csr)
const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId)
const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName)
if (!peerId || !onionAddress) return

const peerAddress = this.createLibp2pAddress(onionAddress, peerId)
if (this.dialedPeers.has(peerAddress)) {
return peerAddress
}
})

const csrsPeers = await Promise.all(csrsPeersPromises)
const addresses = csrsPeers.filter(isDefined)
const dialInChunks = new ProcessInChunks<string>(addresses, this.dialPeer)
await dialInChunks.process()
this.logger(`Local peerId: ${peerId.toString()}`)
this.on(Libp2pEvents.DIAL_PEERS, async (addresses: string[]) => {
const nonDialedAddresses = addresses.filter(peerAddress => !this.dialedPeers.has(peerAddress))
console.log('DIALING PEERS', nonDialedAddresses.length, 'addresses')
this.processInChunksService.updateData(nonDialedAddresses)
await this.processInChunksService.process()
})

this.logger(`Initializing libp2p for ${peerId.toString()}, bootstrapping with ${peers.length} peers`)
this.serverIoProvider.io.emit(SocketActionTypes.CONNECTION_PROCESS_INFO, ConnectionProcessInfo.INITIALIZING_LIBP2P)
const dialInChunks = new ProcessInChunks<string>(peers, this.dialPeer)
this.processInChunksService.init(peers, this.dialPeer)

this.libp2pInstance.addEventListener('peer:discovery', peer => {
this.logger(`${peerId.toString()} discovered ${peer.detail.id}`)
Expand All @@ -185,8 +170,8 @@ export class Libp2pService extends EventEmitter {
const localPeerId = peerId.toString()
this.logger(`${localPeerId} connected to ${remotePeerId}`)

//// Stop dialing as soon as we connect to a peer
//dialInChunks.stop()
// Stop dialing as soon as we connect to a peer
this.processInChunksService.stop()

this.connectedPeers.set(remotePeerId, DateTime.utc().valueOf())
this.logger(`${localPeerId} is connected to ${this.connectedPeers.size} peers`)
Expand Down Expand Up @@ -229,7 +214,7 @@ export class Libp2pService extends EventEmitter {
})
})

await dialInChunks.process()
await this.processInChunksService.process()

this.logger(`Initialized libp2p for peer ${peerId.toString()}`)
}
Expand All @@ -240,6 +225,5 @@ export class Libp2pService extends EventEmitter {
this.libp2pInstance = null
this.connectedPeers = new Map()
this.dialedPeers = new Set()
// this.knownPeers = new Set()
}
}
Original file line number Diff line number Diff line change
@@ -1,18 +1,27 @@
import { EventEmitter } from 'events'
import Logger from '../common/logger'

const DEFAULT_CHUNK_SIZE = 10

export class ProcessInChunks<T> {
export class ProcessInChunksService<T> extends EventEmitter {
private isActive: boolean
private data: T[]
private chunkSize: number
private processItem: (arg: T) => Promise<any>
private readonly logger = Logger(ProcessInChunks.name)
constructor(data: T[], processItem: (arg: T) => Promise<any>, chunkSize: number = DEFAULT_CHUNK_SIZE) {
private readonly logger = Logger(ProcessInChunksService.name)
constructor() {
super()
}

public init(data: T[], processItem: (arg: T) => Promise<any>, chunkSize: number = DEFAULT_CHUNK_SIZE) {
this.data = data
this.processItem = processItem
this.chunkSize = chunkSize
this.isActive = true
}

updateData(items: T[]) {
console.log('Updating data, previous', this.data, 'adding:', items)
this.data = [...new Set(this.data.concat(items))]
}

public async processOneItem() {
Expand All @@ -32,6 +41,7 @@ export class ProcessInChunks<T> {
}

public async process() {
this.isActive = true
this.logger(`Processing ${Math.min(this.chunkSize, this.data.length)} items`)
for (let i = 0; i < this.chunkSize; i++) {
// Do not wait for this promise as items should be processed simultineously
Expand Down
8 changes: 5 additions & 3 deletions packages/backend/src/nest/libp2p/process-in-chunks.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { jest, describe, it, expect } from '@jest/globals'
import { ProcessInChunks } from './process-in-chunks'
import { ProcessInChunksService } from './process-in-chunks.service'

describe('ProcessInChunks', () => {
it('processes data', async () => {
Expand All @@ -9,14 +9,16 @@ describe('ProcessInChunks', () => {
.mockRejectedValueOnce(new Error('Rejected 1'))
.mockResolvedValueOnce()
.mockRejectedValueOnce(new Error('Rejected 2'))
const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem)
const processInChunks = new ProcessInChunksService()
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
await processInChunks.process()
expect(mockProcessItem).toBeCalledTimes(4)
})

it('does not process more data if stopped', async () => {
const mockProcessItem = jest.fn(async () => {})
const processInChunks = new ProcessInChunks(['a', 'b', 'c', 'd'], mockProcessItem)
const processInChunks = new ProcessInChunksService()
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
processInChunks.stop()
await processInChunks.process()
expect(mockProcessItem).not.toBeCalled()
Expand Down

0 comments on commit 591c4e5

Please sign in to comment.