From acb43ca33fc71f15f3e21384321e0d13964f8539 Mon Sep 17 00:00:00 2001 From: Arseniy Klempner Date: Thu, 8 Feb 2024 12:09:10 -0800 Subject: [PATCH] feat: create node and subscription by content topic --- packages/sdk/src/content_topic.ts | 109 +++++++++++++++ packages/sdk/src/index.ts | 1 + .../tests/tests/sdk/content_topic.spec.ts | 126 ++++++++++++++++++ 3 files changed, 236 insertions(+) create mode 100644 packages/sdk/src/content_topic.ts create mode 100644 packages/tests/tests/sdk/content_topic.spec.ts diff --git a/packages/sdk/src/content_topic.ts b/packages/sdk/src/content_topic.ts new file mode 100644 index 0000000000..42f195e705 --- /dev/null +++ b/packages/sdk/src/content_topic.ts @@ -0,0 +1,109 @@ +import type { Multiaddr } from "@multiformats/multiaddr"; +import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; +import { + Callback, + IDecoder, + IFilterSubscription, + IMetadata, + LightNode, + Protocols +} from "@waku/interfaces"; +import { + contentTopicToPubsubTopic, + shardInfoToPubsubTopics +} from "@waku/utils"; + +import { createLightNode } from "."; + +// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and +// subscription for that content topic. +async function prepareSubscription( + waku: LightNode, + contentTopic: string, + peer?: Multiaddr +): Promise<{ + decoder: IDecoder; + subscription: IFilterSubscription; +}> { + // Validate that the Waku node matches assumptions + if (!waku.filter) { + throw new Error("Filter protocol missing from Waku node"); + } + const shardInfo = (waku.libp2p.components.metadata as IMetadata).shardInfo; + if (!shardInfo) { + throw new Error("Shard info missing from Waku node."); + } + + // Validate content topic and ensure node is configured for its corresponding pubsub topic + const pubsubTopics = shardInfoToPubsubTopics(shardInfo); + const pubsubTopic = contentTopicToPubsubTopic(contentTopic); + if (!pubsubTopics.includes(pubsubTopic)) + throw new Error( + "Content topic does not match any pubsub topic in shard info." + ); + + // Ensures there is a peer. Without this condition, the subscription will fail to create. + if (peer) { + await waku.dial(peer); + await waitForRemotePeer(waku, [Protocols.Filter]); + } + + // Create decoder and subscription + const decoder = createDecoder(contentTopic, pubsubTopic); + const subscription = await waku.filter.createSubscription(pubsubTopic); + + return { decoder, subscription }; +} + +interface CreateTopicOptions { + waku?: LightNode; + peer?: Multiaddr; +} + +export async function streamContentTopic( + contentTopic: string, + opts: CreateTopicOptions = {} +): Promise<[ReadableStream, LightNode]> { + opts.waku = + opts.waku ?? + (await createLightNode({ + shardInfo: { contentTopics: [contentTopic] } + })); + const { decoder, subscription } = await prepareSubscription( + opts.waku, + contentTopic, + opts.peer + ); + + // Create a ReadableStream that receives any messages for the content topic + const messageStream = new ReadableStream({ + async start(controller) { + await subscription.subscribe(decoder, (message) => { + controller.enqueue(message); + }); + }, + cancel() { + return subscription.unsubscribe([contentTopic]); + } + }); + return [messageStream, opts.waku]; +} + +export async function subscribeToContentTopic( + contentTopic: string, + callback: Callback, + opts: CreateTopicOptions = {} +): Promise<[IFilterSubscription, LightNode]> { + opts.waku = + opts.waku ?? + (await createLightNode({ + shardInfo: { contentTopics: [contentTopic] } + })); + const { decoder, subscription } = await prepareSubscription( + opts.waku, + contentTopic, + opts.peer + ); + await subscription.subscribe(decoder, callback); + return [subscription, opts.waku]; +} diff --git a/packages/sdk/src/index.ts b/packages/sdk/src/index.ts index fbe6f66ccf..1ae2ca552a 100644 --- a/packages/sdk/src/index.ts +++ b/packages/sdk/src/index.ts @@ -12,6 +12,7 @@ export { export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; +export * from "./content_topic.js"; export * from "./create.js"; export * as waku from "@waku/core"; export * as utils from "@waku/utils"; diff --git a/packages/tests/tests/sdk/content_topic.spec.ts b/packages/tests/tests/sdk/content_topic.spec.ts new file mode 100644 index 0000000000..712ac012d3 --- /dev/null +++ b/packages/tests/tests/sdk/content_topic.spec.ts @@ -0,0 +1,126 @@ +import { + bytesToUtf8, + createEncoder, + createLightNode, + DEFAULT_CLUSTER_ID, + LightNode, + Protocols, + streamContentTopic, + subscribeToContentTopic, + utf8ToBytes, + waitForRemotePeer, + WakuNode +} from "@waku/sdk"; +import { + contentTopicToPubsubTopic, + pubsubTopicToSingleShardInfo +} from "@waku/utils"; +import { expect } from "chai"; + +import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src"; + +describe("SDK: Creating by Content Topic", function () { + const ContentTopic = "/myapp/1/latest/proto"; + const testMessage = "Test123"; + let nwaku: ServiceNode; + let waku: LightNode; + let waku2: LightNode; + + beforeEach(async function () { + nwaku = new ServiceNode(makeLogFileName(this) + "1"); + await nwaku.start({ + pubsubTopic: [contentTopicToPubsubTopic(ContentTopic)], + lightpush: true, + relay: true, + filter: true, + discv5Discovery: true, + peerExchange: true, + clusterId: DEFAULT_CLUSTER_ID + }); + }); + + afterEach(async function () { + await tearDownNodes(nwaku, [waku, waku2]); + }); + + it("given a content topic, creates a waku node and filter subscription", async function () { + const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + + [, waku] = await subscribeToContentTopic(ContentTopic, () => {}, { + peer: await nwaku.getMultiaddrWithId() + }); + + expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic); + }); + + it("given a waku node and content topic, creates a filter subscription", async function () { + const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic); + + waku = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await subscribeToContentTopic(ContentTopic, () => {}, { + waku, + peer: await nwaku.getMultiaddrWithId() + }); + + expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic); + }); + + it("receives messages sent to provided content topic through callback", async function () { + const messages: string[] = []; + [, waku] = await subscribeToContentTopic( + ContentTopic, + (msg) => { + messages.push(bytesToUtf8(msg.payload)); + }, + { + peer: await nwaku.getMultiaddrWithId() + } + ); + + waku2 = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await waku2.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku2, [Protocols.LightPush]); + const encoder = createEncoder({ + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( + contentTopicToPubsubTopic(ContentTopic) + ), + contentTopic: ContentTopic + }); + await waku2.lightPush?.send(encoder, { + payload: utf8ToBytes(testMessage) + }); + + expect(messages[0]).to.be.eq(testMessage); + }); + + it("receives messages sent to provided content topic through stream", async function () { + let stream; + [stream, waku] = await streamContentTopic(ContentTopic, { + peer: await nwaku.getMultiaddrWithId() + }); + + waku2 = await createLightNode({ + shardInfo: { contentTopics: [ContentTopic] } + }); + await waku2.dial(await nwaku.getMultiaddrWithId()); + await waitForRemotePeer(waku2, [Protocols.LightPush]); + + const encoder = createEncoder({ + pubsubTopicShardInfo: pubsubTopicToSingleShardInfo( + contentTopicToPubsubTopic(ContentTopic) + ), + contentTopic: ContentTopic + }); + await waku2.lightPush?.send(encoder, { + payload: utf8ToBytes(testMessage) + }); + + const reader = stream.getReader(); + const { value: message } = await reader.read(); + expect(bytesToUtf8(message!.payload)).to.be.eq(testMessage); + }); +});