From 34f5a83cae8cf7210597321030c51c0825b1069c Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Mon, 23 Sep 2024 22:32:43 +0330 Subject: [PATCH] feat(rosenet-node): make `sendMessage` code more defensive - Apply timeout to stream creation and message round trip pipe - Add a bulkhead to limit the number of concurrent pending messages - Add a circuit breaker for each of peer dials --- .changeset/cold-shirts-accept.md | 5 + package-lock.json | 117 +-------------- packages/rosenet-node/lib/constants.ts | 14 +- .../lib/errors/RoseNetDirectAckError.ts | 18 --- .../lib/errors/RoseNetDirectError.ts | 18 +++ .../lib/errors/RoseNetNodeError.ts | 3 + .../lib/rosenet-direct/sendMessage.ts | 142 +++++++++++------- .../rosenet-node/lib/stream/stream-service.ts | 75 ++++++++- packages/rosenet-node/package.json | 3 +- 9 files changed, 197 insertions(+), 198 deletions(-) create mode 100644 .changeset/cold-shirts-accept.md delete mode 100644 packages/rosenet-node/lib/errors/RoseNetDirectAckError.ts create mode 100644 packages/rosenet-node/lib/errors/RoseNetDirectError.ts diff --git a/.changeset/cold-shirts-accept.md b/.changeset/cold-shirts-accept.md new file mode 100644 index 0000000..3176208 --- /dev/null +++ b/.changeset/cold-shirts-accept.md @@ -0,0 +1,5 @@ +--- +'@rosen-bridge/rosenet-node': minor +--- + +Limit the number of concurrent pending messages, while also applying different timeouts during message sending diff --git a/package-lock.json b/package-lock.json index c1d0809..a7345fc 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1761,14 +1761,6 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/circuit-relay-v2/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/crypto": { "version": "4.1.9", "resolved": "https://registry.npmjs.org/@libp2p/crypto/-/crypto-4.1.9.tgz", @@ -1784,14 +1776,6 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/crypto/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/identify": { "version": "2.1.5", "resolved": "https://registry.npmjs.org/@libp2p/identify/-/identify-2.1.5.tgz", @@ -1812,14 +1796,6 @@ "wherearewe": "^2.0.1" } }, - "node_modules/@libp2p/identify/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/interface": { "version": "1.7.0", "resolved": "https://registry.npmjs.org/@libp2p/interface/-/interface-1.7.0.tgz", @@ -1873,14 +1849,6 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/multistream-select/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/peer-collections": { "version": "5.2.9", "resolved": "https://registry.npmjs.org/@libp2p/peer-collections/-/peer-collections-5.2.9.tgz", @@ -1914,14 +1882,6 @@ "uint8arrays": "^5.0.1" } }, - "node_modules/@libp2p/peer-id/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/peer-record": { "version": "7.0.25", "resolved": "https://registry.npmjs.org/@libp2p/peer-record/-/peer-record-7.0.25.tgz", @@ -1938,14 +1898,6 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/peer-record/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/peer-store": { "version": "10.1.5", "resolved": "https://registry.npmjs.org/@libp2p/peer-store/-/peer-store-10.1.5.tgz", @@ -1965,18 +1917,10 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/peer-store/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/ping": { - "version": "1.1.5", - "resolved": "https://registry.npmjs.org/@libp2p/ping/-/ping-1.1.5.tgz", - "integrity": "sha512-CeRpXdtljyWr/UNmrojnZbyI/oDkdu6duCGtWnnDFmPS2tR4Rxr2C8sKA1iAvhgvRFhh5vrTmlB1QUbUWHHRCg==", + "version": "1.1.6", + "resolved": "https://registry.npmjs.org/@libp2p/ping/-/ping-1.1.6.tgz", + "integrity": "sha512-tzTL0BzS1JaHE8v4PhRZ5K8wQQQcTMXM/0baCkLTLIaSMe1fzhj+KHbFNoUrY3yni4yfsVY1uR0qchhc1/J9qg==", "dependencies": { "@libp2p/crypto": "^4.1.9", "@libp2p/interface": "^1.7.0", @@ -1987,14 +1931,6 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/ping/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@libp2p/pubsub": { "version": "9.0.10", "resolved": "https://registry.npmjs.org/@libp2p/pubsub/-/pubsub-9.0.10.tgz", @@ -2046,20 +1982,6 @@ "multiformats": "^12.0.1" } }, - "node_modules/@libp2p/pubsub/node_modules/it-pipe": { - "version": "3.0.1", - "resolved": "https://registry.npmjs.org/it-pipe/-/it-pipe-3.0.1.tgz", - "integrity": "sha512-sIoNrQl1qSRg2seYSBH/3QxWhJFn9PKYvOf/bHdtCBF0bnghey44VyASsWzn5dAx0DCDDABq1hZIuzKmtBZmKA==", - "dependencies": { - "it-merge": "^3.0.0", - "it-pushable": "^3.1.2", - "it-stream-types": "^2.0.1" - }, - "engines": { - "node": ">=16.0.0", - "npm": ">=7.0.0" - } - }, "node_modules/@libp2p/tcp": { "version": "9.1.5", "resolved": "https://registry.npmjs.org/@libp2p/tcp/-/tcp-9.1.5.tgz", @@ -2102,14 +2024,6 @@ "uint8arrays": "^5.1.0" } }, - "node_modules/@libp2p/utils/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "node_modules/@manypkg/find-root": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/@manypkg/find-root/-/find-root-1.1.0.tgz", @@ -8702,9 +8616,9 @@ } }, "node_modules/uint8arrays": { - "version": "5.0.2", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.0.2.tgz", - "integrity": "sha512-S0GaeR+orZt7LaqzTRs4ZP8QqzAauJ+0d4xvP2lJTA99jIkKsE2FgDs4tGF/K/z5O9I/2W5Yvrh7IuqNeYH+0Q==", + "version": "5.1.0", + "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", + "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", "dependencies": { "multiformats": "^13.0.0" } @@ -9273,8 +9187,7 @@ "it-map": "^3.0.5", "it-pipe": "^3.0.1", "libp2p": "^1.9.2", - "public-ip": "^6.0.2", - "race-signal": "^1.1.0" + "public-ip": "^6.0.2" }, "devDependencies": { "@types/node": "^20.11.9", @@ -9545,14 +9458,6 @@ "url": "https://github.com/sponsors/antfu" } }, - "packages/rosenet-node/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "packages/rosenet-node/node_modules/vite-node": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/vite-node/-/vite-node-1.3.0.tgz", @@ -9922,14 +9827,6 @@ "url": "https://github.com/sponsors/antfu" } }, - "packages/rosenet-relay/node_modules/uint8arrays": { - "version": "5.1.0", - "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-5.1.0.tgz", - "integrity": "sha512-vA6nFepEmlSKkMBnLBaUMVvAC4G3CTmO58C12y4sq6WPDOR7mOFYOi7GlrQ4djeSbP6JG9Pv9tJDM97PedRSww==", - "dependencies": { - "multiformats": "^13.0.0" - } - }, "packages/rosenet-relay/node_modules/vite-node": { "version": "1.3.0", "resolved": "https://registry.npmjs.org/vite-node/-/vite-node-1.3.0.tgz", diff --git a/packages/rosenet-node/lib/constants.ts b/packages/rosenet-node/lib/constants.ts index abca208..54a8069 100644 --- a/packages/rosenet-node/lib/constants.ts +++ b/packages/rosenet-node/lib/constants.ts @@ -4,11 +4,9 @@ export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000; export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1'; export const DEFAULT_NODE_PORT = 55123; export const ACK_BYTE = 1; -/** - * Worst inter-continent ping RTT is around 250ms at the time of this commit, - * and an ack requires 1 RTT. We use 2 RTT as the timeout for an ack. - */ -export const ACK_TIMEOUT = 500; -export const MESSAGE_RETRY_ATTEMPTS = 3; -export const MESSAGE_RETRY_EXPONENT = 5; -export const MESSAGE_RETRY_INITIAL_DELAY = 5000; +export const MESSAGE_ROUNDTRIP_TIMEOUT = 1000; +export const MESSAGE_RETRY_ATTEMPTS = 5; +export const MESSAGE_RETRY_INITIAL_DELAY = 2000; +export const MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED = 1000; +export const MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE = 2000; +export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 500; diff --git a/packages/rosenet-node/lib/errors/RoseNetDirectAckError.ts b/packages/rosenet-node/lib/errors/RoseNetDirectAckError.ts deleted file mode 100644 index 2c779b3..0000000 --- a/packages/rosenet-node/lib/errors/RoseNetDirectAckError.ts +++ /dev/null @@ -1,18 +0,0 @@ -import RoseNetNodeError from './RoseNetNodeError'; - -export enum AckError { - Timeout = 'Timeout', - InvalidChunks = 'InvalidChunks', - InvalidByte = 'InvalidByte', -} - -class RoseNetDirectAckError extends RoseNetNodeError { - constructor( - message: string, - public type: AckError, - ) { - super(message); - } -} - -export default RoseNetDirectAckError; diff --git a/packages/rosenet-node/lib/errors/RoseNetDirectError.ts b/packages/rosenet-node/lib/errors/RoseNetDirectError.ts new file mode 100644 index 0000000..15ac4be --- /dev/null +++ b/packages/rosenet-node/lib/errors/RoseNetDirectError.ts @@ -0,0 +1,18 @@ +import RoseNetNodeError from './RoseNetNodeError'; + +export enum RoseNetDirectErrorType { + Timeout = 'Timeout', + InvalidAckChunks = 'InvalidAckChunks', + InvalidAckByte = 'InvalidAckByte', +} + +class RoseNetDirectError extends RoseNetNodeError { + constructor( + message: string, + public type: RoseNetDirectErrorType, + ) { + super(message); + } +} + +export default RoseNetDirectError; diff --git a/packages/rosenet-node/lib/errors/RoseNetNodeError.ts b/packages/rosenet-node/lib/errors/RoseNetNodeError.ts index b7e3e00..c73713e 100644 --- a/packages/rosenet-node/lib/errors/RoseNetNodeError.ts +++ b/packages/rosenet-node/lib/errors/RoseNetNodeError.ts @@ -2,9 +2,12 @@ class RoseNetNodeError extends Error { constructor( public message: string, public stack = '', + cause?: unknown, ) { super(message); + this.cause = cause; + if (stack) { this.stack = stack; } else { diff --git a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts index 8dfeceb..59a3dee 100644 --- a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts @@ -1,26 +1,36 @@ -import { Libp2p } from '@libp2p/interface'; +import { Libp2p, Stream } from '@libp2p/interface'; -import { ExponentialBackoff, handleAll, retry } from 'cockatiel'; +import { + bulkhead, + ExponentialBackoff, + handleAll, + isBrokenCircuitError, + isTaskCancelledError, + retry, + timeout, + TimeoutStrategy, + wrap, +} from 'cockatiel'; import first from 'it-first'; import map from 'it-map'; import { pipe } from 'it-pipe'; -import { AbortError, raceSignal } from 'race-signal'; import RoseNetNodeContext from '../context/RoseNetNodeContext'; import streamService from '../stream/stream-service'; import { encode } from '../utils/codec'; -import RoseNetDirectAckError, { - AckError, -} from '../errors/RoseNetDirectAckError'; +import RoseNetDirectError, { + RoseNetDirectErrorType, +} from '../errors/RoseNetDirectError'; import RoseNetNodeError from '../errors/RoseNetNodeError'; import { ACK_BYTE, - ACK_TIMEOUT, + MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED, + MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE, MESSAGE_RETRY_ATTEMPTS, - MESSAGE_RETRY_EXPONENT, MESSAGE_RETRY_INITIAL_DELAY, + MESSAGE_ROUNDTRIP_TIMEOUT, } from '../constants'; /** @@ -29,30 +39,36 @@ import { */ const sendMessageFactory = (node: Libp2p) => async (to: string, message: string) => { - let stream; + let stream: Stream | undefined; try { stream = await streamService.getRoseNetDirectStreamTo(to, node); - const result = await pipe( + const messageRoundTripTimeout = timeout( + MESSAGE_ROUNDTRIP_TIMEOUT, + TimeoutStrategy.Aggressive, + ); + + const messagePipe = pipe( [message], (source) => map(source, encode), stream, - async (source) => - await raceSignal(first(source), AbortSignal.timeout(ACK_TIMEOUT)), + async (source) => first(source), ); + const result = await messageRoundTripTimeout.execute(() => messagePipe); + if (result?.length !== 1) { - throw new RoseNetDirectAckError( + throw new RoseNetDirectError( `There are more than one chunk in the ack message`, - AckError.InvalidChunks, + RoseNetDirectErrorType.InvalidAckChunks, ); } const ack = result?.subarray(); if (ack.length !== 1 || ack[0] !== ACK_BYTE) { - throw new RoseNetDirectAckError( + throw new RoseNetDirectError( `Ack byte is invalid`, - AckError.InvalidByte, + RoseNetDirectErrorType.InvalidAckByte, ); } @@ -60,15 +76,26 @@ const sendMessageFactory = message, }); } catch (error) { - if (error instanceof AbortError) { - const errorToThrow = new RoseNetDirectAckError( - `Ack was not received`, - AckError.Timeout, + if (isBrokenCircuitError(error)) { + /** + * We were unable to dial, so `stream` is undefined and we don't need to + * abort it + */ + throw new RoseNetNodeError( + `Cannot dial peer ${to} to send message`, + undefined, + error, + ); + } + if (isTaskCancelledError(error)) { + const errorToThrow = new RoseNetDirectError( + 'Message sending timed out', + RoseNetDirectErrorType.Timeout, ); stream?.abort(errorToThrow); throw errorToThrow; } - if (error instanceof RoseNetDirectAckError) { + if (error instanceof RoseNetNodeError) { stream?.abort(error); throw error; } @@ -80,12 +107,17 @@ const sendMessageFactory = } }; +const bulkheadPolicy = bulkhead( + MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED, + MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE, +); + /** * A wrapper around `sendMessageFactory` for retrying failed messages */ -const sendMessageWithRetryFactory = +const sendMessageWithRetryAndBulkheadFactory = (node: Libp2p) => - async ( + ( to: string, message: string, /** @@ -95,41 +127,41 @@ const sendMessageWithRetryFactory = onSettled?: (error?: Error) => Promise, ) => { const sendMessageInner = sendMessageFactory(node); - try { - const retryPolicy = retry(handleAll, { - maxAttempts: MESSAGE_RETRY_ATTEMPTS, - backoff: new ExponentialBackoff({ - exponent: MESSAGE_RETRY_EXPONENT, - initialDelay: MESSAGE_RETRY_INITIAL_DELAY, - maxDelay: 300_000, - }), + const retryPolicy = retry(handleAll, { + maxAttempts: MESSAGE_RETRY_ATTEMPTS, + backoff: new ExponentialBackoff({ + initialDelay: MESSAGE_RETRY_INITIAL_DELAY, + }), + }); + retryPolicy.onFailure((data) => { + RoseNetNodeContext.logger.debug('message sending failed', { + message, + reason: data.reason, }); - retryPolicy.onFailure((data) => { - RoseNetNodeContext.logger.debug('message sending failed', { + }); + retryPolicy.onRetry((data) => { + RoseNetNodeContext.logger.debug( + `retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`, + { message, - reason: data.reason, - }); - }); - retryPolicy.onRetry((data) => { - RoseNetNodeContext.logger.debug( - `retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`, - { - message, - }, - ); - }); - - await retryPolicy.execute(() => sendMessageInner(to, message)); - onSettled?.(); - } catch (error) { - RoseNetNodeContext.logger.error( - 'message sending failed regardless of 3 retries, dropping message', + }, ); - RoseNetNodeContext.logger.debug('message was: ', { - message, + }); + + const wrappedPolicy = wrap(bulkheadPolicy, retryPolicy); + + wrappedPolicy + .execute(() => sendMessageInner(to, message)) + .then(() => onSettled?.()) + .catch(() => { + RoseNetNodeContext.logger.error( + 'message sending failed regardless of 3 retries, dropping message', + ); + RoseNetNodeContext.logger.debug('message was: ', { + message, + }); + onSettled?.(new RoseNetNodeError('Message sending failed')); }); - onSettled?.(new RoseNetNodeError('Message sending failed')); - } }; -export default sendMessageWithRetryFactory; +export default sendMessageWithRetryAndBulkheadFactory; diff --git a/packages/rosenet-node/lib/stream/stream-service.ts b/packages/rosenet-node/lib/stream/stream-service.ts index 9632b44..181c6e8 100644 --- a/packages/rosenet-node/lib/stream/stream-service.ts +++ b/packages/rosenet-node/lib/stream/stream-service.ts @@ -1,10 +1,56 @@ import { peerIdFromString } from '@libp2p/peer-id'; +import { + circuitBreaker, + CircuitBreakerPolicy, + ExponentialBackoff, + handleAll, + isTaskCancelledError, + SamplingBreaker, + timeout, + TimeoutStrategy, +} from 'cockatiel'; import { shuffle } from 'fast-shuffle'; import { Libp2p } from 'libp2p'; import RoseNetNodeContext from '../context/RoseNetNodeContext'; -import { ROSENET_DIRECT_PROTOCOL_V1 } from '../constants'; +import { + ROSENET_DIRECT_STREAM_CREATION_TIMEOUT, + ROSENET_DIRECT_PROTOCOL_V1, +} from '../constants'; + +import { RoseNetNodeError } from '../errors'; + +const peerBreakers = new Proxy>( + {}, + { + get(breakers, peer: string) { + if (peer in breakers) return breakers[peer]; + breakers[peer] = circuitBreaker(handleAll, { + breaker: new SamplingBreaker({ duration: 5000, threshold: 0.7 }), + halfOpenAfter: new ExponentialBackoff({ + initialDelay: 2000, + }), + }); + breakers[peer].onBreak(() => { + RoseNetNodeContext.logger.debug( + `Too many dials to ${peer} are failing, halting upcoming dials temporarily`, + ); + }); + breakers[peer].onHalfOpen(() => { + RoseNetNodeContext.logger.debug( + `Retrying dial to ${peer} for resuming upcoming dials`, + ); + }); + breakers[peer].onReset(() => { + RoseNetNodeContext.logger.debug( + `Retry dial to ${peer} succeeded, resuming upcoming dials`, + ); + }); + return breakers[peer]; + }, + }, +); /** * get a new stream to the `to` remotePeer with ROSENET_DIRECT_PROTOCOL_V1 @@ -20,7 +66,14 @@ async function getRoseNetDirectStreamTo(to: string, node: Libp2p) { const possibleOpenConnectionToPeer = allConnectionsToPeer.find( (connection) => connection.status === 'open', ); - const connection = possibleOpenConnectionToPeer ?? (await node.dial(peerId)); + + /** + * There is a default 5 seconds dial timeout in libp2p, so there is no need to + * add another one here + */ + const connection = + possibleOpenConnectionToPeer ?? + (await peerBreakers[to].execute(() => node.dial(peerId))); RoseNetNodeContext.logger.debug( possibleOpenConnectionToPeer @@ -34,9 +87,21 @@ async function getRoseNetDirectStreamTo(to: string, node: Libp2p) { }, ); - const stream = await connection.newStream(ROSENET_DIRECT_PROTOCOL_V1, { - runOnTransientConnection: true, - }); + const streamCreationTimeout = timeout( + ROSENET_DIRECT_STREAM_CREATION_TIMEOUT, + TimeoutStrategy.Aggressive, + ); + const stream = await streamCreationTimeout + .execute(() => + connection.newStream(ROSENET_DIRECT_PROTOCOL_V1, { + runOnTransientConnection: true, + }), + ) + .catch((error) => { + throw isTaskCancelledError(error) + ? new RoseNetNodeError('Stream creation timed out', undefined, error) + : error; + }); RoseNetNodeContext.logger.debug(`Created a new stream to peer ${to}`, { stream: { diff --git a/packages/rosenet-node/package.json b/packages/rosenet-node/package.json index 673b585..7030a91 100644 --- a/packages/rosenet-node/package.json +++ b/packages/rosenet-node/package.json @@ -57,7 +57,6 @@ "it-map": "^3.0.5", "it-pipe": "^3.0.1", "libp2p": "^1.9.2", - "public-ip": "^6.0.2", - "race-signal": "^1.1.0" + "public-ip": "^6.0.2" } }