Skip to content

Commit

Permalink
Merge branch 'feat/configs' into 'dev'
Browse files Browse the repository at this point in the history
feat(rosenet-node): add required configurations to factory function

Closes #84

See merge request ergo/rosen-bridge/rosenet!48
  • Loading branch information
vorujack committed Oct 19, 2024
2 parents c7e109f + b5ff692 commit a05d95c
Show file tree
Hide file tree
Showing 14 changed files with 272 additions and 141 deletions.
5 changes: 5 additions & 0 deletions .changeset/thin-donkeys-sip.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': major
---

Add required configurations to node factory
19 changes: 18 additions & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

47 changes: 28 additions & 19 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/**
* How many relays to search for and connect to
*/
export const RELAYS_COUNT_TO_CONNECT = 3;
export const DEFAULT_RELAYS_COUNT_TO_CONNECT = 3;
/**
* Minimum relays count below which we try to connect new relays
*/
Expand All @@ -15,6 +15,10 @@ export const RELAY_DISCOVERY_RESTART_INTERVAL = 10_000;
* RoseNet protocol id
*/
export const ROSENET_DIRECT_PROTOCOL_V1 = '/rosenet/direct/1';
/**
* Default host on which libp2p node will listen to
*/
export const DEFAULT_NODE_HOST = '0.0.0.0';
/**
* Default port in which libp2p node will listen to
*/
Expand All @@ -28,85 +32,90 @@ export const ACK_BYTE = 1;
* Maximum time that sending a message (writing bytes and receiving ack) can
* take
*/
export const MESSAGE_ROUNDTRIP_TIMEOUT = 5000;
export const DEFAULT_MESSAGE_ROUNDTRIP_TIMEOUT = 5000;
/**
* Maximum time that handling an incoming RoseNet Direct can take (calling
* handler and writing ack byte)
*/
export const MESSAGE_HANDLING_TIMEOUT = 2000;
export const DEFAULT_MESSAGE_HANDLING_TIMEOUT = 2000;
/**
* The number of times we attempt to re-send failed messages
*/
export const MESSAGE_RETRY_ATTEMPTS = 5;
export const DEFAULT_MESSAGE_RETRY_ATTEMPTS = 5;
/**
* The number of times we attempt to re-send failed messages when in fail fast
* mode
*/
export const FAIL_FAST_MESSAGE_RETRY_ATTEMPTS = 1;
export const DEFAULT_FAIL_FAST_MESSAGE_RETRY_ATTEMPTS = 1;
/**
* Initial delay after which we retry sending a failed message
*/
export const MESSAGE_RETRY_INITIAL_DELAY = 2000;
export const DEFAULT_MESSAGE_RETRY_INITIAL_DELAY = 2000;
/**
* Initial delay after which we retry sending a failed message when in fail fast
* mode
*/
export const FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY = 5000;
export const DEFAULT_FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY = 5000;
/**
* Maximum number of incoming RoseNet Direct messages that can be handled
* concurrently
*/
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100;
export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 100;
/**
* Maximum number of incoming RoseNet Direct messages that can be queued if
* throughput threshold is reached
*/
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200;
export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 200;
/**
* Maximum number of incoming RoseNet Direct messages from a single peer that
* can be handled concurrently
*/
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10;
export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 10;
/**
* Maximum number of incoming RoseNet Direct messages from a single peer that
* can be queued if throughput threshold is reached
*/
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20;
export const DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 20;
/**
* Maximum number of outgoing RoseNet Direct messages that can be sent
* concurrently
*/
export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200;
export const DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 200;
/**
* Maximum number of outgoing RoseNet Direct messages that can queued if
* throughput threshold is reached
*/
export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400;
export const DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 400;
/**
* Maximum number of incoming pubsub messages that can be handled concurrently
*/
export const MAX_INBOUND_PUBSUB_THROUGHPUT = 100;
export const DEFAULT_MAX_INBOUND_PUBSUB_THROUGHPUT = 100;
/**
* Maximum number of incoming pubsub messages that can be queued if throughput
* threshold is reached
*/
export const MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200;
export const DEFAULT_MAX_INBOUND_PUBSUB_QUEUE_SIZE = 200;
/**
* Maximum number of outgoing pubsub messages that can be sent concurrently
*/
export const MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200;
export const DEFAULT_MAX_OUTBOUND_PUBSUB_THROUGHPUT = 200;
/**
* Maximum number of outgoing pubsub messages that can queued if throughput
* threshold is reached
*/
export const MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400;
export const DEFAULT_MAX_OUTBOUND_PUBSUB_QUEUE_SIZE = 400;
/**
* Maximum time that creation of a stream (including protocol negotiations and
* upgrade) can take
*/
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000;
export const DEFAULT_ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 2000;
/**
* Threshold for enabling fail fast when sending messages, resulting in fewer
* retry attempts to prevent the queues from becoming full
*/
export const FAIL_FAST_THRESHOLD = MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT / 4;
export const DEFAULT_FAIL_FAST_THRESHOLD =
DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT / 4;
/**
* Default gossipsubMaxInboundDataLength config of Gossipsub
*/
export const DEFAULT_GOSSIPSUB_MAX_INBOUND_DATA_LENGTH = 170_000_000;
75 changes: 73 additions & 2 deletions packages/rosenet-node/lib/context/RoseNetNodeContext.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,80 @@
import { AbstractLogger } from '@rosen-bridge/logger-interface';
import merge from 'lodash.merge';

import { PartialRoseNetNodeConfig, RoseNetNodeConfig } from '../types';

import {
DEFAULT_FAIL_FAST_MESSAGE_RETRY_ATTEMPTS,
DEFAULT_FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY,
DEFAULT_FAIL_FAST_THRESHOLD,
DEFAULT_GOSSIPSUB_MAX_INBOUND_DATA_LENGTH,
DEFAULT_MAX_INBOUND_PUBSUB_QUEUE_SIZE,
DEFAULT_MAX_INBOUND_PUBSUB_THROUGHPUT,
DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE,
DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER,
DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT,
DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER,
DEFAULT_MAX_OUTBOUND_PUBSUB_QUEUE_SIZE,
DEFAULT_MAX_OUTBOUND_PUBSUB_THROUGHPUT,
DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE,
DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT,
DEFAULT_MESSAGE_HANDLING_TIMEOUT,
DEFAULT_MESSAGE_RETRY_ATTEMPTS,
DEFAULT_MESSAGE_RETRY_INITIAL_DELAY,
DEFAULT_MESSAGE_ROUNDTRIP_TIMEOUT,
DEFAULT_NODE_HOST,
DEFAULT_NODE_PORT,
DEFAULT_RELAYS_COUNT_TO_CONNECT,
DEFAULT_ROSENET_DIRECT_STREAM_CREATION_TIMEOUT,
} from '../constants';

const RoseNetNodeContext = {
logger: console as AbstractLogger,
init(logger: AbstractLogger) {
this.logger = logger;
config: {} as RoseNetNodeConfig,
init(config: PartialRoseNetNodeConfig) {
const defaultConfig = {
direct: {
roundTripTimeout: DEFAULT_MESSAGE_ROUNDTRIP_TIMEOUT,
handlingTimeout: DEFAULT_MESSAGE_HANDLING_TIMEOUT,
maxRetryAttempts: DEFAULT_MESSAGE_RETRY_ATTEMPTS,
failFastMaxRetryAttempts: DEFAULT_FAIL_FAST_MESSAGE_RETRY_ATTEMPTS,
retryInitialDelay: DEFAULT_MESSAGE_RETRY_INITIAL_DELAY,
failFastRetryInitialDelay:
DEFAULT_FAIL_FAST_MESSAGE_RETRY_INITIAL_DELAY,
maxInboundThroughput: DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT,
maxInboundThroughputPerPeer:
DEFAULT_MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER,
maxInboundQueueSize: DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE,
maxInboundQueueSizePerPeer:
DEFAULT_MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER,
maxOutboundThroughput: DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT,
maxOutboundQueueSize: DEFAULT_MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE,
streamCreationTimeout: DEFAULT_ROSENET_DIRECT_STREAM_CREATION_TIMEOUT,
failFastThreshold: DEFAULT_FAIL_FAST_THRESHOLD,
},
host: DEFAULT_NODE_HOST,
port: DEFAULT_NODE_PORT,
pubsub: {
maxInboundThroughput: DEFAULT_MAX_INBOUND_PUBSUB_THROUGHPUT,
maxInboundQueueSize: DEFAULT_MAX_INBOUND_PUBSUB_QUEUE_SIZE,
maxOutboundThroughput: DEFAULT_MAX_OUTBOUND_PUBSUB_THROUGHPUT,
maxOutboundQueueSize: DEFAULT_MAX_OUTBOUND_PUBSUB_QUEUE_SIZE,
gossipsubMaxInboundDataLength:
DEFAULT_GOSSIPSUB_MAX_INBOUND_DATA_LENGTH,
gossipsubSignaturePolicy: 'StrictNoSign',
},
relay: {
sampleSize: DEFAULT_RELAYS_COUNT_TO_CONNECT,
multiaddrs: [],
},
logger: console,
whitelist: [],
debug: {
libp2pComponents: [],
},
};
this.config = merge(defaultConfig, config);
this.logger = this.config.logger;
},
};

Expand Down
60 changes: 37 additions & 23 deletions packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,53 +33,62 @@ import sample from './utils/sample';

import RoseNetNodeError from './errors/RoseNetNodeError';

import { DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT } from './constants';

import packageJson from '../package.json' with { type: 'json' };

import { RoseNetNodeConfig } from './types';
import { PartialRoseNetNodeConfig } from './types';

const createRoseNetNode = async (config: PartialRoseNetNodeConfig) => {
RoseNetNodeContext.init(config);
RoseNetNodeContext.logger.debug('RoseNet node config got prepared', {
config: {
direct: RoseNetNodeContext.config.direct,
pubsub: RoseNetNodeContext.config.pubsub,
host: RoseNetNodeContext.config.host,
port: RoseNetNodeContext.config.port,
relay: RoseNetNodeContext.config.relay,
whitelist: RoseNetNodeContext.config.whitelist,
debug: RoseNetNodeContext.config.debug,
},
});

const createRoseNetNode = async ({
logger,
port = DEFAULT_NODE_PORT,
...config
}: RoseNetNodeConfig) => {
if (!config.relayMultiaddrs.length) {
if (!RoseNetNodeContext.config.relay.multiaddrs.length) {
throw new RoseNetNodeError('Cannot start a RoseNet node without a relay');
}

RoseNetNodeContext.init(logger);

/**
* return if a peer is unauthorized, i.e. not whitelisted
* @param peerId
*/
const isPeerUnauthorized = (peerId: PeerId) =>
!peerId || !config.whitelist!.includes(peerId.toString());
!peerId || !RoseNetNodeContext.config.whitelist.includes(peerId.toString());

const peerId = await privateKeyToPeerId(config.privateKey);
const peerId = await privateKeyToPeerId(RoseNetNodeContext.config.privateKey);

RoseNetNodeContext.logger.debug(`PeerId ${peerId.toString()} generated`);

const announceMultiaddr = await addressService.getAnnounceMultiaddr(port);
logger.info(`${announceMultiaddr} set as announce multiaddr`);
const announceMultiaddr = await addressService.getAnnounceMultiaddr(
RoseNetNodeContext.config.port,
);
RoseNetNodeContext.logger.info(
`${announceMultiaddr} set as announce multiaddr`,
);

const sampledRelayMultiaddrs = sample(
config.relayMultiaddrs,
RELAYS_COUNT_TO_CONNECT,
RoseNetNodeContext.config.relay.multiaddrs,
RoseNetNodeContext.config.relay.sampleSize,
);

const node = await createLibp2p({
peerId,
transports: [
circuitRelayTransport({
discoverRelays: RELAYS_COUNT_TO_CONNECT,
discoverRelays: RoseNetNodeContext.config.relay.sampleSize,
}),
tcp(),
],
addresses: {
listen: [
`/ip4/0.0.0.0/tcp/${port}`,
`/ip4/${RoseNetNodeContext.config.host}/tcp/${RoseNetNodeContext.config.port}`,
...sampledRelayMultiaddrs.map(
(multiaddr) => `${multiaddr}/p2p-circuit`,
),
Expand All @@ -93,7 +102,7 @@ const createRoseNetNode = async ({
},
connectionEncryption: [noise()],
connectionGater: {
...(config.whitelist && {
...(RoseNetNodeContext.config.whitelist.length && {
denyInboundEncryptedConnection: isPeerUnauthorized,
denyInboundRelayedConnection: (
relayPeerId: PeerId,
Expand Down Expand Up @@ -128,14 +137,19 @@ const createRoseNetNode = async ({
* maximum of around 5000*100KB=500MB is received in 3 heartbeats from
* a single stream, which is 500MB/3≃170MB.
*/
maxInboundDataLength: 170_000_000, // 170MB
globalSignaturePolicy: 'StrictNoSign',
maxInboundDataLength:
RoseNetNodeContext.config.pubsub.gossipsubMaxInboundDataLength,
globalSignaturePolicy:
RoseNetNodeContext.config.pubsub.gossipsubSignaturePolicy,
ignoreDuplicatePublishError: true,
}),
dcutr: dcutr(),
restartRelayDiscovery,
},
logger: libp2pLoggerFactory(logger, config.debug?.libp2pComponents ?? []),
logger: libp2pLoggerFactory(
RoseNetNodeContext.logger,
RoseNetNodeContext.config.debug.libp2pComponents,
),
});
RoseNetNodeContext.logger.info('RoseNet node created');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ class RestartRelayDiscovery implements Startable {
await this.breaker.execute(this.restartRelayDiscoveryIfNeeded);
} catch (error) {
if (error instanceof BrokenCircuitError) {
// log error
this.logger('libp2p:restart-relay-discovery circuit is open');
} else {
throw error;
Expand Down
Loading

0 comments on commit a05d95c

Please sign in to comment.