-
Notifications
You must be signed in to change notification settings - Fork 42
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: subscribe to content topic via SDK #1823
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,121 @@ | ||
import type { Multiaddr } from "@multiformats/multiaddr"; | ||
import { createDecoder, DecodedMessage, waitForRemotePeer } from "@waku/core"; | ||
import { | ||
Callback, | ||
IDecoder, | ||
IFilterSubscription, | ||
LightNode, | ||
Protocols | ||
} from "@waku/interfaces"; | ||
import { | ||
contentTopicToPubsubTopic, | ||
shardInfoToPubsubTopics | ||
} from "@waku/utils"; | ||
|
||
import { createLightNode } from "./create.js"; | ||
|
||
interface CreateTopicOptions { | ||
waku?: LightNode; | ||
peer: Multiaddr; | ||
} | ||
|
||
// Given a Waku node, peer Multiaddr, and content topic, creates a decoder and | ||
// subscription for that content topic. | ||
async function prepareSubscription( | ||
waku: LightNode, | ||
contentTopic: string, | ||
peer: Multiaddr | ||
): Promise<{ | ||
decoder: IDecoder<DecodedMessage>; | ||
subscription: IFilterSubscription; | ||
}> { | ||
// Validate that the Waku node matches assumptions | ||
if (!waku.filter) { | ||
throw new Error("Filter protocol missing from Waku node"); | ||
} | ||
const { shardInfo } = waku.libp2p.components.metadata; | ||
if (!shardInfo) { | ||
throw new Error("Shard info missing from Waku node."); | ||
} | ||
|
||
// Validate content topic and ensure node is configured for its corresponding pubsub topic | ||
const pubsubTopics = shardInfoToPubsubTopics(shardInfo); | ||
const pubsubTopic = contentTopicToPubsubTopic(contentTopic); | ||
if (!pubsubTopics.includes(pubsubTopic)) | ||
throw new Error( | ||
"Content topic does not match any pubsub topic in shard info." | ||
); | ||
|
||
await waku.dial(peer); | ||
await waitForRemotePeer(waku, [Protocols.Filter]); | ||
|
||
// Create decoder and subscription | ||
let decoder = createDecoder(contentTopic, pubsubTopic); | ||
if (decoder) decoder = decoder ?? decoder; | ||
const subscription = await waku.filter.createSubscription(pubsubTopic); | ||
|
||
return { decoder, subscription }; | ||
} | ||
|
||
/** | ||
* Creates a subscription and streams all new messages for a content topic. | ||
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. | ||
* Assumes node is using autosharding. | ||
* @param contentTopic | ||
* @param opts | ||
*/ | ||
export async function streamContentTopic( | ||
danisharora099 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
contentTopic: string, | ||
opts: CreateTopicOptions | ||
): Promise<[ReadableStream<DecodedMessage>, LightNode]> { | ||
opts.waku = | ||
opts.waku ?? | ||
(await createLightNode({ | ||
shardInfo: { contentTopics: [contentTopic] } | ||
})); | ||
const { decoder, subscription } = await prepareSubscription( | ||
opts.waku, | ||
contentTopic, | ||
opts.peer | ||
); | ||
|
||
// Create a ReadableStream that receives any messages for the content topic | ||
const messageStream = new ReadableStream<DecodedMessage>({ | ||
async start(controller) { | ||
await subscription.subscribe(decoder, (message) => { | ||
controller.enqueue(message); | ||
}); | ||
}, | ||
cancel() { | ||
return subscription.unsubscribe([contentTopic]); | ||
} | ||
}); | ||
return [messageStream, opts.waku]; | ||
} | ||
|
||
/** | ||
* Subscribes to new messages for a content topic via callback function. | ||
* Will create a light node configured for the content topic with default settings if a node is not provided in `opts`. | ||
* Assumes node is using autosharding. | ||
* @param contentTopic | ||
* @param callback Called every time a new message is received on the content topic | ||
* @param opts | ||
*/ | ||
export async function subscribeToContentTopic( | ||
danisharora099 marked this conversation as resolved.
Show resolved
Hide resolved
|
||
contentTopic: string, | ||
callback: Callback<DecodedMessage>, | ||
opts: CreateTopicOptions | ||
): Promise<{ subscription: IFilterSubscription; waku: LightNode }> { | ||
opts.waku = | ||
opts.waku ?? | ||
(await createLightNode({ | ||
shardInfo: { contentTopics: [contentTopic] } | ||
})); | ||
const { decoder, subscription } = await prepareSubscription( | ||
opts.waku, | ||
contentTopic, | ||
opts.peer | ||
); | ||
await subscription.subscribe(decoder, callback); | ||
return { subscription, waku: opts.waku }; | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,9 +1,4 @@ | ||
export { | ||
waitForRemotePeer, | ||
createEncoder, | ||
createDecoder, | ||
WakuNode | ||
} from "@waku/core"; | ||
export { waitForRemotePeer, createEncoder, createDecoder } from "@waku/core"; | ||
export { | ||
DecodedMessage, | ||
Decoder, | ||
|
@@ -12,6 +7,8 @@ export { | |
|
||
export { utf8ToBytes, bytesToUtf8 } from "@waku/utils/bytes"; | ||
|
||
export * from "./content_topic.js"; | ||
export * from "./waku.js"; | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: I don't really think we need to export it There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'd rather leave it in for now just because it breaks unit tests if these functions can't be accessed from tests package There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we should test the entry point API - that means |
||
export * from "./create.js"; | ||
export * as waku from "@waku/core"; | ||
export * as utils from "@waku/utils"; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Overall I think it is the way to go - especially considering alignment on waku-org/pm#114 (comment)
With current implementation I think the main problem is ambiguity between existing
filter.subscribe
and these SDK functions.To resolve that I think we should make these utils as close as possible to the
node
object so that they are essentially properties - this way we make sure:The most straightforward way I see is to move it to
core
and expose from here with additionally making a tech debt task.Another way might be to move
Waku
implementation to SDK as this is utility and does not follow any RFC thus does not belong tocore
nb: created a
debt
label just for thatThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@weboko I added a new class to SDK that extends
WakuNode
and gives it a function to create a subscription from a content topic. I think it makes sense to keep new, higher-level functions like this separate for now, and then decide on which can be moved inside theWakuNode
class itself.I'm not sure if the entire
WakuNode
class should be moved to sdk package just yet.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need to introduce another abstraction as in most cases it will be needed to subscribe by
contentTopic
Considering this I would move
WakuNode
all together tosdk
and use util functions incontent_topic.ts
as private toWakuNode
that way we keep only one facade and have additional method on theWakuNode
to facilitate subscription by topic.There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@weboko Moved
WakuNode
fromcore
tosdk
and added a function to create a subscription in latest commit