Skip to content

Commit

Permalink
Merge branch 'feat/message-retry' into 'dev'
Browse files Browse the repository at this point in the history
feat(rosenet-node): implement message retry

Closes #67

See merge request ergo/rosen-bridge/rosenet!34
  • Loading branch information
vorujack committed Sep 12, 2024
2 parents f82b84b + 72af986 commit 9ba8cd6
Show file tree
Hide file tree
Showing 9 changed files with 242 additions and 118 deletions.
5 changes: 5 additions & 0 deletions .changeset/small-grapes-pump.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@rosen-bridge/rosenet-node": major
---

Retry failed direct messages
3 changes: 3 additions & 0 deletions packages/rosenet-node/lib/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,3 +9,6 @@ export const ACK_BYTE = 1;
* and an ack requires 1 RTT. We use 2 RTT as the timeout for an ack.
*/
export const ACK_TIMEOUT = 500;
export const MESSAGE_RETRY_ATTEMPTS = 3;
export const MESSAGE_RETRY_EXPONENT = 5;
export const MESSAGE_RETRY_INITIAL_DELAY = 5000;
129 changes: 11 additions & 118 deletions packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,7 @@ import { identify } from '@libp2p/identify';
import { PeerId } from '@libp2p/interface';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { tcp } from '@libp2p/tcp';
import { AbortError, raceSignal } from 'race-signal';

import first from 'it-first';
import map from 'it-map';
import { pipe } from 'it-pipe';
import { createLibp2p } from 'libp2p';

import {
Expand All @@ -20,36 +16,28 @@ import {
privateKeyToPeerId,
} from '@rosen-bridge/rosenet-utils';

import {
handleIncomingMessageFactory,
sendMessageFactory,
} from './rosenet-direct';
import { publishFactory, subscribeFactory } from './rosenet-pubsub';

import RoseNetNodeContext from './context/RoseNetNodeContext';

import restartRelayDiscovery from './libp2p/restart-relay-discovery';

import addressService from './address/address-service';
import streamService from './stream/stream-service';

import { decode, encode } from './utils/codec';
import sample from './utils/sample';

import RoseNetNodeError from './errors/RoseNetNodeError';
import RoseNetDirectAckError, {
AckError,
} from './errors/RoseNetDirectAckError';

import {
ACK_BYTE,
ACK_TIMEOUT,
DEFAULT_NODE_PORT,
RELAYS_COUNT_TO_CONNECT,
ROSENET_DIRECT_PROTOCOL_V1,
} from './constants';
import { DEFAULT_NODE_PORT, RELAYS_COUNT_TO_CONNECT } from './constants';

import packageJson from '../package.json' with { type: 'json' };

import { RoseNetNodeConfig } from './types';

const textEncoder = new TextEncoder();
const textDecoder = new TextDecoder();

const createRoseNetNode = async ({
logger,
port = DEFAULT_NODE_PORT,
Expand Down Expand Up @@ -151,105 +139,10 @@ const createRoseNetNode = async ({

return {
start: async () => node.start(),
sendMessage: async (to: string, message: string) => {
let stream;

try {
stream = await streamService.getRoseNetDirectStreamTo(to, node);

const result = await pipe(
[message],
(source) => map(source, encode),
stream,
async (source) =>
await raceSignal(first(source), AbortSignal.timeout(ACK_TIMEOUT)),
);

if (result?.length !== 1) {
throw new RoseNetDirectAckError(
`There are more than one chunk in the ack message`,
AckError.InvalidChunks,
);
}
const ack = result?.subarray();
if (ack.length !== 1 || ack[0] !== ACK_BYTE) {
throw new RoseNetDirectAckError(
`Ack byte is invalid`,
AckError.InvalidByte,
);
}

RoseNetNodeContext.logger.debug('message sent successfully', {
message,
});
} catch (error) {
if (error instanceof AbortError) {
const errorToThrow = new RoseNetDirectAckError(
`Ack was not received`,
AckError.Timeout,
);
stream?.abort(errorToThrow);
throw errorToThrow;
}
if (error instanceof RoseNetDirectAckError) {
stream?.abort(error);
throw error;
}
const errorToThrow = new RoseNetNodeError(
`An unknown error occured: ${error instanceof Error ? error.message : error}`,
);
stream?.abort(errorToThrow);
throw error;
}
},
handleIncomingMessage: (
handler: (from: string, message?: string) => void,
) => {
node.handle(
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
RoseNetNodeContext.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
transient: connection.transient,
},
);
pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
);
},
{ runOnTransientConnection: true },
);
RoseNetNodeContext.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
},
publish: async (topic: string, message: string) => {
node.services.pubsub.publish(topic, textEncoder.encode(message));
},
subscribe: async (topic: string, handler: (message: string) => void) => {
node.services.pubsub.subscribe(topic);
node.services.pubsub.addEventListener('message', (event) => {
if (event.detail.topic === topic) {
handler(textDecoder.decode(event.detail.data));
}
});
},
sendMessage: sendMessageFactory(node),
handleIncomingMessage: handleIncomingMessageFactory(node),
publish: publishFactory(node),
subscribe: subscribeFactory(node),
};
};

Expand Down
51 changes: 51 additions & 0 deletions packages/rosenet-node/lib/rosenet-direct/handleIncomingMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import { pipe } from 'it-pipe';

import { Libp2p } from '@libp2p/interface';

import RoseNetNodeContext from '../context/RoseNetNodeContext';

import { decode } from '../utils/codec';

import { ACK_BYTE, ROSENET_DIRECT_PROTOCOL_V1 } from '../constants';

/**
* protocol handler for RoseNet direct
*/
const handleIncomingMessageFactory =
(node: Libp2p) => (handler: (from: string, message?: string) => void) => {
node.handle(
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
RoseNetNodeContext.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
transient: connection.transient,
},
);
pipe(
stream,
decode,
async function* (source) {
for await (const message of source) {
RoseNetNodeContext.logger.debug(
'message received, calling handler and sending ack',
{
message,
},
);
handler(connection.remotePeer.toString(), message);
yield Uint8Array.of(ACK_BYTE);
}
},
stream,
);
},
{ runOnTransientConnection: true },
);
RoseNetNodeContext.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
};

export default handleIncomingMessageFactory;
2 changes: 2 additions & 0 deletions packages/rosenet-node/lib/rosenet-direct/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { default as handleIncomingMessageFactory } from './handleIncomingMessage';
export { default as sendMessageFactory } from './sendMessage';
135 changes: 135 additions & 0 deletions packages/rosenet-node/lib/rosenet-direct/sendMessage.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
import { Libp2p } from '@libp2p/interface';

import { ExponentialBackoff, handleAll, retry } from 'cockatiel';
import first from 'it-first';
import map from 'it-map';
import { pipe } from 'it-pipe';
import { AbortError, raceSignal } from 'race-signal';

import RoseNetNodeContext from '../context/RoseNetNodeContext';
import streamService from '../stream/stream-service';
import { encode } from '../utils/codec';

import RoseNetDirectAckError, {
AckError,
} from '../errors/RoseNetDirectAckError';
import RoseNetNodeError from '../errors/RoseNetNodeError';

import {
ACK_BYTE,
ACK_TIMEOUT,
MESSAGE_RETRY_ATTEMPTS,
MESSAGE_RETRY_EXPONENT,
MESSAGE_RETRY_INITIAL_DELAY,
} from '../constants';

/**
* A factory returning a function to send a message to a specific peer via
* RoseNet direct protocol
*/
const sendMessageFactory =
(node: Libp2p) => async (to: string, message: string) => {
let stream;

try {
stream = await streamService.getRoseNetDirectStreamTo(to, node);

const result = await pipe(
[message],
(source) => map(source, encode),
stream,
async (source) =>
await raceSignal(first(source), AbortSignal.timeout(ACK_TIMEOUT)),
);

if (result?.length !== 1) {
throw new RoseNetDirectAckError(
`There are more than one chunk in the ack message`,
AckError.InvalidChunks,
);
}
const ack = result?.subarray();
if (ack.length !== 1 || ack[0] !== ACK_BYTE) {
throw new RoseNetDirectAckError(
`Ack byte is invalid`,
AckError.InvalidByte,
);
}

RoseNetNodeContext.logger.debug('message sent successfully', {
message,
});
} catch (error) {
if (error instanceof AbortError) {
const errorToThrow = new RoseNetDirectAckError(
`Ack was not received`,
AckError.Timeout,
);
stream?.abort(errorToThrow);
throw errorToThrow;
}
if (error instanceof RoseNetDirectAckError) {
stream?.abort(error);
throw error;
}
const errorToThrow = new RoseNetNodeError(
`An unknown error occured: ${error instanceof Error ? error.message : error}`,
);
stream?.abort(errorToThrow);
throw error;
}
};

/**
* A wrapper around `sendMessageFactory` for retrying failed messages
*/
const sendMessageWithRetryFactory =
(node: Libp2p) =>
async (
to: string,
message: string,
/**
* an optional callback that is called with an error if the message sending
* fails after enough retrials, and with no arguments otherwise
*/
onSettled?: (error?: Error) => Promise<void>,
) => {
const sendMessageInner = sendMessageFactory(node);
try {
const retryPolicy = retry(handleAll, {
maxAttempts: MESSAGE_RETRY_ATTEMPTS,
backoff: new ExponentialBackoff({
exponent: MESSAGE_RETRY_EXPONENT,
initialDelay: MESSAGE_RETRY_INITIAL_DELAY,
maxDelay: 300_000,
}),
});
retryPolicy.onFailure((data) => {
RoseNetNodeContext.logger.debug('message sending failed', {
message,
reason: data.reason,
});
});
retryPolicy.onRetry((data) => {
RoseNetNodeContext.logger.debug(
`retry sending message (attempt #${data.attempt}/${MESSAGE_RETRY_ATTEMPTS})`,
{
message,
},
);
});

await retryPolicy.execute(() => sendMessageInner(to, message));
onSettled?.();
} catch (error) {
RoseNetNodeContext.logger.error(
'message sending failed regardless of 3 retries, dropping message',
);
RoseNetNodeContext.logger.debug('message was: ', {
message,
});
onSettled?.(new RoseNetNodeError('Message sending failed'));
}
};

export default sendMessageWithRetryFactory;
2 changes: 2 additions & 0 deletions packages/rosenet-node/lib/rosenet-pubsub/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
export { default as publishFactory } from './publish';
export { default as subscribeFactory } from './subscribe';
14 changes: 14 additions & 0 deletions packages/rosenet-node/lib/rosenet-pubsub/publish.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
import { Libp2p, PubSub } from '@libp2p/interface';

const textEncoder = new TextEncoder();

/**
* factory for libp2p publish
*/
const publishFactory =
(node: Libp2p<{ pubsub: PubSub }>) =>
async (topic: string, message: string) => {
node.services.pubsub.publish(topic, textEncoder.encode(message));
};

export default publishFactory;
Loading

0 comments on commit 9ba8cd6

Please sign in to comment.