Skip to content

Commit

Permalink
Fix/1982 (#2051)
Browse files Browse the repository at this point in the history
* fix: getting list of sorted peers from localdb

* feat: dial peers after replicating csrs; update peersList in localDB

* chore: make ProcessInChunks a nest service

* fix: do not stop dial on first connection
  • Loading branch information
EmiM authored Nov 21, 2023
1 parent 8d35350 commit b51de0d
Show file tree
Hide file tree
Showing 21 changed files with 301 additions and 153 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
[unreleased]

* Fix: filter out invalid peer addresses in peer list. Update peer list in localdb.

* Fix: dial new peers on CSRs replication

[2.0.3-alpha.5]

* Fix network data proceeding when using custom protocol multiple times #1847
Expand Down
17 changes: 16 additions & 1 deletion packages/backend/src/nest/common/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,9 @@ import { type PermsData } from '@quiet/types'
import { TestConfig } from '../const'
import logger from './logger'
import { Libp2pNodeParams } from '../libp2p/libp2p.types'
import { createLibp2pAddress, createLibp2pListenAddress } from '@quiet/common'
import { createLibp2pAddress, createLibp2pListenAddress, isDefined } from '@quiet/common'
import { Libp2pService } from '../libp2p/libp2p.service'
import { CertFieldsTypes, getReqFieldValue, loadCSR } from '@quiet/identity'

const log = logger('test')

Expand Down Expand Up @@ -153,6 +154,20 @@ export const getUsersAddresses = async (users: UserData[]): Promise<string[]> =>
return await Promise.all(peers)
}

export const getLibp2pAddressesFromCsrs = async (csrs: string[]): Promise<string[]> => {
const addresses = await Promise.all(
csrs.map(async csr => {
const parsedCsr = await loadCSR(csr)
const peerId = getReqFieldValue(parsedCsr, CertFieldsTypes.peerId)
const onionAddress = getReqFieldValue(parsedCsr, CertFieldsTypes.commonName)
if (!peerId || !onionAddress) return

return createLibp2pAddress(onionAddress, peerId)
})
)
return addresses.filter(isDefined)
}

/**
* Compares given numbers
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import { RegistrationService } from '../registration/registration.service'
import { SocketModule } from '../socket/socket.module'
import { ConnectionsManagerModule } from './connections-manager.module'
import { ConnectionsManagerService } from './connections-manager.service'
import { createLibp2pAddress } from '@quiet/common'

describe('ConnectionsManagerService', () => {
let module: TestingModule
Expand Down Expand Up @@ -101,7 +102,12 @@ describe('ConnectionsManagerService', () => {
key: userIdentity.userCsr?.userKey,
CA: [communityRootCa],
},
peers: community.peerList,
peers: [
createLibp2pAddress(
'y7yczmugl2tekami7sbdz5pfaemvx7bahwthrdvcbzw5vex2crsr26qd',
'QmZoiJNAvCffeEHBjk766nLuKVdkxkAT7wfFJDPPLsbKSE'
),
],
}

await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload)
Expand All @@ -113,41 +119,6 @@ describe('ConnectionsManagerService', () => {
expect(launchCommunitySpy).toHaveBeenCalledWith(launchCommunityPayload)
})

it('launches community on init if their data exists in local db', async () => {
const launchCommunityPayload: InitCommunityPayload = {
id: community.id,
peerId: userIdentity.peerId,
hiddenService: userIdentity.hiddenService,
certs: {
// @ts-expect-error
certificate: userIdentity.userCertificate,
// @ts-expect-error
key: userIdentity.userCsr?.userKey,
CA: [communityRootCa],
},
peers: community.peerList,
}

await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload)

const peerAddress = '/dns4/test.onion/tcp/80/ws/p2p/peerid'
await localDbService.put(LocalDBKeys.PEERS, {
[peerAddress]: {
peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix',
connectionTime: 50,
lastSeen: 1000,
},
})

await connectionsManagerService.closeAllServices()

const launchCommunitySpy = jest.spyOn(connectionsManagerService, 'launchCommunity').mockResolvedValue()

await connectionsManagerService.init()

expect(launchCommunitySpy).toHaveBeenCalledWith(Object.assign(launchCommunityPayload, { peers: [peerAddress] }))
})

it('does not launch community on init if its data does not exist in local db', async () => {
await connectionsManagerService.closeAllServices()
await connectionsManagerService.init()
Expand Down Expand Up @@ -200,10 +171,10 @@ describe('ConnectionsManagerService', () => {
// await connectionsManager.init()
await localDbService.put(LocalDBKeys.COMMUNITY, launchCommunityPayload)

const peerAddress = '/dns4/test.onion/tcp/80/ws/p2p/peerid'
const peerid = 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix'
await localDbService.put(LocalDBKeys.PEERS, {
[peerAddress]: {
peerId: 'QmaEvCkpUG7GxhgvMkk8wxurfi1ehjHhSUNRksWTmXN2ix',
[peerid]: {
peerId: peerid,
connectionTime: 50,
lastSeen: 1000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { setEngine, CryptoEngine } from 'pkijs'
import { EventEmitter } from 'events'
import getPort from 'get-port'
import PeerId from 'peer-id'
import { removeFilesFromDir } from '../common/utils'
import { getLibp2pAddressesFromCsrs, removeFilesFromDir } from '../common/utils'

import {
AskForMessagesPayload,
Expand Down Expand Up @@ -157,9 +157,10 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.logger('launchCommunityFromStorage')

const community: InitCommunityPayload = await this.localDbService.get(LocalDBKeys.COMMUNITY)
this.logger('launchCommunityFromStorage - community:', community?.id)
this.logger('launchCommunityFromStorage - community peers', community?.peers)
if (community) {
const sortedPeers = await this.localDbService.getSortedPeers(community.peers)
this.logger('launchCommunityFromStorage - sorted peers', sortedPeers)
if (sortedPeers.length > 0) {
community.peers = sortedPeers
}
Expand Down Expand Up @@ -191,9 +192,9 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.logger('Closing local storage')
await this.localDbService.close()
}
if (this.libp2pService?.libp2pInstance) {
if (this.libp2pService) {
this.logger('Stopping libp2p')
await this.libp2pService.libp2pInstance.stop()
await this.libp2pService.close()
}
}

Expand All @@ -208,22 +209,24 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI

public async leaveCommunity() {
this.tor.resetHiddenServices()
this.serverIoProvider.io.close()
this.closeSocket()
await this.localDbService.purge()
await this.closeAllServices({ saveTor: true })
await this.purgeData()
await this.resetState()
await this.localDbService.open()
await this.socketService.init()
}

async resetState() {
this.communityId = ''
this.ports = { ...this.ports, libp2pHiddenService: await getPort() }
this.libp2pService.libp2pInstance = null
this.libp2pService.connectedPeers = new Map()
this.communityState = ServiceState.DEFAULT
this.registrarState = ServiceState.DEFAULT
await this.localDbService.open()
await this.socketService.init()
}

public async purgeData() {
console.log('removing data')
this.logger('Purging community data')
const dirsToRemove = fs
.readdirSync(this.quietDir)
.filter(
Expand Down Expand Up @@ -601,7 +604,8 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
this.storageService.on(
StorageEvents.REPLICATED_CSR,
async (payload: { csrs: string[]; certificates: string[]; id: string }) => {
console.log(`Storage - ${StorageEvents.REPLICATED_CSR}`)
this.logger(`Storage - ${StorageEvents.REPLICATED_CSR}`)
this.libp2pService.emit(Libp2pEvents.DIAL_PEERS, await getLibp2pAddressesFromCsrs(payload.csrs))
this.serverIoProvider.io.emit(SocketActionTypes.RESPONSE_GET_CSRS, { csrs: payload.csrs })
this.registrationService.emit(RegistrationEvents.REGISTER_USER_CERTIFICATE, payload)
}
Expand Down
2 changes: 2 additions & 0 deletions packages/backend/src/nest/const.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ export const IPFS_REPO_PATCH = 'ipfsRepoPath'
export const CONFIG_OPTIONS = 'configOptions'
export const SERVER_IO_PROVIDER = 'serverIoProvider'

export const PROCESS_IN_CHUNKS_PROVIDER = 'processInChunksProvider'

export const EXPRESS_PROVIDER = 'expressProvider'

export const LEVEL_DB = 'levelDb'
Expand Down
3 changes: 2 additions & 1 deletion packages/backend/src/nest/libp2p/libp2p.module.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
import { Module } from '@nestjs/common'
import { SocketModule } from '../socket/socket.module'
import { Libp2pService } from './libp2p.service'
import { ProcessInChunksService } from './process-in-chunks.service'

@Module({
imports: [SocketModule],
providers: [Libp2pService],
providers: [Libp2pService, ProcessInChunksService],
exports: [Libp2pService],
})
export class Libp2pModule {}
51 changes: 46 additions & 5 deletions packages/backend/src/nest/libp2p/libp2p.service.spec.ts
Original file line number Diff line number Diff line change
@@ -1,23 +1,28 @@
import { jest } from '@jest/globals'
import { Test, TestingModule } from '@nestjs/testing'
import { TestModule } from '../common/test.module'
import { libp2pInstanceParams } from '../common/utils'
import { createPeerId, libp2pInstanceParams } from '../common/utils'
import { Libp2pModule } from './libp2p.module'
import { LIBP2P_PSK_METADATA, Libp2pService } from './libp2p.service'
import { Libp2pNodeParams } from './libp2p.types'
import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import validator from 'validator'
import waitForExpect from 'wait-for-expect'
import { ProcessInChunksService } from './process-in-chunks.service'

describe('Libp2pService', () => {
let module: TestingModule
let libp2pService: Libp2pService
let params: Libp2pNodeParams
let processInChunks: ProcessInChunksService<string>

beforeAll(async () => {
module = await Test.createTestingModule({
imports: [TestModule, Libp2pModule],
}).compile()

libp2pService = await module.resolve(Libp2pService)
processInChunks = await module.resolve(ProcessInChunksService<string>)
params = await libp2pInstanceParams()
})

Expand All @@ -32,13 +37,13 @@ describe('Libp2pService', () => {
expect(libp2pService?.libp2pInstance?.peerId).toBe(params.peerId)
})

it('destory instance libp2p', async () => {
it('close libp2p service', async () => {
await libp2pService.createInstance(params)
await libp2pService.destroyInstance()
await libp2pService.close()
expect(libp2pService.libp2pInstance).toBeNull()
})

it('creates libp2p address with proper ws type (%s)', async () => {
it('creates libp2p address', async () => {
const libp2pAddress = libp2pService.createLibp2pAddress(params.localAddress, params.peerId.toString())
expect(libp2pAddress).toStrictEqual(`/dns4/${params.localAddress}.onion/tcp/80/ws/p2p/${params.peerId.toString()}`)
})
Expand All @@ -58,4 +63,40 @@ describe('Libp2pService', () => {
const expectedFullKeyString = LIBP2P_PSK_METADATA + uint8ArrayToString(generatedPskBuffer, 'base16')
expect(uint8ArrayToString(generatedKey.fullKey)).toEqual(expectedFullKeyString)
})

it(`Starts dialing peers on '${Libp2pEvents.DIAL_PEERS}' event`, async () => {
const peerId1 = await createPeerId()
const peerId2 = await createPeerId()
const addresses = [
libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString()),
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
// @ts-expect-error processItem is private
const spyOnProcessItem = jest.spyOn(processInChunks, 'processItem')
expect(libp2pService.libp2pInstance).not.toBeNull()
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(spyOnProcessItem).toBeCalledTimes(addresses.length)
})
})

it(`Do not dial peer on '${Libp2pEvents.DIAL_PEERS}' event if peer was already dialed`, async () => {
const peerId1 = await createPeerId()
const peerId2 = await createPeerId()
const alreadyDialedAddress = libp2pService.createLibp2pAddress('onionAddress1.onion', peerId1.toString())
libp2pService.dialedPeers.add(alreadyDialedAddress)
const addresses = [
alreadyDialedAddress,
libp2pService.createLibp2pAddress('onionAddress2.onion', peerId2.toString()),
]
await libp2pService.createInstance(params)
expect(libp2pService.libp2pInstance).not.toBeNull()
// @ts-expect-error processItem is private
const dialPeerSpy = jest.spyOn(processInChunks, 'processItem')
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(dialPeerSpy).toBeCalledTimes(1)
})
})
})
Loading

0 comments on commit b51de0d

Please sign in to comment.