Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: subscribe to content topic via SDK #1823

Merged
merged 1 commit into from
Feb 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .size-limit.cjs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
module.exports = [
{
name: "Waku core",
path: "packages/core/bundle/index.js",
name: "Waku node",
path: "packages/sdk/bundle/index.js",
import: "{ WakuNode }",
},
{
Expand Down
4 changes: 0 additions & 4 deletions packages/core/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
export { DefaultUserAgent } from "./lib/waku.js";
export { createEncoder, createDecoder } from "./lib/message/version_0.js";
export type {
Encoder,
Expand All @@ -7,9 +6,6 @@ export type {
} from "./lib/message/version_0.js";
export * as message from "./lib/message/index.js";

export * as waku from "./lib/waku.js";
export { WakuNode, WakuOptions } from "./lib/waku.js";

export * as waku_filter from "./lib/filter/index.js";
export { wakuFilter, FilterCodecs } from "./lib/filter/index.js";

Expand Down
2 changes: 1 addition & 1 deletion packages/core/src/lib/wait_for_remote_peer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
/**
* Wait for a remote peer to be ready given the passed protocols.
* Must be used after attempting to connect to nodes, using
* {@link @waku/core!WakuNode.dial} or a bootstrap method with
* {@link @waku/sdk!WakuNode.dial} or a bootstrap method with
* {@link @waku/sdk!createLightNode}.
*
* If the passed protocols is a GossipSub protocol, then it resolves only once
Expand Down Expand Up @@ -100,7 +100,7 @@
);
return;
} catch (e) {
if ((e as any).code === "ERR_CONNECTION_BEING_CLOSED")

Check warning on line 103 in packages/core/src/lib/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 103 in packages/core/src/lib/wait_for_remote_peer.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
log.error(
`Connection with the peer was closed and possibly because it's on a different shard. Error: ${e}`
);
Expand Down
4 changes: 2 additions & 2 deletions packages/interfaces/src/protocols.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,11 @@ export type ProtocolCreateOptions = {
*/
shardInfo?: Partial<ShardingParams>;
/**
* You can pass options to the `Libp2p` instance used by {@link @waku/core!WakuNode} using the `libp2p` property.
* You can pass options to the `Libp2p` instance used by {@link @waku/sdk!WakuNode} using the `libp2p` property.
* This property is the same type as the one passed to [`Libp2p.create`](https://github.com/libp2p/js-libp2p/blob/master/doc/API.md#create)
* apart that we made the `modules` property optional and partial,
* allowing its omission and letting Waku set good defaults.
* Notes that some values are overridden by {@link @waku/core!WakuNode} to ensure it implements the Waku protocol.
* Notes that some values are overridden by {@link @waku/sdk!WakuNode} to ensure it implements the Waku protocol.
*/
libp2p?: Partial<CreateLibp2pOptions>;
/**
Expand Down
121 changes: 121 additions & 0 deletions packages/sdk/src/content_topic.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
import type { Multiaddr } from "@multiformats/multiaddr";
Copy link
Collaborator

@weboko weboko Feb 12, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall I think it is the way to go - especially considering alignment on waku-org/pm#114 (comment)

With current implementation I think the main problem is ambiguity between existing filter.subscribe and these SDK functions.

To resolve that I think we should make these utils as close as possible to the node object so that they are essentially properties - this way we make sure:

  • they are accessible without any additional context such as import of it or knowledge of their existence;
  • consumer can use filter directly as well (which can be the case if they decide to build on top of raw RFC standard);

The most straightforward way I see is to move it to core and expose from here with additionally making a tech debt task.

Another way might be to move Waku implementation to SDK as this is utility and does not follow any RFC thus does not belong to core

nb: created a debt label just for that

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weboko I added a new class to SDK that extends WakuNode and gives it a function to create a subscription from a content topic. I think it makes sense to keep new, higher-level functions like this separate for now, and then decide on which can be moved inside the WakuNode class itself.

I'm not sure if the entire WakuNode class should be moved to sdk package just yet.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think we need to introduce another abstraction as in most cases it will be needed to subscribe by contentTopic

Considering this I would move WakuNode all together to sdk and use util functions in content_topic.ts as private to WakuNode that way we keep only one facade and have additional method on the WakuNode to facilitate subscription by topic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@weboko Moved WakuNode from core to sdk and added a function to create a subscription in latest commit

import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core";
import {
Callback,
IDecoder,
IFilterSubscription,
LightNode,
Protocols
} from "@waku/interfaces";
import {
contentTopicToPubsubTopic,
shardInfoToPubsubTopics
} from "@waku/utils";

import { createLightNode } from "./create.js";

interface CreateTopicOptions {
waku?: LightNode;
peer: Multiaddr;
}

// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and
// subscription for that content topic.
async function prepareSubscription(
waku: LightNode,
contentTopic: string,
peer: Multiaddr
): Promise<{
decoder: IDecoder<DecodedMessage>;
subscription: IFilterSubscription;
}> {
// Validate that the Waku node matches assumptions
if (!waku.filter) {
throw new Error("Filter protocol missing from Waku node");
}
const { shardInfo } = waku.libp2p.components.metadata;
if (!shardInfo) {
throw new Error("Shard info missing from Waku node.");
}

// Validate content topic and ensure node is configured for its corresponding pubsub topic
const pubsubTopics = shardInfoToPubsubTopics(shardInfo);
const pubsubTopic = contentTopicToPubsubTopic(contentTopic);
if (!pubsubTopics.includes(pubsubTopic))
throw new Error(
"Content topic does not match any pubsub topic in shard info."
);

await waku.dial(peer);
await waitForRemotePeer(waku, [Protocols.Filter]);

// Create decoder and subscription
let decoder = createDecoder(contentTopic, pubsubTopic);
if (decoder) decoder = decoder ?? decoder;
const subscription = await waku.filter.createSubscription(pubsubTopic);

return { decoder, subscription };
}

/**
* Creates a subscription and streams all new messages for a content topic.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param opts
*/
export async function streamContentTopic(
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
contentTopic: string,
opts: CreateTopicOptions
): Promise<[ReadableStream<DecodedMessage>, LightNode]> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);

// Create a ReadableStream that receives any messages for the content topic
const messageStream = new ReadableStream<DecodedMessage>({
async start(controller) {
await subscription.subscribe(decoder, (message) => {
controller.enqueue(message);
});
},
cancel() {
return subscription.unsubscribe([contentTopic]);
}
});
return [messageStream, opts.waku];
}

/**
* Subscribes to new messages for a content topic via callback function.
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`.
* Assumes node is using autosharding.
* @param contentTopic
* @param callback Called every time a new message is received on the content topic
* @param opts
*/
export async function subscribeToContentTopic(
danisharora099 marked this conversation as resolved.
Show resolved Hide resolved
contentTopic: string,
callback: Callback<DecodedMessage>,
opts: CreateTopicOptions
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> {
opts.waku =
opts.waku ??
(await createLightNode({
shardInfo: { contentTopics: [contentTopic] }
}));
const { decoder, subscription } = await prepareSubscription(
opts.waku,
contentTopic,
opts.peer
);
await subscription.subscribe(decoder, callback);
return { subscription, waku: opts.waku };
}
12 changes: 3 additions & 9 deletions packages/sdk/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,7 @@
import { ping } from "@libp2p/ping";
import { webSockets } from "@libp2p/websockets";
import { all as filterAll } from "@libp2p/websockets/filters";
import {
DefaultUserAgent,
wakuFilter,
wakuLightPush,
wakuMetadata,
WakuNode,
WakuOptions,
wakuStore
} from "@waku/core";
import { wakuFilter, wakuLightPush, wakuMetadata, wakuStore } from "@waku/core";
import { enrTree, wakuDnsDiscovery } from "@waku/dns-discovery";
import {
type CreateLibp2pOptions,
Expand All @@ -34,6 +26,8 @@
import { ensureShardingConfigured } from "@waku/utils";
import { createLibp2p } from "libp2p";

import { DefaultUserAgent, WakuNode, WakuOptions } from "./waku.js";

const DEFAULT_NODE_REQUIREMENTS = {
lightPush: 1,
filter: 1,
Expand Down Expand Up @@ -252,5 +246,5 @@
...pubsubService,
...options?.services
}
}) as any as Libp2p; // TODO: make libp2p include it;

Check warning on line 249 in packages/sdk/src/create.ts

View workflow job for this annotation

GitHub Actions / proto

Unexpected any. Specify a different type

Check warning on line 249 in packages/sdk/src/create.ts

View workflow job for this annotation

GitHub Actions / check

Unexpected any. Specify a different type
}
9 changes: 3 additions & 6 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,4 @@
export {
waitForRemotePeer,
createEncoder,
createDecoder,
WakuNode
} from "@waku/core";
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core";
export {
DecodedMessage,
Decoder,
Expand All @@ -12,6 +7,8 @@ export {

export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes";

export * from "./content_topic.js";
export * from "./waku.js";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I don't really think we need to export it

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd rather leave it in for now just because it breaks unit tests if these functions can't be accessed from tests package

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should test the entry point API - that means node.subscribeToContentTopic is the only thing that should be tested externally

export * from "./create.js";
export * as waku from "@waku/core";
export * as utils from "@waku/utils";
Expand Down
2 changes: 1 addition & 1 deletion packages/sdk/src/relay/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import { WakuNode, WakuOptions } from "@waku/core";
import {
DefaultPubsubTopic,
type ProtocolCreateOptions,
Expand All @@ -8,6 +7,7 @@ import { RelayCreateOptions, wakuGossipSub, wakuRelay } from "@waku/relay";
import { ensureShardingConfigured } from "@waku/utils";

import { defaultLibp2p, defaultPeerDiscoveries } from "../create.js";
import { WakuNode, WakuOptions } from "../waku.js";

/**
* Create a Waku node that uses Waku Relay to send and receive messages,
Expand Down
19 changes: 18 additions & 1 deletion packages/core/src/lib/waku.ts → packages/sdk/src/waku.ts
Original file line number Diff line number Diff line change
@@ -1,19 +1,23 @@
import type { Stream } from "@libp2p/interface";
import { isPeerId, PeerId } from "@libp2p/interface";
import { multiaddr, Multiaddr, MultiaddrInput } from "@multiformats/multiaddr";
import { ConnectionManager, DecodedMessage } from "@waku/core";
import type {
Callback,
IFilter,
IFilterSubscription,
ILightPush,
IRelay,
IStore,
Libp2p,
LightNode,
PubsubTopic,
Waku
} from "@waku/interfaces";
import { Protocols } from "@waku/interfaces";
import { Logger } from "@waku/utils";

import { ConnectionManager } from "./connection_manager.js";
import { subscribeToContentTopic } from "./content_topic.js";

export const DefaultPingKeepAliveValueSecs = 5 * 60;
export const DefaultRelayKeepAliveValueSecs = 5 * 60;
Expand Down Expand Up @@ -180,6 +184,19 @@ export class WakuNode implements Waku {
await this.libp2p.stop();
}

async subscribeToContentTopic(
contentTopic: string,
peer: Multiaddr,
callback: Callback<DecodedMessage>
): Promise<IFilterSubscription> {
return (
await subscribeToContentTopic(contentTopic, callback, {
waku: this as LightNode,
peer
})
).subscription;
}

isStarted(): boolean {
return this.libp2p.status == "started";
}
Expand Down
Loading
Loading