Skip to content

Commit

Permalink
[Automatic Import] Fix Non-ecs compatible fields in grok processor (e…
Browse files Browse the repository at this point in the history
…lastic#194727)

## Release Note

Fixes a bug to resolve non-ecs compatible fields in Structured /
Unstructured syslog processing in Automatic Import.

## Summary

elastic#194205 explains the issue. 

This PR adds the `packageName.dataStreamName` prefix when handling header values
from the grok processor in the KV graph, so that ECS mapping receives the header
values in the converted JSON samples too.

### Before this PR

![image](https://github.com/user-attachments/assets/d2660f7d-2cca-413c-ab90-1a0f3e1b4a03)


### After this PR

<img width="706" alt="image"
src="https://github.com/user-attachments/assets/954b5a91-2123-46f9-b822-1709c3247901">


- Closes elastic#194205

### Checklist

Delete any items that are not applicable to this PR.

- [x] [Unit or functional
tests](https://www.elastic.co/guide/en/kibana/master/development-tests.html)
were updated or added to match the most common scenarios
  • Loading branch information
bhapas authored Oct 3, 2024
1 parent 5ea1ab0 commit b38941b
Show file tree
Hide file tree
Showing 13 changed files with 74 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,13 @@ export const KV_HEADER_EXAMPLE_ANSWER = {
rfc: 'RFC2454',
regex:
'/(?:(d{4}[-]d{2}[-]d{2}[T]d{2}[:]d{2}[:]d{2}(?:.d{1,6})?(?:[+-]d{2}[:]d{2}|Z)?)|-)s(?:([w][wd.@-]*)|-)s(.*)$/',
grok_pattern: '%{WORD:key1}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
grok_pattern:
'%{WORD:cisco.audit.key1}:%{WORD:cisco.audit.value1};%{WORD:cisco.audit.key2}:%{WORD:cisco.audit.value2}:%{GREEDYDATA:message}',
};

export const KV_HEADER_ERROR_EXAMPLE_ANSWER = {
grok_pattern:
'%{TIMESTAMP:timestamp}:%{WORD:value1};%{WORD:key2}:%{WORD:value2}:%{GREEDYDATA:message}',
'%{TIMESTAMP:cisco.audit.timestamp}:%{WORD:cisco.audit.value1};%{WORD:cisco.audit.key2}:%{WORD:cisco.audit.value2}:%{GREEDYDATA:message}',
};

export const onFailure = {
Expand All @@ -33,6 +34,8 @@ export const onFailure = {
},
};

// Shared ingest processor definition that drops the original `message` field
// (ignoring docs where it is already absent) after the structured fields have
// been extracted.
export const removeProcessor = { remove: { field: 'message', ignore_missing: true } };

export const COMMON_ERRORS = [
{
error: 'field [message] does not contain value_split [=]',
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ export async function handleHeaderError({
const currentPattern = state.grokPattern;

const pattern = await kvHeaderErrorGraph.invoke({
packageName: state.packageName,
dataStreamName: state.dataStreamName,
current_pattern: JSON.stringify(currentPattern, null, 2),
errors: JSON.stringify(state.errors, null, 2),
ex_answer: JSON.stringify(KV_HEADER_ERROR_EXAMPLE_ANSWER, null, 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export async function handleHeader({

const pattern = await kvHeaderGraph.invoke({
samples: state.logSamples,
packageName: state.packageName,
dataStreamName: state.dataStreamName,
ex_answer: JSON.stringify(KV_HEADER_EXAMPLE_ANSWER, null, 2),
});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,8 +79,9 @@ Follow these steps to identify the header pattern:
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Do not parse the message part in the regex. Just the header part should be in regex nad grok_pattern.
- Do not parse the message part in the regex. Just the header part should be in regex and grok_pattern.
- Make sure to map the remaining message body to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>
Expand Down Expand Up @@ -121,8 +122,9 @@ Follow these steps to fix the errors in the header pattern:
4. Make sure the regex and grok pattern matches all the header information. Only the structured message body should be under GREEDYDATA in grok pattern.
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Do not parse the message part in the regex. Just the header part should be in regex nad grok_pattern.
- Do not parse the message part in the regex. Just the header part should be in regex and grok_pattern.
- Make sure to map the remaining message body to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>
Expand Down
51 changes: 30 additions & 21 deletions x-pack/plugins/integration_assistant/server/graphs/kv/validate.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,10 @@ import { ESProcessorItem } from '../../../common';
import type { KVState } from '../../types';
import type { HandleKVNodeParams } from './types';
import { testPipeline } from '../../util';
import { onFailure } from './constants';
import { onFailure, removeProcessor } from './constants';
import { createGrokProcessor } from '../../util/processors';

interface KVResult {
// Shape of one simulated-pipeline output document: fields are nested under
// `{ [packageName]: { [dataStreamName]: ... } }` as produced by the grok/KV processors.
interface StructuredLogResult {
[packageName: string]: { [dataStreamName: string]: unknown };
}

Expand All @@ -32,25 +32,24 @@ export async function handleKVValidate({

// Pick logSamples if there was no header detected.
const samples = state.header ? state.kvLogMessages : state.logSamples;

const { pipelineResults: kvOutputSamples, errors } = (await createJSONInput(
kvProcessor,
samples,
client,
state
)) as { pipelineResults: KVResult[]; errors: object[] };

const { errors } = await verifyKVProcessor(kvProcessor, samples, client);
if (errors.length > 0) {
return { errors, lastExecutedChain: 'kvValidate' };
}

// Converts JSON Object into a string and parses it as a array of JSON strings
const jsonSamples = kvOutputSamples
const additionalProcessors = state.additionalProcessors;
additionalProcessors.push(kvProcessor[0]);
const samplesObject: StructuredLogResult[] = await buildJSONSamples(
state.logSamples,
additionalProcessors,
client
);

const jsonSamples = samplesObject
.map((log) => log[packageName])
.map((log) => log[dataStreamName])
.map((log) => JSON.stringify(log));
const additionalProcessors = state.additionalProcessors;
additionalProcessors.push(kvProcessor[0]);

return {
jsonSamples,
Expand Down Expand Up @@ -89,15 +88,25 @@ export async function handleHeaderValidate({
};
}

async function createJSONInput(
async function verifyKVProcessor(
kvProcessor: ESProcessorItem,
formattedSamples: string[],
client: IScopedClusterClient,
state: KVState
): Promise<{ pipelineResults: object[]; errors: object[] }> {
// This processor removes the original message field in the JSON output
const removeProcessor = { remove: { field: 'message', ignore_missing: true } };
client: IScopedClusterClient
): Promise<{ errors: object[] }> {
// This processor removes the original message field in the output
const pipeline = { processors: [kvProcessor[0], removeProcessor], on_failure: [onFailure] };
const { pipelineResults, errors } = await testPipeline(formattedSamples, pipeline, client);
return { pipelineResults, errors };
const { errors } = await testPipeline(formattedSamples, pipeline, client);
return { errors };
}

/**
 * Runs the given processors (plus the shared `removeProcessor`, which drops the
 * original `message` field) as a simulated ingest pipeline over the raw log
 * samples and returns the resulting structured documents.
 *
 * NOTE(review): any `errors` returned by `testPipeline` are ignored here —
 * presumably the processors were already validated by the caller (e.g. via
 * `verifyKVProcessor`); confirm that assumption holds for all call sites.
 */
async function buildJSONSamples(
samples: string[],
processors: object[],
client: IScopedClusterClient
): Promise<StructuredLogResult[]> {
const pipeline = { processors: [...processors, removeProcessor], on_failure: [onFailure] };
const { pipelineResults } = (await testPipeline(samples, pipeline, client)) as {
pipelineResults: StructuredLogResult[];
};
return pipelineResults;
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ For each pipeline result you find matching values that would fit any of the rela
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- The \`message\` field may not be part of related fields.
- You can use as many processor objects as needed to map all relevant pipeline result fields to any of the ECS related fields.
- If no relevant fields or values are found that could be mapped confidently to any of the related fields, then respond with an empty array [] as valid JSON enclosed with 3 backticks (\`).
- Do not respond with anything except the array of processors as a valid JSON objects enclosed with 3 backticks (\`), see example response below.
Expand Down Expand Up @@ -79,6 +80,7 @@ Follow these steps to help resolve the current ingest pipeline issues:
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- The \`message\` field may not be part of related fields.
- Never use "split" in template values, only use the field name inside the triple brackets. If the error mentions "Improperly closed variable in query-template" then check each "value" field for any special characters and remove them.
- If solving an error means removing the last processor in the list, then return an empty array [] as valid JSON enclosed with 3 backticks (\`).
- Do not respond with anything except the complete updated array of processors as a valid JSON object enclosed with 3 backticks (\`), see example response below.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ export async function handleUnstructuredError({
const currentPatterns = state.grokPatterns;

const pattern = await grokErrorGraph.invoke({
packageName: state.packageName,
dataStreamName: state.dataStreamName,
current_pattern: JSON.stringify(currentPatterns, null, 2),
errors: JSON.stringify(state.errors, null, 2),
ex_answer: JSON.stringify(GROK_ERROR_EXAMPLE_ANSWER, null, 2),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export const GROK_MAIN_PROMPT = ChatPromptTemplate.fromMessages([
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Make sure to map the remaining message part to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>
Expand Down Expand Up @@ -89,6 +90,7 @@ Follow these steps to help improve the grok patterns and apply it step by step:
You ALWAYS follow these guidelines when writing your response:
<guidelines>
- Make sure to map the remaining message part to \'message\' in grok pattern.
- Make sure to add \`{packageName}.{dataStreamName}\` as a prefix to each field in the pattern. Refer to example response.
- Do not respond with anything except the processor as a JSON object enclosed with 3 backticks (\`), see example response above. Use strict JSON response format.
</guidelines>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@ export interface HandleUnstructuredNodeParams extends UnstructuredNodeParams {
}

export interface GrokResult {
[key: string]: unknown;
grok_patterns: string[];
message: string;
}

export interface LogResult {
[packageName: string]: { [dataStreamName: string]: unknown };
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ export async function handleUnstructured({
const samples = state.logSamples;

const pattern = (await grokMainGraph.invoke({
packageName: state.packageName,
dataStreamName: state.dataStreamName,
samples: samples[0],
ex_answer: JSON.stringify(GROK_EXAMPLE_ANSWER, null, 2),
})) as GrokResult;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,15 @@ describe('Testing unstructured validation without errors', () => {
const client = {
asCurrentUser: {
ingest: {
simulate: jest
.fn()
.mockReturnValue({ docs: [{ doc: { _source: { message: 'dummy data' } } }] }),
simulate: jest.fn().mockReturnValue({
docs: [
{
doc: {
_source: { testPackage: { testDatastream: { message: 'dummy data' } } },
},
},
],
}),
},
},
} as unknown as IScopedClusterClient;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
*/

import type { UnstructuredLogState } from '../../types';
import type { GrokResult, HandleUnstructuredNodeParams } from './types';
import type { HandleUnstructuredNodeParams, LogResult } from './types';
import { testPipeline } from '../../util';
import { onFailure } from './constants';
import { createGrokProcessor } from '../../util/processors';
Expand All @@ -18,17 +18,22 @@ export async function handleUnstructuredValidate({
const grokPatterns = state.grokPatterns;
const grokProcessor = createGrokProcessor(grokPatterns);
const pipeline = { processors: grokProcessor, on_failure: [onFailure] };
const packageName = state.packageName;
const dataStreamName = state.dataStreamName;

const { pipelineResults, errors } = (await testPipeline(state.logSamples, pipeline, client)) as {
pipelineResults: GrokResult[];
pipelineResults: LogResult[];
errors: object[];
};

if (errors.length > 0) {
return { errors, lastExecutedChain: 'unstructuredValidate' };
}

const jsonSamples: string[] = pipelineResults.map((entry) => JSON.stringify(entry));
const jsonSamples = pipelineResults
.map((log) => log[packageName])
.map((log) => log[dataStreamName])
.map((log) => JSON.stringify(log));
const additionalProcessors = state.additionalProcessors;
additionalProcessors.push(grokProcessor[0]);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,11 @@ processors:
field: originalMessage
ignore_missing: true
tag: remove_copied_message
if: 'ctx.event?.original != null'{% if log_format != 'unstructured' %}
if: 'ctx.event?.original != null'
- remove:
field: message
ignore_missing: true
tag: remove_message{% endif %}{% if (log_format == 'json') or (log_format == 'ndjson') %}
tag: remove_message{% if (log_format == 'json') or (log_format == 'ndjson') %}
- json:
field: event.original
tag: json_original
Expand Down

0 comments on commit b38941b

Please sign in to comment.