From 88830268cd8dcc88123aeb6664c1e49cc4e56309 Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Wed, 11 Sep 2024 18:26:44 +0330 Subject: [PATCH 1/3] feat(rosenet-node): implement direct message retry --- .changeset/small-grapes-pump.md | 5 + packages/rosenet-node/lib/constants.ts | 3 + .../rosenet-node/lib/createRoseNetNode.ts | 63 +------- .../lib/rosenet-direct/sendMessage.ts | 135 ++++++++++++++++++ 4 files changed, 146 insertions(+), 60 deletions(-) create mode 100644 .changeset/small-grapes-pump.md create mode 100644 packages/rosenet-node/lib/rosenet-direct/sendMessage.ts diff --git a/.changeset/small-grapes-pump.md b/.changeset/small-grapes-pump.md new file mode 100644 index 0000000..ed118b2 --- /dev/null +++ b/.changeset/small-grapes-pump.md @@ -0,0 +1,5 @@ +--- +"@rosen-bridge/rosenet-node": major +--- + +Retry failed direct messages diff --git a/packages/rosenet-node/lib/constants.ts b/packages/rosenet-node/lib/constants.ts index 9a5ff91..abca208 100644 --- a/packages/rosenet-node/lib/constants.ts +++ b/packages/rosenet-node/lib/constants.ts @@ -9,3 +9,6 @@ export const ACK_BYTE = 1; * and an ack requires 1 RTT. We use 2 RTT as the timeout for an ack. */ export const ACK_TIMEOUT = 500; +export const MESSAGE_RETRY_ATTEMPTS = 3; +export const MESSAGE_RETRY_EXPONENT = 5; +export const MESSAGE_RETRY_INITIAL_DELAY = 5000; diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index dce9d38..54db759 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -7,10 +7,7 @@ import { identify } from '@libp2p/identify'; import { PeerId } from '@libp2p/interface'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; import { tcp } from '@libp2p/tcp'; -import { AbortError, raceSignal } from 'race-signal'; -import first from 'it-first'; -import map from 'it-map'; import { pipe } from 'it-pipe'; import { createLibp2p } from 'libp2p'; @@ -25,19 +22,14 @@ import RoseNetNodeContext from './context/RoseNetNodeContext'; import restartRelayDiscovery from './libp2p/restart-relay-discovery'; import addressService from './address/address-service'; -import streamService from './stream/stream-service'; -import { decode, encode } from './utils/codec'; +import { decode } from './utils/codec'; import sample from './utils/sample'; import RoseNetNodeError from './errors/RoseNetNodeError'; -import RoseNetDirectAckError, { - AckError, -} from './errors/RoseNetDirectAckError'; import { ACK_BYTE, - ACK_TIMEOUT, DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT, ROSENET_DIRECT_PROTOCOL_V1, @@ -46,6 +38,7 @@ import { import packageJson from '../package.json' with { type: 'json' }; import { RoseNetNodeConfig } from './types'; +import sendMessageFactory from './rosenet-direct/sendMessage'; const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder(); @@ -151,57 +144,7 @@ const createRoseNetNode = async ({ return { start: async () => node.start(), - sendMessage: async (to: string, message: string) => { - let stream; - - try { - stream = await streamService.getRoseNetDirectStreamTo(to, node); - - const result = await pipe( - [message], - (source) => map(source, encode), - stream, - async (source) => - await raceSignal(first(source), AbortSignal.timeout(ACK_TIMEOUT)), - ); - - if (result?.length !== 1) { - throw new RoseNetDirectAckError( - `There are more than one chunk in the ack message`, - AckError.InvalidChunks, - ); - } - const ack = result?.subarray(); - if (ack.length !== 1 || ack[0] !== ACK_BYTE) { - throw new RoseNetDirectAckError( - `Ack byte is invalid`, - AckError.InvalidByte, - ); - } - - RoseNetNodeContext.logger.debug('message sent successfully', { - message, - }); - } catch (error) { - if (error instanceof AbortError) { - const errorToThrow = new RoseNetDirectAckError( - `Ack was not received`, - AckError.Timeout, - ); - stream?.abort(errorToThrow); - throw errorToThrow; - } - if (error instanceof RoseNetDirectAckError) { - stream?.abort(error); - throw error; - } - const errorToThrow = new RoseNetNodeError( - `An unknown error occured: ${error instanceof Error ? error.message : error}`, - ); - stream?.abort(errorToThrow); - throw error; - } - }, + sendMessage: sendMessageFactory(node), handleIncomingMessage: ( handler: (from: string, message?: string) => void, ) => { diff --git a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts new file mode 100644 index 0000000..8dfeceb --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts @@ -0,0 +1,135 @@ +import { Libp2p } from '@libp2p/interface'; + +import { ExponentialBackoff, handleAll, retry } from 'cockatiel'; +import first from 'it-first'; +import map from 'it-map'; +import { pipe } from 'it-pipe'; +import { AbortError, raceSignal } from 'race-signal'; + +import RoseNetNodeContext from '../context/RoseNetNodeContext'; +import streamService from '../stream/stream-service'; +import { encode } from '../utils/codec'; + +import RoseNetDirectAckError, { + AckError, +} from '../errors/RoseNetDirectAckError'; +import RoseNetNodeError from '../errors/RoseNetNodeError'; + +import { + ACK_BYTE, + ACK_TIMEOUT, + MESSAGE_RETRY_ATTEMPTS, + MESSAGE_RETRY_EXPONENT, + MESSAGE_RETRY_INITIAL_DELAY, +} from '../constants'; + +/** + * A factory returning a function to send a message to a specific peer via + * RoseNet direct protocol + */ +const sendMessageFactory = + (node: Libp2p) => async (to: string, message: string) => { + let stream; + + try { + stream = await streamService.getRoseNetDirectStreamTo(to, node); + + const result = await pipe( + [message], + (source) => map(source, encode), + stream, + async (source) => + await raceSignal(first(source), AbortSignal.timeout(ACK_TIMEOUT)), + ); + + if (result?.length !== 1) { + throw new RoseNetDirectAckError( + `There are more than one chunk in the ack message`, + AckError.InvalidChunks, + ); + } + const ack = result?.subarray(); + if (ack.length !== 1 || ack[0] !== ACK_BYTE) { + throw new RoseNetDirectAckError( + `Ack byte is invalid`, + AckError.InvalidByte, + ); + } + + RoseNetNodeContext.logger.debug('message sent successfully', { + message, + }); + } catch (error) { + if (error instanceof AbortError) { + const errorToThrow = new RoseNetDirectAckError( + `Ack was not received`, + AckError.Timeout, + ); + stream?.abort(errorToThrow); + throw errorToThrow; + } + if (error instanceof RoseNetDirectAckError) { + stream?.abort(error); + throw error; + } + const errorToThrow = new RoseNetNodeError( + `An unknown error occured: ${error instanceof Error ? error.message : error}`, + ); + stream?.abort(errorToThrow); + throw error; + } + }; + +/** + * A wrapper around `sendMessageFactory` for retrying failed messages + */ +const sendMessageWithRetryFactory = + (node: Libp2p) => + async ( + to: string, + message: string, + /** + * an optional callback that is called with an error if the message sending + * fails after enough retrials, and with no arguments otherwise + */ + onSettled?: (error?: Error) => Promise, + ) => { + const sendMessageInner = sendMessageFactory(node); + try { + const retryPolicy = retry(handleAll, { + maxAttempts: MESSAGE_RETRY_ATTEMPTS, + backoff: new ExponentialBackoff({ + exponent: MESSAGE_RETRY_EXPONENT, + initialDelay: MESSAGE_RETRY_INITIAL_DELAY, + maxDelay: 300_000, + }), + }); + retryPolicy.onFailure((data) => { + RoseNetNodeContext.logger.debug('message sending failed', { + message, + reason: data.reason, + }); + }); + retryPolicy.onRetry((data) => { + RoseNetNodeContext.logger.debug( + `retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`, + { + message, + }, + ); + }); + + await retryPolicy.execute(() => sendMessageInner(to, message)); + onSettled?.(); + } catch (error) { + RoseNetNodeContext.logger.error( + 'message sending failed regardless of 3 retries, dropping message', + ); + RoseNetNodeContext.logger.debug('message was: ', { + message, + }); + onSettled?.(new RoseNetNodeError('Message sending failed')); + } + }; + +export default sendMessageWithRetryFactory; From 7addf22318811e7d912485c8c122c5c8b9be8dd8 Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Wed, 11 Sep 2024 18:31:48 +0330 Subject: [PATCH 2/3] refactor(rosenet-node): extract RoseNet direct protocol handler --- .../rosenet-node/lib/createRoseNetNode.ts | 53 +++---------------- .../rosenet-direct/handleIncomingMessage.ts | 51 ++++++++++++++++++ .../rosenet-node/lib/rosenet-direct/index.ts | 2 + 3 files changed, 60 insertions(+), 46 deletions(-) create mode 100644 packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts create mode 100644 packages/rosenet-node/lib/rosenet-direct/index.ts diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index 54db759..ba832ca 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -8,7 +8,6 @@ import { PeerId } from '@libp2p/interface'; import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery'; import { tcp } from '@libp2p/tcp'; -import { pipe } from 'it-pipe'; import { createLibp2p } from 'libp2p'; import { @@ -17,28 +16,26 @@ import { privateKeyToPeerId, } from '@rosen-bridge/rosenet-utils'; +import { + handleIncomingMessageFactory, + sendMessageFactory, +} from './rosenet-direct'; + import RoseNetNodeContext from './context/RoseNetNodeContext'; import restartRelayDiscovery from './libp2p/restart-relay-discovery'; import addressService from './address/address-service'; -import { decode } from './utils/codec'; import sample from './utils/sample'; import RoseNetNodeError from './errors/RoseNetNodeError'; -import { - ACK_BYTE, - DEFAULT_NODE_PORT, - RELAYS_COUNT_TO_CONNECT, - ROSENET_DIRECT_PROTOCOL_V1, -} from './constants'; +import { DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT } from './constants'; import packageJson from '../package.json' with { type: 'json' }; import { RoseNetNodeConfig } from './types'; -import sendMessageFactory from './rosenet-direct/sendMessage'; const textEncoder = new TextEncoder(); const textDecoder = new TextDecoder(); @@ -145,43 +142,7 @@ const createRoseNetNode = async ({ return { start: async () => node.start(), sendMessage: sendMessageFactory(node), - handleIncomingMessage: ( - handler: (from: string, message?: string) => void, - ) => { - node.handle( - ROSENET_DIRECT_PROTOCOL_V1, - async ({ connection, stream }) => { - RoseNetNodeContext.logger.debug( - `incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`, - { - remoteAddress: connection.remoteAddr.toString(), - transient: connection.transient, - }, - ); - pipe( - stream, - decode, - async function* (source) { - for await (const message of source) { - RoseNetNodeContext.logger.debug( - 'message received, calling handler and sending ack', - { - message, - }, - ); - handler(connection.remotePeer.toString(), message); - yield Uint8Array.of(ACK_BYTE); - } - }, - stream, - ); - }, - { runOnTransientConnection: true }, - ); - RoseNetNodeContext.logger.debug( - `handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, - ); - }, + handleIncomingMessage: handleIncomingMessageFactory(node), publish: async (topic: string, message: string) => { node.services.pubsub.publish(topic, textEncoder.encode(message)); }, diff --git a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts new file mode 100644 index 0000000..e756e7b --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts @@ -0,0 +1,51 @@ +import { pipe } from 'it-pipe'; + +import { Libp2p } from '@libp2p/interface'; + +import RoseNetNodeContext from '../context/RoseNetNodeContext'; + +import { decode } from '../utils/codec'; + +import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants'; + +/** + * protocol handler for RoseNet direct + */ +const handleIncomingMessageFactory = + (node: Libp2p) => (handler: (from: string, message?: string) => void) => { + node.handle( + ROSENET_DIRECT_PROTOCOL_V1, + async ({ connection, stream }) => { + RoseNetNodeContext.logger.debug( + `incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`, + { + remoteAddress: connection.remoteAddr.toString(), + transient: connection.transient, + }, + ); + pipe( + stream, + decode, + async function* (source) { + for await (const message of source) { + RoseNetNodeContext.logger.debug( + 'message received, calling handler and sending ack', + { + message, + }, + ); + handler(connection.remotePeer.toString(), message); + yield Uint8Array.of(ACK_BYTE); + } + }, + stream, + ); + }, + { runOnTransientConnection: true }, + ); + RoseNetNodeContext.logger.debug( + `handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, + ); + }; + +export default handleIncomingMessageFactory; diff --git a/packages/rosenet-node/lib/rosenet-direct/index.ts b/packages/rosenet-node/lib/rosenet-direct/index.ts new file mode 100644 index 0000000..2880318 --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-direct/index.ts @@ -0,0 +1,2 @@ +export { default as handleIncomingMessageFactory } from './handleIncomingMessage'; +export { default as sendMessageFactory } from './sendMessage'; From 72af9869aeea18bcce6eec140a8c9597baa48fc1 Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Wed, 11 Sep 2024 18:38:22 +0330 Subject: [PATCH 3/3] refactor(rosenet-node): extract pubsub functions --- .../rosenet-node/lib/createRoseNetNode.ts | 17 +++-------------- .../rosenet-node/lib/rosenet-pubsub/index.ts | 2 ++ .../lib/rosenet-pubsub/publish.ts | 14 ++++++++++++++ .../lib/rosenet-pubsub/subscribe.ts | 19 +++++++++++++++++++ 4 files changed, 38 insertions(+), 14 deletions(-) create mode 100644 packages/rosenet-node/lib/rosenet-pubsub/index.ts create mode 100644 packages/rosenet-node/lib/rosenet-pubsub/publish.ts create mode 100644 packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index ba832ca..25dd763 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -20,6 +20,7 @@ import { handleIncomingMessageFactory, sendMessageFactory, } from './rosenet-direct'; +import { publishFactory, subscribeFactory } from './rosenet-pubsub'; import RoseNetNodeContext from './context/RoseNetNodeContext'; @@ -37,9 +38,6 @@ import packageJson from '../package.json' with { type: 'json' }; import { RoseNetNodeConfig } from './types'; -const textEncoder = new TextEncoder(); -const textDecoder = new TextDecoder(); - const createRoseNetNode = async ({ logger, port = DEFAULT_NODE_PORT, @@ -143,17 +141,8 @@ const createRoseNetNode = async ({ start: async () => node.start(), sendMessage: sendMessageFactory(node), handleIncomingMessage: handleIncomingMessageFactory(node), - publish: async (topic: string, message: string) => { - node.services.pubsub.publish(topic, textEncoder.encode(message)); - }, - subscribe: async (topic: string, handler: (message: string) => void) => { - node.services.pubsub.subscribe(topic); - node.services.pubsub.addEventListener('message', (event) => { - if (event.detail.topic === topic) { - handler(textDecoder.decode(event.detail.data)); - } - }); - }, + publish: publishFactory(node), + subscribe: subscribeFactory(node), }; }; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/index.ts b/packages/rosenet-node/lib/rosenet-pubsub/index.ts new file mode 100644 index 0000000..8f4d625 --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-pubsub/index.ts @@ -0,0 +1,2 @@ +export { default as publishFactory } from './publish'; +export { default as subscribeFactory } from './subscribe'; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts new file mode 100644 index 0000000..0c9af89 --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts @@ -0,0 +1,14 @@ +import { Libp2p, PubSub } from '@libp2p/interface'; + +const textEncoder = new TextEncoder(); + +/** + * factory for libp2p publish + */ +const publishFactory = + (node: Libp2p<{ pubsub: PubSub }>) => + async (topic: string, message: string) => { + node.services.pubsub.publish(topic, textEncoder.encode(message)); + }; + +export default publishFactory; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts new file mode 100644 index 0000000..05829fa --- /dev/null +++ b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts @@ -0,0 +1,19 @@ +import { Libp2p, PubSub } from '@libp2p/interface'; + +const textDecoder = new TextDecoder(); + +/** + * factory for libp2p subscribe + */ +const subscribeFactory = + (node: Libp2p<{ pubsub: PubSub }>) => + async (topic: string, handler: (message: string) => void) => { + node.services.pubsub.subscribe(topic); + node.services.pubsub.addEventListener('message', (event) => { + if (event.detail.topic === topic) { + handler(textDecoder.decode(event.detail.data)); + } + }); + }; + +export default subscribeFactory;