Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[@kadena/Chainweb-stream-client] Receive heights event, improve reconnection strategy #416

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"changes": [
{
"packageName": "@kadena/chainweb-stream-client",
"comment": "Changed reconnection strategy, added handling of `heights` event",
"type": "minor"
}
],
"packageName": "@kadena/chainweb-stream-client"
}
8 changes: 8 additions & 0 deletions packages/libs/chainweb-stream-client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ Hint: You can rely on the `.meta.id` value to de-duplicate the underlying unconf

Callback type: `(txn: ITransaction) => void`

### heights

Emitted when a `heights` event is received by the server. This carried the current cut as seen from stream-server's corresponding chainweb-node. This event is mostly intended for calculating minheight when reconnecting, but the event itself is expored to users in case they have use cases beyond that.

[Payload](https://github.com/kadena-io/chainweb-stream-server#events) is an array of the height of each chain, with implicit indexes (i.e. chain 0 = index 0, etc).

Callback type: `(chainHeights: number[]) => void`

### data

Emitted when any transaction payload is received (unconfirmd and confirmed.)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,18 +109,22 @@ export interface IDebugMsgObject {
// (undocumented)
consecutiveFailedAttempts?: number;
// (undocumented)
lastHeight?: number;
// (undocumented)
length?: number;
// (undocumented)
message?: string;
// (undocumented)
method: 'connect' | 'disconnect' | '_handleConnect' | '_handleError' | '_handleData' | '_handleHeartbeatTimeout' | string;
method: 'connect' | 'disconnect' | '_handleConnect' | '_handleError' | '_handleData' | '_handleHeights' | '_handleHeartbeatTimeout' | '_updateLastHeight' | string;
// (undocumented)
timeout?: number;
// (undocumented)
totalAttempts?: number;
// (undocumented)
ts: number;
// (undocumented)
url?: string;
// (undocumented)
willRetry?: boolean;
}

Expand Down
2 changes: 1 addition & 1 deletion packages/libs/chainweb-stream-client/src/examples/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const client: ChainwebStreamClient = new ChainwebStreamClient({
host: 'http://localhost:4000/',

// optional
limit: 100,
limit: 0,
connectTimeout: 15_000,
// intentionally setting this to a very small value, should sync the server setting
heartbeatTimeout: 1_000,
Expand Down
77 changes: 56 additions & 21 deletions packages/libs/chainweb-stream-client/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import { isMajorCompatible, isMinorCompatible, isClientAhead } from './semver';

export * from './types';

const CLIENT_PROTOCOL_VERSION: string = '0.0.2';
const CLIENT_PROTOCOL_VERSION: string = '0.0.3';

const DEFAULT_CONFIRMATION_DEPTH: number = 6;
const DEFAULT_CONNECT_TIMEOUT: number = 28_000;
Expand Down Expand Up @@ -111,15 +111,14 @@ class ChainwebStream extends EventEmitter {
* Call to initialize connection
*/
public connect = (): void => {
this._debug('connect');
this._desiredState = ConnectionState.Connected;
this._eventSource = new EventSource(this._makeConnectionURL());
const connectionURL = this._makeConnectionURL();
this._debug('connect', { url: connectionURL });
this._eventSource = new EventSource(connectionURL);
this._eventSource.onopen = this._handleConnect;
this._eventSource.onerror = this._handleError;
// reset & set custom connect timeout handler
if (this._connectTimer) {
clearTimeout(this._connectTimer);
}
this._clearConnectTimeout();
this._connectTimer = setTimeout(
() => this._handleError(new ConnectTimeoutError(this.connectTimeoutMs)),
this.connectTimeoutMs,
Expand Down Expand Up @@ -166,12 +165,12 @@ class ChainwebStream extends EventEmitter {
);
}
_eventSource.addEventListener('initial', this._handleInitial);
_eventSource.addEventListener('message', this._handleData);
_eventSource.addEventListener('heights', this._handleHeights);
_eventSource.addEventListener('ping', this._resetHeartbeatTimeout);
// note: generic data handler for unlabelled event (not `event: message` unlike above)
_eventSource.addEventListener('message', this._handleData);
this._resetHeartbeatTimeout();
if (this._connectTimer) {
clearTimeout(this._connectTimer);
}
this._clearConnectTimeout();
this.emit(this._totalConnectionAttempts ? 'reconnect' : 'connect');
this._totalConnectionAttempts += 1;
};
Expand All @@ -189,10 +188,10 @@ class ChainwebStream extends EventEmitter {
// close event source; crucial - chrome reconnects, firefox does not
this._eventSource?.close();

// cancel connection timeout timer, if exists
if (this._connectTimer) {
clearTimeout(this._connectTimer);
}
// cancel connection timeout timer & heartbeat timeout timers, if they exists
this._clearConnectTimeout();

this._stopHeartbeatMonitor();

const message = parseError(err);

Expand Down Expand Up @@ -234,6 +233,18 @@ class ChainwebStream extends EventEmitter {
this._processData(data);
};

private _handleHeights = (msg: MessageEvent<string>): void => {
const heights = JSON.parse(msg.data) as number[];

this._debug('_handleHeights');

this.emit('heights', heights);

this._updateLastHeight(heights);

this._resetHeartbeatTimeout();
};

private _handleData = (msg: MessageEvent<string>): void => {
this._debug('_handleData', { length: msg.data?.length });

Expand All @@ -245,11 +256,7 @@ class ChainwebStream extends EventEmitter {
};

private _processData(data: ITransaction[]): void {
this._lastHeight = data.reduce(
(highest, { height }) => (height > highest ? height : highest),
this._lastHeight ?? 0,
);

this._updateLastHeight(data);
for (const element of data) {
const {
meta: { confirmations },
Expand All @@ -263,6 +270,23 @@ class ChainwebStream extends EventEmitter {
}
}

private _updateLastHeight(data: ITransaction[] | number[]): void {
if (!data.length) {
return;
}

const heights = data.map((element: ITransaction | number) =>
typeof element === 'number' ? element : element.height,
);

const newLastHeight = Math.max(this._lastHeight ?? 0, ...heights);

if (newLastHeight !== this._lastHeight) {
this._lastHeight = newLastHeight;
this._debug('_updateLastHeight', { lastHeight: this._lastHeight });
}
}

private _emitError(msg: string): void {
console.error(msg);
this.emit('error', msg);
Expand Down Expand Up @@ -343,6 +367,12 @@ class ChainwebStream extends EventEmitter {
return true;
}

private _clearConnectTimeout = (): void => {
if (this._connectTimer) {
clearTimeout(this._connectTimer);
}
};

private _stopHeartbeatMonitor = (): void => {
if (this._heartbeatTimer) {
clearTimeout(this._heartbeatTimer);
Expand Down Expand Up @@ -372,12 +402,17 @@ class ChainwebStream extends EventEmitter {
if (this.limit !== undefined) {
urlParamArgs.push(['limit', String(this.limit)]);
}
// TODO This reconnection strategy of -3 from last max height
// This reconnection strategy of last_height - conf_depth - 3
// guarrantees that we will not miss events, but it also means
// that confirmed transactions will be emitted more than once
// until we implement de duplication. 3 is the max chain height span
// for current chainweb chain count (20)
// Discussion here: https://github.com/kadena-community/kadena.js/issues/275
if (this._lastHeight !== undefined) {
urlParamArgs.push(['minHeight', String(this._lastHeight - 3)]);
urlParamArgs.push([
'minHeight',
String(this._lastHeight - this.confirmationDepth - 3),
]);
}
if (urlParamArgs.length) {
path += '?' + new URLSearchParams(urlParamArgs).toString();
Expand Down
4 changes: 4 additions & 0 deletions packages/libs/chainweb-stream-client/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,9 @@ export interface IDebugMsgObject {
| '_handleConnect'
| '_handleError'
| '_handleData'
| '_handleHeights'
| '_handleHeartbeatTimeout'
| '_updateLastHeight'
| string;

/*
Expand All @@ -121,4 +123,6 @@ export interface IDebugMsgObject {
willRetry?: boolean;
timeout?: number;
length?: number;
lastHeight?: number;
url?: string;
}