From 919aeac81fe56c1aed8ebe8415e2d7e81853fe40 Mon Sep 17 00:00:00 2001 From: Danish Arora Date: Wed, 9 Oct 2024 15:25:12 +0530 Subject: [PATCH] feat: store peer usage for protocols in metadata --- packages/core/src/lib/connection_manager.ts | 2 +- packages/interfaces/src/connection_manager.ts | 8 +- packages/sdk/src/protocols/peer_manager.ts | 90 +++++++++++++++---- 3 files changed, 79 insertions(+), 21 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index 8fc61f4d54..24d0298608 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -34,7 +34,7 @@ export class ConnectionManager private static instances = new Map(); private keepAliveManager: KeepAliveManager; private options: ConnectionManagerOptions; - private libp2p: Libp2p; + public libp2p: Libp2p; private dialAttemptsForPeer: Map = new Map(); private dialErrorsForPeer: Map = new Map(); diff --git a/packages/interfaces/src/connection_manager.ts b/packages/interfaces/src/connection_manager.ts index a8a6591fdc..fcbf79010f 100644 --- a/packages/interfaces/src/connection_manager.ts +++ b/packages/interfaces/src/connection_manager.ts @@ -1,4 +1,9 @@ -import type { Peer, PeerId, TypedEventEmitter } from "@libp2p/interface"; +import type { + Libp2p, + Peer, + PeerId, + TypedEventEmitter +} from "@libp2p/interface"; import { PubsubTopic } from "./misc.js"; @@ -67,4 +72,5 @@ export interface IConnectionManager dropConnection(peerId: PeerId): Promise; getPeersByDiscovery(): Promise; stop(): void; + libp2p: Libp2p; } diff --git a/packages/sdk/src/protocols/peer_manager.ts b/packages/sdk/src/protocols/peer_manager.ts index dcdc024f1b..a6c33e4a56 100644 --- a/packages/sdk/src/protocols/peer_manager.ts +++ b/packages/sdk/src/protocols/peer_manager.ts @@ -1,12 +1,14 @@ import { Peer, PeerId } from "@libp2p/interface"; +import { utf8ToBytes } from "@noble/hashes/utils"; import { ConnectionManager, getHealthManager } from "@waku/core"; import { BaseProtocol } from "@waku/core/lib/base_protocol"; import { IHealthManager } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { Mutex } from "async-mutex"; +const METADATA_KEY = "usedByProtocol"; + export class PeerManager { - private peers: Map = new Map(); private healthManager: IHealthManager; private readMutex = new Mutex(); @@ -26,48 +28,58 @@ export class PeerManager { return this.writeLockHolder; } - public getPeers(): Peer[] { - return Array.from(this.peers.values()); + public async getPeers(): Promise { + const allPeers = await this.connectionManager.libp2p.peerStore.all(); + return allPeers.filter((peer) => this.isPeerUsedByProtocol(peer)); } public async addPeer(peer: Peer): Promise { return this.writeMutex.runExclusive(async () => { this.writeLockHolder = `addPeer: ${peer.id.toString()}`; await this.connectionManager.attemptDial(peer.id); - this.peers.set(peer.id.toString(), peer); + await this._addPeer(peer); this.log.info(`Added and dialed peer: ${peer.id.toString()}`); - this.healthManager.updateProtocolHealth( - this.core.multicodec, - this.peers.size - ); + const peerCount = await this.getPeerCount(); + this.healthManager.updateProtocolHealth(this.core.multicodec, peerCount); this.writeLockHolder = null; }); } public async removePeer(peerId: PeerId): Promise { - return this.writeMutex.runExclusive(() => { + return this.writeMutex.runExclusive(async () => { this.writeLockHolder = `removePeer: ${peerId.toString()}`; - this.peers.delete(peerId.toString()); - this.log.info(`Removed peer: ${peerId.toString()}`); - this.healthManager.updateProtocolHealth( - this.core.multicodec, - this.peers.size - ); + const peer = await this.connectionManager.libp2p.peerStore.get(peerId); + if (peer) { + await this._removePeer(peer); + this.log.info(`Removed peer: ${peerId.toString()}`); + const peerCount = await this.getPeerCount(); + this.healthManager.updateProtocolHealth( + this.core.multicodec, + peerCount + ); + } this.writeLockHolder = null; }); } public async getPeerCount(): Promise { - return this.readMutex.runExclusive(() => this.peers.size); + return this.readMutex.runExclusive(async () => { + const peers = await this.getPeers(); + return peers.length; + }); } public async hasPeers(): Promise { - return this.readMutex.runExclusive(() => this.peers.size > 0); + return this.readMutex.runExclusive(async () => { + const peerCount = await this.getPeerCount(); + return peerCount > 0; + }); } public async removeExcessPeers(excessPeers: number): Promise { this.log.info(`Removing ${excessPeers} excess peer(s)`); - const peersToRemove = Array.from(this.peers.values()).slice(0, excessPeers); + const peers = await this.getPeers(); + const peersToRemove = peers.slice(0, excessPeers); for (const peer of peersToRemove) { await this.removePeer(peer.id); } @@ -94,8 +106,9 @@ export class PeerManager { const connectedPeers = await this.core.getPeers(); return this.readMutex.runExclusive(async () => { + const currentPeers = await this.getPeers(); const newPeers = connectedPeers - .filter((peer) => !this.peers.has(peer.id.toString())) + .filter((peer) => !currentPeers.some((p) => p.id.equals(peer.id))) .slice(0, numPeers); return newPeers; @@ -110,4 +123,43 @@ export class PeerManager { } return addedPeers; } + + private async _addPeer(peer: Peer): Promise { + const connectedPeers = this.connectionManager.libp2p.getPeers(); + if (connectedPeers.some((p) => p.equals(peer.id))) { + return; + } + const _peer = await this.connectionManager.libp2p.peerStore.get(peer.id); + if (!_peer) { + return; + } + this.updatePeerMetadataWithUsageStatus(peer, true); + } + + private async _removePeer(peer: Peer): Promise { + this.updatePeerMetadataWithUsageStatus(peer, false); + } + + private updatePeerMetadataWithUsageStatus( + peer: Peer, + isCurrentlyUsed: boolean + ): void { + if (isCurrentlyUsed) { + peer.metadata.set( + METADATA_KEY, + utf8ToBytes(this.core.multicodec.toString()) + ); + } else { + peer.metadata.delete(METADATA_KEY); + } + } + + private isPeerUsedByProtocol(peer: Peer): boolean { + const usedByProtocol = peer.metadata.get(METADATA_KEY); + return usedByProtocol + ? utf8ToBytes(this.core.multicodec.toString()).every( + (byte, index) => byte === usedByProtocol[index] + ) + : false; + } }