diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index f6fcd4663c..73db2a4469 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -1,6 +1,5 @@ import type { Libp2p } from "@libp2p/interface"; import type { Stream } from "@libp2p/interface/connection"; -import type { PeerId } from "@libp2p/interface/peer-id"; import { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { IBaseProtocol, @@ -9,10 +8,10 @@ import type { ShardInfo } from "@waku/interfaces"; import { shardInfoToPubsubTopics } from "@waku/utils"; -import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; +import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p"; import { DefaultPubsubTopic } from "./constants.js"; -import { filterPeers } from "./filterPeers.js"; +import { filterPeersByDiscovery } from "./filterPeers.js"; import { StreamManager } from "./stream_manager.js"; /** @@ -60,15 +59,6 @@ export class BaseProtocol implements IBaseProtocol { return getPeersForProtocol(this.peerStore, [this.multicodec]); } - protected async getPeer(peerId?: PeerId): Promise { - const { peer } = await selectPeerForProtocol( - this.peerStore, - [this.multicodec], - peerId - ); - return peer; - } - /** * Retrieves a list of peers based on the specified criteria. * @@ -93,8 +83,19 @@ export class BaseProtocol implements IBaseProtocol { this.multicodec ]); - // Filter the peers based on the specified criteria - return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); + // Filter the peers based on discovery & number of peers requested + const filteredPeers = await filterPeersByDiscovery( + allPeersForProtocol, + numPeers, + maxBootstrapPeers + ); + + const sortedFilteredPeers = await sortPeersByLatency( + this.peerStore, + filteredPeers + ); + + return sortedFilteredPeers; } initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] { diff --git a/packages/core/src/lib/filterPeers.ts b/packages/core/src/lib/filterPeers.ts index 298c9f15bc..380cdb30cf 100644 --- a/packages/core/src/lib/filterPeers.ts +++ b/packages/core/src/lib/filterPeers.ts @@ -2,14 +2,17 @@ import { Peer } from "@libp2p/interface/peer-store"; import { Tags } from "@waku/interfaces"; /** - * Retrieves a list of peers based on the specified criteria. + * Retrieves a list of peers based on the specified criteria: + * 1. If numPeers is 0, return all peers + * 2. Bootstrap peers are prioritized + * 3. Non-bootstrap peers are randomly selected to fill up to numPeers * * @param peers - The list of peers to filter from. * @param numPeers - The total number of peers to retrieve. If 0, all peers are returned. * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. * @returns A Promise that resolves to an array of peers based on the specified criteria. */ -export async function filterPeers( +export async function filterPeersByDiscovery( peers: Peer[], numPeers: number, maxBootstrapPeers: number diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index b3c6892ab0..1be215df2a 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -65,7 +65,10 @@ class Metadata extends BaseProtocol { async query(peerId: PeerId): Promise { const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo); - const peer = await this.getPeer(peerId); + const peer = await this.peerStore.get(peerId); + if (!peer) { + throw new Error(`Peer ${peerId.toString()} not found`); + } const stream = await this.getStream(peer); diff --git a/packages/interfaces/src/peer_exchange.ts b/packages/interfaces/src/peer_exchange.ts index 8183d6fe13..b194c9c6a9 100644 --- a/packages/interfaces/src/peer_exchange.ts +++ b/packages/interfaces/src/peer_exchange.ts @@ -11,7 +11,7 @@ export interface IPeerExchange extends IBaseProtocol { export interface PeerExchangeQueryParams { numPeers: number; - peerId?: PeerId; + peerId: PeerId; } export interface PeerExchangeResponse { diff --git a/packages/peer-exchange/src/waku_peer_exchange.ts b/packages/peer-exchange/src/waku_peer_exchange.ts index 21ac4aab4d..6ab7c5fc5e 100644 --- a/packages/peer-exchange/src/waku_peer_exchange.ts +++ b/packages/peer-exchange/src/waku_peer_exchange.ts @@ -42,7 +42,10 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange { numPeers: BigInt(numPeers) }); - const peer = await this.getPeer(params.peerId); + const peer = await this.peerStore.get(params.peerId); + if (!peer) { + throw new Error(`Peer ${params.peerId.toString()} not found`); + } const stream = await this.getStream(peer); diff --git a/packages/tests/tests/peer_exchange.node.spec.ts b/packages/tests/tests/peer_exchange.node.spec.ts index 657d2c1781..2ca440a7d8 100644 --- a/packages/tests/tests/peer_exchange.node.spec.ts +++ b/packages/tests/tests/peer_exchange.node.spec.ts @@ -62,6 +62,7 @@ describe("Peer Exchange", () => { let peerInfos: PeerInfo[] = []; while (peerInfos.length <= 0) { peerInfos = (await peerExchange.query({ + peerId: nwaku1PeerId, numPeers: numPeersToRequest })) as PeerInfo[]; await delay(3000); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index b5db2b73c2..10d780914e 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,5 +1,4 @@ import type { Connection } from "@libp2p/interface/connection"; -import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import { bytesToUtf8 } from "../bytes/index.js"; @@ -16,35 +15,39 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined { } /** - * Returns the peer with the lowest latency. + * Function to sort peers by latency from lowest to highest * @param peerStore - The Libp2p PeerStore * @param peers - The list of peers to choose from - * @returns The peer with the lowest latency, or undefined if no peer could be reached + * @returns Sorted array of peers by latency */ -export async function selectLowestLatencyPeer( +export async function sortPeersByLatency( peerStore: PeerStore, peers: Peer[] -): Promise { - if (peers.length === 0) return; +): Promise { + if (peers.length === 0) return []; const results = await Promise.all( peers.map(async (peer) => { - const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); - if (!pingBytes) return { peer, ping: Infinity }; - - const ping = Number(bytesToUtf8(pingBytes)) ?? Infinity; - return { peer, ping }; + try { + const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping"); + if (!pingBytes) return null; + + const ping = Number(bytesToUtf8(pingBytes)); + return isNaN(ping) ? null : { peer, ping }; + } catch (error) { + return null; + } }) ); - const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0]; - if (!lowestLatencyResult) { - return undefined; - } + // filter out null values + const validResults = results.filter( + (result): result is { peer: Peer; ping: number } => result !== null + ); - return lowestLatencyResult.ping !== Infinity - ? lowestLatencyResult.peer - : undefined; + return validResults + .sort((a, b) => a.ping - b.ping) + .map((result) => result.peer); } /** @@ -66,53 +69,6 @@ export async function getPeersForProtocol( return peers; } -/** - * Returns a peer that supports the given protocol. - * If peerId is provided, the peer with that id is returned. - * Otherwise, the peer with the lowest latency is returned. - * If no peer is found from the above criteria, a random peer is returned. - */ -export async function selectPeerForProtocol( - peerStore: PeerStore, - protocols: string[], - peerId?: PeerId -): Promise<{ peer: Peer; protocol: string }> { - let peer: Peer | undefined; - if (peerId) { - peer = await peerStore.get(peerId); - if (!peer) { - throw new Error( - `Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}` - ); - } - } else { - const peers = await getPeersForProtocol(peerStore, protocols); - peer = await selectLowestLatencyPeer(peerStore, peers); - if (!peer) { - peer = selectRandomPeer(peers); - if (!peer) - throw new Error( - `Failed to find known peer that registers protocols: ${protocols}` - ); - } - } - - let protocol; - for (const codec of protocols) { - if (peer.protocols.includes(codec)) { - protocol = codec; - // Do not break as we want to keep the last value - } - } - if (!protocol) { - throw new Error( - `Peer does not register required protocols (${peer.id.toString()}): ${protocols}` - ); - } - - return { peer, protocol }; -} - export function selectConnection( connections: Connection[] ): Connection | undefined {