Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use the lowest latency peer for light protocols #1511

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions packages/core/src/lib/base_protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ export class BaseProtocol implements IBaseProtocol {
const { peer } = await selectPeerForProtocol(
this.peerStore,
[this.multicodec],
{
registrar: this.components.registrar,
connectionManager: this.components.connectionManager
},
peerId
);
return peer;
Expand Down
13 changes: 13 additions & 0 deletions packages/core/src/lib/filter/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,19 @@ class Filter extends BaseProtocol implements IReceiver {
this.options = options ?? {};
}

/**
* Creates a subscription for a given pub/sub topic and peer.
*
* @param pubSubTopic - The pub/sub topic for the subscription. If not provided,
* it defaults to the protocol's configured topic or the default topic.
* @param peerId - The ID of the peer to use for Filter subscribe. If not specified,
* the peer with the lowest latency from the connected peers is used.
*
* @returns A promise that resolves to a `Subscription` object for the given topic and peer.
*
* @throws {Error} - Throws an error if there's an issue creating the subscription.
*
*/
async createSubscription(
pubSubTopic?: string,
peerId?: PeerId
Expand Down
14 changes: 14 additions & 0 deletions packages/core/src/lib/light_push/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,20 @@ class LightPush extends BaseProtocol implements ILightPush {
return { query };
}

/**
* Sends a message using the Waku v2 Light Push protocol.
*
* @param encoder - The encoder to use for encoding the message.
* @param message - The message to be sent.
* @param opts - Optional protocol options.
* @param opts.peerId - The ID of the peer to use for LightPush send. If not specified,
* the peer with the lowest latency from the connected peers is used.
*
* @returns A promise that resolves to a `SendResult` object containing the recipients
* of the message and any potential error that occurred during the send operation.
*
* @throws {SendError} - Throws an error if there's an issue with sending the message.
*/
async send(
encoder: IEncoder,
message: IMessage,
Expand Down
24 changes: 13 additions & 11 deletions packages/core/src/lib/store/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ export interface TimeFilter {

export interface QueryOptions {
/**
* The peer to query. If undefined, a pseudo-random peer is selected from the connected Waku Store peers.
* The peer to query. If undefined, the peer with the lowest latency is selected from the connected Waku Store peers.
*/
peerId?: PeerId;
/**
Expand Down Expand Up @@ -198,18 +198,20 @@ class Store extends BaseProtocol implements IStore {
}

/**
* Do a query to a Waku Store to retrieve historical/missed messages.
* Queries a Waku Store to retrieve historical or missed messages using a generator.
*
* This is a generator, useful if you want most control on how messages
* are processed.
* This generator provides granular control over how messages are processed.
* Messages returned by the remote Waku node are ordered as follows:
* - Within a page, messages are ordered from the oldest to the most recent.
* - The direction of pages depends on the {@link QueryOptions.pageDirection}.
* @params options.peerId - The ID of the peer to use for the Store query. If not specified,
* the peer with the lowest latency from the connected peers is used.
*
* The order of the messages returned by the remote Waku node SHOULD BE
* as follows:
* - within a page, messages SHOULD be ordered from oldest to most recent
* - pages direction depends on { @link QueryOptions.pageDirection }
* @throws If not able to reach a Waku Store peer to query,
* or if an error is encountered when processing the reply,
* or if two decoders with the same content topic are passed.
* @yields An array of promises that resolve to decoded messages.
*
* @throws {Error} - Throws an error if unable to reach a Waku Store peer,
* if an error occurs while processing the reply,
* or if two decoders with the same content topic are passed.
*/
async *queryGenerator<T extends IDecodedMessage>(
decoders: IDecoder<T>[],
Expand Down
6 changes: 6 additions & 0 deletions packages/interfaces/src/misc.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { Libp2pComponents } from "./libp2p.js";
import type { IDecodedMessage } from "./message.js";

export interface IAsyncIterator<T extends IDecodedMessage> {
Expand All @@ -11,3 +12,8 @@ export type PubSubTopic = string;
export type ContentTopic = string;

export type PeerIdStr = string;

export interface PingServiceComponents {
registrar: Libp2pComponents["registrar"];
connectionManager: Libp2pComponents["connectionManager"];
}
2 changes: 1 addition & 1 deletion packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ export async function defaultLibp2p(
identify: identifyService({
agentVersion: userAgent ?? DefaultUserAgent
}),
ping: pingService(),
ping: pingService({ maxOutboundStreams: 2 }),
...pubsubService,
...options?.services
}
Expand Down
46 changes: 43 additions & 3 deletions packages/utils/src/libp2p/index.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import type { Connection } from "@libp2p/interface/connection";
import type { PeerId } from "@libp2p/interface/peer-id";
import type { Peer, PeerStore } from "@libp2p/interface/peer-store";
import type { PingServiceComponents } from "@waku/interfaces";
import debug from "debug";
import type { PingService } from "libp2p/ping";
import { pingService as libp2pPingService } from "libp2p/ping";

const log = debug("waku:libp2p-utils");

/**
* @deprecated uses fastest peer selection instead
* Returns a pseudo-random peer that supports the given protocol.
* Useful for protocols such as store and light push
*/
Expand All @@ -16,6 +19,30 @@ export function selectRandomPeer(peers: Peer[]): Peer | undefined {
return peers[index];
}

/**
*
* @param ping The libp2p ping service's ping function
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, I'd just re-use the keep alive functionality at this point in time.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#1520 :D

* @param peerStore The libp2p peer store
* @param protocols The protocols that the peer must support
* @returns The peer with the lowest latency that supports the given protocols
*/

async function selectFastestPeer(
ping: PingService["ping"],
peers: Peer[]
): Promise<Peer> {
const peerLatencies = await Promise.all(
peers.map(async (peer) => {
const latency = await ping(peer.id);
return { peer, latency };
})
);

peerLatencies.sort((a, b) => a.latency - b.latency);

return peerLatencies[0].peer;
}

/**
* Returns the list of peers that supports the given protocol.
*/
Expand All @@ -35,12 +62,23 @@ export async function getPeersForProtocol(
return peers;
}

/**
* Returns a peer that supports the given protocol.
* If peerId is provided, it will be used to retrieve the peer's connection details.
* Otherwise, the peer with the lowest latency will be selected.
* @param peerStore The libp2p peer store
* @param protocols The protocols that the peer must support
* @param ping The libp2p ping service's ping function
* @param peerId The peerId of the peer to select
* @returns The peer and protocol that was selected
*/
export async function selectPeerForProtocol(
peerStore: PeerStore,
protocols: string[],
{ registrar, connectionManager }: PingServiceComponents,
peerId?: PeerId
): Promise<{ peer: Peer; protocol: string }> {
let peer;
let peer: Peer;
if (peerId) {
peer = await peerStore.get(peerId);
if (!peer) {
Expand All @@ -50,7 +88,9 @@ export async function selectPeerForProtocol(
}
} else {
const peers = await getPeersForProtocol(peerStore, protocols);
peer = selectRandomPeer(peers);
const pingService = libp2pPingService()({ connectionManager, registrar });
const { ping } = pingService;
peer = await selectFastestPeer(ping.bind(pingService), peers);
if (!peer) {
throw new Error(
`Failed to find known peer that registers protocols: ${protocols}`
Expand Down