diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index d1a66003d777e..02da6779da307 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -941,6 +941,7 @@ packages/kbn-stdio-dev-helpers @elastic/kibana-operations packages/kbn-storybook @elastic/kibana-operations x-pack/solutions/observability/plugins/streams_app @simianhacker @flash1293 @dgieselaar x-pack/solutions/observability/plugins/streams @simianhacker @flash1293 @dgieselaar +x-pack/packages/kbn-streams-schema @elastic/streams-program-team x-pack/solutions/observability/plugins/synthetics/e2e @elastic/obs-ux-management-team x-pack/solutions/observability/plugins/synthetics @elastic/obs-ux-management-team x-pack/packages/kbn-synthetics-private-location @elastic/obs-ux-management-team diff --git a/package.json b/package.json index 82bf7d6e42f9d..2e3620ca3a424 100644 --- a/package.json +++ b/package.json @@ -943,6 +943,7 @@ "@kbn/std": "link:packages/kbn-std", "@kbn/streams-app-plugin": "link:x-pack/solutions/observability/plugins/streams_app", "@kbn/streams-plugin": "link:x-pack/solutions/observability/plugins/streams", + "@kbn/streams-schema": "link:x-pack/packages/kbn-streams-schema", "@kbn/synthetics-plugin": "link:x-pack/solutions/observability/plugins/synthetics", "@kbn/synthetics-private-location": "link:x-pack/packages/kbn-synthetics-private-location", "@kbn/task-manager-fixture-plugin": "link:x-pack/test/alerting_api_integration/common/plugins/task_manager_fixture", diff --git a/tsconfig.base.json b/tsconfig.base.json index 148f367e32d3d..ebc3c51f781c3 100644 --- a/tsconfig.base.json +++ b/tsconfig.base.json @@ -1876,6 +1876,8 @@ "@kbn/streams-app-plugin/*": ["x-pack/solutions/observability/plugins/streams_app/*"], "@kbn/streams-plugin": ["x-pack/solutions/observability/plugins/streams"], "@kbn/streams-plugin/*": ["x-pack/solutions/observability/plugins/streams/*"], + "@kbn/streams-schema": ["x-pack/packages/kbn-streams-schema"], + "@kbn/streams-schema/*": ["x-pack/packages/kbn-streams-schema/*"], "@kbn/synthetics-e2e": ["x-pack/solutions/observability/plugins/synthetics/e2e"], "@kbn/synthetics-e2e/*": ["x-pack/solutions/observability/plugins/synthetics/e2e/*"], "@kbn/synthetics-plugin": ["x-pack/solutions/observability/plugins/synthetics"], diff --git a/x-pack/packages/kbn-streams-schema/README.md b/x-pack/packages/kbn-streams-schema/README.md new file mode 100644 index 0000000000000..368d090cada40 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/README.md @@ -0,0 +1,3 @@ +# @kbn/streams-schema + +This shared package contains the Zod schema definition for the Streams project. \ No newline at end of file diff --git a/x-pack/packages/kbn-streams-schema/index.ts b/x-pack/packages/kbn-streams-schema/index.ts new file mode 100644 index 0000000000000..c1f1effe45499 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './src/apis'; +export * from './src/models'; +export * from './src/helpers'; diff --git a/x-pack/packages/kbn-streams-schema/jest.config.js b/x-pack/packages/kbn-streams-schema/jest.config.js new file mode 100644 index 0000000000000..da4fa0627dfdd --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/jest.config.js @@ -0,0 +1,12 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +module.exports = { + preset: '@kbn/test', + rootDir: '../../..', + roots: ['/x-pack/packages/kbn-streams-schema'], +}; diff --git a/x-pack/packages/kbn-streams-schema/kibana.jsonc b/x-pack/packages/kbn-streams-schema/kibana.jsonc new file mode 100644 index 0000000000000..61129483995e9 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/kibana.jsonc @@ -0,0 +1,8 @@ +{ + "type": "shared-common", + "id": "@kbn/streams-schema", + "owner": "@elastic/streams-program-team", + "group": "observability", + "visibility": "shared" +} + diff --git a/x-pack/packages/kbn-streams-schema/package.json b/x-pack/packages/kbn-streams-schema/package.json new file mode 100644 index 0000000000000..056a531bf2e3e --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/package.json @@ -0,0 +1,7 @@ +{ + "name": "@kbn/streams-schema", + "description": "Streams Zod schema definition and common models shared between public and server.", + "private": true, + "version": "1.0.0", + "license": "Elastic License 2.0" +} diff --git a/x-pack/packages/kbn-streams-schema/src/apis/__snapshots__/read_streams_response.test.ts.snap b/x-pack/packages/kbn-streams-schema/src/apis/__snapshots__/read_streams_response.test.ts.snap new file mode 100644 index 0000000000000..b40f64d66180d --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/apis/__snapshots__/read_streams_response.test.ts.snap @@ -0,0 +1,105 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`ReadStreamResponse should successfully parse 1`] = ` +Object { + "streams": Array [ + Object { + "elasticsearch_assets": Array [], + "inherited_fields": Object { + "@timestamp": Object { + "from": "logs", + "type": "date", + }, + "message": Object { + "from": "logs", + "type": "match_only_text", + }, + }, + "name": "logs.nginx", + "stream": Object { + "ingest": Object { + "processing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "config": Object { + "grok": Object { + "field": "message", + "patterns": Array [ + "%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}", + ], + }, + }, + }, + ], + "routing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "name": "logs.errors", + }, + ], + "wired": Object { + "fields": Object { + "new_field": Object { + "type": "long", + }, + }, + }, + }, + }, + }, + Object { + "elasticsearch_assets": Array [], + "inherited_fields": Object { + "@timestamp": Object { + "from": "logs", + "type": "date", + }, + "message": Object { + "from": "logs", + "type": "match_only_text", + }, + }, + "name": "logs.nginx", + "stream": Object { + "ingest": Object { + "processing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "config": Object { + "grok": Object { + "field": "message", + "patterns": Array [ + "%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}", + ], + }, + }, + }, + ], + "routing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "name": "logs.errors", + }, + ], + }, + }, + }, + ], +} +`; diff --git a/x-pack/packages/kbn-streams-schema/src/apis/index.ts b/x-pack/packages/kbn-streams-schema/src/apis/index.ts new file mode 100644 index 0000000000000..f3551a83d704d --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/apis/index.ts @@ -0,0 +1,9 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './read_streams_response'; +export * from './list_streams_response'; diff --git a/x-pack/packages/kbn-streams-schema/src/apis/list_streams_response.ts b/x-pack/packages/kbn-streams-schema/src/apis/list_streams_response.ts new file mode 100644 index 0000000000000..f1f42f7e90ae2 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/apis/list_streams_response.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ +import { z } from '@kbn/zod'; +import { streamDefintionSchema } from '../models'; + +export const listStreamsResponseSchema = z.object({ + streams: z.array(streamDefintionSchema), +}); + +export type ListStreamsResponse = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/apis/read_streams_response.test.ts b/x-pack/packages/kbn-streams-schema/src/apis/read_streams_response.test.ts new file mode 100644 index 0000000000000..7200c9c4a1076 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/apis/read_streams_response.test.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { readStreamResponse } from '../fixtures/read_streams_response'; +import { readStreamResponseSchema } from './read_streams_response'; + +describe('ReadStreamResponse', () => { + it('should successfully parse', () => { + expect(readStreamResponseSchema.parse(readStreamResponse)).toMatchSnapshot(); + }); +}); diff --git a/x-pack/packages/kbn-streams-schema/src/apis/read_streams_response.ts b/x-pack/packages/kbn-streams-schema/src/apis/read_streams_response.ts new file mode 100644 index 0000000000000..bfb5d2f62e7fc --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/apis/read_streams_response.ts @@ -0,0 +1,15 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { readStreamDefinitonSchema } from '../models'; + +export const readStreamResponseSchema = z.object({ + streams: z.array(readStreamDefinitonSchema), +}); + +export type ReadStreamResponse = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_read_stream.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_read_stream.ts new file mode 100644 index 0000000000000..547c1194333ac --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_read_stream.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ingestStream } from './ingest_stream'; + +export const ingestReadStream = { + ...ingestStream, + inherited_fields: { + '@timestamp': { + type: 'date', + from: 'logs', + }, + message: { + type: 'match_only_text', + from: 'logs', + }, + }, +}; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_stream.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_stream.ts new file mode 100644 index 0000000000000..dfd1b8eb9d33e --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_stream.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ingestStreamConfig } from './ingest_stream_config'; + +export const ingestStream = { + name: 'logs.nginx', + elasticsearch_assets: [], + stream: ingestStreamConfig, +}; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_stream_config.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_stream_config.ts new file mode 100644 index 0000000000000..925ac9310762f --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/ingest_stream_config.ts @@ -0,0 +1,36 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const ingestStreamConfig = { + ingest: { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: ['%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}'], + }, + }, + condition: { + field: 'log.level', + operator: 'eq', + value: 'error', + }, + }, + ], + routing: [ + { + name: 'logs.errors', + condition: { + field: 'log.level', + operator: 'eq', + value: 'error', + }, + }, + ], + }, +}; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/read_streams_response.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/read_streams_response.ts new file mode 100644 index 0000000000000..fb1cd3ade2f9a --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/read_streams_response.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ingestReadStream } from './ingest_read_stream'; +import { wiredReadStream } from './wired_read_stream'; + +export const readStreamResponse = { + streams: [wiredReadStream, ingestReadStream], +}; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/wired_read_stream.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/wired_read_stream.ts new file mode 100644 index 0000000000000..177250bd8032a --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/wired_read_stream.ts @@ -0,0 +1,22 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { wiredStream } from './wired_stream'; + +export const wiredReadStream = { + ...wiredStream, + inherited_fields: { + '@timestamp': { + type: 'date', + from: 'logs', + }, + message: { + type: 'match_only_text', + from: 'logs', + }, + }, +}; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/wired_stream.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/wired_stream.ts new file mode 100644 index 0000000000000..371f2f7876486 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/wired_stream.ts @@ -0,0 +1,14 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { wiredStreamConfig } from './wired_stream_config'; + +export const wiredStream = { + name: 'logs.nginx', + elasticsearch_assets: [], + stream: wiredStreamConfig, +}; diff --git a/x-pack/packages/kbn-streams-schema/src/fixtures/wired_stream_config.ts b/x-pack/packages/kbn-streams-schema/src/fixtures/wired_stream_config.ts new file mode 100644 index 0000000000000..51c65e3c09794 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/fixtures/wired_stream_config.ts @@ -0,0 +1,43 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export const wiredStreamConfig = { + ingest: { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: ['%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}'], + }, + }, + condition: { + field: 'log.level', + operator: 'eq', + value: 'error', + }, + }, + ], + routing: [ + { + name: 'logs.errors', + condition: { + field: 'log.level', + operator: 'eq', + value: 'error', + }, + }, + ], + wired: { + fields: { + new_field: { + type: 'long', + }, + }, + }, + }, +}; diff --git a/x-pack/solutions/observability/plugins/streams/common/index.ts b/x-pack/packages/kbn-streams-schema/src/helpers/index.ts similarity index 78% rename from x-pack/solutions/observability/plugins/streams/common/index.ts rename to x-pack/packages/kbn-streams-schema/src/helpers/index.ts index 634994cb87f13..d1f9a8ff0c50a 100644 --- a/x-pack/solutions/observability/plugins/streams/common/index.ts +++ b/x-pack/packages/kbn-streams-schema/src/helpers/index.ts @@ -5,4 +5,4 @@ * 2.0. */ -export type { StreamDefinition, ReadStreamDefinition } from './types'; +export * from './type_guards'; diff --git a/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts b/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts new file mode 100644 index 0000000000000..557513fa74bb2 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/helpers/type_guards.ts @@ -0,0 +1,108 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ZodSchema } from '@kbn/zod'; +import { + AndCondition, + conditionSchema, + dissectProcessingDefinitionSchema, + DissectProcssingDefinition, + FilterCondition, + filterConditionSchema, + GrokProcessingDefinition, + grokProcessingDefinitionSchema, + IngestReadStreamDefinition, + ingestReadStreamDefinitonSchema, + IngestStreamDefinition, + ingestStreamDefinitonSchema, + OrCondition, + ReadStreamDefinition, + readStreamDefinitonSchema, + StreamDefinition, + streamDefintionSchema, + WiredReadStreamDefinition, + wiredReadStreamDefinitonSchema, + WiredStreamDefinition, + wiredStreamDefinitonSchema, +} from '../models'; +import { + IngestStreamConfigDefinition, + ingestStreamConfigDefinitonSchema, + StreamConfigDefinition, + streamConfigDefinitionSchema, + WiredStreamConfigDefinition, + wiredStreamConfigDefinitonSchema, +} from '../models/stream_config'; + +export function isSchema(zodSchema: ZodSchema, subject: T) { + try { + zodSchema.parse(subject); + return true; + } catch (e) { + return false; + } +} + +export function isReadStream(subject: any): subject is ReadStreamDefinition { + return isSchema(readStreamDefinitonSchema, subject); +} + +export function isWiredReadStream(subject: any): subject is WiredReadStreamDefinition { + return isSchema(wiredReadStreamDefinitonSchema, subject); +} + +export function isIngestReadStream(subject: any): subject is IngestReadStreamDefinition { + return isSchema(ingestReadStreamDefinitonSchema, subject); +} + +export function isStream(subject: any): subject is StreamDefinition { + return isSchema(streamDefintionSchema, subject); +} + +export function isIngestStream( + subject: IngestStreamDefinition | WiredStreamDefinition +): subject is IngestStreamDefinition { + return isSchema(ingestStreamDefinitonSchema, subject); +} + +export function isWiredStream( + subject: IngestStreamDefinition | WiredStreamDefinition +): subject is WiredStreamDefinition { + return isSchema(wiredStreamDefinitonSchema, subject); +} + +export function isWiredStreamConfig(subject: any): subject is WiredStreamConfigDefinition { + return isSchema(wiredStreamConfigDefinitonSchema, subject); +} + +export function isIngestStreamConfig(subject: any): subject is IngestStreamConfigDefinition { + return isSchema(ingestStreamConfigDefinitonSchema, subject); +} + +export function isStreamConfig(subject: any): subject is StreamConfigDefinition { + return isSchema(streamConfigDefinitionSchema, subject); +} + +export function isGrokProcessor(subject: any): subject is GrokProcessingDefinition { + return isSchema(grokProcessingDefinitionSchema, subject); +} + +export function isDissectProcessor(subject: any): subject is DissectProcssingDefinition { + return isSchema(dissectProcessingDefinitionSchema, subject); +} + +export function isFilterCondition(subject: any): subject is FilterCondition { + return isSchema(filterConditionSchema, subject); +} + +export function isAndCondition(subject: any): subject is AndCondition { + return isSchema(conditionSchema, subject) && subject.and != null; +} + +export function isOrCondition(subject: any): subject is OrCondition { + return isSchema(conditionSchema, subject) && subject.or != null; +} diff --git a/x-pack/solutions/observability/plugins/streams/common/types.ts b/x-pack/packages/kbn-streams-schema/src/models/common.ts similarity index 55% rename from x-pack/solutions/observability/plugins/streams/common/types.ts rename to x-pack/packages/kbn-streams-schema/src/models/common.ts index 7917864706c2d..0751d9fed90c9 100644 --- a/x-pack/solutions/observability/plugins/streams/common/types.ts +++ b/x-pack/packages/kbn-streams-schema/src/models/common.ts @@ -48,71 +48,71 @@ export const conditionSchema: z.ZodType = z.lazy(() => ); export const grokProcessingDefinitionSchema = z.object({ - type: z.literal('grok'), - field: z.string(), - patterns: z.array(z.string()), - pattern_definitions: z.optional(z.record(z.string())), + grok: z.object({ + field: z.string(), + patterns: z.array(z.string()), + pattern_definitions: z.optional(z.record(z.string())), + }), }); +export type GrokProcessingDefinition = z.infer; + export const dissectProcessingDefinitionSchema = z.object({ - type: z.literal('dissect'), - field: z.string(), - pattern: z.string(), + dissect: z.object({ + field: z.string(), + pattern: z.string(), + }), }); +export type DissectProcssingDefinition = z.infer; + +export const processingConfigSchema = z.union([ + grokProcessingDefinitionSchema, + dissectProcessingDefinitionSchema, +]); + export const processingDefinitionSchema = z.object({ condition: z.optional(conditionSchema), - config: z.discriminatedUnion('type', [ - grokProcessingDefinitionSchema, - dissectProcessingDefinitionSchema, - ]), + config: processingConfigSchema, }); export type ProcessingDefinition = z.infer; -export const fieldDefinitionSchema = z.object({ - name: z.string(), +export const fieldDefinitionConfigSchema = z.object({ type: z.enum(['keyword', 'match_only_text', 'long', 'double', 'date', 'boolean', 'ip']), format: z.optional(z.string()), }); -export type FieldDefinition = z.infer; - -export const streamChildSchema = z.object({ - id: z.string(), - condition: z.optional(conditionSchema), -}); +export type FieldDefinitionConfig = z.infer; -export type StreamChild = z.infer; +export const fieldDefinitionSchema = z.record(z.string(), fieldDefinitionConfigSchema); -export const streamWithoutIdDefinitonSchema = z.object({ - processing: z.array(processingDefinitionSchema).default([]), - fields: z.array(fieldDefinitionSchema).default([]), - managed: z.boolean().default(true), - children: z.array(streamChildSchema).default([]), -}); +export type FieldDefinition = z.infer; -export type StreamWithoutIdDefinition = z.infer; +export const inheritedFieldDefinitionSchema = z.record( + z.string(), + fieldDefinitionConfigSchema.extend({ from: z.string() }) +); -export const unmanagedElasticsearchAsset = z.object({ - type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), - id: z.string(), -}); -export type UnmanagedElasticsearchAsset = z.infer; +export type InheritedFieldDefinition = z.infer; -export const streamDefinitonSchema = streamWithoutIdDefinitonSchema.extend({ - id: z.string(), - unmanaged_elasticsearch_assets: z.optional(z.array(unmanagedElasticsearchAsset)), +export const fieldDefinitionConfigWithNameSchema = fieldDefinitionConfigSchema.extend({ + name: z.string(), }); -export type StreamDefinition = z.infer; - -export const streamDefinitonWithoutChildrenSchema = streamDefinitonSchema.omit({ children: true }); - -export type StreamWithoutChildrenDefinition = z.infer; +export type FieldDefinitionConfigWithName = z.infer; -export const readStreamDefinitonSchema = streamDefinitonSchema.extend({ - inheritedFields: z.array(fieldDefinitionSchema.extend({ from: z.string() })).default([]), +export const streamChildSchema = z.object({ + name: z.string(), + condition: z.optional(conditionSchema), }); +export type StreamChild = z.infer; + +export const elasticsearchAssetSchema = z.array( + z.object({ + type: z.enum(['ingest_pipeline', 'component_template', 'index_template', 'data_stream']), + id: z.string(), + }) +); -export type ReadStreamDefinition = z.infer; +export type ElasticsearchAsset = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/index.ts b/x-pack/packages/kbn-streams-schema/src/models/index.ts new file mode 100644 index 0000000000000..dbb4ab6dbe6ac --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/index.ts @@ -0,0 +1,11 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './common'; +export * from './read_streams'; +export * from './streams'; +export * from './stream_config'; diff --git a/x-pack/packages/kbn-streams-schema/src/models/read_streams/__snapshots__/read_stream.test.ts.snap b/x-pack/packages/kbn-streams-schema/src/models/read_streams/__snapshots__/read_stream.test.ts.snap new file mode 100644 index 0000000000000..0e3ca0bda8ded --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/read_streams/__snapshots__/read_stream.test.ts.snap @@ -0,0 +1,104 @@ +// Jest Snapshot v1, https://goo.gl/fbAQLP + +exports[`ReadStream should successfully parse ingestReadStream 1`] = ` +Object { + "elasticsearch_assets": Array [], + "inherited_fields": Object { + "@timestamp": Object { + "from": "logs", + "type": "date", + }, + "message": Object { + "from": "logs", + "type": "match_only_text", + }, + }, + "name": "logs.nginx", + "stream": Object { + "ingest": Object { + "processing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "config": Object { + "grok": Object { + "field": "message", + "patterns": Array [ + "%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}", + ], + }, + }, + }, + ], + "routing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "name": "logs.errors", + }, + ], + }, + }, +} +`; + +exports[`ReadStream should successfully parse wiredReadStream 1`] = ` +Object { + "elasticsearch_assets": Array [], + "inherited_fields": Object { + "@timestamp": Object { + "from": "logs", + "type": "date", + }, + "message": Object { + "from": "logs", + "type": "match_only_text", + }, + }, + "name": "logs.nginx", + "stream": Object { + "ingest": Object { + "processing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "config": Object { + "grok": Object { + "field": "message", + "patterns": Array [ + "%{TIMESTAMP_ISO8601:event.timestamp} %{GREEDY:rest}", + ], + }, + }, + }, + ], + "routing": Array [ + Object { + "condition": Object { + "field": "log.level", + "operator": "eq", + "value": "error", + }, + "name": "logs.errors", + }, + ], + "wired": Object { + "fields": Object { + "new_field": Object { + "type": "long", + }, + }, + }, + }, + }, +} +`; diff --git a/x-pack/packages/kbn-streams-schema/src/models/read_streams/index.ts b/x-pack/packages/kbn-streams-schema/src/models/read_streams/index.ts new file mode 100644 index 0000000000000..1dce7ea7dcd30 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/read_streams/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './ingest_read_stream'; +export * from './wired_read_stream'; +export * from './read_stream'; diff --git a/x-pack/packages/kbn-streams-schema/src/models/read_streams/ingest_read_stream.ts b/x-pack/packages/kbn-streams-schema/src/models/read_streams/ingest_read_stream.ts new file mode 100644 index 0000000000000..b98e45c568432 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/read_streams/ingest_read_stream.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { ingestStreamDefinitonSchema } from '../streams'; +import { inheritedFieldDefinitionSchema } from '../common'; + +export const ingestReadStreamDefinitonSchema = ingestStreamDefinitonSchema + .extend({ + inherited_fields: inheritedFieldDefinitionSchema.default({}), + }) + .strict(); + +export type IngestReadStreamDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/read_streams/read_stream.test.ts b/x-pack/packages/kbn-streams-schema/src/models/read_streams/read_stream.test.ts new file mode 100644 index 0000000000000..a4a3b974e29e4 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/read_streams/read_stream.test.ts @@ -0,0 +1,19 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { ingestReadStream } from '../../fixtures/ingest_read_stream'; +import { wiredReadStream } from '../../fixtures/wired_read_stream'; +import { readStreamDefinitonSchema } from './read_stream'; + +describe('ReadStream', () => { + it('should successfully parse wiredReadStream', () => { + expect(readStreamDefinitonSchema.parse(wiredReadStream)).toMatchSnapshot(); + }); + it('should successfully parse ingestReadStream', () => { + expect(readStreamDefinitonSchema.parse(ingestReadStream)).toMatchSnapshot(); + }); +}); diff --git a/x-pack/packages/kbn-streams-schema/src/models/read_streams/read_stream.ts b/x-pack/packages/kbn-streams-schema/src/models/read_streams/read_stream.ts new file mode 100644 index 0000000000000..e19574977bbe9 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/read_streams/read_stream.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { ingestReadStreamDefinitonSchema } from './ingest_read_stream'; +import { wiredReadStreamDefinitonSchema } from './wired_read_stream'; + +export const readStreamDefinitonSchema = z.union([ + wiredReadStreamDefinitonSchema, + ingestReadStreamDefinitonSchema, +]); + +export type ReadStreamDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/read_streams/wired_read_stream.ts b/x-pack/packages/kbn-streams-schema/src/models/read_streams/wired_read_stream.ts new file mode 100644 index 0000000000000..621da441d66b1 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/read_streams/wired_read_stream.ts @@ -0,0 +1,18 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { wiredStreamDefinitonSchema } from '../streams'; +import { inheritedFieldDefinitionSchema } from '../common'; + +export const wiredReadStreamDefinitonSchema = wiredStreamDefinitonSchema + .extend({ + inherited_fields: inheritedFieldDefinitionSchema.default({}), + }) + .strict(); + +export type WiredReadStreamDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/stream_config/index.ts b/x-pack/packages/kbn-streams-schema/src/models/stream_config/index.ts new file mode 100644 index 0000000000000..90b0f0e49bf7b --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/stream_config/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './wired_stream_config'; +export * from './ingest_stream_config'; +export * from './stream_config'; diff --git a/x-pack/packages/kbn-streams-schema/src/models/stream_config/ingest_stream_config.ts b/x-pack/packages/kbn-streams-schema/src/models/stream_config/ingest_stream_config.ts new file mode 100644 index 0000000000000..106ca450121ff --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/stream_config/ingest_stream_config.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { processingDefinitionSchema, streamChildSchema } from '../common'; + +export const ingestStreamConfigDefinitonSchema = z + .object({ + ingest: z.object({ + processing: z.array(processingDefinitionSchema).default([]), + routing: z.array(streamChildSchema).default([]), + }), + }) + .strict(); + +export type IngestStreamConfigDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/stream_config/stream_config.ts b/x-pack/packages/kbn-streams-schema/src/models/stream_config/stream_config.ts new file mode 100644 index 0000000000000..711f2ea7d725a --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/stream_config/stream_config.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { ingestStreamConfigDefinitonSchema } from './ingest_stream_config'; +import { wiredStreamConfigDefinitonSchema } from './wired_stream_config'; + +export const streamConfigDefinitionSchema = z.union([ + wiredStreamConfigDefinitonSchema, + ingestStreamConfigDefinitonSchema, +]); + +export type StreamConfigDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/stream_config/wired_stream_config.ts b/x-pack/packages/kbn-streams-schema/src/models/stream_config/wired_stream_config.ts new file mode 100644 index 0000000000000..ad875e7e5be93 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/stream_config/wired_stream_config.ts @@ -0,0 +1,23 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { fieldDefinitionSchema, processingDefinitionSchema, streamChildSchema } from '../common'; + +export const wiredStreamConfigDefinitonSchema = z + .object({ + ingest: z.object({ + processing: z.array(processingDefinitionSchema).default([]), + wired: z.object({ + fields: fieldDefinitionSchema.default({}), + }), + routing: z.array(streamChildSchema).default([]), + }), + }) + .strict(); + +export type WiredStreamConfigDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/streams/index.ts b/x-pack/packages/kbn-streams-schema/src/models/streams/index.ts new file mode 100644 index 0000000000000..b04fb5f85f933 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/streams/index.ts @@ -0,0 +1,10 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export * from './ingest_stream'; +export * from './wired_stream'; +export * from './stream'; diff --git a/x-pack/packages/kbn-streams-schema/src/models/streams/ingest_stream.ts b/x-pack/packages/kbn-streams-schema/src/models/streams/ingest_stream.ts new file mode 100644 index 0000000000000..d21f11d869929 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/streams/ingest_stream.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { ingestStreamConfigDefinitonSchema } from '../stream_config'; +import { elasticsearchAssetSchema } from '../common'; + +export const ingestStreamDefinitonSchema = z + .object({ + name: z.string(), + elasticsearch_assets: z.optional(elasticsearchAssetSchema), + stream: ingestStreamConfigDefinitonSchema, + }) + .strict(); + +export type IngestStreamDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/streams/stream.ts b/x-pack/packages/kbn-streams-schema/src/models/streams/stream.ts new file mode 100644 index 0000000000000..152397060e51b --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/streams/stream.ts @@ -0,0 +1,17 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { wiredStreamDefinitonSchema } from './wired_stream'; +import { ingestStreamDefinitonSchema } from './ingest_stream'; + +export const streamDefintionSchema = z.union([ + wiredStreamDefinitonSchema, + ingestStreamDefinitonSchema, +]); + +export type StreamDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/src/models/streams/wired_stream.ts b/x-pack/packages/kbn-streams-schema/src/models/streams/wired_stream.ts new file mode 100644 index 0000000000000..0374472673cdb --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/src/models/streams/wired_stream.ts @@ -0,0 +1,20 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { z } from '@kbn/zod'; +import { wiredStreamConfigDefinitonSchema } from '../stream_config'; +import { elasticsearchAssetSchema } from '../common'; + +export const wiredStreamDefinitonSchema = z + .object({ + name: z.string(), + elasticsearch_assets: z.optional(elasticsearchAssetSchema), + stream: wiredStreamConfigDefinitonSchema, + }) + .strict(); + +export type WiredStreamDefinition = z.infer; diff --git a/x-pack/packages/kbn-streams-schema/tsconfig.json b/x-pack/packages/kbn-streams-schema/tsconfig.json new file mode 100644 index 0000000000000..838f32bc68e74 --- /dev/null +++ b/x-pack/packages/kbn-streams-schema/tsconfig.json @@ -0,0 +1,20 @@ +{ + "extends": "../../../tsconfig.base.json", + "compilerOptions": { + "outDir": "target/types", + "types": [ + "jest", + "node" + ] + }, + "include": [ + "**/*.ts" + ], + "kbn_references": [ + "@kbn/zod" + ], + "exclude": [ + "target/**/*" + ] +} + diff --git a/x-pack/solutions/observability/plugins/streams/server/index.ts b/x-pack/solutions/observability/plugins/streams/server/index.ts index 9ef13c62d6b7b..bd8aee304ad15 100644 --- a/x-pack/solutions/observability/plugins/streams/server/index.ts +++ b/x-pack/solutions/observability/plugins/streams/server/index.ts @@ -17,5 +17,3 @@ export const plugin = async (context: PluginInitializerContext) = const { StreamsPlugin } = await import('./plugin'); return new StreamsPlugin(context); }; - -export type { ListStreamResponse } from './lib/streams/stream_crud'; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/component_templates/generate_layer.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/component_templates/generate_layer.ts index 04dcc8c5dafcb..2035ba1b3ae0e 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/component_templates/generate_layer.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/component_templates/generate_layer.ts @@ -10,7 +10,7 @@ import { MappingDateProperty, MappingProperty, } from '@elastic/elasticsearch/lib/api/types'; -import { StreamDefinition } from '../../../../common/types'; +import { WiredStreamDefinition } from '@kbn/streams-schema'; import { ASSET_VERSION } from '../../../../common/constants'; import { logsSettings } from './logs_layer'; import { isRoot } from '../helpers/hierarchy'; @@ -18,26 +18,26 @@ import { getComponentTemplateName } from './name'; export function generateLayer( id: string, - definition: StreamDefinition + definition: WiredStreamDefinition ): ClusterPutComponentTemplateRequest { const properties: Record = {}; - definition.fields.forEach((field) => { + Object.entries(definition.stream.ingest.wired.fields).forEach(([field, props]) => { const property: MappingProperty = { - type: field.type, + type: props.type, }; - if (field.name === '@timestamp') { + if (field === '@timestamp') { // @timestamp can't ignore malformed dates as it's used for sorting in logsdb (property as MappingDateProperty).ignore_malformed = false; } - if (field.type === 'date' && field.format) { - (property as MappingDateProperty).format = field.format; + if (props.type === 'date' && props.format) { + (property as MappingDateProperty).format = props.format; } - properties[field.name] = property; + properties[field] = property; }); return { name: getComponentTemplateName(id), template: { - settings: isRoot(definition.id) ? logsSettings : {}, + settings: isRoot(definition.name) ? logsSettings : {}, mappings: { subobjects: false, dynamic: false, diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_fields.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_fields.ts index 48b06b8ea0701..b9b1bbfd1948b 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_fields.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_fields.ts @@ -5,8 +5,13 @@ * 2.0. */ -import { Condition, FilterCondition } from '../../../../common/types'; -import { isAndCondition, isFilterCondition, isOrCondition } from './condition_guards'; +import { + Condition, + FilterCondition, + isAndCondition, + isFilterCondition, + isOrCondition, +} from '@kbn/streams-schema'; export function isComplete(condition: Condition): boolean { if (isFilterCondition(condition)) { diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_guards.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_guards.ts deleted file mode 100644 index 1469471bd8943..0000000000000 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_guards.ts +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one - * or more contributor license agreements. Licensed under the Elastic License - * 2.0; you may not use this file except in compliance with the Elastic License - * 2.0. - */ - -import { - AndCondition, - conditionSchema, - FilterCondition, - filterConditionSchema, - OrCondition, -} from '../../../../common/types'; - -export function isFilterCondition(subject: any): subject is FilterCondition { - const result = filterConditionSchema.safeParse(subject); - return result.success; -} - -export function isAndCondition(subject: any): subject is AndCondition { - const result = conditionSchema.safeParse(subject); - return result.success && subject.and != null; -} - -export function isOrCondition(subject: any): subject is OrCondition { - const result = conditionSchema.safeParse(subject); - return result.success && subject.or != null; -} diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts index 4da9b3beffae5..db951f1f3aa8a 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_painless.ts @@ -11,8 +11,10 @@ import { Condition, FilterCondition, UnaryFilterCondition, -} from '../../../../common/types'; -import { isAndCondition, isFilterCondition, isOrCondition } from './condition_guards'; + isAndCondition, + isFilterCondition, + isOrCondition, +} from '@kbn/streams-schema'; function safePainlessField(conditionOrField: FilterCondition | string) { if (isFilterCondition(conditionOrField)) { diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_query_dsl.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_query_dsl.ts index 3864639175008..f3364dbfa8405 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_query_dsl.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/condition_to_query_dsl.ts @@ -5,8 +5,13 @@ * 2.0. */ -import { Condition, FilterCondition } from '../../../../common/types'; -import { isAndCondition, isFilterCondition, isOrCondition } from './condition_guards'; +import { + Condition, + FilterCondition, + isAndCondition, + isFilterCondition, + isOrCondition, +} from '@kbn/streams-schema'; function conditionToClause(condition: FilterCondition) { switch (condition.operator) { diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/hierarchy.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/hierarchy.ts index 6f1cd308f3c3d..234987c3d6ca2 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/hierarchy.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/helpers/hierarchy.ts @@ -5,15 +5,16 @@ * 2.0. */ -import { StreamDefinition } from '../../../../common/types'; +import { StreamDefinition } from '@kbn/streams-schema'; export function isDescendandOf(parent: StreamDefinition, child: StreamDefinition) { - return child.id.startsWith(parent.id); + return child.name.startsWith(parent.name); } export function isChildOf(parent: StreamDefinition, child: StreamDefinition) { return ( - isDescendandOf(parent, child) && child.id.split('.').length === parent.id.split('.').length + 1 + isDescendandOf(parent, child) && + child.name.split('.').length === parent.name.split('.').length + 1 ); } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts index e7c9c784a8123..fb03868ed7482 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_ingest_pipeline.ts @@ -5,16 +5,33 @@ * 2.0. */ -import { StreamDefinition } from '../../../../common/types'; +import { + isDissectProcessor, + isGrokProcessor, + ProcessingDefinition, + StreamDefinition, +} from '@kbn/streams-schema'; +import { get } from 'lodash'; import { ASSET_VERSION } from '../../../../common/constants'; import { conditionToPainless } from '../helpers/condition_to_painless'; import { logsDefaultPipelineProcessors } from './logs_default_pipeline'; import { isRoot } from '../helpers/hierarchy'; import { getProcessingPipelineName } from './name'; +function getProcessorType(processor: ProcessingDefinition) { + if (isGrokProcessor(processor.config)) { + return 'grok'; + } + if (isDissectProcessor(processor.config)) { + return 'dissect'; + } + throw new Error('Unknown processor type'); +} + function generateProcessingSteps(definition: StreamDefinition) { - return definition.processing.map((processor) => { - const { type, ...config } = processor.config; + return definition.stream.ingest.processing.map((processor) => { + const type = getProcessorType(processor); + const config = get(processor.config, type); return { [type]: { ...config, @@ -28,7 +45,7 @@ export function generateIngestPipeline(id: string, definition: StreamDefinition) return { id: getProcessingPipelineName(id), processors: [ - ...(isRoot(definition.id) ? logsDefaultPipelineProcessors : []), + ...(isRoot(definition.name) ? logsDefaultPipelineProcessors : []), ...generateProcessingSteps(definition), { pipeline: { @@ -49,7 +66,7 @@ export function generateClassicIngestPipelineBody(definition: StreamDefinition) return { processors: generateProcessingSteps(definition), _meta: { - description: `Stream-managed pipeline for the ${definition.id} stream`, + description: `Stream-managed pipeline for the ${definition.name} stream`, managed: true, }, version: ASSET_VERSION, diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts index 9b46e0cf4ac92..d3a43b73713b4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/ingest_pipelines/generate_reroute_pipeline.ts @@ -5,7 +5,7 @@ * 2.0. */ -import { StreamDefinition } from '../../../../common/types'; +import { StreamDefinition } from '@kbn/streams-schema'; import { ASSET_VERSION } from '../../../../common/constants'; import { conditionToPainless } from '../helpers/condition_to_painless'; import { getReroutePipelineName } from './name'; @@ -16,17 +16,17 @@ interface GenerateReroutePipelineParams { export async function generateReroutePipeline({ definition }: GenerateReroutePipelineParams) { return { - id: getReroutePipelineName(definition.id), - processors: definition.children.map((child) => { + id: getReroutePipelineName(definition.name), + processors: definition.stream.ingest.routing.map((child) => { return { reroute: { - destination: child.id, + destination: child.name, if: conditionToPainless(child.condition), }, }; }), _meta: { - description: `Reoute pipeline for the ${definition.id} stream`, + description: `Reoute pipeline for the ${definition.name} stream`, managed: true, }, version: ASSET_VERSION, diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/internal_stream_mapping.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/internal_stream_mapping.ts index faff949c0d97b..e8383561ad582 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/internal_stream_mapping.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/internal_stream_mapping.ts @@ -14,23 +14,16 @@ export function createStreamsIndex(scopedClusterClient: IScopedClusterClient) { mappings: { dynamic: 'strict', properties: { - processing: { - type: 'object', - enabled: false, - }, - fields: { - type: 'object', - enabled: false, - }, - children: { - type: 'object', - enabled: false, - }, - id: { + name: { type: 'keyword', }, - managed: { - type: 'boolean', + stream: { + properties: { + ingest: { + type: 'object', + enabled: false, + }, + }, }, }, }, diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts index 1bdb4f20a95cc..9c043a884dfb6 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/root_stream_definition.ts @@ -5,29 +5,30 @@ * 2.0. */ -import { StreamDefinition } from '../../../common/types'; +import { WiredStreamDefinition } from '@kbn/streams-schema'; -export const rootStreamDefinition: StreamDefinition = { - id: 'logs', - managed: true, - processing: [], - children: [], - fields: [ - { - name: '@timestamp', - type: 'date', +export const rootStreamDefinition: WiredStreamDefinition = { + name: 'logs', + stream: { + ingest: { + processing: [], + routing: [], + wired: { + fields: { + '@timestamp': { + type: 'date', + }, + message: { + type: 'match_only_text', + }, + 'host.name': { + type: 'keyword', + }, + 'log.level': { + type: 'keyword', + }, + }, + }, }, - { - name: 'message', - type: 'match_only_text', - }, - { - name: 'host.name', - type: 'keyword', - }, - { - name: 'log.level', - type: 'keyword', - }, - ], + }, }; diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts index 5066ecd61a601..5669a3301e208 100644 --- a/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/stream_crud.ts @@ -10,8 +10,16 @@ import { Logger } from '@kbn/logging'; import { IngestPipeline, IngestProcessorContainer } from '@elastic/elasticsearch/lib/api/types'; import { set } from '@kbn/safer-lodash-set'; import { IndicesDataStream } from '@elastic/elasticsearch/lib/api/types'; +import { + IngestStreamDefinition, + WiredStreamDefinition, + StreamDefinition, + ListStreamsResponse, + isWiredStream, + FieldDefinition, +} from '@kbn/streams-schema'; +import { omit } from 'lodash'; import { STREAMS_INDEX } from '../../../common/constants'; -import { FieldDefinition, StreamDefinition } from '../../../common/types'; import { generateLayer } from './component_templates/generate_layer'; import { deleteComponent, upsertComponent } from './component_templates/manage_component_templates'; import { getComponentTemplateName } from './component_templates/name'; @@ -142,58 +150,58 @@ export async function deleteStreamObjects({ id, scopedClusterClient, logger }: D async function upsertInternalStream({ definition, scopedClusterClient }: BaseParamsWithDefinition) { return scopedClusterClient.asInternalUser.index({ - id: definition.id, + id: definition.name, index: STREAMS_INDEX, - document: { ...definition }, + document: { ...omit(definition, 'elasticsearch_assets') }, refresh: 'wait_for', }); } type ListStreamsParams = BaseParams; -export interface ListStreamResponse { - definitions: StreamDefinition[]; -} - export async function listStreams({ scopedClusterClient, -}: ListStreamsParams): Promise { - const response = await scopedClusterClient.asInternalUser.search({ +}: ListStreamsParams): Promise { + const response = await scopedClusterClient.asInternalUser.search({ index: STREAMS_INDEX, size: 10000, - sort: [{ id: 'asc' }], + sort: [{ name: 'asc' }], }); const dataStreams = await listDataStreamsAsStreams({ scopedClusterClient }); let definitions = response.hits.hits.map((hit) => ({ ...hit._source! })); const hasAccess = await Promise.all( - definitions.map((definition) => checkReadAccess({ id: definition.id, scopedClusterClient })) + definitions.map((definition) => checkReadAccess({ id: definition.name, scopedClusterClient })) ); definitions = definitions.filter((_, index) => hasAccess[index]); - const definitionMap = new Map(definitions.map((definition) => [definition.id, definition])); + const definitionMap = new Map( + definitions.map((definition) => [definition.name, definition]) + ); dataStreams.forEach((dataStream) => { - if (!definitionMap.has(dataStream.id)) { - definitionMap.set(dataStream.id, dataStream); + if (!definitionMap.has(dataStream.name)) { + definitionMap.set(dataStream.name, dataStream); } }); return { - definitions: Array.from(definitionMap.values()), + streams: Array.from(definitionMap.values()), }; } export async function listDataStreamsAsStreams({ scopedClusterClient, -}: ListStreamsParams): Promise { +}: ListStreamsParams): Promise { const response = await scopedClusterClient.asInternalUser.indices.getDataStream(); return response.data_streams .filter((dataStream) => dataStream.template.endsWith('@stream') === false) .map((dataStream) => ({ - id: dataStream.name, - managed: false, - children: [], - fields: [], - processing: [], + name: dataStream.name, + stream: { + ingest: { + processing: [], + routing: [], + }, + }, })); } @@ -202,15 +210,11 @@ interface ReadStreamParams extends BaseParams { skipAccessCheck?: boolean; } -export interface ReadStreamResponse { - definition: StreamDefinition; -} - export async function readStream({ id, scopedClusterClient, skipAccessCheck, -}: ReadStreamParams): Promise { +}: ReadStreamParams): Promise { try { const response = await scopedClusterClient.asInternalUser.get({ id, @@ -223,11 +227,7 @@ export async function readStream({ throw new DefinitionNotFound(`Stream definition for ${id} not found.`); } } - return { - definition: { - ...definition, - }, - }; + return definition; } catch (e) { if (e.meta?.statusCode === 404) { return readDataStreamAsStream({ id, scopedClusterClient, skipAccessCheck }); @@ -237,20 +237,22 @@ export async function readStream({ } export async function readDataStreamAsStream({ id, scopedClusterClient }: ReadStreamParams) { - const definition: StreamDefinition = { - id, - managed: false, - children: [], - fields: [], - processing: [], + const definition: IngestStreamDefinition = { + name: id, + stream: { + ingest: { + routing: [], + processing: [], + }, + }, }; - definition.unmanaged_elasticsearch_assets = await getUnmanagedElasticsearchAssets({ + definition.elasticsearch_assets = await getUnmanagedElasticsearchAssets({ name: id, scopedClusterClient, }); - return { definition }; + return definition; } interface ReadUnmanagedAssetsParams extends BaseParams { @@ -314,19 +316,24 @@ interface ReadAncestorsParams extends BaseParams { } export interface ReadAncestorsResponse { - ancestors: Array<{ definition: StreamDefinition }>; + ancestors: StreamDefinition[]; } export async function readAncestors({ id, scopedClusterClient, -}: ReadAncestorsParams): Promise { +}: ReadAncestorsParams): Promise<{ ancestors: WiredStreamDefinition[] }> { const ancestorIds = getAncestors(id); return { ancestors: await Promise.all( - ancestorIds.map((ancestorId) => - readStream({ scopedClusterClient, id: ancestorId, skipAccessCheck: true }) + ancestorIds.map( + (ancestorId) => + readStream({ + scopedClusterClient, + id: ancestorId, + skipAccessCheck: true, + }) as unknown as WiredStreamDefinition ) ), }; @@ -337,7 +344,7 @@ interface ReadDescendantsParams extends BaseParams { } export async function readDescendants({ id, scopedClusterClient }: ReadDescendantsParams) { - const response = await scopedClusterClient.asInternalUser.search({ + const response = await scopedClusterClient.asInternalUser.search({ index: STREAMS_INDEX, size: 10000, body: { @@ -357,27 +364,30 @@ export async function readDescendants({ id, scopedClusterClient }: ReadDescendan }, }, }); - return response.hits.hits.map((hit) => hit._source as StreamDefinition); + return response.hits.hits.map((hit) => hit._source as WiredStreamDefinition); } export async function validateAncestorFields( scopedClusterClient: IScopedClusterClient, id: string, - fields: FieldDefinition[] + fields: FieldDefinition ) { const { ancestors } = await readAncestors({ id, scopedClusterClient, }); for (const ancestor of ancestors) { - for (const field of fields) { + for (const name in fields) { if ( - ancestor.definition.fields.some( - (ancestorField) => ancestorField.type !== field.type && ancestorField.name === field.name + Object.hasOwn(fields, name) && + isWiredStream(ancestor) && + Object.entries(ancestor.stream.ingest.wired.fields).some( + ([ancestorFieldName, attr]) => + attr.type !== fields[name].type && ancestorFieldName === name ) ) { throw new MalformedFields( - `Field ${field.name} is already defined with incompatible type in the parent stream ${ancestor.definition.id}` + `Field ${name} is already defined with incompatible type in the parent stream ${ancestor.name}` ); } } @@ -387,22 +397,23 @@ export async function validateAncestorFields( export async function validateDescendantFields( scopedClusterClient: IScopedClusterClient, id: string, - fields: FieldDefinition[] + fields: FieldDefinition ) { const descendants = await readDescendants({ id, scopedClusterClient, }); for (const descendant of descendants) { - for (const field of fields) { + for (const name in fields) { if ( - descendant.fields.some( - (descendantField) => - descendantField.type !== field.type && descendantField.name === field.name + Object.hasOwn(fields, name) && + Object.entries(descendant.stream.ingest.wired.fields).some( + ([descendantFieldName, attr]) => + attr.type !== fields[name].type && descendantFieldName === name ) ) { throw new MalformedFields( - `Field ${field.name} is already defined with incompatible type in the child stream ${descendant.id}` + `Field ${name} is already defined with incompatible type in the child stream ${descendant.name}` ); } } @@ -449,7 +460,7 @@ export async function syncStream({ rootDefinition, logger, }: SyncStreamParams) { - if (!definition.managed) { + if (!isWiredStream(definition)) { await syncUnmanagedStream({ scopedClusterClient, definition, logger }); await upsertInternalStream({ scopedClusterClient, @@ -457,7 +468,7 @@ export async function syncStream({ }); return; } - const componentTemplate = generateLayer(definition.id, definition); + const componentTemplate = generateLayer(definition.name, definition); await upsertComponent({ esClient: scopedClusterClient.asCurrentUser, logger, @@ -466,7 +477,7 @@ export async function syncStream({ await upsertIngestPipeline({ esClient: scopedClusterClient.asCurrentUser, logger, - pipeline: generateIngestPipeline(definition.id, definition), + pipeline: generateIngestPipeline(definition.name, definition), }); const reroutePipeline = await generateReroutePipeline({ definition, @@ -479,12 +490,13 @@ export async function syncStream({ await upsertTemplate({ esClient: scopedClusterClient.asCurrentUser, logger, - template: generateIndexTemplate(definition.id), + template: generateIndexTemplate(definition.name), }); if (rootDefinition) { const parentReroutePipeline = await generateReroutePipeline({ definition: rootDefinition, }); + await upsertIngestPipeline({ esClient: scopedClusterClient.asCurrentUser, logger, @@ -494,7 +506,7 @@ export async function syncStream({ await upsertDataStream({ esClient: scopedClusterClient.asCurrentUser, logger, - name: definition.id, + name: definition.name, }); await upsertInternalStream({ scopedClusterClient, @@ -502,7 +514,7 @@ export async function syncStream({ }); await rolloverDataStreamIfNecessary({ esClient: scopedClusterClient.asCurrentUser, - name: definition.id, + name: definition.name, logger, mappings: componentTemplate.template.mappings?.properties, }); @@ -514,24 +526,19 @@ interface ExecutionPlanStep { body?: Record; } -async function syncUnmanagedStream({ scopedClusterClient, definition, logger }: SyncStreamParams) { - if (definition.managed) { +async function syncUnmanagedStream({ scopedClusterClient, definition }: SyncStreamParams) { + if (isWiredStream(definition)) { throw new Error('Got an unmanaged stream that is marked as managed'); } - if (definition.fields.length) { - throw new Error( - 'Unmanaged streams cannot have managed fields, please edit the component templates directly' - ); - } - if (definition.children.length) { + if (definition.stream.ingest.routing.length) { throw new Error('Unmanaged streams cannot have managed children, coming soon'); } const unmanagedAssets = await getUnmanagedElasticsearchAssets({ - name: definition.id, + name: definition.name, scopedClusterClient, }); const executionPlan: ExecutionPlanStep[] = []; - const streamManagedPipelineName = getProcessingPipelineName(definition.id); + const streamManagedPipelineName = getProcessingPipelineName(definition.name); const pipelineName = unmanagedAssets.find((asset) => asset.type === 'ingest_pipeline')?.id; if (!pipelineName) { throw new Error('Unmanaged stream needs a default ingest pipeline'); @@ -546,7 +553,7 @@ async function syncUnmanagedStream({ scopedClusterClient, definition, logger }: executionPlan ); - if (definition.processing.length) { + if (definition.stream.ingest.processing.length) { // if the stream has processing, we need to create or update the stream managed pipeline executionPlan.push({ method: 'PUT', @@ -629,14 +636,14 @@ async function ensureStreamManagedPipelineReference( definition: StreamDefinition, executionPlan: ExecutionPlanStep[] ) { - const streamManagedPipelineName = getProcessingPipelineName(definition.id); + const streamManagedPipelineName = getProcessingPipelineName(definition.name); const { targetPipelineName, targetPipeline, referencesStreamManagedPipeline } = - await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.id); + await findStreamManagedPipelineReference(scopedClusterClient, pipelineName, definition.name); if (!referencesStreamManagedPipeline) { const callStreamManagedPipelineProcessor: IngestProcessorContainer = { pipeline: { name: streamManagedPipelineName, - if: `ctx._index == '${definition.id}'`, + if: `ctx._index == '${definition.name}'`, ignore_missing_pipeline: true, description: "Call the stream's managed pipeline - do not change this manually but instead use the streams UI or API", diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts index d6bf5fbb84d8f..698d0f7f81d38 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/delete.ts @@ -9,6 +9,7 @@ import { z } from '@kbn/zod'; import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import { badRequest, internal, notFound } from '@hapi/boom'; +import { isWiredStream } from '@kbn/streams-schema'; import { DefinitionNotFound, ForkConditionMissing, @@ -43,7 +44,6 @@ export const deleteStreamRoute = createServerRoute({ }), }), handler: async ({ - response, params, logger, request, @@ -79,8 +79,8 @@ export async function deleteStream( logger: Logger ) { try { - const { definition } = await readStream({ scopedClusterClient, id }); - if (!definition.managed) { + const definition = await readStream({ scopedClusterClient, id }); + if (!isWiredStream(definition)) { await deleteUnmanagedStreamObjects({ scopedClusterClient, id, logger }); return; } @@ -92,8 +92,8 @@ export async function deleteStream( // need to update parent first to cut off documents streaming down await updateParentStream(scopedClusterClient, id, parentId, logger); - for (const child of definition.children) { - await deleteStream(scopedClusterClient, child.id, logger); + for (const child of definition.stream.ingest.routing) { + await deleteStream(scopedClusterClient, child.name, logger); } await deleteStreamObjects({ scopedClusterClient, id, logger }); } catch (e) { @@ -111,12 +111,14 @@ async function updateParentStream( parentId: string, logger: Logger ) { - const { definition: parentDefinition } = await readStream({ + const parentDefinition = await readStream({ scopedClusterClient, id: parentId, }); - parentDefinition.children = parentDefinition.children.filter((child) => child.id !== id); + parentDefinition.stream.ingest.routing = parentDefinition.stream.ingest.routing.filter( + (child) => child.name !== id + ); await syncStream({ scopedClusterClient, diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/disable.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/disable.ts index b760b58f1fafd..3cf369f6da76d 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/disable.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/disable.ts @@ -22,12 +22,7 @@ export const disableStreamsRoute = createServerRoute({ requiredPrivileges: ['streams_write'], }, }, - handler: async ({ - request, - response, - logger, - getScopedClients, - }): Promise<{ acknowledged: true }> => { + handler: async ({ request, logger, getScopedClients }): Promise<{ acknowledged: true }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts index 19867018ce25f..cf88835602076 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/edit.ts @@ -9,6 +9,14 @@ import { z } from '@kbn/zod'; import { IScopedClusterClient } from '@kbn/core-elasticsearch-server'; import { Logger } from '@kbn/logging'; import { badRequest, internal, notFound } from '@hapi/boom'; +import { + isWiredStream, + isWiredStreamConfig, + streamConfigDefinitionSchema, + StreamDefinition, + WiredStreamConfigDefinition, + WiredStreamDefinition, +} from '@kbn/streams-schema'; import { DefinitionNotFound, ForkConditionMissing, @@ -16,7 +24,6 @@ import { SecurityException, } from '../../lib/streams/errors'; import { createServerRoute } from '../create_server_route'; -import { StreamDefinition, streamWithoutIdDefinitonSchema } from '../../../common/types'; import { syncStream, readStream, @@ -45,48 +52,63 @@ export const editStreamRoute = createServerRoute({ path: z.object({ id: z.string(), }), - body: streamWithoutIdDefinitonSchema, + body: streamConfigDefinitionSchema, }), - handler: async ({ response, params, logger, request, getScopedClients }) => { + handler: async ({ params, logger, request, getScopedClients }) => { try { const { scopedClusterClient } = await getScopedClients({ request }); - const streamDefinition = { ...params.body, id: params.path.id }; + const streamDefinition: StreamDefinition = { stream: params.body, name: params.path.id }; - if (!streamDefinition.managed) { + if (!isWiredStream(streamDefinition)) { await syncStream({ scopedClusterClient, - definition: { ...streamDefinition, id: params.path.id }, + definition: streamDefinition, rootDefinition: undefined, logger, }); return { acknowledged: true }; } - await validateStreamChildren(scopedClusterClient, params.path.id, params.body.children); - await validateAncestorFields(scopedClusterClient, params.path.id, params.body.fields); - await validateDescendantFields(scopedClusterClient, params.path.id, params.body.fields); + await validateStreamChildren(scopedClusterClient, params.path.id, params.body.ingest.routing); + if (isWiredStreamConfig(params.body)) { + await validateAncestorFields( + scopedClusterClient, + params.path.id, + params.body.ingest.wired.fields + ); + await validateDescendantFields( + scopedClusterClient, + params.path.id, + params.body.ingest.wired.fields + ); + } const parentId = getParentId(params.path.id); - let parentDefinition: StreamDefinition | undefined; + let parentDefinition: WiredStreamDefinition | undefined; // always need to go from the leaves to the parent when syncing ingest pipelines, otherwise data // will be routed before the data stream is ready - for (const child of streamDefinition.children) { + for (const child of streamDefinition.stream.ingest.routing) { const streamExists = await checkStreamExists({ scopedClusterClient, - id: child.id, + id: child.name, }); if (streamExists) { continue; } // create empty streams for each child if they don't exist - const childDefinition = { - id: child.id, - children: [], - fields: [], - processing: [], - managed: true, + const childDefinition: WiredStreamDefinition = { + name: child.name, + stream: { + ingest: { + processing: [], + routing: [], + wired: { + fields: {}, + }, + }, + }, }; await syncStream({ @@ -98,7 +120,7 @@ export const editStreamRoute = createServerRoute({ await syncStream({ scopedClusterClient, - definition: { ...streamDefinition, id: params.path.id, managed: true }, + definition: { ...streamDefinition, name: params.path.id }, rootDefinition: parentDefinition, logger, }); @@ -137,15 +159,15 @@ async function updateParentStream( id: string, logger: Logger ) { - const { definition: parentDefinition } = await readStream({ + const parentDefinition = await readStream({ scopedClusterClient, id: parentId, }); - if (!parentDefinition.children.some((child) => child.id === id)) { + if (!parentDefinition.stream.ingest.routing.some((child) => child.name === id)) { // add the child to the parent stream with an empty condition for now - parentDefinition.children.push({ - id, + parentDefinition.stream.ingest.routing.push({ + name: id, condition: undefined, }); @@ -155,21 +177,21 @@ async function updateParentStream( logger, }); } - return parentDefinition; + return parentDefinition as WiredStreamDefinition; } async function validateStreamChildren( scopedClusterClient: IScopedClusterClient, id: string, - children: StreamDefinition['children'] + children: WiredStreamConfigDefinition['ingest']['routing'] ) { try { - const { definition: oldDefinition } = await readStream({ + const oldDefinition = await readStream({ scopedClusterClient, id, }); - const oldChildren = oldDefinition.children.map((child) => child.id); - const newChildren = new Set(children.map((child) => child.id)); + const oldChildren = oldDefinition.stream.ingest.routing.map((child) => child.name); + const newChildren = new Set(children.map((child) => child.name)); children.forEach((child) => { validateCondition(child.condition); }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts index ee49a93aaacbb..8b479813f87af 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/enable.ts @@ -28,7 +28,6 @@ export const enableStreamsRoute = createServerRoute({ }, handler: async ({ request, - response, logger, getScopedClients, }): Promise<{ acknowledged: true; message: string }> => { diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts index 9ec61d27619e2..447fdfcc84978 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/fork.ts @@ -7,6 +7,7 @@ import { z } from '@kbn/zod'; import { badRequest, internal, notFound } from '@hapi/boom'; +import { conditionSchema, isWiredStream, WiredStreamDefinition } from '@kbn/streams-schema'; import { DefinitionNotFound, ForkConditionMissing, @@ -14,7 +15,6 @@ import { SecurityException, } from '../../lib/streams/errors'; import { createServerRoute } from '../create_server_route'; -import { conditionSchema, streamDefinitonWithoutChildrenSchema } from '../../../common/types'; import { syncStream, readStream, validateAncestorFields } from '../../lib/streams/stream_crud'; import { MalformedStreamId } from '../../lib/streams/errors/malformed_stream_id'; import { isChildOf } from '../../lib/streams/helpers/hierarchy'; @@ -36,7 +36,7 @@ export const forkStreamsRoute = createServerRoute({ path: z.object({ id: z.string(), }), - body: z.object({ stream: streamDefinitonWithoutChildrenSchema, condition: conditionSchema }), + body: z.object({ stream: z.object({ name: z.string() }), condition: conditionSchema }), }), handler: async ({ params, @@ -53,34 +53,39 @@ export const forkStreamsRoute = createServerRoute({ const { scopedClusterClient } = await getScopedClients({ request }); - const { definition: rootDefinition } = await readStream({ + const rootDefinition = await readStream({ scopedClusterClient, id: params.path.id, }); - if (rootDefinition.managed === false) { + if (!isWiredStream(rootDefinition)) { throw new MalformedStreamId('Cannot fork a stream that is not managed'); } - const childDefinition = { ...params.body.stream, children: [] }; + const childDefinition: WiredStreamDefinition = { + ...params.body.stream, + stream: { ingest: { processing: [], routing: [], wired: { fields: {} } } }, + }; // check whether root stream has a child of the given name already - if (rootDefinition.children.some((child) => child.id === childDefinition.id)) { + if ( + rootDefinition.stream.ingest.routing.some((child) => child.name === childDefinition.name) + ) { throw new MalformedStreamId( - `The stream with ID (${params.body.stream.id}) already exists as a child of the parent stream` + `The stream with ID (${params.body.stream.name}) already exists as a child of the parent stream` ); } if (!isChildOf(rootDefinition, childDefinition)) { throw new MalformedStreamId( - `The ID (${params.body.stream.id}) from the new stream must start with the parent's id (${rootDefinition.id}), followed by a dot and a name` + `The ID (${params.body.stream.name}) from the new stream must start with the parent's id (${rootDefinition.name}), followed by a dot and a name` ); } await validateAncestorFields( scopedClusterClient, - params.body.stream.id, - params.body.stream.fields + childDefinition.name, + childDefinition.stream.ingest.wired.fields ); // need to create the child first, otherwise we risk streaming data even though the child data stream is not ready @@ -91,8 +96,8 @@ export const forkStreamsRoute = createServerRoute({ logger, }); - rootDefinition.children.push({ - id: params.body.stream.id, + rootDefinition.stream.ingest.routing.push({ + name: params.body.stream.name, condition: params.body.condition, }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts index f98c635830bda..66edc3c7954b4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/list.ts @@ -7,10 +7,10 @@ import { z } from '@kbn/zod'; import { notFound, internal } from '@hapi/boom'; +import { ListStreamsResponse } from '@kbn/streams-schema'; import { createServerRoute } from '../create_server_route'; import { DefinitionNotFound } from '../../lib/streams/errors'; import { listStreams } from '../../lib/streams/stream_crud'; -import { StreamDefinition } from '../../../common'; export const listStreamsRoute = createServerRoute({ endpoint: 'GET /api/streams', @@ -25,18 +25,10 @@ export const listStreamsRoute = createServerRoute({ }, }, params: z.object({}), - handler: async ({ - response, - request, - getScopedClients, - }): Promise<{ definitions: StreamDefinition[] }> => { + handler: async ({ request, getScopedClients }): Promise => { try { const { scopedClusterClient } = await getScopedClients({ request }); - const { definitions } = await listStreams({ scopedClusterClient }); - - return { - definitions, - }; + return listStreams({ scopedClusterClient }); } catch (e) { if (e instanceof DefinitionNotFound) { throw notFound(e); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/read.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/read.ts index dbbda8c0dc5de..cd3d43934f107 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/read.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/read.ts @@ -7,7 +7,12 @@ import { z } from '@kbn/zod'; import { notFound, internal } from '@hapi/boom'; -import { ReadStreamDefinition } from '../../../common/types'; +import { + FieldDefinitionConfig, + isIngestStream, + isWiredStream, + ReadStreamDefinition, +} from '@kbn/streams-schema'; import { createServerRoute } from '../create_server_route'; import { DefinitionNotFound } from '../../lib/streams/errors'; import { readAncestors, readStream } from '../../lib/streams/stream_crud'; @@ -27,13 +32,7 @@ export const readStreamRoute = createServerRoute({ params: z.object({ path: z.object({ id: z.string() }), }), - handler: async ({ - response, - params, - request, - logger, - getScopedClients, - }): Promise => { + handler: async ({ params, request, getScopedClients }): Promise => { try { const { scopedClusterClient } = await getScopedClients({ request }); const streamEntity = await readStream({ @@ -41,23 +40,29 @@ export const readStreamRoute = createServerRoute({ id: params.path.id, }); - if (streamEntity.definition.managed === false) { + // TODO: I have no idea why I can just do `isIngestStream` here but when I do, + // streamEntity becomes `streamEntity: never` in the statements afterwards + if (!isWiredStream(streamEntity) && isIngestStream(streamEntity)) { return { - ...streamEntity.definition, - inheritedFields: [], + ...streamEntity, + inherited_fields: {}, }; } const { ancestors } = await readAncestors({ - id: streamEntity.definition.id, + id: streamEntity.name, scopedClusterClient, }); const body = { - ...streamEntity.definition, - inheritedFields: ancestors.flatMap(({ definition: { id, fields } }) => - fields.map((field) => ({ ...field, from: id })) - ), + ...streamEntity, + inherited_fields: ancestors.reduce((acc, def) => { + Object.entries(def.stream.ingest.wired.fields).forEach(([key, fieldDef]) => { + acc[key] = { ...fieldDef, from: def.name }; + }); + return acc; + // TODO: replace this with a proper type + }, {} as Record), }; return body; diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/resync.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/resync.ts index 8e520410ca5c2..73955a2bd9bb5 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/resync.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/resync.ts @@ -22,20 +22,15 @@ export const resyncStreamsRoute = createServerRoute({ }, }, params: z.object({}), - handler: async ({ - response, - logger, - request, - getScopedClients, - }): Promise<{ acknowledged: true }> => { + handler: async ({ logger, request, getScopedClients }): Promise<{ acknowledged: true }> => { const { scopedClusterClient } = await getScopedClients({ request }); - const { definitions: streams } = await listStreams({ scopedClusterClient }); + const { streams } = await listStreams({ scopedClusterClient }); for (const stream of streams) { - const { definition } = await readStream({ + const definition = await readStream({ scopedClusterClient, - id: stream.id, + id: stream.name, }); await syncStream({ diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/sample.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/sample.ts index cd3a989c29109..f912e2e27fd96 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/sample.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/sample.ts @@ -7,7 +7,7 @@ import { z } from '@kbn/zod'; import { notFound, internal } from '@hapi/boom'; -import { conditionSchema } from '../../../common/types'; +import { conditionSchema } from '@kbn/streams-schema'; import { createServerRoute } from '../create_server_route'; import { DefinitionNotFound } from '../../lib/streams/errors'; import { checkReadAccess } from '../../lib/streams/stream_crud'; @@ -35,13 +35,7 @@ export const sampleStreamRoute = createServerRoute({ number: z.optional(z.number()), }), }), - handler: async ({ - response, - params, - request, - logger, - getScopedClients, - }): Promise<{ documents: unknown[] }> => { + handler: async ({ params, request, getScopedClients }): Promise<{ documents: unknown[] }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/fields_simulation.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/fields_simulation.ts index 01aa61a302a39..9db5a7013f01e 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/fields_simulation.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/fields_simulation.ts @@ -8,7 +8,7 @@ import { z } from '@kbn/zod'; import { notFound, internal } from '@hapi/boom'; import { getFlattenedObject } from '@kbn/std'; -import { fieldDefinitionSchema } from '../../../../common/types'; +import { fieldDefinitionConfigSchema } from '@kbn/streams-schema'; import { createServerRoute } from '../../create_server_route'; import { DefinitionNotFound } from '../../../lib/streams/errors'; import { checkReadAccess } from '../../../lib/streams/stream_crud'; @@ -30,14 +30,12 @@ export const schemaFieldsSimulationRoute = createServerRoute({ params: z.object({ path: z.object({ id: z.string() }), body: z.object({ - field_definitions: z.array(fieldDefinitionSchema), + field_definitions: z.array(fieldDefinitionConfigSchema.extend({ name: z.string() })), }), }), handler: async ({ - response, params, request, - logger, getScopedClients, }): Promise<{ status: 'unknown' | 'success' | 'failure'; diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/unmapped_fields.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/unmapped_fields.ts index 15bcb964b8fd6..12faa12f9cee4 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/unmapped_fields.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/schema/unmapped_fields.ts @@ -8,6 +8,7 @@ import { z } from '@kbn/zod'; import { internal, notFound } from '@hapi/boom'; import { getFlattenedObject } from '@kbn/std'; +import { isWiredStream } from '@kbn/streams-schema'; import { DefinitionNotFound } from '../../../lib/streams/errors'; import { checkReadAccess, readAncestors, readStream } from '../../../lib/streams/stream_crud'; import { createServerRoute } from '../../create_server_route'; @@ -29,13 +30,7 @@ export const unmappedFieldsRoute = createServerRoute({ params: z.object({ path: z.object({ id: z.string() }), }), - handler: async ({ - response, - params, - request, - logger, - getScopedClients, - }): Promise<{ unmappedFields: string[] }> => { + handler: async ({ params, request, getScopedClients }): Promise<{ unmappedFields: string[] }> => { try { const { scopedClusterClient } = await getScopedClients({ request }); @@ -76,7 +71,11 @@ export const unmappedFieldsRoute = createServerRoute({ // Mapped fields from the stream's definition and inherited from ancestors const mappedFields = new Set(); - streamEntity.definition.fields.forEach((field) => mappedFields.add(field.name)); + if (isWiredStream(streamEntity)) { + Object.keys(streamEntity.stream.ingest.wired.fields).forEach((name) => + mappedFields.add(name) + ); + } const { ancestors } = await readAncestors({ id: params.path.id, @@ -84,7 +83,7 @@ export const unmappedFieldsRoute = createServerRoute({ }); for (const ancestor of ancestors) { - ancestor.definition.fields.forEach((field) => mappedFields.add(field.name)); + Object.keys(ancestor.stream.ingest.wired.fields).forEach((name) => mappedFields.add(name)); } const unmappedFields = Array.from(sourceFields) diff --git a/x-pack/solutions/observability/plugins/streams/tsconfig.json b/x-pack/solutions/observability/plugins/streams/tsconfig.json index fbb8515998fb3..464f184918c96 100644 --- a/x-pack/solutions/observability/plugins/streams/tsconfig.json +++ b/x-pack/solutions/observability/plugins/streams/tsconfig.json @@ -31,6 +31,7 @@ "@kbn/observability-utils-server", "@kbn/observability-utils-common", "@kbn/std", - "@kbn/safer-lodash-set" + "@kbn/safer-lodash-set", + "@kbn/streams-schema" ] } diff --git a/x-pack/solutions/observability/plugins/streams_app/common/index.ts b/x-pack/solutions/observability/plugins/streams_app/common/index.ts index c41a05b84d307..6fcafd32bd71a 100644 --- a/x-pack/solutions/observability/plugins/streams_app/common/index.ts +++ b/x-pack/solutions/observability/plugins/streams_app/common/index.ts @@ -5,7 +5,7 @@ * 2.0. */ -import type { StreamDefinition } from '@kbn/streams-plugin/common'; +import type { StreamDefinition } from '@kbn/streams-schema'; interface EntityBase { type: string; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/condition_editor/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/condition_editor/index.tsx index e7e2a79b59294..e53c9f9069797 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/condition_editor/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/condition_editor/index.tsx @@ -21,7 +21,7 @@ import { Condition, FilterCondition, OrCondition, -} from '@kbn/streams-plugin/common/types'; +} from '@kbn/streams-schema'; import React, { useEffect } from 'react'; import { i18n } from '@kbn/i18n'; import { css } from '@emotion/css'; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/entity_detail_view/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/entity_detail_view/index.tsx index 4e1ec87866aee..abd9cf04ea2f0 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/entity_detail_view/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/entity_detail_view/index.tsx @@ -8,7 +8,7 @@ import { EuiFlexGroup, EuiFlexItem, EuiIcon, EuiLink, EuiPanel, EuiBadge } from import { i18n } from '@kbn/i18n'; import React from 'react'; import { css } from '@emotion/css'; -import { StreamDefinition } from '@kbn/streams-plugin/common'; +import { isIngestStream, StreamDefinition } from '@kbn/streams-schema'; import { useStreamsAppBreadcrumbs } from '../../hooks/use_streams_app_breadcrumbs'; import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; import { EntityOverviewTabList } from '../entity_overview_tab_list'; @@ -101,7 +101,7 @@ export function EntityDetailViewWithoutParams({ title={ <> {entity.displayName} - {definition && !definition.managed ? ( + {definition && isIngestStream(definition) ? ( <> {' '} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enriching/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enriching/index.tsx index d879142162353..fe532825d970a 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enriching/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enriching/index.tsx @@ -4,7 +4,7 @@ * 2.0; you may not use this file except in compliance with the Elastic License * 2.0. */ -import { StreamDefinition } from '@kbn/streams-plugin/common'; +import { StreamDefinition } from '@kbn/streams-schema'; import React from 'react'; export function StreamDetailEnriching({ diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx index 1664b322b5a8e..66e291cb3f61b 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/classic.tsx @@ -6,7 +6,7 @@ */ import React from 'react'; import { i18n } from '@kbn/i18n'; -import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-plugin/common'; +import { ReadStreamDefinition, StreamDefinition } from '@kbn/streams-schema'; import { EuiFlexGroup, EuiListGroup, EuiText } from '@elastic/eui'; import { useStreamsAppParams } from '../../hooks/use_streams_app_params'; import { RedirectTo } from '../redirect_to'; @@ -66,7 +66,7 @@ function UnmanagedStreamOverview({ definition }: { definition: StreamDefinition http: { basePath }, }, } = useKibana(); - const groupedAssets = (definition.unmanaged_elasticsearch_assets ?? []).reduce((acc, asset) => { + const groupedAssets = (definition.elasticsearch_assets ?? []).reduce((acc, asset) => { const title = assetToTitle(asset); if (title) { acc[title] = acc[title] ?? []; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx index 24567fe8d80a3..6379855fa3c92 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_management/index.tsx @@ -5,7 +5,7 @@ * 2.0. */ import React from 'react'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { isWiredReadStream, ReadStreamDefinition } from '@kbn/streams-schema'; import { WiredStreamDetailManagement } from './wired'; import { ClassicStreamDetailManagement } from './classic'; @@ -22,7 +22,7 @@ export function StreamDetailManagement({ return null; } - if (definition.managed) { + if (isWiredReadStream(definition)) { return ( void; isLoadingDefinition: boolean; }) { diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx index 28af5f4f104c1..1fdc95821172e 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_overview/index.tsx @@ -10,7 +10,7 @@ import { i18n } from '@kbn/i18n'; import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range'; import moment from 'moment'; import React, { useMemo } from 'react'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { ReadStreamDefinition } from '@kbn/streams-schema'; import { useKibana } from '../../hooks/use_kibana'; import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch'; import { ControlledEsqlChart } from '../esql_chart/controlled_esql_chart'; @@ -35,18 +35,18 @@ export function StreamDetailOverview({ definition }: { definition?: ReadStreamDe } = useDateRange({ data }); const indexPatterns = useMemo(() => { - if (!definition?.id) { + if (!definition?.name) { return undefined; } - const isRoot = definition.id.indexOf('.') === -1; + const isRoot = definition.name.indexOf('.') === -1; - const dataStreamOfDefinition = definition.id; + const dataStreamOfDefinition = definition.name; return isRoot ? [dataStreamOfDefinition, `${dataStreamOfDefinition}.*`] : [`${dataStreamOfDefinition}*`]; - }, [definition?.id]); + }, [definition?.name]); const discoverLocator = useMemo( () => share.url.locators.get('DISCOVER_APP_LOCATOR'), diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_routing/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_routing/index.tsx index 09f1116080dd8..d21d4d5933ad9 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_routing/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_routing/index.tsx @@ -25,9 +25,12 @@ import { css } from '@emotion/css'; import { i18n } from '@kbn/i18n'; import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller'; import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; import React from 'react'; -import { StreamChild } from '@kbn/streams-plugin/common/types'; +import { + StreamChild, + ReadStreamDefinition, + WiredStreamConfigDefinition, +} from '@kbn/streams-schema'; import { AbortableAsyncState } from '@kbn/observability-utils-browser/hooks/use_abortable_async'; import { useKibana } from '../../hooks/use_kibana'; import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch'; @@ -89,7 +92,7 @@ export function StreamDetailRouting({ closeModal={closeModal} clearChildUnderEdit={() => routingAppState.setChildUnderEdit(undefined)} refreshDefinition={refreshDefinition} - id={routingAppState.childUnderEdit.child.id} + id={routingAppState.childUnderEdit.child.name} /> )} - child.id === childUnderEdit.id ? childUnderEdit : child - ), - }, + ...stream, + ingest: { + ...stream.ingest, + routing: definition.stream.ingest.routing.map((child) => + child.name === childUnderEdit.name ? childUnderEdit : child + ), + }, + } as WiredStreamConfigDefinition, }, }); } @@ -350,7 +354,7 @@ function PreviewPanel({ signal, params: { path: { - id: definition.id, + id: definition.name, }, body: { condition: routingAppState.debouncedChildUnderEdit.child.condition, @@ -550,17 +554,17 @@ function ChildStreamList({ > - {definition.children.map((child, i) => ( + {definition.stream.ingest.routing.map((child, i) => ( { - if (child.id === childUnderEdit?.child.id) { + if (child.name === childUnderEdit?.child.name) { setChildUnderEdit(undefined); } else { setChildUnderEdit({ isNew: false, child }); @@ -601,7 +605,7 @@ function ChildStreamList({ setChildUnderEdit({ isNew: true, child: { - id: `${definition.id}.child`, + name: `${definition.name}.child`, condition: { field: '', operator: 'eq', @@ -627,7 +631,7 @@ function CurrentStreamEntry({ definition }: { definition: ReadStreamDefinition } return ( - {definition.id} + {definition.name} {i18n.translate('xpack.streams.streamDetailRouting.currentStream', { defaultMessage: 'Current stream', @@ -641,7 +645,7 @@ function CurrentStreamEntry({ definition }: { definition: ReadStreamDefinition } function PreviousStreamEntry({ definition }: { definition: ReadStreamDefinition }) { const router = useStreamsAppRouter(); - const parentId = definition.id.split('.').slice(0, -1).join('.'); + const parentId = definition.name.split('.').slice(0, -1).join('.'); if (parentId === '') { return null; } @@ -686,7 +690,7 @@ function RoutingStreamEntry({ - {child.id} + {child.name} { onChildChange({ ...child, - id: e.target.value, + name: e.target.value, }); }} /> diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/field_type.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/field_type.tsx index a278d22dcd3ec..14203f0b5d998 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/field_type.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/field_type.tsx @@ -7,8 +7,8 @@ import { EuiFlexGroup, EuiFlexItem, EuiToken } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; -import { FieldDefinition } from '@kbn/streams-plugin/common/types'; import React from 'react'; +import { FieldDefinitionConfig } from '@kbn/streams-schema'; export const FIELD_TYPE_MAP = { boolean: { @@ -55,7 +55,7 @@ export const FIELD_TYPE_MAP = { }, }; -export const FieldType = ({ type }: { type: FieldDefinition['type'] }) => { +export const FieldType = ({ type }: { type: FieldDefinitionConfig['type'] }) => { return ( diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx index b50fdee7e6ae9..4daf6acf0a6f2 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/fields_table.tsx @@ -22,7 +22,7 @@ import type { } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import useToggle from 'react-use/lib/useToggle'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common/types'; +import { isWiredStream, ReadStreamDefinition } from '@kbn/streams-schema'; import { FieldType } from './field_type'; import { FieldStatus } from './field_status'; import { FieldEntry, SchemaEditorEditingState } from './hooks/use_editing_state'; @@ -71,14 +71,13 @@ export const EMPTY_CONTENT = '-----'; export const FieldsTableContainer = ({ definition, unmappedFieldsResult, - isLoadingUnmappedFields, query, editingState, unpromotingState, }: FieldsTableContainerProps) => { const inheritedFields = useMemo(() => { - return definition.inheritedFields.map((field) => ({ - name: field.name, + return Object.entries(definition.inherited_fields).map(([name, field]) => ({ + name, type: field.type, format: field.format, parent: field.from, @@ -94,13 +93,16 @@ export const FieldsTableContainer = ({ }, [inheritedFields, query]); const mappedFields = useMemo(() => { - return definition.fields.map((field) => ({ - name: field.name, - type: field.type, - format: field.format, - parent: definition.id, - status: 'mapped' as const, - })); + if (isWiredStream(definition)) { + return Object.entries(definition.stream.ingest.wired.fields).map(([name, field]) => ({ + name, + type: field.type, + format: field.format, + parent: definition.name, + status: 'mapped' as const, + })); + } + return []; }, [definition]); const filteredMappedFields = useMemo(() => { @@ -114,11 +116,11 @@ export const FieldsTableContainer = ({ return unmappedFieldsResult ? unmappedFieldsResult.map((field) => ({ name: field, - parent: definition.id, + parent: definition.name, status: 'unmapped' as const, })) : []; - }, [definition.id, unmappedFieldsResult]); + }, [definition.name, unmappedFieldsResult]); const filteredUnmappedFields = useMemo(() => { if (!unmappedFieldsResult) return []; @@ -285,7 +287,9 @@ const FieldsTable = ({ definition, fields, editingState, unpromotingState }: Fie if (!fieldType) return EMPTY_CONTENT; return ; } else if (columnId === 'parent') { - return ; + return ( + + ); } else if (columnId === 'status') { return ; } else { diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/children_affected_callout.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/children_affected_callout.tsx index 1cb9cbd2dd045..b3cb9c5ef4f8a 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/children_affected_callout.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/children_affected_callout.tsx @@ -7,13 +7,13 @@ import React from 'react'; import { EuiCallOut } from '@elastic/eui'; -import { StreamDefinition } from '@kbn/streams-plugin/common/types'; import { i18n } from '@kbn/i18n'; +import { StreamConfigDefinition } from '@kbn/streams-schema'; export const ChildrenAffectedCallout = ({ childStreams, }: { - childStreams: StreamDefinition['children']; + childStreams: StreamConfigDefinition['ingest']['routing']; }) => { return ( stream.id).join(', '), + affectedStreams: childStreams.map((stream) => stream.name).join(', '), }, })} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/field_form_format.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/field_form_format.tsx index 98f5d899c1074..9b8ba2bdbe6db 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/field_form_format.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/field_form_format.tsx @@ -7,7 +7,7 @@ import { EuiFieldText } from '@elastic/eui'; import React from 'react'; -import { FieldDefinition } from '@kbn/streams-plugin/common/types'; +import { FieldDefinitionConfig } from '@kbn/streams-schema'; import { SchemaEditorEditingState } from '../hooks/use_editing_state'; type FieldFormFormatProps = Pick< @@ -15,7 +15,7 @@ type FieldFormFormatProps = Pick< 'nextFieldType' | 'nextFieldFormat' | 'setNextFieldFormat' >; -export const typeSupportsFormat = (type?: FieldDefinition['type']) => { +export const typeSupportsFormat = (type?: FieldDefinitionConfig['type']) => { if (!type) return false; return ['date'].includes(type); }; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/index.tsx index 8bbdd6abf9ad3..e0874c87234b7 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/index.tsx @@ -19,7 +19,7 @@ import { } from '@elastic/eui'; import React from 'react'; import { i18n } from '@kbn/i18n'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { ReadStreamDefinition } from '@kbn/streams-schema'; import { SchemaEditorEditingState } from '../hooks/use_editing_state'; import { ChildrenAffectedCallout } from './children_affected_callout'; import { SamplePreviewTable } from './sample_preview_table'; @@ -57,9 +57,9 @@ export const SchemaEditorFlyout = (props: SchemaEditorFlyoutProps) => { - {isEditing && definition.children.length > 0 ? ( + {isEditing && definition.stream.ingest.routing.length > 0 ? ( - + ) : null} diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/sample_preview_table.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/sample_preview_table.tsx index 8c04a0b70e3be..770c832b453a0 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/sample_preview_table.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/flyout/sample_preview_table.tsx @@ -7,11 +7,10 @@ import React, { useMemo } from 'react'; import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; -import { FieldDefinition } from '@kbn/streams-plugin/common/types'; import { css } from '@emotion/react'; import { i18n } from '@kbn/i18n'; import { EuiCallOut } from '@elastic/eui'; +import { FieldDefinitionConfigWithName, ReadStreamDefinition } from '@kbn/streams-schema'; import { getFormattedError } from '../../../util/errors'; import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch'; import { PreviewTable } from '../../preview_table'; @@ -20,7 +19,7 @@ import { LoadingPanel } from '../../loading_panel'; interface SamplePreviewTableProps { definition: ReadStreamDefinition; - nextFieldDefinition?: Partial; + nextFieldDefinition?: Partial; streamsRepositoryClient: StreamsRepositoryClient; } @@ -39,14 +38,14 @@ const SamplePreviewTableContent = ({ definition, nextFieldDefinition, streamsRepositoryClient, -}: SamplePreviewTableProps & { nextFieldDefinition: FieldDefinition }) => { +}: SamplePreviewTableProps & { nextFieldDefinition: FieldDefinitionConfigWithName }) => { const { value, loading, error } = useStreamsAppFetch( ({ signal }) => { return streamsRepositoryClient.fetch('POST /api/streams/{id}/schema/fields_simulation', { signal, params: { path: { - id: definition.id, + id: definition.name, }, body: { field_definitions: [nextFieldDefinition], @@ -54,7 +53,7 @@ const SamplePreviewTableContent = ({ }, }); }, - [definition.id, nextFieldDefinition, streamsRepositoryClient], + [definition.name, nextFieldDefinition, streamsRepositoryClient], { disableToastOnError: true, } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_editing_state.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_editing_state.tsx index 9fc6288c1daf7..6cb274fc39168 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_editing_state.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_editing_state.tsx @@ -5,21 +5,26 @@ * 2.0. */ -import { FieldDefinition, ReadStreamDefinition } from '@kbn/streams-plugin/common/types'; +import { + ReadStreamDefinition, + FieldDefinitionConfigWithName, + isWiredReadStream, +} from '@kbn/streams-schema'; import { StreamsRepositoryClient } from '@kbn/streams-plugin/public/api'; import { useCallback, useMemo, useState } from 'react'; import useToggle from 'react-use/lib/useToggle'; import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller'; import { ToastsStart } from '@kbn/core-notifications-browser'; import { i18n } from '@kbn/i18n'; +import { omit } from 'lodash'; import { FieldStatus } from '../field_status'; export type SchemaEditorEditingState = ReturnType; export interface FieldEntry { - name: FieldDefinition['name']; - type?: FieldDefinition['type']; - format?: FieldDefinition['format']; + name: FieldDefinitionConfigWithName['name']; + type?: FieldDefinitionConfigWithName['type']; + format?: FieldDefinitionConfigWithName['format']; parent: string; status: FieldStatus; } @@ -90,7 +95,8 @@ export const useEditingState = ({ const saveChanges = useMemo(() => { return selectedField && isFullFieldDefinition(nextFieldDefinition) && - hasChanges(selectedField, nextFieldDefinition) + hasChanges(selectedField, nextFieldDefinition) && + isWiredReadStream(definition) ? async () => { toggleIsSaving(true); try { @@ -98,15 +104,22 @@ export const useEditingState = ({ signal: abortController.signal, params: { path: { - id: definition.id, + id: definition.name, }, body: { - processing: definition.processing, - children: definition.children, - fields: [ - ...definition.fields.filter((field) => field.name !== nextFieldDefinition.name), - nextFieldDefinition, - ], + ingest: { + ...definition.stream.ingest, + wired: { + fields: { + ...Object.fromEntries( + Object.entries(definition.stream.ingest.wired.fields).filter( + ([name, _field]) => name !== nextFieldDefinition.name + ) + ), + [nextFieldDefinition.name]: omit(nextFieldDefinition, 'name'), + }, + }, + }, }, }, }); @@ -133,10 +146,7 @@ export const useEditingState = ({ : undefined; }, [ abortController.signal, - definition.children, - definition.fields, - definition.id, - definition.processing, + definition, nextFieldDefinition, refreshDefinition, refreshUnmappedFields, @@ -165,14 +175,14 @@ export const useEditingState = ({ }; export const isFullFieldDefinition = ( - value?: Partial -): value is FieldDefinition => { + value?: Partial +): value is FieldDefinitionConfigWithName => { return !!value && !!value.name && !!value.type; }; const hasChanges = ( - selectedField: Partial, - nextFieldEntry: Partial + selectedField: Partial, + nextFieldEntry: Partial ) => { return ( selectedField.type !== nextFieldEntry.type || selectedField.format !== nextFieldEntry.format diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_unpromoting_state.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_unpromoting_state.tsx index b6e30c87cd7b4..5af56b61ff665 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_unpromoting_state.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/hooks/use_unpromoting_state.tsx @@ -11,7 +11,8 @@ import useToggle from 'react-use/lib/useToggle'; import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller'; import { ToastsStart } from '@kbn/core-notifications-browser'; import { i18n } from '@kbn/i18n'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; +import { WiredReadStreamDefinition } from '@kbn/streams-schema'; +import { omit } from 'lodash'; export type SchemaEditorUnpromotingState = ReturnType; @@ -23,7 +24,7 @@ export const useUnpromotingState = ({ toastsService, }: { streamsRepositoryClient: StreamsRepositoryClient; - definition: ReadStreamDefinition; + definition: WiredReadStreamDefinition; refreshDefinition: () => void; refreshUnmappedFields: () => void; toastsService: ToastsStart; @@ -46,12 +47,15 @@ export const useUnpromotingState = ({ signal: abortController.signal, params: { path: { - id: definition.id, + id: definition.name, }, body: { - processing: definition.processing, - children: definition.children, - fields: definition.fields.filter((field) => field.name !== selectedField), + ingest: { + ...definition.stream.ingest, + wired: { + fields: omit(definition.stream.ingest.wired.fields, selectedField), + }, + }, }, }, }); @@ -77,10 +81,8 @@ export const useUnpromotingState = ({ } }, [ abortController.signal, - definition.children, - definition.fields, - definition.id, - definition.processing, + definition.name, + definition.stream.ingest, refreshDefinition, refreshUnmappedFields, selectedField, diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/index.tsx index 3ca410e74ffdb..1af840d2c4110 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_schema_editor/index.tsx @@ -13,8 +13,8 @@ import { EuiPortal, Query, } from '@elastic/eui'; -import { ReadStreamDefinition } from '@kbn/streams-plugin/common'; import { css } from '@emotion/css'; +import { WiredReadStreamDefinition } from '@kbn/streams-schema'; import { useEditingState } from './hooks/use_editing_state'; import { SchemaEditorFlyout } from './flyout'; import { useKibana } from '../../hooks/use_kibana'; @@ -25,7 +25,7 @@ import { useStreamsAppFetch } from '../../hooks/use_streams_app_fetch'; import { FieldsTableContainer } from './fields_table'; interface SchemaEditorProps { - definition?: ReadStreamDefinition; + definition?: WiredReadStreamDefinition; refreshDefinition: () => void; isLoadingDefinition: boolean; } @@ -63,12 +63,12 @@ const Content = ({ signal, params: { path: { - id: definition.id, + id: definition.name, }, }, }); }, - [definition.id, streamsRepositoryClient] + [definition.name, streamsRepositoryClient] ); const editingState = useEditingState({ @@ -92,7 +92,7 @@ const Content = ({ // If the definition changes (e.g. navigating to parent stream), reset the entire editing state. useEffect(() => { reset(); - }, [definition.id, reset]); + }, [definition.name, reset]); return ( diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/streams_table/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/streams_table/index.tsx index 39814ed904555..ef80d1346edd4 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/streams_table/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/streams_table/index.tsx @@ -14,47 +14,47 @@ import { } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import type { AbortableAsyncState } from '@kbn/observability-utils-browser/hooks/use_abortable_async'; -import { StreamDefinition } from '@kbn/streams-plugin/common'; import React, { useMemo } from 'react'; +import { isWiredStreamConfig, StreamDefinition } from '@kbn/streams-schema'; import { useStreamsAppRouter } from '../../hooks/use_streams_app_router'; export function StreamsTable({ listFetch, query, }: { - listFetch: AbortableAsyncState<{ definitions: StreamDefinition[] }>; + listFetch: AbortableAsyncState<{ streams: StreamDefinition[] }>; query: string; }) { const router = useStreamsAppRouter(); const items = useMemo(() => { - return listFetch.value?.definitions ?? []; - }, [listFetch.value?.definitions]); + return listFetch.value?.streams ?? []; + }, [listFetch.value?.streams]); const filteredItems = useMemo(() => { if (!query) { return items; } - return items.filter((item) => item.id.toLowerCase().includes(query.toLowerCase())); + return items.filter((item) => item.name.toLowerCase().includes(query.toLowerCase())); }, [query, items]); const columns = useMemo>>(() => { return [ { - field: 'id', + field: 'name', name: i18n.translate('xpack.streams.streamsTable.nameColumnTitle', { defaultMessage: 'Name', }), - render: (_, { id, managed }) => { + render: (_, { name, stream }) => { return ( - + - {id} + {name} ); diff --git a/x-pack/solutions/observability/plugins/streams_app/tsconfig.json b/x-pack/solutions/observability/plugins/streams_app/tsconfig.json index 7a77dae1922d0..7824c84d6ea6b 100644 --- a/x-pack/solutions/observability/plugins/streams_app/tsconfig.json +++ b/x-pack/solutions/observability/plugins/streams_app/tsconfig.json @@ -37,5 +37,6 @@ "@kbn/ui-theme", "@kbn/navigation-plugin", "@kbn/core-notifications-browser", + "@kbn/streams-schema", ] } diff --git a/x-pack/test/api_integration/apis/streams/classic.ts b/x-pack/test/api_integration/apis/streams/classic.ts index 25a7238a757ca..67d72bcb0a0ac 100644 --- a/x-pack/test/api_integration/apis/streams/classic.ts +++ b/x-pack/test/api_integration/apis/streams/classic.ts @@ -42,54 +42,60 @@ export default function ({ getService }: FtrProviderContext) { const response = await indexDocument(esClient, 'logs-test-default', doc); expect(response.result).to.eql('created'); const streams = await listStreams(supertest); - const classicStream = streams.definitions.find( - (stream: JsonObject) => stream.id === 'logs-test-default' + const classicStream = streams.streams.find( + (stream: JsonObject) => stream.name === 'logs-test-default' ); expect(classicStream).to.eql({ - id: 'logs-test-default', - managed: false, - children: [], - fields: [], - processing: [], + name: 'logs-test-default', + stream: { + ingest: { + processing: [], + routing: [], + }, + }, }); }); it('Allows setting processing on classic streams', async () => { const response = await putStream(supertest, 'logs-test-default', { - managed: false, - children: [], - fields: [], - processing: [ - { - config: { - type: 'grok', - field: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], + ingest: { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, }, - }, - ], + ], + routing: [], + }, }); expect(response).to.have.property('acknowledged', true); const streamBody = await getStream(supertest, 'logs-test-default'); expect(streamBody).to.eql({ - id: 'logs-test-default', - managed: false, - children: [], - inheritedFields: [], - fields: [], - processing: [ - { - config: { - type: 'grok', - field: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], - }, + name: 'logs-test-default', + inherited_fields: {}, + stream: { + ingest: { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, + }, + ], + routing: [], }, - ], + }, }); }); @@ -121,10 +127,10 @@ export default function ({ getService }: FtrProviderContext) { it('Allows removing processing on classic streams', async () => { const response = await putStream(supertest, 'logs-test-default', { - managed: false, - children: [], - fields: [], - processing: [], + ingest: { + processing: [], + routing: [], + }, }); expect(response).to.have.property('acknowledged', true); }); @@ -154,8 +160,8 @@ export default function ({ getService }: FtrProviderContext) { it('Allows deleting classic streams', async () => { await deleteStream(supertest, 'logs-test-default'); const streams = await listStreams(supertest); - const classicStream = streams.definitions.find( - (stream: JsonObject) => stream.id === 'logs-test-default' + const classicStream = streams.streams.find( + (stream: JsonObject) => stream.name === 'logs-test-default' ); expect(classicStream).to.eql(undefined); }); diff --git a/x-pack/test/api_integration/apis/streams/enrichment.ts b/x-pack/test/api_integration/apis/streams/enrichment.ts index 22293b09fbbbb..e9fb604438ee6 100644 --- a/x-pack/test/api_integration/apis/streams/enrichment.ts +++ b/x-pack/test/api_integration/apis/streams/enrichment.ts @@ -6,8 +6,8 @@ */ import expect from '@kbn/expect'; -import { JsonObject } from '@kbn/utility-types'; import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/types'; +import { WiredStreamConfigDefinition } from '@kbn/streams-schema'; import { enableStreams, fetchDocument, indexDocument, putStream } from './helpers/requests'; import { FtrProviderContext } from '../../ftr_provider_context'; import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; @@ -29,53 +29,54 @@ export default function ({ getService }: FtrProviderContext) { }); it('Place processing steps', async () => { - const body = { - fields: [ - { - name: '@timestamp', - type: 'date', - }, - { - name: 'message', - type: 'match_only_text', - }, - { - name: 'message2', - type: 'match_only_text', - }, - { - name: 'host.name', - type: 'keyword', - }, - { - name: 'log.level', - type: 'keyword', - }, - ], - processing: [ - { - config: { - type: 'grok', - field: 'message', - patterns: [ - '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', - ], + const body: WiredStreamConfigDefinition = { + ingest: { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: [ + '%{TIMESTAMP_ISO8601:inner_timestamp} %{LOGLEVEL:log.level} %{GREEDYDATA:message2}', + ], + }, + }, }, - } as JsonObject, - { - config: { - type: 'dissect', - field: 'message2', - pattern: '%{log.logger} %{message3}', + { + config: { + dissect: { + field: 'message2', + pattern: '%{log.logger} %{message3}', + }, + }, + condition: { + field: 'log.level', + operator: 'eq', + value: 'info', + }, }, - condition: { - field: 'log.level', - operator: 'eq', - value: 'info', + ], + routing: [], + wired: { + fields: { + '@timestamp': { + type: 'date', + }, + message: { + type: 'match_only_text', + }, + message2: { + type: 'match_only_text', + }, + 'host.name': { + type: 'keyword', + }, + 'log.level': { + type: 'keyword', + }, }, - } as JsonObject, - ], - children: [], + }, + }, }; const response = await putStream(supertest, 'logs', body); expect(response).to.have.property('acknowledged', true); diff --git a/x-pack/test/api_integration/apis/streams/flush_config.ts b/x-pack/test/api_integration/apis/streams/flush_config.ts index f3fa79e92d457..b04b5ff7959a9 100644 --- a/x-pack/test/api_integration/apis/streams/flush_config.ts +++ b/x-pack/test/api_integration/apis/streams/flush_config.ts @@ -7,84 +7,99 @@ import expect from '@kbn/expect'; import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { StreamDefinition } from '@kbn/streams-schema'; import { deleteStream, enableStreams, indexDocument } from './helpers/requests'; import { FtrProviderContext } from '../../ftr_provider_context'; import { waitForDocumentInIndex } from '../../../alerting_api_integration/observability/helpers/alerting_wait_for_helpers'; import { cleanUpRootStream } from './helpers/cleanup'; -const streams = [ +const streams: StreamDefinition[] = [ { - processing: [], - fields: [ - { - name: '@timestamp', - type: 'date', - }, - { - name: 'message', - type: 'match_only_text', - }, - { - name: 'host.name', - type: 'keyword', - }, - { - name: 'log.level', - type: 'keyword', - }, - ], - children: [ - { - id: 'logs.test', - condition: { - and: [ - { - field: 'numberfield', - operator: 'gt', - value: 15, + name: 'logs', + stream: { + ingest: { + processing: [], + wired: { + fields: { + '@timestamp': { + type: 'date', }, - ], - }, - }, - { - id: 'logs.test2', - condition: { - and: [ - { - field: 'field2', - operator: 'eq', - value: 'abc', + message: { + type: 'match_only_text', + }, + 'host.name': { + type: 'keyword', + }, + 'log.level': { + type: 'keyword', }, - ], + }, }, + routing: [ + { + name: 'logs.test', + condition: { + and: [ + { + field: 'numberfield', + operator: 'gt', + value: 15, + }, + ], + }, + }, + { + name: 'logs.test2', + condition: { + and: [ + { + field: 'field2', + operator: 'eq', + value: 'abc', + }, + ], + }, + }, + ], }, - ], - id: 'logs', + }, }, { - id: 'logs.test', - processing: [], - fields: [], - children: [], + name: 'logs.test', + stream: { + ingest: { + processing: [], + wired: { + fields: {}, + }, + routing: [], + }, + }, }, { - id: 'logs.test2', - processing: [ - { - config: { - type: 'grok', - field: 'message', - patterns: ['%{NUMBER:numberfield}'], + name: 'logs.test2', + stream: { + ingest: { + processing: [ + { + config: { + grok: { + field: 'message', + patterns: ['%{NUMBER:numberfield}'], + }, + }, + }, + ], + wired: { + fields: { + numberfield: { + type: 'long', + }, + }, }, + routing: [], }, - ], - fields: [ - { - name: 'numberfield', - type: 'long', - }, - ], - children: [], + }, }, ]; @@ -107,9 +122,9 @@ export default function ({ getService }: FtrProviderContext) { }); it('PUTs all streams one by one without errors', async () => { - for (const { id: streamId, ...stream } of streams) { + for (const { name, stream } of streams) { const response = await supertest - .put(`/api/streams/${streamId}`) + .put(`/api/streams/${name}`) .set('kbn-xsrf', 'xxx') .send(stream) .expect(200); diff --git a/x-pack/test/api_integration/apis/streams/full_flow.ts b/x-pack/test/api_integration/apis/streams/full_flow.ts index aad931ab11816..fd46df8002d74 100644 --- a/x-pack/test/api_integration/apis/streams/full_flow.ts +++ b/x-pack/test/api_integration/apis/streams/full_flow.ts @@ -60,9 +60,7 @@ export default function ({ getService }: FtrProviderContext) { it('Fork logs to logs.nginx', async () => { const body = { stream: { - id: 'logs.nginx', - fields: [], - processing: [], + name: 'logs.nginx', }, condition: { field: 'log.logger', @@ -99,9 +97,7 @@ export default function ({ getService }: FtrProviderContext) { it('Fork logs to logs.nginx.access', async () => { const body = { stream: { - id: 'logs.nginx.access', - fields: [], - processing: [], + name: 'logs.nginx.access', }, condition: { field: 'log.level', operator: 'eq', value: 'info' }, }; @@ -139,9 +135,7 @@ export default function ({ getService }: FtrProviderContext) { it('Fork logs to logs.nginx.error with invalid condition', async () => { const body = { stream: { - id: 'logs.nginx.error', - fields: [], - processing: [], + name: 'logs.nginx.error', }, condition: { field: 'log', operator: 'eq', value: 'error' }, }; @@ -181,9 +175,7 @@ export default function ({ getService }: FtrProviderContext) { it('Fork logs to logs.number-test', async () => { const body = { stream: { - id: 'logs.number-test', - fields: [], - processing: [], + name: 'logs.number-test', }, condition: { field: 'code', operator: 'gte', value: '500' }, }; @@ -224,9 +216,7 @@ export default function ({ getService }: FtrProviderContext) { it('Fork logs to logs.string-test', async () => { const body = { stream: { - id: 'logs.string-test', - fields: [], - processing: [], + name: 'logs.string-test', }, condition: { or: [ diff --git a/x-pack/test/api_integration/apis/streams/helpers/requests.ts b/x-pack/test/api_integration/apis/streams/helpers/requests.ts index 43e7f02b7a750..ae3a325b5f9b4 100644 --- a/x-pack/test/api_integration/apis/streams/helpers/requests.ts +++ b/x-pack/test/api_integration/apis/streams/helpers/requests.ts @@ -9,6 +9,7 @@ import { JsonObject } from '@kbn/utility-types'; import { Agent } from 'supertest'; import expect from '@kbn/expect'; import { SearchTotalHits } from '@elastic/elasticsearch/lib/api/typesWithBodyKey'; +import { StreamConfigDefinition } from '@kbn/streams-schema'; export async function enableStreams(supertest: Agent) { const req = supertest.post('/api/streams/_enable').set('kbn-xsrf', 'xxx'); @@ -36,7 +37,7 @@ export async function forkStream(supertest: Agent, root: string, body: JsonObjec return response.body; } -export async function putStream(supertest: Agent, name: string, body: JsonObject) { +export async function putStream(supertest: Agent, name: string, body: StreamConfigDefinition) { const req = supertest.put(`/api/streams/${name}`).set('kbn-xsrf', 'xxx'); const response = await req.send(body).expect(200); return response.body; diff --git a/x-pack/test/tsconfig.json b/x-pack/test/tsconfig.json index 4bfe01a721695..4783e818d08b4 100644 --- a/x-pack/test/tsconfig.json +++ b/x-pack/test/tsconfig.json @@ -191,6 +191,7 @@ "@kbn/sse-utils-server", "@kbn/gen-ai-functional-testing", "@kbn/integration-assistant-plugin", - "@kbn/core-elasticsearch-server" + "@kbn/core-elasticsearch-server", + "@kbn/streams-schema" ] } diff --git a/yarn.lock b/yarn.lock index c047674bd6dbb..498afd947f290 100644 --- a/yarn.lock +++ b/yarn.lock @@ -7559,6 +7559,10 @@ version "0.0.0" uid "" +"@kbn/streams-schema@link:x-pack/packages/kbn-streams-schema": + version "0.0.0" + uid "" + "@kbn/synthetics-e2e@link:x-pack/solutions/observability/plugins/synthetics/e2e": version "0.0.0" uid ""