diff --git a/.changeset/thin-donkeys-sip.md b/.changeset/thin-donkeys-sip.md new file mode 100644 index 0000000..27cdb6a --- /dev/null +++ b/.changeset/thin-donkeys-sip.md @@ -0,0 +1,5 @@ +--- +'@rosen-bridge/rosenet-node': major +--- + +Add required configurations to node factory diff --git a/package-lock.json b/package-lock.json index e4503e1..3d935c2 100644 --- a/package-lock.json +++ b/package-lock.json @@ -2542,6 +2542,21 @@ "integrity": "sha512-5+fP8P8MFNC+AyZCDxrB2pkZFPGzqQWUzpSeuuVLvm8VMcorNYavBqoFcxK8bQz4Qsbn4oUEEem4wDLfcysGHA==", "dev": true }, + "node_modules/@types/lodash": { + "version": "4.17.10", + "resolved": "https://registry.npmjs.org/@types/lodash/-/lodash-4.17.10.tgz", + "integrity": "sha512-YpS0zzoduEhuOWjAotS6A5AVCva7X4lVlYLF0FYHAY9sdraBfnatttHItlWeZdGhuEkf+OzMNg2ZYAx8t+52uQ==", + "dev": true + }, + "node_modules/@types/lodash.merge": { + "version": "4.6.9", + "resolved": "https://registry.npmjs.org/@types/lodash.merge/-/lodash.merge-4.6.9.tgz", + "integrity": "sha512-23sHDPmzd59kUgWyKGiOMO2Qb9YtqRO/x4IhkgNUiPQ1+5MUVqi6bCZeq9nBJ17msjIMbEIO5u+XW4Kz6aGUhQ==", + "dev": true, + "dependencies": { + "@types/lodash": "*" + } + }, "node_modules/@types/minimist": { "version": "1.2.5", "resolved": "https://registry.npmjs.org/@types/minimist/-/minimist-1.2.5.tgz", @@ -6105,7 +6120,6 @@ }, "node_modules/lodash.merge": { "version": "4.6.2", - "dev": true, "license": "MIT" }, "node_modules/lodash.startcase": { @@ -9159,9 +9173,11 @@ "it-map": "^3.0.5", "it-pipe": "^3.0.1", "libp2p": "^1.9.2", + "lodash.merge": "^4.6.2", "public-ip": "^6.0.2" }, "devDependencies": { + "@types/lodash.merge": "^4.6.9", "@types/node": "^20.11.9", "@typescript-eslint/eslint-plugin": "^6.19.1", "@typescript-eslint/parser": "^6.19.1", @@ -9952,6 +9968,7 @@ } }, "tests/scale": { + "name": "@rosenet-tests/scale", "version": "0.0.0", "license": "GPL-3.0", "dependencies": { diff --git a/packages/rosenet-node/lib/constants.ts b/packages/rosenet-node/lib/constants.ts index 2c75300..6f874ae 100644 --- a/packages/rosenet-node/lib/constants.ts +++ b/packages/rosenet-node/lib/constants.ts @@ -1,7 +1,7 @@ /** * How many relays to search for and connect to */ -export const RELAYS_COUNT_TO_CONNECT = 3; +export const DEFAULT_RELAYS_COUNT_TO_CONNECT = 3; /** * Minimum relays count below which we try to connect new relays */ @@ -15,6 +15,10 @@ export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000; * RoseNet protocol id */ export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1'; +/** + * Default host on which libp2p node will listen to + */ +export const DEFAULT_NODE_HOST = '0.0.0.0'; /** * Default port in which libp2p node will listen to */ @@ -28,85 +32,90 @@ export const ACK_BYTE = 1; * Maximum time that sending a message (writing bytes and receiving ack) can * take */ -export const MESSAGE_ROUNDTRIP_TIMEOUT = 5000; +export const DEFAULT_MESSAGE_ROUNDTRIP_TIMEOUT = 5000; /** * Maximum time that handling an incoming RoseNet Direct can take (calling * handler and writing ack byte) */ -export const MESSAGE_HANDLING_TIMEOUT = 2000; +export const DEFAULT_MESSAGE_HANDLING_TIMEOUT = 2000; /** * The number of times we attempt to re-send failed messages */ -export const MESSAGE_RETRY_ATTEMPTS = 5; +export const DEFAULT_MESSAGE_RETRY_ATTEMPTS = 5; /** * The number of times we attempt to re-send failed messages when in fail fast * mode */ -export const FAIL_FAST_MESSAGE_RETRY_ATTEMPTS = 1; +export const DEFAULT_FAIL_FAST_MESSAGE_RETRY_ATTEMPTS = 1; /** * Initial delay after which we retry sending a failed message */ -export const MESSAGE_RETRY_INITIAL_DELAY = 2000; +export const DEFAULT_MESSAGE_RETRY_INITIAL_DELAY = 2000; /** * Initial delay after which we retry sending a failed message when in fail fast * mode */ -export const FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY = 5000; +export const DEFAULT_FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY = 5000; /** * Maximum number of incoming RoseNet Direct messages that can be handled * concurrently */ -export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100; +export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100; /** * Maximum number of incoming RoseNet Direct messages that can be queued if * throughput threshold is reached */ -export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200; +export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200; /** * Maximum number of incoming RoseNet Direct messages from a single peer that * can be handled concurrently */ -export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10; +export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10; /** * Maximum number of incoming RoseNet Direct messages from a single peer that * can be queued if throughput threshold is reached */ -export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20; +export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20; /** * Maximum number of outgoing RoseNet Direct messages that can be sent * concurrently */ -export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200; +export const DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200; /** * Maximum number of outgoing RoseNet Direct messages that can queued if * throughput threshold is reached */ -export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400; +export const DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400; /** * Maximum number of incoming pubsub messages that can be handled concurrently */ -export const MAX_INBOUND_PUBSUB_THROUGHPUT = 100; +export const DEFAULT_MAX_INBOUND_PUBSUB_THROUGHPUT = 100; /** * Maximum number of incoming pubsub messages that can be queued if throughput * threshold is reached */ -export const MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200; +export const DEFAULT_MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200; /** * Maximum number of outgoing pubsub messages that can be sent concurrently */ -export const MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200; +export const DEFAULT_MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200; /** * Maximum number of outgoing pubsub messages that can queued if throughput * threshold is reached */ -export const MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400; +export const DEFAULT_MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400; /** * Maximum time that creation of a stream (including protocol negotiations and * upgrade) can take */ -export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000; +export const DEFAULT_ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000; /** * Threshold for enabling fail fast when sending messages, resulting in fewer * retry attempts to prevent the queues from becoming full */ -export const FAIL_FAST_THRESHOLD = MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT / 4; +export const DEFAULT_FAIL_FAST_THRESHOLD = + DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT / 4; +/** + * Default gossipsubMaxInboundDataLength config of Gossipsub + */ +export const DEFAULT_GOSSIPSUB_MAX_INBOUND_DATA_LENGTH = 170_000_000; diff --git a/packages/rosenet-node/lib/context/RoseNetNodeContext.ts b/packages/rosenet-node/lib/context/RoseNetNodeContext.ts index 01eb7a5..e56e546 100644 --- a/packages/rosenet-node/lib/context/RoseNetNodeContext.ts +++ b/packages/rosenet-node/lib/context/RoseNetNodeContext.ts @@ -1,9 +1,80 @@ import { AbstractLogger } from '@rosen-bridge/logger-interface'; +import merge from 'lodash.merge'; + +import { PartialRoseNetNodeConfig, RoseNetNodeConfig } from '../types'; + +import { + DEFAULT_FAIL_FAST_MESSAGE_RETRY_ATTEMPTS, + DEFAULT_FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY, + DEFAULT_FAIL_FAST_THRESHOLD, + DEFAULT_GOSSIPSUB_MAX_INBOUND_DATA_LENGTH, + DEFAULT_MAX_INBOUND_PUBSUB_QUEUE_SIZE, + DEFAULT_MAX_INBOUND_PUBSUB_THROUGHPUT, + DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE, + DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER, + DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT, + DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER, + DEFAULT_MAX_OUTBOUND_PUBSUB_QUEUE_SIZE, + DEFAULT_MAX_OUTBOUND_PUBSUB_THROUGHPUT, + DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE, + DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT, + DEFAULT_MESSAGE_HANDLING_TIMEOUT, + DEFAULT_MESSAGE_RETRY_ATTEMPTS, + DEFAULT_MESSAGE_RETRY_INITIAL_DELAY, + DEFAULT_MESSAGE_ROUNDTRIP_TIMEOUT, + DEFAULT_NODE_HOST, + DEFAULT_NODE_PORT, + DEFAULT_RELAYS_COUNT_TO_CONNECT, + DEFAULT_ROSENET_DIRECT_STREAM_CREATION_TIMEOUT, +} from '../constants'; const RoseNetNodeContext = { logger: console as AbstractLogger, - init(logger: AbstractLogger) { - this.logger = logger; + config: {} as RoseNetNodeConfig, + init(config: PartialRoseNetNodeConfig) { + const defaultConfig = { + direct: { + roundTripTimeout: DEFAULT_MESSAGE_ROUNDTRIP_TIMEOUT, + handlingTimeout: DEFAULT_MESSAGE_HANDLING_TIMEOUT, + maxRetryAttempts: DEFAULT_MESSAGE_RETRY_ATTEMPTS, + failFastMaxRetryAttempts: DEFAULT_FAIL_FAST_MESSAGE_RETRY_ATTEMPTS, + retryInitialDelay: DEFAULT_MESSAGE_RETRY_INITIAL_DELAY, + failFastRetryInitialDelay: + DEFAULT_FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY, + maxInboundThroughput: DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT, + maxInboundThroughputPerPeer: + DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER, + maxInboundQueueSize: DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE, + maxInboundQueueSizePerPeer: + DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER, + maxOutboundThroughput: DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT, + maxOutboundQueueSize: DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE, + streamCreationTimeout: DEFAULT_ROSENET_DIRECT_STREAM_CREATION_TIMEOUT, + failFastThreshold: DEFAULT_FAIL_FAST_THRESHOLD, + }, + host: DEFAULT_NODE_HOST, + port: DEFAULT_NODE_PORT, + pubsub: { + maxInboundThroughput: DEFAULT_MAX_INBOUND_PUBSUB_THROUGHPUT, + maxInboundQueueSize: DEFAULT_MAX_INBOUND_PUBSUB_QUEUE_SIZE, + maxOutboundThroughput: DEFAULT_MAX_OUTBOUND_PUBSUB_THROUGHPUT, + maxOutboundQueueSize: DEFAULT_MAX_OUTBOUND_PUBSUB_QUEUE_SIZE, + gossipsubMaxInboundDataLength: + DEFAULT_GOSSIPSUB_MAX_INBOUND_DATA_LENGTH, + gossipsubSignaturePolicy: 'StrictNoSign', + }, + relay: { + sampleSize: DEFAULT_RELAYS_COUNT_TO_CONNECT, + multiaddrs: [], + }, + logger: console, + whitelist: [], + debug: { + libp2pComponents: [], + }, + }; + this.config = merge(defaultConfig, config); + this.logger = this.config.logger; }, }; diff --git a/packages/rosenet-node/lib/createRoseNetNode.ts b/packages/rosenet-node/lib/createRoseNetNode.ts index 65310a1..e364544 100644 --- a/packages/rosenet-node/lib/createRoseNetNode.ts +++ b/packages/rosenet-node/lib/createRoseNetNode.ts @@ -33,53 +33,62 @@ import sample from './utils/sample'; import RoseNetNodeError from './errors/RoseNetNodeError'; -import { DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT } from './constants'; - import packageJson from '../package.json' with { type: 'json' }; -import { RoseNetNodeConfig } from './types'; +import { PartialRoseNetNodeConfig } from './types'; + +const createRoseNetNode = async (config: PartialRoseNetNodeConfig) => { + RoseNetNodeContext.init(config); + RoseNetNodeContext.logger.debug('RoseNet node config got prepared', { + config: { + direct: RoseNetNodeContext.config.direct, + pubsub: RoseNetNodeContext.config.pubsub, + host: RoseNetNodeContext.config.host, + port: RoseNetNodeContext.config.port, + relay: RoseNetNodeContext.config.relay, + whitelist: RoseNetNodeContext.config.whitelist, + debug: RoseNetNodeContext.config.debug, + }, + }); -const createRoseNetNode = async ({ - logger, - port = DEFAULT_NODE_PORT, - ...config -}: RoseNetNodeConfig) => { - if (!config.relayMultiaddrs.length) { + if (!RoseNetNodeContext.config.relay.multiaddrs.length) { throw new RoseNetNodeError('Cannot start a RoseNet node without a relay'); } - RoseNetNodeContext.init(logger); - /** * return if a peer is unauthorized, i.e. not whitelisted * @param peerId */ const isPeerUnauthorized = (peerId: PeerId) => - !peerId || !config.whitelist!.includes(peerId.toString()); + !peerId || !RoseNetNodeContext.config.whitelist.includes(peerId.toString()); - const peerId = await privateKeyToPeerId(config.privateKey); + const peerId = await privateKeyToPeerId(RoseNetNodeContext.config.privateKey); RoseNetNodeContext.logger.debug(`PeerId ${peerId.toString()} generated`); - const announceMultiaddr = await addressService.getAnnounceMultiaddr(port); - logger.info(`${announceMultiaddr} set as announce multiaddr`); + const announceMultiaddr = await addressService.getAnnounceMultiaddr( + RoseNetNodeContext.config.port, + ); + RoseNetNodeContext.logger.info( + `${announceMultiaddr} set as announce multiaddr`, + ); const sampledRelayMultiaddrs = sample( - config.relayMultiaddrs, - RELAYS_COUNT_TO_CONNECT, + RoseNetNodeContext.config.relay.multiaddrs, + RoseNetNodeContext.config.relay.sampleSize, ); const node = await createLibp2p({ peerId, transports: [ circuitRelayTransport({ - discoverRelays: RELAYS_COUNT_TO_CONNECT, + discoverRelays: RoseNetNodeContext.config.relay.sampleSize, }), tcp(), ], addresses: { listen: [ - `/ip4/0.0.0.0/tcp/${port}`, + `/ip4/${RoseNetNodeContext.config.host}/tcp/${RoseNetNodeContext.config.port}`, ...sampledRelayMultiaddrs.map( (multiaddr) => `${multiaddr}/p2p-circuit`, ), @@ -93,7 +102,7 @@ const createRoseNetNode = async ({ }, connectionEncryption: [noise()], connectionGater: { - ...(config.whitelist && { + ...(RoseNetNodeContext.config.whitelist.length && { denyInboundEncryptedConnection: isPeerUnauthorized, denyInboundRelayedConnection: ( relayPeerId: PeerId, @@ -128,14 +137,19 @@ const createRoseNetNode = async ({ * maximum of around 5000*100KB=500MB is received in 3 heartbeats from * a single stream, which is 500MB/3≃170MB. */ - maxInboundDataLength: 170_000_000, // 170MB - globalSignaturePolicy: 'StrictNoSign', + maxInboundDataLength: + RoseNetNodeContext.config.pubsub.gossipsubMaxInboundDataLength, + globalSignaturePolicy: + RoseNetNodeContext.config.pubsub.gossipsubSignaturePolicy, ignoreDuplicatePublishError: true, }), dcutr: dcutr(), restartRelayDiscovery, }, - logger: libp2pLoggerFactory(logger, config.debug?.libp2pComponents ?? []), + logger: libp2pLoggerFactory( + RoseNetNodeContext.logger, + RoseNetNodeContext.config.debug.libp2pComponents, + ), }); RoseNetNodeContext.logger.info('RoseNet node created'); diff --git a/packages/rosenet-node/lib/libp2p/restart-relay-discovery.ts b/packages/rosenet-node/lib/libp2p/restart-relay-discovery.ts index 6718c4d..3c3348e 100644 --- a/packages/rosenet-node/lib/libp2p/restart-relay-discovery.ts +++ b/packages/rosenet-node/lib/libp2p/restart-relay-discovery.ts @@ -107,7 +107,6 @@ class RestartRelayDiscovery implements Startable { await this.breaker.execute(this.restartRelayDiscoveryIfNeeded); } catch (error) { if (error instanceof BrokenCircuitError) { - // log error this.logger('libp2p:restart-relay-discovery circuit is open'); } else { throw error; diff --git a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts index d48ef62..7f5a84f 100644 --- a/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts @@ -14,39 +14,31 @@ import RoseNetNodeContext from '../context/RoseNetNodeContext'; import { decode } from '../utils/codec'; -import { - ACK_BYTE, - MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE, - MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER, - MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT, - MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER, - ROSENET_DIRECT_PROTOCOL_V1, - MESSAGE_HANDLING_TIMEOUT, -} from '../constants'; - -const messageHandlingBulkhead = bulkhead( - MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT, - MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE, -); -const peerBulkheads = new Proxy>( - {}, - { - get(bulkheads, peer: string) { - if (peer in bulkheads) return bulkheads[peer]; - bulkheads[peer] = bulkhead( - MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER, - MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER, - ); - return bulkheads[peer]; - }, - }, -); +import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants'; /** * protocol handler for RoseNet direct */ -const handleIncomingMessageFactory = - (node: Libp2p) => (handler: (from: string, message?: string) => void) => { +const handleIncomingMessageFactory = (node: Libp2p) => { + const messageHandlingBulkhead = bulkhead( + RoseNetNodeContext.config.direct.maxInboundThroughput, + RoseNetNodeContext.config.direct.maxInboundQueueSize, + ); + const peerBulkheads = new Proxy>( + {}, + { + get(bulkheads, peer: string) { + if (peer in bulkheads) return bulkheads[peer]; + bulkheads[peer] = bulkhead( + RoseNetNodeContext.config.direct.maxInboundThroughputPerPeer, + RoseNetNodeContext.config.direct.maxInboundQueueSizePerPeer, + ); + return bulkheads[peer]; + }, + }, + ); + + return (handler: (from: string, message?: string) => void) => { node.handle( ROSENET_DIRECT_PROTOCOL_V1, async ({ connection, stream }) => { @@ -65,7 +57,7 @@ const handleIncomingMessageFactory = await wrappedPolicy.execute(async () => { try { const messageHandlingTimeout = timeout( - MESSAGE_HANDLING_TIMEOUT, + RoseNetNodeContext.config.direct.handlingTimeout, TimeoutStrategy.Aggressive, ); await messageHandlingTimeout.execute(() => @@ -113,5 +105,6 @@ const handleIncomingMessageFactory = `Handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`, ); }; +}; export default handleIncomingMessageFactory; diff --git a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts index e923efb..f597faf 100644 --- a/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts +++ b/packages/rosenet-node/lib/rosenet-direct/sendMessage.ts @@ -24,17 +24,7 @@ import RoseNetDirectError, { } from '../errors/RoseNetDirectError'; import RoseNetNodeError from '../errors/RoseNetNodeError'; -import { - ACK_BYTE, - FAIL_FAST_MESSAGE_RETRY_ATTEMPTS, - FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY, - FAIL_FAST_THRESHOLD, - MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE, - MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT, - MESSAGE_RETRY_ATTEMPTS, - MESSAGE_RETRY_INITIAL_DELAY, - MESSAGE_ROUNDTRIP_TIMEOUT, -} from '../constants'; +import { ACK_BYTE } from '../constants'; /** * A factory returning a function to send a message to a specific peer via @@ -48,7 +38,7 @@ const sendMessageFactory = stream = await streamService.getRoseNetDirectStreamTo(to, node); const messageRoundTripTimeout = timeout( - MESSAGE_ROUNDTRIP_TIMEOUT, + RoseNetNodeContext.config.direct.roundTripTimeout, TimeoutStrategy.Aggressive, ); @@ -124,17 +114,16 @@ const sendMessageFactory = } }; -const bulkheadPolicy = bulkhead( - MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT, - MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE, -); - /** * A wrapper around `sendMessageFactory` for retrying failed messages */ -const sendMessageWithRetryAndBulkheadFactory = - (node: Libp2p) => - ( +const sendMessageWithRetryAndBulkheadFactory = (node: Libp2p) => { + const bulkheadPolicy = bulkhead( + RoseNetNodeContext.config.direct.maxOutboundThroughput, + RoseNetNodeContext.config.direct.maxOutboundQueueSize, + ); + + return ( to: string, message: string, /** @@ -145,14 +134,17 @@ const sendMessageWithRetryAndBulkheadFactory = ) => { const sendMessageInner = sendMessageFactory(node); - const shouldFailFast = bulkheadPolicy.executionSlots > FAIL_FAST_THRESHOLD; + const shouldFailFast = + bulkheadPolicy.executionSlots > + RoseNetNodeContext.config.direct.failFastThreshold; const maxAttempts = shouldFailFast - ? MESSAGE_RETRY_ATTEMPTS - : FAIL_FAST_MESSAGE_RETRY_ATTEMPTS; + ? RoseNetNodeContext.config.direct.maxRetryAttempts + : RoseNetNodeContext.config.direct.failFastMaxRetryAttempts; + const initialDelay = shouldFailFast - ? MESSAGE_RETRY_INITIAL_DELAY - : FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY; + ? RoseNetNodeContext.config.direct.retryInitialDelay + : RoseNetNodeContext.config.direct.failFastRetryInitialDelay; const retryPolicy = retry(handleAll, { maxAttempts, @@ -168,7 +160,7 @@ const sendMessageWithRetryAndBulkheadFactory = }); retryPolicy.onRetry((data) => { RoseNetNodeContext.logger.debug( - `Retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`, + `Retry sending message (attempt #${data.attempt}/${RoseNetNodeContext.config.direct.maxRetryAttempts})`, { message, }, @@ -194,5 +186,6 @@ const sendMessageWithRetryAndBulkheadFactory = onSettled?.(new RoseNetNodeError('Message sending failed')); }); }; +}; export default sendMessageWithRetryAndBulkheadFactory; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts index 9defb32..bfc872e 100644 --- a/packages/rosenet-node/lib/rosenet-pubsub/publish.ts +++ b/packages/rosenet-node/lib/rosenet-pubsub/publish.ts @@ -3,24 +3,18 @@ import { bulkhead, isBulkheadRejectedError } from 'cockatiel'; import RoseNetNodeContext from '../context/RoseNetNodeContext'; -import { - MAX_OUTBOUND_PUBSUB_QUEUE_SIZE, - MAX_OUTBOUND_PUBSUB_THROUGHPUT, -} from '../constants'; - const textEncoder = new TextEncoder(); -const bulkheadPolicy = bulkhead( - MAX_OUTBOUND_PUBSUB_THROUGHPUT, - MAX_OUTBOUND_PUBSUB_QUEUE_SIZE, -); - /** * factory for libp2p publish */ -const publishFactory = - (node: Libp2p<{ pubsub: PubSub }>) => - async (topic: string, message: string) => { +const publishFactory = (node: Libp2p<{ pubsub: PubSub }>) => { + const bulkheadPolicy = bulkhead( + RoseNetNodeContext.config.pubsub.maxOutboundThroughput, + RoseNetNodeContext.config.pubsub.maxOutboundQueueSize, + ); + + return async (topic: string, message: string) => { try { await bulkheadPolicy.execute(() => node.services.pubsub.publish(topic, textEncoder.encode(message)), @@ -36,5 +30,6 @@ const publishFactory = } } }; +}; export default publishFactory; diff --git a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts index 0674599..0264925 100644 --- a/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts +++ b/packages/rosenet-node/lib/rosenet-pubsub/subscribe.ts @@ -3,24 +3,18 @@ import { bulkhead } from 'cockatiel'; import RoseNetNodeContext from '../context/RoseNetNodeContext'; -import { - MAX_INBOUND_PUBSUB_QUEUE_SIZE, - MAX_INBOUND_PUBSUB_THROUGHPUT, -} from '../constants'; - const textDecoder = new TextDecoder(); -const bulkheadPolicy = bulkhead( - MAX_INBOUND_PUBSUB_THROUGHPUT, - MAX_INBOUND_PUBSUB_QUEUE_SIZE, -); - /** * factory for libp2p subscribe */ -const subscribeFactory = - (node: Libp2p<{ pubsub: PubSub }>) => - async (topic: string, handler: (message: string) => void) => { +const subscribeFactory = (node: Libp2p<{ pubsub: PubSub }>) => { + const bulkheadPolicy = bulkhead( + RoseNetNodeContext.config.pubsub.maxInboundThroughput, + RoseNetNodeContext.config.pubsub.maxInboundQueueSize, + ); + + return async (topic: string, handler: (message: string) => void) => { node.services.pubsub.subscribe(topic); node.services.pubsub.addEventListener('message', async (event) => { try { @@ -41,5 +35,6 @@ const subscribeFactory = }); RoseNetNodeContext.logger.info(`Topic ${topic} subscribed`); }; +}; export default subscribeFactory; diff --git a/packages/rosenet-node/lib/stream/stream-service.ts b/packages/rosenet-node/lib/stream/stream-service.ts index 181c6e8..6c836e8 100644 --- a/packages/rosenet-node/lib/stream/stream-service.ts +++ b/packages/rosenet-node/lib/stream/stream-service.ts @@ -14,10 +14,7 @@ import { Libp2p } from 'libp2p'; import RoseNetNodeContext from '../context/RoseNetNodeContext'; -import { - ROSENET_DIRECT_STREAM_CREATION_TIMEOUT, - ROSENET_DIRECT_PROTOCOL_V1, -} from '../constants'; +import { ROSENET_DIRECT_PROTOCOL_V1 } from '../constants'; import { RoseNetNodeError } from '../errors'; @@ -88,7 +85,7 @@ async function getRoseNetDirectStreamTo(to: string, node: Libp2p) { ); const streamCreationTimeout = timeout( - ROSENET_DIRECT_STREAM_CREATION_TIMEOUT, + RoseNetNodeContext.config.direct.streamCreationTimeout, TimeoutStrategy.Aggressive, ); const stream = await streamCreationTimeout diff --git a/packages/rosenet-node/lib/types.ts b/packages/rosenet-node/lib/types.ts index 51e5488..ed7ca88 100644 --- a/packages/rosenet-node/lib/types.ts +++ b/packages/rosenet-node/lib/types.ts @@ -1,12 +1,52 @@ +import { SignaturePolicy } from '@libp2p/interface'; import { AbstractLogger } from '@rosen-bridge/logger-interface'; export interface RoseNetNodeConfig { - relayMultiaddrs: string[]; - logger: AbstractLogger; privateKey: string; - port?: number; - whitelist?: string[]; - debug?: { - libp2pComponents?: string[]; + logger: AbstractLogger; + port: number; + host: string; + whitelist: string[]; + debug: { + libp2pComponents: string[]; + }; + relay: { + multiaddrs: string[]; + sampleSize: number; + }; + direct: { + roundTripTimeout: number; + handlingTimeout: number; + maxRetryAttempts: number; + failFastMaxRetryAttempts: number; + retryInitialDelay: number; + failFastRetryInitialDelay: number; + maxInboundThroughput: number; + maxInboundThroughputPerPeer: number; + maxInboundQueueSize: number; + maxInboundQueueSizePerPeer: number; + maxOutboundThroughput: number; + maxOutboundQueueSize: number; + streamCreationTimeout: number; + failFastThreshold: number; + }; + pubsub: { + maxInboundThroughput: number; + maxInboundQueueSize: number; + maxOutboundThroughput: number; + maxOutboundQueueSize: number; + gossipsubMaxInboundDataLength: number; + gossipsubSignaturePolicy: SignaturePolicy; }; } + +type RecursivePartial = { + [P in keyof T]?: RecursivePartial; +}; + +type RoseNetNodeConfigMandatoryKeys = 'privateKey'; +export type PartialRoseNetNodeConfig = Pick< + RoseNetNodeConfig, + RoseNetNodeConfigMandatoryKeys +> & + RecursivePartial>; diff --git a/packages/rosenet-node/package.json b/packages/rosenet-node/package.json index 7030a91..4f4986a 100644 --- a/packages/rosenet-node/package.json +++ b/packages/rosenet-node/package.json @@ -18,6 +18,7 @@ "type-check": "tsc --noEmit" }, "devDependencies": { + "@types/lodash.merge": "^4.6.9", "@types/node": "^20.11.9", "@typescript-eslint/eslint-plugin": "^6.19.1", "@typescript-eslint/parser": "^6.19.1", @@ -57,6 +58,7 @@ "it-map": "^3.0.5", "it-pipe": "^3.0.1", "libp2p": "^1.9.2", + "lodash.merge": "^4.6.2", "public-ip": "^6.0.2" } } diff --git a/tests/scale/src/node/node.ts b/tests/scale/src/node/node.ts index 31e7a1b..c3c88ed 100644 --- a/tests/scale/src/node/node.ts +++ b/tests/scale/src/node/node.ts @@ -1,8 +1,9 @@ import { createRoseNetNode } from '@rosen-bridge/rosenet-node'; const node = await createRoseNetNode({ - logger: console, - relayMultiaddrs: process.env.RELAY_MULTIADDRS!.split(','), + relay: { + multiaddrs: process.env.RELAY_MULTIADDRS!.split(','), + }, privateKey: process.env.PRIVATE_KEY!, port: +process.env.PORT!, });