Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[eem] unique esql query per type #201499

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 28 additions & 43 deletions x-pack/plugins/entity_manager/server/lib/entity_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { EntityV2, EntityDefinition, EntityDefinitionUpdate } from '@kbn/entitie
import { SavedObjectsClientContract } from '@kbn/core-saved-objects-api-server';
import { ElasticsearchClient } from '@kbn/core-elasticsearch-server';
import { Logger } from '@kbn/logging';
import { omit, uniq } from 'lodash';
import {
installEntityDefinition,
installationInProgress,
Expand All @@ -24,7 +25,7 @@ import { deleteIndices } from './entities/delete_index';
import { EntityDefinitionWithState } from './entities/types';
import { EntityDefinitionUpdateConflict } from './entities/errors/entity_definition_update_conflict';
import { EntitySource, getEntityInstancesQuery } from './queries';
import { mergeEntitiesList, runESQLQuery } from './queries/utils';
import { runESQLQuery } from './queries/utils';
import { UnknownEntityType } from './entities/errors/unknown_entity_type';

export class EntityClient {
Expand Down Expand Up @@ -232,53 +233,37 @@ export class EntityClient {
filters?: string[];
limit?: number;
}) {
const entities = await Promise.all(
sources.map(async (source) => {
const mandatoryFields = [source.timestamp_field, ...source.identity_fields];
const metaFields = [...metadataFields, ...source.metadata_fields];
const { fields } = await this.options.esClient.fieldCaps({
index: source.index_patterns,
fields: [...mandatoryFields, ...metaFields],
});
const [mandatory, metadata] = [
uniq(sources.flatMap((source) => [source.timestamp_field, ...source.identity_fields])),
uniq([...metadataFields, ...sources.flatMap((source) => source.metadata_fields)]),
];

const sourceHasMandatoryFields = mandatoryFields.every((field) => !!fields[field]);
if (!sourceHasMandatoryFields) {
// we can't build entities without id fields so we ignore the source.
// filters should likely behave similarly.
this.options.logger.info(
`Ignoring source for type [${source.type}] with index_patterns [${source.index_patterns}] because some mandatory fields [${mandatoryFields}] are not mapped`
);
return [];
}
const { fields } = await this.options.esClient.fieldCaps({
index: sources.flatMap((source) => source.index_patterns),
fields: [...mandatory, ...metadata],
});

// but metadata field not being available is fine
const availableMetadataFields = metaFields.filter((field) => fields[field]);
const sourceHasMandatoryFields = mandatory.every((field) => !!fields[field]);
if (!sourceHasMandatoryFields) {
throw new Error(`Invalid identity fields configuration`);
}

const query = getEntityInstancesQuery({
source: {
...source,
metadata_fields: availableMetadataFields,
filters: [...source.filters, ...filters],
},
start,
end,
limit,
});
this.options.logger.debug(`Entity query: ${query}`);
const availableMetadataFields = metadata.filter((field) => fields[field]);

const rawEntities = await runESQLQuery<EntityV2>({
query,
esClient: this.options.esClient,
});
const query = getEntityInstancesQuery({
sources,
start,
end,
limit,
metadataFields: availableMetadataFields,
});
this.options.logger.info(`Entity query: ${query}`);

return rawEntities.map((entity) => {
entity['entity.id'] = source.identity_fields.map((field) => entity[field]).join(':');
entity['entity.type'] = source.type;
return entity;
});
})
).then((results) => results.flat());
const rawEntities = await runESQLQuery<EntityV2>({
query,
esClient: this.options.esClient,
});

return mergeEntitiesList(entities).slice(0, limit);
return rawEntities;
}
}
87 changes: 51 additions & 36 deletions x-pack/plugins/entity_manager/server/lib/queries/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
*/

import { z } from '@kbn/zod';
import { uniq } from 'lodash';

export const entitySourceSchema = z.object({
type: z.string(),
Expand All @@ -18,74 +19,88 @@ export const entitySourceSchema = z.object({

export type EntitySource = z.infer<typeof entitySourceSchema>;

const sourceCommand = ({ source }: { source: EntitySource }) => {
let query = `FROM ${source.index_patterns}`;
/**
 * Builds the ES|QL `FROM` command that reads the index patterns of every source.
 *
 * When a source lists `_index` or `_id` in its `metadata_fields`, a `METADATA`
 * directive is appended. ES|QL only exposes these document metadata fields as
 * columns when they are requested on the `FROM` command, so aggregating on them
 * in a later command would otherwise reference an unknown column.
 *
 * @param sources entity sources whose `index_patterns` and `metadata_fields` are read
 * @returns the `FROM` command, e.g. `FROM logs-*, metrics-* METADATA _index`
 */
const sourceCommand = ({ sources }: { sources: EntitySource[] }) => {
  const fromClause = `FROM ${sources.flatMap((source) => source.index_patterns).join(', ')}`;

  // De-duplicate so a field requested by several sources is declared only once.
  const esMetadataFields = [
    ...new Set(
      sources
        .flatMap((source) => source.metadata_fields)
        .filter((field) => ['_index', '_id'].includes(field))
    ),
  ];

  return esMetadataFields.length
    ? `${fromClause} METADATA ${esMetadataFields.join(', ')}`
    : fromClause;
};

const esMetadataFields = source.metadata_fields.filter((field) =>
['_index', '_id'].includes(field)
);
if (esMetadataFields.length) {
query += ` METADATA ${esMetadataFields.join(',')}`;
}
/**
 * Builds the ES|QL `EVAL` command that computes the `entity.id` column.
 *
 * Each source contributes one condition/value pair to a single `CASE(...)` call:
 * the condition requires all of the source's identity fields to be non-null, and
 * the value is the lone identity field, or the fields joined with ":" through
 * `CONCAT` when there are several. Pairs are emitted in source order.
 *
 * @param sources entity sources whose `identity_fields` are read
 * @returns the `EVAL entity.id = CASE(...)` command
 */
const idEvalCommand = ({ sources }: { sources: EntitySource[] }) => {
  const caseArgs: string[] = [];

  for (const { identity_fields: idFields } of sources) {
    const allFieldsPresent = idFields.map((field) => `${field} IS NOT NULL`).join(' AND ');
    const idExpression =
      idFields.length === 1 ? idFields[0] : `CONCAT(${idFields.join(', ":", ')})`;

    caseArgs.push(allFieldsPresent, idExpression);
  }

  return `EVAL entity.id = CASE(${caseArgs.join(', ')})`;
};

return query;
/**
 * Builds the ES|QL `EVAL` command that computes the `entity.timestamp` column.
 *
 * The distinct timestamp fields of the sources (in first-seen order) each
 * contribute a `<field> IS NOT NULL, <field>` pair to a single `CASE(...)` call.
 *
 * @param sources entity sources whose `timestamp_field` is read
 * @returns the `EVAL entity.timestamp = CASE(...)` command
 */
const timestampEvalCommand = ({ sources }: { sources: EntitySource[] }) => {
  // A Set keeps insertion order, so duplicates collapse onto their first occurrence.
  const distinctTimestampFields = new Set(sources.map(({ timestamp_field: field }) => field));

  const caseArgs: string[] = [];
  for (const field of distinctTimestampFields) {
    caseArgs.push(`${field} IS NOT NULL`, field);
  }

  return `EVAL entity.timestamp = CASE(${caseArgs.join(', ')})`;
};

const filterCommands = ({
source,
sources,
start,
end,
}: {
source: EntitySource;
sources: EntitySource[];
start: string;
end: string;
}) => {
const commands = [
`WHERE ${source.timestamp_field} >= "${start}"`,
`WHERE ${source.timestamp_field} <= "${end}"`,
const conditions = [
'entity.id IS NOT NULL',
'entity.timestamp IS NOT NULL',
`(entity.timestamp >= "${start}" AND entity.timestamp <= "${end}")`,
];

source.identity_fields.forEach((field) => {
commands.push(`WHERE ${field} IS NOT NULL`);
});

source.filters.forEach((filter) => {
commands.push(`WHERE ${filter}`);
});

return commands;
return conditions.map((condition) => `WHERE ${condition}`).join(' | ');
};

const statsCommand = ({ source }: { source: EntitySource }) => {
const statsCommand = ({
sources,
metadataFields,
}: {
sources: EntitySource[];
metadataFields: string[];
}) => {
const aggs = [
// default 'last_seen' attribute
`entity.last_seen_timestamp=MAX(${source.timestamp_field})`,
...source.metadata_fields
.filter((field) => !source.identity_fields.some((idField) => idField === field))
.map((field) => `metadata.${field}=VALUES(${field})`),
'entity.last_seen_timestamp=MAX(entity.timestamp)',
...uniq(sources.flatMap((source) => source.identity_fields)).map(
(field) => `${field}=TOP(${field}, 1, "desc")`
),
...metadataFields.map((field) => `metadata.${field}=VALUES(${field})`),
];

return `STATS ${aggs.join(',')} BY ${source.identity_fields.join(',')}`;
return `STATS ${aggs.join(',')} BY entity.id`;
};

export function getEntityInstancesQuery({
source,
sources,
limit,
start,
end,
metadataFields = [],
}: {
source: EntitySource;
sources: EntitySource[];
limit: number;
start: string;
end: string;
metadataFields?: string[];
}): string {
const commands = [
sourceCommand({ source }),
...filterCommands({ source, start, end }),
statsCommand({ source }),
`SORT entity.last_seen_timestamp DESC`,
sourceCommand({ sources }),
idEvalCommand({ sources }),
timestampEvalCommand({ sources }),
filterCommands({ sources, start, end }),
statsCommand({ sources, metadataFields }),
'SORT entity.last_seen_timestamp DESC',
`LIMIT ${limit}`,
];

return commands.join('|');
return commands.join(' | ');
}