From 656de736baca76db08ddfd2ce1ab7ce5dec6d260 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Tue, 5 Dec 2023 23:34:45 -0800 Subject: [PATCH] reintroduce and deprecate named sharding --- packages/core/src/index.ts | 1 - packages/core/src/lib/filter/index.ts | 14 +- packages/core/src/lib/light_push/index.ts | 3 +- packages/core/src/lib/message/version_0.ts | 6 +- packages/core/src/lib/store/index.ts | 3 +- packages/core/src/lib/waku.ts | 4 +- packages/interfaces/src/enr.ts | 1 - packages/interfaces/src/filter.ts | 2 +- packages/interfaces/src/message.ts | 4 + packages/interfaces/src/protocols.ts | 14 +- packages/message-encryption/src/ecies.ts | 34 +-- packages/message-encryption/src/symmetric.ts | 24 +- packages/relay/src/index.ts | 9 +- packages/sdk/src/create.ts | 71 ++++- .../tests/filter/multiple_pubsub.node.spec.ts | 163 +++++++++- packages/tests/tests/filter/utils.ts | 1 + .../light-push/multiple_pubsub.node.spec.ts | 71 ++--- packages/tests/tests/light-push/utils.ts | 1 + .../tests/relay/multiple_pubsub.node.spec.ts | 285 +++++++++++++++++- .../tests/tests/store/multiple_pubsub.spec.ts | 162 +++++++++- packages/tests/tests/store/utils.ts | 5 +- packages/utils/src/common/sharding.ts | 46 ++- 22 files changed, 825 insertions(+), 99 deletions(-) diff --git a/packages/core/src/index.ts b/packages/core/src/index.ts index b5b0eeac7d..f51901a109 100644 --- a/packages/core/src/index.ts +++ b/packages/core/src/index.ts @@ -28,4 +28,3 @@ export { KeepAliveManager } from "./lib/keep_alive_manager.js"; export { StreamManager } from "./lib/stream_manager.js"; export { MetadataCodec, wakuMetadata } from "./lib/metadata/index.js"; -export { determinePubsubTopics } from "./lib/determine_pubsub_topics.js"; diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index c54d1df39a..3c7907d393 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -281,7 +281,8 @@ class Filter extends BaseProtocol implements IReceiver { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(FilterCodecs.SUBSCRIBE, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); + this.pubsubTopics = + options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo); libp2p.handle(FilterCodecs.PUSH, this.onRequest.bind(this)).catch((e) => { log.error("Failed to register ", FilterCodecs.PUSH, e); @@ -291,11 +292,14 @@ class Filter extends BaseProtocol implements IReceiver { } async createSubscription( - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo?: SingleShardInfo | string ): Promise { - const pubsubTopic = pubsubTopicShardInfo - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : DefaultPubsubTopic; + const pubsubTopic = + typeof pubsubTopicShardInfo == "string" + ? pubsubTopicShardInfo + : pubsubTopicShardInfo + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : DefaultPubsubTopic; ensurePubsubTopicIsConfigured(pubsubTopic, this.pubsubTopics); diff --git a/packages/core/src/lib/light_push/index.ts b/packages/core/src/lib/light_push/index.ts index 9d052b5b02..55cbb95e5e 100644 --- a/packages/core/src/lib/light_push/index.ts +++ b/packages/core/src/lib/light_push/index.ts @@ -49,7 +49,8 @@ class LightPush extends BaseProtocol implements ILightPush { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(LightPushCodec, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); + this.pubsubTopics = + options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo); } private async preparePushMessage( diff --git a/packages/core/src/lib/message/version_0.ts b/packages/core/src/lib/message/version_0.ts index a55b82059f..c29c1fbff2 100644 --- a/packages/core/src/lib/message/version_0.ts +++ b/packages/core/src/lib/message/version_0.ts @@ -118,6 +118,7 @@ export class Encoder implements IEncoder { * messages. */ export function createEncoder({ + pubsubTopic, pubsubTopicShardInfo, contentTopic, ephemeral, @@ -126,7 +127,7 @@ export function createEncoder({ return new Encoder( contentTopic, ephemeral, - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), metaSetter ); } @@ -186,10 +187,11 @@ export class Decoder implements IDecoder { */ export function createDecoder( contentTopic: string, + pubsubTopic?: string, pubsubTopicShardInfo?: SingleShardInfo ): Decoder { return new Decoder( - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), contentTopic ); } diff --git a/packages/core/src/lib/store/index.ts b/packages/core/src/lib/store/index.ts index 53fb0b0d19..c7683bdc10 100644 --- a/packages/core/src/lib/store/index.ts +++ b/packages/core/src/lib/store/index.ts @@ -79,7 +79,8 @@ class Store extends BaseProtocol implements IStore { constructor(libp2p: Libp2p, options?: ProtocolCreateOptions) { super(StoreCodec, libp2p.components); - this.pubsubTopics = this.initializePubsubTopic(options?.shardInfo); + this.pubsubTopics = + options?.pubsubTopics ?? this.initializePubsubTopic(options?.shardInfo); } /** diff --git a/packages/core/src/lib/waku.ts b/packages/core/src/lib/waku.ts index c452cf6db6..1fc3f35643 100644 --- a/packages/core/src/lib/waku.ts +++ b/packages/core/src/lib/waku.ts @@ -55,6 +55,7 @@ export class WakuNode implements Waku { constructor( options: WakuOptions, + pubsubTopics: PubsubTopic[] = [], libp2p: Libp2p, pubsubShardInfo?: ShardingParams, store?: (libp2p: Libp2p) => IStore, @@ -63,7 +64,8 @@ export class WakuNode implements Waku { relay?: (libp2p: Libp2p) => IRelay ) { if (!pubsubShardInfo) { - this.pubsubTopics = [DefaultPubsubTopic]; + this.pubsubTopics = + pubsubTopics.length > 0 ? pubsubTopics : [DefaultPubsubTopic]; } else { this.pubsubTopics = shardInfoToPubsubTopics(pubsubShardInfo); } diff --git a/packages/interfaces/src/enr.ts b/packages/interfaces/src/enr.ts index f930842ae9..2aa5e44487 100644 --- a/packages/interfaces/src/enr.ts +++ b/packages/interfaces/src/enr.ts @@ -21,7 +21,6 @@ export interface Waku2 { export interface ShardInfo { clusterId: number; shards: number[]; - contentTopics?: string[]; } export interface IEnr extends Map { diff --git a/packages/interfaces/src/filter.ts b/packages/interfaces/src/filter.ts index a5d6798858..b4838a695b 100644 --- a/packages/interfaces/src/filter.ts +++ b/packages/interfaces/src/filter.ts @@ -25,7 +25,7 @@ export interface IFilterSubscription { export type IFilter = IReceiver & IBaseProtocol & { createSubscription( - pubsubTopicShardInfo?: SingleShardInfo, + pubsubTopicShardInfo?: SingleShardInfo | string, peerId?: PeerId ): Promise; }; diff --git a/packages/interfaces/src/message.ts b/packages/interfaces/src/message.ts index 1a4dedeac9..1c8348239e 100644 --- a/packages/interfaces/src/message.ts +++ b/packages/interfaces/src/message.ts @@ -46,6 +46,10 @@ export interface IMetaSetter { } export interface EncoderOptions { + /** + * @deprecated + */ + pubsubTopic?: PubsubTopic; pubsubTopicShardInfo?: SingleShardInfo; /** The content topic to set on outgoing messages. */ contentTopic: string; diff --git a/packages/interfaces/src/protocols.ts b/packages/interfaces/src/protocols.ts index 33650580da..c7c223c6b4 100644 --- a/packages/interfaces/src/protocols.ts +++ b/packages/interfaces/src/protocols.ts @@ -5,6 +5,7 @@ import type { Peer, PeerStore } from "@libp2p/interface/peer-store"; import type { ShardInfo } from "./enr.js"; import type { CreateLibp2pOptions } from "./libp2p.js"; import type { IDecodedMessage } from "./message.js"; +import { PubsubTopic } from "./misc.js"; export enum Protocols { Relay = "relay", @@ -26,9 +27,20 @@ export type ContentTopicInfo = { contentTopics: string[]; }; -export type ShardingParams = ShardInfo | ContentTopicInfo; +export type ApplicationInfo = { + clusterId: number; + application: string; + version: string; +}; + +export type ShardingParams = ShardInfo | ContentTopicInfo | ApplicationInfo; export type ProtocolCreateOptions = { + /** + * @deprecated + * Waku will stop supporting named sharding. Only static sharding and autosharding will be supported moving forward. + */ + pubsubTopics?: PubsubTopic[]; /** * Waku supports usage of multiple pubsub topics. This is achieved through static sharding for now, and auto-sharding in the future. * The format to specify a shard is: diff --git a/packages/message-encryption/src/ecies.ts b/packages/message-encryption/src/ecies.ts index 0c7b27f753..9fa494dfbe 100644 --- a/packages/message-encryption/src/ecies.ts +++ b/packages/message-encryption/src/ecies.ts @@ -1,13 +1,14 @@ import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; -import type { - EncoderOptions as BaseEncoderOptions, - IDecoder, - IEncoder, - IMessage, - IMetaSetter, - IProtoMessage, - PubsubTopic, - SingleShardInfo +import { + type EncoderOptions as BaseEncoderOptions, + DefaultPubsubTopic, + type IDecoder, + type IEncoder, + type IMessage, + type IMetaSetter, + type IProtoMessage, + type PubsubTopic, + type SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { determinePubsubTopic, Logger } from "@waku/utils"; @@ -79,6 +80,10 @@ class Encoder implements IEncoder { } export interface EncoderOptions extends BaseEncoderOptions { + /** + * @deprecated + */ + pubsubTopic?: PubsubTopic; /** Indicates if autosharding is enabled */ autosharding?: boolean; /** The public key to encrypt the payload for. */ @@ -100,6 +105,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubsubTopic, pubsubTopicShardInfo, contentTopic, publicKey, @@ -107,14 +113,8 @@ export function createEncoder({ ephemeral = false, metaSetter }: EncoderOptions): Encoder { - if (typeof pubsubTopic === "string" && pubsubTopic !== DefaultPubsubTopic) { - throw new Error( - `Error: cannot use custom named pubsub topic: ${pubsubTopic}, must be ${DefaultPubsubTopic}` - ); - } - return new Encoder( - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), contentTopic, publicKey, sigPrivKey, @@ -202,7 +202,7 @@ class Decoder extends DecoderV0 implements IDecoder { export function createDecoder( contentTopic: string, privateKey: Uint8Array, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Decoder { return new Decoder( determinePubsubTopic(contentTopic, pubsubTopicShardInfo), diff --git a/packages/message-encryption/src/symmetric.ts b/packages/message-encryption/src/symmetric.ts index 8040a96800..581d838da2 100644 --- a/packages/message-encryption/src/symmetric.ts +++ b/packages/message-encryption/src/symmetric.ts @@ -1,13 +1,14 @@ import { Decoder as DecoderV0 } from "@waku/core/lib/message/version_0"; -import type { - EncoderOptions as BaseEncoderOptions, - IDecoder, - IEncoder, - IMessage, - IMetaSetter, - IProtoMessage, - PubsubTopic, - SingleShardInfo +import { + type EncoderOptions as BaseEncoderOptions, + DefaultPubsubTopic, + type IDecoder, + type IEncoder, + type IMessage, + type IMetaSetter, + type IProtoMessage, + type PubsubTopic, + type SingleShardInfo } from "@waku/interfaces"; import { WakuMessage } from "@waku/proto"; import { determinePubsubTopic, Logger } from "@waku/utils"; @@ -100,6 +101,7 @@ export interface EncoderOptions extends BaseEncoderOptions { * in [26/WAKU2-PAYLOAD](https://rfc.vac.dev/spec/26/). */ export function createEncoder({ + pubsubTopic = DefaultPubsubTopic, pubsubTopicShardInfo, contentTopic, symKey, @@ -108,7 +110,7 @@ export function createEncoder({ metaSetter }: EncoderOptions): Encoder { return new Encoder( - determinePubsubTopic(contentTopic, pubsubTopicShardInfo), + determinePubsubTopic(contentTopic, pubsubTopic ?? pubsubTopicShardInfo), contentTopic, symKey, sigPrivKey, @@ -196,7 +198,7 @@ class Decoder extends DecoderV0 implements IDecoder { export function createDecoder( contentTopic: string, symKey: Uint8Array, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): Decoder { return new Decoder( determinePubsubTopic(contentTopic, pubsubTopicShardInfo), diff --git a/packages/relay/src/index.ts b/packages/relay/src/index.ts index a920ebcb72..682f028e61 100644 --- a/packages/relay/src/index.ts +++ b/packages/relay/src/index.ts @@ -9,7 +9,6 @@ import { SignaturePolicy } from "@chainsafe/libp2p-gossipsub/types"; import type { PeerId } from "@libp2p/interface/peer-id"; import type { PubSub as Libp2pPubsub } from "@libp2p/interface/pubsub"; import { sha256 } from "@noble/hashes/sha256"; -import { determinePubsubTopics } from "@waku/core"; import { ActiveSubscriptions, Callback, @@ -73,9 +72,11 @@ class Relay implements IRelay { } this.gossipSub = libp2p.services.pubsub as GossipSub; - this.pubsubTopics = options?.shardInfo - ? new Set(shardInfoToPubsubTopics(options.shardInfo)) - : new Set([DefaultPubsubTopic]); + this.pubsubTopics = new Set( + options?.shardInfo + ? shardInfoToPubsubTopics(options.shardInfo) + : options?.pubsubTopics ?? [DefaultPubsubTopic] + ); if (this.gossipSub.isStarted()) { this.subscribeToAllTopics(); diff --git a/packages/sdk/src/create.ts b/packages/sdk/src/create.ts index cb96f5947c..30f5558e51 100644 --- a/packages/sdk/src/create.ts +++ b/packages/sdk/src/create.ts @@ -39,6 +39,60 @@ const DEFAULT_NODE_REQUIREMENTS = { export { Libp2pComponents }; +const ensureShardingConfigured = (shardInfo: ShardingParams): void => { + if ( + ("shards" in shardInfo && shardInfo.shards.length < 1) || + ("contentTopics" in shardInfo && shardInfo.contentTopics.length < 1) + ) { + throw new Error( + "Missing required configuration options for static sharding or autosharding." + ); + } +}; + +/** + * Create a Waku node configured to use autosharding or static sharding. + */ +export async function createNode( + options?: ProtocolCreateOptions & WakuOptions & Partial +): Promise { + options = options ?? {}; + + if (!options.shardInfo) { + throw new Error("Shard info must be set"); + } + + ensureShardingConfigured(options.shardInfo); + + const libp2pOptions = options?.libp2p ?? {}; + const peerDiscovery = libp2pOptions.peerDiscovery ?? []; + if (options?.defaultBootstrap) { + peerDiscovery.push(...defaultPeerDiscoveries()); + Object.assign(libp2pOptions, { peerDiscovery }); + } + + const libp2p = await defaultLibp2p( + undefined, + wakuGossipSub(options), + libp2pOptions, + options?.userAgent + ); + + const store = wakuStore(options); + const lightPush = wakuLightPush(options); + const filter = wakuFilter(options); + + return new WakuNode( + options ?? {}, + [], + libp2p, + options.shardInfo, + store, + lightPush, + filter + ) as LightNode; +} + /** * Create a Waku node that uses Waku Light Push, Filter and Store to send and * receive messages, enabling low resource consumption. @@ -49,6 +103,10 @@ export async function createLightNode( ): Promise { options = options ?? {}; + if (options.shardInfo) { + ensureShardingConfigured(options.shardInfo); + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -58,7 +116,7 @@ export async function createLightNode( const libp2p = await defaultLibp2p( options.shardInfo, - undefined, + wakuGossipSub(options), libp2pOptions, options?.userAgent ); @@ -69,6 +127,7 @@ export async function createLightNode( return new WakuNode( options ?? {}, + options.pubsubTopics, libp2p, options.shardInfo, store, @@ -86,6 +145,10 @@ export async function createRelayNode( ): Promise { options = options ?? {}; + if (options.shardInfo) { + ensureShardingConfigured(options.shardInfo); + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -104,6 +167,7 @@ export async function createRelayNode( return new WakuNode( options, + options.pubsubTopics, libp2p, options.shardInfo, undefined, @@ -131,6 +195,10 @@ export async function createFullNode( ): Promise { options = options ?? {}; + if (options.shardInfo) { + ensureShardingConfigured(options.shardInfo); + } + const libp2pOptions = options?.libp2p ?? {}; const peerDiscovery = libp2pOptions.peerDiscovery ?? []; if (options?.defaultBootstrap) { @@ -152,6 +220,7 @@ export async function createFullNode( return new WakuNode( options ?? {}, + options.pubsubTopics, libp2p, options.shardInfo, store, diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index 9875e12d49..c0178d7a0e 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -50,12 +50,20 @@ describe("Waku Filter V2: Multiple PubsubTopics", function () { pubsubTopicShardInfo: singleShardInfo1, contentTopic: customContentTopic1 }); - const customDecoder1 = createDecoder(customContentTopic1, singleShardInfo1); + const customDecoder1 = createDecoder( + customContentTopic1, + undefined, + singleShardInfo1 + ); const customEncoder2 = createEncoder({ pubsubTopicShardInfo: singleShardInfo2, contentTopic: customContentTopic2 }); - const customDecoder2 = createDecoder(customContentTopic2, singleShardInfo2); + const customDecoder2 = createDecoder( + customContentTopic2, + undefined, + singleShardInfo2 + ); this.beforeEach(async function () { this.timeout(15000); @@ -354,3 +362,154 @@ describe("Waku Filter V2 (Autosharding): Multiple PubsubTopics", function () { } }); }); + +describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { + // Set the timeout for all tests in this suite. Can be overwritten at test level + this.timeout(30000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + let subscription: IFilterSubscription; + let messageCollector: MessageCollector; + + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 1 + }); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 2 + }); + const customContentTopic1 = "/test/2/waku-filter"; + const customContentTopic2 = "/test/3/waku-filter"; + const customEncoder1 = createEncoder({ + pubsubTopic: customPubsubTopic1, + contentTopic: customContentTopic1 + }); + const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1); + const customEncoder2 = createEncoder({ + pubsubTopic: customPubsubTopic2, + contentTopic: customContentTopic2 + }); + const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2); + + this.beforeEach(async function () { + this.timeout(15000); + [nwaku, waku] = await runNodes(this, [ + customPubsubTopic1, + customPubsubTopic2 + ]); + subscription = await waku.filter.createSubscription(customPubsubTopic1); + messageCollector = new MessageCollector(); + }); + + this.afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Subscribe and receive messages on custom pubsubtopic", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + expect(await messageCollector.waitForMessages(1)).to.eq(true); + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, + expectedMessageText: "M1" + }); + }); + + it("Subscribe and receive messages on 2 different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Subscribe from the same lightnode to the 2nd pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(customPubsubTopic2) + ); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + + expect(await messageCollector.waitForMessages(1)).to.eq(true); + expect(await messageCollector2.waitForMessages(1)).to.eq(true); + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Subscribe and receive messages from 2 nwaku nodes each with different pubsubtopics", async function () { + await subscription.subscribe([customDecoder1], messageCollector.callback); + + // Set up and start a new nwaku node with customPubsubTopic1 + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + filter: true, + lightpush: true, + relay: true, + pubsubTopic: [customPubsubTopic2] + }); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + + // Subscribe from the same lightnode to the new nwaku on the new pubsubtopic + const subscription2 = await waku.filter.createSubscription( + pubsubTopicToSingleShardInfo(customPubsubTopic2), + await nwaku2.getPeerId() + ); + await nwaku2.ensureSubscriptions([customPubsubTopic2]); + + const messageCollector2 = new MessageCollector(); + + await subscription2.subscribe([customDecoder2], messageCollector2.callback); + + // Making sure that messages are send and reveiced for both subscriptions + // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 + while ( + !(await messageCollector.waitForMessages(1, { + pubsubTopic: customPubsubTopic1 + })) || + !(await messageCollector2.waitForMessages(1, { + pubsubTopic: customPubsubTopic2 + })) + ) { + await waku.lightPush.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku.lightPush.send(customEncoder2, { payload: utf8ToBytes("M2") }); + } + + messageCollector.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic1, + expectedPubsubTopic: customPubsubTopic1, + expectedMessageText: "M1" + }); + + messageCollector2.verifyReceivedMessage(0, { + expectedContentTopic: customContentTopic2, + expectedPubsubTopic: customPubsubTopic2, + expectedMessageText: "M2" + }); + }); + + it("Should fail to subscribe with decoder with wrong pubsubTopic", async function () { + // this subscription object is set up with the `customPubsubTopic` but we're passing it a Decoder with the `DefaultPubsubTopic` + try { + await subscription.subscribe([customDecoder2], messageCollector.callback); + } catch (error) { + expect((error as Error).message).to.include( + "Pubsub topic not configured" + ); + } + }); +}); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index 2541184101..c7a929cfa2 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -63,6 +63,7 @@ export async function runNodes( const waku_options = { staticNoiseKey: NOISE_KEY_1, libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, + pubsubTopics: shardInfo ? undefined : pubsubTopics, ...((pubsubTopics.length !== 1 || pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo diff --git a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts index 840da8e5a6..6916f7e197 100644 --- a/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/light-push/multiple_pubsub.node.spec.ts @@ -181,18 +181,21 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { let nwaku2: NimGoNode; let messageCollector: MessageCollector; + // When using lightpush, we have to use a cluster id of 1 because that is the default cluster id for autosharding + // With a different cluster id, we never find a viable peer + const clusterId = 1; const customContentTopic1 = "/waku/2/content/test.js"; const customContentTopic2 = "/myapp/1/latest/proto"; const autoshardingPubsubTopic1 = contentTopicToPubsubTopic( customContentTopic1, - 3 + clusterId ); const autoshardingPubsubTopic2 = contentTopicToPubsubTopic( customContentTopic2, - 3 + clusterId ); const shardInfo: ShardInfo = { - clusterId: 3, + clusterId, shards: [0, 1, 2], contentTopics: [customContentTopic1, customContentTopic2] }; @@ -201,14 +204,14 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { contentTopic: customContentTopic1 }, true, - 3 + clusterId ); const customEncoder2 = createEncoder( { contentTopic: customContentTopic2 }, true, - 3 + clusterId ); let nimPeerId: PeerId; @@ -234,11 +237,12 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); + expect(pushResponse.errors).to.be.empty; expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 }) ).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -260,14 +264,14 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { const messageCollector2 = new MessageCollector(nwaku); expect( - await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 + await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic2 + await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 }) ).to.eq(true); @@ -292,7 +296,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { relay: true, pubsubTopic: [autoshardingPubsubTopic2] }); - await nwaku2.ensureSubscriptions([autoshardingPubsubTopic2]); + await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -303,11 +307,11 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { // Making sure that we send messages to both nwaku nodes // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 while ( - !(await messageCollector.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic1 + !(await messageCollector.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic1 })) || - !(await messageCollector2.waitForMessages(1, { - pubsubTopic: autoshardingPubsubTopic2 + !(await messageCollector2.waitForMessagesAutosharding(1, { + contentTopic: customContentTopic2 })) || pushResponse1!.recipients[0].toString() === pushResponse2!.recipients[0].toString() @@ -333,7 +337,7 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { }); }); -describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { +describe("Waku Light Push (named sharding): Multiple PubsubTopics", function () { this.timeout(30000); let waku: LightNode; let nwaku: NimGoNode; @@ -391,12 +395,11 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { payload: utf8ToBytes(messageText) }); - expect(pushResponse.errors).to.be.empty; expect(pushResponse.recipients[0].toString()).to.eq(nimPeerId.toString()); expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + await messageCollector.waitForMessages(1, { + pubsubTopic: customPubsubTopic1 }) ).to.eq(true); messageCollector.verifyReceivedMessage(0, { @@ -418,26 +421,26 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { const messageCollector2 = new MessageCollector(nwaku); expect( - await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + await messageCollector.waitForMessages(1, { + pubsubTopic: customPubsubTopic1 }) ).to.eq(true); expect( - await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 + await messageCollector2.waitForMessages(1, { + pubsubTopic: customPubsubTopic2 }) ).to.eq(true); messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1 + expectedPubsubTopic: customPubsubTopic1 }); messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2 + expectedPubsubTopic: customPubsubTopic1 }); }); @@ -448,9 +451,9 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { filter: true, lightpush: true, relay: true, - pubsubTopic: [autoshardingPubsubTopic2] + pubsubTopic: [customPubsubTopic2] }); - await nwaku2.ensureSubscriptionsAutosharding([customContentTopic2]); + await nwaku2.ensureSubscriptions([customPubsubTopic2]); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.LightPush]); @@ -461,11 +464,11 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { // Making sure that we send messages to both nwaku nodes // While loop is done because of https://github.com/waku-org/js-waku/issues/1606 while ( - !(await messageCollector.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic1 + !(await messageCollector.waitForMessages(1, { + pubsubTopic: customPubsubTopic1 })) || - !(await messageCollector2.waitForMessagesAutosharding(1, { - contentTopic: customContentTopic2 + !(await messageCollector2.waitForMessages(1, { + pubsubTopic: customPubsubTopic2 })) || pushResponse1!.recipients[0].toString() === pushResponse2!.recipients[0].toString() @@ -481,12 +484,12 @@ describe("Waku Light Push (Autosharding): Multiple PubsubTopics", function () { messageCollector.verifyReceivedMessage(0, { expectedMessageText: "M1", expectedContentTopic: customContentTopic1, - expectedPubsubTopic: autoshardingPubsubTopic1 + expectedPubsubTopic: customPubsubTopic1 }); messageCollector2.verifyReceivedMessage(0, { expectedMessageText: "M2", expectedContentTopic: customContentTopic2, - expectedPubsubTopic: autoshardingPubsubTopic2 + expectedPubsubTopic: customPubsubTopic2 }); }); }); diff --git a/packages/tests/tests/light-push/utils.ts b/packages/tests/tests/light-push/utils.ts index 635d8e3989..435b7b50d2 100644 --- a/packages/tests/tests/light-push/utils.ts +++ b/packages/tests/tests/light-push/utils.ts @@ -35,6 +35,7 @@ export async function runNodes( pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo }), + pubsubTopics: shardInfo ? undefined : pubsubTopics, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts index f1cfaf886f..04f3fb0136 100644 --- a/packages/tests/tests/relay/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/relay/multiple_pubsub.node.spec.ts @@ -58,12 +58,20 @@ describe("Waku Relay, multiple pubsub topics", function () { pubsubTopicShardInfo: singleShardInfo1, contentTopic: customContentTopic1 }); - const customDecoder1 = createDecoder(customContentTopic1, singleShardInfo1); + const customDecoder1 = createDecoder( + customContentTopic1, + undefined, + singleShardInfo1 + ); const customEncoder2 = createEncoder({ pubsubTopicShardInfo: singleShardInfo2, contentTopic: customContentTopic2 }); - const customDecoder2 = createDecoder(customContentTopic2, singleShardInfo2); + const customDecoder2 = createDecoder( + customContentTopic2, + undefined, + singleShardInfo2 + ); const shardInfoBothShards: ShardInfo = { clusterId: 3, shards: [1, 2] }; afterEach(async function () { @@ -638,3 +646,276 @@ describe("Waku Relay (Autosharding), multiple pubsub topics", function () { expect(waku2ReceivedMsg.pubsubTopic).to.eq(autoshardingPubsubTopic1); }); }); + +describe("Waku Relay (named sharding), multiple pubsub topics", function () { + this.timeout(15000); + let waku1: RelayNode; + let waku2: RelayNode; + let waku3: RelayNode; + + const customPubsubTopic1 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 1 + }); + const customPubsubTopic2 = singleShardInfoToPubsubTopic({ + clusterId: 3, + shard: 2 + }); + const customContentTopic1 = "/test/2/waku-relay/utf8"; + const customContentTopic2 = "/test/3/waku-relay/utf8"; + const customEncoder1 = createEncoder({ + pubsubTopic: customPubsubTopic1, + contentTopic: customContentTopic1 + }); + const customDecoder1 = createDecoder(customContentTopic1, customPubsubTopic1); + const customEncoder2 = createEncoder({ + pubsubTopic: customPubsubTopic2, + contentTopic: customContentTopic2 + }); + const customDecoder2 = createDecoder(customContentTopic2, customPubsubTopic2); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([], [waku1, waku2, waku3]); + }); + + [ + { + pubsub: customPubsubTopic1, + encoder: customEncoder1, + decoder: customDecoder1 + }, + { + pubsub: customPubsubTopic2, + encoder: customEncoder2, + decoder: customDecoder2 + } + ].forEach((testItem) => { + it(`3 nodes on ${testItem.pubsub} topic`, async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [testItem.pubsub], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe([testItem.decoder], msgCollector1.callback); + await waku2.relay.subscribe([testItem.decoder], msgCollector2.callback); + await waku3.relay.subscribe([testItem.decoder], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + const relayResponse1 = await waku1.relay.send(testItem.encoder, { + payload: utf8ToBytes("M1") + }); + const relayResponse2 = await waku2.relay.send(testItem.encoder, { + payload: utf8ToBytes("M2") + }); + const relayResponse3 = await waku3.relay.send(testItem.encoder, { + payload: utf8ToBytes("M3") + }); + + expect(relayResponse1.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse3.recipients[0].toString()).to.eq( + waku2.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku1.libp2p.peerId.toString() + ); + expect(relayResponse2.recipients.map((r) => r.toString())).to.include( + waku3.libp2p.peerId.toString() + ); + + expect(await msgCollector1.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector2.waitForMessages(2, { exact: true })).to.eq( + true + ); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq( + true + ); + + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + expect( + msgCollector1.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector2.hasMessage(testItem.encoder.contentTopic, "M3") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M1") + ).to.eq(true); + expect( + msgCollector3.hasMessage(testItem.encoder.contentTopic, "M2") + ).to.eq(true); + }); + }); + + it("Nodes with multiple pubsub topic", async function () { + const [msgCollector1, msgCollector2, msgCollector3] = Array(3) + .fill(null) + .map(() => new MessageCollector()); + + // Waku1 and waku2 are using multiple pubsub topis + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [customPubsubTopic1, customPubsubTopic2], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [customPubsubTopic1, customPubsubTopic2], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [customPubsubTopic1], + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]), + waitForRemotePeer(waku3, [Protocols.Relay]) + ]); + + await waku1.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector1.callback + ); + await waku2.relay.subscribe( + [customDecoder1, customDecoder2], + msgCollector2.callback + ); + await waku3.relay.subscribe([customDecoder1], msgCollector3.callback); + + // The nodes are setup in such a way that all messages send should be relayed to the other nodes in the network + // However onlt waku1 and waku2 are receiving messages on the CustomPubSubTopic + await waku1.relay.send(customEncoder1, { payload: utf8ToBytes("M1") }); + await waku1.relay.send(customEncoder2, { payload: utf8ToBytes("M2") }); + await waku2.relay.send(customEncoder1, { payload: utf8ToBytes("M3") }); + await waku2.relay.send(customEncoder2, { payload: utf8ToBytes("M4") }); + await waku3.relay.send(customEncoder1, { payload: utf8ToBytes("M5") }); + await waku3.relay.send(customEncoder2, { payload: utf8ToBytes("M6") }); + + expect(await msgCollector1.waitForMessages(3, { exact: true })).to.eq(true); + expect(await msgCollector2.waitForMessages(3, { exact: true })).to.eq(true); + expect(await msgCollector3.waitForMessages(2, { exact: true })).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M3")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic2, "M4")).to.eq(true); + expect(msgCollector1.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic2, "M2")).to.eq(true); + expect(msgCollector2.hasMessage(customContentTopic1, "M5")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M1")).to.eq(true); + expect(msgCollector3.hasMessage(customContentTopic1, "M3")).to.eq(true); + }); + + it("n1 and n2 uses a custom pubsub, n3 uses the default pubsub", async function () { + [waku1, waku2, waku3] = await Promise.all([ + createRelayNode({ + pubsubTopics: [customPubsubTopic1], + staticNoiseKey: NOISE_KEY_1 + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + pubsubTopics: [customPubsubTopic1], + staticNoiseKey: NOISE_KEY_2, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } } + }).then((waku) => waku.start().then(() => waku)), + createRelayNode({ + staticNoiseKey: NOISE_KEY_3 + }).then((waku) => waku.start().then(() => waku)) + ]); + + await waku1.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await waku3.libp2p.peerStore.merge(waku2.libp2p.peerId, { + multiaddrs: waku2.libp2p.getMultiaddrs() + }); + await Promise.all([ + waku1.dial(waku2.libp2p.peerId), + waku3.dial(waku2.libp2p.peerId) + ]); + + await Promise.all([ + waitForRemotePeer(waku1, [Protocols.Relay]), + waitForRemotePeer(waku2, [Protocols.Relay]) + ]); + + const messageText = "Communicating using a custom pubsub topic"; + + const waku2ReceivedMsgPromise: Promise = new Promise( + (resolve) => { + void waku2.relay.subscribe([customDecoder1], resolve); + } + ); + + // The promise **fails** if we receive a message on the default + // pubsub topic. + const waku3NoMsgPromise: Promise = new Promise( + (resolve, reject) => { + void waku3.relay.subscribe([TestDecoder], reject); + setTimeout(resolve, 1000); + } + ); + + await waku1.relay.send(customEncoder1, { + payload: utf8ToBytes(messageText) + }); + + const waku2ReceivedMsg = await waku2ReceivedMsgPromise; + await waku3NoMsgPromise; + + expect(bytesToUtf8(waku2ReceivedMsg.payload!)).to.eq(messageText); + expect(waku2ReceivedMsg.pubsubTopic).to.eq(customPubsubTopic1); + }); +}); diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index d3ce09f278..74ad7219f1 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -27,7 +27,7 @@ import { totalMsgs } from "./utils.js"; -describe.skip("Waku Store, custom pubsub topic", function () { +describe("Waku Store, custom pubsub topic", function () { this.timeout(15000); let waku: LightNode; let nwaku: NimGoNode; @@ -321,3 +321,163 @@ describe("Waku Store (Autosharding), custom pubsub topic", function () { } }); }); + +describe("Waku Store (named sharding), custom pubsub topic", function () { + this.timeout(15000); + let waku: LightNode; + let nwaku: NimGoNode; + let nwaku2: NimGoNode; + + const customDecoder1 = createDecoder( + customContentTopic1, + customShardedPubsubTopic1 + ); + const customDecoder2 = createDecoder( + customContentTopic2, + customShardedPubsubTopic2 + ); + + beforeEach(async function () { + this.timeout(15000); + nwaku = new NimGoNode(makeLogFileName(this)); + await nwaku.start({ + store: true, + pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2], + relay: true + }); + await nwaku.ensureSubscriptions([ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); + }); + + afterEach(async function () { + this.timeout(15000); + await tearDownNodes([nwaku, nwaku2], waku); + }); + + it("Generator, custom pubsub topic", async function () { + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + waku = await startAndConnectLightNode(nwaku, [ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); + const messages = await processQueriedMessages( + waku, + [customDecoder1], + customShardedPubsubTopic1 + ); + + expect(messages?.length).eq(totalMsgs); + const result = messages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result).to.not.eq(-1); + }); + + it("Generator, 2 different pubsubtopics", async function () { + this.timeout(10000); + + const totalMsgs = 10; + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + await sendMessages( + nwaku, + totalMsgs, + customContentTopic2, + customShardedPubsubTopic2 + ); + + waku = await startAndConnectLightNode(nwaku, [ + customShardedPubsubTopic1, + customShardedPubsubTopic2 + ]); + + const customMessages = await processQueriedMessages( + waku, + [customDecoder1], + customShardedPubsubTopic1 + ); + expect(customMessages?.length).eq(totalMsgs); + const result1 = customMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result1).to.not.eq(-1); + + const testMessages = await processQueriedMessages( + waku, + [customDecoder2], + customShardedPubsubTopic2 + ); + expect(testMessages?.length).eq(totalMsgs); + const result2 = testMessages?.findIndex((msg) => { + return msg.payload![0]! === 0; + }); + expect(result2).to.not.eq(-1); + }); + + it("Generator, 2 nwaku nodes each with different pubsubtopics", async function () { + this.timeout(10000); + + // Set up and start a new nwaku node with Default Pubsubtopic + nwaku2 = new NimGoNode(makeLogFileName(this) + "2"); + await nwaku2.start({ + store: true, + pubsubTopic: [customShardedPubsubTopic2], + relay: true + }); + await nwaku2.ensureSubscriptions([customShardedPubsubTopic2]); + + const totalMsgs = 10; + await sendMessages( + nwaku, + totalMsgs, + customContentTopic1, + customShardedPubsubTopic1 + ); + await sendMessages( + nwaku2, + totalMsgs, + customContentTopic2, + customShardedPubsubTopic2 + ); + + waku = await createLightNode({ + staticNoiseKey: NOISE_KEY_1, + pubsubTopics: [customShardedPubsubTopic1, customShardedPubsubTopic2] + }); + await waku.start(); + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waku.dial(await nwaku2.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Store]); + + let customMessages: IMessage[] = []; + let testMessages: IMessage[] = []; + + while ( + customMessages.length != totalMsgs || + testMessages.length != totalMsgs + ) { + customMessages = await processQueriedMessages( + waku, + [customDecoder1], + customShardedPubsubTopic1 + ); + testMessages = await processQueriedMessages( + waku, + [customDecoder2], + customShardedPubsubTopic2 + ); + } + }); +}); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 76ce57f09f..5e70c115dc 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -34,11 +34,11 @@ export const customShardedPubsubTopic2 = singleShardInfoToPubsubTopic({ export const shardInfo1: ShardInfo = { clusterId: 3, shards: [1] }; export const customContentTopic1 = "/test/2/waku-store/utf8"; export const customContentTopic2 = "/test/3/waku-store/utf8"; -export const customDecoder1 = createDecoder(customContentTopic1, { +export const customDecoder1 = createDecoder(customContentTopic1, undefined, { clusterId: 3, shard: 1 }); -export const customDecoder2 = createDecoder(customContentTopic2, { +export const customDecoder2 = createDecoder(customContentTopic2, undefined, { clusterId: 3, shard: 2 }); @@ -111,6 +111,7 @@ export async function startAndConnectLightNode( pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo }), + pubsubTopics: shardInfo ? undefined : pubsubTopics, staticNoiseKey: NOISE_KEY_1 }); await waku.start(); diff --git a/packages/utils/src/common/sharding.ts b/packages/utils/src/common/sharding.ts index f4a8afb71b..f6f3cd7b34 100644 --- a/packages/utils/src/common/sharding.ts +++ b/packages/utils/src/common/sharding.ts @@ -23,14 +23,31 @@ export const shardInfoToPubsubTopics = ( if (shardInfo.clusterId === undefined) throw new Error("Cluster ID must be specified"); if ("contentTopics" in shardInfo) { - return shardInfo.contentTopics.map((contentTopic) => - contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId) + // Autosharding: explicitly defined content topics + return Array.from( + new Set( + shardInfo.contentTopics.map((contentTopic) => + contentTopicToPubsubTopic(contentTopic, shardInfo.clusterId) + ) + ) ); - } else { + } else if ("shards" in shardInfo) { + // Static sharding if (shardInfo.shards === undefined) throw new Error("Invalid shard"); - return shardInfo.shards.map( - (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` + return Array.from( + new Set( + shardInfo.shards.map( + (index) => `/waku/2/rs/${shardInfo.clusterId}/${index}` + ) + ) ); + } else { + // Autosharding: single shard from application and version + return [ + contentTopicToPubsubTopic( + `/${shardInfo.application}/${shardInfo.version}/default/default` + ) + ]; } }; @@ -183,11 +200,18 @@ export function contentTopicsByPubsubTopic( */ export function determinePubsubTopic( contentTopic: string, - pubsubTopicShardInfo?: SingleShardInfo + pubsubTopicShardInfo: SingleShardInfo | PubsubTopic = DefaultPubsubTopic ): string { - return pubsubTopicShardInfo - ? pubsubTopicShardInfo.shard - ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) - : contentTopicToPubsubTopic(contentTopic, pubsubTopicShardInfo.clusterId) - : DefaultPubsubTopic; + if (typeof pubsubTopicShardInfo == "string") { + return pubsubTopicShardInfo; + } else { + return pubsubTopicShardInfo + ? pubsubTopicShardInfo.shard + ? singleShardInfoToPubsubTopic(pubsubTopicShardInfo) + : contentTopicToPubsubTopic( + contentTopic, + pubsubTopicShardInfo.clusterId + ) + : DefaultPubsubTopic; + } }