Skip to content

Commit

Permalink
[Response Ops][Task Manager] Setting background task utilization wind…
Browse files Browse the repository at this point in the history
…ow based on poll interval (#203481)

## Summary

Setting a desired window length (15 seconds) for the background task
utilization running average and calculating the number of entries
to average based on the poll interval. Updating the config to be optional so
this value can still be configured; if it is not configured, it will be
calculated.

## To Verify

- Add a console log to log the window size

```
--- a/x-pack/platform/plugins/shared/task_manager/server/monitoring/background_task_utilization_statistics.ts
+++ b/x-pack/platform/plugins/shared/task_manager/server/monitoring/background_task_utilization_statistics.ts
@@ -59,6 +59,7 @@ export function createBackgroundTaskUtilizationAggregator(
     workerUtilizationRunningAverageWindowSize ??
     WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW_SIZE_MS / pollInterval;

+  console.log(`workerUtilizationWindowSize ${workerUtilizationWindowSize}`);
   const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat();
   const taskRunAdhocEvents$: Observable<Pick<BackgroundTaskUtilizationStat, 'adhoc'>> =
```

- Start Kibana with the `mget` claim strategy. The window size should be 30.
- Start Kibana with the `update_by_query` claim strategy. The window size
should be 5.
- Set `xpack.task_manager.worker_utilization_running_average_window: 22`
in your Kibana config and start Kibana. The window size should be 22
(the configured window size takes precedence when it is passed in).

Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
ymao1 and elasticmachine authored Jan 7, 2025
1 parent 98ce312 commit a82a02e
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,6 @@ describe('config validation', () => {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
"worker_utilization_running_average_window": 5,
}
`);
});
Expand Down Expand Up @@ -110,7 +109,6 @@ describe('config validation', () => {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
"worker_utilization_running_average_window": 5,
}
`);
});
Expand Down Expand Up @@ -172,7 +170,6 @@ describe('config validation', () => {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
"worker_utilization_running_average_window": 5,
}
`);
});
Expand Down
13 changes: 8 additions & 5 deletions x-pack/platform/plugins/shared/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export const DEFAULT_METRICS_RESET_INTERVAL = 30 * 1000; // 30 seconds
// At the default poll interval of 3sec, this averages over the last 15sec.
export const DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW = 5;

export const WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW_SIZE_MS = 15 * 1000; // 15 seconds

export const CLAIM_STRATEGY_UPDATE_BY_QUERY = 'update_by_query';
export const CLAIM_STRATEGY_MGET = 'mget';

Expand Down Expand Up @@ -189,11 +191,12 @@ export const configSchema = schema.object(
min: 50,
max: 100,
}),
worker_utilization_running_average_window: schema.number({
defaultValue: DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW,
max: 100,
min: 1,
}),
worker_utilization_running_average_window: schema.maybe(
schema.number({
max: 100,
min: 1,
})
),
claim_strategy: schema.string({ defaultValue: CLAIM_STRATEGY_MGET }),
request_timeouts: requestTimeoutsConfig,
auto_calculate_default_ech_capacity: schema.boolean({ defaultValue: false }),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -425,7 +425,10 @@ describe('Task Run Statistics', () => {
});

test('returns a running count of load', async () => {
const loads = [40, 80, 100, 100, 10, 10, 60, 40];
const loads = [
40, 80, 100, 100, 10, 10, 60, 40, 40, 80, 100, 100, 10, 10, 60, 40, 40, 80, 100, 100, 10,
10, 60, 40, 40, 80, 100, 100, 10, 10, 60, 40, 30,
];
const events$ = new Subject<TaskLifecycleEvent>();
const taskPollingLifecycle = taskPollingLifecycleMock.create({
events$: events$ as Observable<TaskLifecycleEvent>,
Expand All @@ -434,7 +437,7 @@ describe('Task Run Statistics', () => {
const BackgroundTaskUtilizationAggregator = createBackgroundTaskUtilizationAggregator(
taskPollingLifecycle,
new AdHocTaskCounter(),
pollInterval
500
);

function expectWindowEqualsUpdate(
Expand All @@ -461,10 +464,36 @@ describe('Task Run Statistics', () => {
expectWindowEqualsUpdate(taskStats[2], loads.slice(0, 3));
expectWindowEqualsUpdate(taskStats[3], loads.slice(0, 4));
expectWindowEqualsUpdate(taskStats[4], loads.slice(0, 5));
// from the 6th value, begin to drop old values as our window is 5
expectWindowEqualsUpdate(taskStats[5], loads.slice(1, 6));
expectWindowEqualsUpdate(taskStats[6], loads.slice(2, 7));
expectWindowEqualsUpdate(taskStats[7], loads.slice(3, 8));
expectWindowEqualsUpdate(taskStats[5], loads.slice(0, 6));
expectWindowEqualsUpdate(taskStats[6], loads.slice(0, 7));
expectWindowEqualsUpdate(taskStats[7], loads.slice(0, 8));
expectWindowEqualsUpdate(taskStats[8], loads.slice(0, 9));
expectWindowEqualsUpdate(taskStats[9], loads.slice(0, 10));
expectWindowEqualsUpdate(taskStats[10], loads.slice(0, 11));
expectWindowEqualsUpdate(taskStats[11], loads.slice(0, 12));
expectWindowEqualsUpdate(taskStats[12], loads.slice(0, 13));
expectWindowEqualsUpdate(taskStats[13], loads.slice(0, 14));
expectWindowEqualsUpdate(taskStats[14], loads.slice(0, 15));
expectWindowEqualsUpdate(taskStats[15], loads.slice(0, 16));
expectWindowEqualsUpdate(taskStats[16], loads.slice(0, 17));
expectWindowEqualsUpdate(taskStats[17], loads.slice(0, 18));
expectWindowEqualsUpdate(taskStats[18], loads.slice(0, 19));
expectWindowEqualsUpdate(taskStats[19], loads.slice(0, 20));
expectWindowEqualsUpdate(taskStats[20], loads.slice(0, 21));
expectWindowEqualsUpdate(taskStats[21], loads.slice(0, 22));
expectWindowEqualsUpdate(taskStats[22], loads.slice(0, 23));
expectWindowEqualsUpdate(taskStats[23], loads.slice(0, 24));
expectWindowEqualsUpdate(taskStats[24], loads.slice(0, 25));
expectWindowEqualsUpdate(taskStats[25], loads.slice(0, 26));
expectWindowEqualsUpdate(taskStats[26], loads.slice(0, 27));
expectWindowEqualsUpdate(taskStats[27], loads.slice(0, 28));
expectWindowEqualsUpdate(taskStats[28], loads.slice(0, 29));
expectWindowEqualsUpdate(taskStats[29], loads.slice(0, 30));
// from the 31st value, begin to drop old values as our window is 30
expectWindowEqualsUpdate(taskStats[30], loads.slice(1, 31));
expectWindowEqualsUpdate(taskStats[31], loads.slice(2, 32));
expectWindowEqualsUpdate(taskStats[32], loads.slice(3, 33));

resolve();
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import {
import { MonitoredStat } from './monitoring_stats_stream';
import { AggregatedStat, AggregatedStatProvider } from '../lib/runtime_statistics_aggregator';
import { createRunningAveragedStat } from './task_run_calculators';
import { DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW } from '../config';
import { WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW_SIZE_MS } from '../config';

export interface PublicBackgroundTaskUtilizationStat extends JsonObject {
load: number;
Expand Down Expand Up @@ -53,8 +53,12 @@ export function createBackgroundTaskUtilizationAggregator(
taskPollingLifecycle: TaskPollingLifecycle,
adHocTaskCounter: AdHocTaskCounter,
pollInterval: number,
workerUtilizationRunningAverageWindowSize: number = DEFAULT_WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW
workerUtilizationRunningAverageWindowSize?: number
): AggregatedStatProvider<BackgroundTaskUtilizationStat> {
const workerUtilizationWindowSize =
workerUtilizationRunningAverageWindowSize ??
WORKER_UTILIZATION_RUNNING_AVERAGE_WINDOW_SIZE_MS / pollInterval;

const taskRunEventToAdhocStat = createTaskRunEventToAdhocStat();
const taskRunAdhocEvents$: Observable<Pick<BackgroundTaskUtilizationStat, 'adhoc'>> =
taskPollingLifecycle.events.pipe(
Expand Down Expand Up @@ -84,7 +88,7 @@ export function createBackgroundTaskUtilizationAggregator(
);

const taskManagerUtilizationEventToLoadStat = createTaskRunEventToLoadStat(
workerUtilizationRunningAverageWindowSize
workerUtilizationWindowSize
);

const taskManagerWorkerUtilizationEvent$: Observable<
Expand Down

0 comments on commit a82a02e

Please sign in to comment.