From 3b0d7169a94f152e2dd0f3f9b85c8ee5b984136a Mon Sep 17 00:00:00 2001 From: Ying Date: Wed, 23 Oct 2024 13:06:00 -0400 Subject: [PATCH 1/2] Emit msearch error with status code for managed configuration observable --- .../lib/create_managed_configuration.test.ts | 34 +++++++++++++++---- .../lib/create_managed_configuration.ts | 7 +++- .../task_manager/server/lib/msearch_error.ts | 25 ++++++++++++++ .../task_manager/server/task_store.test.ts | 11 ++++-- .../plugins/task_manager/server/task_store.ts | 8 +++-- 5 files changed, 73 insertions(+), 12 deletions(-) create mode 100644 x-pack/plugins/task_manager/server/lib/msearch_error.ts diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts index cfbe43a8ecf21..84e2dd4f6367c 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts @@ -14,6 +14,7 @@ import { } from './create_managed_configuration'; import { mockLogger } from '../test_utils'; import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config'; +import { MsearchError } from './msearch_error'; describe('createManagedConfiguration()', () => { let clock: sinon.SinonFakeTimers; @@ -224,9 +225,20 @@ describe('createManagedConfiguration()', () => { }); describe('mget claim strategy', () => { - test('should decrease configuration at the next interval when an error is emitted', async () => { - const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); - errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + test('should decrease configuration at the next interval when an msearch 429 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next(new MsearchError('a', 429)); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(2); + expect(subscription).toHaveBeenNthCalledWith(2, 8); + }); + + test('should decrease configuration at the next interval when an msearch 503 error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next(new MsearchError('a', 503)); clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); expect(subscription).toHaveBeenCalledTimes(1); expect(subscription).toHaveBeenNthCalledWith(1, 10); @@ -235,9 +247,19 @@ describe('createManagedConfiguration()', () => { expect(subscription).toHaveBeenNthCalledWith(2, 8); }); + test('should not change configuration at the next interval when other msearch error is emitted', async () => { + const { subscription, errors$ } = setupScenario(10); + errors$.next(new MsearchError('a', 404)); + clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); + expect(subscription).toHaveBeenCalledTimes(1); + expect(subscription).toHaveBeenNthCalledWith(1, 10); + clock.tick(1); + expect(subscription).toHaveBeenCalledTimes(1); + }); + test('should log a warning when the configuration changes from the starting value', async () => { const { errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); - errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + errors$.next(new MsearchError('a', 429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL); expect(logger.warn).toHaveBeenCalledWith( 'Capacity configuration is temporarily reduced after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).' @@ -246,7 +268,7 @@ describe('createManagedConfiguration()', () => { test('should increase configuration back to normal incrementally after an error is emitted', async () => { const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); - errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + errors$.next(new MsearchError('a', 429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10); expect(subscription).toHaveBeenNthCalledWith(1, 10); expect(subscription).toHaveBeenNthCalledWith(2, 8); @@ -259,7 +281,7 @@ describe('createManagedConfiguration()', () => { test('should keep reducing configuration when errors keep emitting until it reaches minimum', async () => { const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); for (let i = 0; i < 20; i++) { - errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b')); + errors$.next(new MsearchError('a', 429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL); } expect(subscription).toHaveBeenNthCalledWith(1, 10); diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts index 3036eb2008de6..16f95807c44cb 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts @@ -12,6 +12,7 @@ import { Logger } from '@kbn/core/server'; import { isEsCannotExecuteScriptError } from './identify_es_error'; import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config'; import { TaskCost } from '../task'; +import { getMsearchStatusCode } from './msearch_error'; const FLUSH_MARKER = Symbol('flush'); export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000; @@ -161,7 +162,11 @@ function countErrors(errors$: Observable, countInterval: number): Observa interval(countInterval).pipe(map(() => FLUSH_MARKER)), errors$.pipe( filter( - (e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e) || isEsCannotExecuteScriptError(e) + (e) => + SavedObjectsErrorHelpers.isTooManyRequestsError(e) || + isEsCannotExecuteScriptError(e) || + getMsearchStatusCode(e) === 429 || + getMsearchStatusCode(e) === 503 ) ) ).pipe( diff --git a/x-pack/plugins/task_manager/server/lib/msearch_error.ts b/x-pack/plugins/task_manager/server/lib/msearch_error.ts new file mode 100644 index 0000000000000..171fe9b2c7b57 --- /dev/null +++ b/x-pack/plugins/task_manager/server/lib/msearch_error.ts @@ -0,0 +1,25 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class MsearchError extends Error { + private _statusCode: number; + + constructor(message: string, statusCode: number) { + super(message); + this._statusCode = statusCode; + } + + public get statusCode() { + return this._statusCode; + } +} + +export function getMsearchStatusCode(error: Error | MsearchError): number | undefined { + if (Boolean(error && error instanceof MsearchError)) { + return (error as MsearchError).statusCode; + } +} diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index 18dc3fa3c44ce..a773aaa995b41 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -30,6 +30,7 @@ import { mockLogger } from './test_utils'; import { AdHocTaskCounter } from './lib/adhoc_task_counter'; import { asErr, asOk } from './lib/result_type'; import { UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types'; +import { MsearchError } from './lib/msearch_error'; const mockGetValidatedTaskInstanceFromReading = jest.fn(); const mockGetValidatedTaskInstanceForUpdating = jest.fn(); @@ -490,9 +491,13 @@ describe('TaskStore', () => { }, ], } as estypes.MsearchResponse); - await expect(store.msearch([{}])).rejects.toThrowErrorMatchingInlineSnapshot( - `"Unexpected status code from taskStore::msearch: 429"` - ); + + try { + await store.msearch([{}]); + } catch (err) { + expect(err instanceof MsearchError).toBe(true); + expect(err.statusCode).toEqual(429); + } expect(await firstErrorPromise).toMatchInlineSnapshot( `[Error: Unexpected status code from taskStore::msearch: 429]` ); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 83c69ebdb2d88..19bbe61542e85 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -47,6 +47,7 @@ import { TaskValidator } from './task_validator'; import { claimSort } from './queries/mark_available_tasks_as_claimed'; import { MAX_PARTITIONS } from './lib/task_partitioner'; import { ErrorOutput } from './lib/bulk_operation_buffer'; +import { MsearchError } from './lib/msearch_error'; export interface StoreOpts { esClient: ElasticsearchClient; @@ -574,8 +575,11 @@ export class TaskStore { let allTasks = new Array(); for (const response of responses) { - if (response.status !== 200) { - const err = new Error(`Unexpected status code from taskStore::msearch: ${response.status}`); + if (response.status && response.status !== 200) { + const err = new MsearchError( + `Unexpected status code from taskStore::msearch: ${response.status}`, + response.status + ); this.errors$.next(err); throw err; } From 241935d1b9c0e7dd17422d5eeafafd97b3cfb73e Mon Sep 17 00:00:00 2001 From: Ying Date: Mon, 28 Oct 2024 09:58:59 -0400 Subject: [PATCH 2/2] PR feedback --- .../server/lib/create_managed_configuration.test.ts | 12 ++++++------ .../plugins/task_manager/server/lib/msearch_error.ts | 6 +++--- .../plugins/task_manager/server/task_store.test.ts | 1 + x-pack/plugins/task_manager/server/task_store.ts | 7 ++----- 4 files changed, 12 insertions(+), 14 deletions(-) diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts index ae97312be1564..5e0a5ed4f2e67 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.test.ts @@ -238,7 +238,7 @@ describe('createManagedConfiguration()', () => { describe('mget claim strategy', () => { test('should decrease configuration at the next interval when an msearch 429 error is emitted', async () => { const { subscription, errors$ } = setupScenario(10); - errors$.next(new MsearchError('a', 429)); + errors$.next(new MsearchError(429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); expect(subscription).toHaveBeenCalledTimes(1); expect(subscription).toHaveBeenNthCalledWith(1, 10); @@ -249,7 +249,7 @@ describe('createManagedConfiguration()', () => { test('should decrease configuration at the next interval when an msearch 503 error is emitted', async () => { const { subscription, errors$ } = setupScenario(10); - errors$.next(new MsearchError('a', 503)); + errors$.next(new MsearchError(503)); clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); expect(subscription).toHaveBeenCalledTimes(1); expect(subscription).toHaveBeenNthCalledWith(1, 10); @@ -260,7 +260,7 @@ describe('createManagedConfiguration()', () => { test('should not change configuration at the next interval when other msearch error is emitted', async () => { const { subscription, errors$ } = setupScenario(10); - errors$.next(new MsearchError('a', 404)); + errors$.next(new MsearchError(404)); clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1); expect(subscription).toHaveBeenCalledTimes(1); expect(subscription).toHaveBeenNthCalledWith(1, 10); @@ -270,7 +270,7 @@ describe('createManagedConfiguration()', () => { test('should log a warning when the configuration changes from the starting value', async () => { const { errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); - errors$.next(new MsearchError('a', 429)); + errors$.next(new MsearchError(429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL); expect(logger.warn).toHaveBeenCalledWith( 'Capacity configuration is temporarily reduced after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).' @@ -279,7 +279,7 @@ describe('createManagedConfiguration()', () => { test('should increase configuration back to normal incrementally after an error is emitted', async () => { const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); - errors$.next(new MsearchError('a', 429)); + errors$.next(new MsearchError(429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10); expect(subscription).toHaveBeenNthCalledWith(1, 10); expect(subscription).toHaveBeenNthCalledWith(2, 8); @@ -292,7 +292,7 @@ describe('createManagedConfiguration()', () => { test('should keep reducing configuration when errors keep emitting until it reaches minimum', async () => { const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET); for (let i = 0; i < 20; i++) { - errors$.next(new MsearchError('a', 429)); + errors$.next(new MsearchError(429)); clock.tick(ADJUST_THROUGHPUT_INTERVAL); } expect(subscription).toHaveBeenNthCalledWith(1, 10); diff --git a/x-pack/plugins/task_manager/server/lib/msearch_error.ts b/x-pack/plugins/task_manager/server/lib/msearch_error.ts index 171fe9b2c7b57..8cd61a53dd2a7 100644 --- a/x-pack/plugins/task_manager/server/lib/msearch_error.ts +++ b/x-pack/plugins/task_manager/server/lib/msearch_error.ts @@ -6,10 +6,10 @@ */ export class MsearchError extends Error { - private _statusCode: number; + private _statusCode?: number; - constructor(message: string, statusCode: number) { - super(message); + constructor(statusCode?: number) { + super(`Unexpected status code from taskStore::msearch: ${statusCode ?? 'unknown'}`); this._statusCode = statusCode; } diff --git a/x-pack/plugins/task_manager/server/task_store.test.ts b/x-pack/plugins/task_manager/server/task_store.test.ts index a773aaa995b41..f1374f6d27b76 100644 --- a/x-pack/plugins/task_manager/server/task_store.test.ts +++ b/x-pack/plugins/task_manager/server/task_store.test.ts @@ -494,6 +494,7 @@ describe('TaskStore', () => { try { await store.msearch([{}]); + throw new Error('should have thrown'); } catch (err) { expect(err instanceof MsearchError).toBe(true); expect(err.statusCode).toEqual(429); diff --git a/x-pack/plugins/task_manager/server/task_store.ts b/x-pack/plugins/task_manager/server/task_store.ts index 19bbe61542e85..2b3440e87c0f8 100644 --- a/x-pack/plugins/task_manager/server/task_store.ts +++ b/x-pack/plugins/task_manager/server/task_store.ts @@ -575,11 +575,8 @@ export class TaskStore { let allTasks = new Array(); for (const response of responses) { - if (response.status && response.status !== 200) { - const err = new MsearchError( - `Unexpected status code from taskStore::msearch: ${response.status}`, - response.status - ); + if (response.status !== 200) { + const err = new MsearchError(response.status); this.errors$.next(err); throw err; }