Skip to content

Commit

Permalink
Better peer sorting and updated initial dialling
Browse files Browse the repository at this point in the history
  • Loading branch information
islathehut committed Apr 11, 2024
1 parent 4ce042e commit 39ddf35
Show file tree
Hide file tree
Showing 6 changed files with 125 additions and 38 deletions.
33 changes: 33 additions & 0 deletions packages/backend/package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/backend/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@
"dotenv": "8.2.0",
"events": "^3.2.0",
"express": "^4.17.1",
"fastq": "^1.17.1",
"get-port": "^5.1.1",
"go-ipfs": "npm:[email protected]",
"http-server": "^0.12.3",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -517,7 +517,7 @@ export class ConnectionsManagerService extends EventEmitter implements OnModuleI
agent: this.socksProxyAgent,
localAddress: this.libp2pService.createLibp2pAddress(onionAddress, peerId.toString()),
targetPort: this.ports.libp2pHiddenService,
peers: peers ?? [],
peers: peers ? peers.slice(1) : [],
psk: Libp2pService.generateLibp2pPSK(community.psk).fullKey,
}
await this.libp2pService.createInstance(params)
Expand Down
72 changes: 52 additions & 20 deletions packages/backend/src/nest/libp2p/process-in-chunks.service.ts
Original file line number Diff line number Diff line change
@@ -1,56 +1,88 @@
import { EventEmitter } from 'events'
import fastq from 'fastq'
import type { queue, done } from 'fastq'

import Logger from '../common/logger'

// fastq worker concurrency, and the max number of items moved onto the queue per chunk.
const DEFAULT_CHUNK_SIZE = 10
// Total attempts per task (first try + retries) before it is dropped.
const DEFAULT_NUM_TRIES = 2

// A unit of queued work: the item to process plus how many attempts have already been made.
type ProcessTask<T> = {
  data: T
  tries: number
}

export class ProcessInChunksService<T> extends EventEmitter {
private isActive: boolean
private data: T[]
private data: Set<T> = new Set()
private chunkSize: number
private taskQueue: queue<ProcessTask<T>>
private processItem: (arg: T) => Promise<any>
private readonly logger = Logger(ProcessInChunksService.name)
constructor() {
super()
}

/**
 * Configure the service and start queueing the provided items.
 *
 * @param data Initial items to enqueue for processing.
 * @param processItem Async handler invoked once per queued item.
 * @param chunkSize fastq worker concurrency (defaults to DEFAULT_CHUNK_SIZE).
 */
public init(data: T[], processItem: (arg: T) => Promise<any>, chunkSize: number = DEFAULT_CHUNK_SIZE) {
  this.logger(`Initializing process-in-chunks.service with peers ${JSON.stringify(data, null, 2)}`)
  this.processItem = processItem
  this.chunkSize = chunkSize
  // NOTE(review): processOneItem is async, but plain fastq expects a
  // callback-style worker that invokes done() to free a concurrency slot —
  // confirm tasks actually complete, or switch to fastq.promise/queueAsPromised.
  this.taskQueue = fastq(this, this.processOneItem, this.chunkSize)
  // Removed stale `this.data = data`: the field is declared as Set<T>, and
  // updateData() is the single path that stores incoming items.
  this.updateData(data)
  this.addToTaskQueue()
}

/**
 * Merge new items into the pending set and queue the next chunk of work.
 * Pauses the queue first so newly pushed tasks do not start running until
 * process() resumes it.
 *
 * @param items Items to add to the pending data set (deduplicated by the Set).
 */
public updateData(items: T[]) {
  this.logger(`Updating data with ${items.length} items`)
  this.taskQueue.pause()
  // Removed stale `this.data = [...new Set(this.data.concat(items))]`:
  // `data` is a Set<T>, which has no concat; the Set itself deduplicates.
  items.forEach(item => this.data.add(item))
  this.addToTaskQueue()
}

public async processOneItem() {
const toProcess = this.data.shift()
if (toProcess) {
try {
await this.processItem(toProcess)
} catch (e) {
this.logger(`Processing ${toProcess} failed, message:`, e.message)
} finally {
process.nextTick(async () => {
await this.processOneItem()
})
/**
 * Move up to chunkSize pending items from the data set onto the fastq queue.
 * Items that fail to enqueue are returned to the set for a later pass.
 */
private addToTaskQueue() {
  const maxChunkSize = Math.min(this.data.size, this.chunkSize)
  let count = 0
  this.logger(`Adding ${maxChunkSize} items to the task queue`)
  // Iterate over a snapshot: the previous version looped over the live Set
  // while deleting entries and re-adding them on push failure, so a failing
  // item would be re-visited within the same pass (potential infinite loop).
  for (const item of [...this.data]) {
    if (!item || count >= maxChunkSize) continue
    this.logger(`Adding data ${item} to the task queue`)
    this.data.delete(item)
    try {
      this.taskQueue.push({ data: item, tries: 0 })
      count++
    } catch (e) {
      this.logger.error(`Error occurred while adding new task for item ${item} to the queue`, e)
      // Put the item back so a future addToTaskQueue() pass can retry it.
      this.data.add(item)
    }
  }
}

public async process() {
this.logger(`Processing ${this.data.length} items`)
for (let i = 0; i < this.chunkSize; i++) {
// Do not wait for this promise as items should be processed simultineously
void this.processOneItem()
/**
 * fastq worker: run the user-supplied handler for one task, re-queueing a
 * failed task until DEFAULT_NUM_TRIES total attempts have been made.
 *
 * @param task The queued item and the number of attempts already made.
 */
public async processOneItem(task: ProcessTask<T>) {
  const { data, tries } = task
  try {
    this.logger(`Processing task with data ${data}`)
    await this.processItem(data)
  } catch (e) {
    this.logger(`Processing task with data ${data} failed, message:`, e.message)
    const attemptsMade = tries + 1
    if (attemptsMade < DEFAULT_NUM_TRIES) {
      this.logger(`Will try to re-attempt task with data ${data}`)
      this.taskQueue.push({ data, tries: attemptsMade })
    }
  } finally {
    this.logger(`Done attempting to process task with data ${data}`)
  }
}

/**
 * Resume the task queue so previously queued chunks begin processing.
 */
public async process() {
  // fastq exposes `length` as a method, not a property — without the call
  // this interpolated the function object instead of the pending-task count.
  this.logger(`Processing ${this.taskQueue.length()} items`)
  this.taskQueue.resume()
}

/**
 * Pause the underlying queue and mark the dial process inactive.
 */
public stop() {
  // NOTE(review): `isActive` is declared but never assigned `true` anywhere
  // in the code visible here, so this guard may keep the queue from ever
  // being paused — confirm a caller or another code path sets it.
  if (this.isActive) {
    this.logger('Stopping initial dial')
    this.isActive = false
    this.taskQueue.pause()
  }
}
}
20 changes: 17 additions & 3 deletions packages/backend/src/nest/local-db/local-db.service.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,17 +95,31 @@ export class LocalDbService {
}
}

public async getSortedPeers(peers: string[] = []): Promise<string[]> {
public async getSortedPeers(
peers?: string[] | undefined,
includeLocalPeerAddress: boolean = true
): Promise<string[]> {
if (!peers) {
const currentCommunity = await this.getCurrentCommunity()
if (!currentCommunity) {
throw new Error('No peers were provided and no community was found to extract peers from')
}
peers = currentCommunity.peerList
if (!peers) {
throw new Error('No peers provided and no peers found on current stored community')
}
}

const peersStats = (await this.get(LocalDBKeys.PEERS)) || {}
const stats: NetworkStats[] = Object.values(peersStats)
const network = await this.getNetworkInfo()

if (network) {
const localPeerAddress = createLibp2pAddress(network.hiddenService.onionAddress, network.peerId.id)
this.logger('Local peer', localPeerAddress)
return filterAndSortPeers(peers, stats, localPeerAddress)
return filterAndSortPeers(peers, stats, localPeerAddress, includeLocalPeerAddress)
} else {
return filterAndSortPeers(peers, stats)
return filterAndSortPeers(peers, stats, undefined, includeLocalPeerAddress)
}
}

Expand Down
35 changes: 21 additions & 14 deletions packages/common/src/sortPeers.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import { type NetworkStats } from '@quiet/types'
import { isDefined } from './helpers'
import { filterValidAddresses } from './libp2p'

/**
Expand All @@ -14,17 +13,19 @@ This is the very simple algorithm for evaluating the most wanted peers.
export const filterAndSortPeers = (
peersAddresses: string[],
stats: NetworkStats[],
localPeerAddress?: string
localPeerAddress?: string,
includeLocalPeerAddress: boolean = true
): string[] => {
peersAddresses = filterValidAddresses(peersAddresses)
const currentlyConnected = [...stats].filter(peer => peer.connectionTime === 0)
const lastSeenSorted = [...stats].sort((a, b) => {
return b.lastSeen - a.lastSeen
})
const mostUptimeSharedSorted = [...stats].sort((a, b) => {
return b.connectionTime - a.connectionTime
})

const mostWantedPeers: NetworkStats[] = []
const mostWantedPeers: NetworkStats[] = currentlyConnected

for (let i = 0; i < stats.length; i++) {
const peerOne = lastSeenSorted[i]
Expand All @@ -39,22 +40,28 @@ export const filterAndSortPeers = (
}
}

const peerList = mostWantedPeers.map(peerId => {
return peersAddresses.find(peerAddress => {
const peerSet: Set<string> = new Set()
if (includeLocalPeerAddress && localPeerAddress) {
peerSet.add(localPeerAddress)
}

mostWantedPeers.forEach(peer => {
const found = peersAddresses.find(peerAddress => {
const id = peerAddress.split('/')[7]
if (id === peerId.peerId) {
if (id === peer.peerId) {
peersAddresses.splice(peersAddresses.indexOf(peerAddress), 1)
return true
}
})
if (found && found !== '') {
peerSet.add(found)
}
})
peersAddresses.forEach(peerAddress => {
if (!peerSet.has(peerAddress)) {
peerSet.add(peerAddress)
}
})

return [
...new Set([
localPeerAddress, // Set local peer as first
...peerList.concat(peersAddresses),
]),
]
.filter(address => address !== null && address !== '')
.filter(isDefined)
return [...peerSet]
}

0 comments on commit 39ddf35

Please sign in to comment.