Skip to content

Commit

Permalink
remove: unnecessary function
Browse files Browse the repository at this point in the history
  • Loading branch information
danisharora099 committed Dec 18, 2023
1 parent 566e02e commit 012a4ad
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 84 deletions.
29 changes: 15 additions & 14 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { Libp2p } from "@libp2p/interface";
import type { Stream } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type {
IBaseProtocol,
Expand All @@ -9,10 +8,10 @@ import type {
ShardInfo
} from "@waku/interfaces";
import { shardInfoToPubsubTopics } from "@waku/utils";
import { getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p";
import { getPeersForProtocol, sortPeersByLatency } from "@waku/utils/libp2p";

import { DefaultPubsubTopic } from "./constants.js";
import { filterPeers } from "./filterPeers.js";
import { filterPeersByDiscovery } from "./filterPeers.js";
import { StreamManager } from "./stream_manager.js";

/**
Expand Down Expand Up @@ -60,15 +59,6 @@ export class BaseProtocol implements IBaseProtocol {
return getPeersForProtocol(this.peerStore, [this.multicodec]);
}

protected async getPeer(peerId?: PeerId): Promise<Peer> {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
peerId
);
return peer;
}

/**
* Retrieves a list of peers based on the specified criteria.
*
Expand All @@ -93,8 +83,19 @@ export class BaseProtocol implements IBaseProtocol {
this.multicodec
]);

// Filter the peers based on the specified criteria
return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers);
// Filter the peers based on discovery & number of peers requested
const filteredPeers = await filterPeersByDiscovery(
allPeersForProtocol,
numPeers,
maxBootstrapPeers
);

const sortedFilteredPeers = await sortPeersByLatency(
this.peerStore,
filteredPeers
);

return sortedFilteredPeers;
}

initializePubsubTopic(shardInfo?: ShardInfo): PubsubTopic[] {
Expand Down
7 changes: 5 additions & 2 deletions packages/core/src/lib/filterPeers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ import { Peer } from "@libp2p/interface/peer-store";
import { Tags } from "@waku/interfaces";

/**
* Retrieves a list of peers based on the specified criteria.
* Retrieves a list of peers based on the specified criteria:
* 1. If numPeers is 0, return all peers
* 2. Bootstrap peers are prioritized
* 3. Non-bootstrap peers are randomly selected to fill up to numPeers
*
* @param peers - The list of peers to filter from.
* @param numPeers - The total number of peers to retrieve. If 0, all peers are returned.
* @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve.
* @returns A Promise that resolves to an array of peers based on the specified criteria.
*/
export async function filterPeers(
export async function filterPeersByDiscovery(
peers: Peer[],
numPeers: number,
maxBootstrapPeers: number
Expand Down
5 changes: 4 additions & 1 deletion packages/core/src/lib/metadata/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,10 @@ class Metadata extends BaseProtocol {
async query(peerId: PeerId): Promise<ShardInfo> {
const request = proto_metadata.WakuMetadataRequest.encode(this.shardInfo);

const peer = await this.getPeer(peerId);
const peer = await this.peerStore.get(peerId);
if (!peer) {
throw new Error(`Peer ${peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
2 changes: 1 addition & 1 deletion packages/interfaces/src/peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ export interface IPeerExchange extends IBaseProtocol {

export interface PeerExchangeQueryParams {
numPeers: number;
peerId?: PeerId;
peerId: PeerId;
}

export interface PeerExchangeResponse {
Expand Down
5 changes: 4 additions & 1 deletion packages/peer-exchange/src/waku_peer_exchange.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,10 @@ export class WakuPeerExchange extends BaseProtocol implements IPeerExchange {
numPeers: BigInt(numPeers)
});

const peer = await this.getPeer(params.peerId);
const peer = await this.peerStore.get(params.peerId);
if (!peer) {
throw new Error(`Peer ${params.peerId.toString()} not found`);
}

const stream = await this.getStream(peer);

Expand Down
1 change: 1 addition & 0 deletions packages/tests/tests/peer_exchange.node.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ describe("Peer Exchange", () => {
let peerInfos: PeerInfo[] = [];
while (peerInfos.length <= 0) {
peerInfos = (await peerExchange.query({
peerId: nwaku1PeerId,
numPeers: numPeersToRequest
})) as PeerInfo[];
await delay(3000);
Expand Down
86 changes: 21 additions & 65 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import type { Connection } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";

import { bytesToUtf8 } from "../bytes/index.js";
Expand All @@ -16,35 +15,39 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined {
}

/**
* Returns the peer with the lowest latency.
* Function to sort peers by latency from lowest to highest
* @param peerStore - The Libp2p PeerStore
* @param peers - The list of peers to choose from
* @returns The peer with the lowest latency, or undefined if no peer could be reached
* @returns Sorted array of peers by latency
*/
export async function selectLowestLatencyPeer(
export async function sortPeersByLatency(
peerStore: PeerStore,
peers: Peer[]
): Promise<Peer | undefined> {
if (peers.length === 0) return;
): Promise<Peer[]> {
if (peers.length === 0) return [];

const results = await Promise.all(
peers.map(async (peer) => {
const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping");
if (!pingBytes) return { peer, ping: Infinity };

const ping = Number(bytesToUtf8(pingBytes)) ?? Infinity;
return { peer, ping };
try {
const pingBytes = (await peerStore.get(peer.id)).metadata.get("ping");
if (!pingBytes) return null;

const ping = Number(bytesToUtf8(pingBytes));
return isNaN(ping) ? null : { peer, ping };
} catch (error) {
return null;
}
})
);

const lowestLatencyResult = results.sort((a, b) => a.ping - b.ping)[0];
if (!lowestLatencyResult) {
return undefined;
}
// filter out null values
const validResults = results.filter(
(result): result is { peer: Peer; ping: number } => result !== null
);

return lowestLatencyResult.ping !== Infinity
? lowestLatencyResult.peer
: undefined;
return validResults
.sort((a, b) => a.ping - b.ping)
.map((result) => result.peer);
}

/**
Expand All @@ -66,53 +69,6 @@ export async function getPeersForProtocol(
return peers;
}

/**
* Returns a peer that supports the given protocol.
* If peerId is provided, the peer with that id is returned.
* Otherwise, the peer with the lowest latency is returned.
* If no peer is found from the above criteria, a random peer is returned.
*/
export async function selectPeerForProtocol(
peerStore: PeerStore,
protocols: string[],
peerId?: PeerId
): Promise<{ peer: Peer; protocol: string }> {
let peer: Peer | undefined;
if (peerId) {
peer = await peerStore.get(peerId);
if (!peer) {
throw new Error(
`Failed to retrieve connection details for provided peer in peer store: ${peerId.toString()}`
);
}
} else {
const peers = await getPeersForProtocol(peerStore, protocols);
peer = await selectLowestLatencyPeer(peerStore, peers);
if (!peer) {
peer = selectRandomPeer(peers);
if (!peer)
throw new Error(
`Failed to find known peer that registers protocols: ${protocols}`
);
}
}

let protocol;
for (const codec of protocols) {
if (peer.protocols.includes(codec)) {
protocol = codec;
// Do not break as we want to keep the last value
}
}
if (!protocol) {
throw new Error(
`Peer does not register required protocols (${peer.id.toString()}): ${protocols}`
);
}

return { peer, protocol };
}

export function selectConnection(
connections: Connection[]
): Connection | undefined {
Expand Down

0 comments on commit 012a4ad

Please sign in to comment.