Skip to content

Commit

Permalink
[Streams 🌊] Add processors validation and simulation gate (#206566)
Browse files Browse the repository at this point in the history
## 📓 Summary

Closes elastic/streams-program#66

This work adds changes to prevent invalid processors from being
submitted.
The main rule is that a simulation is performed before any add/edit
submission to guarantee that the processor config is valid.

This work also updates the simulation API to detect whether there is a
non-additive change in any simulated document.

@patpscal the error reporting UI differs between add and edit because the
simulator is not visible when editing. I used a callout for now, but we can
easily update this once there is a final design in place.

### Form validation + simulation


https://github.com/user-attachments/assets/f7fc351b-6efc-4500-8490-b7f1c85139bf

### Non-additive processors


https://github.com/user-attachments/assets/47b5b739-c2cf-4a74-93a8-6ef43521c7d4
  • Loading branch information
tonyghiani authored Jan 15, 2025
1 parent b4342f4 commit 6429c53
Show file tree
Hide file tree
Showing 18 changed files with 454 additions and 216 deletions.
1 change: 1 addition & 0 deletions src/platform/packages/shared/kbn-object-utils/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ Result:
},
},
},
updated: {}
}
*/
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,25 +10,34 @@
import { calculateObjectDiff } from './calculate_object_diff';

describe('calculateObjectDiff', () => {
it('should return the added and removed parts between 2 objects', () => {
const { added, removed } = calculateObjectDiff({ alpha: 1, beta: 2 }, { alpha: 1, gamma: 3 });
it('should return the added, removed and updated parts between 2 objects', () => {
const { added, removed, updated } = calculateObjectDiff(
{ alpha: 1, beta: 2, sigma: 4 },
{ alpha: 1, gamma: 3, sigma: 5 }
);
expect(added).toEqual({ gamma: 3 });
expect(removed).toEqual({ beta: 2 });
expect(updated).toEqual({ sigma: 5 });
});

it('should work on nested objects', () => {
const { added, removed } = calculateObjectDiff(
{ alpha: 1, beta: { gamma: 2, delta: { sigma: 7 } } },
{ alpha: 1, beta: { gamma: 2, eta: 4 } }
const { added, removed, updated } = calculateObjectDiff(
{ alpha: 1, beta: { gamma: 2, delta: { sigma: 7, omega: 8 } } },
{ alpha: 1, beta: { gamma: 2, delta: { omega: 9 }, eta: 4 } }
);

expect(added).toEqual({ beta: { eta: 4 } });
expect(removed).toEqual({ beta: { delta: { sigma: 7 } } });
expect(updated).toEqual({ beta: { delta: { omega: 9 } } });
});

it('should return empty added/removed when the objects are the same', () => {
const { added, removed } = calculateObjectDiff({ alpha: 1, beta: 2 }, { alpha: 1, beta: 2 });
it('should return empty added/removed/updated when the objects are the same', () => {
const { added, removed, updated } = calculateObjectDiff(
{ alpha: 1, beta: 2 },
{ alpha: 1, beta: 2 }
);
expect(added).toEqual({});
expect(removed).toEqual({});
expect(updated).toEqual({});
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,9 @@ type DeepPartial<TInputObj> = {
/**
 * Shape of the value returned by `calculateObjectDiff` for a base object
 * (`TBase`) and a compare object (`TCompare`).
 */
interface ObjectDiffResult<TBase, TCompare> {
// Typed as a deep partial of the compare object.
added: DeepPartial<TCompare>;
// Typed as a deep partial of the base object.
removed: DeepPartial<TBase>;
// Only keys that exist on both TBase and TCompare are allowed here. The value
// type is TCompare[K], except when TBase[K] extends TCompare[K], where the
// conditional type resolves to `never`.
// NOTE(review): a key with the same static type on both sides (e.g. number ->
// number) therefore resolves to `never`, even though the runtime diff appears
// to store the new value for such keys, including nested partial objects.
// Confirm whether this type should be widened.
updated: {
[K in keyof TBase & keyof TCompare]?: TBase[K] extends TCompare[K] ? never : TCompare[K];
};
}

/**
Expand All @@ -34,31 +37,37 @@ export function calculateObjectDiff<TBase extends Obj, TCompare extends Obj>(
oldObj: TBase,
newObj?: TCompare
): ObjectDiffResult<TBase, TCompare> {
const added: DeepPartial<TCompare> = {};
const removed: DeepPartial<TBase> = {};
const added: ObjectDiffResult<TBase, TCompare>['added'] = {};
const removed: ObjectDiffResult<TBase, TCompare>['removed'] = {};
const updated: ObjectDiffResult<TBase, TCompare>['updated'] = {};

if (!newObj) return { added, removed };
if (!newObj) return { added, removed, updated };

function diffRecursive(
base: Obj,
compare: Obj,
addedMap: DeepPartial<Obj>,
removedMap: DeepPartial<Obj>
removedMap: DeepPartial<Obj>,
updatedMap: DeepPartial<Obj>
): void {
for (const key in compare) {
if (!(key in base)) {
addedMap[key] = compare[key];
} else if (isPlainObject(base[key]) && isPlainObject(compare[key])) {
addedMap[key] = {};
removedMap[key] = {};
updatedMap[key] = {};
diffRecursive(
base[key] as Obj,
compare[key] as Obj,
addedMap[key] as Obj,
removedMap[key] as Obj
removedMap[key] as Obj,
updatedMap[key] as Obj
);
if (isEmpty(addedMap[key])) delete addedMap[key];
if (isEmpty(removedMap[key])) delete removedMap[key];
} else if (base[key] !== compare[key]) {
updatedMap[key] = compare[key];
}
}

Expand All @@ -69,7 +78,7 @@ export function calculateObjectDiff<TBase extends Obj, TCompare extends Obj>(
}
}

diffRecursive(oldObj, newObj, added, removed);
diffRecursive(oldObj, newObj, added, removed, updated);

return { added, removed };
return { added, removed, updated };
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

/**
 * Error signalling that a processor is not additive to the documents it was
 * simulated against, i.e. it would update fields that already exist.
 *
 * The `name` is set explicitly so the error can be recognised by name as well
 * as through `instanceof`.
 */
export class NonAdditiveProcessor extends Error {
  /**
   * @param message Human-readable description of the non-additive change.
   */
  constructor(message: string) {
    super(message);
    this.name = 'NonAdditiveProcessor';
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import {
IngestSimulateResponse,
IngestSimulateSimulateDocumentResult,
} from '@elastic/elasticsearch/lib/api/types';
import { isEmpty } from 'lodash';
import { NonAdditiveProcessor } from '../../../lib/streams/errors/non_additive_processor';
import { SimulationFailed } from '../../../lib/streams/errors/simulation_failed';
import { formatToIngestProcessors } from '../../../lib/streams/helpers/processing';
import { createServerRoute } from '../../create_server_route';
Expand Down Expand Up @@ -62,8 +64,17 @@ export const simulateProcessorRoute = createServerRoute({
throw new SimulationFailed(error);
}

const simulationDiffs = computeSimulationDiffs(simulationResult, docs);

const updatedFields = computeUpdatedFields(simulationDiffs);
if (!isEmpty(updatedFields)) {
throw new NonAdditiveProcessor(
`The processor is not additive to the documents. It might update fields [${updatedFields.join()}]`
);
}

const documents = computeSimulationDocuments(simulationResult, docs);
const detectedFields = computeDetectedFields(simulationResult, docs);
const detectedFields = computeDetectedFields(simulationDiffs);
const successRate = computeSuccessRate(simulationResult);
const failureRate = 1 - successRate;

Expand All @@ -78,7 +89,7 @@ export const simulateProcessorRoute = createServerRoute({
throw notFound(error);
}

if (error instanceof SimulationFailed) {
if (error instanceof SimulationFailed || error instanceof NonAdditiveProcessor) {
throw badRequest(error);
}

Expand All @@ -87,6 +98,35 @@ export const simulateProcessorRoute = createServerRoute({
},
});

/**
 * Computes, for every successfully simulated document, the object diff between
 * the original sample `_source` and the `_source` produced by the last
 * processor result of the simulation.
 *
 * @param simulation Response of the ingest simulation.
 * @param sampleDocs Sample documents, index-aligned with `simulation.docs`.
 * @returns One diff per successful simulated document, in simulation order.
 */
const computeSimulationDiffs = (
  simulation: IngestSimulateResponse,
  sampleDocs: Array<{ _source: Record<string, unknown> }>
) => {
  // Failed documents are filtered out below, which shifts positions; keep a
  // lookup from each simulated doc to the sample that shares its original index.
  const sampleBySimulatedDoc = new Map(
    simulation.docs.map((simulatedDoc, index) => [simulatedDoc, sampleDocs[index]])
  );

  return simulation.docs.filter(isSuccessfulDocument).map((successfulDoc) => {
    const originalSample = sampleBySimulatedDoc.get(successfulDoc);

    // No matching sample: fall back to the diff of an empty object.
    if (!originalSample) {
      return calculateObjectDiff({});
    }

    return calculateObjectDiff(
      originalSample._source,
      successfulDoc.processor_results.at(-1)?.doc?._source
    );
  });
};

/**
 * Collects the distinct flattened field names found in the `updated` part of
 * the simulation diffs.
 *
 * @param simulationDiff Diffs as returned by `computeSimulationDiffs`.
 * @returns Unique field names, in order of first appearance.
 */
const computeUpdatedFields = (simulationDiff: ReturnType<typeof computeSimulationDiffs>) => {
  // A Set keeps insertion order, so the first occurrence of a field wins.
  const updatedFieldNames = new Set<string>();

  for (const simulatedDoc of simulationDiff) {
    for (const fieldName of Object.keys(flattenObject(simulatedDoc.updated))) {
      updatedFieldNames.add(fieldName);
    }
  }

  return [...updatedFieldNames];
};

const computeSimulationDocuments = (
simulation: IngestSimulateResponse,
sampleDocs: Array<{ _source: Record<string, unknown> }>
Expand All @@ -108,31 +148,14 @@ const computeSimulationDocuments = (
};

const computeDetectedFields = (
simulation: IngestSimulateResponse,
sampleDocs: Array<{ _source: Record<string, unknown> }>
simulationDiff: ReturnType<typeof computeSimulationDiffs>
): Array<{
name: string;
type: FieldDefinitionConfig['type'] | 'unmapped';
}> => {
// Since we filter out failed documents, we need to map the simulation docs to the sample docs for later retrieval
const samplesToSimulationMap = new Map(simulation.docs.map((doc, id) => [doc, sampleDocs[id]]));

const diffs = simulation.docs
.filter(isSuccessfulDocument)
.map((doc) => {
const sample = samplesToSimulationMap.get(doc);
if (sample) {
const { added } = calculateObjectDiff(
sample._source,
doc.processor_results.at(-1)?.doc?._source
);
return flattenObject(added);
}

return {};
})
.map(Object.keys)
.flat();
const diffs = simulationDiff
.map((simulatedDoc) => flattenObject(simulatedDoc.added))
.flatMap(Object.keys);

const uniqueFields = [...new Set(diffs)];

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,17 @@ export const DissectPatternDefinition = () => {
const { core } = useKibana();
const esDocUrl = core.docLinks.links.ingest.dissectKeyModifiers;

const { field, fieldState } = useController({ name: 'pattern' });
const { field, fieldState } = useController({
name: 'pattern',
rules: {
required: i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.dissectPatternRequiredError',
{ defaultMessage: 'A pattern is required.' }
),
},
});

const { invalid, error } = fieldState;

return (
<EuiFormRow
Expand All @@ -41,7 +51,8 @@ export const DissectPatternDefinition = () => {
}}
/>
}
isInvalid={fieldState.invalid}
isInvalid={invalid}
error={error?.message}
fullWidth
>
<CodeEditor
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@

import React from 'react';
import { EuiSpacer } from '@elastic/eui';
import { i18n } from '@kbn/i18n';
import { DissectAppendSeparator } from './dissect_append_separator';
import { DissectPatternDefinition } from './dissect_pattern_definition';
import { ProcessorFieldSelector } from '../processor_field_selector';
import { ToggleField } from '../toggle_field';
import { OptionalFieldsAccordion } from '../optional_fields_accordion';
import { ProcessorConditionEditor } from '../processor_condition_editor';
import { IgnoreFailureToggle, IgnoreMissingToggle } from '../ignore_toggles';

export const DissectProcessorForm = () => {
return (
Expand All @@ -27,24 +26,8 @@ export const DissectProcessorForm = () => {
<ProcessorConditionEditor />
</OptionalFieldsAccordion>
<EuiSpacer size="m" />
<ToggleField
name="ignore_failure"
label={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.ignoreFailuresLabel',
{ defaultMessage: 'Ignore failures for this processor' }
)}
/>
<ToggleField
name="ignore_missing"
label={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.ignoreMissingLabel',
{ defaultMessage: 'Ignore missing' }
)}
helpText={i18n.translate(
'xpack.streams.streamDetailView.managementTab.enrichment.processorFlyout.ignoreMissingHelpText',
{ defaultMessage: 'Ignore documents with a missing field.' }
)}
/>
<IgnoreFailureToggle />
<IgnoreMissingToggle />
</>
);
};
Loading

0 comments on commit 6429c53

Please sign in to comment.