From 455e9c5cef4e5c7593071c153761004419eafa21 Mon Sep 17 00:00:00 2001 From: Ersin Erdal Date: Thu, 21 Nov 2024 22:34:03 +0100 Subject: [PATCH] Handle cluster_block_exception during reindexing the TM index --- .../kibana_discovery_service.ts | 8 +- .../server/lib/bulk_update_error.ts | 4 + .../lib/create_managed_configuration.ts | 137 +++++++++++------- 3 files changed, 95 insertions(+), 54 deletions(-) diff --git a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts index 1c4fcb00981a0..96b7826f0bac6 100644 --- a/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts +++ b/x-pack/plugins/task_manager/server/kibana_discovery_service/kibana_discovery_service.ts @@ -59,6 +59,7 @@ export class KibanaDiscoveryService { } private async scheduleUpsertCurrentNode() { + let retryInterval = this.discoveryInterval; if (!this.stopped) { const lastSeenDate = new Date(); const lastSeen = lastSeenDate.toISOString(); @@ -71,18 +72,21 @@ export class KibanaDiscoveryService { } catch (e) { if (!this.started) { this.logger.error( - `Kibana Discovery Service couldn't be started and will be retried in ${this.discoveryInterval}ms, error:${e.message}` + `Kibana Discovery Service couldn't be started and will be retried in ${retryInterval}ms, error:${e.message}` ); } else { this.logger.error( `Kibana Discovery Service couldn't update this node's last_seen timestamp. id: ${this.currentNode}, last_seen: ${lastSeen}, error:${e.message}` ); } + if (e.message.includes('cluster_block_exception')) { + retryInterval = 60000; + } } finally { this.timer = setTimeout( async () => await this.scheduleUpsertCurrentNode(), // The timeout should not be less than the default timeout of two seconds - Math.max(this.discoveryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT) + Math.max(retryInterval - (Date.now() - lastSeenDate.getTime()), DEFAULT_TIMEOUT) ); } } diff --git a/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts b/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts index f7e0552e5a738..d27c285e11af5 100644 --- a/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts +++ b/x-pack/plugins/task_manager/server/lib/bulk_update_error.ts @@ -43,3 +43,7 @@ export function getBulkUpdateErrorType(error: Error | BulkUpdateError): string | return (error as BulkUpdateError).type; } } + +export function isClusterBlockException(error: Error | BulkUpdateError): boolean { + return getBulkUpdateErrorType(error) === 'cluster_block_exception'; +} diff --git a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts index 00736f2c36cdb..c62e18b6446bd 100644 --- a/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts +++ b/x-pack/plugins/task_manager/server/lib/create_managed_configuration.ts @@ -13,11 +13,12 @@ import { isEsCannotExecuteScriptError } from './identify_es_error'; import { CLAIM_STRATEGY_MGET, DEFAULT_CAPACITY, MAX_CAPACITY, TaskManagerConfig } from '../config'; import { TaskCost } from '../task'; import { getMsearchStatusCode } from './msearch_error'; -import { getBulkUpdateStatusCode } from './bulk_update_error'; +import { getBulkUpdateStatusCode, isClusterBlockException } from './bulk_update_error'; const FLUSH_MARKER = Symbol('flush'); export const ADJUST_THROUGHPUT_INTERVAL = 10 * 1000; export const PREFERRED_MAX_POLL_INTERVAL = 60 * 1000; +export const INTERVAL_AFTER_BLOCK_EXCEPTION = 61 * 1000; // Capacity is measured in number of normal cost tasks that can be run // At a minimum, we need to be able to run a single task with the greatest cost @@ -46,6 +47,11 @@ interface ManagedConfigurationOpts { logger: Logger; } +interface ErrorScanResult { + count: number; + isBlockException: boolean; +} + export interface ManagedConfiguration { startingCapacity: number; capacityConfiguration$: Observable; @@ -77,7 +83,7 @@ export function createManagedConfiguration({ } function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingCapacity: number) { - return scan((previousCapacity: number, errorCount: number) => { + return scan((previousCapacity: number, { count: errorCount }: ErrorScanResult) => { let newCapacity: number; if (errorCount > 0) { const minCapacity = getMinCapacity(config); @@ -112,52 +118,66 @@ function createCapacityScan(config: TaskManagerConfig, logger: Logger, startingC } function createPollIntervalScan(logger: Logger, startingPollInterval: number) { - return scan((previousPollInterval: number, errorCount: number) => { - let newPollInterval: number; - if (errorCount > 0) { - // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to - // make sure the number is different than previous while not being a decimal value. - // Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval, - // whichever is greater. - newPollInterval = Math.min( - Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE), - Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval)) - ); - if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { - logger.error( - `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` - ); - newPollInterval = previousPollInterval; - } - } else { - // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to - // make sure the number is different than previous while not being a decimal value. - newPollInterval = Math.max( - startingPollInterval, - Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE) - ); - if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { - logger.error( - `Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` - ); - newPollInterval = previousPollInterval; + return scan( + (previousPollInterval: number, { count: errorCount, isBlockException }: ErrorScanResult) => { + let newPollInterval: number; + if (isBlockException) { + newPollInterval = INTERVAL_AFTER_BLOCK_EXCEPTION; + } else { + if (errorCount > 0) { + // Increase poll interval by POLL_INTERVAL_INCREASE_PERCENTAGE and use Math.ceil to + // make sure the number is different than previous while not being a decimal value. + // Also ensure we don't go over PREFERRED_MAX_POLL_INTERVAL or startingPollInterval, + // whichever is greater. + newPollInterval = Math.min( + Math.ceil(previousPollInterval * POLL_INTERVAL_INCREASE_PERCENTAGE), + Math.ceil(Math.max(PREFERRED_MAX_POLL_INTERVAL, startingPollInterval)) + ); + if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { + logger.error( + `Poll interval configuration had an issue calculating the new poll interval: Math.min(Math.ceil(${previousPollInterval} * ${POLL_INTERVAL_INCREASE_PERCENTAGE}), Math.max(${PREFERRED_MAX_POLL_INTERVAL}, ${startingPollInterval})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` + ); + newPollInterval = previousPollInterval; + } + } else { + if (previousPollInterval === INTERVAL_AFTER_BLOCK_EXCEPTION) { + previousPollInterval = startingPollInterval; + } + // Decrease poll interval by POLL_INTERVAL_DECREASE_PERCENTAGE and use Math.floor to + // make sure the number is different than previous while not being a decimal value. + newPollInterval = Math.max( + startingPollInterval, + Math.floor(previousPollInterval * POLL_INTERVAL_DECREASE_PERCENTAGE) + ); + if (!Number.isSafeInteger(newPollInterval) || newPollInterval < 0) { + logger.error( + `Poll interval configuration had an issue calculating the new poll interval: Math.max(${startingPollInterval}, Math.floor(${previousPollInterval} * ${POLL_INTERVAL_DECREASE_PERCENTAGE})) = ${newPollInterval}, will keep the poll interval unchanged (${previousPollInterval})` + ); + newPollInterval = previousPollInterval; + } + } } - } - if (newPollInterval !== previousPollInterval) { - logger.debug( - `Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" error(s)` - ); - if (previousPollInterval === startingPollInterval) { - logger.warn( - `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script" error(s).` + + if (newPollInterval !== previousPollInterval) { + logger.debug( + `Poll interval configuration changing from ${previousPollInterval} to ${newPollInterval} after seeing ${errorCount} "too many request" and/or "execute [inline] script" and/or "cluster_block_exception" error(s)` ); + if (previousPollInterval === startingPollInterval) { + logger.warn( + `Poll interval configuration is temporarily increased after Elasticsearch returned ${errorCount} "too many request" and/or "execute [inline] script and/or "cluster_block_exception"" error(s).` + ); + } } - } - return newPollInterval; - }, startingPollInterval); + return newPollInterval; + }, + startingPollInterval + ); } -function countErrors(errors$: Observable, countInterval: number): Observable { +function countErrors( + errors$: Observable, + countInterval: number +): Observable { return merge( // Flush error count at fixed interval interval(countInterval).pipe(map(() => FLUSH_MARKER)), @@ -173,36 +193,48 @@ function countErrors(errors$: Observable, countInterval: number): Observa getMsearchStatusCode(e) === 503 || getBulkUpdateStatusCode(e) === 429 || getBulkUpdateStatusCode(e) === 500 || - getBulkUpdateStatusCode(e) === 503 + getBulkUpdateStatusCode(e) === 503 || + isClusterBlockException(e) ) ) ).pipe( // When tag is "flush", reset the error counter // Otherwise increment the error counter - mergeScan(({ count }, next) => { + mergeScan(({ count, isBlockException }, next) => { return next === FLUSH_MARKER - ? of(emitErrorCount(count), resetErrorCount()) - : of(incementErrorCount(count)); - }, emitErrorCount(0)), + ? of(emitErrorCount(count, isBlockException), resetErrorCount()) + : of(incrementOrEmitErrorCount(count, isClusterBlockException(next as Error))); + }, emitErrorCount(0, false)), filter(isEmitEvent), - map(({ count }) => count) + map(({ count, isBlockException }) => { + return { count, isBlockException }; + }) ); } -function emitErrorCount(count: number) { +function emitErrorCount(count: number, isBlockException: boolean) { return { tag: 'emit', + isBlockException, count, }; } -function isEmitEvent(event: { tag: string; count: number }) { +function isEmitEvent(event: { tag: string; count: number; isBlockException: boolean }) { return event.tag === 'emit'; } -function incementErrorCount(count: number) { +function incrementOrEmitErrorCount(count: number, isBlockException: boolean) { + if (isBlockException) { + return { + tag: 'emit', + isBlockException, + count: count + 1, + }; + } return { tag: 'inc', + isBlockException, count: count + 1, }; } @@ -210,6 +242,7 @@ function incementErrorCount(count: number) { function resetErrorCount() { return { tag: 'initial', + isBlockException: false, count: 0, }; }