Skip to content

Commit

Permalink
implements backoff delay in usage counters service (elastic#206363)
Browse files Browse the repository at this point in the history
fix elastic#192829

## Summary

Usage counters lose counts when there's a version conflict exception to
increment counters.
ATM, the service retries `incrementCounter` immediately on error, which
could also fail if another Kibana tries at the same time.
Using a backoff retry reduces the likelihood of a version conflict
exception.  

This PR replaces the retry with an exponential `backoffDelay` that will
wait longer between retry attempts.


- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
- [x] [Flaky Test
Runner](https://ci-stats.kibana.dev/trigger_flaky_test_runner/1) was
used on any tests changed
(https://buildkite.com/elastic/kibana-flaky-test-suite-runner/builds/7701)

---------

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
2 people authored and viduni94 committed Jan 23, 2025
1 parent 1f7ce35 commit e0b2689
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 50 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,4 @@
export const KIBANA_STATS_TYPE = 'kibana_stats';
export const DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S = 1;
export const MAIN_APP_DEFAULT_VIEW_ID = 'main';
export const USAGE_COUNTERS_BACKOFF_DELAY_IN_MS = 500;
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import { DEFAULT_MAXIMUM_WAIT_TIME_FOR_ALL_COLLECTORS_IN_S } from '../common/con
export const configSchema = schema.object({
usageCounters: schema.object({
enabled: schema.boolean({ defaultValue: true }),
retryCount: schema.number({ defaultValue: 1 }),
retryCount: schema.number({ defaultValue: 3 }),
bufferDuration: schema.duration({ defaultValue: '5s' }),
}),
uiCounters: schema.object({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,6 @@ export class UsageCounter implements IUsageCounter {
incrementBy = 1,
namespace,
} = params;

this.counter$.next({
domainId: this.domainId,
counterName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,10 @@ const registerUsageCountersRollupsMock = registerUsageCountersRollups as jest.Mo
typeof registerUsageCountersRollups
>;

const tick = () => {
// optionally advance test timers after a delay
const tickWithDelay = (delay = 1) => {
jest.useRealTimers();
return new Promise((resolve) => setTimeout(resolve, 1));
return new Promise((resolve) => setTimeout(resolve, delay));
};

describe('UsageCountersService', () => {
Expand Down Expand Up @@ -188,51 +189,62 @@ describe('UsageCountersService', () => {
]
`);
});

it('retries errors by `retryCount` times before failing to store', async () => {
const usageCountersService = new UsageCountersService({
logger,
retryCount: 1,
bufferDurationMs,
});

const mockRepository = coreStart.savedObjects.createInternalRepository();
const mockError = new Error('failed.');
const mockIncrementCounter = jest.fn().mockImplementation((_, key) => {
switch (key) {
case 'test-counter:counterA:count:server:20210409':
throw mockError;
case 'test-counter:counterB:count:server:20210409':
return 'pass';
default:
throw new Error(`unknown key ${key}`);
}
});

mockRepository.incrementCounter = mockIncrementCounter;

coreStart.savedObjects.createInternalRepository.mockReturnValue(mockRepository);
const { createUsageCounter } = usageCountersService.setup(coreSetup);
jest.useFakeTimers();
const usageCounter = createUsageCounter('test-counter');

usageCountersService.start(coreStart);
usageCounter.incrementCounter({ counterName: 'counterA' });
usageCounter.incrementCounter({ counterName: 'counterB' });
jest.runOnlyPendingTimers();

// wait for retries to kick in on next scheduler call
await tick();
// number of incrementCounter calls + number of retries
expect(mockIncrementCounter).toBeCalledTimes(2 + 1);
expect(logger.debug).toHaveBeenNthCalledWith(1, 'Store counters into savedObjects', {
kibana: {
usageCounters: {
results: [mockError, 'pass'],
// requires extended test runtime because of exponential backoff delay for retries
it(
'retries errors by `retryCount` times before failing to store',
async () => {
const retryConst = 2;
const usageCountersService = new UsageCountersService({
logger,
retryCount: retryConst,
bufferDurationMs: 50000,
});

const mockRepository = coreStart.savedObjects.createInternalRepository();
const mockError = new Error('failed');
const mockIncrementCounter = jest.fn().mockImplementation((_, key) => {
switch (key) {
case 'test-counter:counterA:count:server:20210409':
throw mockError;
case 'test-counter:counterB:count:server:20210409':
return 'pass';
default:
throw new Error(`unknown key ${key}`);
}
});

mockRepository.incrementCounter = mockIncrementCounter;

coreStart.savedObjects.createInternalRepository.mockReturnValue(mockRepository);
const { createUsageCounter } = usageCountersService.setup(coreSetup);
jest.useFakeTimers();
const usageCounter = createUsageCounter('test-counter');

usageCountersService.start(coreStart);
usageCounter.incrementCounter({ counterName: 'counterA' });
usageCounter.incrementCounter({ counterName: 'counterB' });
jest.runOnlyPendingTimers();

// wait for retries to kick in on next scheduler call
await tickWithDelay(5000);
// number of incrementCounter calls + number of retries
expect(mockIncrementCounter).toBeCalledTimes(2 + retryConst);
// assert counterA increment error warning logs
expect(logger.warn).toHaveBeenNthCalledWith(
2,
`${mockError}, retrying attempt ${retryConst}`
);
expect(logger.warn).toHaveBeenNthCalledWith(3, mockError);
expect(logger.debug).toHaveBeenNthCalledWith(1, 'Store counters into savedObjects', {
kibana: {
usageCounters: {
results: [mockError, 'pass'],
},
},
},
});
});
});
},
10 * 1000
);

it('buffers counters within `bufferDurationMs` time', async () => {
const usageCountersService = new UsageCountersService({
Expand Down Expand Up @@ -264,7 +276,7 @@ describe('UsageCountersService', () => {
jest.runOnlyPendingTimers();

// wait for debounce to kick in on next scheduler call
await tick();
await tickWithDelay();
expect(mockIncrementCounter).toBeCalledTimes(2);
expect(mockIncrementCounter.mock.results.map(({ value }) => value)).toMatchInlineSnapshot(`
Array [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import {
} from './saved_objects';
import { registerUsageCountersRollups } from './rollups';
import { searchUsageCounters } from './search';
import { USAGE_COUNTERS_BACKOFF_DELAY_IN_MS } from '../../common/constants';

interface UsageCountersLogMeta extends LogMeta {
kibana: { usageCounters: { results: unknown[] } };
Expand Down Expand Up @@ -159,14 +160,23 @@ export class UsageCountersService {
};
};

private backoffDelay = (attempt: number) =>
Math.pow(2, attempt) * USAGE_COUNTERS_BACKOFF_DELAY_IN_MS; // exponential backoff: 500ms, 1000ms, 2000ms

private storeDate$(
counters: UsageCounters.v1.CounterMetric[],
soRepository: Pick<SavedObjectsRepository, 'incrementCounter'>
) {
return Rx.forkJoin(
counters.map((metric) =>
Rx.defer(() => storeCounter({ metric, soRepository })).pipe(
Rx.retry(this.retryCount),
Rx.retry({
count: this.retryCount,
delay: (error, retryIndex) => {
this.logger.warn(`Error: ${error.message}, retrying attempt ${retryIndex}`); // extra warning logger
return Rx.timer(this.backoffDelay(retryIndex));
},
}),
Rx.catchError((error) => {
this.logger.warn(error);
return Rx.of(error);
Expand Down

0 comments on commit e0b2689

Please sign in to comment.