From fb1ce980392b4cd2774444f9ac5c279f8776b17a Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jan 2024 13:29:15 +0530 Subject: [PATCH 01/10] merge: master --- package-lock.json | 2 + packages/core/src/index.ts | 3 +- packages/core/src/lib/base_protocol.ts | 42 +++- packages/core/src/lib/connection_manager.ts | 5 +- packages/core/src/lib/filter/index.ts | 7 +- packages/core/src/lib/filterPeers.spec.ts | 8 +- packages/core/src/lib/filterPeers.ts | 4 +- packages/core/src/lib/light_push/index.ts | 6 +- packages/core/src/lib/metadata/index.ts | 55 +++-- packages/core/src/lib/store/index.ts | 7 +- packages/core/src/lib/wait_for_remote_peer.ts | 67 +++++- packages/core/src/lib/waku.ts | 6 +- packages/dns-discovery/src/dns_discovery.ts | 3 +- packages/enr/src/enr.spec.ts | 1 - packages/enr/src/index.ts | 1 - packages/enr/src/raw_enr.ts | 2 +- packages/interfaces/src/metadata.ts | 7 +- packages/interfaces/src/protocols.ts | 4 +- packages/interfaces/src/waku.ts | 3 + .../src/waku_peer_exchange_discovery.ts | 3 +- packages/tests/tests/filter/utils.ts | 3 +- packages/tests/tests/getPeers.spec.ts | 190 ++++++++++++++++++ .../light-push/multiple_pubsub.node.spec.ts | 23 ++- packages/tests/tests/light-push/utils.ts | 7 +- packages/tests/tests/metadata.spec.ts | 2 +- .../tests/tests/store/multiple_pubsub.spec.ts | 4 + packages/tests/tests/store/utils.ts | 18 +- .../tests/wait_for_remote_peer.node.spec.ts | 20 +- packages/utils/package.json | 1 + packages/utils/src/common/index.ts | 1 + .../src/common}/relay_shard_codec.spec.ts | 0 .../src/common}/relay_shard_codec.ts | 0 packages/utils/src/libp2p/index.ts | 79 +++++++- 33 files changed, 477 insertions(+), 107 deletions(-) create mode 100644 packages/tests/tests/getPeers.spec.ts rename packages/{enr/src => utils/src/common}/relay_shard_codec.spec.ts (100%) rename packages/{enr/src => utils/src/common}/relay_shard_codec.ts (100%) diff --git a/package-lock.json b/package-lock.json index 3f4bb5b45d..5c8e357541 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26643,6 +26643,7 @@ "@noble/hashes": "^1.3.2", "chai": "^4.3.10", "debug": "^4.3.4", + "fast-check": "^3.14.0", "uint8arrays": "^4.0.4" }, "devDependencies": { @@ -30095,6 +30096,7 @@ "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", + "fast-check": "^3.14.0", "npm-run-all": "^4.1.5", "rollup": "^4.6.0", "uint8arrays": "^4.0.4" diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index c7f7462ca4..d20bdc977f 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -14,8 +14,7 @@ export * as waku_filter from "./lib/filter/index.js"; export { wakuFilter, FilterCodecs } from "./lib/filter/index.js"; export * as waku_light_push from "./lib/light_push/index.js"; -export { LightPushCodec } from "./lib/light_push/index.js"; -export { wakuLightPush } from "./lib/light_push/index.js"; +export { LightPushCodec, wakuLightPush } from "./lib/light_push/index.js"; export * as waku_store from "./lib/store/index.js"; diff --git a/packages/core/src/lib/base_protocol.ts b/packages/core/src/lib/base_protocol.ts index bed4d8fc89..b5dcb1fd37 100644 --- a/packages/core/src/lib/base_protocol.ts +++ b/packages/core/src/lib/base_protocol.ts @@ -11,7 +11,7 @@ import type { import { DefaultPubsubTopic } from "@waku/interfaces"; import { shardInfoToPubsubTopics } from "@waku/utils"; import { - getConnectedPeersForProtocol, + getConnectedPeersForProtocolAndShard, getPeersForProtocol, selectPeerForProtocol } from "@waku/utils/libp2p"; @@ -27,11 +27,15 @@ export class BaseProtocol implements IBaseProtocol { public readonly addLibp2pEventListener: Libp2p["addEventListener"]; public readonly removeLibp2pEventListener: Libp2p["removeEventListener"]; protected streamManager: StreamManager; + protected pubsubTopics: PubsubTopic[]; constructor( public multicodec: string, - private components: Libp2pComponents + private components: Libp2pComponents, + public options?: ProtocolCreateOptions ) { + this.pubsubTopics = this.initializePubsubTopic(options); + this.addLibp2pEventListener = components.events.addEventListener.bind( components.events ); @@ -60,10 +64,19 @@ export class BaseProtocol implements IBaseProtocol { * the class protocol. Waku may or may not be currently connected to these * peers. */ - public async peers(): Promise { + public async allPeers(): Promise { return getPeersForProtocol(this.peerStore, [this.multicodec]); } + public async connectedPeers(): Promise { + const peers = await this.allPeers(); + return peers.filter((peer) => { + return ( + this.components.connectionManager.getConnections(peer.id).length > 0 + ); + }); + } + protected async getPeer(peerId?: PeerId): Promise { const { peer } = await selectPeerForProtocol( this.peerStore, @@ -92,18 +105,25 @@ export class BaseProtocol implements IBaseProtocol { numPeers: 0 } ): Promise { - // Retrieve all connected peers that support the protocol - const allPeersForProtocol = await getConnectedPeersForProtocol( - this.components.connectionManager.getConnections(), - this.peerStore, - [this.multicodec] - ); + // Retrieve all connected peers that support the protocol & shard (if configured) + const connectedPeersForProtocolAndShard = + await getConnectedPeersForProtocolAndShard( + this.components.connectionManager.getConnections(), + this.peerStore, + [this.multicodec] + ); // Filter the peers based on the specified criteria - return filterPeers(allPeersForProtocol, numPeers, maxBootstrapPeers); + return filterPeers( + connectedPeersForProtocolAndShard, + numPeers, + maxBootstrapPeers + ); } - initializePubsubTopic(options?: ProtocolCreateOptions): PubsubTopic[] { + private initializePubsubTopic( + options?: ProtocolCreateOptions + ): PubsubTopic[] { return ( options?.pubsubTopics ?? (options?.shardInfo diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index ca74079479..1020c0bb4e 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -3,7 +3,6 @@ import type { PeerInfo } from "@libp2p/interface/peer-info"; import type { Peer } from "@libp2p/interface/peer-store"; import type { PeerStore } from "@libp2p/interface/peer-store"; import { CustomEvent, EventEmitter } from "@libp2p/interfaces/events"; -import { decodeRelayShard } from "@waku/enr"; import { ConnectionManagerOptions, EConnectionStateEvents, @@ -18,7 +17,7 @@ import { ShardInfo } from "@waku/interfaces"; import { Libp2p, Tags } from "@waku/interfaces"; -import { shardInfoToPubsubTopics } from "@waku/utils"; +import { decodeRelayShard, shardInfoToPubsubTopics } from "@waku/utils"; import { Logger } from "@waku/utils"; import { KeepAliveManager } from "./keep_alive_manager.js"; @@ -380,6 +379,8 @@ export class ConnectionManager }, "peer:connect": (evt: CustomEvent): void => { void (async () => { + log.info(`Connected to peer ${evt.detail.toString()}`); + const peerId = evt.detail; this.keepAliveManager.start( diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 06caa77b51..fc3b397520 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -258,7 +258,6 @@ class Subscription { } class Filter extends BaseProtocol implements IReceiver { - private readonly pubsubTopics: PubsubTopic[] = []; private activeSubscriptions = new Map(); private readonly NUM_PEERS_PROTOCOL = 1; @@ -279,9 +278,7 @@ class Filter extends BaseProtocol implements IReceiver { } constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(FilterCodecs.SUBSCRIBE, libp2p.components); - - this.pubsubTopics = this.initializePubsubTopic(options); + super(FilterCodecs.SUBSCRIBE, libp2p.components, options); libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); @@ -300,8 +297,6 @@ class Filter extends BaseProtocol implements IReceiver { ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); - //TODO: get a relevant peer for the topic/shard - // https://github.com/waku-org/js-waku/pull/1586#discussion_r1336428230 const peer = ( await this.getPeers({ maxBootstrapPeers: 1, diff --git a/packages/core/src/lib/filterPeers.spec.ts b/packages/core/src/lib/filterPeers.spec.ts index de51da5593..50cfbfd30e 100644 --- a/packages/core/src/lib/filterPeers.spec.ts +++ b/packages/core/src/lib/filterPeers.spec.ts @@ -27,7 +27,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 0, 10); + const result = filterPeers(mockPeers, 0, 10); expect(result.length).to.deep.equal(mockPeers.length); }); @@ -56,7 +56,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 0, 0); + const result = filterPeers(mockPeers, 0, 0); // result should have no bootstrap peers, and a total of 2 peers expect(result.length).to.equal(2); @@ -95,7 +95,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 0, 1); + const result = filterPeers(mockPeers, 0, 1); // result should have 1 bootstrap peers, and a total of 4 peers expect(result.length).to.equal(4); @@ -134,7 +134,7 @@ describe("filterPeers function", function () { } ] as unknown as Peer[]; - const result = await filterPeers(mockPeers, 5, 2); + const result = filterPeers(mockPeers, 5, 2); // check that result has at least 2 bootstrap peers and no more than 5 peers expect(result.length).to.be.at.least(2); diff --git a/packages/core/src/lib/filterPeers.ts b/packages/core/src/lib/filterPeers.ts index 298c9f15bc..8751aeb4b5 100644 --- a/packages/core/src/lib/filterPeers.ts +++ b/packages/core/src/lib/filterPeers.ts @@ -9,11 +9,11 @@ import { Tags } from "@waku/interfaces"; * @param maxBootstrapPeers - The maximum number of bootstrap peers to retrieve. * @returns A Promise that resolves to an array of peers based on the specified criteria. */ -export async function filterPeers( +export function filterPeers( peers: Peer[], numPeers: number, maxBootstrapPeers: number -): Promise { +): Peer[] { // Collect the bootstrap peers up to the specified maximum const bootstrapPeers = peers .filter((peer) => peer.tags.has(Tags.BOOTSTRAP)) diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 773e033565..7bba940b2f 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -6,7 +6,6 @@ import { IMessage, Libp2p, ProtocolCreateOptions, - PubsubTopic, SendError, SendResult } from "@waku/interfaces"; @@ -44,12 +43,10 @@ type PreparePushMessageResult = * Implements the [Waku v2 Light Push protocol](https://rfc.vac.dev/spec/19/). */ class LightPush extends BaseProtocol implements ILightPush { - private readonly pubsubTopics: PubsubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(LightPushCodec, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options); + super(LightPushCodec, libp2p.components, options); } private async preparePushMessage( @@ -108,7 +105,6 @@ class LightPush extends BaseProtocol implements ILightPush { }; } - //TODO: get a relevant peer for the topic/shard const peers = await this.getPeers({ maxBootstrapPeers: 1, numPeers: this.NUM_PEERS_PROTOCOL diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index 9f95b5ba24..b84e26a530 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -1,6 +1,5 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import { IncomingStreamData } from "@libp2p/interface/stream-handler"; -import { encodeRelayShard } from "@waku/enr"; import type { IMetadata, Libp2pComponents, @@ -8,7 +7,7 @@ import type { ShardingParams } from "@waku/interfaces"; import { proto_metadata } from "@waku/proto"; -import { Logger } from "@waku/utils"; +import { encodeRelayShard, Logger } from "@waku/utils"; import all from "it-all"; import * as lp from "it-length-prefixed"; import { pipe } from "it-pipe"; @@ -20,13 +19,21 @@ const log = new Logger("metadata"); export const MetadataCodec = "/vac/waku/metadata/1.0.0"; -class Metadata extends BaseProtocol { - private readonly shardInfo: ShardingParams; +class Metadata extends BaseProtocol implements IMetadata { private libp2pComponents: Libp2pComponents; - constructor(shardInfo: ShardingParams, libp2p: Libp2pComponents) { - super(MetadataCodec, libp2p.components); + handshakesConfirmed: Set = new Set(); + + checkHandshake(peerId: PeerId): boolean { + const handshakesArr = [...this.handshakesConfirmed]; + return handshakesArr.some((id) => id.equals(peerId)); + } + + constructor( + public shardInfo: ShardingParams, + libp2p: Libp2pComponents + ) { + super(MetadataCodec, libp2p.components, shardInfo && { shardInfo }); this.libp2pComponents = libp2p; - this.shardInfo = shardInfo; void libp2p.registrar.handle(MetadataCodec, (streamData) => { void this.onRequest(streamData); }); @@ -53,12 +60,10 @@ class Metadata extends BaseProtocol { const remoteShardInfoResponse = this.decodeMetadataResponse(encodedResponse); - // add or update the shardInfo to peer store - await this.libp2pComponents.peerStore.merge(connection.remotePeer, { - metadata: { - shardInfo: encodeRelayShard(remoteShardInfoResponse) - } - }); + await this.savePeerShardInfo( + connection.remotePeer, + remoteShardInfoResponse + ); } catch (error) { log.error("Error handling metadata request", error); } @@ -84,9 +89,19 @@ class Metadata extends BaseProtocol { const decodedResponse = this.decodeMetadataResponse(encodedResponse); + await this.savePeerShardInfo(peerId, decodedResponse); + return decodedResponse; } + public async confirmOrAttemptHandshake(peerId: PeerId): Promise { + if (this.checkHandshake(peerId)) return; + + await this.query(peerId); + + return; + } + private decodeMetadataResponse(encodedResponse: Uint8ArrayList[]): ShardInfo { const bytes = new Uint8ArrayList(); @@ -101,6 +116,20 @@ class Metadata extends BaseProtocol { return response; } + + private async savePeerShardInfo( + peerId: PeerId, + shardInfo: ShardInfo + ): Promise { + // add or update the shardInfo to peer store + await this.libp2pComponents.peerStore.merge(peerId, { + metadata: { + shardInfo: encodeRelayShard(shardInfo) + } + }); + + this.handshakesConfirmed.add(peerId); + } } export function wakuMetadata( diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 1e06533311..afed546a9a 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -6,8 +6,7 @@ import { IDecoder, IStore, Libp2p, - ProtocolCreateOptions, - PubsubTopic + ProtocolCreateOptions } from "@waku/interfaces"; import { proto_store as proto } from "@waku/proto"; import { ensurePubsubTopicIsConfigured, isDefined } from "@waku/utils"; @@ -74,12 +73,10 @@ export interface QueryOptions { * The Waku Store protocol can be used to retrieved historical messages. */ class Store extends BaseProtocol implements IStore { - private readonly pubsubTopics: PubsubTopic[]; private readonly NUM_PEERS_PROTOCOL = 1; constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { - super(StoreCodec, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options); + super(StoreCodec, libp2p.components, options); } /** diff --git a/packages/core/src/lib/wait_for_remote_peer.ts b/packages/core/src/lib/wait_for_remote_peer.ts index 68e025cb83..183e81d75c 100644 --- a/packages/core/src/lib/wait_for_remote_peer.ts +++ b/packages/core/src/lib/wait_for_remote_peer.ts @@ -1,9 +1,8 @@ import type { IdentifyResult } from "@libp2p/interface"; -import type { IBaseProtocol, IRelay, Waku } from "@waku/interfaces"; +import type { IBaseProtocol, IMetadata, IRelay, Waku } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { Logger } from "@waku/utils"; import { pEvent } from "p-event"; - const log = new Logger("wait-for-remote-peer"); /** @@ -32,6 +31,11 @@ export async function waitForRemotePeer( ): Promise { protocols = protocols ?? getEnabledProtocols(waku); + const isShardingEnabled = waku.shardInfo !== undefined; + const metadataService = isShardingEnabled + ? waku.libp2p.services.metadata + : undefined; + if (!waku.isStarted()) return Promise.reject("Waku node is not started"); const promises = []; @@ -45,19 +49,19 @@ export async function waitForRemotePeer( if (protocols.includes(Protocols.Store)) { if (!waku.store) throw new Error("Cannot wait for Store peer: protocol not mounted"); - promises.push(waitForConnectedPeer(waku.store)); + promises.push(waitForConnectedPeer(waku.store, metadataService)); } if (protocols.includes(Protocols.LightPush)) { if (!waku.lightPush) throw new Error("Cannot wait for LightPush peer: protocol not mounted"); - promises.push(waitForConnectedPeer(waku.lightPush)); + promises.push(waitForConnectedPeer(waku.lightPush, metadataService)); } if (protocols.includes(Protocols.Filter)) { if (!waku.filter) throw new Error("Cannot wait for Filter peer: protocol not mounted"); - promises.push(waitForConnectedPeer(waku.filter)); + promises.push(waitForConnectedPeer(waku.filter, metadataService)); } if (timeoutMs) { @@ -73,21 +77,62 @@ export async function waitForRemotePeer( /** * Wait for a peer with the given protocol to be connected. + * If sharding is enabled on the node, it will also wait for the peer to be confirmed by the metadata service. */ -async function waitForConnectedPeer(protocol: IBaseProtocol): Promise { +async function waitForConnectedPeer( + protocol: IBaseProtocol, + metadataService?: IMetadata +): Promise { const codec = protocol.multicodec; - const peers = await protocol.peers(); + const peers = await protocol.connectedPeers(); if (peers.length) { - log.info(`${codec} peer found: `, peers[0].id.toString()); - return; + if (!metadataService) { + log.info(`${codec} peer found: `, peers[0].id.toString()); + return; + } + + // once a peer is connected, we need to confirm the metadata handshake with at least one of those peers if sharding is enabled + try { + await Promise.any( + peers.map((peer) => metadataService.confirmOrAttemptHandshake(peer.id)) + ); + return; + } catch (e) { + if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED") + log.error( + `Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}` + ); + + log.error(`Error waiting for handshake confirmation: ${e}`); + } } + log.info(`Waiting for ${codec} peer`); + + // else we'll just wait for the next peer to connect await new Promise((resolve) => { const cb = (evt: CustomEvent): void => { if (evt.detail?.protocols?.includes(codec)) { - protocol.removeLibp2pEventListener("peer:identify", cb); - resolve(); + if (metadataService) { + metadataService + .confirmOrAttemptHandshake(evt.detail.peerId) + .then(() => { + protocol.removeLibp2pEventListener("peer:identify", cb); + resolve(); + }) + .catch((e) => { + if (e.code === "ERR_CONNECTION_BEING_CLOSED") + log.error( + `Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}` + ); + + log.error(`Error waiting for handshake confirmation: ${e}`); + }); + } else { + protocol.removeLibp2pEventListener("peer:identify", cb); + resolve(); + } } }; protocol.addLibp2pEventListener("peer:identify", cb); diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 1fc3f35643..9ca698b3e8 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -57,7 +57,7 @@ export class WakuNode implements Waku { options: WakuOptions, pubsubTopics: PubsubTopic[] = [], libp2p: Libp2p, - pubsubShardInfo?: ShardingParams, + private pubsubShardInfo?: ShardingParams, store?: (libp2p: Libp2p) => IStore, lightPush?: (libp2p: Libp2p) => ILightPush, filter?: (libp2p: Libp2p) => IFilter, @@ -110,6 +110,10 @@ export class WakuNode implements Waku { ); } + get shardingParams(): ShardingParams | undefined { + return this.pubsubShardInfo; + } + /** * Dials to the provided peer. * diff --git a/packages/dns-discovery/src/dns_discovery.ts b/packages/dns-discovery/src/dns_discovery.ts index e0e41007c1..2b4e314716 100644 --- a/packages/dns-discovery/src/dns_discovery.ts +++ b/packages/dns-discovery/src/dns_discovery.ts @@ -5,14 +5,13 @@ import type { } from "@libp2p/interface/peer-discovery"; import { peerDiscovery as symbol } from "@libp2p/interface/peer-discovery"; import type { PeerInfo } from "@libp2p/interface/peer-info"; -import { encodeRelayShard } from "@waku/enr"; import type { DnsDiscOptions, DnsDiscoveryComponents, IEnr, NodeCapabilityCount } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; +import { encodeRelayShard, Logger } from "@waku/utils"; import { DEFAULT_BOOTSTRAP_TAG_NAME, diff --git a/packages/enr/src/enr.spec.ts b/packages/enr/src/enr.spec.ts index e4b923b9a7..84171055ce 100644 --- a/packages/enr/src/enr.spec.ts +++ b/packages/enr/src/enr.spec.ts @@ -393,7 +393,6 @@ describe("ENR", function () { it("should properly create peer info with all multiaddrs", () => { const peerInfo = enr.peerInfo!; - console.log(peerInfo); expect(peerInfo.id.toString()).to.equal(peerId.toString()); expect(peerInfo.multiaddrs.length).to.equal(5); expect(peerInfo.multiaddrs.map((ma) => ma.toString())).to.contain( diff --git a/packages/enr/src/index.ts b/packages/enr/src/index.ts index 4835e900ba..d8b6fb5481 100644 --- a/packages/enr/src/index.ts +++ b/packages/enr/src/index.ts @@ -5,4 +5,3 @@ export * from "./enr.js"; export * from "./peer_id.js"; export * from "./waku2_codec.js"; export * from "./crypto.js"; -export * from "./relay_shard_codec.js"; diff --git a/packages/enr/src/raw_enr.ts b/packages/enr/src/raw_enr.ts index 252d63a4a4..b7c607880e 100644 --- a/packages/enr/src/raw_enr.ts +++ b/packages/enr/src/raw_enr.ts @@ -10,11 +10,11 @@ import type { ShardInfo, Waku2 } from "@waku/interfaces"; +import { decodeRelayShard } from "@waku/utils"; import { bytesToUtf8 } from "@waku/utils/bytes"; import { ERR_INVALID_ID } from "./constants.js"; import { decodeMultiaddrs, encodeMultiaddrs } from "./multiaddrs_codec.js"; -import { decodeRelayShard } from "./relay_shard_codec.js"; import { decodeWaku2, encodeWaku2 } from "./waku2_codec.js"; export class RawEnr extends Map { diff --git a/packages/interfaces/src/metadata.ts b/packages/interfaces/src/metadata.ts index f37bcba0f3..ecf9adb039 100644 --- a/packages/interfaces/src/metadata.ts +++ b/packages/interfaces/src/metadata.ts @@ -1,8 +1,11 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { ShardInfo } from "./enr.js"; -import type { IBaseProtocol } from "./protocols.js"; +import type { IBaseProtocol, ShardingParams } from "./protocols.js"; -export interface IMetadata extends IBaseProtocol { +// IMetadata always has shardInfo defined while it is optionally undefined in IBaseProtocol +export interface IMetadata extends Omit { + shardInfo: ShardingParams; + confirmOrAttemptHandshake(peerId: PeerId): Promise; query(peerId: PeerId): Promise; } diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index c7c223c6b4..df543b793a 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -15,9 +15,11 @@ export enum Protocols { } export interface IBaseProtocol { + shardInfo?: ShardInfo; multicodec: string; peerStore: PeerStore; - peers: () => Promise; + allPeers: () => Promise; + connectedPeers: () => Promise; addLibp2pEventListener: Libp2p["addEventListener"]; removeLibp2pEventListener: Libp2p["removeEventListener"]; } diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 876403ecea..70e5e8c86c 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -3,6 +3,7 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { Multiaddr } from "@multiformats/multiaddr"; import { IConnectionManager } from "./connection_manager.js"; +import type { ShardInfo } from "./enr.js"; import type { IFilter } from "./filter.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; @@ -17,6 +18,8 @@ export interface Waku { filter?: IFilter; lightPush?: ILightPush; + shardInfo?: ShardInfo; + connectionManager: IConnectionManager; dial(peer: PeerId | Multiaddr, protocols?: Protocols[]): Promise; diff --git a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts index d4973f6440..450611cdfd 100644 --- a/packages/peer-exchange/src/waku_peer_exchange_discovery.ts +++ b/packages/peer-exchange/src/waku_peer_exchange_discovery.ts @@ -7,9 +7,8 @@ import type { import { peerDiscovery as symbol } from "@libp2p/interface/peer-discovery"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { PeerInfo } from "@libp2p/interface/peer-info"; -import { encodeRelayShard } from "@waku/enr"; import { Libp2pComponents, Tags } from "@waku/interfaces"; -import { Logger } from "@waku/utils"; +import { encodeRelayShard, Logger } from "@waku/utils"; import { PeerExchangeCodec, WakuPeerExchange } from "./waku_peer_exchange.js"; diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index c7a929cfa2..84c08c1862 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -55,7 +55,8 @@ export async function runNodes( filter: true, lightpush: true, relay: true, - pubsubTopic: pubsubTopics + pubsubTopic: pubsubTopics, + ...(shardInfo && { clusterId: shardInfo.clusterId }) }, { retries: 3 } ); diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts new file mode 100644 index 0000000000..723e180c19 --- /dev/null +++ b/packages/tests/tests/getPeers.spec.ts @@ -0,0 +1,190 @@ +import { LightPushCodec, waitForRemotePeer } from "@waku/core"; +import { + createLightNode, + type LightNode, + Protocols, + ShardInfo +} from "@waku/sdk"; +import { shardInfoToPubsubTopics } from "@waku/utils"; +import { getPeersForProtocolAndShard } from "@waku/utils/libp2p"; +import { expect } from "chai"; + +import { makeLogFileName } from "../src/log_file.js"; +import { NimGoNode } from "../src/node/node.js"; +import { tearDownNodes } from "../src/teardown.js"; + +describe("getPeersForProtocolAndShard", function () { + let waku: LightNode; + let serviceNode1: NimGoNode; + let serviceNode2: NimGoNode; + + this.beforeEach(async function () { + this.timeout(15000); + serviceNode1 = new NimGoNode(makeLogFileName(this) + "1"); + serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([serviceNode1, serviceNode2], waku); + }); + + it("same cluster, same shard: nodes connect", async function () { + this.timeout(15000); + + const shardInfo: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo), + lightpush: true + }); + + const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo }); + await waku.start(); + await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); + await waitForRemotePeer(waku, [Protocols.LightPush]); + const peers = await getPeersForProtocolAndShard( + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo + ); + expect(peers.length).to.be.greaterThan(0); + }); + + it("same cluster, different shard: nodes connect", async function () { + this.timeout(15000); + + const shardInfo: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo), + lightpush: true + }); + + const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo }); + await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getPeersForProtocolAndShard( + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo + ); + expect(peers.length).to.be.greaterThan(0); + }); + + it("different cluster, same shard: nodes don't connect", async function () { + this.timeout(15000); + + const shardInfo1: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + const shardInfo2: ShardInfo = { + clusterId: 2, + shards: [1] + }; + + // we start one node in a separate cluster + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true + }); + + // and another node in the same cluster cluster as our node + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true + }); + + const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); + const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); + + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getPeersForProtocolAndShard( + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("different cluster, different shard: nodes don't connect", async function () { + this.timeout(15000); + + const shardInfo1: ShardInfo = { + clusterId: 1, + shards: [1] + }; + + const shardInfo2: ShardInfo = { + clusterId: 2, + shards: [2] + }; + + // we start one node in a separate cluster + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true + }); + + // and another node in the same cluster cluster as our node + const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true + }); + + const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); + const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getPeersForProtocolAndShard( + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); +}); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 19c08ccf13..3d4a46bc13 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -34,10 +34,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { clusterId: 3, shard: 1 }); - const customPubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 2 - }); + const shardInfo: ShardInfo = { clusterId: 3, shards: [1, 2] }; const singleShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; const singleShardInfo2: SingleShardInfo = { clusterId: 3, shard: 2 }; @@ -58,7 +55,10 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { this.timeout(15000); [nwaku, waku] = await runNodes( this, - [customPubsubTopic1, customPubsubTopic2], + [ + singleShardInfoToPubsubTopic(singleShardInfo1), + singleShardInfoToPubsubTopic(singleShardInfo2) + ], shardInfo ); messageCollector = new MessageCollector(nwaku); @@ -108,7 +108,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { expect( await messageCollector2.waitForMessages(1, { - pubsubTopic: customPubsubTopic2 + pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }) ).to.eq(true); @@ -131,9 +131,12 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { filter: true, lightpush: true, relay: true, - pubsubTopic: [customPubsubTopic2] + pubsubTopic: [singleShardInfoToPubsubTopic(singleShardInfo2)], + clusterId: singleShardInfo2.clusterId }); - await nwaku2.ensureSubscriptions([customPubsubTopic2]); + await nwaku2.ensureSubscriptions([ + singleShardInfoToPubsubTopic(singleShardInfo2) + ]); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -148,7 +151,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { pubsubTopic: customPubsubTopic1 })) || !(await messageCollector2.waitForMessages(1, { - pubsubTopic: customPubsubTopic2 + pubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) })) || pushResponse1!.recipients[0].toString() === pushResponse2!.recipients[0].toString() @@ -169,7 +172,7 @@ describe("Waku Light Push : Multiple PubsubTopics", function () { messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, - expectedPubsubTopic: customPubsubTopic2 + expectedPubsubTopic: singleShardInfoToPubsubTopic(singleShardInfo2) }); }); }); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 435b7b50d2..a484e097de 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -24,7 +24,12 @@ export async function runNodes( ): Promise<[NimGoNode, LightNode]> { const nwaku = new NimGoNode(makeLogFileName(context)); await nwaku.start( - { lightpush: true, relay: true, pubsubTopic: pubsubTopics }, + { + lightpush: true, + relay: true, + pubsubTopic: pubsubTopics, + ...(shardInfo && { clusterId: shardInfo.clusterId }) + }, { retries: 3 } ); diff --git a/packages/tests/tests/metadata.spec.ts b/packages/tests/tests/metadata.spec.ts index b8bd961436..f96fa984c3 100644 --- a/packages/tests/tests/metadata.spec.ts +++ b/packages/tests/tests/metadata.spec.ts @@ -1,7 +1,7 @@ import { MetadataCodec } from "@waku/core"; -import { decodeRelayShard } from "@waku/enr"; import type { LightNode, ShardInfo } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; +import { decodeRelayShard } from "@waku/utils"; import { shardInfoToPubsubTopics } from "@waku/utils"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 74ad7219f1..2d8f251a75 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -18,6 +18,8 @@ import { customDecoder2, customShardedPubsubTopic1, customShardedPubsubTopic2, + customShardInfo1, + customShardInfo2, processQueriedMessages, sendMessages, sendMessagesAutosharding, @@ -39,6 +41,7 @@ describe("Waku Store, custom pubsub topic", function () { await nwaku.start({ store: true, pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2], + clusterId: customShardInfo1.clusterId, relay: true }); await nwaku.ensureSubscriptions([ @@ -123,6 +126,7 @@ describe("Waku Store, custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [customShardedPubsubTopic2], + clusterId: customShardInfo2.clusterId, relay: true }); await nwaku2.ensureSubscriptions([customShardedPubsubTopic2]); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 252df84cb0..2cf9156dc4 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -10,7 +10,8 @@ import { LightNode, Protocols, ShardInfo, - ShardingParams + ShardingParams, + type SingleShardInfo } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { Logger, singleShardInfoToPubsubTopic } from "@waku/utils"; @@ -23,14 +24,13 @@ export const log = new Logger("test:store"); export const TestContentTopic = "/test/1/waku-store/utf8"; export const TestEncoder = createEncoder({ contentTopic: TestContentTopic }); export const TestDecoder = createDecoder(TestContentTopic); -export const customShardedPubsubTopic1 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 1 -}); -export const customShardedPubsubTopic2 = singleShardInfoToPubsubTopic({ - clusterId: 3, - shard: 2 -}); +export const customShardInfo1: SingleShardInfo = { clusterId: 3, shard: 1 }; +export const customShardedPubsubTopic1 = + singleShardInfoToPubsubTopic(customShardInfo1); + +export const customShardInfo2: SingleShardInfo = { clusterId: 3, shard: 2 }; +export const customShardedPubsubTopic2 = + singleShardInfoToPubsubTopic(customShardInfo2); export const shardInfo1: ShardInfo = { clusterId: 3, shards: [1] }; export const customContentTopic1 = "/test/2/waku-store/utf8"; export const customContentTopic2 = "/test/3/waku-store/utf8"; diff --git a/packages/tests/tests/wait_for_remote_peer.node.spec.ts b/packages/tests/tests/wait_for_remote_peer.node.spec.ts index 586d2574c0..2f1cd8d661 100644 --- a/packages/tests/tests/wait_for_remote_peer.node.spec.ts +++ b/packages/tests/tests/wait_for_remote_peer.node.spec.ts @@ -114,7 +114,9 @@ describe("Wait for remote peer", function () { await delay(1000); await waitForRemotePeer(waku2, [Protocols.Store]); - const peers = (await waku2.store.peers()).map((peer) => peer.id.toString()); + const peers = (await waku2.store.connectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); expect(nimPeerId).to.not.be.undefined; @@ -141,7 +143,9 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitPromise; - const peers = (await waku2.store.peers()).map((peer) => peer.id.toString()); + const peers = (await waku2.store.connectedPeers()).map((peer) => + peer.id.toString() + ); const nimPeerId = multiAddrWithId.getPeerId(); @@ -167,7 +171,7 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitForRemotePeer(waku2, [Protocols.LightPush]); - const peers = (await waku2.lightPush.peers()).map((peer) => + const peers = (await waku2.lightPush.connectedPeers()).map((peer) => peer.id.toString() ); @@ -195,7 +199,7 @@ describe("Wait for remote peer", function () { await waku2.dial(multiAddrWithId); await waitForRemotePeer(waku2, [Protocols.Filter]); - const peers = (await waku2.filter.peers()).map((peer) => + const peers = (await waku2.filter.connectedPeers()).map((peer) => peer.id.toString() ); @@ -227,14 +231,14 @@ describe("Wait for remote peer", function () { Protocols.LightPush ]); - const filterPeers = (await waku2.filter.peers()).map((peer) => + const filterPeers = (await waku2.filter.connectedPeers()).map((peer) => peer.id.toString() ); - const storePeers = (await waku2.store.peers()).map((peer) => + const storePeers = (await waku2.store.connectedPeers()).map((peer) => peer.id.toString() ); - const lightPushPeers = (await waku2.lightPush.peers()).map((peer) => - peer.id.toString() + const lightPushPeers = (await waku2.lightPush.connectedPeers()).map( + (peer) => peer.id.toString() ); const nimPeerId = multiAddrWithId.getPeerId(); diff --git a/packages/utils/package.json b/packages/utils/package.json index f820960ca4..b5bfdc41ac 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -71,6 +71,7 @@ "@waku/interfaces": "0.0.20", "chai": "^4.3.10", "debug": "^4.3.4", + "fast-check": "^3.14.0", "uint8arrays": "^4.0.4" }, "devDependencies": { diff --git a/packages/utils/src/common/index.ts b/packages/utils/src/common/index.ts index f834bc22c4..5fbc976815 100644 --- a/packages/utils/src/common/index.ts +++ b/packages/utils/src/common/index.ts @@ -5,6 +5,7 @@ export * from "./to_async_iterator.js"; export * from "./is_size_valid.js"; export * from "./sharding.js"; export * from "./push_or_init_map.js"; +export * from "./relay_shard_codec.js"; export function removeItemFromArray(arr: unknown[], value: unknown): unknown[] { const index = arr.indexOf(value); diff --git a/packages/enr/src/relay_shard_codec.spec.ts b/packages/utils/src/common/relay_shard_codec.spec.ts similarity index 100% rename from packages/enr/src/relay_shard_codec.spec.ts rename to packages/utils/src/common/relay_shard_codec.spec.ts diff --git a/packages/enr/src/relay_shard_codec.ts b/packages/utils/src/common/relay_shard_codec.ts similarity index 100% rename from packages/enr/src/relay_shard_codec.ts rename to packages/utils/src/common/relay_shard_codec.ts diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 883160cc89..2493fe498e 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -1,8 +1,10 @@ import type { Connection } from "@libp2p/interface/connection"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; +import type { ShardInfo } from "@waku/interfaces"; import { bytesToUtf8 } from "../bytes/index.js"; +import { decodeRelayShard } from "../common/relay_shard_codec.js"; /** * Returns a pseudo-random peer that supports the given protocol. @@ -47,6 +49,37 @@ export async function selectLowestLatencyPeer( : undefined; } +// /** +// * Returns the list of peers that supports the given protocol and shard. +// * If shard is not configured, all peers that support the protocol are returned. +// */ + +// export async function getPeersForProtocolAndShard( +// peerStore: PeerStore, +// protocols: string[], +// shardInfo?: ShardInfo +// ): Promise { +// const peers: Peer[] = []; +// await peerStore.forEach((peer) => { +// if (shardInfo) { +// const encodedPeerShardInfo = peer.metadata.get("shardInfo"); +// const peerShardInfo = +// encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); + +// if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { +// if (protocols.some((protocol) => peer.protocols.includes(protocol))) { +// peers.push(peer); +// } +// } +// } else { +// if (protocols.some((protocol) => peer.protocols.includes(protocol))) { +// peers.push(peer); +// } +// } +// }); +// return peers; +// } + /** * Returns the list of peers that supports the given protocol. */ @@ -66,10 +99,32 @@ export async function getPeersForProtocol( return peers; } -export async function getConnectedPeersForProtocol( +// export async function getConnectedPeersForProtocol( +// connections: Connection[], +// peerStore: PeerStore, +// protocols: string[] +// ): Promise { +// const openConnections = connections.filter( +// (connection) => connection.status === "open" +// ); + +// const peerPromises = openConnections.map(async (connection) => { +// const peer = await peerStore.get(connection.remotePeer); +// const supportsProtocol = peer.protocols.some((protocol) => +// protocols.includes(protocol) +// ); +// return supportsProtocol ? peer : null; +// }); + +// const peersWithNulls = await Promise.all(peerPromises); +// return peersWithNulls.filter((peer): peer is Peer => peer !== null); +// } + +export async function getConnectedPeersForProtocolAndShard( connections: Connection[], peerStore: PeerStore, - protocols: string[] + protocols: string[], + shardInfo?: ShardInfo ): Promise { const openConnections = connections.filter( (connection) => connection.status === "open" @@ -77,10 +132,24 @@ export async function getConnectedPeersForProtocol( const peerPromises = openConnections.map(async (connection) => { const peer = await peerStore.get(connection.remotePeer); - const supportsProtocol = peer.protocols.some((protocol) => - protocols.includes(protocol) + const supportsProtocol = protocols.some((protocol) => + peer.protocols.includes(protocol) ); - return supportsProtocol ? peer : null; + + if (supportsProtocol) { + if (shardInfo) { + const encodedPeerShardInfo = peer.metadata.get("shardInfo"); + const peerShardInfo = + encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); + + if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { + return peer; + } + } else { + return peer; + } + } + return null; }); const peersWithNulls = await Promise.all(peerPromises); From 7e34e5ba1191d976446ea78a35c7737cf8090fef Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jan 2024 13:47:32 +0530 Subject: [PATCH 02/10] fix: tests --- packages/tests/tests/getPeers.spec.ts | 16 ++++++++++------ packages/tests/tests/utils.spec.ts | 16 +--------------- 2 files changed, 11 insertions(+), 21 deletions(-) diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 723e180c19..dc13ee243e 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -6,14 +6,14 @@ import { ShardInfo } from "@waku/sdk"; import { shardInfoToPubsubTopics } from "@waku/utils"; -import { getPeersForProtocolAndShard } from "@waku/utils/libp2p"; +import { getConnectedPeersForProtocolAndShard } from "@waku/utils/libp2p"; import { expect } from "chai"; import { makeLogFileName } from "../src/log_file.js"; import { NimGoNode } from "../src/node/node.js"; import { tearDownNodes } from "../src/teardown.js"; -describe("getPeersForProtocolAndShard", function () { +describe("getConnectedPeersForProtocolAndShard", function () { let waku: LightNode; let serviceNode1: NimGoNode; let serviceNode2: NimGoNode; @@ -51,7 +51,8 @@ describe("getPeersForProtocolAndShard", function () { await waku.start(); await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getPeersForProtocolAndShard( + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), shardInfo @@ -82,7 +83,8 @@ describe("getPeersForProtocolAndShard", function () { await waku.start(); await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getPeersForProtocolAndShard( + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), shardInfo @@ -131,7 +133,8 @@ describe("getPeersForProtocolAndShard", function () { await waku.start(); await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getPeersForProtocolAndShard( + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), shardInfo2 @@ -180,7 +183,8 @@ describe("getPeersForProtocolAndShard", function () { await waku.start(); await waitForRemotePeer(waku, [Protocols.LightPush]); - const peers = await getPeersForProtocolAndShard( + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), waku.libp2p.peerStore, waku.libp2p.getProtocols(), shardInfo2 diff --git a/packages/tests/tests/utils.spec.ts b/packages/tests/tests/utils.spec.ts index 5f82879f01..3eac491d9e 100644 --- a/packages/tests/tests/utils.spec.ts +++ b/packages/tests/tests/utils.spec.ts @@ -2,16 +2,12 @@ import type { PeerStore } from "@libp2p/interface/peer-store"; import type { Peer } from "@libp2p/interface/peer-store"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { createDecoder, createEncoder, waitForRemotePeer } from "@waku/core"; -import { LightPushCodec } from "@waku/core"; import { DefaultPubsubTopic, LightNode } from "@waku/interfaces"; import { Protocols } from "@waku/interfaces"; import { createLightNode } from "@waku/sdk"; import { toAsyncIterator } from "@waku/utils"; import { bytesToUtf8, utf8ToBytes } from "@waku/utils/bytes"; -import { - getConnectedPeersForProtocol, - selectPeerForProtocol -} from "@waku/utils/libp2p"; +import { selectPeerForProtocol } from "@waku/utils/libp2p"; import chai, { expect } from "chai"; import chaiAsPromised from "chai-as-promised"; import sinon from "sinon"; @@ -288,14 +284,4 @@ describe("getConnectedPeersForProtocol", function () { this.timeout(10000); await tearDownNodes(nwaku, waku); }); - - it("returns all connected peers that support the protocol", async function () { - const peers = await getConnectedPeersForProtocol( - waku.libp2p.getConnections(), - waku.libp2p.peerStore, - [LightPushCodec] - ); - - expect(peers.length).to.eq(1); - }); }); From 89f312dc6d7f8697ffa773212ae5d7b243b91f6c Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jan 2024 14:09:12 +0530 Subject: [PATCH 03/10] update: interfafces --- packages/core/src/lib/waku.ts | 2 +- packages/interfaces/src/waku.ts | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index 9ca698b3e8..6bc20d0cc5 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -110,7 +110,7 @@ export class WakuNode implements Waku { ); } - get shardingParams(): ShardingParams | undefined { + get shardInfo(): ShardingParams | undefined { return this.pubsubShardInfo; } diff --git a/packages/interfaces/src/waku.ts b/packages/interfaces/src/waku.ts index 70e5e8c86c..1b49757e55 100644 --- a/packages/interfaces/src/waku.ts +++ b/packages/interfaces/src/waku.ts @@ -3,11 +3,10 @@ import type { PeerId } from "@libp2p/interface/peer-id"; import type { Multiaddr } from "@multiformats/multiaddr"; import { IConnectionManager } from "./connection_manager.js"; -import type { ShardInfo } from "./enr.js"; import type { IFilter } from "./filter.js"; import type { Libp2p } from "./libp2p.js"; import type { ILightPush } from "./light_push.js"; -import { Protocols } from "./protocols.js"; +import { Protocols, ShardingParams } from "./protocols.js"; import type { IRelay } from "./relay.js"; import type { IStore } from "./store.js"; @@ -18,7 +17,7 @@ export interface Waku { filter?: IFilter; lightPush?: ILightPush; - shardInfo?: ShardInfo; + shardInfo?: ShardingParams; connectionManager: IConnectionManager; From 5959fe4a2cac562f5d08b1cbf36cb090e47f4c3e Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Wed, 10 Jan 2024 17:52:37 +0530 Subject: [PATCH 04/10] rm: comments --- packages/utils/src/libp2p/index.ts | 52 ------------------------------ 1 file changed, 52 deletions(-) diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 2493fe498e..44de86d0e4 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -49,37 +49,6 @@ export async function selectLowestLatencyPeer( : undefined; } -// /** -// * Returns the list of peers that supports the given protocol and shard. -// * If shard is not configured, all peers that support the protocol are returned. -// */ - -// export async function getPeersForProtocolAndShard( -// peerStore: PeerStore, -// protocols: string[], -// shardInfo?: ShardInfo -// ): Promise { -// const peers: Peer[] = []; -// await peerStore.forEach((peer) => { -// if (shardInfo) { -// const encodedPeerShardInfo = peer.metadata.get("shardInfo"); -// const peerShardInfo = -// encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); - -// if (peerShardInfo && shardInfo.clusterId === peerShardInfo.clusterId) { -// if (protocols.some((protocol) => peer.protocols.includes(protocol))) { -// peers.push(peer); -// } -// } -// } else { -// if (protocols.some((protocol) => peer.protocols.includes(protocol))) { -// peers.push(peer); -// } -// } -// }); -// return peers; -// } - /** * Returns the list of peers that supports the given protocol. */ @@ -99,27 +68,6 @@ export async function getPeersForProtocol( return peers; } -// export async function getConnectedPeersForProtocol( -// connections: Connection[], -// peerStore: PeerStore, -// protocols: string[] -// ): Promise { -// const openConnections = connections.filter( -// (connection) => connection.status === "open" -// ); - -// const peerPromises = openConnections.map(async (connection) => { -// const peer = await peerStore.get(connection.remotePeer); -// const supportsProtocol = peer.protocols.some((protocol) => -// protocols.includes(protocol) -// ); -// return supportsProtocol ? peer : null; -// }); - -// const peersWithNulls = await Promise.all(peerPromises); -// return peersWithNulls.filter((peer): peer is Peer => peer !== null); -// } - export async function getConnectedPeersForProtocolAndShard( connections: Connection[], peerStore: PeerStore, From 7475a2cd3c60bf304ab26e33f45ee79fee57b769 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 16 Jan 2024 13:14:28 +0530 Subject: [PATCH 05/10] metadata: store peerIdStr instead of peerId --- packages/core/src/lib/metadata/index.ts | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/packages/core/src/lib/metadata/index.ts b/packages/core/src/lib/metadata/index.ts index d457699b1a..8c27abc454 100644 --- a/packages/core/src/lib/metadata/index.ts +++ b/packages/core/src/lib/metadata/index.ts @@ -3,6 +3,7 @@ import { IncomingStreamData } from "@libp2p/interface/stream-handler"; import type { IMetadata, Libp2pComponents, + PeerIdStr, ShardInfo, ShardingParams } from "@waku/interfaces"; @@ -21,12 +22,7 @@ export const MetadataCodec = "/vac/waku/metadata/1.0.0"; class Metadata extends BaseProtocol implements IMetadata { private libp2pComponents: Libp2pComponents; - handshakesConfirmed: Set = new Set(); - - checkHandshake(peerId: PeerId): boolean { - const handshakesArr = [...this.handshakesConfirmed]; - return handshakesArr.some((id) => id.equals(peerId)); - } + handshakesConfirmed: Set = new Set(); constructor( public shardInfo: ShardingParams, @@ -98,7 +94,7 @@ class Metadata extends BaseProtocol implements IMetadata { } public async confirmOrAttemptHandshake(peerId: PeerId): Promise { - if (this.checkHandshake(peerId)) return; + if (this.handshakesConfirmed.has(peerId.toString())) return; await this.query(peerId); @@ -131,7 +127,7 @@ class Metadata extends BaseProtocol implements IMetadata { } }); - this.handshakesConfirmed.add(peerId); + this.handshakesConfirmed.add(peerId.toString()); } } From 9ea0d04fc807483f3053b83dd07999a4ad334d26 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Tue, 16 Jan 2024 13:16:36 +0530 Subject: [PATCH 06/10] chore(utils): move fast-utils to dev deps --- package-lock.json | 148 ++++++++++++++++++------------------ packages/utils/package.json | 5 +- 2 files changed, 77 insertions(+), 76 deletions(-) diff --git a/package-lock.json b/package-lock.json index b653967d08..bfef193320 100644 --- a/package-lock.json +++ b/package-lock.json @@ -26273,14 +26273,14 @@ }, "packages/core": { "name": "@waku/core", - "version": "0.0.25", + "version": "0.0.26", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/enr": "^0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/enr": "^0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "debug": "^4.3.4", "it-all": "^3.0.4", "it-length-prefixed": "^9.0.1", @@ -26324,11 +26324,11 @@ }, "packages/dns-discovery": { "name": "@waku/dns-discovery", - "version": "0.0.19", + "version": "0.0.20", "license": "MIT OR Apache-2.0", "dependencies": { - "@waku/enr": "0.0.19", - "@waku/utils": "0.0.13", + "@waku/enr": "0.0.20", + "@waku/utils": "0.0.14", "debug": "^4.3.4", "dns-query": "^0.11.2", "hi-base32": "^0.5.1", @@ -26343,7 +26343,7 @@ "@rollup/plugin-node-resolve": "^15.2.3", "@types/chai": "^4.3.11", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", + "@waku/interfaces": "0.0.21", "chai": "^4.3.10", "cspell": "^7.3.2", "mocha": "^10.2.0", @@ -26356,7 +26356,7 @@ }, "packages/enr": { "name": "@waku/enr", - "version": "0.0.19", + "version": "0.0.20", "license": "MIT OR Apache-2.0", "dependencies": { "@ethersproject/rlp": "^5.7.0", @@ -26364,7 +26364,7 @@ "@libp2p/peer-id": "^3.0.3", "@multiformats/multiaddr": "^12.0.0", "@noble/secp256k1": "^1.7.1", - "@waku/utils": "0.0.13", + "@waku/utils": "0.0.14", "debug": "^4.3.4", "js-sha3": "^0.9.2" }, @@ -26376,7 +26376,7 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.1", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", + "@waku/interfaces": "0.0.21", "chai": "^4.3.10", "cspell": "^7.3.2", "fast-check": "^3.14.0", @@ -26392,7 +26392,7 @@ }, "packages/interfaces": { "name": "@waku/interfaces", - "version": "0.0.20", + "version": "0.0.21", "license": "MIT OR Apache-2.0", "devDependencies": { "@chainsafe/libp2p-gossipsub": "^10.1.1", @@ -26407,14 +26407,14 @@ }, "packages/message-encryption": { "name": "@waku/message-encryption", - "version": "0.0.23", + "version": "0.0.24", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/secp256k1": "^1.7.1", - "@waku/core": "0.0.25", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "debug": "^4.3.4", "js-sha3": "^0.9.2", "uint8arrays": "^5.0.0" @@ -26448,11 +26448,11 @@ }, "packages/message-hash": { "name": "@waku/message-hash", - "version": "0.1.9", + "version": "0.1.10", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", - "@waku/utils": "0.0.13" + "@waku/utils": "0.0.14" }, "devDependencies": { "@rollup/plugin-commonjs": "^25.0.7", @@ -26462,7 +26462,7 @@ "@types/debug": "^4.1.12", "@types/mocha": "^10.0.1", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", + "@waku/interfaces": "0.0.21", "chai": "^4.3.10", "cspell": "^7.3.2", "fast-check": "^3.14.0", @@ -26479,15 +26479,15 @@ }, "packages/peer-exchange": { "name": "@waku/peer-exchange", - "version": "0.0.18", + "version": "0.0.19", "license": "MIT OR Apache-2.0", "dependencies": { "@libp2p/interfaces": "^3.3.2", - "@waku/core": "0.0.25", - "@waku/enr": "0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/enr": "0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "debug": "^4.3.4", "it-all": "^3.0.4", "it-length-prefixed": "^9.0.1", @@ -26510,7 +26510,7 @@ }, "packages/proto": { "name": "@waku/proto", - "version": "0.0.5", + "version": "0.0.6", "license": "MIT OR Apache-2.0", "dependencies": { "protons-runtime": "^5.0.2" @@ -26532,15 +26532,15 @@ }, "packages/relay": { "name": "@waku/relay", - "version": "0.0.8", + "version": "0.0.9", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-gossipsub": "^10.1.1", "@noble/hashes": "^1.3.2", - "@waku/core": "0.0.25", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "debug": "^4.3.4", "fast-check": "^3.14.0" @@ -26558,18 +26558,18 @@ }, "packages/sdk": { "name": "@waku/sdk", - "version": "0.0.21", + "version": "0.0.22", "license": "MIT OR Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "^13.0.4", "@libp2p/mplex": "^9.0.10", "@libp2p/websockets": "^7.0.5", - "@waku/core": "0.0.25", - "@waku/dns-discovery": "0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/peer-exchange": "^0.0.18", - "@waku/relay": "0.0.8", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/dns-discovery": "0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/peer-exchange": "^0.0.19", + "@waku/relay": "0.0.9", + "@waku/utils": "0.0.14", "libp2p": "^0.46.14" }, "devDependencies": { @@ -26638,13 +26638,13 @@ }, "packages/utils": { "name": "@waku/utils", - "version": "0.0.13", + "version": "0.0.14", "license": "MIT OR Apache-2.0", "dependencies": { "@noble/hashes": "^1.3.2", + "@waku/interfaces": "0.0.21", "chai": "^4.3.10", "debug": "^4.3.4", - "fast-check": "^3.14.0", "uint8arrays": "^4.0.4" }, "devDependencies": { @@ -26652,8 +26652,8 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", "cspell": "^7.3.2", + "fast-check": "^3.14.0", "npm-run-all": "^4.1.5", "rollup": "^4.6.0" }, @@ -29779,10 +29779,10 @@ "@types/mocha": "^10.0.1", "@types/uuid": "^9.0.7", "@waku/build-utils": "*", - "@waku/enr": "^0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/enr": "^0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", @@ -29850,9 +29850,9 @@ "@rollup/plugin-node-resolve": "^15.2.3", "@types/chai": "^4.3.11", "@waku/build-utils": "*", - "@waku/enr": "0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/utils": "0.0.13", + "@waku/enr": "0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", @@ -29879,8 +29879,8 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.1", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", - "@waku/utils": "0.0.13", + "@waku/interfaces": "0.0.21", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", @@ -29913,10 +29913,10 @@ "@types/chai": "^4.3.11", "@types/mocha": "^10.0.1", "@waku/build-utils": "*", - "@waku/core": "0.0.25", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", @@ -29950,8 +29950,8 @@ "@types/debug": "^4.1.12", "@types/mocha": "^10.0.1", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", - "@waku/utils": "0.0.13", + "@waku/interfaces": "0.0.21", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "cspell": "^7.3.2", "fast-check": "^3.14.0", @@ -29971,11 +29971,11 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/core": "0.0.25", - "@waku/enr": "0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/enr": "0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", @@ -30011,10 +30011,10 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/core": "0.0.25", - "@waku/interfaces": "0.0.20", - "@waku/proto": "0.0.5", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/interfaces": "0.0.21", + "@waku/proto": "0.0.6", + "@waku/utils": "0.0.14", "chai": "^4.3.10", "debug": "^4.3.4", "fast-check": "^3.14.0", @@ -30032,12 +30032,12 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/core": "0.0.25", - "@waku/dns-discovery": "0.0.19", - "@waku/interfaces": "0.0.20", - "@waku/peer-exchange": "^0.0.18", - "@waku/relay": "0.0.8", - "@waku/utils": "0.0.13", + "@waku/core": "0.0.26", + "@waku/dns-discovery": "0.0.20", + "@waku/interfaces": "0.0.21", + "@waku/peer-exchange": "^0.0.19", + "@waku/relay": "0.0.9", + "@waku/utils": "0.0.14", "cspell": "^7.3.2", "interface-datastore": "^8.2.5", "libp2p": "^0.46.14", @@ -30094,7 +30094,7 @@ "@rollup/plugin-json": "^6.0.0", "@rollup/plugin-node-resolve": "^15.2.3", "@waku/build-utils": "*", - "@waku/interfaces": "0.0.20", + "@waku/interfaces": "0.0.21", "chai": "^4.3.10", "cspell": "^7.3.2", "debug": "^4.3.4", diff --git a/packages/utils/package.json b/packages/utils/package.json index ac8c44fb19..ac07b5de55 100644 --- a/packages/utils/package.json +++ b/packages/utils/package.json @@ -71,7 +71,6 @@ "@waku/interfaces": "0.0.21", "chai": "^4.3.10", "debug": "^4.3.4", - "fast-check": "^3.14.0", "uint8arrays": "^4.0.4" }, "devDependencies": { @@ -81,7 +80,9 @@ "@waku/build-utils": "*", "cspell": "^7.3.2", "npm-run-all": "^4.1.5", - "rollup": "^4.6.0" + "rollup": "^4.6.0", + "fast-check": "^3.14.0" + }, "files": [ "dist", From 4dd451bf15401c3561a7421d72eccf0810aab225 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Wed, 17 Jan 2024 10:55:12 -0800 Subject: [PATCH 07/10] fix: allow autosharding nodes to get peers (#1785) --- packages/tests/tests/getPeers.spec.ts | 217 +++++++++++++++++- .../light-push/multiple_pubsub.node.spec.ts | 16 +- .../tests/tests/store/multiple_pubsub.spec.ts | 6 +- packages/utils/src/libp2p/index.ts | 6 - 4 files changed, 219 insertions(+), 26 deletions(-) diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 956eb51705..35ffbdc461 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -4,6 +4,7 @@ import type { Peer } from "@libp2p/interface/peer-store"; import { createSecp256k1PeerId } from "@libp2p/peer-id-factory"; import { LightPushCodec, waitForRemotePeer } from "@waku/core"; import { + ContentTopicInfo, createLightNode, Libp2pComponents, type LightNode, @@ -26,6 +27,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { let waku: LightNode; let serviceNode1: NimGoNode; let serviceNode2: NimGoNode; + const contentTopic = "/test/2/waku-light-push/utf8"; this.beforeEach(async function () { this.timeout(15000); @@ -51,7 +53,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true + lightpush: true, + relay: true }); const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); @@ -77,12 +80,18 @@ describe("getConnectedPeersForProtocolAndShard", function () { shards: [1] }; + const shardInfoServiceNode: ShardInfo = { + clusterId: 1, + shards: [2] + }; + await serviceNode1.start({ discv5Discovery: true, peerExchange: true, - clusterId: shardInfo.clusterId, - pubsubTopic: shardInfoToPubsubTopics(shardInfo), - lightpush: true + clusterId: shardInfoServiceNode.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfoServiceNode), + lightpush: true, + relay: true }); const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); @@ -120,7 +129,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true + lightpush: true, + relay: true }); // and another node in the same cluster cluster as our node @@ -129,7 +139,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true + lightpush: true, + relay: true }); const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); @@ -170,7 +181,196 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo1.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo1), - lightpush: true + lightpush: true, + relay: true + }); + + // and another node in the same cluster cluster as our node + const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true, + relay: true + }); + + const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); + const serviceNodeMa2 = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNodeMa1, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNodeMa2, LightPushCodec); + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("same cluster, same shard: nodes connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo), + lightpush: true, + relay: true + }); + + const serviceNodeMa = await serviceNode1.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo }); + await waku.start(); + await waku.libp2p.dialProtocol(serviceNodeMa, LightPushCodec); + await waitForRemotePeer(waku, [Protocols.LightPush]); + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo + ); + expect(peers.length).to.be.greaterThan(0); + }); + + it("same cluster, different shard: nodes connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo1: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + const shardInfo2: ContentTopicInfo = { + clusterId: 1, + contentTopics: ["/test/5/waku-light-push/utf8"] + }; + + // Separate shard + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true, + relay: true + }); + + // Same shard + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true, + relay: true + }); + + const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); + const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); + + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("different cluster, same shard: nodes don't connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo1: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + const shardInfo2: ContentTopicInfo = { + clusterId: 2, + contentTopics: [contentTopic] + }; + + // we start one node in a separate cluster + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true, + relay: true + }); + + // and another node in the same cluster cluster as our node + await serviceNode2.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo2.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo2), + lightpush: true, + relay: true + }); + + const serviceNode1Ma = await serviceNode1.getMultiaddrWithId(); + const serviceNode2Ma = await serviceNode2.getMultiaddrWithId(); + + waku = await createLightNode({ shardInfo: shardInfo2 }); + await waku.libp2p.dialProtocol(serviceNode1Ma, LightPushCodec); + await waku.libp2p.dialProtocol(serviceNode2Ma, LightPushCodec); + + await waku.start(); + await waitForRemotePeer(waku, [Protocols.LightPush]); + + const peers = await getConnectedPeersForProtocolAndShard( + waku.libp2p.getConnections(), + waku.libp2p.peerStore, + waku.libp2p.getProtocols(), + shardInfo2 + ); + expect(peers.length).to.be.equal(1); + }); + + it("different cluster, different shard: nodes don't connect (autosharding)", async function () { + this.timeout(15000); + + const shardInfo1: ContentTopicInfo = { + clusterId: 1, + contentTopics: [contentTopic] + }; + + const shardInfo2: ContentTopicInfo = { + clusterId: 2, + contentTopics: ["/test/5/waku-light-push/utf8"] + }; + + // we start one node in a separate cluster + await serviceNode1.start({ + discv5Discovery: true, + peerExchange: true, + clusterId: shardInfo1.clusterId, + pubsubTopic: shardInfoToPubsubTopics(shardInfo1), + lightpush: true, + relay: true }); // and another node in the same cluster cluster as our node @@ -180,7 +380,8 @@ describe("getConnectedPeersForProtocolAndShard", function () { peerExchange: true, clusterId: shardInfo2.clusterId, pubsubTopic: shardInfoToPubsubTopics(shardInfo2), - lightpush: true + lightpush: true, + relay: true }); const serviceNodeMa1 = await serviceNode1.getMultiaddrWithId(); diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 3d4a46bc13..1f6d155dd2 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -290,7 +290,8 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { filter: true, lightpush: true, relay: true, - pubsubTopic: [autoshardingPubsubTopic2] + pubsubTopic: [autoshardingPubsubTopic2], + clusterId: shardInfo.clusterId }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); @@ -353,10 +354,6 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () customContentTopic2, clusterId ); - const contentTopicInfo: ContentTopicInfo = { - clusterId, - contentTopics: [customContentTopic1, customContentTopic2] - }; const customEncoder1 = createEncoder({ contentTopic: customContentTopic1, pubsubTopicShardInfo: { @@ -372,11 +369,10 @@ describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes( - this, - [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - contentTopicInfo - ); + [nwaku, waku] = await runNodes(this, [ + autoshardingPubsubTopic1, + autoshardingPubsubTopic2 + ]); messageCollector = new MessageCollector(nwaku); nimPeerId = await nwaku.getPeerId(); }); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 2d8f251a75..6f4e7a98e0 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -214,7 +214,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku.start({ store: true, pubsubTopic: [autoshardingPubsubTopic1, autoshardingPubsubTopic2], - relay: true + relay: true, + clusterId }); await nwaku.ensureSubscriptionsAutosharding([ customContentTopic1, @@ -287,7 +288,8 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [autoshardingPubsubTopic2], - relay: true + relay: true, + clusterId }); await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); diff --git a/packages/utils/src/libp2p/index.ts b/packages/utils/src/libp2p/index.ts index 4bc305eca2..b393b473c9 100644 --- a/packages/utils/src/libp2p/index.ts +++ b/packages/utils/src/libp2p/index.ts @@ -89,12 +89,6 @@ export async function getConnectedPeersForProtocolAndShard( if (supportsProtocol) { if (shardInfo) { - //TODO: support auto-sharding - if (!("shards" in shardInfo)) { - throw new Error( - `Connections Manager only supports static sharding for now. Autosharding is not supported.` - ); - } const encodedPeerShardInfo = peer.metadata.get("shardInfo"); const peerShardInfo = encodedPeerShardInfo && decodeRelayShard(encodedPeerShardInfo); From 212a41d5e76d3aad5cbd7c98fe6594b82b0662c8 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 18 Jan 2024 00:44:06 +0530 Subject: [PATCH 08/10] fix: merge --- packages/tests/tests/getPeers.spec.ts | 16 +++++++--------- 1 file changed, 7 insertions(+), 9 deletions(-) diff --git a/packages/tests/tests/getPeers.spec.ts b/packages/tests/tests/getPeers.spec.ts index 35ffbdc461..7c9a9332f2 100644 --- a/packages/tests/tests/getPeers.spec.ts +++ b/packages/tests/tests/getPeers.spec.ts @@ -19,20 +19,18 @@ import { expect } from "chai"; import fc from "fast-check"; import Sinon from "sinon"; -import { makeLogFileName } from "../src/log_file.js"; -import { NimGoNode } from "../src/node/node.js"; -import { tearDownNodes } from "../src/teardown.js"; +import { makeLogFileName, ServiceNode, tearDownNodes } from "../src/index.js"; describe("getConnectedPeersForProtocolAndShard", function () { let waku: LightNode; - let serviceNode1: NimGoNode; - let serviceNode2: NimGoNode; + let serviceNode1: ServiceNode; + let serviceNode2: ServiceNode; const contentTopic = "/test/2/waku-light-push/utf8"; this.beforeEach(async function () { this.timeout(15000); - serviceNode1 = new NimGoNode(makeLogFileName(this) + "1"); - serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + serviceNode1 = new ServiceNode(makeLogFileName(this) + "1"); + serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); }); afterEach(async function () { @@ -186,7 +184,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { }); // and another node in the same cluster cluster as our node - const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); await serviceNode2.start({ discv5Discovery: true, peerExchange: true, @@ -374,7 +372,7 @@ describe("getConnectedPeersForProtocolAndShard", function () { }); // and another node in the same cluster cluster as our node - const serviceNode2 = new NimGoNode(makeLogFileName(this) + "2"); + const serviceNode2 = new ServiceNode(makeLogFileName(this) + "2"); await serviceNode2.start({ discv5Discovery: true, peerExchange: true, From 9955f8636f920c5215868125c95e29b4d33da56e Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Thu, 18 Jan 2024 02:53:50 +0530 Subject: [PATCH 09/10] fix: build --- packages/core/src/lib/connection_manager.ts | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/core/src/lib/connection_manager.ts b/packages/core/src/lib/connection_manager.ts index b1573ad52d..8a0ef17a26 100644 --- a/packages/core/src/lib/connection_manager.ts +++ b/packages/core/src/lib/connection_manager.ts @@ -165,8 +165,6 @@ export class ConnectionManager log.error(`Unexpected error while dialing peer store peers`, error) ); } - #private: any; - #private: any; private async dialPeerStorePeers(): Promise { const peerInfos = await this.libp2p.peerStore.all(); From 64942719cfd8c5fda65cb4c85ffd2a8709515491 Mon Sep 17 00:00:00 2001 From: danisharora099 Date: Fri, 19 Jan 2024 20:19:06 +0530 Subject: [PATCH 10/10] fix: failing tests from master merge --- packages/tests/tests/store/multiple_pubsub.spec.ts | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 6f4e7a98e0..fabd66d474 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -439,7 +439,8 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { await nwaku2.start({ store: true, pubsubTopic: [customShardedPubsubTopic2], - relay: true + relay: true, + clusterId: customShardInfo2.clusterId }); await nwaku2.ensureSubscriptions([customShardedPubsubTopic2]);