From 72af9869aeea18bcce6eec140a8c9597baa48fc1 Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Wed, 11 Sep 2024 18:38:22 +0330 Subject: [PATCH] refactor(rosenet-node): extract pubsub functions --- .../rosenet-node/lib/createRoseNetNode.ts | 17 +++-------------- .../rosenet-node/lib/rosenet-pubsub/index.ts | 2 ++ .../lib/rosenet-pubsub/publish.ts | 14 ++++++++++++++ .../lib/rosenet-pubsub/subscribe.ts | 19 +++++++++++++++++++ 4 files changed, 38 insertions(+), 14 deletions(-) create mode 100644 packages/rosenet-node/lib/rosenet-pubsub/index.ts create mode 100644 packages/rosenet-node/lib/rosenet-pubsub/publish.ts create mode 100644 packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index ba832ca..25dd763 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -20,6 +20,7 @@ import { handleIncomingMessageFactory, sendMessageFactory, } from './rosenet-direct'; +import { publishFactory, subscribeFactory } from './rosenet-pubsub'; import RoseNetNodeContext from './context/RoseNetNodeContext'; @@ -37,9 +38,6 @@ import packageJson from '../package.json' with { type: 'json' }; import { RoseNetNodeConfig } from './types'; -const textEncoder = new TextEncoder(); -const textDecoder = new TextDecoder(); - const createRoseNetNode = async ({ logger, port = DEFAULT_NODE_PORT, @@ -143,17 +141,8 @@ const createRoseNetNode = async ({ start: async () => node.start(), sendMessage: sendMessageFactory(node), handleIncomingMessage: handleIncomingMessageFactory(node), - publish: async (topic: string, message: string) => { - node.services.pubsub.publish(topic, textEncoder.encode(message)); - }, - subscribe: async (topic: string, handler: (message: string) => void) => { - node.services.pubsub.subscribe(topic); - node.services.pubsub.addEventListener('message', (event) => { - if (event.detail.topic === topic) { - handler(textDecoder.decode(event.detail.data)); - } - }); - }, + publish: publishFactory(node), + subscribe: subscribeFactory(node), }; }; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/index.ts b/packages/rosenet-node/lib/rosenet-pubsub/index.ts new file mode 100644 index 0000000..8f4d625 --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-pubsub/index.ts @@ -0,0 +1,2 @@ +export { default as publishFactory } from './publish'; +export { default as subscribeFactory } from './subscribe'; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts new file mode 100644 index 0000000..0c9af89 --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts @@ -0,0 +1,14 @@ +import { Libp2p, PubSub } from '@libp2p/interface'; + +const textEncoder = new TextEncoder(); + +/** + * factory for libp2p publish + */ +const publishFactory = + (node: Libp2p<{ pubsub: PubSub }>) => + async (topic: string, message: string) => { + node.services.pubsub.publish(topic, textEncoder.encode(message)); + }; + +export default publishFactory; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts new file mode 100644 index 0000000..05829fa --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts @@ -0,0 +1,19 @@ +import { Libp2p, PubSub } from '@libp2p/interface'; + +const textDecoder = new TextDecoder(); + +/** + * factory for libp2p subscribe + */ +const subscribeFactory = + (node: Libp2p<{ pubsub: PubSub }>) => + async (topic: string, handler: (message: string) => void) => { + node.services.pubsub.subscribe(topic); + node.services.pubsub.addEventListener('message', (event) => { + if (event.detail.topic === topic) { + handler(textDecoder.decode(event.detail.data)); + } + }); + }; + +export default subscribeFactory;