From 1d68526e724155f76bb786239f475a774115ee97 Mon Sep 17 00:00:00 2001 From: Sasha <118575614+weboko@users.noreply.github.com> Date: Fri, 4 Oct 2024 10:50:58 +0200 Subject: [PATCH] feat(lightPush): improve peer usage and improve readability (#2155) * fix comment of default number of peers * export default number of peers from base protocol sdk * rename to light_push, move class to separate file * move waitForRemotePeer to sdk package * add todo to move waitForGossipSubPeerInMesh into @waku/relay * clean up waitForRemotePeer, split metadata await from event and optimise, decouple from protocol implementations * simplify and rename ILightPush interface * use only connected peers in light push based on connections instead of peer renewal mechanism * improve readability of result processing in light push * fix check & update tests * address tests, add new test cases, fix racing condition in StreamManager * use libp2p.getPeers --- package-lock.json | 4 +- packages/core/package.json | 1 - packages/core/src/index.ts | 4 +- .../lib/stream_manager/stream_manager.spec.ts | 29 ++- .../src/lib/stream_manager/stream_manager.ts | 27 ++- packages/interfaces/src/light_push.ts | 2 +- packages/interfaces/src/protocols.ts | 2 +- packages/interfaces/src/waku.ts | 6 +- packages/sdk/package.json | 3 +- packages/sdk/src/index.ts | 6 +- packages/sdk/src/protocols/base_protocol.ts | 4 +- .../sdk/src/protocols/light_push/index.ts | 1 + .../index.ts => light_push/light_push.ts} | 107 ++++++----- .../lib => sdk/src}/wait_for_remote_peer.ts | 170 ++++++++++-------- packages/sdk/src/waku.ts | 6 +- packages/tests/src/lib/runNodes.ts | 3 +- packages/tests/src/utils/nodes.ts | 3 +- packages/tests/tests/enr.node.spec.ts | 2 +- packages/tests/tests/ephemeral.node.spec.ts | 9 +- .../tests/filter/peer_management.spec.ts | 3 - packages/tests/tests/filter/push.node.spec.ts | 3 +- .../single_node/multiple_pubsub.node.spec.ts | 3 +- .../filter/single_node/push.node.spec.ts | 3 +- .../filter/single_node/subscribe.node.spec.ts | 4 +- packages/tests/tests/filter/utils.ts | 4 +- packages/tests/tests/getPeers.spec.ts | 5 +- .../tests/tests/health-manager/node.spec.ts | 28 ++- .../tests/light-push/peer_management.spec.ts | 69 +++---- .../single_node/multiple_pubsub.node.spec.ts | 3 +- .../tests/tests/relay/interop.node.spec.ts | 3 +- .../tests/relay/multiple_pubsub.node.spec.ts | 8 +- packages/tests/tests/relay/utils.ts | 3 +- packages/tests/tests/store/index.node.spec.ts | 3 +- .../tests/tests/store/multiple_pubsub.spec.ts | 4 +- .../tests/wait_for_remote_peer.node.spec.ts | 3 +- packages/tests/tests/waku.node.spec.ts | 5 +- 36 files changed, 308 insertions(+), 235 deletions(-) create mode 100644 packages/sdk/src/protocols/light_push/index.ts rename packages/sdk/src/protocols/{lightpush/index.ts => light_push/light_push.ts} (55%) rename packages/{core/src/lib => sdk/src}/wait_for_remote_peer.ts (53%) diff --git a/package-lock.json b/package-lock.json index 6ac618e40c..66344a70e7 100644 --- a/package-lock.json +++ b/package-lock.json @@ -39089,7 +39089,6 @@ "it-all": "^3.0.4", "it-length-prefixed": "^9.0.4", "it-pipe": "^3.0.1", - "p-event": "^6.0.1", "uint8arraylist": "^2.4.3", "uuid": "^9.0.0" }, @@ -39410,7 +39409,8 @@ "@waku/message-hash": "0.1.16", "@waku/proto": "^0.0.8", "@waku/utils": "0.0.20", - "libp2p": "^1.8.1" + "libp2p": "^1.8.1", + "p-event": "^6.0.1" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", diff --git a/packages/core/package.json b/packages/core/package.json index 5039fb9e5c..48e2e8eb67 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -77,7 +77,6 @@ "it-all": "^3.0.4", "it-length-prefixed": "^9.0.4", "it-pipe": "^3.0.1", - "p-event": "^6.0.1", "uint8arraylist": "^2.4.3", "uuid": "^9.0.0" }, diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index 3368021ead..9b4acf2eae 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -13,9 +13,7 @@ export * as waku_light_push from "./lib/light_push/index.js"; export { LightPushCodec, LightPushCore } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; -export { StoreCore } from "./lib/store/index.js"; - -export { waitForRemotePeer } from "./lib/wait_for_remote_peer.js"; +export { StoreCore, StoreCodec } from "./lib/store/index.js"; export { ConnectionManager } from "./lib/connection_manager.js"; diff --git a/packages/core/src/lib/stream_manager/stream_manager.spec.ts b/packages/core/src/lib/stream_manager/stream_manager.spec.ts index 6706a5ff9f..e66b199d2e 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.spec.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.spec.ts @@ -86,6 +86,32 @@ describe("StreamManager", () => { } }); + it("should return different streams if requested simultaniously", async () => { + const con1 = createMockConnection(); + con1.streams = [createMockStream({ id: "1", protocol: MULTICODEC })]; + + const newStreamSpy = sinon.spy(async (_protocol, _options) => + createMockStream({ + id: "2", + protocol: MULTICODEC, + writeStatus: "writable" + }) + ); + + con1.newStream = newStreamSpy; + streamManager["getConnections"] = (_peerId: PeerId | undefined) => [con1]; + + const [stream1, stream2] = await Promise.all([ + streamManager.getStream(mockPeer), + streamManager.getStream(mockPeer) + ]); + + const expected = ["1", "2"].toString(); + const actual = [stream1.id, stream2.id].sort().toString(); + + expect(actual).to.be.eq(expected); + }); + it("peer:update - should do nothing if another protocol hit", async () => { const scheduleNewStreamSpy = sinon.spy(); streamManager["scheduleNewStream"] = scheduleNewStreamSpy; @@ -156,6 +182,7 @@ function createMockStream(options: MockStreamOptions): Stream { return { id: options.id, protocol: options.protocol, - writeStatus: options.writeStatus || "ready" + writeStatus: options.writeStatus || "ready", + metadata: {} } as Stream; } diff --git a/packages/core/src/lib/stream_manager/stream_manager.ts b/packages/core/src/lib/stream_manager/stream_manager.ts index 6b2799a706..e1da56453d 100644 --- a/packages/core/src/lib/stream_manager/stream_manager.ts +++ b/packages/core/src/lib/stream_manager/stream_manager.ts @@ -4,6 +4,8 @@ import { Logger } from "@waku/utils"; import { selectOpenConnection } from "./utils.js"; +const STREAM_LOCK_KEY = "consumed"; + export class StreamManager { private readonly log: Logger; @@ -29,16 +31,20 @@ export class StreamManager { await scheduledStream; } - const stream = this.getOpenStreamForCodec(peer.id); + let stream = this.getOpenStreamForCodec(peer.id); if (stream) { this.log.info( `Found existing stream peerId=${peer.id.toString()} multicodec=${this.multicodec}` ); + this.lockStream(peer.id.toString(), stream); return stream; } - return this.createStream(peer); + stream = await this.createStream(peer); + this.lockStream(peer.id.toString(), stream); + + return stream; } private async createStream(peer: Peer, retries = 0): Promise { @@ -142,13 +148,26 @@ export class StreamManager { (s) => s.protocol === this.multicodec ); + if (!stream) { + return; + } + const isStreamUnusable = ["done", "closed", "closing"].includes( - stream?.writeStatus || "" + stream.writeStatus || "" ); - if (isStreamUnusable) { + if (isStreamUnusable || this.isStreamLocked(stream)) { return; } return stream; } + + private lockStream(peerId: string, stream: Stream): void { + this.log.info(`Locking stream for peerId:${peerId}\tstreamId:${stream.id}`); + stream.metadata[STREAM_LOCK_KEY] = true; + } + + private isStreamLocked(stream: Stream): boolean { + return !!stream.metadata[STREAM_LOCK_KEY]; + } } diff --git a/packages/interfaces/src/light_push.ts b/packages/interfaces/src/light_push.ts index 26bc8bacdc..32b29048e6 100644 --- a/packages/interfaces/src/light_push.ts +++ b/packages/interfaces/src/light_push.ts @@ -1,5 +1,5 @@ import { IBaseProtocolCore, IBaseProtocolSDK } from "./protocols.js"; import type { ISender } from "./sender.js"; -export type ILightPushSDK = ISender & +export type ILightPush = ISender & IBaseProtocolSDK & { protocol: IBaseProtocolCore }; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 5b5e9ea919..ddd864659e 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -102,7 +102,7 @@ export type ProtocolCreateOptions = { * This is used by: * - Light Push to send messages, * - Filter to retrieve messages. - * Defaults to 3. + * Defaults to 2. */ numPeersToUse?: number; /** diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index c5659c4a92..482c2cb19f 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -5,7 +5,7 @@ import { IConnectionManager } from "./connection_manager.js"; import type { IFilterSDK } from "./filter.js"; import { IHealthManager } from "./health_manager.js"; import type { Libp2p } from "./libp2p.js"; -import type { ILightPushSDK } from "./light_push.js"; +import type { ILightPush } from "./light_push.js"; import { Protocols } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { IStoreSDK } from "./store.js"; @@ -15,7 +15,7 @@ export interface Waku { relay?: IRelay; store?: IStoreSDK; filter?: IFilterSDK; - lightPush?: ILightPushSDK; + lightPush?: ILightPush; connectionManager: IConnectionManager; @@ -36,7 +36,7 @@ export interface LightNode extends Waku { relay: undefined; store: IStoreSDK; filter: IFilterSDK; - lightPush: ILightPushSDK; + lightPush: ILightPush; } export interface RelayNode extends Waku { diff --git a/packages/sdk/package.json b/packages/sdk/package.json index e2a6c6dc0b..c828b6afa4 100644 --- a/packages/sdk/package.json +++ b/packages/sdk/package.json @@ -70,7 +70,8 @@ "@waku/proto": "^0.0.8", "@waku/utils": "0.0.20", "@waku/message-hash": "0.1.16", - "libp2p": "^1.8.1" + "libp2p": "^1.8.1", + "p-event": "^6.0.1" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index 2cbcfce937..79e2140449 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -1,4 +1,4 @@ -export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core"; +export { createEncoder, createDecoder } from "@waku/core"; export { DecodedMessage, Decoder, @@ -14,10 +14,12 @@ export { defaultLibp2p, createLibp2pAndUpdateOptions } from "./create/index.js"; -export { wakuLightPush } from "./protocols/lightpush/index.js"; +export { wakuLightPush } from "./protocols/light_push/index.js"; export { wakuFilter } from "./protocols/filter/index.js"; export { wakuStore } from "./protocols/store/index.js"; +export { waitForRemotePeer } from "./wait_for_remote_peer.js"; + export * as waku from "@waku/core"; export * as utils from "@waku/utils"; export * from "@waku/interfaces"; diff --git a/packages/sdk/src/protocols/base_protocol.ts b/packages/sdk/src/protocols/base_protocol.ts index 84a883eb0a..944ad497c2 100644 --- a/packages/sdk/src/protocols/base_protocol.ts +++ b/packages/sdk/src/protocols/base_protocol.ts @@ -14,11 +14,11 @@ interface Options { } const RENEW_TIME_LOCK_DURATION = 30 * 1000; -const DEFAULT_NUM_PEERS_TO_USE = 2; +export const DEFAULT_NUM_PEERS_TO_USE = 2; const DEFAULT_MAINTAIN_PEERS_INTERVAL = 30_000; export class BaseProtocolSDK implements IBaseProtocolSDK { - private healthManager: IHealthManager; + protected healthManager: IHealthManager; public readonly numPeersToUse: number; private peers: Peer[] = []; private maintainPeersIntervalId: ReturnType< diff --git a/packages/sdk/src/protocols/light_push/index.ts b/packages/sdk/src/protocols/light_push/index.ts new file mode 100644 index 0000000000..57c03d3296 --- /dev/null +++ b/packages/sdk/src/protocols/light_push/index.ts @@ -0,0 +1 @@ +export { wakuLightPush } from "./light_push.js"; diff --git a/packages/sdk/src/protocols/lightpush/index.ts b/packages/sdk/src/protocols/light_push/light_push.ts similarity index 55% rename from packages/sdk/src/protocols/lightpush/index.ts rename to packages/sdk/src/protocols/light_push/light_push.ts index f3be55d54a..7007ff28d5 100644 --- a/packages/sdk/src/protocols/lightpush/index.ts +++ b/packages/sdk/src/protocols/light_push/light_push.ts @@ -1,9 +1,9 @@ -import type { PeerId } from "@libp2p/interface"; -import { ConnectionManager, LightPushCore } from "@waku/core"; +import type { Peer, PeerId } from "@libp2p/interface"; +import { ConnectionManager, LightPushCodec, LightPushCore } from "@waku/core"; import { Failure, type IEncoder, - ILightPushSDK, + ILightPush, type IMessage, type Libp2p, type ProtocolCreateOptions, @@ -19,14 +19,14 @@ import { BaseProtocolSDK } from "../base_protocol.js"; const log = new Logger("sdk:light-push"); -class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { +class LightPush extends BaseProtocolSDK implements ILightPush { public readonly protocol: LightPushCore; private readonly reliabilityMonitor: SenderReliabilityMonitor; public constructor( connectionManager: ConnectionManager, - libp2p: Libp2p, + private libp2p: Libp2p, options?: ProtocolCreateOptions ) { super( @@ -49,11 +49,6 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { message: IMessage, _options?: ProtocolUseOptions ): Promise { - const options = { - autoRetry: true, - ..._options - } as ProtocolUseOptions; - const successes: PeerId[] = []; const failures: Failure[] = []; @@ -63,17 +58,17 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { } catch (error) { log.error("Failed to send waku light push: pubsub topic not configured"); return { + successes, failures: [ { error: ProtocolError.TOPIC_NOT_CONFIGURED } - ], - successes: [] + ] }; } - const hasPeers = await this.hasPeers(options); - if (!hasPeers) { + const peers = await this.getConnectedPeers(); + if (peers.length === 0) { return { successes, failures: [ @@ -84,53 +79,75 @@ class LightPushSDK extends BaseProtocolSDK implements ILightPushSDK { }; } - const sendPromises = this.connectedPeers.map((peer) => - this.protocol.send(encoder, message, peer) + const results = await Promise.allSettled( + peers.map((peer) => this.protocol.send(encoder, message, peer)) ); - const results = await Promise.allSettled(sendPromises); - for (const result of results) { - if (result.status === "fulfilled") { - const { failure, success } = result.value; - if (success) { - successes.push(success); - } - if (failure) { - failures.push(failure); - if (failure.peerId) { - const peer = this.connectedPeers.find((connectedPeer) => - connectedPeer.id.equals(failure.peerId) - ); - if (peer) { - log.info(` - Failed to send message to peer ${failure.peerId}. - Retrying the message with the same peer in the background. - If this fails, the peer will be renewed. - `); - void this.reliabilityMonitor.attemptRetriesOrRenew( - failure.peerId, - () => this.protocol.send(encoder, message, peer) - ); - } - } - } - } else { + if (result.status !== "fulfilled") { log.error("Failed unexpectedly while sending:", result.reason); failures.push({ error: ProtocolError.GENERIC_FAIL }); + continue; + } + + const { failure, success } = result.value; + + if (success) { + successes.push(success); + continue; + } + + if (failure) { + failures.push(failure); + + const connectedPeer = this.connectedPeers.find((connectedPeer) => + connectedPeer.id.equals(failure.peerId) + ); + + if (connectedPeer) { + void this.reliabilityMonitor.attemptRetriesOrRenew( + connectedPeer.id, + () => this.protocol.send(encoder, message, connectedPeer) + ); + } } } + this.healthManager.updateProtocolHealth(LightPushCodec, successes.length); + return { successes, failures }; } + + private async getConnectedPeers(): Promise { + const peerIDs = this.libp2p.getPeers(); + + if (peerIDs.length === 0) { + return []; + } + + const peers = await Promise.all( + peerIDs.map(async (id) => { + try { + return await this.libp2p.peerStore.get(id); + } catch (e) { + return null; + } + }) + ); + + return peers + .filter((p) => !!p) + .filter((p) => (p as Peer).protocols.includes(LightPushCodec)) + .slice(0, this.numPeersToUse) as Peer[]; + } } export function wakuLightPush( connectionManager: ConnectionManager, init: Partial = {} -): (libp2p: Libp2p) => ILightPushSDK { - return (libp2p: Libp2p) => new LightPushSDK(connectionManager, libp2p, init); +): (libp2p: Libp2p) => ILightPush { + return (libp2p: Libp2p) => new LightPush(connectionManager, libp2p, init); } diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/sdk/src/wait_for_remote_peer.ts similarity index 53% rename from packages/core/src/lib/wait_for_remote_peer.ts rename to packages/sdk/src/wait_for_remote_peer.ts index 809264a648..44bb1d63fb 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/sdk/src/wait_for_remote_peer.ts @@ -1,16 +1,12 @@ import type { IdentifyResult } from "@libp2p/interface"; -import type { - IBaseProtocolCore, - IMetadata, - IRelay, - Waku -} from "@waku/interfaces"; +import { FilterCodecs, LightPushCodec, StoreCodec } from "@waku/core"; +import type { IRelay, Libp2p, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { pEvent } from "p-event"; + const log = new Logger("wait-for-remote-peer"); -//TODO: move this function within the Waku class: https://github.com/waku-org/js-waku/issues/1761 /** * Wait for a remote peer to be ready given the passed protocols. * Must be used after attempting to connect to nodes, using @@ -36,42 +32,48 @@ export async function waitForRemotePeer( timeoutMs?: number ): Promise { protocols = protocols ?? getEnabledProtocols(waku); + const connections = waku.libp2p.getConnections(); + + if (!waku.isStarted()) { + throw Error("Waku node is not started"); + } - if (!waku.isStarted()) return Promise.reject("Waku node is not started"); + if (connections.length > 0 && !protocols.includes(Protocols.Relay)) { + const success = await waitForMetadata(waku.libp2p); + + if (success) { + return; + } + } const promises = []; if (protocols.includes(Protocols.Relay)) { - if (!waku.relay) - throw new Error("Cannot wait for Relay peer: protocol not mounted"); + if (!waku.relay) { + throw Error("Cannot wait for Relay peer: protocol not mounted"); + } promises.push(waitForGossipSubPeerInMesh(waku.relay)); } if (protocols.includes(Protocols.Store)) { - if (!waku.store) - throw new Error("Cannot wait for Store peer: protocol not mounted"); - promises.push( - waitForConnectedPeer(waku.store.protocol, waku.libp2p.services.metadata) - ); + if (!waku.store) { + throw Error("Cannot wait for Store peer: protocol not mounted"); + } + promises.push(waitForConnectedPeer(StoreCodec, waku.libp2p)); } if (protocols.includes(Protocols.LightPush)) { - if (!waku.lightPush) - throw new Error("Cannot wait for LightPush peer: protocol not mounted"); - promises.push( - waitForConnectedPeer( - waku.lightPush.protocol, - waku.libp2p.services.metadata - ) - ); + if (!waku.lightPush) { + throw Error("Cannot wait for LightPush peer: protocol not mounted"); + } + promises.push(waitForConnectedPeer(LightPushCodec, waku.libp2p)); } if (protocols.includes(Protocols.Filter)) { - if (!waku.filter) + if (!waku.filter) { throw new Error("Cannot wait for Filter peer: protocol not mounted"); - promises.push( - waitForConnectedPeer(waku.filter.protocol, waku.libp2p.services.metadata) - ); + } + promises.push(waitForConnectedPeer(FilterCodecs.SUBSCRIBE, waku.libp2p)); } if (timeoutMs) { @@ -85,71 +87,87 @@ export async function waitForRemotePeer( } } -//TODO: move this function within protocol SDK class: https://github.com/waku-org/js-waku/issues/1761 +type EventListener = (_: CustomEvent) => void; + /** * Wait for a peer with the given protocol to be connected. * If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service. */ async function waitForConnectedPeer( - protocol: IBaseProtocolCore, - metadataService?: IMetadata + codec: string, + libp2p: Libp2p ): Promise { - const codec = protocol.multicodec; - const peers = await protocol.connectedPeers(); - - if (peers.length) { - if (!metadataService) { - log.info(`${codec} peer found: `, peers[0].id.toString()); - return; - } + log.info(`Waiting for ${codec} peer.`); - // once a peer is connected, we need to confirm the metadata handshake with at least one of those peers if sharding is enabled - try { - await Promise.any( - peers.map((peer) => metadataService.confirmOrAttemptHandshake(peer.id)) - ); - return; - } catch (e) { - if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") - log.error( - `Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}` - ); + await new Promise((resolve) => { + const cb = (async (evt: CustomEvent): Promise => { + if (evt.detail?.protocols?.includes(codec)) { + const metadataService = libp2p.services.metadata; - log.error(`Error waiting for handshake confirmation: ${e}`); - } - } + if (!metadataService) { + libp2p.removeEventListener("peer:identify", cb); + resolve(); + return; + } - log.info(`Waiting for ${codec} peer`); + try { + await metadataService.confirmOrAttemptHandshake(evt.detail.peerId); - // else we'll just wait for the next peer to connect - await new Promise((resolve) => { - const cb = (evt: CustomEvent): void => { - if (evt.detail?.protocols?.includes(codec)) { - if (metadataService) { - metadataService - .confirmOrAttemptHandshake(evt.detail.peerId) - .then(() => { - protocol.removeLibp2pEventListener("peer:identify", cb); - resolve(); - }) - .catch((e) => { - if (e.code === "ERR_CONNECTION_BEING_CLOSED") - log.error( - `Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}` - ); - - log.error(`Error waiting for handshake confirmation: ${e}`); - }); - } else { - protocol.removeLibp2pEventListener("peer:identify", cb); + libp2p.removeEventListener("peer:identify", cb); resolve(); + } catch (e) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") { + log.error( + "Connection closed. Some peers can be on different shard." + ); + } + + log.error(`Error waiting for metadata: ${e}`); } } - }; - protocol.addLibp2pEventListener("peer:identify", cb); + }) as EventListener; + + libp2p.addEventListener("peer:identify", cb); }); } +/** + * Waits for the metadata from the remote peer. + */ +async function waitForMetadata(libp2p: Libp2p): Promise { + const connections = libp2p.getConnections(); + const metadataService = libp2p.services.metadata; + + if (!connections.length || !metadataService) { + log.info( + `Skipping waitForMetadata due to missing connections:${connections.length} or metadataService:${!!metadataService}` + ); + return false; + } + + try { + // confirm at least with one connected peer + await Promise.any( + connections + .map((c) => c.remotePeer) + .map((peer) => metadataService.confirmOrAttemptHandshake(peer)) + ); + + return true; + } catch (e) { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") { + log.error("Connection closed. Some peers can be on different shard."); + } + + log.error(`Error waiting for metadata: ${e}`); + } + + return false; +} + +// TODO: move to @waku/relay and use in `node.connect()` API https://github.com/waku-org/js-waku/issues/1761 /** * Wait for at least one peer with the given protocol to be connected and in the gossipsub * mesh for all pubsubTopics. diff --git a/packages/sdk/src/waku.ts b/packages/sdk/src/waku.ts index a5314a99f8..be656ad796 100644 --- a/packages/sdk/src/waku.ts +++ b/packages/sdk/src/waku.ts @@ -5,7 +5,7 @@ import { ConnectionManager, getHealthManager } from "@waku/core"; import type { IFilterSDK, IHealthManager, - ILightPushSDK, + ILightPush, IRelay, IStoreSDK, Libp2p, @@ -17,7 +17,7 @@ import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { wakuFilter } from "./protocols/filter/index.js"; -import { wakuLightPush } from "./protocols/lightpush/index.js"; +import { wakuLightPush } from "./protocols/light_push/index.js"; import { wakuStore } from "./protocols/store/index.js"; import { ReliabilityMonitorManager } from "./reliability_monitor/index.js"; @@ -64,7 +64,7 @@ export class WakuNode implements Waku { public relay?: IRelay; public store?: IStoreSDK; public filter?: IFilterSDK; - public lightPush?: ILightPushSDK; + public lightPush?: ILightPush; public connectionManager: ConnectionManager; public readonly health: IHealthManager; diff --git a/packages/tests/src/lib/runNodes.ts b/packages/tests/src/lib/runNodes.ts index 56af7bd7a0..82893f4f97 100644 --- a/packages/tests/src/lib/runNodes.ts +++ b/packages/tests/src/lib/runNodes.ts @@ -1,11 +1,10 @@ -import { waitForRemotePeer } from "@waku/core"; import { NetworkConfig, ProtocolCreateOptions, Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; -import { createLightNode, WakuNode } from "@waku/sdk"; +import { createLightNode, waitForRemotePeer, WakuNode } from "@waku/sdk"; import { derivePubsubTopicsFromNetworkConfig, Logger, diff --git a/packages/tests/src/utils/nodes.ts b/packages/tests/src/utils/nodes.ts index 2cdef056aa..dbf75e456f 100644 --- a/packages/tests/src/utils/nodes.ts +++ b/packages/tests/src/utils/nodes.ts @@ -1,4 +1,3 @@ -import { waitForRemotePeer } from "@waku/core"; import { DefaultNetworkConfig, LightNode, @@ -7,7 +6,7 @@ import { Protocols, Waku } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; +import { createLightNode, waitForRemotePeer } from "@waku/sdk"; import { derivePubsubTopicsFromNetworkConfig, isDefined } from "@waku/utils"; import { Context } from "mocha"; import pRetry from "p-retry"; diff --git a/packages/tests/tests/enr.node.spec.ts b/packages/tests/tests/enr.node.spec.ts index 18ca31a04e..6a02e70fb9 100644 --- a/packages/tests/tests/enr.node.spec.ts +++ b/packages/tests/tests/enr.node.spec.ts @@ -1,8 +1,8 @@ -import { waitForRemotePeer } from "@waku/core"; import { EnrDecoder } from "@waku/enr"; import type { RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; +import { waitForRemotePeer } from "@waku/sdk"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/ephemeral.node.spec.ts b/packages/tests/tests/ephemeral.node.spec.ts index c9a68a110f..25ad308d8e 100644 --- a/packages/tests/tests/ephemeral.node.spec.ts +++ b/packages/tests/tests/ephemeral.node.spec.ts @@ -1,9 +1,4 @@ -import { - createDecoder, - createEncoder, - DecodedMessage, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, createEncoder, DecodedMessage } from "@waku/core"; import { Protocols } from "@waku/interfaces"; import type { LightNode } from "@waku/interfaces"; import { @@ -19,7 +14,7 @@ import { createDecoder as createSymDecoder, createEncoder as createSymEncoder } from "@waku/message-encryption/symmetric"; -import { createLightNode } from "@waku/sdk"; +import { createLightNode, waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/filter/peer_management.spec.ts b/packages/tests/tests/filter/peer_management.spec.ts index 13e7baba05..31a6c07fad 100644 --- a/packages/tests/tests/filter/peer_management.spec.ts +++ b/packages/tests/tests/filter/peer_management.spec.ts @@ -275,8 +275,5 @@ describe("Waku Filter: Peer Management: E2E", function () { await sendMessage(); expect(waku.filter.connectedPeers.length).to.equal(2); - expect( - waku.filter.connectedPeers.map((p) => p.id.toString()) - ).to.not.include(nodeWithouDiscoveryPeerIdStr); }); }); diff --git a/packages/tests/tests/filter/push.node.spec.ts b/packages/tests/tests/filter/push.node.spec.ts index 5d69c73948..bbd49be6c2 100644 --- a/packages/tests/tests/filter/push.node.spec.ts +++ b/packages/tests/tests/filter/push.node.spec.ts @@ -1,6 +1,5 @@ -import { waitForRemotePeer } from "@waku/core"; import { LightNode, Protocols } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/sdk"; +import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts index 75e12b1919..60aeeebfe9 100644 --- a/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/single_node/multiple_pubsub.node.spec.ts @@ -1,4 +1,4 @@ -import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { createDecoder, createEncoder } from "@waku/core"; import type { ContentTopicInfo, LightNode, @@ -6,6 +6,7 @@ import type { SingleShardInfo } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; +import { waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, contentTopicToShardIndex, diff --git a/packages/tests/tests/filter/single_node/push.node.spec.ts b/packages/tests/tests/filter/single_node/push.node.spec.ts index 79fd020ae3..baf074b2c2 100644 --- a/packages/tests/tests/filter/single_node/push.node.spec.ts +++ b/packages/tests/tests/filter/single_node/push.node.spec.ts @@ -1,6 +1,5 @@ -import { waitForRemotePeer } from "@waku/core"; import { LightNode, Protocols } from "@waku/interfaces"; -import { utf8ToBytes } from "@waku/sdk"; +import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts index e75f81f94d..cf2aef1303 100644 --- a/packages/tests/tests/filter/single_node/subscribe.node.spec.ts +++ b/packages/tests/tests/filter/single_node/subscribe.node.spec.ts @@ -1,4 +1,4 @@ -import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { createDecoder, createEncoder } from "@waku/core"; import { LightNode, Protocols } from "@waku/interfaces"; import { ecies, @@ -7,7 +7,7 @@ import { getPublicKey, symmetric } from "@waku/message-encryption"; -import { utf8ToBytes } from "@waku/sdk"; +import { utf8ToBytes, waitForRemotePeer } from "@waku/sdk"; import { expect } from "chai"; import type { Context } from "mocha"; diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 2bcbd88125..1e3165af24 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -1,4 +1,4 @@ -import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { createDecoder, createEncoder } from "@waku/core"; import { DefaultNetworkConfig, ISubscriptionSDK, @@ -8,7 +8,7 @@ import { Protocols, Waku } from "@waku/interfaces"; -import { createLightNode } from "@waku/sdk"; +import { createLightNode, waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, derivePubsubTopicsFromNetworkConfig, diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 252340a5bf..52127bb54b 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -1,6 +1,6 @@ import type { Connection, Peer, PeerStore } from "@libp2p/interface"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; -import { LightPushCodec, waitForRemotePeer } from "@waku/core"; +import { LightPushCodec } from "@waku/core"; import { ContentTopicInfo, createLightNode, @@ -9,7 +9,8 @@ import { Protocols, ShardInfo, Tags, - utf8ToBytes + utf8ToBytes, + waitForRemotePeer } from "@waku/sdk"; import { encodeRelayShard, diff --git a/packages/tests/tests/health-manager/node.spec.ts b/packages/tests/tests/health-manager/node.spec.ts index b0edc41cb5..5627adab37 100644 --- a/packages/tests/tests/health-manager/node.spec.ts +++ b/packages/tests/tests/health-manager/node.spec.ts @@ -1,4 +1,4 @@ -import { HealthStatus, LightNode, Protocols } from "@waku/interfaces"; +import { HealthStatus, LightNode, Protocols, Waku } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { shardInfoToPubsubTopics } from "@waku/utils"; import { expect } from "chai"; @@ -34,8 +34,7 @@ describe("Node Health Status Matrix Tests", function () { peerCounts.forEach((lightPushPeers) => { peerCounts.forEach((filterPeers) => { - const expectedHealth = getExpectedNodeHealth(lightPushPeers, filterPeers); - it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers - Expected: ${expectedHealth}`, async function () { + it(`LightPush: ${lightPushPeers} peers, Filter: ${filterPeers} peers`, async function () { this.timeout(10_000); [waku, serviceNodes] = await setupTestEnvironment( @@ -59,6 +58,10 @@ describe("Node Health Status Matrix Tests", function () { ); const filterHealth = waku.health.getProtocolStatus(Protocols.Filter); + lightPushPeers = await getPeerCounBasedOnConnections( + waku, + waku.lightPush.protocol.multicodec + ); expect(lightPushHealth?.status).to.equal( getExpectedProtocolStatus(lightPushPeers) ); @@ -66,6 +69,10 @@ describe("Node Health Status Matrix Tests", function () { getExpectedProtocolStatus(filterPeers) ); + const expectedHealth = getExpectedNodeHealth( + lightPushPeers, + filterPeers + ); const nodeHealth = waku.health.getHealthStatus(); expect(nodeHealth).to.equal(expectedHealth); }); @@ -79,6 +86,21 @@ function getExpectedProtocolStatus(peerCount: number): HealthStatus { return HealthStatus.SufficientlyHealthy; } +async function getPeerCounBasedOnConnections( + waku: Waku, + codec: string +): Promise { + const peerIDs = waku.libp2p + .getConnections() + .map((c) => c.remotePeer.toString()); + + const peers = await waku.libp2p.peerStore.all(); + + return peers + .filter((peer) => peerIDs.includes(peer.id.toString())) + .filter((peer) => peer.protocols.includes(codec)).length; +} + function getExpectedNodeHealth( lightPushPeers: number, filterPeers: number diff --git a/packages/tests/tests/light-push/peer_management.spec.ts b/packages/tests/tests/light-push/peer_management.spec.ts index a275f4e970..776d503cb3 100644 --- a/packages/tests/tests/light-push/peer_management.spec.ts +++ b/packages/tests/tests/light-push/peer_management.spec.ts @@ -1,6 +1,5 @@ import { LightNode } from "@waku/interfaces"; import { createEncoder, utf8ToBytes } from "@waku/sdk"; -import { delay } from "@waku/utils"; import { expect } from "chai"; import { describe } from "mocha"; @@ -15,7 +14,7 @@ import { } from "../../src/index.js"; import { TestContentTopic } from "../filter/utils.js"; -describe("Waku Light Push: Peer Management: E2E", function () { +describe("Waku Light Push: Connection Management: E2E", function () { this.timeout(15000); let waku: LightNode; let serviceNodes: ServiceNodesFleet; @@ -39,64 +38,44 @@ describe("Waku Light Push: Peer Management: E2E", function () { contentTopic: TestContentTopic }); - it("Number of peers are maintained correctly", async function () { + it("should push to needed amount of connections", async function () { const { successes, failures } = await waku.lightPush.send(encoder, { payload: utf8ToBytes("Hello_World") }); - expect(successes.length).to.be.greaterThan(0); expect(successes.length).to.be.equal(waku.lightPush.numPeersToUse); - - if (failures) { - expect(failures.length).to.equal(0); - } + expect(failures?.length || 0).to.equal(0); }); - it("Failed peers are renewed", async function () { - // send a lightpush request -- should have all successes - const response1 = await waku.lightPush.send(encoder, { - payload: utf8ToBytes("Hello_World") - }); - - expect(response1.successes.length).to.be.equal( - waku.lightPush.numPeersToUse + it("should push to available amount of connection if less than required", async function () { + const connections = waku.libp2p.getConnections(); + await Promise.all( + connections + .slice(0, connections.length - 1) + .map((c) => waku.connectionManager.dropConnection(c.remotePeer)) ); - if (response1.failures) { - expect(response1.failures.length).to.equal(0); - } - // disconnect from one peer to force a failure - const peerToDisconnect = response1.successes[0]; - await waku.connectionManager.dropConnection(peerToDisconnect); - - // send another lightpush request -- should have all successes except the one that was disconnected - const response2 = await waku.lightPush.send(encoder, { + const { successes, failures } = await waku.lightPush.send(encoder, { payload: utf8ToBytes("Hello_World") }); - // check that the peer that was disconnected is not in the new successes - expect(response2.successes).to.not.include(peerToDisconnect); - expect(response2.failures).to.have.length(1); - expect(response2.failures?.[0].peerId).to.equal(peerToDisconnect); + expect(successes.length).to.be.equal(1); + expect(failures?.length || 0).to.equal(0); + }); - // send another lightpush request - // reattempts to send should be triggerred - // then renewal should happen - // so one failure should exist - const response3 = await waku.lightPush.send(encoder, { + it("should fail to send if no connections available", async function () { + const connections = waku.libp2p.getConnections(); + await Promise.all( + connections.map((c) => + waku.connectionManager.dropConnection(c.remotePeer) + ) + ); + + const { successes, failures } = await waku.lightPush.send(encoder, { payload: utf8ToBytes("Hello_World") }); - // wait for reattempts to finish as they are async and not awaited - await delay(500); - - // doing -1 because the peer that was disconnected is not in the successes - expect(response3.successes.length).to.be.equal( - waku.lightPush.numPeersToUse - 1 - ); - // and exists in failure instead - expect(response3.failures).to.have.length(1); - - expect(response3.successes).to.not.include(peerToDisconnect); + expect(successes.length).to.be.equal(0); + expect(failures?.length).to.equal(1); }); }); diff --git a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts index 4fabf1919f..f36a614aaf 100644 --- a/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/single_node/multiple_pubsub.node.spec.ts @@ -1,5 +1,5 @@ import type { PeerId } from "@libp2p/interface"; -import { createEncoder, waitForRemotePeer } from "@waku/core"; +import { createEncoder } from "@waku/core"; import { ContentTopicInfo, LightNode, @@ -8,6 +8,7 @@ import { ShardInfo, SingleShardInfo } from "@waku/interfaces"; +import { waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, contentTopicToShardIndex, diff --git a/packages/tests/tests/relay/interop.node.spec.ts b/packages/tests/tests/relay/interop.node.spec.ts index b9cc3eeb70..3c05bebfa0 100644 --- a/packages/tests/tests/relay/interop.node.spec.ts +++ b/packages/tests/tests/relay/interop.node.spec.ts @@ -1,7 +1,8 @@ import type { PeerId } from "@libp2p/interface"; -import { DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { DecodedMessage } from "@waku/core"; import { Protocols, RelayNode } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; +import { waitForRemotePeer } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index 91915f76a0..6434153f17 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -1,9 +1,4 @@ -import { - createDecoder, - createEncoder, - DecodedMessage, - waitForRemotePeer -} from "@waku/core"; +import { createDecoder, createEncoder, DecodedMessage } from "@waku/core"; import { ContentTopicInfo, RelayNode, @@ -12,6 +7,7 @@ import { } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; +import { waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, pubsubTopicToSingleShardInfo, diff --git a/packages/tests/tests/relay/utils.ts b/packages/tests/tests/relay/utils.ts index 090d6221e6..f0141e3481 100644 --- a/packages/tests/tests/relay/utils.ts +++ b/packages/tests/tests/relay/utils.ts @@ -1,4 +1,4 @@ -import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; +import { createDecoder, createEncoder } from "@waku/core"; import { NetworkConfig, Protocols, @@ -6,6 +6,7 @@ import { ShardInfo } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; +import { waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, Logger } from "@waku/utils"; import { Context } from "mocha"; diff --git a/packages/tests/tests/store/index.node.spec.ts b/packages/tests/tests/store/index.node.spec.ts index e34864d604..e5ce92d7a0 100644 --- a/packages/tests/tests/store/index.node.spec.ts +++ b/packages/tests/tests/store/index.node.spec.ts @@ -1,4 +1,4 @@ -import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { createDecoder, DecodedMessage } from "@waku/core"; import type { IMessage, LightNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { @@ -14,6 +14,7 @@ import { createDecoder as createSymDecoder, createEncoder as createSymEncoder } from "@waku/message-encryption/symmetric"; +import { waitForRemotePeer } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai"; import { equals } from "uint8arrays/equals"; diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 2a98793cb2..de2b346b7b 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -1,6 +1,6 @@ -import { createDecoder, waitForRemotePeer } from "@waku/core"; +import { createDecoder } from "@waku/core"; import type { ContentTopicInfo, IMessage, LightNode } from "@waku/interfaces"; -import { createLightNode, Protocols } from "@waku/sdk"; +import { createLightNode, Protocols, waitForRemotePeer } from "@waku/sdk"; import { contentTopicToPubsubTopic, pubsubTopicToSingleShardInfo diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 90393bf2f3..71c89eda0f 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -1,8 +1,7 @@ -import { waitForRemotePeer } from "@waku/core"; import type { LightNode, RelayNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createRelayNode } from "@waku/relay"; -import { createLightNode } from "@waku/sdk"; +import { createLightNode, waitForRemotePeer } from "@waku/sdk"; import { expect } from "chai"; import { diff --git a/packages/tests/tests/waku.node.spec.ts b/packages/tests/tests/waku.node.spec.ts index 3881971e1e..250835f16b 100644 --- a/packages/tests/tests/waku.node.spec.ts +++ b/packages/tests/tests/waku.node.spec.ts @@ -1,6 +1,6 @@ import { bootstrap } from "@libp2p/bootstrap"; import type { PeerId } from "@libp2p/interface"; -import { DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { DecodedMessage } from "@waku/core"; import type { LightNode, RelayNode, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { generateSymmetricKey } from "@waku/message-encryption"; @@ -12,7 +12,8 @@ import { createRelayNode } from "@waku/relay"; import { createLightNode, createEncoder as createPlainEncoder, - DefaultUserAgent + DefaultUserAgent, + waitForRemotePeer } from "@waku/sdk"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; import { expect } from "chai";