Skip to content

Commit

Permalink
[8.x] 🌊 Streams: Show data retention on stream (#204125) (#206393)
Browse files Browse the repository at this point in the history
# Backport

This will backport the following commits from `main` to `8.x`:
- [🌊 Streams: Show data retention on stream
(#204125)](#204125)

<!--- Backport version: 9.4.3 -->

### Questions ?
Please refer to the [Backport tool
documentation](https://github.com/sqren/backport)

<!--BACKPORT [{"author":{"name":"Joe
Reuter","email":"[email protected]"},"sourceCommit":{"committedDate":"2025-01-09T08:25:19Z","message":"🌊
Streams: Show data retention on stream (#204125)\n\nShow data retention
on streams\r\n\r\nIn case of a policy, the name of the policy is shown
(badge is clickable\r\nand leads to the edit page of the
policy):\r\n<img width=\"524\" alt=\"Screenshot 2024-12-12 at 20 57
36\"\r\nsrc=\"https://github.com/user-attachments/assets/2664b45b-2473-49c4-b1d6-dccb8fe48d43\"\r\n/>\r\n\r\nIn
case of DLM, the effect retention is shown:\r\n<img width=\"532\"
alt=\"Screenshot 2024-12-12 at 20 58
42\"\r\nsrc=\"https://github.com/user-attachments/assets/07ca8086-75e2-45f8-9d71-17bd0a76ebe5\"\r\n/>\r\n\r\nThis
is just the display piece, editing retention will be added
later\r\non.\r\n\r\nThis PR adjusts the base streams data stream
settings to use a localized\r\ndata stream retention configuration to
make it compatible
with\r\nserverless.\r\n\r\n---------\r\n\r\nCo-authored-by:
kibanamachine
<[email protected]>\r\nCo-authored-by:
Elastic Machine
<[email protected]>","sha":"3515a0f7b8958b11a9065696ac154b56446f7294","branchLabelMapping":{"^v9.0.0$":"main","^v8.18.0$":"8.x","^v(\\d+).(\\d+).\\d+$":"$1.$2"}},"sourcePullRequest":{"labels":["release_note:skip","v9.0.0","backport:version","v8.18.0","Feature:Streams"],"title":"🌊
Streams: Show data retention on
stream","number":204125,"url":"https://github.com/elastic/kibana/pull/204125","mergeCommit":{"message":"🌊
Streams: Show data retention on stream (#204125)\n\nShow data retention
on streams\r\n\r\nIn case of a policy, the name of the policy is shown
(badge is clickable\r\nand leads to the edit page of the
policy):\r\n<img width=\"524\" alt=\"Screenshot 2024-12-12 at 20 57
36\"\r\nsrc=\"https://github.com/user-attachments/assets/2664b45b-2473-49c4-b1d6-dccb8fe48d43\"\r\n/>\r\n\r\nIn
case of DLM, the effect retention is shown:\r\n<img width=\"532\"
alt=\"Screenshot 2024-12-12 at 20 58
42\"\r\nsrc=\"https://github.com/user-attachments/assets/07ca8086-75e2-45f8-9d71-17bd0a76ebe5\"\r\n/>\r\n\r\nThis
is just the display piece, editing retention will be added
later\r\non.\r\n\r\nThis PR adjusts the base streams data stream
settings to use a localized\r\ndata stream retention configuration to
make it compatible
with\r\nserverless.\r\n\r\n---------\r\n\r\nCo-authored-by:
kibanamachine
<[email protected]>\r\nCo-authored-by:
Elastic Machine
<[email protected]>","sha":"3515a0f7b8958b11a9065696ac154b56446f7294"}},"sourceBranch":"main","suggestedTargetBranches":["8.x"],"targetPullRequestStates":[{"branch":"main","label":"v9.0.0","branchLabelMappingKey":"^v9.0.0$","isSourceBranch":true,"state":"MERGED","url":"https://github.com/elastic/kibana/pull/204125","number":204125,"mergeCommit":{"message":"🌊
Streams: Show data retention on stream (#204125)\n\nShow data retention
on streams\r\n\r\nIn case of a policy, the name of the policy is shown
(badge is clickable\r\nand leads to the edit page of the
policy):\r\n<img width=\"524\" alt=\"Screenshot 2024-12-12 at 20 57
36\"\r\nsrc=\"https://github.com/user-attachments/assets/2664b45b-2473-49c4-b1d6-dccb8fe48d43\"\r\n/>\r\n\r\nIn
case of DLM, the effect retention is shown:\r\n<img width=\"532\"
alt=\"Screenshot 2024-12-12 at 20 58
42\"\r\nsrc=\"https://github.com/user-attachments/assets/07ca8086-75e2-45f8-9d71-17bd0a76ebe5\"\r\n/>\r\n\r\nThis
is just the display piece, editing retention will be added
later\r\non.\r\n\r\nThis PR adjusts the base streams data stream
settings to use a localized\r\ndata stream retention configuration to
make it compatible
with\r\nserverless.\r\n\r\n---------\r\n\r\nCo-authored-by:
kibanamachine
<[email protected]>\r\nCo-authored-by:
Elastic Machine
<[email protected]>","sha":"3515a0f7b8958b11a9065696ac154b56446f7294"}},{"branch":"8.x","label":"v8.18.0","branchLabelMappingKey":"^v8.18.0$","isSourceBranch":false,"state":"NOT_CREATED"}]}]
BACKPORT-->

Co-authored-by: Joe Reuter <[email protected]>
  • Loading branch information
kibanamachine and flash1293 authored Jan 13, 2025
1 parent 3ee7fde commit 4bd76d5
Show file tree
Hide file tree
Showing 26 changed files with 175 additions and 69 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { ingestStream } from './ingest_stream';

export const ingestReadStream = {
...ingestStream,
lifecycle: { type: 'dlm' },
inherited_fields: {
'@timestamp': {
type: 'date',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import { wiredStream } from './wired_stream';

export const wiredReadStream = {
...wiredStream,
lifecycle: { type: 'dlm' },
inherited_fields: {
'@timestamp': {
type: 'date',
Expand Down
7 changes: 7 additions & 0 deletions x-pack/packages/kbn-streams-schema/src/models/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,3 +116,10 @@ export const elasticsearchAssetSchema = z.array(
);

export type ElasticsearchAsset = z.infer<typeof elasticsearchAssetSchema>;

export const lifecycleSchema = z.discriminatedUnion('type', [
z.object({ type: z.literal('dlm'), data_retention: z.optional(z.string()) }),
z.object({ type: z.literal('ilm'), policy: z.string() }),
]);

export type StreamLifecycle = z.infer<typeof lifecycleSchema>;

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import { z } from '@kbn/zod';
import { ingestStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';

export const ingestReadStreamDefinitonSchema = ingestStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema.default({}),
lifecycle: lifecycleSchema,
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,12 @@

import { z } from '@kbn/zod';
import { wiredStreamDefinitonSchema } from '../streams';
import { inheritedFieldDefinitionSchema } from '../common';
import { inheritedFieldDefinitionSchema, lifecycleSchema } from '../common';

export const wiredReadStreamDefinitonSchema = wiredStreamDefinitonSchema
.extend({
inherited_fields: inheritedFieldDefinitionSchema.default({}),
lifecycle: lifecycleSchema,
})
.strict();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@

export const ILM_LOCATOR_ID = 'ILM_LOCATOR_ID';
export * from './src/policies';
export * from './src/locator';
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/
import type { SerializableRecord } from '@kbn/utility-types';

export interface IlmLocatorParams extends SerializableRecord {
page: 'policies_list' | 'policy_edit' | 'policy_create';
policyName?: string;
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@
"target/**/*"
],
"kbn_references": [
"@kbn/utility-types",
]
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,3 @@ import { IndexLifecycleManagementPlugin } from './plugin';
export const plugin = (initializerContext: PluginInitializerContext) => {
return new IndexLifecycleManagementPlugin(initializerContext);
};

export type { IlmLocatorParams } from './locator';
export { ILM_LOCATOR_ID } from './locator';
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@
* 2.0.
*/

import type { SerializableRecord } from '@kbn/utility-types';
import { ManagementAppLocator } from '@kbn/management-plugin/common';
import { LocatorDefinition } from '@kbn/share-plugin/public';
import { ILM_LOCATOR_ID } from '@kbn/index-lifecycle-management-common-shared';
import { ILM_LOCATOR_ID, IlmLocatorParams } from '@kbn/index-lifecycle-management-common-shared';
import {
getPoliciesListPath,
getPolicyCreatePath,
Expand All @@ -18,11 +17,6 @@ import { PLUGIN } from '../common/constants';

export { ILM_LOCATOR_ID };

export interface IlmLocatorParams extends SerializableRecord {
page: 'policies_list' | 'policy_edit' | 'policy_create';
policyName?: string;
}

export interface IlmLocatorDefinitionDependencies {
managementAppLocator: ManagementAppLocator;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"@kbn/test-jest-helpers",
"@kbn/core-http-browser-mocks",
"@kbn/i18n",
"@kbn/utility-types",
"@kbn/analytics",
"@kbn/es-ui-shared-plugin",
"@kbn/i18n-react",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import {
} from '@elastic/elasticsearch/lib/api/types';
import { WiredStreamDefinition } from '@kbn/streams-schema';
import { ASSET_VERSION } from '../../../../common/constants';
import { logsSettings } from './logs_layer';
import { logsSettings, logsLifecycle } from './logs_layer';
import { isRoot } from '../helpers/hierarchy';
import { getComponentTemplateName } from './name';

Expand All @@ -38,6 +38,7 @@ export function generateLayer(
name: getComponentTemplateName(id),
template: {
settings: isRoot(definition.name) ? logsSettings : {},
lifecycle: isRoot(definition.name) ? logsLifecycle : undefined,
mappings: {
subobjects: false,
dynamic: false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
* 2.0.
*/

import { IndicesIndexSettings } from '@elastic/elasticsearch/lib/api/types';
import {
IndicesIndexSettings,
IndicesDataStreamLifecycle,
} from '@elastic/elasticsearch/lib/api/types';

export const logsSettings: IndicesIndexSettings = {
index: {
lifecycle: {
name: 'logs',
},
mode: 'logsdb',
codec: 'best_compression',
mapping: {
total_fields: {
Expand All @@ -21,3 +22,5 @@ export const logsSettings: IndicesIndexSettings = {
},
},
};

export const logsLifecycle: IndicesDataStreamLifecycle = {};
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@ import {
ListStreamsResponse,
isWiredStream,
FieldDefinition,
StreamLifecycle,
ReadStreamDefinition,
IngestReadStreamDefinition,
isWiredReadStream,
WiredReadStreamDefinition,
} from '@kbn/streams-schema';
import { omit } from 'lodash';
import { STREAMS_INDEX } from '../../../common/constants';
Expand Down Expand Up @@ -65,8 +70,9 @@ export async function deleteUnmanagedStreamObjects({
scopedClusterClient,
logger,
}: DeleteStreamParams) {
const dataStream = await getDataStream({ name: id, scopedClusterClient });
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: id,
dataStream,
scopedClusterClient,
});
const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id;
Expand Down Expand Up @@ -168,7 +174,7 @@ async function upsertInternalStream({
return scopedClusterClient.asInternalUser.index({
id: definition.name,
index: STREAMS_INDEX,
document: { ...omit(definition, 'elasticsearch_assets') },
document: { ...omit(definition, 'elasticsearch_assets', 'inherited_fields', 'lifecycle') },
refresh: 'wait_for',
});
}
Expand Down Expand Up @@ -233,6 +239,24 @@ async function listManagedStreams({
});
}

function getDataStreamLifecycle(dataStream: IndicesDataStream): StreamLifecycle {
if (
dataStream.ilm_policy &&
(!dataStream.lifecycle || typeof dataStream.prefer_ilm === 'undefined' || dataStream.prefer_ilm)
) {
return {
type: 'ilm',
policy: dataStream.ilm_policy,
};
}
return {
type: 'dlm',
data_retention: dataStream.lifecycle?.data_retention
? String(dataStream.lifecycle.data_retention)
: undefined,
};
}

export async function listDataStreamsAsStreams({
scopedClusterClient,
}: ListStreamsParams): Promise<IngestStreamDefinition[]> {
Expand All @@ -259,7 +283,7 @@ export async function readStream({
id,
scopedClusterClient,
skipAccessCheck,
}: ReadStreamParams): Promise<StreamDefinition> {
}: ReadStreamParams): Promise<ReadStreamDefinition> {
try {
const response = await scopedClusterClient.asInternalUser.get<StreamDefinition>({
id,
Expand All @@ -272,7 +296,12 @@ export async function readStream({
throw new DefinitionNotFound(`Stream definition for ${id} not found.`);
}
}
return definition;
const dataStream = await getDataStream({ name: id, scopedClusterClient });
return {
...definition,
inherited_fields: {},
lifecycle: getDataStreamLifecycle(dataStream),
};
} catch (e) {
if (e.meta?.statusCode === 404) {
return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck });
Expand All @@ -282,8 +311,11 @@ export async function readStream({
}

export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) {
const definition: IngestStreamDefinition = {
const dataStream = await getDataStream({ name: id, scopedClusterClient });
const definition: IngestReadStreamDefinition = {
name: id,
lifecycle: getDataStreamLifecycle(dataStream),
inherited_fields: {},
stream: {
ingest: {
routing: [],
Expand All @@ -293,21 +325,24 @@ export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadSt
};

definition.elasticsearch_assets = await getUnmanagedElasticsearchAssets({
name: id,
dataStream,
scopedClusterClient,
});

return definition;
}

interface ReadUnmanagedAssetsParams extends BaseParams {
name: string;
dataStream: IndicesDataStream;
}

async function getUnmanagedElasticsearchAssets({
async function getDataStream({
name,
scopedClusterClient,
}: ReadUnmanagedAssetsParams) {
}: {
name: string;
scopedClusterClient: IScopedClusterClient;
}) {
let dataStream: IndicesDataStream | undefined;
try {
const response = await scopedClusterClient.asCurrentUser.indices.getDataStream({ name });
Expand All @@ -319,11 +354,16 @@ async function getUnmanagedElasticsearchAssets({
throw e;
}
}

if (!dataStream) {
throw new DefinitionNotFound(`Stream definition for ${name} not found.`);
}
return dataStream;
}

async function getUnmanagedElasticsearchAssets({
dataStream,
scopedClusterClient,
}: ReadUnmanagedAssetsParams) {
// retrieve linked index template, component template and ingest pipeline
const templateName = dataStream.template;
const componentTemplates: string[] = [];
Expand Down Expand Up @@ -356,7 +396,7 @@ async function getUnmanagedElasticsearchAssets({
},
{
type: 'data_stream' as const,
id: name,
id: dataStream.name,
},
];
}
Expand All @@ -383,7 +423,7 @@ export async function readAncestors({
scopedClusterClient,
id: ancestorId,
skipAccessCheck: true,
}) as unknown as WiredStreamDefinition
}) as unknown as WiredReadStreamDefinition
)
),
};
Expand Down Expand Up @@ -430,7 +470,7 @@ export async function validateAncestorFields(
for (const name in fields) {
if (
Object.hasOwn(fields, name) &&
isWiredStream(ancestor) &&
isWiredReadStream(ancestor) &&
Object.entries(ancestor.stream.ingest.wired.fields).some(
([ancestorFieldName, attr]) =>
attr.type !== fields[name].type && ancestorFieldName === name
Expand Down Expand Up @@ -531,7 +571,7 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
if (!isWiredStream(definition)) {
if (!isWiredStream(definition) && !isWiredReadStream(definition)) {
await syncUnmanagedStream({ scopedClusterClient, definition, logger, assetClient });
await upsertInternalStream({
scopedClusterClient,
Expand Down Expand Up @@ -602,14 +642,12 @@ interface ExecutionPlanStep {
}

async function syncUnmanagedStream({ scopedClusterClient, definition }: SyncStreamParams) {
if (isWiredStream(definition)) {
throw new Error('Got an unmanaged stream that is marked as managed');
}
if (definition.stream.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
const dataStream = await getDataStream({ name: definition.name, scopedClusterClient });
const unmanagedAssets = await getUnmanagedElasticsearchAssets({
name: definition.name,
dataStream,
scopedClusterClient,
});
const executionPlan: ExecutionPlanStep[] = [];
Expand Down
Loading

0 comments on commit 4bd76d5

Please sign in to comment.