Skip to content

Commit

Permalink
Merge pull request #1823 from waku-org/feat/create-subscription-conte…
Browse files Browse the repository at this point in the history
…nt-topic

feat: subscribe to content topic via SDK
  • Loading branch information
adklempner authored Feb 28, 2024
2 parents 382af33 + ee2d417 commit 78ee39a
Show file tree
Hide file tree
Showing 11 changed files with 329 additions and 32 deletions.
4 changes: 2 additions & 2 deletions .size-limit.cjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module.exports = [
{
name: "Waku core",
path: "packages/core/bundle/index.js",
name: "Waku node",
path: "packages/sdk/bundle/index.js",
import: "{ WakuNode }",
},
{
Expand Down
4 changes: 0 additions & 4 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export { DefaultUserAgent } from "./lib/waku.js";
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
export type {
Encoder,
Expand All @@ -7,9 +6,6 @@ export type {
} from "./lib/message/version_0.js";
export * as message from "./lib/message/index.js";

export * as waku from "./lib/waku.js";
export { WakuNode, WakuOptions } from "./lib/waku.js";

export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const log = new Logger("wait-for-remote-peer");
/**
* Wait for a remote peer to be ready given the passed protocols.
* Must be used after attempting to connect to nodes, using
* {@link @waku/core!WakuNode.dial} or a bootstrap method with
* {@link @waku/sdk!WakuNode.dial} or a bootstrap method with
* {@link @waku/sdk!createLightNode}.
*
* If the passed protocols is a GossipSub protocol, then it resolves only once
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ export type ProtocolCreateOptions = {
*/
shardInfo?: Partial<ShardingParams>;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
* apart that we made the `modules` property optional and partial,
* allowing its omission and letting Waku set good defaults.
* Notes that some values are overridden by {@link @waku/core!WakuNode} to ensure it implements the Waku protocol.
* Notes that some values are overridden by {@link @waku/sdk!WakuNode} to ensure it implements the Waku protocol.
*/
libp2p?: Partial<CreateLibp2pOptions>;
/**
Expand Down
121 changes: 121 additions & 0 deletions packages/sdk/src/content_topic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import type { Multiaddr } from "@multiformats/multiaddr";
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import {
Callback,
IDecoder,
IFilterSubscription,
LightNode,
Protocols
} from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
shardInfoToPubsubTopics
} from "@waku/utils";

import { createLightNode } from "./create.js";

interface CreateTopicOptions {
waku?: LightNode;
peer: Multiaddr;
}

// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and
// subscription for that content topic.
async function prepareSubscription(
waku: LightNode,
contentTopic: string,
peer: Multiaddr
): Promise<{
decoder: IDecoder<DecodedMessage>;
subscription: IFilterSubscription;
}> {
// Validate that the Waku node matches assumptions
if (!waku.filter) {
throw new Error("Filter protocol missing from Waku node");
}
const { shardInfo } = waku.libp2p.components.metadata;
if (!shardInfo) {
throw new Error("Shard info missing from Waku node.");
}

// Validate content topic and ensure node is configured for its corresponding pubsub topic
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
if (!pubsubTopics.includes(pubsubTopic))
throw new Error(
"Content topic does not match any pubsub topic in shard info."
);

await waku.dial(peer);
await waitForRemotePeer(waku, [Protocols.Filter]);

// Create decoder and subscription
let decoder = createDecoder(contentTopic, pubsubTopic);
if (decoder) decoder = decoder ?? decoder;
const subscription = await waku.filter.createSubscription(pubsubTopic);

return { decoder, subscription };
}

/**
* Creates a subscription and streams all new messages for a content topic.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param opts
*/
export async function streamContentTopic(
contentTopic: string,
opts: CreateTopicOptions
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);

// Create a ReadableStream that receives any messages for the content topic
const messageStream = new ReadableStream<DecodedMessage>({
async start(controller) {
await subscription.subscribe(decoder, (message) => {
controller.enqueue(message);
});
},
cancel() {
return subscription.unsubscribe([contentTopic]);
}
});
return [messageStream, opts.waku];
}

/**
* Subscribes to new messages for a content topic via callback function.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param callback Called every time a new message is received on the content topic
* @param opts
*/
export async function subscribeToContentTopic(
contentTopic: string,
callback: Callback<DecodedMessage>,
opts: CreateTopicOptions
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
await subscription.subscribe(decoder, callback);
return { subscription, waku: opts.waku };
}
12 changes: 3 additions & 9 deletions packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,7 @@ import { mplex } from "@libp2p/mplex";
import { ping } from "@libp2p/ping";
import { webSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import {
DefaultUserAgent,
wakuFilter,
wakuLightPush,
wakuMetadata,
WakuNode,
WakuOptions,
wakuStore
} from "@waku/core";
import { wakuFilter, wakuLightPush, wakuMetadata, wakuStore } from "@waku/core";
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import {
type CreateLibp2pOptions,
Expand All @@ -34,6 +26,8 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils";
import { createLibp2p } from "libp2p";

import { DefaultUserAgent, WakuNode, WakuOptions } from "./waku.js";

const DEFAULT_NODE_REQUIREMENTS = {
lightPush: 1,
filter: 1,
Expand Down
9 changes: 3 additions & 6 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
export {
waitForRemotePeer,
createEncoder,
createDecoder,
WakuNode
} from "@waku/core";
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
export {
DecodedMessage,
Decoder,
Expand All @@ -12,6 +7,8 @@ export {

export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";

export * from "./content_topic.js";
export * from "./waku.js";
export * from "./create.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/relay/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { WakuNode, WakuOptions } from "@waku/core";
import {
DefaultPubsubTopic,
type ProtocolCreateOptions,
Expand All @@ -8,6 +7,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils";

import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js";
import { WakuNode, WakuOptions } from "../waku.js";

/**
* Create a Waku node that uses Waku Relay to send and receive messages,
Expand Down
19 changes: 18 additions & 1 deletion packages/core/src/lib/waku.ts → packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, DecodedMessage } from "@waku/core";
import type {
Callback,
IFilter,
IFilterSubscription,
ILightPush,
IRelay,
IStore,
Libp2p,
LightNode,
PubsubTopic,
Waku
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { ConnectionManager } from "./connection_manager.js";
import { subscribeToContentTopic } from "./content_topic.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -180,6 +184,19 @@ export class WakuNode implements Waku {
await this.libp2p.stop();
}

async subscribeToContentTopic(
contentTopic: string,
peer: Multiaddr,
callback: Callback<DecodedMessage>
): Promise<IFilterSubscription> {
return (
await subscribeToContentTopic(contentTopic, callback, {
waku: this as LightNode,
peer
})
).subscription;
}

isStarted(): boolean {
return this.libp2p.status == "started";
}
Expand Down
Loading

0 comments on commit 78ee39a

Please sign in to comment.