diff --git a/src/Stream.js b/src/Stream.js index b4c1af4..75eed23 100644 --- a/src/Stream.js +++ b/src/Stream.js @@ -1,6 +1,7 @@ const messages = require('./messages'); const { appendUrlPath, base64URLEncode, objectHasOwnProperty } = require('./utils'); const { getLDHeaders, transformHeaders } = require('./headers'); +const { isHttpErrorRecoverable } = require('./errors'); // The underlying event source implementation is abstracted via the platform object, which should // have these three properties: @@ -16,6 +17,8 @@ const { getLDHeaders, transformHeaders } = require('./headers'); // interval between heartbeats from the LaunchDarkly streaming server. If this amount of time elapses // with no new data, the connection will be cycled. const streamReadTimeoutMillis = 5 * 60 * 1000; // 5 minutes +const maxRetryDelay = 30 * 1000; // Maximum retry delay 30 seconds. +const jitterRatio = 0.5; // Delay should be 50%-100% of calculated time. function Stream(platform, config, environment, diagnosticsAccumulator) { const baseUrl = config.streamUrl; @@ -24,7 +27,7 @@ function Stream(platform, config, environment, diagnosticsAccumulator) { const evalUrlPrefix = appendUrlPath(baseUrl, '/eval/' + environment); const useReport = config.useReport; const withReasons = config.evaluationReasons; - const streamReconnectDelay = config.streamReconnectDelay; + const baseReconnectDelay = config.streamReconnectDelay; const headers = getLDHeaders(platform, config); let firstConnectionErrorLogged = false; let es = null; @@ -33,6 +36,22 @@ function Stream(platform, config, environment, diagnosticsAccumulator) { let user = null; let hash = null; let handlers = null; + let retryCount = 0; + + function backoff() { + const delay = baseReconnectDelay * Math.pow(2, retryCount); + return delay > maxRetryDelay ? maxRetryDelay : delay; + } + + function jitter(computedDelayMillis) { + return computedDelayMillis - Math.trunc(Math.random() * jitterRatio * computedDelayMillis); + } + + function getNextRetryDelay() { + const delay = jitter(backoff()); + retryCount += 1; + return delay; + } stream.connect = function(newUser, newHash, newHandlers) { user = newUser; @@ -63,13 +82,31 @@ function Stream(platform, config, environment, diagnosticsAccumulator) { }; function handleError(err) { + // The event source may not produce a status. But the LaunchDarkly + // polyfill can. If we can get the status, then we should stop retrying + // on certain error codes. + if (err.status && typeof err.status === 'number' && !isHttpErrorRecoverable(err.status)) { + // If we encounter an unrecoverable condition, then we do not want to + // retry anymore. + closeConnection(); + logger.error(messages.unrecoverableStreamError(err)); + // Ensure any pending retry attempts are not done. + if (reconnectTimeoutReference) { + clearTimeout(reconnectTimeoutReference); + reconnectTimeoutReference = null; + } + return; + } + + const delay = getNextRetryDelay(); + if (!firstConnectionErrorLogged) { - logger.warn(messages.streamError(err, streamReconnectDelay)); + logger.warn(messages.streamError(err, delay)); firstConnectionErrorLogged = true; } logConnectionResult(false); closeConnection(); - tryConnect(streamReconnectDelay); + tryConnect(delay); } function tryConnect(delay) { @@ -123,6 +160,11 @@ function Stream(platform, config, environment, diagnosticsAccumulator) { } es.onerror = handleError; + + es.onopen = () => { + // If the connection is a success, then reset the retryCount. + retryCount = 0; + }; } } diff --git a/src/__tests__/Stream-test.js b/src/__tests__/Stream-test.js index 45e3f7a..733a287 100644 --- a/src/__tests__/Stream-test.js +++ b/src/__tests__/Stream-test.js @@ -170,6 +170,7 @@ describe('Stream', () => { const nAttempts = 5; for (let i = 0; i < nAttempts; i++) { es.mockError('test error'); + await sleepAsync(10); const created1 = await platform.testing.expectStream(); const es1 = created1.eventSource; @@ -185,6 +186,40 @@ describe('Stream', () => { } }); + it.each([401, 403])('does not reconnect after an unrecoverable error', async status => { + const config = { ...defaultConfig, streamReconnectDelay: 1, useReport: false }; + const stream = new Stream(platform, config, envName); + stream.connect(user); + + const created = await platform.testing.expectStream(); + const es = created.eventSource; + + expect(es.readyState).toBe(EventSource.CONNECTING); + es.mockOpen(); + expect(es.readyState).toBe(EventSource.OPEN); + + es.mockError({ status }); + await sleepAsync(10); + expect(platform.testing.eventSourcesCreated.length()).toEqual(0); + }); + + it.each([400, 408, 429])('does reconnect after a recoverable error', async status => { + const config = { ...defaultConfig, streamReconnectDelay: 1, useReport: false }; + const stream = new Stream(platform, config, envName); + stream.connect(user); + + const created = await platform.testing.expectStream(); + const es = created.eventSource; + + expect(es.readyState).toBe(EventSource.CONNECTING); + es.mockOpen(); + expect(es.readyState).toBe(EventSource.OPEN); + + es.mockError({ status }); + await sleepAsync(10); + expect(platform.testing.eventSourcesCreated.length()).toEqual(1); + }); + it('logs a warning for only the first failed connection attempt', async () => { const config = { ...defaultConfig, streamReconnectDelay: 1 }; const stream = new Stream(platform, config, envName); @@ -197,6 +232,7 @@ describe('Stream', () => { const nAttempts = 5; for (let i = 0; i < nAttempts; i++) { es.mockError('test error'); + await sleepAsync(10); const created1 = await platform.testing.expectStream(); es = created1.eventSource; es.mockOpen(); @@ -221,6 +257,7 @@ describe('Stream', () => { const nAttempts = 5; for (let i = 0; i < nAttempts; i++) { es.mockError('test error #1'); + await sleepAsync(10); const created1 = await platform.testing.expectStream(); es = created1.eventSource; es.mockOpen(); @@ -232,6 +269,7 @@ describe('Stream', () => { for (let i = 0; i < nAttempts; i++) { es.mockError('test error #2'); + await sleepAsync(10); const created1 = await platform.testing.expectStream(); es = created1.eventSource; es.mockOpen(); @@ -239,8 +277,8 @@ describe('Stream', () => { // make sure there is just a single logged message rather than five (one per attempt) expect(logger.output.warn).toEqual([ - messages.streamError('test error #1', 1), - messages.streamError('test error #2', 1), + expect.stringContaining('test error #1'), + expect.stringContaining('test error #2'), ]); }); diff --git a/src/messages.js b/src/messages.js index f2b4b42..0727e87 100644 --- a/src/messages.js +++ b/src/messages.js @@ -123,7 +123,7 @@ const streamError = function(err, streamReconnectDelay) { return ( 'Error on stream connection: ' + errorString(err) + - ', will continue retrying every ' + + ', will continue retrying after ' + streamReconnectDelay + ' milliseconds.' ); @@ -131,6 +131,8 @@ const streamError = function(err, streamReconnectDelay) { const unknownOption = name => 'Ignoring unknown config option "' + name + '"'; +const unrecoverableStreamError = err => `Error on stream connection ${errorString(err)}, giving up permanently`; + const wrongOptionType = (name, expectedType, actualType) => 'Config option "' + name + '" should be of type ' + expectedType + ', got ' + actualType + ', using default value'; @@ -228,6 +230,7 @@ module.exports = { tagValueTooLong, unknownCustomEventKey, unknownOption, + unrecoverableStreamError, userNotSpecified, wrongOptionType, wrongOptionTypeBoolean,