Skip to content

Commit

Permalink
Merge branch 'feature/2295' into chore/2312
Browse files Browse the repository at this point in the history
  • Loading branch information
EmiM committed Apr 16, 2024
2 parents 7c0898a + f39f6b2 commit 4c66c72
Show file tree
Hide file tree
Showing 23 changed files with 210 additions and 96 deletions.
33 changes: 33 additions & 0 deletions packages/backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,7 @@
"events": "^3.2.0",
"express": "^4.17.1",
"fetch-retry": "^6.0.0",
"fastq": "^1.17.1",
"get-port": "^5.1.1",
"go-ipfs": "npm:[email protected]",
"http-server": "^0.12.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import waitForExpect from 'wait-for-expect'
import { Libp2pEvents } from '../libp2p/libp2p.types'
import { sleep } from '../common/sleep'
import { createLibp2pAddress } from '@quiet/common'
import { lib } from 'crypto-js'

jest.setTimeout(100_000)

Expand Down Expand Up @@ -215,8 +216,8 @@ describe('Connections manager', () => {
await sleep(5000)
// It looks LibP2P dials peers initially when it's started and
// then IPFS service dials peers again when started, thus
// peersCount * 2
expect(spyOnDial).toHaveBeenCalledTimes(peersCount * 2)
// peersCount-1 * 2 because we don't dial ourself (the first peer in the list)
expect(spyOnDial).toHaveBeenCalledTimes((peersCount - 1) * 2)
// Temporary fix for hanging test - websocketOverTor doesn't have abortController
await sleep(5000)
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -554,7 +554,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
agent: this.socksProxyAgent,
localAddress: this.libp2pService.createLibp2pAddress(onionAddress, peerId.toString()),
targetPort: this.ports.libp2pHiddenService,
peers: peers ?? [],
peers: peers ? peers.slice(1) : [],
psk: Libp2pService.generateLibp2pPSK(community.psk).fullKey,
}
await this.libp2pService.createInstance(params)
Expand Down
8 changes: 5 additions & 3 deletions packages/backend/src/nest/libp2p/libp2p.service.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { Libp2pEvents, Libp2pNodeParams } from './libp2p.types'
import { toString as uint8ArrayToString } from 'uint8arrays/to-string'
import validator from 'validator'
import waitForExpect from 'wait-for-expect'
import { ProcessInChunksService } from './process-in-chunks.service'
import { DEFAULT_NUM_TRIES, ProcessInChunksService } from './process-in-chunks.service'

describe('Libp2pService', () => {
let module: TestingModule
Expand Down Expand Up @@ -93,10 +93,12 @@ describe('Libp2pService', () => {
await libp2pService.createInstance(params)
expect(libp2pService.libp2pInstance).not.toBeNull()
// @ts-expect-error processItem is private
const dialPeerSpy = jest.spyOn(processInChunks, 'processItem')
const processItemSpy = jest.spyOn(processInChunks, 'processItem')
const dialSpy = jest.spyOn(libp2pService.libp2pInstance!, 'dial')
libp2pService.emit(Libp2pEvents.DIAL_PEERS, addresses)
await waitForExpect(async () => {
expect(dialPeerSpy).toBeCalledTimes(1)
expect(processItemSpy).toBeCalledTimes(2 * DEFAULT_NUM_TRIES)
expect(dialSpy).toBeCalledTimes(1)
})
})
})
71 changes: 50 additions & 21 deletions packages/backend/src/nest/libp2p/process-in-chunks.service.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,85 @@
import { EventEmitter } from 'events'
import fastq from 'fastq'
import type { queue, done } from 'fastq'

import Logger from '../common/logger'

const DEFAULT_CHUNK_SIZE = 10
export const DEFAULT_NUM_TRIES = 2

type ProcessTask<T> = {
data: T
tries: number
}

export class ProcessInChunksService<T> extends EventEmitter {
private isActive: boolean
private data: T[]
private data: Set<T> = new Set()
private chunkSize: number
private taskQueue: queue<ProcessTask<T>>
private processItem: (arg: T) => Promise<any>
private readonly logger = Logger(ProcessInChunksService.name)
constructor() {
super()
}

public init(data: T[] = [], processItem: (arg: T) => Promise<any>, chunkSize: number = DEFAULT_CHUNK_SIZE) {
this.data = data
public init(data: T[], processItem: (arg: T) => Promise<any>, chunkSize: number = DEFAULT_CHUNK_SIZE) {
this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`)
this.processItem = processItem
this.chunkSize = chunkSize
this.taskQueue = fastq(this, this.processOneItem, this.chunkSize)
this.updateData(data)
this.addToTaskQueue()
}

updateData(items: T[]) {
public updateData(items: T[]) {
this.logger(`Updating data with ${items.length} items`)
this.data = [...new Set(this.data.concat(items))]
this.taskQueue.pause()
items.forEach(item => this.data.add(item))
this.addToTaskQueue()
}

public async processOneItem() {
const toProcess = this.data.shift()
if (toProcess) {
try {
await this.processItem(toProcess)
} catch (e) {
this.logger(`Processing ${toProcess} failed, message:`, e.message)
} finally {
process.nextTick(async () => {
await this.processOneItem()
})
private addToTaskQueue() {
this.logger(`Adding ${this.data.size} items to the task queue`)
for (const item of this.data) {
if (item) {
this.logger(`Adding data ${item} to the task queue`)
this.data.delete(item)
try {
this.taskQueue.push({ data: item, tries: 0 } as ProcessTask<T>)
} catch (e) {
this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e)
this.data.add(item)
}
}
}
}

public async process() {
this.logger(`Processing ${this.data.length} items`)
for (let i = 0; i < this.chunkSize; i++) {
// Do not wait for this promise as items should be processed simultineously
void this.processOneItem()
public async processOneItem(task: ProcessTask<T>) {
try {
this.logger(`Processing task with data ${task.data}`)
await this.processItem(task.data)
} catch (e) {
this.logger.error(`Processing task with data ${task.data} failed`, e)
if (task.tries + 1 < DEFAULT_NUM_TRIES) {
this.logger(`Will try to re-attempt task with data ${task.data}`)
this.taskQueue.push({ ...task, tries: task.tries + 1 })
}
} finally {
this.logger(`Done attempting to process task with data ${task.data}`)
}
}

public async process() {
this.logger(`Processing ${this.taskQueue.length} items`)
this.taskQueue.resume()
}

public stop() {
if (this.isActive) {
this.logger('Stopping initial dial')
this.isActive = false
this.taskQueue.pause()
}
}
}
6 changes: 3 additions & 3 deletions packages/backend/src/nest/libp2p/process-in-chunks.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ describe('ProcessInChunks', () => {
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem)
await processInChunks.process()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(4)
expect(mockProcessItem).toBeCalledTimes(6)
})
})

Expand All @@ -43,7 +43,7 @@ describe('ProcessInChunks', () => {
processInChunks.updateData(['e', 'f'])
await processInChunks.process()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(4)
expect(mockProcessItem).toBeCalledTimes(5)
})
})

Expand All @@ -60,7 +60,7 @@ describe('ProcessInChunks', () => {
processInChunks.init(['a', 'b', 'c', 'd'], mockProcessItem, chunkSize)
await processInChunks.process()
await waitForExpect(() => {
expect(mockProcessItem).toBeCalledTimes(4)
expect(mockProcessItem).toBeCalledTimes(2)
})
})

Expand Down
20 changes: 17 additions & 3 deletions packages/backend/src/nest/local-db/local-db.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,31 @@ export class LocalDbService {
}
}

public async getSortedPeers(peers: string[] = []): Promise<string[]> {
public async getSortedPeers(
peers?: string[] | undefined,
includeLocalPeerAddress: boolean = true
): Promise<string[]> {
if (!peers) {
const currentCommunity = await this.getCurrentCommunity()
if (!currentCommunity) {
throw new Error('No peers were provided and no community was found to extract peers from')
}
peers = currentCommunity.peerList
if (!peers) {
throw new Error('No peers provided and no peers found on current stored community')
}
}

const peersStats = (await this.get(LocalDBKeys.PEERS)) || {}
const stats: NetworkStats[] = Object.values(peersStats)
const network = await this.getNetworkInfo()

if (network) {
const localPeerAddress = createLibp2pAddress(network.hiddenService.onionAddress, network.peerId.id)
this.logger('Local peer', localPeerAddress)
return filterAndSortPeers(peers, stats, localPeerAddress)
return filterAndSortPeers(peers, stats, localPeerAddress, includeLocalPeerAddress)
} else {
return filterAndSortPeers(peers, stats)
return filterAndSortPeers(peers, stats, undefined, includeLocalPeerAddress)
}
}

Expand Down
1 change: 1 addition & 0 deletions packages/backend/src/nest/socket/socket.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
type MessagesLoadedPayload,
type NetworkInfo,
CreateNetworkPayload,
CommunityOwnership,
} from '@quiet/types'
import EventEmitter from 'events'
import { CONFIG_OPTIONS, SERVER_IO_PROVIDER } from '../const'
Expand Down
35 changes: 21 additions & 14 deletions packages/common/src/sortPeers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { type NetworkStats } from '@quiet/types'
import { isDefined } from './helpers'
import { filterValidAddresses } from './libp2p'

/**
Expand All @@ -14,17 +13,19 @@ This is the very simple algorithm for evaluating the most wanted peers.
export const filterAndSortPeers = (
peersAddresses: string[],
stats: NetworkStats[],
localPeerAddress?: string
localPeerAddress?: string,
includeLocalPeerAddress: boolean = true
): string[] => {
peersAddresses = filterValidAddresses(peersAddresses)
const currentlyConnected = [...stats].filter(peer => peer.connectionTime === 0)
const lastSeenSorted = [...stats].sort((a, b) => {
return b.lastSeen - a.lastSeen
})
const mostUptimeSharedSorted = [...stats].sort((a, b) => {
return b.connectionTime - a.connectionTime
})

const mostWantedPeers: NetworkStats[] = []
const mostWantedPeers: NetworkStats[] = currentlyConnected

for (let i = 0; i < stats.length; i++) {
const peerOne = lastSeenSorted[i]
Expand All @@ -39,22 +40,28 @@ export const filterAndSortPeers = (
}
}

const peerList = mostWantedPeers.map(peerId => {
return peersAddresses.find(peerAddress => {
const peerSet: Set<string> = new Set()
if (includeLocalPeerAddress && localPeerAddress) {
peerSet.add(localPeerAddress)
}

mostWantedPeers.forEach(peer => {
const found = peersAddresses.find(peerAddress => {
const id = peerAddress.split('/')[7]
if (id === peerId.peerId) {
if (id === peer.peerId) {
peersAddresses.splice(peersAddresses.indexOf(peerAddress), 1)
return true
}
})
if (found && found !== '') {
peerSet.add(found)
}
})
peersAddresses.forEach(peerAddress => {
if (!peerSet.has(peerAddress)) {
peerSet.add(peerAddress)
}
})

return [
...new Set([
localPeerAddress, // Set local peer as first
...peerList.concat(peersAddresses),
]),
]
.filter(address => address !== null && address !== '')
.filter(isDefined)
return [...peerSet]
}
9 changes: 9 additions & 0 deletions packages/common/src/tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,12 @@ export function getValidInvitationUrlTestData<T extends InvitationDataV1 | Invit
data: data,
}
}

// export const getValidInvitationUrlTestData = (data: InvitationData) => {
// return {
// shareUrl: () => composeInvitationShareUrl(data),
// deepUrl: () => composeInvitationDeepUrl(data),
// code: () => composeInvitationShareUrl(data).split(QUIET_JOIN_PAGE + '#')[1],
// data: data,
// }
// }
Loading

0 comments on commit 4c66c72

Please sign in to comment.