Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

🌊 Streams: Show data retention on stream #204125

Merged
merged 20 commits into from
Jan 9, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
aa49c96
show data retention on stream
flash1293 Dec 12, 2024
b4cc7a2
[CI] Auto-commit changed files from 'node scripts/lint_ts_projects --…
kibanamachine Dec 12, 2024
7a4dcd2
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Dec 17, 2024
818793d
maybe fix things
flash1293 Dec 17, 2024
de1c7e7
maybe fix things
flash1293 Dec 17, 2024
67edb2b
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Dec 19, 2024
5623c43
clean up stuff
flash1293 Dec 19, 2024
5948b4f
[CI] Auto-commit changed files from 'node scripts/notice'
kibanamachine Dec 19, 2024
3614b60
[CI] Auto-commit changed files from 'node scripts/yarn_deduplicate'
kibanamachine Dec 19, 2024
59c24f2
[CI] Auto-commit changed files from 'node scripts/eslint --no-cache -…
kibanamachine Dec 19, 2024
0314451
some fixes
flash1293 Dec 19, 2024
ff8e23a
Merge branch 'flash1293/streams-data-retention' of github.com:flash12…
flash1293 Dec 19, 2024
74a1355
fix tests
flash1293 Dec 20, 2024
2bd1785
Update kibana.jsonc
flash1293 Dec 29, 2024
f39cf0b
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Dec 29, 2024
86b1c52
fix things
flash1293 Dec 29, 2024
d9334c3
Merge branch 'flash1293/streams-data-retention' of github.com:flash12…
flash1293 Dec 29, 2024
1f88212
Merge branch 'main' into flash1293/streams-data-retention
elasticmachine Dec 30, 2024
db287c7
Merge remote-tracking branch 'upstream/main' into flash1293/streams-d…
flash1293 Jan 8, 2025
5bb5bf8
fix tests
flash1293 Jan 8, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
fix tests
  • Loading branch information
flash1293 committed Jan 8, 2025
commit 5bb5bf838a13a8206801977b1c25815c1fa70b69
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ import {
StreamLifecycle,
ReadStreamDefinition,
IngestReadStreamDefinition,
isWiredReadStream,
WiredReadStreamDefinition,
} from '@kbn/streams-schema';
import { omit } from 'lodash';
import { STREAMS_INDEX } from '../../../common/constants';
Expand Down Expand Up @@ -172,7 +174,7 @@ async function upsertInternalStream({
return scopedClusterClient.asInternalUser.index({
id: definition.name,
index: STREAMS_INDEX,
document: { ...omit(definition, 'elasticsearch_assets', 'inherited_fields') },
document: { ...omit(definition, 'elasticsearch_assets', 'inherited_fields', 'lifecycle') },
refresh: 'wait_for',
});
}
Expand Down Expand Up @@ -422,7 +424,7 @@ export async function readAncestors({
scopedClusterClient,
id: ancestorId,
skipAccessCheck: true,
}) as unknown as WiredStreamDefinition
}) as unknown as WiredReadStreamDefinition
)
),
};
Expand Down Expand Up @@ -469,7 +471,7 @@ export async function validateAncestorFields(
for (const name in fields) {
if (
Object.hasOwn(fields, name) &&
isWiredStream(ancestor) &&
isWiredReadStream(ancestor) &&
Object.entries(ancestor.stream.ingest.wired.fields).some(
([ancestorFieldName, attr]) =>
attr.type !== fields[name].type && ancestorFieldName === name
Expand Down Expand Up @@ -570,7 +572,7 @@ export async function syncStream({
rootDefinition,
logger,
}: SyncStreamParams) {
if (!isWiredStream(definition)) {
if (!isWiredStream(definition) && !isWiredReadStream(definition)) {
await syncUnmanagedStream({ scopedClusterClient, definition, logger, assetClient });
await upsertInternalStream({
scopedClusterClient,
Expand Down Expand Up @@ -641,9 +643,6 @@ interface ExecutionPlanStep {
}

async function syncUnmanagedStream({ scopedClusterClient, definition }: SyncStreamParams) {
if (isWiredStream(definition)) {
throw new Error('Got an unmanaged stream that is marked as managed');
}
if (definition.stream.ingest.routing.length) {
throw new Error('Unmanaged streams cannot have managed children, coming soon');
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
import { z } from '@kbn/zod';
import { internal, notFound } from '@hapi/boom';
import { getFlattenedObject } from '@kbn/std';
import { isWiredStream } from '@kbn/streams-schema';
import { isWiredReadStream } from '@kbn/streams-schema';
import { DefinitionNotFound } from '../../../lib/streams/errors';
import { checkAccess, readAncestors, readStream } from '../../../lib/streams/stream_crud';
import { createServerRoute } from '../../create_server_route';
Expand Down Expand Up @@ -71,7 +71,7 @@ export const unmappedFieldsRoute = createServerRoute({
// Mapped fields from the stream's definition and inherited from ancestors
const mappedFields = new Set<string>();

if (isWiredStream(streamEntity)) {
if (isWiredReadStream(streamEntity)) {
Object.keys(streamEntity.stream.ingest.wired.fields).forEach((name) =>
mappedFields.add(name)
);
Expand Down
6 changes: 5 additions & 1 deletion x-pack/test/api_integration/apis/streams/classic.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ export default function ({ getService }: FtrProviderContext) {
expect(getResponse.body).to.eql({
name: TEST_STREAM_NAME,
dashboards: [],
inherited_fields: [],
inherited_fields: {},
lifecycle: {
policy: 'logs',
type: 'ilm',
},
stream: {
ingest: {
processing: [
Expand Down
14 changes: 4 additions & 10 deletions x-pack/test/api_integration/apis/streams/enrichment.ts
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,7 @@ export default function ({ getService }: FtrProviderContext) {
expect(result._source).to.eql({
'@timestamp': '2024-01-01T00:00:10.000Z',
message: '2023-01-01T00:00:10.000Z error test',
host: {
name: 'routeme',
},
'host.name': 'routeme',
inner_timestamp: '2023-01-01T00:00:10.000Z',
message2: 'test',
'log.level': 'error',
Expand Down Expand Up @@ -162,13 +160,9 @@ export default function ({ getService }: FtrProviderContext) {
'@timestamp': '2024-01-01T00:00:11.000Z',
message: '2023-01-01T00:00:10.000Z info mylogger this is the message',
inner_timestamp: '2023-01-01T00:00:10.000Z',
host: {
name: 'routeme',
},
log: {
level: 'info',
logger: 'mylogger',
},
'host.name': 'routeme',
'log.level': 'info',
'log.logger': 'mylogger',
message2: 'mylogger this is the message',
message3: 'this is the message',
});
Expand Down
Loading