Skip to content

Commit

Permalink
feat: create node and subscription by content topic
Browse files Browse the repository at this point in the history
  • Loading branch information
adklempner committed Feb 12, 2024
1 parent 51ec7d9 commit a5c278c
Show file tree
Hide file tree
Showing 3 changed files with 236 additions and 0 deletions.
109 changes: 109 additions & 0 deletions packages/sdk/src/content_topic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,109 @@
import type { Multiaddr } from "@multiformats/multiaddr";
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import {
Callback,
IDecoder,
IFilterSubscription,
IMetadata,
LightNode,
Protocols
} from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
shardInfoToPubsubTopics
} from "@waku/utils";

import { createLightNode } from ".";

// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and
// subscription for that content topic.
async function prepareSubscription(
waku: LightNode,
contentTopic: string,
peer?: Multiaddr
): Promise<{
decoder: IDecoder<DecodedMessage>;
subscription: IFilterSubscription;
}> {
// Validate that the Waku node matches assumptions
if (!waku.filter) {
throw new Error("Filter protocol missing from Waku node");
}
const shardInfo = (waku.libp2p.components.metadata as IMetadata).shardInfo;
if (!shardInfo) {
throw new Error("Shard info missing from Waku node.");
}

// Validate content topic and ensure node is configured for its corresponding pubsub topic
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
if (!pubsubTopics.includes(pubsubTopic))
throw new Error(
"Content topic does not match any pubsub topic in shard info."
);

// Ensures there is a peer. Without this condition, the subscription will fail to create.
if (peer) {
await waku.dial(peer);
await waitForRemotePeer(waku, [Protocols.Filter]);
}

// Create decoder and subscription
const decoder = createDecoder(contentTopic, pubsubTopic);
const subscription = await waku.filter.createSubscription(pubsubTopic);

return { decoder, subscription };
}

interface CreateTopicOptions {
waku?: LightNode;
peer?: Multiaddr;
}

export async function streamContentTopic(
contentTopic: string,
opts: CreateTopicOptions = {}
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);

// Create a ReadableStream that receives any messages for the content topic
const messageStream = new ReadableStream<DecodedMessage>({
async start(controller) {
await subscription.subscribe(decoder, (message) => {
controller.enqueue(message);
});
},
cancel() {
return subscription.unsubscribe([contentTopic]);
}
});
return [messageStream, opts.waku];
}

export async function subscribeToContentTopic(
contentTopic: string,
callback: Callback<DecodedMessage>,
opts: CreateTopicOptions = {}
): Promise<[IFilterSubscription, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
await subscription.subscribe(decoder, callback);
return [subscription, opts.waku];
}
1 change: 1 addition & 0 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ export {

export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";

export * from "./content_topic.js";
export * from "./create.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";
Expand Down
126 changes: 126 additions & 0 deletions packages/tests/tests/sdk/content_topic.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,126 @@
import {
bytesToUtf8,
createEncoder,
createLightNode,
DEFAULT_CLUSTER_ID,
LightNode,
Protocols,
streamContentTopic,
subscribeToContentTopic,
utf8ToBytes,
waitForRemotePeer,
WakuNode
} from "@waku/sdk";
import {
contentTopicToPubsubTopic,
pubsubTopicToSingleShardInfo
} from "@waku/utils";
import { expect } from "chai";

import { makeLogFileName, ServiceNode, tearDownNodes } from "../../src";

describe("SDK: Creating by Content Topic", function () {
const ContentTopic = "/myapp/1/latest/proto";
const testMessage = "Test123";
let nwaku: ServiceNode;
let waku: LightNode;
let waku2: LightNode;

beforeEach(async function () {
nwaku = new ServiceNode(makeLogFileName(this) + "1");
await nwaku.start({
pubsubTopic: [contentTopicToPubsubTopic(ContentTopic)],
lightpush: true,
relay: true,
filter: true,
discv5Discovery: true,
peerExchange: true,
clusterId: DEFAULT_CLUSTER_ID
});
});

afterEach(async function () {
await tearDownNodes(nwaku, [waku, waku2]);
});

it("given a content topic, creates a waku node and filter subscription", async function () {
const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic);

[, waku] = await subscribeToContentTopic(ContentTopic, () => {}, {
peer: await nwaku.getMultiaddrWithId()
});

expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic);
});

it("given a waku node and content topic, creates a filter subscription", async function () {
const expectedPubsubTopic = contentTopicToPubsubTopic(ContentTopic);

waku = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await subscribeToContentTopic(ContentTopic, () => {}, {
waku,
peer: await nwaku.getMultiaddrWithId()
});

expect((waku as WakuNode).pubsubTopics).to.include(expectedPubsubTopic);
});

it("receives messages sent to provided content topic through callback", async function () {
const messages: string[] = [];
[, waku] = await subscribeToContentTopic(
ContentTopic,
(msg) => {
messages.push(bytesToUtf8(msg.payload));
},
{
peer: await nwaku.getMultiaddrWithId()
}
);

waku2 = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await waku2.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku2, [Protocols.LightPush]);
const encoder = createEncoder({
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(
contentTopicToPubsubTopic(ContentTopic)
),
contentTopic: ContentTopic
});
await waku2.lightPush?.send(encoder, {
payload: utf8ToBytes(testMessage)
});

expect(messages[0]).to.be.eq(testMessage);
});

it("receives messages sent to provided content topic through stream", async function () {
let stream;
[stream, waku] = await streamContentTopic(ContentTopic, {
peer: await nwaku.getMultiaddrWithId()
});

waku2 = await createLightNode({
shardInfo: { contentTopics: [ContentTopic] }
});
await waku2.dial(await nwaku.getMultiaddrWithId());
await waitForRemotePeer(waku2, [Protocols.LightPush]);

const encoder = createEncoder({
pubsubTopicShardInfo: pubsubTopicToSingleShardInfo(
contentTopicToPubsubTopic(ContentTopic)
),
contentTopic: ContentTopic
});
await waku2.lightPush?.send(encoder, {
payload: utf8ToBytes(testMessage)
});

const reader = stream.getReader();
const { value: message } = await reader.read();
expect(bytesToUtf8(message!.payload)).to.be.eq(testMessage);
});
});

0 comments on commit a5c278c

Please sign in to comment.