Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: replace waitForRemotePeers() with waku.waitForPeer() method #2161

Merged
merged 30 commits into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
4dbb1b6
fix comment of default number of peers
weboko Oct 1, 2024
315d82f
export default number of peers from base protocol sdk
weboko Oct 1, 2024
23ded77
rename to light_push, move class to separate file
weboko Oct 1, 2024
240b1b6
move waitForRemotePeer to sdk package
weboko Oct 1, 2024
ad56eae
add todo to move waitForGossipSubPeerInMesh into @waku/relay
weboko Oct 1, 2024
7580eeb
clean up waitForRemotePeer, split metadata await from event and optim…
weboko Oct 1, 2024
800eae2
simplify and rename ILightPush interface
weboko Oct 1, 2024
6bcf283
use only connected peers in light push based on connections instead o…
weboko Oct 2, 2024
9742d68
improve readability of result processing in light push
weboko Oct 2, 2024
0f09905
fix check & update tests
weboko Oct 2, 2024
3dda8ad
address tests, add new test cases, fix racing condition in StreamManager
weboko Oct 3, 2024
06498f8
use libp2p.getPeers
weboko Oct 3, 2024
eaee635
feat: confirm metadata and protocols needed in waitForRemotePeer
weboko Oct 3, 2024
3928603
merge with master
weboko Oct 4, 2024
21056fd
rely on passed protocols and fallback to mounted
weboko Oct 4, 2024
ac212c1
add I prefix to Waku interface
weboko Oct 4, 2024
4b94686
implement waku.connect method
weboko Oct 4, 2024
690e630
add docs to IWaku interface
weboko Oct 4, 2024
ba78ed2
remove export and usage of waitForRemotePeer
weboko Oct 4, 2024
9267c70
move wait for remote peer related to Realy out of @waku/sdk
weboko Oct 4, 2024
440f908
change tests to use new API
weboko Oct 4, 2024
4f194ab
fix linting
weboko Oct 4, 2024
cd6acee
update size limit
weboko Oct 4, 2024
64b8f3a
rename .connect to .waitForPeer
weboko Oct 4, 2024
d1e2ba1
export waitForRemotePeer and mark as deprecated
weboko Oct 4, 2024
0c2e7e9
merge with master
weboko Oct 5, 2024
8937a56
feat: add mocha tests to @waku/sdk and cover waitForRemotePeer (#2163)
weboko Oct 8, 2024
027a1d1
feat: expose peerId and protocols from WakuNode (#2166)
weboko Oct 8, 2024
448e659
rename to waitForPeers
weboko Oct 8, 2024
ec8ccb4
up test
weboko Oct 8, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 76 additions & 5 deletions packages/interfaces/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,36 +10,107 @@ import { Protocols } from "./protocols.js";
import type { IRelay } from "./relay.js";
import type { IStore } from "./store.js";

export interface Waku {
export interface IWaku {
libp2p: Libp2p;
relay?: IRelay;
store?: IStore;
filter?: IFilter;
lightPush?: ILightPush;

health: IHealthManager;
connectionManager: IConnectionManager;

/**
* Dials to the provided peer
*
* @param {PeerId | MultiaddrInput} peer information to use for dialing
* @param {Protocols[]} [protocols] array of Waku protocols to be used for dialing. If no provided - will be derived from mounted protocols.
*
* @returns {Promise<Stream>} `Promise` that will resolve to a `Stream` to a dialed peer
*
* @example
* ```typescript
* await waku.dial(remotePeerId, [Protocols.LightPush]);
*
* waku.isConnected() === true;
* ```
*/
dial(peer: PeerId | MultiaddrInput, protocols?: Protocols[]): Promise<Stream>;

/**
* Starts all services and components related to functionality of Waku node.
*
* @returns {Promise<boolean>} `Promise` that will resolve when started.
*
* @example
* ```typescript
* await waku.start();
*
* waku.isStarted() === true;
* ```
*/
start(): Promise<void>;

/**
* Stops all recurring processes and services that are needed for functionality of Waku node.
*
* @returns {Promise<boolean>} `Promise` that resolves when stopped.
*
* @example
* ```typescript
* await waku.stop();
*
* waku.isStarted === false;
* ```
*/
stop(): Promise<void>;

/**
* Resolves when Waku successfully gains connection to a remote peers that fits provided requirements.
weboko marked this conversation as resolved.
Show resolved Hide resolved
* Must be used after attempting to connect to nodes, using {@link IWaku.dial} or
* if was bootstrapped by using {@link IPeerExchange} or {@link DnsDiscoveryComponents}.
*
* @param {Protocols[]} [protocols] Protocols that need to be enabled by remote peers
* @param {number} [timeoutMs] Timeout value in milliseconds after which promise rejects
*
* @returns {Promise<void>} `Promise` that **resolves** if all desired protocols are fulfilled by
* at least one remote peer, **rejects** if the timeoutMs is reached
* @throws If passing a protocol that is not mounted or Waku node is not started
*
* @example
* ```typescript
* try {
* // let's wait for at least one LightPush node and timeout in 1 second
* await waku.connect([Protocols.LightPush], 1000);
* } catch(e) {
* waku.isConnected() === false;
* console.error("Failed to connect due to", e);
* }
*
* waku.isConnected() === true;
* ```
*/
connect(protocols?: Protocols[], timeoutMs?: number): Promise<void>;
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@waku-org/js-waku looking for your feedback on naming

named it connect as when it is used it looks like await waku.connect(); which seems convenient

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

agreed to use .waitForPeer instead

@danisharora099 ping for your input

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

waitForConnectivity perhaps would be more verbose. Indifferent from now, we can improve this with external feedback


/**
* @returns {boolean} `true` if the node was started and `false` otherwise
*/
isStarted(): boolean;

/**
* @returns {boolean} `true` if the node has working connection and `false` otherwise
*/
isConnected(): boolean;

health: IHealthManager;
}

export interface LightNode extends Waku {
export interface LightNode extends IWaku {
relay: undefined;
store: IStore;
filter: IFilter;
lightPush: ILightPush;
}

export interface RelayNode extends Waku {
export interface RelayNode extends IWaku {
relay: IRelay;
store: undefined;
filter: undefined;
Expand Down
87 changes: 64 additions & 23 deletions packages/sdk/src/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { IdentifyResult } from "@libp2p/interface";
import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core";
import type { IRelay, Libp2p, Waku } from "@waku/interfaces";
import type { IRelay, IWaku, Libp2p } from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import { pEvent } from "p-event";
Expand All @@ -27,19 +27,20 @@ const log = new Logger("wait-for-remote-peer");
* @default Wait for remote peers with protocols enabled locally and no time out is applied.
*/
export async function waitForRemotePeer(
waku: Waku,
waku: IWaku,
protocols?: Protocols[],
timeoutMs?: number
): Promise<void> {
protocols = protocols ?? getEnabledProtocols(waku);
// if no protocols or empty array passed - try to derive from mounted
protocols = protocols?.length ? protocols : getEnabledProtocols(waku);
const connections = waku.libp2p.getConnections();

if (!waku.isStarted()) {
throw Error("Waku node is not started");
}

if (connections.length > 0 && !protocols.includes(Protocols.Relay)) {
const success = await waitForMetadata(waku.libp2p);
const success = await waitForMetadata(waku, protocols);

if (success) {
return;
Expand Down Expand Up @@ -135,33 +136,55 @@ async function waitForConnectedPeer(
/**
* Waits for the metadata from the remote peer.
*/
async function waitForMetadata(libp2p: Libp2p): Promise<boolean> {
const connections = libp2p.getConnections();
const metadataService = libp2p.services.metadata;

if (!connections.length || !metadataService) {
async function waitForMetadata(
waku: IWaku,
protocols: Protocols[]
): Promise<boolean> {
const connectedPeers = waku.libp2p.getPeers();
const metadataService = waku.libp2p.services.metadata;
const enabledCodes = mapProtocolsToCodecs(protocols);

if (!connectedPeers.length || !metadataService) {
log.info(
`Skipping waitForMetadata due to missing connections:${connections.length} or metadataService:${!!metadataService}`
`Skipping waitForMetadata due to missing connections:${connectedPeers.length} or metadataService:${!!metadataService}`
);
return false;
}

try {
// confirm at least with one connected peer
await Promise.any(
connections
.map((c) => c.remotePeer)
.map((peer) => metadataService.confirmOrAttemptHandshake(peer))
for (const peerId of connectedPeers) {
const confirmedAllCodecs = Array.from(enabledCodes.values()).every(
(v) => v
);

return true;
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
log.error("Connection closed. Some peers can be on different shard.");
if (confirmedAllCodecs) {
return true;
}

log.error(`Error waiting for metadata: ${e}`);
try {
const peer = await waku.libp2p.peerStore.get(peerId);
const hasSomeCodes = peer.protocols.some((c) => enabledCodes.has(c));

if (hasSomeCodes) {
const response =
await metadataService.confirmOrAttemptHandshake(peerId);

if (!response.error) {
peer.protocols.forEach((c) => {
if (enabledCodes.has(c)) {
enabledCodes.set(c, true);
}
});
}
}
} catch (e) {
// eslint-disable-next-line @typescript-eslint/no-explicit-any
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") {
log.error("Connection closed. Some peers can be on different shard.");
}

log.error(`Error while iterating through peers: ${e}`);
continue;
}
}

return false;
Expand Down Expand Up @@ -195,7 +218,7 @@ async function rejectOnTimeout<T>(
await Promise.race([promise, awaitTimeout(timeoutMs, rejectReason)]);
}

function getEnabledProtocols(waku: Waku): Protocols[] {
function getEnabledProtocols(waku: IWaku): Protocols[] {
const protocols = [];

if (waku.relay) {
Expand All @@ -216,3 +239,21 @@ function getEnabledProtocols(waku: Waku): Protocols[] {

return protocols;
}

function mapProtocolsToCodecs(protocols: Protocols[]): Map<string, boolean> {
const codecs: Map<string, boolean> = new Map();

const protocolToCodec: Record<string, string> = {
[Protocols.Filter]: FilterCodecs.SUBSCRIBE,
[Protocols.LightPush]: LightPushCodec,
[Protocols.Store]: StoreCodec
};

for (const protocol of protocols) {
if (protocolToCodec[protocol]) {
codecs.set(protocolToCodec[protocol], false);
}
}

return codecs;
}
20 changes: 11 additions & 9 deletions packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ import type {
ILightPush,
IRelay,
IStore,
IWaku,
Libp2p,
ProtocolCreateOptions,
PubsubTopic,
Waku
PubsubTopic
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";
Expand All @@ -20,6 +20,7 @@ import { wakuFilter } from "./protocols/filter/index.js";
import { wakuLightPush } from "./protocols/light_push/index.js";
import { wakuStore } from "./protocols/store/index.js";
import { ReliabilityMonitorManager } from "./reliability_monitor/index.js";
import { waitForRemotePeer } from "./wait_for_remote_peer.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -59,7 +60,7 @@ type ProtocolsEnabled = {
store?: boolean;
};

export class WakuNode implements Waku {
export class WakuNode implements IWaku {
public libp2p: Libp2p;
public relay?: IRelay;
public store?: IStore;
Expand Down Expand Up @@ -126,12 +127,6 @@ export class WakuNode implements Waku {
);
}

/**
* Dials to the provided peer.
*
* @param peer The peer to dial
* @param protocols Waku protocols we expect from the peer; Defaults to mounted protocols
*/
public async dial(
peer: PeerId | MultiaddrInput,
protocols?: Protocols[]
Expand Down Expand Up @@ -201,6 +196,13 @@ export class WakuNode implements Waku {
await this.libp2p.stop();
}

public async connect(
protocols?: Protocols[],
timeoutMs?: number
): Promise<void> {
return waitForRemotePeer(this, protocols, timeoutMs);
}

public isStarted(): boolean {
return this.libp2p.status == "started";
}
Expand Down
6 changes: 3 additions & 3 deletions packages/tests/src/utils/nodes.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
import {
DefaultNetworkConfig,
IWaku,
LightNode,
NetworkConfig,
ProtocolCreateOptions,
Protocols,
Waku
Protocols
} from "@waku/interfaces";
import { createLightNode, waitForRemotePeer } from "@waku/sdk";
import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils";
Expand Down Expand Up @@ -79,7 +79,7 @@ export async function runMultipleNodes(

export async function teardownNodesWithRedundancy(
serviceNodes: ServiceNodesFleet,
wakuNodes: Waku | Waku[]
wakuNodes: IWaku | IWaku[]
): Promise<void> {
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];

Expand Down
4 changes: 2 additions & 2 deletions packages/tests/src/utils/teardown.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { Waku } from "@waku/interfaces";
import { IWaku } from "@waku/interfaces";
import { Logger } from "@waku/utils";
import pRetry from "p-retry";

Expand All @@ -8,7 +8,7 @@ const log = new Logger("test:teardown");

export async function tearDownNodes(
nwakuNodes: ServiceNode | ServiceNode[],
wakuNodes: Waku | Waku[]
wakuNodes: IWaku | IWaku[]
): Promise<void> {
const nNodes = Array.isArray(nwakuNodes) ? nwakuNodes : [nwakuNodes];
const wNodes = Array.isArray(wakuNodes) ? wakuNodes : [wakuNodes];
Expand Down
Loading