diff --git a/packages/core/src/lib/filter/index.ts b/packages/core/src/lib/filter/index.ts index 5428d84bdf..d571131956 100644 --- a/packages/core/src/lib/filter/index.ts +++ b/packages/core/src/lib/filter/index.ts @@ -309,6 +309,10 @@ class Filter extends BaseProtocol implements IReceiver { }) )[0]; + if (!peer) { + throw new Error("No peer found to initiate subscription."); + } + const subscription = this.getActiveSubscription(pubsubTopic, peer.id.toString()) ?? this.setActiveSubscription( diff --git a/packages/tests/src/run-tests.js b/packages/tests/src/run-tests.js index 25d28181ef..1439312010 100644 --- a/packages/tests/src/run-tests.js +++ b/packages/tests/src/run-tests.js @@ -3,7 +3,7 @@ import { promisify } from "util"; const execAsync = promisify(exec); -const WAKUNODE_IMAGE = process.env.WAKUNODE_IMAGE || "wakuorg/nwaku:v0.22.0"; +const WAKUNODE_IMAGE = "wakuorg/go-waku:v0.9.0"; async function main() { try { diff --git a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts index ba43471fbe..f56fbe63ed 100644 --- a/packages/tests/tests/filter/multiple_pubsub.node.spec.ts +++ b/packages/tests/tests/filter/multiple_pubsub.node.spec.ts @@ -387,10 +387,14 @@ describe("Waku Filter V2 (Named sharding): Multiple PubsubTopics", function () { this.beforeEach(async function () { this.timeout(15000); - [nwaku, waku] = await runNodes(this, [ - customPubsubTopic1, - customPubsubTopic2 - ]); + [nwaku, waku] = await runNodes( + this, + [customPubsubTopic1, customPubsubTopic2], + { + clusterId: 3, + shards: [1, 2] + } + ); subscription = await waku.filter.createSubscription(customPubsubTopic1); messageCollector = new MessageCollector(); }); diff --git a/packages/tests/tests/filter/utils.ts b/packages/tests/tests/filter/utils.ts index c7a929cfa2..3d201fae67 100644 --- a/packages/tests/tests/filter/utils.ts +++ b/packages/tests/tests/filter/utils.ts @@ -79,12 +79,22 @@ export async function runNodes( log.error("jswaku node failed to start:", error); } - if (waku) { - await waku.dial(await nwaku.getMultiaddrWithId()); - await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); - await nwaku.ensureSubscriptions(pubsubTopics); - return [nwaku, waku]; - } else { + if (!waku) { throw new Error("Failed to initialize waku"); } + + await waku.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku, [Protocols.Filter, Protocols.LightPush]); + await nwaku.ensureSubscriptions(pubsubTopics); + + const wakuPeers = await waku.libp2p.peerStore.all(); + const nwakuPeers = await nwaku.peers(); + + if (wakuPeers.length < 1 || nwakuPeers.length < 1) { + throw new Error( + `Expected at least 1 peer in each node. Got waku: ${wakuPeers.length} and nwaku: ${nwakuPeers.length}` + ); + } + + return [nwaku, waku]; } diff --git a/packages/tests/tests/peer_exchange.node.spec.ts b/packages/tests/tests/peer_exchange.node.spec.ts index 996e1e1adc..fd9dc777c6 100644 --- a/packages/tests/tests/peer_exchange.node.spec.ts +++ b/packages/tests/tests/peer_exchange.node.spec.ts @@ -16,6 +16,7 @@ import { makeLogFileName } from "../src/log_file.js"; import { NimGoNode } from "../src/node/node.js"; describe("Peer Exchange", () => { + // TODO: fails for a long time describe("Locally Run Nodes", () => { let waku: LightNode; let nwaku1: NimGoNode; diff --git a/packages/tests/tests/store/multiple_pubsub.spec.ts b/packages/tests/tests/store/multiple_pubsub.spec.ts index 74ad7219f1..62d1d98143 100644 --- a/packages/tests/tests/store/multiple_pubsub.spec.ts +++ b/packages/tests/tests/store/multiple_pubsub.spec.ts @@ -342,13 +342,22 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { nwaku = new NimGoNode(makeLogFileName(this)); await nwaku.start({ store: true, - pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2], - relay: true + relay: true, + pubsubTopic: [customShardedPubsubTopic1, customShardedPubsubTopic2] }); await nwaku.ensureSubscriptions([ customShardedPubsubTopic1, customShardedPubsubTopic2 ]); + + waku = await startAndConnectLightNode( + nwaku, + [customShardedPubsubTopic1, customShardedPubsubTopic2], + { + clusterId: 3, + shards: [1, 2] + } + ); }); afterEach(async function () { @@ -363,10 +372,7 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { customContentTopic1, customShardedPubsubTopic1 ); - waku = await startAndConnectLightNode(nwaku, [ - customShardedPubsubTopic1, - customShardedPubsubTopic2 - ]); + const messages = await processQueriedMessages( waku, [customDecoder1], @@ -397,11 +403,6 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { customShardedPubsubTopic2 ); - waku = await startAndConnectLightNode(nwaku, [ - customShardedPubsubTopic1, - customShardedPubsubTopic2 - ]); - const customMessages = await processQueriedMessages( waku, [customDecoder1], @@ -451,13 +452,6 @@ describe("Waku Store (named sharding), custom pubsub topic", function () { customShardedPubsubTopic2 ); - waku = await createLightNode({ - staticNoiseKey: NOISE_KEY_1, - pubsubTopics: [customShardedPubsubTopic1, customShardedPubsubTopic2] - }); - await waku.start(); - - await waku.dial(await nwaku.getMultiaddrWithId()); await waku.dial(await nwaku2.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); diff --git a/packages/tests/tests/store/utils.ts b/packages/tests/tests/store/utils.ts index 252df84cb0..d6a1360de0 100644 --- a/packages/tests/tests/store/utils.ts +++ b/packages/tests/tests/store/utils.ts @@ -107,16 +107,27 @@ export async function startAndConnectLightNode( shardInfo?: ShardingParams ): Promise { const waku = await createLightNode({ + pubsubTopics: shardInfo ? undefined : pubsubTopics, + staticNoiseKey: NOISE_KEY_1, + libp2p: { addresses: { listen: ["/ip4/0.0.0.0/tcp/0/ws"] } }, ...((pubsubTopics.length !== 1 || pubsubTopics[0] !== DefaultPubsubTopic) && { shardInfo: shardInfo - }), - pubsubTopics: shardInfo ? undefined : pubsubTopics, - staticNoiseKey: NOISE_KEY_1 + }) }); await waku.start(); await waku.dial(await instance.getMultiaddrWithId()); await waitForRemotePeer(waku, [Protocols.Store]); + + const wakuPeers = await waku.libp2p.peerStore.all(); + const nwakuPeers = await instance.peers(); + + if (wakuPeers.length < 1 || nwakuPeers.length < 1) { + throw new Error( + `Expected at least 1 peer in each node. Got waku: ${wakuPeers.length} and nwaku: ${nwakuPeers.length}` + ); + } + log.info("Waku node created"); return waku; }