diff --git a/common/changes/@kadena/chainweb-stream-client/feat-chainweb-stream-receive-heights-event_2023-06-14-15-56.json b/common/changes/@kadena/chainweb-stream-client/feat-chainweb-stream-receive-heights-event_2023-06-14-15-56.json new file mode 100644 index 0000000000..6dc2481a49 --- /dev/null +++ b/common/changes/@kadena/chainweb-stream-client/feat-chainweb-stream-receive-heights-event_2023-06-14-15-56.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@kadena/chainweb-stream-client", + "comment": "Changed reconnection strategy, added handling of `heights` event", + "type": "minor" + } + ], + "packageName": "@kadena/chainweb-stream-client" +} \ No newline at end of file diff --git a/packages/libs/chainweb-stream-client/README.md b/packages/libs/chainweb-stream-client/README.md index 06b4bf0f4e..a36e0001ae 100644 --- a/packages/libs/chainweb-stream-client/README.md +++ b/packages/libs/chainweb-stream-client/README.md @@ -111,6 +111,14 @@ Hint: You can rely on the `.meta.id` value to de-duplicate the underlying unconf Callback type: `(txn: ITransaction) => void` +### heights + +Emitted when a `heights` event is received by the server. This carried the current cut as seen from stream-server's corresponding chainweb-node. This event is mostly intended for calculating minheight when reconnecting, but the event itself is expored to users in case they have use cases beyond that. + +[Payload](https://github.com/kadena-io/chainweb-stream-server#events) is an array of the height of each chain, with implicit indexes (i.e. chain 0 = index 0, etc). + +Callback type: `(chainHeights: number[]) => void` + ### data Emitted when any transaction payload is received (unconfirmd and confirmed.) diff --git a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md index b5207d896a..ae1e4a21a9 100644 --- a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md +++ b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md @@ -109,11 +109,13 @@ export interface IDebugMsgObject { // (undocumented) consecutiveFailedAttempts?: number; // (undocumented) + lastHeight?: number; + // (undocumented) length?: number; // (undocumented) message?: string; // (undocumented) - method: 'connect' | 'disconnect' | '_handleConnect' | '_handleError' | '_handleData' | '_handleHeartbeatTimeout' | string; + method: 'connect' | 'disconnect' | '_handleConnect' | '_handleError' | '_handleData' | '_handleHeights' | '_handleHeartbeatTimeout' | '_updateLastHeight' | string; // (undocumented) timeout?: number; // (undocumented) @@ -121,6 +123,8 @@ export interface IDebugMsgObject { // (undocumented) ts: number; // (undocumented) + url?: string; + // (undocumented) willRetry?: boolean; } diff --git a/packages/libs/chainweb-stream-client/src/examples/index.ts b/packages/libs/chainweb-stream-client/src/examples/index.ts index a50b2bd17d..d663975912 100644 --- a/packages/libs/chainweb-stream-client/src/examples/index.ts +++ b/packages/libs/chainweb-stream-client/src/examples/index.ts @@ -8,7 +8,7 @@ const client: ChainwebStreamClient = new ChainwebStreamClient({ host: 'http://localhost:4000/', // optional - limit: 100, + limit: 0, connectTimeout: 15_000, // intentionally setting this to a very small value, should sync the server setting heartbeatTimeout: 1_000, diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 0ceb691092..976e5cd05f 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -18,7 +18,7 @@ import { isMajorCompatible, isMinorCompatible, isClientAhead } from './semver'; export * from './types'; -const CLIENT_PROTOCOL_VERSION: string = '0.0.2'; +const CLIENT_PROTOCOL_VERSION: string = '0.0.3'; const DEFAULT_CONFIRMATION_DEPTH: number = 6; const DEFAULT_CONNECT_TIMEOUT: number = 28_000; @@ -111,15 +111,14 @@ class ChainwebStream extends EventEmitter { * Call to initialize connection */ public connect = (): void => { - this._debug('connect'); this._desiredState = ConnectionState.Connected; - this._eventSource = new EventSource(this._makeConnectionURL()); + const connectionURL = this._makeConnectionURL(); + this._debug('connect', { url: connectionURL }); + this._eventSource = new EventSource(connectionURL); this._eventSource.onopen = this._handleConnect; this._eventSource.onerror = this._handleError; // reset & set custom connect timeout handler - if (this._connectTimer) { - clearTimeout(this._connectTimer); - } + this._clearConnectTimeout(); this._connectTimer = setTimeout( () => this._handleError(new ConnectTimeoutError(this.connectTimeoutMs)), this.connectTimeoutMs, @@ -166,12 +165,12 @@ class ChainwebStream extends EventEmitter { ); } _eventSource.addEventListener('initial', this._handleInitial); - _eventSource.addEventListener('message', this._handleData); + _eventSource.addEventListener('heights', this._handleHeights); _eventSource.addEventListener('ping', this._resetHeartbeatTimeout); + // note: generic data handler for unlabelled event (not `event: message` unlike above) + _eventSource.addEventListener('message', this._handleData); this._resetHeartbeatTimeout(); - if (this._connectTimer) { - clearTimeout(this._connectTimer); - } + this._clearConnectTimeout(); this.emit(this._totalConnectionAttempts ? 'reconnect' : 'connect'); this._totalConnectionAttempts += 1; }; @@ -189,10 +188,10 @@ class ChainwebStream extends EventEmitter { // close event source; crucial - chrome reconnects, firefox does not this._eventSource?.close(); - // cancel connection timeout timer, if exists - if (this._connectTimer) { - clearTimeout(this._connectTimer); - } + // cancel connection timeout timer & heartbeat timeout timers, if they exists + this._clearConnectTimeout(); + + this._stopHeartbeatMonitor(); const message = parseError(err); @@ -234,6 +233,18 @@ class ChainwebStream extends EventEmitter { this._processData(data); }; + private _handleHeights = (msg: MessageEvent): void => { + const heights = JSON.parse(msg.data) as number[]; + + this._debug('_handleHeights'); + + this.emit('heights', heights); + + this._updateLastHeight(heights); + + this._resetHeartbeatTimeout(); + }; + private _handleData = (msg: MessageEvent): void => { this._debug('_handleData', { length: msg.data?.length }); @@ -245,11 +256,7 @@ class ChainwebStream extends EventEmitter { }; private _processData(data: ITransaction[]): void { - this._lastHeight = data.reduce( - (highest, { height }) => (height > highest ? height : highest), - this._lastHeight ?? 0, - ); - + this._updateLastHeight(data); for (const element of data) { const { meta: { confirmations }, @@ -263,6 +270,23 @@ class ChainwebStream extends EventEmitter { } } + private _updateLastHeight(data: ITransaction[] | number[]): void { + if (!data.length) { + return; + } + + const heights = data.map((element: ITransaction | number) => + typeof element === 'number' ? element : element.height, + ); + + const newLastHeight = Math.max(this._lastHeight ?? 0, ...heights); + + if (newLastHeight !== this._lastHeight) { + this._lastHeight = newLastHeight; + this._debug('_updateLastHeight', { lastHeight: this._lastHeight }); + } + } + private _emitError(msg: string): void { console.error(msg); this.emit('error', msg); @@ -343,6 +367,12 @@ class ChainwebStream extends EventEmitter { return true; } + private _clearConnectTimeout = (): void => { + if (this._connectTimer) { + clearTimeout(this._connectTimer); + } + }; + private _stopHeartbeatMonitor = (): void => { if (this._heartbeatTimer) { clearTimeout(this._heartbeatTimer); @@ -372,12 +402,17 @@ class ChainwebStream extends EventEmitter { if (this.limit !== undefined) { urlParamArgs.push(['limit', String(this.limit)]); } - // TODO This reconnection strategy of -3 from last max height + // This reconnection strategy of last_height - conf_depth - 3 // guarrantees that we will not miss events, but it also means // that confirmed transactions will be emitted more than once + // until we implement de duplication. 3 is the max chain height span + // for current chainweb chain count (20) // Discussion here: https://github.com/kadena-community/kadena.js/issues/275 if (this._lastHeight !== undefined) { - urlParamArgs.push(['minHeight', String(this._lastHeight - 3)]); + urlParamArgs.push([ + 'minHeight', + String(this._lastHeight - this.confirmationDepth - 3), + ]); } if (urlParamArgs.length) { path += '?' + new URLSearchParams(urlParamArgs).toString(); diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index 40a081522f..6435f83708 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -109,7 +109,9 @@ export interface IDebugMsgObject { | '_handleConnect' | '_handleError' | '_handleData' + | '_handleHeights' | '_handleHeartbeatTimeout' + | '_updateLastHeight' | string; /* @@ -121,4 +123,6 @@ export interface IDebugMsgObject { willRetry?: boolean; timeout?: number; length?: number; + lastHeight?: number; + url?: string; }