Skip to content

Commit

Permalink
Merge branch 'feat/stream-reuse' into 'dev'
Browse files Browse the repository at this point in the history
feat(rosenet-node): implement stream reuse

Closes #61

See merge request ergo/rosen-bridge/p2p!22
  • Loading branch information
vorujack committed Mar 12, 2024
2 parents 057ee4d + 22c5fdb commit cbb02e4
Show file tree
Hide file tree
Showing 12 changed files with 268 additions and 40 deletions.
5 changes: 5 additions & 0 deletions .changeset/nice-points-relax.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@rosen-bridge/rosenet-node': minor
---

implement stream reuse
32 changes: 32 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions packages/rosenet-node/lib/RoseNetNodeTools.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { AbstractLogger } from '@rosen-bridge/logger-interface';

const RoseNetNodeTools = {
logger: console as AbstractLogger,
init(logger: AbstractLogger) {
this.logger = logger;
},
};

export default RoseNetNodeTools;
4 changes: 1 addition & 3 deletions packages/rosenet-node/lib/codec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
import first from 'it-first';
import * as lp from 'it-length-prefixed';
import map from 'it-map';
import { pipe } from 'it-pipe';
Expand All @@ -16,7 +15,7 @@ const encode = (source: string) =>
pipe(source, textEncoder.encode.bind(textEncoder), lp.encode.single);

/**
* decode a byte array iterable with length prefix into a string promise
* decode a byte array iterable with length prefix into a string iterable
* @param source
*/
const decode = (source: Source<Uint8ArrayList>) =>
Expand All @@ -25,7 +24,6 @@ const decode = (source: Source<Uint8ArrayList>) =>
lp.decode,
(source) => map(source, (message) => message.subarray()),
(source) => map(source, textDecoder.decode.bind(textDecoder)),
first<string>,
);

export { decode, encode };
58 changes: 29 additions & 29 deletions packages/rosenet-node/lib/createRoseNetNode.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,9 @@ import { circuitRelayTransport } from '@libp2p/circuit-relay-v2';
import { identify } from '@libp2p/identify';
import { PeerId } from '@libp2p/interface';
import { mplex } from '@libp2p/mplex';
import { peerIdFromString } from '@libp2p/peer-id';
import { pubsubPeerDiscovery } from '@libp2p/pubsub-peer-discovery';
import { tcp } from '@libp2p/tcp';
import map from 'it-map';
import { pipe } from 'it-pipe';
import { createLibp2p } from 'libp2p';

Expand All @@ -17,6 +17,8 @@ import {
} from '@rosen-bridge/rosenet-utils';

import { decode, encode } from './codec';
import getStreamAndPushable from './getStreamAndPushable';
import RoseNetNodeTools from './RoseNetNodeTools';

import RoseNetNodeError from './errors/RoseNetNodeError';

Expand All @@ -34,6 +36,8 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
throw new RoseNetNodeError('Cannot start a RoseNet node without a relay');
}

RoseNetNodeTools.init(logger);

/**
* return if a peer is unauthorized, i.e. not whitelisted
* @param peerId
Expand All @@ -43,7 +47,7 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {

const peerId = await privateKeyToPeerId(config.privateKey);

logger.debug(`PeerId ${peerId.toString()} generated`);
RoseNetNodeTools.logger.debug(`PeerId ${peerId.toString()} generated`);

const node = await createLibp2p({
peerId,
Expand Down Expand Up @@ -81,30 +85,20 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
pubsub: gossipsub({ allowPublishToZeroPeers: true }),
},
});
logger.debug('RoseNet node created');
RoseNetNodeTools.logger.debug('RoseNet node created');

addEventListeners(node, logger);
addEventListeners(node, RoseNetNodeTools.logger);

return {
start: async () => node.start(),
sendMessage: async (to: string, message: string) => {
const peerId = peerIdFromString(to);
const stream = await node.dialProtocol(
peerId,
ROSENET_DIRECT_PROTOCOL_V1,
{
runOnTransientConnection: true,
},
);
logger.debug('stream created for sending message', {
stream: {
direction: stream.direction,
protocol: stream.protocol,
remotePeer: to,
},
});
await pipe(message, encode, stream);
logger.debug('message piped through created stream', {
const { stream, pushable } = await getStreamAndPushable(to, node);

pipe(pushable, (source) => map(source, encode), stream);
pushable.push(message);
await Promise.resolve();

RoseNetNodeTools.logger.debug('message piped through created stream', {
message,
});
},
Expand All @@ -114,24 +108,30 @@ const createRoseNetNode = async ({ logger, ...config }: RoseNetNodeConfig) => {
node.handle(
ROSENET_DIRECT_PROTOCOL_V1,
async ({ connection, stream }) => {
logger.debug(
RoseNetNodeTools.logger.debug(
`incoming connection stream with protocol ${ROSENET_DIRECT_PROTOCOL_V1}`,
{
remoteAddress: connection.remoteAddr.toString(),
transient: connection.transient,
},
);
pipe(stream, decode, async (messagePromise) => {
const message = await messagePromise;
await handler(connection.remotePeer.toString(), message);
logger.debug('incoming message handled successfully', {
message,
});
pipe(stream, decode, async (source) => {
for await (const message of source) {
await handler(connection.remotePeer.toString(), message);
RoseNetNodeTools.logger.debug(
'incoming message handled successfully',
{
message,
},
);
}
});
},
{ runOnTransientConnection: true },
);
logger.debug(`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`);
RoseNetNodeTools.logger.debug(
`handler for ${ROSENET_DIRECT_PROTOCOL_V1} protocol set`,
);
},
};
};
Expand Down
101 changes: 101 additions & 0 deletions packages/rosenet-node/lib/getStreamAndPushable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import { Stream } from '@libp2p/interface';
import { peerIdFromString } from '@libp2p/peer-id';
import { shuffle } from 'fast-shuffle';
import { Libp2p } from 'libp2p';

import RoseNetNodeTools from './RoseNetNodeTools';
import createPushable, { Pushable } from './pushable';

import { ROSENET_DIRECT_PROTOCOL_V1 } from './constants';

const cache = new Map<
string,
{
stream: Stream;
pushable: Pushable<string>;
}
>();

/**
* get a stream and a pushable to the `to` remotePeer with
* ROSENET_DIRECT_PROTOCOL_V1 protocol, caching the pair for future use
*
* @param to remotePeer
* @param node
*/
async function getStreamAndPushable(to: string, node: Libp2p) {
const cacheHit = cache.get(to);
if (cacheHit?.stream.status === 'open') {
RoseNetNodeTools.logger.debug(
`Found existing stream and pushable in the cache to peer ${to}`,
{
stream: {
direction: cacheHit.stream.direction,
protocol: cacheHit.stream.protocol,
remotePeer: to,
id: cacheHit.stream.id,
},
},
);
return cacheHit;
}

const peerId = peerIdFromString(to);

const allConnectionsToPeer = shuffle(node.getConnections(peerId));
const possibleOpenConnectionToPeer = allConnectionsToPeer.find(
(connection) => connection.status === 'open',
);
const connection = possibleOpenConnectionToPeer ?? (await node.dial(peerId));

RoseNetNodeTools.logger.debug(
possibleOpenConnectionToPeer
? `Found an open connection to peer ${to}`
: `Established a new connection to peer ${to}`,
{
connection: {
id: connection.id,
transient: connection.transient,
},
},
);

const connectionStream = shuffle(connection.streams);
const possibleWritableStream = connectionStream.find(
(stream) =>
stream.writeStatus === 'ready' &&
stream.protocol === ROSENET_DIRECT_PROTOCOL_V1,
);
const stream =
possibleWritableStream ??
(await connection.newStream(ROSENET_DIRECT_PROTOCOL_V1, {
runOnTransientConnection: true,
}));

RoseNetNodeTools.logger.debug(
possibleWritableStream
? `Found an open stream to peer ${to}`
: `Created a new stream to peer ${to}`,
{
stream: {
direction: stream.direction,
protocol: stream.protocol,
remotePeer: to,
id: stream.id,
},
},
);

const pushable = createPushable<string>();

const pair = {
stream,
pushable,
};

cache.set(to, pair);

return pair;
}

export default getStreamAndPushable;
41 changes: 41 additions & 0 deletions packages/rosenet-node/lib/pushable.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
/**
* create an infinite, never ending, FIFO async iterable that can be pushed into
* after consumption
*/
const createPushable = <T>(): Pushable<T> => {
const iterable: T[] = [];

let resolveLastYield: () => void | undefined;

return {
async *[Symbol.asyncIterator](): AsyncGenerator<T, never, unknown> {
while (true) {
/**
* if there is any item in the iterable, yield it, otherwise wait for such
* an item to be pushed and yield it immediately
*/
if (iterable.length) {
yield Promise.resolve(iterable.shift()!);
} else {
yield new Promise((resolve: (value: T) => void) => {
resolveLastYield = () => {
resolve(iterable.shift()!);
};
});
}
}
},
push: (value: T) => {
iterable.push(value);
if (iterable.length === 1) {
resolveLastYield?.();
}
},
};
};

export interface Pushable<T> extends AsyncIterable<T> {
push: (value: T) => void;
}

export default createPushable;
1 change: 1 addition & 0 deletions packages/rosenet-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
"@libp2p/pubsub-peer-discovery": "^10.0.2",
"@libp2p/tcp": "^9.0.15",
"@rosen-bridge/rosenet-utils": "^0.0.0",
"fast-shuffle": "^6.1.0",
"it-first": "^3.0.4",
"it-length-prefixed": "^9.0.4",
"it-map": "^3.0.5",
Expand Down
Loading

0 comments on commit cbb02e4

Please sign in to comment.