Skip to content

Commit

Permalink
Merge branch 'feat/receive-message-limits' into 'dev'
Browse files Browse the repository at this point in the history
feat(rosenet-node): limit incoming throughput overally and per peer

Closes #79

See merge request ergo/rosen-bridge/rosenet!39
  • Loading branch information
vorujack committed Sep 25, 2024
2 parents de119a1 + f00c1ed commit 61043f7
Show file tree
Hide file tree
Showing 4 changed files with 81 additions and 28 deletions.
5 changes: 5 additions & 0 deletions .changeset/long-students-glow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

Limit the number of concurrent messages that can be handled concurrently
8 changes: 6 additions & 2 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,10 @@ export const ACK_BYTE = 1;
export const MESSAGE_ROUNDTRIP_TIMEOUT = 1000;
export const MESSAGE_RETRY_ATTEMPTS = 5;
export const MESSAGE_RETRY_INITIAL_DELAY = 2000;
export const MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED = 1000;
export const MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE = 2000;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT = 1000;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000;
export const MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER = 100;
export const MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER = 200;
export const MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT = 1000;
export const MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE = 2000;
export const ROSENET_DIRECT_STREAM_CREATION_TIMEOUT = 500;
88 changes: 66 additions & 22 deletions packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
import {
bulkhead,
BulkheadPolicy,
BulkheadRejectedError,
wrap,
} from 'cockatiel';
import { pipe } from 'it-pipe';

import { Libp2p } from '@libp2p/interface';
Expand All @@ -6,7 +12,32 @@ import RoseNetNodeContext from '../context/RoseNetNodeContext';

import { decode } from '../utils/codec';

import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants';
import {
ACK_BYTE,
MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE,
MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER,
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT,
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER,
ROSENET_DIRECT_PROTOCOL_V1,
} from '../constants';

const messageHandlingBulkhead = bulkhead(
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT,
MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE,
);
const peerBulkheads = new Proxy<Record<string, BulkheadPolicy>>(
{},
{
get(bulkheads, peer: string) {
if (peer in bulkheads) return bulkheads[peer];
bulkheads[peer] = bulkhead(
MAX_INBOUND_ROSENET_DIRECT_THROUGHPUT_PER_PEER,
MAX_INBOUND_ROSENET_DIRECT_QUEUE_SIZE_PER_PEER,
);
return bulkheads[peer];
},
},
);

/**
* protocol handler for RoseNet direct
Expand All @@ -23,31 +54,44 @@ const handleIncomingMessageFactory =
transient: connection.transient,
},
);
const wrappedPolicy = wrap(
messageHandlingBulkhead,
peerBulkheads[connection.remotePeer.toString()],
);
try {
await pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
);
await wrappedPolicy.execute(async () => {
try {
await pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
);
} catch (error) {
RoseNetNodeContext.logger.warn(
'An error occurred while reading from stream',
{
error,
},
);
}
});
} catch (error) {
RoseNetNodeContext.logger.warn(
'An error occurred while reading from stream',
{
error,
},
'Maximum message handling threshold reached',
);
stream.abort(error as BulkheadRejectedError);
}
},
{ runOnTransientConnection: true },
Expand Down
8 changes: 4 additions & 4 deletions packages/rosenet-node/lib/rosenet-direct/sendMessage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ import RoseNetNodeError from '../errors/RoseNetNodeError';

import {
ACK_BYTE,
MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED,
MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE,
MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT,
MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE,
MESSAGE_RETRY_ATTEMPTS,
MESSAGE_RETRY_INITIAL_DELAY,
MESSAGE_ROUNDTRIP_TIMEOUT,
Expand Down Expand Up @@ -108,8 +108,8 @@ const sendMessageFactory =
};

const bulkheadPolicy = bulkhead(
MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_ALLOWED,
MAX_CONCURRENT_ROSENET_DIRECT_MESSAGES_QUEUE_SIZE,
MAX_OUTBOUND_ROSENET_DIRECT_THROUGHPUT,
MAX_OUTBOUND_ROSENET_DIRECT_QUEUE_SIZE,
);

/**
Expand Down

0 comments on commit 61043f7

Please sign in to comment.