Skip to content

Commit

Permalink
Emit msearch error with status code for managed configuration observable
Browse files Browse the repository at this point in the history
  • Loading branch information
ymao1 committed Oct 23, 2024
1 parent 629edc0 commit 3b0d716
Show file tree
Hide file tree
Showing 5 changed files with 73 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {
} from './create_managed_configuration';
import { mockLogger } from '../test_utils';
import { CLAIM_STRATEGY_UPDATE_BY_QUERY, CLAIM_STRATEGY_MGET, TaskManagerConfig } from '../config';
import { MsearchError } from './msearch_error';

describe('createManagedConfiguration()', () => {
let clock: sinon.SinonFakeTimers;
Expand Down Expand Up @@ -224,9 +225,20 @@ describe('createManagedConfiguration()', () => {
});

describe('mget claim strategy', () => {
test('should decrease configuration at the next interval when an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
test('should decrease configuration at the next interval when an msearch 429 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(new MsearchError('a', 429));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(2);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should decrease configuration at the next interval when an msearch 503 error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(new MsearchError('a', 503));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
Expand All @@ -235,9 +247,19 @@ describe('createManagedConfiguration()', () => {
expect(subscription).toHaveBeenNthCalledWith(2, 8);
});

test('should not change configuration at the next interval when other msearch error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10);
errors$.next(new MsearchError('a', 404));
clock.tick(ADJUST_THROUGHPUT_INTERVAL - 1);
expect(subscription).toHaveBeenCalledTimes(1);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
clock.tick(1);
expect(subscription).toHaveBeenCalledTimes(1);
});

test('should log a warning when the configuration changes from the starting value', async () => {
const { errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
errors$.next(new MsearchError('a', 429));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
expect(logger.warn).toHaveBeenCalledWith(
'Capacity configuration is temporarily reduced after Elasticsearch returned 1 "too many request" and/or "execute [inline] script" error(s).'
Expand All @@ -246,7 +268,7 @@ describe('createManagedConfiguration()', () => {

test('should increase configuration back to normal incrementally after an error is emitted', async () => {
const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
errors$.next(new MsearchError('a', 429));
clock.tick(ADJUST_THROUGHPUT_INTERVAL * 10);
expect(subscription).toHaveBeenNthCalledWith(1, 10);
expect(subscription).toHaveBeenNthCalledWith(2, 8);
Expand All @@ -259,7 +281,7 @@ describe('createManagedConfiguration()', () => {
test('should keep reducing configuration when errors keep emitting until it reaches minimum', async () => {
const { subscription, errors$ } = setupScenario(10, CLAIM_STRATEGY_MGET);
for (let i = 0; i < 20; i++) {
errors$.next(SavedObjectsErrorHelpers.createTooManyRequestsError('a', 'b'));
errors$.next(new MsearchError('a', 429));
clock.tick(ADJUST_THROUGHPUT_INTERVAL);
}
expect(subscription).toHaveBeenNthCalledWith(1, 10);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import { Logger } from '@kbn/core/server';
import { isEsCannotExecuteScriptError } from './identify_es_error';
import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config';
import { TaskCost } from '../task';
import { getMsearchStatusCode } from './msearch_error';

const FLUSH_MARKER = Symbol('flush');
export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000;
Expand Down Expand Up @@ -161,7 +162,11 @@ function countErrors(errors$: Observable<Error>, countInterval: number): Observa
interval(countInterval).pipe(map(() => FLUSH_MARKER)),
errors$.pipe(
filter(
(e) => SavedObjectsErrorHelpers.isTooManyRequestsError(e) || isEsCannotExecuteScriptError(e)
(e) =>
SavedObjectsErrorHelpers.isTooManyRequestsError(e) ||
isEsCannotExecuteScriptError(e) ||
getMsearchStatusCode(e) === 429 ||
getMsearchStatusCode(e) === 503
)
)
).pipe(
Expand Down
25 changes: 25 additions & 0 deletions x-pack/plugins/task_manager/server/lib/msearch_error.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

export class MsearchError extends Error {
private _statusCode: number;

constructor(message: string, statusCode: number) {
super(message);
this._statusCode = statusCode;
}

public get statusCode() {
return this._statusCode;
}
}

export function getMsearchStatusCode(error: Error | MsearchError): number | undefined {
if (Boolean(error && error instanceof MsearchError)) {
return (error as MsearchError).statusCode;
}
}
11 changes: 8 additions & 3 deletions x-pack/plugins/task_manager/server/task_store.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import { mockLogger } from './test_utils';
import { AdHocTaskCounter } from './lib/adhoc_task_counter';
import { asErr, asOk } from './lib/result_type';
import { UpdateByQueryResponse } from '@elastic/elasticsearch/lib/api/types';
import { MsearchError } from './lib/msearch_error';

const mockGetValidatedTaskInstanceFromReading = jest.fn();
const mockGetValidatedTaskInstanceForUpdating = jest.fn();
Expand Down Expand Up @@ -490,9 +491,13 @@ describe('TaskStore', () => {
},
],
} as estypes.MsearchResponse);
await expect(store.msearch([{}])).rejects.toThrowErrorMatchingInlineSnapshot(
`"Unexpected status code from taskStore::msearch: 429"`
);

try {
await store.msearch([{}]);
} catch (err) {
expect(err instanceof MsearchError).toBe(true);
expect(err.statusCode).toEqual(429);
}
expect(await firstErrorPromise).toMatchInlineSnapshot(
`[Error: Unexpected status code from taskStore::msearch: 429]`
);
Expand Down
8 changes: 6 additions & 2 deletions x-pack/plugins/task_manager/server/task_store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ import { TaskValidator } from './task_validator';
import { claimSort } from './queries/mark_available_tasks_as_claimed';
import { MAX_PARTITIONS } from './lib/task_partitioner';
import { ErrorOutput } from './lib/bulk_operation_buffer';
import { MsearchError } from './lib/msearch_error';

export interface StoreOpts {
esClient: ElasticsearchClient;
Expand Down Expand Up @@ -574,8 +575,11 @@ export class TaskStore {
let allTasks = new Array<ConcreteTaskInstance>();

for (const response of responses) {
if (response.status !== 200) {
const err = new Error(`Unexpected status code from taskStore::msearch: ${response.status}`);
if (response.status && response.status !== 200) {
const err = new MsearchError(
`Unexpected status code from taskStore::msearch: ${response.status}`,
response.status
);
this.errors$.next(err);
throw err;
}
Expand Down

0 comments on commit 3b0d716

Please sign in to comment.