From b17b6a59e6108123a2dfea6facba09b129da8a8f Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Mon, 8 May 2023 19:36:02 +0200 Subject: [PATCH 01/18] stream-client: added parsing config from intial event and validating compatibility --- common/config/rush/pnpm-lock.yaml | 58 ++++----- common/config/rush/repo-state.json | 2 +- .../libs/chainweb-stream-client/README.md | 35 +++--- .../etc/chainweb-stream-client.api.md | 36 +++++- .../src/examples/index.ts | 1 + .../libs/chainweb-stream-client/src/index.ts | 110 +++++++++++++++--- .../libs/chainweb-stream-client/src/types.ts | 15 +++ 7 files changed, 194 insertions(+), 63 deletions(-) diff --git a/common/config/rush/pnpm-lock.yaml b/common/config/rush/pnpm-lock.yaml index 8e9bcd3f96..0da741f7e0 100644 --- a/common/config/rush/pnpm-lock.yaml +++ b/common/config/rush/pnpm-lock.yaml @@ -7166,12 +7166,12 @@ packages: telejson: 7.0.4 dev: true - /@storybook/channel-postmessage/7.0.8: - resolution: {integrity: sha512-op/SB2Tg66bxS4DHOhrSVja7Xdp8aiWIJ47vygSq31nqpwv5auCTptOrcdzTikOjH+4dKfTGxTx6Z5g065tuiQ==} + /@storybook/channel-postmessage/7.0.9: + resolution: {integrity: sha512-6zsUPlsD3GVhKNq4UZ5MePJPjiMcPs/K02mH5/uVTN2JSgLdWgbLhZ4VYit4HgwE+d98bd9zWbgNgSOXpTArag==} dependencies: - '@storybook/channels': 7.0.8 - '@storybook/client-logger': 7.0.8 - '@storybook/core-events': 7.0.8 + '@storybook/channels': 7.0.9 + '@storybook/client-logger': 7.0.9 + '@storybook/core-events': 7.0.9 '@storybook/global': 5.0.0 qs: 6.11.1 telejson: 7.0.4 @@ -7198,8 +7198,8 @@ packages: resolution: {integrity: sha512-2tI/ECbQcXjncYGLVdrttNT8adIp6kV/bnQGJWmF5hBXZ7Izwyq1WRPTgPT++RihmOOTHvkRx4GCKfwluOrNpA==} dev: true - /@storybook/channels/7.0.8: - resolution: {integrity: sha512-z8W4r8te/EiEDfk8qaxmjwMcKMe+x12leWEwtyz6e9XI0Q4qTk17dDtq/XZ5Ab2Ks4VSvWRu1e/QURiVpjbo2Q==} + /@storybook/channels/7.0.9: + resolution: {integrity: sha512-LF/Mkr0/+VOawEAospLGUcfZIPak3yV/ZjEAe/lubvLPJ6s2FFOjDUsyDIa2oM4ZE9TI6AGVN51kddVToelM8A==} dev: true /@storybook/cli/7.0.0-rc.8: @@ -7271,8 +7271,8 @@ packages: '@storybook/global': 5.0.0 dev: true - /@storybook/client-logger/7.0.8: - resolution: {integrity: sha512-UuyX57Jzn8L0QOhDPBA/v9UqIGCtFKqtaS23mNNNDoc1X3u+boULNgqWGD84F2U7JWg2xNopIJvjQxhH30/Jhw==} + /@storybook/client-logger/7.0.9: + resolution: {integrity: sha512-EJnXWvpTFEj462ixZbDouTN9X/FinRgaKKN6zXdhSSZUnm5PcZBtnoX5S+982z3LiAjdNIuAdZE/4vwBIAF88A==} dependencies: '@storybook/global': 5.0.0 dev: true @@ -7377,8 +7377,8 @@ packages: resolution: {integrity: sha512-KsKf+Ob6zQ8+IJ9oDD5xqASwYGzcjT08azBjSt4yocHIJ3mY741h88YDS0wcwnM+JrV6iFYlY0hiK35lnBEddA==} dev: true - /@storybook/core-events/7.0.8: - resolution: {integrity: sha512-CQJs3PKQ8HJmMe7kzYy2bWz3hw5d8myAtO5LAgvPHKsVqAZ0R+rN4lXlcPNWf/x3tb8JizDJpPgTCBdOBb+tkg==} + /@storybook/core-events/7.0.9: + resolution: {integrity: sha512-xJiyX7Gq/TgDdBv+8KbfTJ4Sc7fCMeIEUqWTtnYCHWB7Mp6Iui37+caDX3aGQRTz7FVgb7aL5QkQES9Ihc1+dg==} dev: true /@storybook/core-server/7.0.0-rc.8: @@ -7525,14 +7525,14 @@ packages: '@storybook/preview-api': 7.0.0-rc.8 dev: true - /@storybook/instrumenter/7.0.8: - resolution: {integrity: sha512-wFXSeUOOi4nyYv2+TkNic7qmSxO5VuFFt4qmTYv/MmZUAfIZKGq8S5K+McIKFUcYEnjkcvYvOWTNM6hEhlQpQw==} + /@storybook/instrumenter/7.0.9: + resolution: {integrity: sha512-ly/vLeQIGbcw3RLQv4vPugvlaMah7+rsYZtwN/ocseBbsDfFBqxI7NbJNBXSTo44eTTbzQUkeOZWqf6dZzRm0Q==} dependencies: - '@storybook/channels': 7.0.8 - '@storybook/client-logger': 7.0.8 - '@storybook/core-events': 7.0.8 + '@storybook/channels': 7.0.9 + '@storybook/client-logger': 7.0.9 + '@storybook/core-events': 7.0.9 '@storybook/global': 5.0.0 - '@storybook/preview-api': 7.0.8 + '@storybook/preview-api': 7.0.9 dev: true /@storybook/manager-api/7.0.0-rc.8_react-dom@18.2.0+react@18.2.0: @@ -7706,16 +7706,16 @@ packages: util-deprecate: 1.0.2 dev: true - /@storybook/preview-api/7.0.8: - resolution: {integrity: sha512-+/nhvNo7ML6bPnFYJRH/+mwU/sVJbIGhxFy4r+4Omxaw4aKhs8T0eVijGE2KOahRKG3qUCYV1CaTqmnlbcXgbw==} + /@storybook/preview-api/7.0.9: + resolution: {integrity: sha512-cLyhq2nk0eiMOUwIIKhgDgZoS1ecRGojl92hR0agZDzNJrb1lvXK6uIkJh/Anl2Jbir28lAjQGU54voPODwTUA==} dependencies: - '@storybook/channel-postmessage': 7.0.8 - '@storybook/channels': 7.0.8 - '@storybook/client-logger': 7.0.8 - '@storybook/core-events': 7.0.8 + '@storybook/channel-postmessage': 7.0.9 + '@storybook/channels': 7.0.9 + '@storybook/client-logger': 7.0.9 + '@storybook/core-events': 7.0.9 '@storybook/csf': 0.1.0 '@storybook/global': 5.0.0 - '@storybook/types': 7.0.8 + '@storybook/types': 7.0.9 '@types/qs': 6.9.7 dequal: 2.0.3 lodash: 4.17.21 @@ -7993,8 +7993,8 @@ packages: /@storybook/testing-library/0.0.14-next.1: resolution: {integrity: sha512-1CAl40IKIhcPaCC4pYCG0b9IiYNymktfV/jTrX7ctquRY3akaN7f4A1SippVHosksft0M+rQTFE0ccfWW581fw==} dependencies: - '@storybook/client-logger': 7.0.8 - '@storybook/instrumenter': 7.0.8 + '@storybook/client-logger': 7.0.9 + '@storybook/instrumenter': 7.0.9 '@testing-library/dom': 8.20.0 '@testing-library/user-event': 13.5.0_@testing-library+dom@8.20.0 ts-dedent: 2.2.0 @@ -8037,10 +8037,10 @@ packages: file-system-cache: 2.0.2 dev: true - /@storybook/types/7.0.8: - resolution: {integrity: sha512-x83vL/TzBlv21nHuP35c+z4AUjHSY9G7NpZLTZ/5REcuXbeIfhjGOAyeUHB4lXhPXxsOlq3wHiQippB7bSJeeQ==} + /@storybook/types/7.0.9: + resolution: {integrity: sha512-6aKrrsX3wgPMg9Nu3AK1GYmCZQiHqHv7l24ywNxZPv0T63rcpS86kWK4qVAywoaXGFc9GtRT+dz1rK8Fx50J9Q==} dependencies: - '@storybook/channels': 7.0.8 + '@storybook/channels': 7.0.9 '@types/babel__core': 7.1.20 '@types/express': 4.17.17 file-system-cache: 2.0.2 diff --git a/common/config/rush/repo-state.json b/common/config/rush/repo-state.json index 0858b2b6ab..33aa2a56c3 100644 --- a/common/config/rush/repo-state.json +++ b/common/config/rush/repo-state.json @@ -1,5 +1,5 @@ // DO NOT MODIFY THIS FILE MANUALLY BUT DO COMMIT IT. It is generated and used by Rush. { - "pnpmShrinkwrapHash": "523bc1432078993b61aef997f01fb4795bb6aff4", + "pnpmShrinkwrapHash": "ad991f815aa11e81daaf8056455be22e1af6185d", "preferredVersionsHash": "bf21a9e8fbc5a3846fb05b4fa0859e0917b2202f" } diff --git a/packages/libs/chainweb-stream-client/README.md b/packages/libs/chainweb-stream-client/README.md index 377ef4afd6..06b4bf0f4e 100644 --- a/packages/libs/chainweb-stream-client/README.md +++ b/packages/libs/chainweb-stream-client/README.md @@ -47,15 +47,31 @@ client.connect(); Find more detailed examples under `src/examples`. +## Constructor Options + +| Key | Required | Description | Example Values | +| ---------------- | :------: | ----------- | ------ | +| network | Yes | Chainweb network | `mainnet01|testnet04|...` | +| type | Yes | Transaction type to stream (event/account) | `event|account` | +| id | Yes | Account ID or module/event name | `k:abcdef01234..` | +| host | Yes | Chainweb-SSE backend URL | `http://localhost:4000` | +| limit | No | Initial data load limit | 100 | +| connectTimeout | No | Connection timeout in ms | 10_000 | +| heartbeatTimeout | No | Stale connection timeout in ms | 30_000 | +| maxReconnects | No | How many reconnections to attempt before giving up | 5 | +| confirmationDepth | No | How many confirmations for a transaction to be considered final | 6 | + ## Considerations ⚠️ ### Ensure configuration compatibility -Make sure that your client and server `confirmationDepth` and `heartbeatTimeout` values are compatible. +Make sure that your client and server `confirmationDepth` values are compatible. + +If your client `confirmationDepth` is larger than the server's, the `confirmed` event will never fire. The client will automatically detect this condition, emit an `error` event and disconnect. -If your client `heartbeatTimeout` is smaller than the server heartbeat interval, the client will keep disconnecting when the connection is silent (no data.) +If your client's configured network does not match the server's, the client will emit an `error` event and disconnect. -If your client `confirmationDepth` is larger than the server's, the `confirmed` event will never fire. +If your client `heartbeatTimeout` is smaller than the server heartbeat interval, the client will automatically adapt its heartbeat timeout to 2500ms larger than the server value. ### Handle temporary and permanent connection failures @@ -63,19 +79,6 @@ When the connection is interrupted or determined to be stale (no heartbeats rece It is recommended to handle the fired [will-reconnect](#will-reconnect) and [error](#error) events. -## Constructor Options - -| Key | Required | Description | Example Values | -| ---------------- | :------: | ----------- | ------ | -| type | Yes | Transaction type to stream (event/account) | `event` | `account` | -| id | Yes | Account ID or module/event name | `k:abcdef01234..` | -| host | Yes | Chainweb-SSE backend URL | `http://localhost:4000` | -| limit | No | Initial data load limit | 100 | -| connectTimeout | No | Connection timeout in ms | 10_000 | -| heartbeatTimeout | No | Stale connection timeout in ms | 30_000 | -| maxReconnects | No | How many reconnections to attempt before giving up | 5 | -| confirmationDepth | No | How many confirmations for a transaction to be considered final | 6 | - ## Events ChainwebStreamClient is an EventEmitter. You can subscribe to the following events using `.on('event-name', callback)`. diff --git a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md index d9a47c4681..60839d0325 100644 --- a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md +++ b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md @@ -8,7 +8,7 @@ import EventEmitter from 'eventemitter2'; // @alpha (undocumented) class ChainwebStream extends EventEmitter { - constructor({ host, type, id, limit, connectTimeout, maxReconnects, heartbeatTimeout, confirmationDepth, }: IChainwebStreamConstructorArgs); + constructor({ network, host, type, id, limit, connectTimeout, maxReconnects, heartbeatTimeout, confirmationDepth, }: IChainwebStreamConstructorArgs); // (undocumented) confirmationDepth: number; connect: () => void; @@ -25,6 +25,8 @@ class ChainwebStream extends EventEmitter { limit: number | undefined; // (undocumented) maxReconnects: number; + // (undocumented) + network: string; get state(): ConnectionState; // (undocumented) type: ChainwebStreamType; @@ -65,6 +67,22 @@ export interface IAccountTransaction extends ITransactionBase { } // @alpha (undocumented) +export interface IChainwebStreamConfig { + // (undocumented) + heartbeat: number; + // (undocumented) + id: string; + // (undocumented) + maxConf: number; + // (undocumented) + network: string; + // (undocumented) + type: ChainwebStreamType; + // (undocumented) + v: string; +} + +// @public (undocumented) export interface IChainwebStreamConstructorArgs { // (undocumented) confirmationDepth?: number; @@ -81,6 +99,10 @@ export interface IChainwebStreamConstructorArgs { // (undocumented) maxReconnects?: number; // (undocumented) + network: string; + // Warning: (ae-incompatible-release-tags) The symbol "type" is marked as @public, but its signature references "ChainwebStreamType" which is marked as @alpha + // + // (undocumented) type: ChainwebStreamType; } @@ -114,6 +136,18 @@ export interface IEventTransaction extends ITransactionBase { params: string[]; } +// @public (undocumented) +export interface IInitialEvent { + // Warning: (ae-incompatible-release-tags) The symbol "config" is marked as @public, but its signature references "IChainwebStreamConfig" which is marked as @alpha + // + // (undocumented) + config: IChainwebStreamConfig; + // Warning: (ae-incompatible-release-tags) The symbol "data" is marked as @public, but its signature references "ITransaction" which is marked as @alpha + // + // (undocumented) + data: ITransaction[]; +} + // @alpha (undocumented) export type ITransaction = IEventTransaction | IAccountTransaction; diff --git a/packages/libs/chainweb-stream-client/src/examples/index.ts b/packages/libs/chainweb-stream-client/src/examples/index.ts index 79523ef6fc..d711361c6c 100644 --- a/packages/libs/chainweb-stream-client/src/examples/index.ts +++ b/packages/libs/chainweb-stream-client/src/examples/index.ts @@ -2,6 +2,7 @@ import ChainwebStreamClient, { ITransaction } from '../'; const client: ChainwebStreamClient = new ChainwebStreamClient({ // required + network: 'mainnet01', type: 'account', id: 'k:e7f7130f359fb1f8c87873bf858a0e9cbc3c1059f62ae715ec72e760b055e9f3', host: 'http://localhost:4000/', diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index f9351d68dc..d5b9462df0 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -6,11 +6,13 @@ import { HeartbeatTimeoutError, } from './errors'; import { + ConnectionState, IChainwebStreamConstructorArgs, ChainwebStreamType, ITransaction, - ConnectionState, IDebugMsgObject, + IInitialEvent, + IChainwebStreamConfig, } from './types'; export * from './types'; @@ -18,15 +20,21 @@ export * from './types'; // TODO confirmation depth should be sent from the backend // so that it is always in sync with the client const DEFAULT_CONFIRMATION_DEPTH: number = 6; -const DEFAULT_CONNECT_TIMEOUT: number = 25_000; +const DEFAULT_CONNECT_TIMEOUT: number = 28_000; const DEFAULT_HEARTBEAT_TIMEOUT: number = 35_000; const DEFAULT_MAX_RECONNECTS: number = 6; const DEFAULT_LIMIT: number = 100; +// TODO +// const WIRE_PROTOCOL_VERSION: string = "0.0.2"; + /** * @alpha */ class ChainwebStream extends EventEmitter { + // chainweb network, e.g. mainnet01 + public network: string; + // chainweb-stream backend host, full URI - e.g. https://sse.chainweb.com public host: string; @@ -80,6 +88,7 @@ class ChainwebStream extends EventEmitter { private _reconnectTimer?: ReturnType; public constructor({ + network, host, type, id, @@ -88,8 +97,10 @@ class ChainwebStream extends EventEmitter { maxReconnects, heartbeatTimeout, confirmationDepth, - }: IChainwebStreamConstructorArgs) { + }: /* TODO quiet, */ + IChainwebStreamConstructorArgs) { super(); + this.network = network; this.type = type; this.id = id; this.host = host.endsWith('/') ? host.substr(0, host.length - 1) : host; // strip trailing slash if provided @@ -158,7 +169,7 @@ class ChainwebStream extends EventEmitter { 'ChainwebStream._handleConnect called without an _eventSource. This should never happen', ); } - _eventSource.addEventListener('initial', this._handleData); + _eventSource.addEventListener('initial', this._handleInitial); _eventSource.addEventListener('message', this._handleData); _eventSource.addEventListener('ping', this._resetHeartbeatTimeout); this._resetHeartbeatTimeout(); @@ -194,8 +205,8 @@ class ChainwebStream extends EventEmitter { this._debug('_handleError', { message, willRetry, timeout }); if (!willRetry) { - this.emit('error', message); // TODO need to wrap in Error ? - this._desiredState = ConnectionState.Closed; + this._emitError(message); + this.disconnect(); return; } @@ -213,22 +224,36 @@ class ChainwebStream extends EventEmitter { this._reconnectTimer = setTimeout(this.connect, timeout); }; + private _handleInitial = (msg: MessageEvent): void => { + this._debug('_handleData', { length: msg.data?.length }); + + const message = JSON.parse(msg.data) as IInitialEvent; + + const { config, data } = message; + + if (!this._validateServerConfig(config)) { + return; + } + + this._processData(data); + }; + private _handleData = (msg: MessageEvent): void => { this._debug('_handleData', { length: msg.data?.length }); - const message = JSON.parse(msg.data) as ITransaction | ITransaction[]; + const message = JSON.parse(msg.data) as ITransaction; + + this._processData([message]); - const data: ITransaction[] = Array.isArray(message) ? message : [message]; + this._resetHeartbeatTimeout(); + }; - const newMinHeight = data.reduce( + private _processData(data: ITransaction[]): void { + this._lastHeight = data.reduce( (highest, { height }) => (height > highest ? height : highest), - 0, + this._lastHeight ?? 0, ); - if (!this._lastHeight || newMinHeight > this._lastHeight) { - this._lastHeight = newMinHeight; - } - for (const element of data) { const { meta: { confirmations }, @@ -240,9 +265,62 @@ class ChainwebStream extends EventEmitter { this.emit('unconfirmed', element); } } + } - this._resetHeartbeatTimeout(); - }; + private _emitError(msg: string): void { + console.error(msg); + this.emit('error', msg); + } + + private _validateServerConfig(config: IChainwebStreamConfig): boolean { + const { network, type, id, maxConf, heartbeat } = config; + + const fail = (msg: string): false => { + this.disconnect(); + this._emitError(msg); + return false; + }; + + if (network !== this.network) { + return fail( + `Network mismatch: wanted ${this.network}, server is ${network}.`, + ); + } + + if (type !== this.type) { + return fail( + `Parameter mismatch: Expected transactions of type "${this.type}" but received "${type}". This should never happen.`, + ); + } + + if (id !== this.id) { + return fail( + `Parameter mismatch: Expected transactions for ${this.id} but received ${id}. This should never happen.`, + ); + } + + if (maxConf < this.confirmationDepth) { + return fail( + `Configuration mismatch: Client confirmation depth (${this.confirmationDepth}) is larger than server (${maxConf}). Events will never be considered confirmed on the client.`, + ); + } + + if (heartbeat < this.heartbeatTimeoutMs) { + const newHeartbeatTimeoutMs = heartbeat + 2_500; // give a buffer of 2.5s + + this.emit( + 'warn', + `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is larger than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, + ); + + this.heartbeatTimeoutMs = newHeartbeatTimeoutMs; + this._resetHeartbeatTimeout(); // reset to the new interval + } + + // TODO validate version + + return true; + } private _stopHeartbeatMonitor = (): void => { if (this._heartbeatTimer) { diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index 8e96ace470..7c7c34d388 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -51,7 +51,22 @@ export type ITransaction = IEventTransaction | IAccountTransaction; /** * @alpha */ +export interface IChainwebStreamConfig { + network: string; + type: ChainwebStreamType; + id: string; + maxConf: number; + heartbeat: number; + v: string; +} + +export interface IInitialEvent { + config: IChainwebStreamConfig; + data: ITransaction[]; +} + export interface IChainwebStreamConstructorArgs { + network: string; type: ChainwebStreamType; id: string; host: string; From 2b3db5b2d6c2d1cd30e0971d05dabc9fde910e54 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:32:40 +0200 Subject: [PATCH 02/18] fixed heartbeat check bug, logic was inverted --- packages/libs/chainweb-stream-client/src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index d5b9462df0..7488d448b7 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -305,12 +305,12 @@ class ChainwebStream extends EventEmitter { ); } - if (heartbeat < this.heartbeatTimeoutMs) { + if (heartbeat > this.heartbeatTimeoutMs) { const newHeartbeatTimeoutMs = heartbeat + 2_500; // give a buffer of 2.5s this.emit( 'warn', - `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is larger than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, + `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is smaller than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, ); this.heartbeatTimeoutMs = newHeartbeatTimeoutMs; From bec1263620a6f04c456ea749ded947e8d26fbc6e Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:40:58 +0200 Subject: [PATCH 03/18] example tidy up --- .../src/examples/index.ts | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/examples/index.ts b/packages/libs/chainweb-stream-client/src/examples/index.ts index d711361c6c..a50b2bd17d 100644 --- a/packages/libs/chainweb-stream-client/src/examples/index.ts +++ b/packages/libs/chainweb-stream-client/src/examples/index.ts @@ -10,42 +10,48 @@ const client: ChainwebStreamClient = new ChainwebStreamClient({ // optional limit: 100, connectTimeout: 15_000, - heartbeatTimeout: 45_000, + // intentionally setting this to a very small value, should sync the server setting + heartbeatTimeout: 1_000, maxReconnects: 5, confirmationDepth: 6, }); // debug callback allows you to log conditionally -client.on('debug', (...args) => - console.debug(new Date().toISOString(), '[DEBUG]', ...args), -); +client.on('debug', (...args) => { + console.debug('[DEBUG]', ...args); +}); +// first connection success +// only fires once per .connect() client.on('connect', () => { - console.log('connect callback'); - // first connection success - // only fires once per .connect() + console.log('[CONNECT]'); +}); + +// connection was lost and will be retried; show warning to the user +client.on('warn', (...args) => { + console.log('[WARN]', ...args); }); // connection was lost and will be retried; show warning to the user client.on('will-reconnect', (...args) => { - console.log('will-reconnect callback', ...args); + console.log('[WILL-RECONNECT]', ...args); }); // reconnected successfully; hide user warning if still showing client.on('reconnect', () => { - console.log('reconnect callback'); + console.log('[RECONNECT]'); }); // fatal error, no reconnection attempts will be made // show error state to user client.on('error', (...args) => { - console.error('error callback', ...args); + console.error('[ERROR]', ...args); }); // received transaction with confirmation depth < client.confirmationDepth client.on('unconfirmed', (newTx: ITransaction) => console.log( - 'unconfirmed callback', + '[UNCONFIRMED]', newTx.meta.id, `conf=${newTx.meta.confirmations}`, ), @@ -53,11 +59,7 @@ client.on('unconfirmed', (newTx: ITransaction) => // received transaction with confirmation depth >= client.confirmationDepth client.on('confirmed', (newTx: ITransaction) => - console.log( - 'confirmed callback', - newTx.meta.id, - `conf=${newTx.meta.confirmations}`, - ), + console.log('[CONFIRMED]', newTx.meta.id, `conf=${newTx.meta.confirmations}`), ); client.connect(); From 1dbd15a29373003e2ab3421a5ec23124023f5f36 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:53:40 +0200 Subject: [PATCH 04/18] protocol compatibility: validate wire protocol version --- .../libs/chainweb-stream-client/src/index.ts | 39 ++++++++++++---- .../libs/chainweb-stream-client/src/semver.ts | 45 +++++++++++++++++++ 2 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 packages/libs/chainweb-stream-client/src/semver.ts diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 7488d448b7..759170e5e2 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -14,20 +14,18 @@ import { IInitialEvent, IChainwebStreamConfig, } from './types'; +import { isMajorCompatible, isMinorCompatible, isClientAhead } from './semver'; export * from './types'; -// TODO confirmation depth should be sent from the backend -// so that it is always in sync with the client +const CLIENT_PROTOCOL_VERSION: string = '0.0.2'; + const DEFAULT_CONFIRMATION_DEPTH: number = 6; const DEFAULT_CONNECT_TIMEOUT: number = 28_000; const DEFAULT_HEARTBEAT_TIMEOUT: number = 35_000; const DEFAULT_MAX_RECONNECTS: number = 6; const DEFAULT_LIMIT: number = 100; -// TODO -// const WIRE_PROTOCOL_VERSION: string = "0.0.2"; - /** * @alpha */ @@ -273,7 +271,14 @@ class ChainwebStream extends EventEmitter { } private _validateServerConfig(config: IChainwebStreamConfig): boolean { - const { network, type, id, maxConf, heartbeat } = config; + const { + network, + type, + id, + maxConf, + heartbeat, + v: serverProtocolVersion, + } = config; const fail = (msg: string): false => { this.disconnect(); @@ -310,14 +315,32 @@ class ChainwebStream extends EventEmitter { this.emit( 'warn', - `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is smaller than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, + `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is smaller than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops.`, ); this.heartbeatTimeoutMs = newHeartbeatTimeoutMs; this._resetHeartbeatTimeout(); // reset to the new interval } - // TODO validate version + if (!isMajorCompatible(CLIENT_PROTOCOL_VERSION, serverProtocolVersion)) { + return fail( + `Protocol mismatch: Client protocol version ${CLIENT_PROTOCOL_VERSION} is incompatible with server protocol version ${serverProtocolVersion}.`, + ); + } + + if (!isMinorCompatible(CLIENT_PROTOCOL_VERSION, serverProtocolVersion)) { + if (isClientAhead(CLIENT_PROTOCOL_VERSION, serverProtocolVersion)) { + this.emit( + 'warn', + `Client protocol version ${CLIENT_PROTOCOL_VERSION} is ahead of server protocol version ${serverProtocolVersion}. Some client features may not be supported by the server.`, + ); + } else { + this.emit( + 'warn', + `Server protocol version ${serverProtocolVersion} is ahead of client protocol version ${CLIENT_PROTOCOL_VERSION}. Some server features may not be supported by the client.`, + ); + } + } return true; } diff --git a/packages/libs/chainweb-stream-client/src/semver.ts b/packages/libs/chainweb-stream-client/src/semver.ts new file mode 100644 index 0000000000..de33ea6c68 --- /dev/null +++ b/packages/libs/chainweb-stream-client/src/semver.ts @@ -0,0 +1,45 @@ +export interface IVersion { + major: number; + minor: number; + patch: number | string; +} + +export function parse(versionStr: IVersion | string): IVersion { + if (typeof versionStr !== 'string') { + return versionStr; + } + const versionStrParts = versionStr.split('.'); + const [major, minor, patchNum] = versionStrParts.map((n) => Number(n)); + const patch = isNaN(patchNum) ? versionStrParts[2] : patchNum; + return { major, minor, patch }; +} + +export function isMajorCompatible( + clientMixed: string | IVersion, + serverMixed: string | IVersion, +): boolean { + const [client, server] = [clientMixed, serverMixed].map((version) => + parse(version), + ); + return client.major === server.major; +} + +export function isMinorCompatible( + clientMixed: string | IVersion, + serverMixed: string | IVersion, +): boolean { + const [client, server] = [clientMixed, serverMixed].map((version) => + parse(version), + ); + return client.minor === server.minor; +} + +export function isClientAhead( + clientMixed: string | IVersion, + serverMixed: string | IVersion, +): boolean { + const [client, server] = [clientMixed, serverMixed].map((version) => + parse(version), + ); + return client.minor > server.minor; +} From d2cb01b8db40c92d8df37247d084baadbe424338 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:54:05 +0200 Subject: [PATCH 05/18] added comment about reconnection sliding window edge case --- packages/libs/chainweb-stream-client/src/index.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 759170e5e2..7ec7d9e040 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -371,6 +371,14 @@ class ChainwebStream extends EventEmitter { // pass query string args. minHeight and limit for now. const urlParamArgs: string[][] = []; + // TODO + // This currently has a sliding window edge case when reconnecting + // chains can currently be +/- 3 height from each other + // lastHeight can be the height of the leading chain + // to never miss events from other chains we need to send lastHeight - 3 + // i.e. the lowest possible height of the trailing chain + // but in that case we would see duplicate events + // Should we track emitted events and suppress duplicates? if (this.limit !== undefined) { urlParamArgs.push(['limit', String(this.limit)]); } From b84f633245b66377173436b1cf2f7d80d1ccfb2e Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:11:26 +0200 Subject: [PATCH 06/18] added export type decorator, added rush change entry --- ...lient-initial-config-validate_2023-05-09-17-10.json | 10 ++++++++++ .../etc/chainweb-stream-client.api.md | 10 ++-------- packages/libs/chainweb-stream-client/src/types.ts | 6 ++++++ 3 files changed, 18 insertions(+), 8 deletions(-) create mode 100644 common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json diff --git a/common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json b/common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json new file mode 100644 index 0000000000..0e2e67aaa4 --- /dev/null +++ b/common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@kadena/chainweb-stream-client", + "comment": "Added wire protocol compatibility checks", + "type": "patch" + } + ], + "packageName": "@kadena/chainweb-stream-client" +} \ No newline at end of file diff --git a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md index 60839d0325..b5207d896a 100644 --- a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md +++ b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md @@ -82,7 +82,7 @@ export interface IChainwebStreamConfig { v: string; } -// @public (undocumented) +// @alpha (undocumented) export interface IChainwebStreamConstructorArgs { // (undocumented) confirmationDepth?: number; @@ -100,8 +100,6 @@ export interface IChainwebStreamConstructorArgs { maxReconnects?: number; // (undocumented) network: string; - // Warning: (ae-incompatible-release-tags) The symbol "type" is marked as @public, but its signature references "ChainwebStreamType" which is marked as @alpha - // // (undocumented) type: ChainwebStreamType; } @@ -136,14 +134,10 @@ export interface IEventTransaction extends ITransactionBase { params: string[]; } -// @public (undocumented) +// @alpha (undocumented) export interface IInitialEvent { - // Warning: (ae-incompatible-release-tags) The symbol "config" is marked as @public, but its signature references "IChainwebStreamConfig" which is marked as @alpha - // // (undocumented) config: IChainwebStreamConfig; - // Warning: (ae-incompatible-release-tags) The symbol "data" is marked as @public, but its signature references "ITransaction" which is marked as @alpha - // // (undocumented) data: ITransaction[]; } diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index 7c7c34d388..f3585ff07e 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -60,11 +60,17 @@ export interface IChainwebStreamConfig { v: string; } +/** + * @alpha + */ export interface IInitialEvent { config: IChainwebStreamConfig; data: ITransaction[]; } +/** + * @alpha + */ export interface IChainwebStreamConstructorArgs { network: string; type: ChainwebStreamType; From 4fc0599adb4ac278de56fe3638e82847d20b241f Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:17:50 +0200 Subject: [PATCH 07/18] removed stale TODO comments --- packages/libs/chainweb-stream-client/src/index.ts | 8 +++----- packages/libs/chainweb-stream-client/src/types.ts | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 7ec7d9e040..d133fc2430 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -55,8 +55,7 @@ class ChainwebStream extends EventEmitter { public maxReconnects: number; // depth at which a block/transaction is considered confirmed - // TODO client <-> server should agree about this somehow, because of edge case: - // if backend CONFIRMATION_DEPTH < client CONFIRMATION_DEPTH, then the client will not emit and confirmed events + // must be less than or equal to the corresponding server configuration public confirmationDepth: number; // desired eventsource state @@ -95,8 +94,7 @@ class ChainwebStream extends EventEmitter { maxReconnects, heartbeatTimeout, confirmationDepth, - }: /* TODO quiet, */ - IChainwebStreamConstructorArgs) { + }: IChainwebStreamConstructorArgs) { super(); this.network = network; this.type = type; @@ -134,7 +132,7 @@ class ChainwebStream extends EventEmitter { public disconnect = (): void => { this._desiredState = ConnectionState.Closed; this._eventSource?.close(); - // TODO Should we null out this._eventSource? + this._eventSource = undefined; this._stopHeartbeatMonitor(); this._debug('disconnect'); }; diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index f3585ff07e..40a081522f 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -76,7 +76,6 @@ export interface IChainwebStreamConstructorArgs { type: ChainwebStreamType; id: string; host: string; - // TODO network ? for sanity check/safety limit?: number; connectTimeout?: number; heartbeatTimeout?: number; From f4a1525f1ed38a5fb8ee6c99155ce99e399951f9 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:22:27 +0200 Subject: [PATCH 08/18] moved edge case comment to correct place --- packages/libs/chainweb-stream-client/src/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index d133fc2430..ba0ae58ea0 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -369,6 +369,9 @@ class ChainwebStream extends EventEmitter { // pass query string args. minHeight and limit for now. const urlParamArgs: string[][] = []; + if (this.limit !== undefined) { + urlParamArgs.push(['limit', String(this.limit)]); + } // TODO // This currently has a sliding window edge case when reconnecting // chains can currently be +/- 3 height from each other @@ -377,9 +380,6 @@ class ChainwebStream extends EventEmitter { // i.e. the lowest possible height of the trailing chain // but in that case we would see duplicate events // Should we track emitted events and suppress duplicates? - if (this.limit !== undefined) { - urlParamArgs.push(['limit', String(this.limit)]); - } if (this._lastHeight !== undefined) { urlParamArgs.push(['minHeight', String(this._lastHeight)]); } From ee6ffaa94f02c3788ebeab4fd009b660d75bb56c Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:44:19 +0200 Subject: [PATCH 09/18] resume from last max height - 3 to guarantee we will not miss transactions (but this introduces duplicates) --- packages/libs/chainweb-stream-client/src/index.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index ba0ae58ea0..0ceb691092 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -372,16 +372,12 @@ class ChainwebStream extends EventEmitter { if (this.limit !== undefined) { urlParamArgs.push(['limit', String(this.limit)]); } - // TODO - // This currently has a sliding window edge case when reconnecting - // chains can currently be +/- 3 height from each other - // lastHeight can be the height of the leading chain - // to never miss events from other chains we need to send lastHeight - 3 - // i.e. the lowest possible height of the trailing chain - // but in that case we would see duplicate events - // Should we track emitted events and suppress duplicates? + // TODO This reconnection strategy of -3 from last max height + // guarrantees that we will not miss events, but it also means + // that confirmed transactions will be emitted more than once + // Discussion here: https://github.com/kadena-community/kadena.js/issues/275 if (this._lastHeight !== undefined) { - urlParamArgs.push(['minHeight', String(this._lastHeight)]); + urlParamArgs.push(['minHeight', String(this._lastHeight - 3)]); } if (urlParamArgs.length) { path += '?' + new URLSearchParams(urlParamArgs).toString(); From 14fc5d15fabf9506b2eafc3d400e22f08fdfd156 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Mon, 8 May 2023 19:36:02 +0200 Subject: [PATCH 10/18] stream-client: added parsing config from intial event and validating compatibility --- .../libs/chainweb-stream-client/README.md | 35 +++--- .../etc/chainweb-stream-client.api.md | 36 +++++- .../src/examples/index.ts | 1 + .../libs/chainweb-stream-client/src/index.ts | 110 +++++++++++++++--- .../libs/chainweb-stream-client/src/types.ts | 15 +++ 5 files changed, 164 insertions(+), 33 deletions(-) diff --git a/packages/libs/chainweb-stream-client/README.md b/packages/libs/chainweb-stream-client/README.md index 377ef4afd6..06b4bf0f4e 100644 --- a/packages/libs/chainweb-stream-client/README.md +++ b/packages/libs/chainweb-stream-client/README.md @@ -47,15 +47,31 @@ client.connect(); Find more detailed examples under `src/examples`. +## Constructor Options + +| Key | Required | Description | Example Values | +| ---------------- | :------: | ----------- | ------ | +| network | Yes | Chainweb network | `mainnet01|testnet04|...` | +| type | Yes | Transaction type to stream (event/account) | `event|account` | +| id | Yes | Account ID or module/event name | `k:abcdef01234..` | +| host | Yes | Chainweb-SSE backend URL | `http://localhost:4000` | +| limit | No | Initial data load limit | 100 | +| connectTimeout | No | Connection timeout in ms | 10_000 | +| heartbeatTimeout | No | Stale connection timeout in ms | 30_000 | +| maxReconnects | No | How many reconnections to attempt before giving up | 5 | +| confirmationDepth | No | How many confirmations for a transaction to be considered final | 6 | + ## Considerations ⚠️ ### Ensure configuration compatibility -Make sure that your client and server `confirmationDepth` and `heartbeatTimeout` values are compatible. +Make sure that your client and server `confirmationDepth` values are compatible. + +If your client `confirmationDepth` is larger than the server's, the `confirmed` event will never fire. The client will automatically detect this condition, emit an `error` event and disconnect. -If your client `heartbeatTimeout` is smaller than the server heartbeat interval, the client will keep disconnecting when the connection is silent (no data.) +If your client's configured network does not match the server's, the client will emit an `error` event and disconnect. -If your client `confirmationDepth` is larger than the server's, the `confirmed` event will never fire. +If your client `heartbeatTimeout` is smaller than the server heartbeat interval, the client will automatically adapt its heartbeat timeout to 2500ms larger than the server value. ### Handle temporary and permanent connection failures @@ -63,19 +79,6 @@ When the connection is interrupted or determined to be stale (no heartbeats rece It is recommended to handle the fired [will-reconnect](#will-reconnect) and [error](#error) events. -## Constructor Options - -| Key | Required | Description | Example Values | -| ---------------- | :------: | ----------- | ------ | -| type | Yes | Transaction type to stream (event/account) | `event` | `account` | -| id | Yes | Account ID or module/event name | `k:abcdef01234..` | -| host | Yes | Chainweb-SSE backend URL | `http://localhost:4000` | -| limit | No | Initial data load limit | 100 | -| connectTimeout | No | Connection timeout in ms | 10_000 | -| heartbeatTimeout | No | Stale connection timeout in ms | 30_000 | -| maxReconnects | No | How many reconnections to attempt before giving up | 5 | -| confirmationDepth | No | How many confirmations for a transaction to be considered final | 6 | - ## Events ChainwebStreamClient is an EventEmitter. You can subscribe to the following events using `.on('event-name', callback)`. diff --git a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md index d9a47c4681..60839d0325 100644 --- a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md +++ b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md @@ -8,7 +8,7 @@ import EventEmitter from 'eventemitter2'; // @alpha (undocumented) class ChainwebStream extends EventEmitter { - constructor({ host, type, id, limit, connectTimeout, maxReconnects, heartbeatTimeout, confirmationDepth, }: IChainwebStreamConstructorArgs); + constructor({ network, host, type, id, limit, connectTimeout, maxReconnects, heartbeatTimeout, confirmationDepth, }: IChainwebStreamConstructorArgs); // (undocumented) confirmationDepth: number; connect: () => void; @@ -25,6 +25,8 @@ class ChainwebStream extends EventEmitter { limit: number | undefined; // (undocumented) maxReconnects: number; + // (undocumented) + network: string; get state(): ConnectionState; // (undocumented) type: ChainwebStreamType; @@ -65,6 +67,22 @@ export interface IAccountTransaction extends ITransactionBase { } // @alpha (undocumented) +export interface IChainwebStreamConfig { + // (undocumented) + heartbeat: number; + // (undocumented) + id: string; + // (undocumented) + maxConf: number; + // (undocumented) + network: string; + // (undocumented) + type: ChainwebStreamType; + // (undocumented) + v: string; +} + +// @public (undocumented) export interface IChainwebStreamConstructorArgs { // (undocumented) confirmationDepth?: number; @@ -81,6 +99,10 @@ export interface IChainwebStreamConstructorArgs { // (undocumented) maxReconnects?: number; // (undocumented) + network: string; + // Warning: (ae-incompatible-release-tags) The symbol "type" is marked as @public, but its signature references "ChainwebStreamType" which is marked as @alpha + // + // (undocumented) type: ChainwebStreamType; } @@ -114,6 +136,18 @@ export interface IEventTransaction extends ITransactionBase { params: string[]; } +// @public (undocumented) +export interface IInitialEvent { + // Warning: (ae-incompatible-release-tags) The symbol "config" is marked as @public, but its signature references "IChainwebStreamConfig" which is marked as @alpha + // + // (undocumented) + config: IChainwebStreamConfig; + // Warning: (ae-incompatible-release-tags) The symbol "data" is marked as @public, but its signature references "ITransaction" which is marked as @alpha + // + // (undocumented) + data: ITransaction[]; +} + // @alpha (undocumented) export type ITransaction = IEventTransaction | IAccountTransaction; diff --git a/packages/libs/chainweb-stream-client/src/examples/index.ts b/packages/libs/chainweb-stream-client/src/examples/index.ts index 79523ef6fc..d711361c6c 100644 --- a/packages/libs/chainweb-stream-client/src/examples/index.ts +++ b/packages/libs/chainweb-stream-client/src/examples/index.ts @@ -2,6 +2,7 @@ import ChainwebStreamClient, { ITransaction } from '../'; const client: ChainwebStreamClient = new ChainwebStreamClient({ // required + network: 'mainnet01', type: 'account', id: 'k:e7f7130f359fb1f8c87873bf858a0e9cbc3c1059f62ae715ec72e760b055e9f3', host: 'http://localhost:4000/', diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index f9351d68dc..d5b9462df0 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -6,11 +6,13 @@ import { HeartbeatTimeoutError, } from './errors'; import { + ConnectionState, IChainwebStreamConstructorArgs, ChainwebStreamType, ITransaction, - ConnectionState, IDebugMsgObject, + IInitialEvent, + IChainwebStreamConfig, } from './types'; export * from './types'; @@ -18,15 +20,21 @@ export * from './types'; // TODO confirmation depth should be sent from the backend // so that it is always in sync with the client const DEFAULT_CONFIRMATION_DEPTH: number = 6; -const DEFAULT_CONNECT_TIMEOUT: number = 25_000; +const DEFAULT_CONNECT_TIMEOUT: number = 28_000; const DEFAULT_HEARTBEAT_TIMEOUT: number = 35_000; const DEFAULT_MAX_RECONNECTS: number = 6; const DEFAULT_LIMIT: number = 100; +// TODO +// const WIRE_PROTOCOL_VERSION: string = "0.0.2"; + /** * @alpha */ class ChainwebStream extends EventEmitter { + // chainweb network, e.g. mainnet01 + public network: string; + // chainweb-stream backend host, full URI - e.g. https://sse.chainweb.com public host: string; @@ -80,6 +88,7 @@ class ChainwebStream extends EventEmitter { private _reconnectTimer?: ReturnType; public constructor({ + network, host, type, id, @@ -88,8 +97,10 @@ class ChainwebStream extends EventEmitter { maxReconnects, heartbeatTimeout, confirmationDepth, - }: IChainwebStreamConstructorArgs) { + }: /* TODO quiet, */ + IChainwebStreamConstructorArgs) { super(); + this.network = network; this.type = type; this.id = id; this.host = host.endsWith('/') ? host.substr(0, host.length - 1) : host; // strip trailing slash if provided @@ -158,7 +169,7 @@ class ChainwebStream extends EventEmitter { 'ChainwebStream._handleConnect called without an _eventSource. This should never happen', ); } - _eventSource.addEventListener('initial', this._handleData); + _eventSource.addEventListener('initial', this._handleInitial); _eventSource.addEventListener('message', this._handleData); _eventSource.addEventListener('ping', this._resetHeartbeatTimeout); this._resetHeartbeatTimeout(); @@ -194,8 +205,8 @@ class ChainwebStream extends EventEmitter { this._debug('_handleError', { message, willRetry, timeout }); if (!willRetry) { - this.emit('error', message); // TODO need to wrap in Error ? - this._desiredState = ConnectionState.Closed; + this._emitError(message); + this.disconnect(); return; } @@ -213,22 +224,36 @@ class ChainwebStream extends EventEmitter { this._reconnectTimer = setTimeout(this.connect, timeout); }; + private _handleInitial = (msg: MessageEvent): void => { + this._debug('_handleData', { length: msg.data?.length }); + + const message = JSON.parse(msg.data) as IInitialEvent; + + const { config, data } = message; + + if (!this._validateServerConfig(config)) { + return; + } + + this._processData(data); + }; + private _handleData = (msg: MessageEvent): void => { this._debug('_handleData', { length: msg.data?.length }); - const message = JSON.parse(msg.data) as ITransaction | ITransaction[]; + const message = JSON.parse(msg.data) as ITransaction; + + this._processData([message]); - const data: ITransaction[] = Array.isArray(message) ? message : [message]; + this._resetHeartbeatTimeout(); + }; - const newMinHeight = data.reduce( + private _processData(data: ITransaction[]): void { + this._lastHeight = data.reduce( (highest, { height }) => (height > highest ? height : highest), - 0, + this._lastHeight ?? 0, ); - if (!this._lastHeight || newMinHeight > this._lastHeight) { - this._lastHeight = newMinHeight; - } - for (const element of data) { const { meta: { confirmations }, @@ -240,9 +265,62 @@ class ChainwebStream extends EventEmitter { this.emit('unconfirmed', element); } } + } - this._resetHeartbeatTimeout(); - }; + private _emitError(msg: string): void { + console.error(msg); + this.emit('error', msg); + } + + private _validateServerConfig(config: IChainwebStreamConfig): boolean { + const { network, type, id, maxConf, heartbeat } = config; + + const fail = (msg: string): false => { + this.disconnect(); + this._emitError(msg); + return false; + }; + + if (network !== this.network) { + return fail( + `Network mismatch: wanted ${this.network}, server is ${network}.`, + ); + } + + if (type !== this.type) { + return fail( + `Parameter mismatch: Expected transactions of type "${this.type}" but received "${type}". This should never happen.`, + ); + } + + if (id !== this.id) { + return fail( + `Parameter mismatch: Expected transactions for ${this.id} but received ${id}. This should never happen.`, + ); + } + + if (maxConf < this.confirmationDepth) { + return fail( + `Configuration mismatch: Client confirmation depth (${this.confirmationDepth}) is larger than server (${maxConf}). Events will never be considered confirmed on the client.`, + ); + } + + if (heartbeat < this.heartbeatTimeoutMs) { + const newHeartbeatTimeoutMs = heartbeat + 2_500; // give a buffer of 2.5s + + this.emit( + 'warn', + `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is larger than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, + ); + + this.heartbeatTimeoutMs = newHeartbeatTimeoutMs; + this._resetHeartbeatTimeout(); // reset to the new interval + } + + // TODO validate version + + return true; + } private _stopHeartbeatMonitor = (): void => { if (this._heartbeatTimer) { diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index 8e96ace470..7c7c34d388 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -51,7 +51,22 @@ export type ITransaction = IEventTransaction | IAccountTransaction; /** * @alpha */ +export interface IChainwebStreamConfig { + network: string; + type: ChainwebStreamType; + id: string; + maxConf: number; + heartbeat: number; + v: string; +} + +export interface IInitialEvent { + config: IChainwebStreamConfig; + data: ITransaction[]; +} + export interface IChainwebStreamConstructorArgs { + network: string; type: ChainwebStreamType; id: string; host: string; From b32b5ba36a1ceedcd1a2b2d991769013a517e3ec Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:32:40 +0200 Subject: [PATCH 11/18] fixed heartbeat check bug, logic was inverted --- packages/libs/chainweb-stream-client/src/index.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index d5b9462df0..7488d448b7 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -305,12 +305,12 @@ class ChainwebStream extends EventEmitter { ); } - if (heartbeat < this.heartbeatTimeoutMs) { + if (heartbeat > this.heartbeatTimeoutMs) { const newHeartbeatTimeoutMs = heartbeat + 2_500; // give a buffer of 2.5s this.emit( 'warn', - `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is larger than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, + `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is smaller than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, ); this.heartbeatTimeoutMs = newHeartbeatTimeoutMs; From 5c0dcf2f1b12906d777bfd2e031dba8106218bd9 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:40:58 +0200 Subject: [PATCH 12/18] example tidy up --- .../src/examples/index.ts | 34 ++++++++++--------- 1 file changed, 18 insertions(+), 16 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/examples/index.ts b/packages/libs/chainweb-stream-client/src/examples/index.ts index d711361c6c..a50b2bd17d 100644 --- a/packages/libs/chainweb-stream-client/src/examples/index.ts +++ b/packages/libs/chainweb-stream-client/src/examples/index.ts @@ -10,42 +10,48 @@ const client: ChainwebStreamClient = new ChainwebStreamClient({ // optional limit: 100, connectTimeout: 15_000, - heartbeatTimeout: 45_000, + // intentionally setting this to a very small value, should sync the server setting + heartbeatTimeout: 1_000, maxReconnects: 5, confirmationDepth: 6, }); // debug callback allows you to log conditionally -client.on('debug', (...args) => - console.debug(new Date().toISOString(), '[DEBUG]', ...args), -); +client.on('debug', (...args) => { + console.debug('[DEBUG]', ...args); +}); +// first connection success +// only fires once per .connect() client.on('connect', () => { - console.log('connect callback'); - // first connection success - // only fires once per .connect() + console.log('[CONNECT]'); +}); + +// connection was lost and will be retried; show warning to the user +client.on('warn', (...args) => { + console.log('[WARN]', ...args); }); // connection was lost and will be retried; show warning to the user client.on('will-reconnect', (...args) => { - console.log('will-reconnect callback', ...args); + console.log('[WILL-RECONNECT]', ...args); }); // reconnected successfully; hide user warning if still showing client.on('reconnect', () => { - console.log('reconnect callback'); + console.log('[RECONNECT]'); }); // fatal error, no reconnection attempts will be made // show error state to user client.on('error', (...args) => { - console.error('error callback', ...args); + console.error('[ERROR]', ...args); }); // received transaction with confirmation depth < client.confirmationDepth client.on('unconfirmed', (newTx: ITransaction) => console.log( - 'unconfirmed callback', + '[UNCONFIRMED]', newTx.meta.id, `conf=${newTx.meta.confirmations}`, ), @@ -53,11 +59,7 @@ client.on('unconfirmed', (newTx: ITransaction) => // received transaction with confirmation depth >= client.confirmationDepth client.on('confirmed', (newTx: ITransaction) => - console.log( - 'confirmed callback', - newTx.meta.id, - `conf=${newTx.meta.confirmations}`, - ), + console.log('[CONFIRMED]', newTx.meta.id, `conf=${newTx.meta.confirmations}`), ); client.connect(); From 2959e261ab96a192413c97b5579a98a8901e212b Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:53:40 +0200 Subject: [PATCH 13/18] protocol compatibility: validate wire protocol version --- .../libs/chainweb-stream-client/src/index.ts | 39 ++++++++++++---- .../libs/chainweb-stream-client/src/semver.ts | 45 +++++++++++++++++++ 2 files changed, 76 insertions(+), 8 deletions(-) create mode 100644 packages/libs/chainweb-stream-client/src/semver.ts diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 7488d448b7..759170e5e2 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -14,20 +14,18 @@ import { IInitialEvent, IChainwebStreamConfig, } from './types'; +import { isMajorCompatible, isMinorCompatible, isClientAhead } from './semver'; export * from './types'; -// TODO confirmation depth should be sent from the backend -// so that it is always in sync with the client +const CLIENT_PROTOCOL_VERSION: string = '0.0.2'; + const DEFAULT_CONFIRMATION_DEPTH: number = 6; const DEFAULT_CONNECT_TIMEOUT: number = 28_000; const DEFAULT_HEARTBEAT_TIMEOUT: number = 35_000; const DEFAULT_MAX_RECONNECTS: number = 6; const DEFAULT_LIMIT: number = 100; -// TODO -// const WIRE_PROTOCOL_VERSION: string = "0.0.2"; - /** * @alpha */ @@ -273,7 +271,14 @@ class ChainwebStream extends EventEmitter { } private _validateServerConfig(config: IChainwebStreamConfig): boolean { - const { network, type, id, maxConf, heartbeat } = config; + const { + network, + type, + id, + maxConf, + heartbeat, + v: serverProtocolVersion, + } = config; const fail = (msg: string): false => { this.disconnect(); @@ -310,14 +315,32 @@ class ChainwebStream extends EventEmitter { this.emit( 'warn', - `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is smaller than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops`, + `Configuration mismatch: Client heartbeat interval (${this.heartbeatTimeoutMs}ms) is smaller than server heartbeat interval (${heartbeat}ms). Adapting to ${newHeartbeatTimeoutMs}ms to avoid reconnection loops.`, ); this.heartbeatTimeoutMs = newHeartbeatTimeoutMs; this._resetHeartbeatTimeout(); // reset to the new interval } - // TODO validate version + if (!isMajorCompatible(CLIENT_PROTOCOL_VERSION, serverProtocolVersion)) { + return fail( + `Protocol mismatch: Client protocol version ${CLIENT_PROTOCOL_VERSION} is incompatible with server protocol version ${serverProtocolVersion}.`, + ); + } + + if (!isMinorCompatible(CLIENT_PROTOCOL_VERSION, serverProtocolVersion)) { + if (isClientAhead(CLIENT_PROTOCOL_VERSION, serverProtocolVersion)) { + this.emit( + 'warn', + `Client protocol version ${CLIENT_PROTOCOL_VERSION} is ahead of server protocol version ${serverProtocolVersion}. Some client features may not be supported by the server.`, + ); + } else { + this.emit( + 'warn', + `Server protocol version ${serverProtocolVersion} is ahead of client protocol version ${CLIENT_PROTOCOL_VERSION}. Some server features may not be supported by the client.`, + ); + } + } return true; } diff --git a/packages/libs/chainweb-stream-client/src/semver.ts b/packages/libs/chainweb-stream-client/src/semver.ts new file mode 100644 index 0000000000..de33ea6c68 --- /dev/null +++ b/packages/libs/chainweb-stream-client/src/semver.ts @@ -0,0 +1,45 @@ +export interface IVersion { + major: number; + minor: number; + patch: number | string; +} + +export function parse(versionStr: IVersion | string): IVersion { + if (typeof versionStr !== 'string') { + return versionStr; + } + const versionStrParts = versionStr.split('.'); + const [major, minor, patchNum] = versionStrParts.map((n) => Number(n)); + const patch = isNaN(patchNum) ? versionStrParts[2] : patchNum; + return { major, minor, patch }; +} + +export function isMajorCompatible( + clientMixed: string | IVersion, + serverMixed: string | IVersion, +): boolean { + const [client, server] = [clientMixed, serverMixed].map((version) => + parse(version), + ); + return client.major === server.major; +} + +export function isMinorCompatible( + clientMixed: string | IVersion, + serverMixed: string | IVersion, +): boolean { + const [client, server] = [clientMixed, serverMixed].map((version) => + parse(version), + ); + return client.minor === server.minor; +} + +export function isClientAhead( + clientMixed: string | IVersion, + serverMixed: string | IVersion, +): boolean { + const [client, server] = [clientMixed, serverMixed].map((version) => + parse(version), + ); + return client.minor > server.minor; +} From 6c7401602d7086356a74ef8f05287f2fc1fc00ce Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 18:54:05 +0200 Subject: [PATCH 14/18] added comment about reconnection sliding window edge case --- packages/libs/chainweb-stream-client/src/index.ts | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 759170e5e2..7ec7d9e040 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -371,6 +371,14 @@ class ChainwebStream extends EventEmitter { // pass query string args. minHeight and limit for now. const urlParamArgs: string[][] = []; + // TODO + // This currently has a sliding window edge case when reconnecting + // chains can currently be +/- 3 height from each other + // lastHeight can be the height of the leading chain + // to never miss events from other chains we need to send lastHeight - 3 + // i.e. the lowest possible height of the trailing chain + // but in that case we would see duplicate events + // Should we track emitted events and suppress duplicates? if (this.limit !== undefined) { urlParamArgs.push(['limit', String(this.limit)]); } From 19d4647e0da87b73fcb92db8e495265069bc4ee4 Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:11:26 +0200 Subject: [PATCH 15/18] added export type decorator, added rush change entry --- ...lient-initial-config-validate_2023-05-09-17-10.json | 10 ++++++++++ .../etc/chainweb-stream-client.api.md | 10 ++-------- packages/libs/chainweb-stream-client/src/types.ts | 6 ++++++ 3 files changed, 18 insertions(+), 8 deletions(-) create mode 100644 common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json diff --git a/common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json b/common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json new file mode 100644 index 0000000000..0e2e67aaa4 --- /dev/null +++ b/common/changes/@kadena/chainweb-stream-client/chainweb-stream-client-initial-config-validate_2023-05-09-17-10.json @@ -0,0 +1,10 @@ +{ + "changes": [ + { + "packageName": "@kadena/chainweb-stream-client", + "comment": "Added wire protocol compatibility checks", + "type": "patch" + } + ], + "packageName": "@kadena/chainweb-stream-client" +} \ No newline at end of file diff --git a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md index 60839d0325..b5207d896a 100644 --- a/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md +++ b/packages/libs/chainweb-stream-client/etc/chainweb-stream-client.api.md @@ -82,7 +82,7 @@ export interface IChainwebStreamConfig { v: string; } -// @public (undocumented) +// @alpha (undocumented) export interface IChainwebStreamConstructorArgs { // (undocumented) confirmationDepth?: number; @@ -100,8 +100,6 @@ export interface IChainwebStreamConstructorArgs { maxReconnects?: number; // (undocumented) network: string; - // Warning: (ae-incompatible-release-tags) The symbol "type" is marked as @public, but its signature references "ChainwebStreamType" which is marked as @alpha - // // (undocumented) type: ChainwebStreamType; } @@ -136,14 +134,10 @@ export interface IEventTransaction extends ITransactionBase { params: string[]; } -// @public (undocumented) +// @alpha (undocumented) export interface IInitialEvent { - // Warning: (ae-incompatible-release-tags) The symbol "config" is marked as @public, but its signature references "IChainwebStreamConfig" which is marked as @alpha - // // (undocumented) config: IChainwebStreamConfig; - // Warning: (ae-incompatible-release-tags) The symbol "data" is marked as @public, but its signature references "ITransaction" which is marked as @alpha - // // (undocumented) data: ITransaction[]; } diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index 7c7c34d388..f3585ff07e 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -60,11 +60,17 @@ export interface IChainwebStreamConfig { v: string; } +/** + * @alpha + */ export interface IInitialEvent { config: IChainwebStreamConfig; data: ITransaction[]; } +/** + * @alpha + */ export interface IChainwebStreamConstructorArgs { network: string; type: ChainwebStreamType; From f4d829bf87fecaec5292f7a81bbe846a2d463b4e Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:17:50 +0200 Subject: [PATCH 16/18] removed stale TODO comments --- packages/libs/chainweb-stream-client/src/index.ts | 8 +++----- packages/libs/chainweb-stream-client/src/types.ts | 1 - 2 files changed, 3 insertions(+), 6 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index 7ec7d9e040..d133fc2430 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -55,8 +55,7 @@ class ChainwebStream extends EventEmitter { public maxReconnects: number; // depth at which a block/transaction is considered confirmed - // TODO client <-> server should agree about this somehow, because of edge case: - // if backend CONFIRMATION_DEPTH < client CONFIRMATION_DEPTH, then the client will not emit and confirmed events + // must be less than or equal to the corresponding server configuration public confirmationDepth: number; // desired eventsource state @@ -95,8 +94,7 @@ class ChainwebStream extends EventEmitter { maxReconnects, heartbeatTimeout, confirmationDepth, - }: /* TODO quiet, */ - IChainwebStreamConstructorArgs) { + }: IChainwebStreamConstructorArgs) { super(); this.network = network; this.type = type; @@ -134,7 +132,7 @@ class ChainwebStream extends EventEmitter { public disconnect = (): void => { this._desiredState = ConnectionState.Closed; this._eventSource?.close(); - // TODO Should we null out this._eventSource? + this._eventSource = undefined; this._stopHeartbeatMonitor(); this._debug('disconnect'); }; diff --git a/packages/libs/chainweb-stream-client/src/types.ts b/packages/libs/chainweb-stream-client/src/types.ts index f3585ff07e..40a081522f 100644 --- a/packages/libs/chainweb-stream-client/src/types.ts +++ b/packages/libs/chainweb-stream-client/src/types.ts @@ -76,7 +76,6 @@ export interface IChainwebStreamConstructorArgs { type: ChainwebStreamType; id: string; host: string; - // TODO network ? for sanity check/safety limit?: number; connectTimeout?: number; heartbeatTimeout?: number; From 6e8b8c671cc99b3b751337ac641e1bf5fb56bbcd Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:22:27 +0200 Subject: [PATCH 17/18] moved edge case comment to correct place --- packages/libs/chainweb-stream-client/src/index.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index d133fc2430..ba0ae58ea0 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -369,6 +369,9 @@ class ChainwebStream extends EventEmitter { // pass query string args. minHeight and limit for now. const urlParamArgs: string[][] = []; + if (this.limit !== undefined) { + urlParamArgs.push(['limit', String(this.limit)]); + } // TODO // This currently has a sliding window edge case when reconnecting // chains can currently be +/- 3 height from each other @@ -377,9 +380,6 @@ class ChainwebStream extends EventEmitter { // i.e. the lowest possible height of the trailing chain // but in that case we would see duplicate events // Should we track emitted events and suppress duplicates? - if (this.limit !== undefined) { - urlParamArgs.push(['limit', String(this.limit)]); - } if (this._lastHeight !== undefined) { urlParamArgs.push(['minHeight', String(this._lastHeight)]); } From 54ce94ec9c3ab26d5fca54d270b2a30dd3c4c8bc Mon Sep 17 00:00:00 2001 From: Takadenoshi Date: Tue, 9 May 2023 19:44:19 +0200 Subject: [PATCH 18/18] resume from last max height - 3 to guarantee we will not miss transactions (but this introduces duplicates) --- packages/libs/chainweb-stream-client/src/index.ts | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/packages/libs/chainweb-stream-client/src/index.ts b/packages/libs/chainweb-stream-client/src/index.ts index ba0ae58ea0..0ceb691092 100644 --- a/packages/libs/chainweb-stream-client/src/index.ts +++ b/packages/libs/chainweb-stream-client/src/index.ts @@ -372,16 +372,12 @@ class ChainwebStream extends EventEmitter { if (this.limit !== undefined) { urlParamArgs.push(['limit', String(this.limit)]); } - // TODO - // This currently has a sliding window edge case when reconnecting - // chains can currently be +/- 3 height from each other - // lastHeight can be the height of the leading chain - // to never miss events from other chains we need to send lastHeight - 3 - // i.e. the lowest possible height of the trailing chain - // but in that case we would see duplicate events - // Should we track emitted events and suppress duplicates? + // TODO This reconnection strategy of -3 from last max height + // guarrantees that we will not miss events, but it also means + // that confirmed transactions will be emitted more than once + // Discussion here: https://github.com/kadena-community/kadena.js/issues/275 if (this._lastHeight !== undefined) { - urlParamArgs.push(['minHeight', String(this._lastHeight)]); + urlParamArgs.push(['minHeight', String(this._lastHeight - 3)]); } if (urlParamArgs.length) { path += '?' + new URLSearchParams(urlParamArgs).toString();