From 6429c53597b203d0f4435909333131b00c01a996 Mon Sep 17 00:00:00 2001 From: Marco Antonio Ghiani Date: Wed, 15 Jan 2025 16:09:02 +0100 Subject: [PATCH] =?UTF-8?q?[Streams=20=F0=9F=8C=8A]=20Add=20processors=20v?= =?UTF-8?q?alidation=20and=20simulation=20gate=20(#206566)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## 📓 Summary Closes https://github.com/elastic/streams-program/issues/66 This work adds changes to prevent invalid processors from being submitted. The main rule is that a simulation is performed before any add/edit submission to guarantee that the processor config is valid. This work also updates the simulation API to detect whether there is a non-additive change in any simulated document. @patpscal error reporting UI for add/edit is different since the simulator is not visible for edit, I used a callout but we can easily update this once there is a final design in place. ### Form validation + simulation https://github.com/user-attachments/assets/f7fc351b-6efc-4500-8490-b7f1c85139bf ### Non-additive processors https://github.com/user-attachments/assets/47b5b739-c2cf-4a74-93a8-6ef43521c7d4 --- .../shared/kbn-object-utils/README.md | 1 + .../src/calculate_object_diff.test.ts | 23 +++- .../src/calculate_object_diff.ts | 23 +++- .../streams/errors/non_additive_processor.ts | 13 ++ .../routes/streams/processing/simulate.ts | 69 ++++++---- .../dissect/dissect_pattern_definition.tsx | 15 ++- .../flyout/dissect/index.tsx | 23 +--- .../flyout/grok/grok_patterns_editor.tsx | 91 +++++++++---- .../flyout/grok/index.tsx | 23 +--- .../flyout/ignore_toggles.tsx | 56 ++++++++ .../stream_detail_enrichment/flyout/index.tsx | 44 ++++-- .../flyout/processor_field_selector.tsx | 20 ++- .../flyout/processor_outcome_preview.tsx | 127 ++++++------------ .../flyout/toggle_field.tsx | 4 +- .../hooks/use_processing_simulator.ts | 106 +++++++++++++++ .../stream_detail_enrichment/page_content.tsx | 1 + .../processors_list.tsx | 10 +- .../stream_detail_enrichment/utils.ts | 21 ++- 18 files changed, 454 insertions(+), 216 deletions(-) create mode 100644 x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts create mode 100644 x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/ignore_toggles.tsx create mode 100644 x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts diff --git a/src/platform/packages/shared/kbn-object-utils/README.md b/src/platform/packages/shared/kbn-object-utils/README.md index c3a94576d727f..184fb58429690 100644 --- a/src/platform/packages/shared/kbn-object-utils/README.md +++ b/src/platform/packages/shared/kbn-object-utils/README.md @@ -44,6 +44,7 @@ Result: }, }, }, + updated: {} } */ ``` diff --git a/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.test.ts b/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.test.ts index 340be7783aef3..44fe49127f9e3 100644 --- a/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.test.ts +++ b/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.test.ts @@ -10,25 +10,34 @@ import { calculateObjectDiff } from './calculate_object_diff'; describe('calculateObjectDiff', () => { - it('should return the added and removed parts between 2 objects', () => { - const { added, removed } = calculateObjectDiff({ alpha: 1, beta: 2 }, { alpha: 1, gamma: 3 }); + it('should return the added, removed and updated parts between 2 objects', () => { + const { added, removed, updated } = calculateObjectDiff( + { alpha: 1, beta: 2, sigma: 4 }, + { alpha: 1, gamma: 3, sigma: 5 } + ); expect(added).toEqual({ gamma: 3 }); expect(removed).toEqual({ beta: 2 }); + expect(updated).toEqual({ sigma: 5 }); }); it('should work on nested objects', () => { - const { added, removed } = calculateObjectDiff( - { alpha: 1, beta: { gamma: 2, delta: { sigma: 7 } } }, - { alpha: 1, beta: { gamma: 2, eta: 4 } } + const { added, removed, updated } = calculateObjectDiff( + { alpha: 1, beta: { gamma: 2, delta: { sigma: 7, omega: 8 } } }, + { alpha: 1, beta: { gamma: 2, delta: { omega: 9 }, eta: 4 } } ); expect(added).toEqual({ beta: { eta: 4 } }); expect(removed).toEqual({ beta: { delta: { sigma: 7 } } }); + expect(updated).toEqual({ beta: { delta: { omega: 9 } } }); }); - it('should return empty added/removed when the objects are the same', () => { - const { added, removed } = calculateObjectDiff({ alpha: 1, beta: 2 }, { alpha: 1, beta: 2 }); + it('should return empty added/removed/updated when the objects are the same', () => { + const { added, removed, updated } = calculateObjectDiff( + { alpha: 1, beta: 2 }, + { alpha: 1, beta: 2 } + ); expect(added).toEqual({}); expect(removed).toEqual({}); + expect(updated).toEqual({}); }); }); diff --git a/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.ts b/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.ts index 33fe441de5ac6..81860685a2ce0 100644 --- a/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.ts +++ b/src/platform/packages/shared/kbn-object-utils/src/calculate_object_diff.ts @@ -22,6 +22,9 @@ type DeepPartial = { interface ObjectDiffResult { added: DeepPartial; removed: DeepPartial; + updated: { + [K in keyof TBase & keyof TCompare]?: TBase[K] extends TCompare[K] ? never : TCompare[K]; + }; } /** @@ -34,16 +37,18 @@ export function calculateObjectDiff( oldObj: TBase, newObj?: TCompare ): ObjectDiffResult { - const added: DeepPartial = {}; - const removed: DeepPartial = {}; + const added: ObjectDiffResult['added'] = {}; + const removed: ObjectDiffResult['removed'] = {}; + const updated: ObjectDiffResult['updated'] = {}; - if (!newObj) return { added, removed }; + if (!newObj) return { added, removed, updated }; function diffRecursive( base: Obj, compare: Obj, addedMap: DeepPartial, - removedMap: DeepPartial + removedMap: DeepPartial, + updatedMap: DeepPartial ): void { for (const key in compare) { if (!(key in base)) { @@ -51,14 +56,18 @@ export function calculateObjectDiff( } else if (isPlainObject(base[key]) && isPlainObject(compare[key])) { addedMap[key] = {}; removedMap[key] = {}; + updatedMap[key] = {}; diffRecursive( base[key] as Obj, compare[key] as Obj, addedMap[key] as Obj, - removedMap[key] as Obj + removedMap[key] as Obj, + updatedMap[key] as Obj ); if (isEmpty(addedMap[key])) delete addedMap[key]; if (isEmpty(removedMap[key])) delete removedMap[key]; + } else if (base[key] !== compare[key]) { + updatedMap[key] = compare[key]; } } @@ -69,7 +78,7 @@ export function calculateObjectDiff( } } - diffRecursive(oldObj, newObj, added, removed); + diffRecursive(oldObj, newObj, added, removed, updated); - return { added, removed }; + return { added, removed, updated }; } diff --git a/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts new file mode 100644 index 0000000000000..5a237fcf42f1e --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams/server/lib/streams/errors/non_additive_processor.ts @@ -0,0 +1,13 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +export class NonAdditiveProcessor extends Error { + constructor(message: string) { + super(message); + this.name = 'NonAdditiveProcessor'; + } +} diff --git a/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/simulate.ts b/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/simulate.ts index 4b0c7a14ed861..fc19371b749ec 100644 --- a/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/simulate.ts +++ b/x-pack/solutions/observability/plugins/streams/server/routes/streams/processing/simulate.ts @@ -13,6 +13,8 @@ import { IngestSimulateResponse, IngestSimulateSimulateDocumentResult, } from '@elastic/elasticsearch/lib/api/types'; +import { isEmpty } from 'lodash'; +import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor'; import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed'; import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing'; import { createServerRoute } from '../../create_server_route'; @@ -62,8 +64,17 @@ export const simulateProcessorRoute = createServerRoute({ throw new SimulationFailed(error); } + const simulationDiffs = computeSimulationDiffs(simulationResult, docs); + + const updatedFields = computeUpdatedFields(simulationDiffs); + if (!isEmpty(updatedFields)) { + throw new NonAdditiveProcessor( + `The processor is not additive to the documents. It might update fields [${updatedFields.join()}]` + ); + } + const documents = computeSimulationDocuments(simulationResult, docs); - const detectedFields = computeDetectedFields(simulationResult, docs); + const detectedFields = computeDetectedFields(simulationDiffs); const successRate = computeSuccessRate(simulationResult); const failureRate = 1 - successRate; @@ -78,7 +89,7 @@ export const simulateProcessorRoute = createServerRoute({ throw notFound(error); } - if (error instanceof SimulationFailed) { + if (error instanceof SimulationFailed || error instanceof NonAdditiveProcessor) { throw badRequest(error); } @@ -87,6 +98,35 @@ export const simulateProcessorRoute = createServerRoute({ }, }); +const computeSimulationDiffs = ( + simulation: IngestSimulateResponse, + sampleDocs: Array<{ _source: Record }> +) => { + // Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval + const samplesToSimulationMap = new Map(simulation.docs.map((doc, id) => [doc, sampleDocs[id]])); + + const diffs = simulation.docs.filter(isSuccessfulDocument).map((doc) => { + const sample = samplesToSimulationMap.get(doc); + if (sample) { + return calculateObjectDiff(sample._source, doc.processor_results.at(-1)?.doc?._source); + } + + return calculateObjectDiff({}); + }); + + return diffs; +}; + +const computeUpdatedFields = (simulationDiff: ReturnType) => { + const diffs = simulationDiff + .map((simulatedDoc) => flattenObject(simulatedDoc.updated)) + .flatMap(Object.keys); + + const uniqueFields = [...new Set(diffs)]; + + return uniqueFields; +}; + const computeSimulationDocuments = ( simulation: IngestSimulateResponse, sampleDocs: Array<{ _source: Record }> @@ -108,31 +148,14 @@ const computeSimulationDocuments = ( }; const computeDetectedFields = ( - simulation: IngestSimulateResponse, - sampleDocs: Array<{ _source: Record }> + simulationDiff: ReturnType ): Array<{ name: string; type: FieldDefinitionConfig['type'] | 'unmapped'; }> => { - // Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval - const samplesToSimulationMap = new Map(simulation.docs.map((doc, id) => [doc, sampleDocs[id]])); - - const diffs = simulation.docs - .filter(isSuccessfulDocument) - .map((doc) => { - const sample = samplesToSimulationMap.get(doc); - if (sample) { - const { added } = calculateObjectDiff( - sample._source, - doc.processor_results.at(-1)?.doc?._source - ); - return flattenObject(added); - } - - return {}; - }) - .map(Object.keys) - .flat(); + const diffs = simulationDiff + .map((simulatedDoc) => flattenObject(simulatedDoc.added)) + .flatMap(Object.keys); const uniqueFields = [...new Set(diffs)]; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/dissect/dissect_pattern_definition.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/dissect/dissect_pattern_definition.tsx index 751a522c0610b..80d9245c61971 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/dissect/dissect_pattern_definition.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/dissect/dissect_pattern_definition.tsx @@ -17,7 +17,17 @@ export const DissectPatternDefinition = () => { const { core } = useKibana(); const esDocUrl = core.docLinks.links.ingest.dissectKeyModifiers; - const { field, fieldState } = useController({ name: 'pattern' }); + const { field, fieldState } = useController({ + name: 'pattern', + rules: { + required: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.dissectPatternRequiredError', + { defaultMessage: 'A pattern is required.' } + ), + }, + }); + + const { invalid, error } = fieldState; return ( { }} /> } - isInvalid={fieldState.invalid} + isInvalid={invalid} + error={error?.message} fullWidth > { return ( @@ -27,24 +26,8 @@ export const DissectProcessorForm = () => { - - + + ); }; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/grok_patterns_editor.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/grok_patterns_editor.tsx index efefc6870d5b4..dff7a12781f4f 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/grok_patterns_editor.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/grok_patterns_editor.tsx @@ -6,7 +6,13 @@ */ import React from 'react'; -import { useFormContext, useFieldArray, UseFormRegisterReturn } from 'react-hook-form'; +import { + useFormContext, + useFieldArray, + UseFormRegisterReturn, + FieldError, + FieldErrorsImpl, +} from 'react-hook-form'; import { DragDropContextProps, EuiFormRow, @@ -24,11 +30,23 @@ import { SortableList } from '../../sortable_list'; import { GrokFormState } from '../../types'; export const GrokPatternsEditor = () => { - const { register } = useFormContext(); + const { + formState: { errors }, + register, + } = useFormContext(); const { fields, append, remove, move } = useFieldArray>({ name: 'patterns', }); + const fieldsWithError = fields.map((field, id) => { + return { + ...field, + error: (errors.patterns as unknown as FieldErrorsImpl[])?.[id]?.value as + | FieldError + | undefined, + }; + }); + const handlerPatternDrag: DragDropContextProps['onDragEnd'] = ({ source, destination }) => { if (source && destination) { move(source.index, destination.index); @@ -50,13 +68,18 @@ export const GrokPatternsEditor = () => { > - {fields.map((field, idx) => ( + {fieldsWithError.map((field, idx) => ( ))} @@ -73,25 +96,28 @@ export const GrokPatternsEditor = () => { }; interface DraggablePatternInputProps { + field: GrokFormState['patterns'][number] & { id: string; error?: FieldError }; idx: number; inputProps: UseFormRegisterReturn<`patterns.${number}.value`>; onRemove: ((idx: number) => void) | null; - pattern: GrokFormState['patterns'][number] & { id: string }; } const DraggablePatternInput = ({ + field, idx, inputProps, onRemove, - pattern, }: DraggablePatternInputProps) => { const { ref, ...inputPropsWithoutRef } = inputProps; + const { error, id } = field; + + const isInvalid = Boolean(error); return ( {(provided) => ( - - - - - - {onRemove && ( - onRemove(idx)} - aria-label={i18n.translate( - 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.grokEditor.removePattern', - { defaultMessage: 'Remove grok pattern' } - )} + + + + + + - )} - + {onRemove && ( + onRemove(idx)} + aria-label={i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.grokEditor.removePattern', + { defaultMessage: 'Remove grok pattern' } + )} + /> + )} + + )} ); diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/index.tsx index 0adfe8faa4dfc..b025cda42055d 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/grok/index.tsx @@ -7,13 +7,12 @@ import React from 'react'; import { EuiSpacer } from '@elastic/eui'; -import { i18n } from '@kbn/i18n'; import { GrokPatternDefinition } from './grok_pattern_definition'; import { GrokPatternsEditor } from './grok_patterns_editor'; import { ProcessorFieldSelector } from '../processor_field_selector'; -import { ToggleField } from '../toggle_field'; import { OptionalFieldsAccordion } from '../optional_fields_accordion'; import { ProcessorConditionEditor } from '../processor_condition_editor'; +import { IgnoreFailureToggle, IgnoreMissingToggle } from '../ignore_toggles'; export const GrokProcessorForm = () => { return ( @@ -27,24 +26,8 @@ export const GrokProcessorForm = () => { - - + + ); }; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/ignore_toggles.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/ignore_toggles.tsx new file mode 100644 index 0000000000000..88bbc0410e7c7 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/ignore_toggles.tsx @@ -0,0 +1,56 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import React from 'react'; +import { i18n } from '@kbn/i18n'; +import { useWatch } from 'react-hook-form'; +import { EuiCode, EuiText } from '@elastic/eui'; +import { FormattedMessage } from '@kbn/i18n-react'; +import { ToggleField } from './toggle_field'; + +export const IgnoreFailureToggle = () => { + const value = useWatch({ name: 'ignore_failure' }); + + return ( + + ignore_failure, + }} + /> + + ) : undefined + } + /> + ); +}; + +export const IgnoreMissingToggle = () => { + return ( + + ); +}; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx index d8f0d2f0c5ffa..bd04e2cbf959e 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/index.tsx @@ -19,6 +19,7 @@ import { DangerZone } from './danger_zone'; import { DissectProcessorForm } from './dissect'; import { GrokProcessorForm } from './grok'; import { convertFormStateToProcessing, getDefaultFormState } from '../utils'; +import { useProcessingSimulator } from '../hooks/use_processing_simulator'; const ProcessorOutcomePreview = dynamic(() => import(/* webpackChunkName: "management_processor_outcome" */ './processor_outcome_preview').then( @@ -29,11 +30,11 @@ const ProcessorOutcomePreview = dynamic(() => ); export interface ProcessorFlyoutProps { + definition: ReadStreamDefinition; onClose: () => void; } export interface AddProcessorFlyoutProps extends ProcessorFlyoutProps { - definition: ReadStreamDefinition; onAddProcessor: (newProcessing: ProcessingDefinition, newFields?: DetectedField[]) => void; } export interface EditProcessorFlyoutProps extends ProcessorFlyoutProps { @@ -49,7 +50,7 @@ export function AddProcessorFlyout({ }: AddProcessorFlyoutProps) { const defaultValues = useMemo(() => getDefaultFormState('grok'), []); - const methods = useForm({ defaultValues }); + const methods = useForm({ defaultValues, mode: 'onChange' }); const formFields = methods.watch(); @@ -58,11 +59,21 @@ export function AddProcessorFlyout({ [defaultValues, formFields] ); - const handleSubmit: SubmitHandler = (data) => { + const { error, isLoading, refreshSamples, simulation, samples, simulate } = + useProcessingSimulator({ + definition, + condition: { field: formFields.field, operator: 'exists' }, + }); + + const handleSubmit: SubmitHandler = async (data) => { const processingDefinition = convertFormStateToProcessing(data); - onAddProcessor(processingDefinition, data.detected_fields); - onClose(); + simulate(processingDefinition).then((responseBody) => { + if (responseBody instanceof Error) return; + + onAddProcessor(processingDefinition, data.detected_fields); + onClose(); + }); }; return ( @@ -74,7 +85,10 @@ export function AddProcessorFlyout({ { defaultMessage: 'Add processor' } )} confirmButton={ - + {i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.confirmAddProcessor', { defaultMessage: 'Add processor' } @@ -90,7 +104,16 @@ export function AddProcessorFlyout({ {formFields.type === 'dissect' && } - + ); @@ -107,7 +130,7 @@ export function EditProcessorFlyout({ [processor] ); - const methods = useForm({ defaultValues }); + const methods = useForm({ defaultValues, mode: 'onChange' }); const formFields = methods.watch(); @@ -146,7 +169,10 @@ export function EditProcessorFlyout({ /> } confirmButton={ - + {i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.confirmEditProcessor', { defaultMessage: 'Update processor' } diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_field_selector.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_field_selector.tsx index da4a455f894ba..3fb52254f5d45 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_field_selector.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_field_selector.tsx @@ -8,11 +8,21 @@ import { EuiFormRow, EuiFieldText } from '@elastic/eui'; import { i18n } from '@kbn/i18n'; import React from 'react'; -import { useFormContext } from 'react-hook-form'; +import { useController } from 'react-hook-form'; export const ProcessorFieldSelector = () => { - const { register } = useFormContext(); - const { ref, ...inputProps } = register(`field`); + const { field, fieldState } = useController({ + name: 'field', + rules: { + required: i18n.translate( + 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.fieldSelectorRequiredError', + { defaultMessage: 'A field value is required.' } + ), + }, + }); + + const { ref, ...inputProps } = field; + const { invalid, error } = fieldState; return ( { 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.fieldSelectorHelpText', { defaultMessage: 'Field to search for matches.' } )} + isInvalid={invalid} + error={error?.message} > - + ); }; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx index 45e47157dd69a..47a08264b70ab 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/processor_outcome_preview.tsx @@ -35,107 +35,45 @@ import { useController, useFieldArray } from 'react-hook-form'; import { css } from '@emotion/react'; import { flattenObject } from '@kbn/object-utils'; import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public'; -import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch'; import { useKibana } from '../../../hooks/use_kibana'; import { StreamsAppSearchBar, StreamsAppSearchBarProps } from '../../streams_app_search_bar'; import { PreviewTable } from '../../preview_table'; -import { convertFormStateToProcessing, isCompleteProcessingDefinition } from '../utils'; +import { convertFormStateToProcessing } from '../utils'; import { DetectedField, ProcessorFormState } from '../types'; +import { UseProcessingSimulatorReturnType } from '../hooks/use_processing_simulator'; interface ProcessorOutcomePreviewProps { definition: ReadStreamDefinition; formFields: ProcessorFormState; + isLoading: UseProcessingSimulatorReturnType['isLoading']; + simulation: UseProcessingSimulatorReturnType['simulation']; + samples: UseProcessingSimulatorReturnType['samples']; + onRefreshSamples: UseProcessingSimulatorReturnType['refreshSamples']; + onSimulate: UseProcessingSimulatorReturnType['simulate']; + simulationError: UseProcessingSimulatorReturnType['error']; } export const ProcessorOutcomePreview = ({ definition, formFields, + isLoading, + simulation, + samples, + onRefreshSamples, + onSimulate, + simulationError, }: ProcessorOutcomePreviewProps) => { const { dependencies } = useKibana(); - const { - data, - streams: { streamsRepositoryClient }, - } = dependencies.start; + const { data } = dependencies.start; - const { - timeRange, - absoluteTimeRange: { start, end }, - setTimeRange, - } = useDateRange({ data }); + const { timeRange, setTimeRange } = useDateRange({ data }); const [selectedDocsFilter, setSelectedDocsFilter] = useState('outcome_filter_all'); - const { - value: samples, - loading: isLoadingSamples, - refresh: refreshSamples, - } = useStreamsAppFetch( - ({ signal }) => { - if (!definition || !formFields.field) { - return { documents: [] }; - } - - return streamsRepositoryClient.fetch('POST /api/streams/{id}/_sample', { - signal, - params: { - path: { id: definition.name }, - body: { - condition: { field: formFields.field, operator: 'exists' }, - start: start?.valueOf(), - end: end?.valueOf(), - number: 100, - }, - }, - }); - }, - [definition, formFields.field, streamsRepositoryClient, start, end], - { disableToastOnError: true } - ); - - const { - value: simulation, - loading: isLoadingSimulation, - error, - refresh: refreshSimulation, - } = useStreamsAppFetch( - async ({ signal }) => { - if (!definition || !samples || isEmpty(samples.documents)) { - return Promise.resolve(null); - } - - const processingDefinition = convertFormStateToProcessing(formFields); - - if (!isCompleteProcessingDefinition(processingDefinition)) { - return Promise.resolve(null); - } - - const simulationResult = await streamsRepositoryClient.fetch( - 'POST /api/streams/{id}/processing/_simulate', - { - signal, - params: { - path: { id: definition.name }, - body: { - documents: samples.documents as Array>, - processing: [processingDefinition], - }, - }, - } - ); - - return simulationResult; - }, - [definition, samples, streamsRepositoryClient], - { disableToastOnError: true } - ); - - const simulationError = error as IHttpFetchError | undefined; - const simulationDocuments = useMemo(() => { if (!simulation?.documents) { - const docs = (samples?.documents ?? []) as Array>; - return docs.map((doc) => flattenObject(doc)); + return samples.map((doc) => flattenObject(doc)); } const filterDocuments = (filter: DocsFilterOption) => { @@ -151,11 +89,24 @@ export const ProcessorOutcomePreview = ({ }; return filterDocuments(selectedDocsFilter).map((doc) => doc.value); - }, [samples?.documents, simulation?.documents, selectedDocsFilter]); + }, [samples, simulation?.documents, selectedDocsFilter]); + + const detectedFieldsColumns = useMemo( + () => + simulation?.detected_fields ? simulation.detected_fields.map((field) => field.name) : [], + [simulation?.detected_fields] + ); - const detectedFieldsColumns = simulation?.detected_fields - ? simulation.detected_fields.map((field) => field.name) - : []; + const tableColumns = useMemo(() => { + switch (selectedDocsFilter) { + case 'outcome_filter_unmatched': + return [formFields.field]; + case 'outcome_filter_matched': + case 'outcome_filter_all': + default: + return [formFields.field, ...detectedFieldsColumns]; + } + }, [formFields.field, detectedFieldsColumns, selectedDocsFilter]); const detectedFieldsEnabled = isWiredReadStream(definition) && simulation && !isEmpty(simulation.detected_fields); @@ -175,8 +126,8 @@ export const ProcessorOutcomePreview = ({ iconType="play" color="accentSecondary" size="s" - onClick={refreshSimulation} - isLoading={isLoadingSimulation} + onClick={() => onSimulate(convertFormStateToProcessing(formFields))} + isLoading={isLoading} > {i18n.translate( 'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.runSimulation', @@ -191,16 +142,16 @@ export const ProcessorOutcomePreview = ({ onDocsFilterChange={setSelectedDocsFilter} timeRange={timeRange} onTimeRangeChange={setTimeRange} - onTimeRangeRefresh={refreshSamples} + onTimeRangeRefresh={onRefreshSamples} simulationFailureRate={simulation?.failure_rate} simulationSuccessRate={simulation?.success_rate} /> ); diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/toggle_field.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/toggle_field.tsx index 885934dc48933..47b38ff0dbcef 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/toggle_field.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/flyout/toggle_field.tsx @@ -7,10 +7,10 @@ import React from 'react'; import { useController } from 'react-hook-form'; -import { EuiFormRow, EuiSwitch, htmlIdGenerator } from '@elastic/eui'; +import { EuiFormRow, EuiFormRowProps, EuiSwitch, htmlIdGenerator } from '@elastic/eui'; interface ToggleFieldProps { - helpText?: string; + helpText?: EuiFormRowProps['helpText']; id?: string; label: string; name: string; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts new file mode 100644 index 0000000000000..1ff63fbc484e6 --- /dev/null +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/hooks/use_processing_simulator.ts @@ -0,0 +1,106 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +import { useAbortController } from '@kbn/observability-utils-browser/hooks/use_abort_controller'; +import { ReadStreamDefinition, ProcessingDefinition, Condition } from '@kbn/streams-schema'; +import useAsyncFn from 'react-use/lib/useAsyncFn'; +import { IHttpFetchError, ResponseErrorBody } from '@kbn/core/public'; +import { useDateRange } from '@kbn/observability-utils-browser/hooks/use_date_range'; +import { APIReturnType } from '@kbn/streams-plugin/public/api'; +import { useStreamsAppFetch } from '../../../hooks/use_streams_app_fetch'; +import { useKibana } from '../../../hooks/use_kibana'; + +type Simulation = APIReturnType<'POST /api/streams/{id}/processing/_simulate'>; + +export interface UseProcessingSimulatorReturnType { + error?: IHttpFetchError; + isLoading: boolean; + refreshSamples: () => void; + samples: Array>; + simulate: (processing: ProcessingDefinition) => Promise; + simulation?: Simulation | null; +} + +export const useProcessingSimulator = ({ + definition, + condition, +}: { + definition: ReadStreamDefinition; + condition?: Condition; +}): UseProcessingSimulatorReturnType => { + const { dependencies } = useKibana(); + const { + data, + streams: { streamsRepositoryClient }, + } = dependencies.start; + + const { + absoluteTimeRange: { start, end }, + } = useDateRange({ data }); + + const abortController = useAbortController(); + + const serializedCondition = JSON.stringify(condition); + + const { + loading: isLoadingSamples, + value: samples, + refresh: refreshSamples, + } = useStreamsAppFetch( + ({ signal }) => { + if (!definition) { + return { documents: [] }; + } + + return streamsRepositoryClient.fetch('POST /api/streams/{id}/_sample', { + signal, + params: { + path: { id: definition.name }, + body: { + condition, + start: start?.valueOf(), + end: end?.valueOf(), + number: 100, + }, + }, + }); + }, + [definition, streamsRepositoryClient, start, end, serializedCondition], + { disableToastOnError: true } + ); + + const sampleDocs = (samples?.documents ?? []) as Array>; + + const [{ loading: isLoadingSimulation, error, value }, simulate] = useAsyncFn( + (processingDefinition: ProcessingDefinition) => { + if (!definition) { + return Promise.resolve(null); + } + + return streamsRepositoryClient.fetch('POST /api/streams/{id}/processing/_simulate', { + signal: abortController.signal, + params: { + path: { id: definition.name }, + body: { + documents: sampleDocs, + processing: [processingDefinition], + }, + }, + }); + }, + [definition, sampleDocs] + ); + + return { + isLoading: isLoadingSamples || isLoadingSimulation, + error: error as IHttpFetchError | undefined, + refreshSamples, + simulate, + simulation: value, + samples: sampleDocs, + }; +}; diff --git a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/page_content.tsx b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/page_content.tsx index 6ded439d24385..5d4781c6a7ad3 100644 --- a/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/page_content.tsx +++ b/x-pack/solutions/observability/plugins/streams_app/public/components/stream_detail_enrichment/page_content.tsx @@ -118,6 +118,7 @@ export function StreamDetailEnrichmentContent({ { if (formState.type === 'grok') { - const { condition, patterns, ...grokConfig } = formState; + const { condition, patterns, field, pattern_definitions, ignore_failure, ignore_missing } = + formState; return { condition: isCompleteCondition(condition) ? condition : undefined, @@ -93,19 +96,29 @@ export const convertFormStateToProcessing = ( patterns: patterns .filter(({ value }) => value.trim().length > 0) .map(({ value }) => value), - ...grokConfig, + field, + pattern_definitions, + ignore_failure, + ignore_missing, }, }, }; } if (formState.type === 'dissect') { - const { condition, ...dissectConfig } = formState; + const { condition, field, pattern, append_separator, ignore_failure, ignore_missing } = + formState; return { condition: isCompleteCondition(condition) ? condition : undefined, config: { - dissect: dissectConfig, + dissect: { + field, + pattern, + append_separator, + ignore_failure, + ignore_missing, + }, }, }; }