Skip to content

Commit

Permalink
🌊 Streams: Show data retention on stream (elastic#204125)
Browse files Browse the repository at this point in the history
Show data retention on streams

In case of a policy, the name of the policy is shown (badge is clickable
and leads to the edit page of the policy):
<img width="524" alt="Screenshot 2024-12-12 at 20 57 36"
src="https://github.com/user-attachments/assets/2664b45b-2473-49c4-b1d6-dccb8fe48d43"
/>

In case of DLM, the effect retention is shown:
<img width="532" alt="Screenshot 2024-12-12 at 20 58 42"
src="https://github.com/user-attachments/assets/07ca8086-75e2-45f8-9d71-17bd0a76ebe5"
/>

This is just the display piece, editing retention will be added later
on.

This PR adjusts the base streams data stream settings to use a localized
data stream retention configuration to make it compatible with
serverless.

---------

Co-authored-by: kibanamachine <[email protected]>
Co-authored-by: Elastic Machine <[email protected]>
  • Loading branch information
3 people authored and viduni94 committed Jan 23, 2025
1 parent 8faf6c6 commit e42a38d
Show file tree
Hide file tree
Showing 26 changed files with 175 additions and 69 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ingestStream } from './ingest_stream';

export const ingestReadStream = {
...ingestStream,
lifecycle: { type: 'dlm' },
inherited_fields: {
'@timestamp': {
type: 'date',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { wiredStream } from './wired_stream';

export const wiredReadStream = {
...wiredStream,
lifecycle: { type: 'dlm' },
inherited_fields: {
'@timestamp': {
type: 'date',
Expand Down
7 changes: 7 additions & 0 deletions x-pack/packages/kbn-streams-schema/src/models/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,10 @@ export const elasticsearchAssetSchema = z.array(
);

export type ElasticsearchAsset = z.infer<typeof elasticsearchAssetSchema>;

export const lifecycleSchema = z.discriminatedUnion('type', [
z.object({ type: z.literal('dlm'), data_retention: z.optional(z.string()) }),
z.object({ type: z.literal('ilm'), policy: z.string() }),
]);

export type StreamLifecycle = z.infer<typeof lifecycleSchema>;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import { z } from '@kbn/zod';
import { ingestStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';

export const ingestReadStreamDefinitonSchema = ingestStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema.default({}),
lifecycle: lifecycleSchema,
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';

export const wiredReadStreamDefinitonSchema = wiredStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema.default({}),
lifecycle: lifecycleSchema,
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@

export const ILM_LOCATOR_ID = 'ILM_LOCATOR_ID';
export * from './src/policies';
export * from './src/locator';
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SerializableRecord } from '@kbn/utility-types';

export interface IlmLocatorParams extends SerializableRecord {
page: 'policies_list' | 'policy_edit' | 'policy_create';
policyName?: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
"target/**/*"
],
"kbn_references": [
"@kbn/utility-types",
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,3 @@ import { IndexLifecycleManagementPlugin } from './plugin';
export const plugin = (initializerContext: PluginInitializerContext) => {
return new IndexLifecycleManagementPlugin(initializerContext);
};

export type { IlmLocatorParams } from './locator';
export { ILM_LOCATOR_ID } from './locator';
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* 2.0.
*/

import type { SerializableRecord } from '@kbn/utility-types';
import { ManagementAppLocator } from '@kbn/management-plugin/common';
import { LocatorDefinition } from '@kbn/share-plugin/public';
import { ILM_LOCATOR_ID } from '@kbn/index-lifecycle-management-common-shared';
import { ILM_LOCATOR_ID, IlmLocatorParams } from '@kbn/index-lifecycle-management-common-shared';
import {
getPoliciesListPath,
getPolicyCreatePath,
Expand All @@ -18,11 +17,6 @@ import { PLUGIN } from '../common/constants';

export { ILM_LOCATOR_ID };

export interface IlmLocatorParams extends SerializableRecord {
page: 'policies_list' | 'policy_edit' | 'policy_create';
policyName?: string;
}

export interface IlmLocatorDefinitionDependencies {
managementAppLocator: ManagementAppLocator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"@kbn/test-jest-helpers",
"@kbn/core-http-browser-mocks",
"@kbn/i18n",
"@kbn/utility-types",
"@kbn/analytics",
"@kbn/es-ui-shared-plugin",
"@kbn/i18n-react",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from '@elastic/elasticsearch/lib/api/types';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { logsSettings, logsLifecycle } from './logs_layer';
import { isRoot } from '../helpers/hierarchy';
import { getComponentTemplateName } from './name';

Expand All @@ -38,6 +38,7 @@ export function generateLayer(
name: getComponentTemplateName(id),
template: {
settings: isRoot(definition.name) ? logsSettings : {},
lifecycle: isRoot(definition.name) ? logsLifecycle : undefined,
mappings: {
subobjects: false,
dynamic: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
* 2.0.
*/

import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
import {
IndicesIndexSettings,
IndicesDataStreamLifecycle,
} from '@elastic/elasticsearch/lib/api/types';

export const logsSettings: IndicesIndexSettings = {
index: {
lifecycle: {
name: 'logs',
},
mode: 'logsdb',
codec: 'best_compression',
mapping: {
total_fields: {
Expand All @@ -21,3 +22,5 @@ export const logsSettings: IndicesIndexSettings = {
},
},
};

export const logsLifecycle: IndicesDataStreamLifecycle = {};
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import {
ListStreamsResponse,
isWiredStream,
FieldDefinition,
StreamLifecycle,
ReadStreamDefinition,
IngestReadStreamDefinition,
isWiredReadStream,
WiredReadStreamDefinition,
} from '@kbn/streams-schema';
import { omit } from 'lodash';
import { STREAMS_INDEX } from '../../../common/constants';
Expand Down Expand Up @@ -65,8 +70,9 @@ export async function deleteUnmanagedStreamObjects({
scopedClusterClient,
logger,
}: DeleteStreamParams) {
const dataStream = await getDataStream({ name: id, scopedClusterClient });
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: id,
dataStream,
scopedClusterClient,
});
const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id;
Expand Down Expand Up @@ -168,7 +174,7 @@ async function upsertInternalStream({
return scopedClusterClient.asInternalUser.index({
id: definition.name,
index: STREAMS_INDEX,
document: { ...omit(definition, 'elasticsearch_assets') },
document: { ...omit(definition, 'elasticsearch_assets', 'inherited_fields', 'lifecycle') },
refresh: 'wait_for',
});
}
Expand Down Expand Up @@ -233,6 +239,24 @@ async function listManagedStreams({
});
}

function getDataStreamLifecycle(dataStream: IndicesDataStream): StreamLifecycle {
if (
dataStream.ilm_policy &&
(!dataStream.lifecycle || typeof dataStream.prefer_ilm === 'undefined' || dataStream.prefer_ilm)
) {
return {
type: 'ilm',
policy: dataStream.ilm_policy,
};
}
return {
type: 'dlm',
data_retention: dataStream.lifecycle?.data_retention
? String(dataStream.lifecycle.data_retention)
: undefined,
};
}

export async function listDataStreamsAsStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<IngestStreamDefinition[]> {
Expand All @@ -259,7 +283,7 @@ export async function readStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams): Promise<StreamDefinition> {
}: ReadStreamParams): Promise<ReadStreamDefinition> {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
Expand All @@ -272,7 +296,12 @@ export async function readStream({
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return definition;
const dataStream = await getDataStream({ name: id, scopedClusterClient });
return {
...definition,
inherited_fields: {},
lifecycle: getDataStreamLifecycle(dataStream),
};
} catch (e) {
if (e.meta?.statusCode === 404) {
return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck });
Expand All @@ -282,8 +311,11 @@ export async function readStream({
}

export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) {
const definition: IngestStreamDefinition = {
const dataStream = await getDataStream({ name: id, scopedClusterClient });
const definition: IngestReadStreamDefinition = {
name: id,
lifecycle: getDataStreamLifecycle(dataStream),
inherited_fields: {},
stream: {
ingest: {
routing: [],
Expand All @@ -293,21 +325,24 @@ export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadSt
};

definition.elasticsearch_assets = await getUnmanagedElasticsearchAssets({
name: id,
dataStream,
scopedClusterClient,
});

return definition;
}

interface ReadUnmanagedAssetsParams extends BaseParams {
name: string;
dataStream: IndicesDataStream;
}

async function getUnmanagedElasticsearchAssets({
async function getDataStream({
name,
scopedClusterClient,
}: ReadUnmanagedAssetsParams) {
}: {
name: string;
scopedClusterClient: IScopedClusterClient;
}) {
let dataStream: IndicesDataStream | undefined;
try {
const response = await scopedClusterClient.asCurrentUser.indices.getDataStream({ name });
Expand All @@ -319,11 +354,16 @@ async function getUnmanagedElasticsearchAssets({
throw e;
}
}

if (!dataStream) {
throw new DefinitionNotFound(`Stream definition for ${name} not found.`);
}
return dataStream;
}

async function getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,
}: ReadUnmanagedAssetsParams) {
// retrieve linked index template, component template and ingest pipeline
const templateName = dataStream.template;
const componentTemplates: string[] = [];
Expand Down Expand Up @@ -356,7 +396,7 @@ async function getUnmanagedElasticsearchAssets({
},
{
type: 'data_stream' as const,
id: name,
id: dataStream.name,
},
];
}
Expand All @@ -383,7 +423,7 @@ export async function readAncestors({
scopedClusterClient,
id: ancestorId,
skipAccessCheck: true,
}) as unknown as WiredStreamDefinition
}) as unknown as WiredReadStreamDefinition
)
),
};
Expand Down Expand Up @@ -430,7 +470,7 @@ export async function validateAncestorFields(
for (const name in fields) {
if (
Object.hasOwn(fields, name) &&
isWiredStream(ancestor) &&
isWiredReadStream(ancestor) &&
Object.entries(ancestor.stream.ingest.wired.fields).some(
([ancestorFieldName, attr]) =>
attr.type !== fields[name].type && ancestorFieldName === name
Expand Down Expand Up @@ -531,7 +571,7 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
if (!isWiredStream(definition)) {
if (!isWiredStream(definition) && !isWiredReadStream(definition)) {
await syncUnmanagedStream({ scopedClusterClient, definition, logger, assetClient });
await upsertInternalStream({
scopedClusterClient,
Expand Down Expand Up @@ -602,14 +642,12 @@ interface ExecutionPlanStep {
}

async function syncUnmanagedStream({ scopedClusterClient, definition }: SyncStreamParams) {
if (isWiredStream(definition)) {
throw new Error('Got an unmanaged stream that is marked as managed');
}
if (definition.stream.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
const dataStream = await getDataStream({ name: definition.name, scopedClusterClient });
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: definition.name,
dataStream,
scopedClusterClient,
});
const executionPlan: ExecutionPlanStep[] = [];
Expand Down
Loading

0 comments on commit e42a38d

Please sign in to comment.