From f00c1ed44addb2e80b64fd00576d52470f3f63e1 Mon Sep 17 00:00:00 2001 From: Mohammad Kermani Date: Tue, 24 Sep 2024 15:23:19 +0330 Subject: [PATCH] feat(rosenet-node): limit incoming throughput overally and per peer --- .changeset/long-students-glow.md | 5 ++ packages/rosenet-node/lib/constants.ts | 8 +- .../rosenet-direct/handleIncomingMessage.ts | 88 ++++++++++++++----- .../lib/rosenet-direct/sendMessage.ts | 8 +- 4 files changed, 81 insertions(+), 28 deletions(-) create mode 100644 .changeset/long-students-glow.md diff --git a/.changeset/long-students-glow.md b/.changeset/long-students-glow.md new file mode 100644 index 0000000..ecd28fe --- /dev/null +++ b/.changeset/long-students-glow.md @@ -0,0 +1,5 @@ +--- +'@rosen-bridge/rosenet-node': minor +--- + +Limit the number of concurrent messages that can be handled concurrently diff --git a/packages/rosenet-node/lib/constants.ts b/packages/rosenet-node/lib/constants.ts index 54a8069..75b740a 100644 --- a/packages/rosenet-node/lib/constants.ts +++ b/packages/rosenet-node/lib/constants.ts @@ -7,6 +7,10 @@ export const ACK_BYTE = 1; export const MESSAGE_ROUNDTRIP_TIMEOUT = 1000; export const MESSAGE_RETRY_ATTEMPTS = 5; export const MESSAGE_RETRY_INITIAL_DELAY = 2000; -export const MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED = 1000; -export const MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE = 2000; +export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 1000; +export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000; +export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 100; +export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 200; +export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 1000; +export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000; export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 500; diff --git a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts index 7159d27..c452432 100644 --- a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts @@ -1,3 +1,9 @@ +import { + bulkhead, + BulkheadPolicy, + BulkheadRejectedError, + wrap, +} from 'cockatiel'; import { pipe } from 'it-pipe'; import { Libp2p } from '@libp2p/interface'; @@ -6,7 +12,32 @@ import RoseNetNodeContext from '../context/RoseNetNodeContext'; import { decode } from '../utils/codec'; -import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants'; +import { + ACK_BYTE, + MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE, + MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER, + MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT, + MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER, + ROSENET_DIRECT_PROTOCOL_V1, +} from '../constants'; + +const messageHandlingBulkhead = bulkhead( + MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT, + MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE, +); +const peerBulkheads = new Proxy>( + {}, + { + get(bulkheads, peer: string) { + if (peer in bulkheads) return bulkheads[peer]; + bulkheads[peer] = bulkhead( + MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER, + MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER, + ); + return bulkheads[peer]; + }, + }, +); /** * protocol handler for RoseNet direct @@ -23,31 +54,44 @@ const handleIncomingMessageFactory = transient: connection.transient, }, ); + const wrappedPolicy = wrap( + messageHandlingBulkhead, + peerBulkheads[connection.remotePeer.toString()], + ); try { - await pipe( - stream, - decode, - async function* (source) { - for await (const message of source) { - RoseNetNodeContext.logger.debug( - 'message received, calling handler and sending ack', - { - message, - }, - ); - handler(connection.remotePeer.toString(), message); - yield Uint8Array.of(ACK_BYTE); - } - }, - stream, - ); + await wrappedPolicy.execute(async () => { + try { + await pipe( + stream, + decode, + async function* (source) { + for await (const message of source) { + RoseNetNodeContext.logger.debug( + 'message received, calling handler and sending ack', + { + message, + }, + ); + handler(connection.remotePeer.toString(), message); + yield Uint8Array.of(ACK_BYTE); + } + }, + stream, + ); + } catch (error) { + RoseNetNodeContext.logger.warn( + 'An error occurred while reading from stream', + { + error, + }, + ); + } + }); } catch (error) { RoseNetNodeContext.logger.warn( - 'An error occurred while reading from stream', - { - error, - }, + 'Maximum message handling threshold reached', ); + stream.abort(error as BulkheadRejectedError); } }, { runOnTransientConnection: true }, diff --git a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts index 59a3dee..7b15e37 100644 --- a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts @@ -26,8 +26,8 @@ import RoseNetNodeError from '../errors/RoseNetNodeError'; import { ACK_BYTE, - MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED, - MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE, + MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT, + MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE, MESSAGE_RETRY_ATTEMPTS, MESSAGE_RETRY_INITIAL_DELAY, MESSAGE_ROUNDTRIP_TIMEOUT, @@ -108,8 +108,8 @@ const sendMessageFactory = }; const bulkheadPolicy = bulkhead( - MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED, - MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE, + MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT, + MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE, ); /**