Skip to content

Commit

Permalink
[Task Manager] Support excluding certain task types from executing (#…
Browse files Browse the repository at this point in the history
…111036) (#112022)

* Support excluding certain action types

* Fix types

* Fix jest tests

* Flip this

* Add functional test

* Add to README

* Updated README

* Add startup log

* Update x-pack/plugins/task_manager/README.md

Co-authored-by: Mike Côté <[email protected]>

* Add telemetry

* Add test

* Rename internal to unsafe

* Update test config

* Fix tests

* Update x-pack/plugins/task_manager/README.md

Co-authored-by: gchaps <[email protected]>

* PR feedback

Co-authored-by: Mike Côté <[email protected]>
Co-authored-by: gchaps <[email protected]>

Co-authored-by: Mike Côté <[email protected]>
Co-authored-by: gchaps <[email protected]>
  • Loading branch information
3 people authored Sep 13, 2021
1 parent d0d9481 commit d3f231c
Show file tree
Hide file tree
Showing 21 changed files with 309 additions and 33 deletions.
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ The task_manager can be configured via `taskManager` config options (e.g. `taskM
- `monitored_stats_running_average_window`- Dictates the size of the window used to calculate the running average of various "Hot" stats. Learn More: [./MONITORING](./MONITORING.MD)
- `monitored_stats_required_freshness` - Dictates the _required freshness_ of critical "Hot" stats. Learn More: [./MONITORING](./MONITORING.MD)
- `monitored_task_execution_thresholds`- Dictates the threshold of failed task executions. Learn More: [./MONITORING](./MONITORING.MD)
- `unsafe.exclude_task_types` - A list of task types to exclude from running. Supports wildcard usage, such as `namespace:*`. This configuration is experimental, unsupported, and can only be used for temporary debugging purposes because it causes Kibana to behave in unexpected ways.

## Task definitions

Expand Down
9 changes: 9 additions & 0 deletions x-pack/plugins/task_manager/server/config.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
Expand Down Expand Up @@ -93,6 +96,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
Expand Down Expand Up @@ -141,6 +147,9 @@ describe('config validation', () => {
},
"poll_interval": 3000,
"request_capacity": 1000,
"unsafe": Object {
"exclude_task_types": Array [],
},
"version_conflict_threshold": 80,
}
`);
Expand Down
4 changes: 4 additions & 0 deletions x-pack/plugins/task_manager/server/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,10 @@ export const configSchema = schema.object(
max: DEFAULT_MAX_EPHEMERAL_REQUEST_CAPACITY,
}),
}),
/* These are not designed to be used by most users. Please use caution when changing these */
unsafe: schema.object({
exclude_task_types: schema.arrayOf(schema.string(), { defaultValue: [] }),
}),
},
{
validate: (config) => {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,9 @@ describe('EphemeralTaskLifecycle', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
...config,
},
elasticsearchAndSOAvailability$,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,9 @@ describe('managed configuration', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});
logger = context.logger.get('taskManager');

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ describe('Configuration Statistics Aggregator', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
};

const managedConfig = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,9 @@ describe('createMonitoringStatsStream', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
};

it('returns the initial config used to configure Task Manager', async () => {
Expand Down
48 changes: 48 additions & 0 deletions x-pack/plugins/task_manager/server/plugin.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,9 @@ describe('TaskManagerPlugin', () => {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});

pluginInitializerContext.env.instanceUuid = '';
Expand Down Expand Up @@ -82,6 +85,9 @@ describe('TaskManagerPlugin', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
});

const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
Expand Down Expand Up @@ -122,6 +128,48 @@ describe('TaskManagerPlugin', () => {
`"Cannot register task definitions after the task manager has started"`
);
});

test('it logs a warning when the unsafe `exclude_task_types` config is used', async () => {
const pluginInitializerContext = coreMock.createPluginInitializerContext<TaskManagerConfig>({
enabled: true,
max_workers: 10,
index: 'foo',
max_attempts: 9,
poll_interval: 3000,
version_conflict_threshold: 80,
max_poll_inactivity_cycles: 10,
request_capacity: 1000,
monitored_aggregated_stats_refresh_rate: 5000,
monitored_stats_health_verbose_log: {
enabled: false,
warn_delayed_task_start_in_seconds: 60,
},
monitored_stats_required_freshness: 5000,
monitored_stats_running_average_window: 50,
monitored_task_execution_thresholds: {
default: {
error_threshold: 90,
warn_threshold: 80,
},
custom: {},
},
ephemeral_tasks: {
enabled: false,
request_capacity: 10,
},
unsafe: {
exclude_task_types: ['*'],
},
});

const logger = pluginInitializerContext.logger.get();
const taskManagerPlugin = new TaskManagerPlugin(pluginInitializerContext);
taskManagerPlugin.setup(coreMock.createSetup(), { usageCollection: undefined });
expect((logger.warn as jest.Mock).mock.calls.length).toBe(1);
expect((logger.warn as jest.Mock).mock.calls[0][0]).toBe(
'Excluding task types from execution: *'
);
});
});

describe('getElasticsearchAndSOAvailability', () => {
Expand Down
9 changes: 8 additions & 1 deletion x-pack/plugins/task_manager/server/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,14 @@ export class TaskManagerPlugin
usageCollection,
monitoredHealth$,
this.config.ephemeral_tasks.enabled,
this.config.ephemeral_tasks.request_capacity
this.config.ephemeral_tasks.request_capacity,
this.config.unsafe.exclude_task_types
);
}

if (this.config.unsafe.exclude_task_types.length) {
this.logger.warn(
`Excluding task types from execution: ${this.config.unsafe.exclude_task_types.join(', ')}`
);
}

Expand Down
3 changes: 3 additions & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ describe('TaskPollingLifecycle', () => {
enabled: true,
request_capacity: 10,
},
unsafe: {
exclude_task_types: [],
},
},
taskStore: mockTaskStore,
logger: taskManagerLogger,
Expand Down
1 change: 1 addition & 0 deletions x-pack/plugins/task_manager/server/polling_lifecycle.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export class TaskPollingLifecycle {
this.taskClaiming = new TaskClaiming({
taskStore,
maxAttempts: config.max_attempts,
excludedTaskTypes: config.unsafe.exclude_task_types,
definitions,
logger: this.logger,
getCapacity: (taskType?: string) =>
Expand Down
14 changes: 14 additions & 0 deletions x-pack/plugins/task_manager/server/queries/task_claiming.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ describe('TaskClaiming', () => {
new TaskClaiming({
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
taskStore: taskStoreMock.create({ taskManagerId: '' }),
maxAttempts: 2,
getCapacity: () => 10,
Expand All @@ -119,11 +120,13 @@ describe('TaskClaiming', () => {
taskClaimingOpts = {},
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
}) {
const definitions = storeOpts.definitions ?? taskDefinitions;
const store = taskStoreMock.create({ taskManagerId: storeOpts.taskManagerId });
Expand Down Expand Up @@ -151,6 +154,7 @@ describe('TaskClaiming', () => {
logger: taskManagerLogger,
definitions,
taskStore: store,
excludedTaskTypes,
maxAttempts: taskClaimingOpts.maxAttempts ?? 2,
getCapacity: taskClaimingOpts.getCapacity ?? (() => 10),
...taskClaimingOpts,
Expand All @@ -165,17 +169,20 @@ describe('TaskClaiming', () => {
claimingOpts,
hits = [generateFakeTasks(1)],
versionConflicts = 2,
excludedTaskTypes = [],
}: {
storeOpts: Partial<StoreOpts>;
taskClaimingOpts: Partial<TaskClaimingOpts>;
claimingOpts: Omit<OwnershipClaimingOpts, 'size' | 'taskTypes'>;
hits?: ConcreteTaskInstance[][];
versionConflicts?: number;
excludedTaskTypes?: string[];
}) {
const getCapacity = taskClaimingOpts.getCapacity ?? (() => 10);
const { taskClaiming, store } = initialiseTestClaiming({
storeOpts,
taskClaimingOpts,
excludedTaskTypes,
hits,
versionConflicts,
});
Expand Down Expand Up @@ -264,6 +271,11 @@ describe('TaskClaiming', () => {
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
foobar: {
title: 'foobar',
maxAttempts: customMaxAttempts,
createTaskRunner: jest.fn(),
},
});

const [
Expand All @@ -282,6 +294,7 @@ describe('TaskClaiming', () => {
claimingOpts: {
claimOwnershipUntil: new Date(),
},
excludedTaskTypes: ['foobar'],
});
expect(query).toMatchObject({
bool: {
Expand Down Expand Up @@ -1241,6 +1254,7 @@ if (doc['task.runAt'].size()!=0) {
const taskClaiming = new TaskClaiming({
logger: taskManagerLogger,
definitions,
excludedTaskTypes: [],
taskStore,
maxAttempts: 2,
getCapacity,
Expand Down
55 changes: 37 additions & 18 deletions x-pack/plugins/task_manager/server/queries/task_claiming.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* This module contains helpers for managing the task manager storage layer.
*/
import apm from 'elastic-apm-node';
import minimatch from 'minimatch';
import { Subject, Observable, from, of } from 'rxjs';
import { map, mergeScan } from 'rxjs/operators';
import { difference, partition, groupBy, mapValues, countBy, pick, isPlainObject } from 'lodash';
Expand Down Expand Up @@ -57,6 +58,7 @@ export interface TaskClaimingOpts {
definitions: TaskTypeDictionary;
taskStore: TaskStore;
maxAttempts: number;
excludedTaskTypes: string[];
getCapacity: (taskType?: string) => number;
}

Expand Down Expand Up @@ -115,6 +117,7 @@ export class TaskClaiming {
private logger: Logger;
private readonly taskClaimingBatchesByType: TaskClaimingBatches;
private readonly taskMaxAttempts: Record<string, number>;
private readonly excludedTaskTypes: string[];

/**
* Constructs a new TaskStore.
Expand All @@ -130,6 +133,7 @@ export class TaskClaiming {
this.logger = opts.logger;
this.taskClaimingBatchesByType = this.partitionIntoClaimingBatches(this.definitions);
this.taskMaxAttempts = Object.fromEntries(this.normalizeMaxAttempts(this.definitions));
this.excludedTaskTypes = opts.excludedTaskTypes;

this.events$ = new Subject<TaskClaim>();
}
Expand Down Expand Up @@ -354,6 +358,16 @@ export class TaskClaiming {
};
};

private isTaskTypeExcluded(taskType: string) {
for (const excludedType of this.excludedTaskTypes) {
if (minimatch(taskType, excludedType)) {
return true;
}
}

return false;
}

private async markAvailableTasksAsClaimed({
claimOwnershipUntil,
claimTasksById,
Expand All @@ -362,9 +376,11 @@ export class TaskClaiming {
}: OwnershipClaimingOpts): Promise<UpdateByQueryResult> {
const { taskTypesToSkip = [], taskTypesToClaim = [] } = groupBy(
this.definitions.getAllTypes(),
(type) => (taskTypes.has(type) ? 'taskTypesToClaim' : 'taskTypesToSkip')
(type) =>
taskTypes.has(type) && !this.isTaskTypeExcluded(type)
? 'taskTypesToClaim'
: 'taskTypesToSkip'
);

const queryForScheduledTasks = mustBeAllOf(
// Either a task with idle status and runAt <= now or
// status running or claiming with a retryAt <= now.
Expand All @@ -382,29 +398,32 @@ export class TaskClaiming {
sort.unshift('_score');
}

const query = matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
);
const script = updateFieldsAndMarkAsFailed(
{
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById || [],
taskTypesToClaim,
taskTypesToSkip,
pick(this.taskMaxAttempts, taskTypesToClaim)
);

const apmTrans = apm.startTransaction(
'markAvailableTasksAsClaimed',
`taskManager markAvailableTasksAsClaimed`
);
try {
const result = await this.taskStore.updateByQuery(
{
query: matchesClauses(
claimTasksById && claimTasksById.length
? mustBeAllOf(asPinnedQuery(claimTasksById, queryForScheduledTasks))
: queryForScheduledTasks,
filterDownBy(InactiveTasks)
),
script: updateFieldsAndMarkAsFailed(
{
ownerId: this.taskStore.taskManagerId,
retryAt: claimOwnershipUntil,
},
claimTasksById || [],
taskTypesToClaim,
taskTypesToSkip,
pick(this.taskMaxAttempts, taskTypesToClaim)
),
query,
script,
sort,
},
{
Expand Down
Loading

0 comments on commit d3f231c

Please sign in to comment.