Skip to content

Commit

Permalink
Add benchmark for processors that create event backups (#36960)
Browse files Browse the repository at this point in the history
This will come handy once we have another PR with optimizations.

Also, extended a test case in the append processor due to the lack of coverage.
  • Loading branch information
rdner authored Oct 26, 2023
1 parent d8a1377 commit 8982110
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 1 deletion.
3 changes: 2 additions & 1 deletion libbeat/processors/actions/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func Test_appendProcessor_Run(t *testing.T) {
logger: log,
config: appendProcessorConfig{
Fields: []string{"field"},
Values: []interface{}{"value3", "value4"},
TargetField: "target",
},
},
Expand All @@ -281,7 +282,7 @@ func Test_appendProcessor_Run(t *testing.T) {
Meta: mapstr.M{},
Fields: mapstr.M{
"field": "I'm being appended",
"target": []interface{}{"value1", "value2", "I'm being appended"},
"target": []interface{}{"value1", "value2", "I'm being appended", "value3", "value4"},
},
},
},
Expand Down
221 changes: 221 additions & 0 deletions libbeat/processors/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
package processors_test

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
_ "github.com/elastic/beats/v7/libbeat/processors/actions"
_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/convert"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"
_ "github.com/elastic/beats/v7/libbeat/processors/dissect"
_ "github.com/elastic/beats/v7/libbeat/processors/extract_array"
_ "github.com/elastic/beats/v7/libbeat/processors/urldecode"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -566,3 +574,216 @@ func TestDropMissingFields(t *testing.T) {

assert.Equal(t, expectedEvent, processedEvent.Fields)
}

const (
fieldCount = 20
depth = 3
)

func BenchmarkEventBackups(b *testing.B) {
// listing all the processors that revert changes in case of an error
yml := []map[string]interface{}{
{
"append": map[string]interface{}{
"target_field": "append_target",
"values": []interface{}{"third", "fourth"},
"fail_on_error": true,
},
},
{
"copy_fields": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "copy_from",
"to": "copy.to",
},
},
"fail_on_error": true,
},
},
{
"decode_base64_field": map[string]interface{}{
"field": map[string]interface{}{
"from": "base64_from",
"to": "base64_to",
},
"fail_on_error": true,
},
},
{
"decompress_gzip_field": map[string]interface{}{
"field": map[string]interface{}{
"from": "gzip_from",
"to": "gzip_to",
},
"fail_on_error": true,
},
},
{
"rename": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "rename_from",
"to": "rename.to",
},
},
"fail_on_error": true,
},
},
{
"replace": map[string]interface{}{
"fields": []map[string]interface{}{
{
"field": "replace_test",
"pattern": "to replace",
"replacement": "replaced",
},
},
"fail_on_error": true,
},
},
{
"truncate_fields": map[string]interface{}{
"fields": []interface{}{"to_truncate"},
"max_characters": 4,
"fail_on_error": true,
},
},
{
"convert": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "convert_from",
"to": "convert.to",
"type": "integer",
},
},
"fail_on_error": true,
},
},
{
"decode_csv_fields": map[string]interface{}{
"fields": map[string]interface{}{
"csv_from": "csv.to",
},
"fail_on_error": true,
},
},
// it creates a backup unless `ignore_failure` is true
{
"dissect": map[string]interface{}{
"tokenizer": "%{key1} %{key2}",
"field": "to_dissect",
},
},
{
"extract_array": map[string]interface{}{
"field": "array_test",
"mappings": map[string]interface{}{
"array_first": 0,
"array_second": 1,
},
"fail_on_error": true,
},
},
{
"urldecode": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "url_from",
"to": "url.to",
},
},

"fail_on_error": true,
},
},
}

processors := GetProcessors(b, yml)
event := &beat.Event{
Timestamp: time.Now(),
Meta: mapstr.M{},
Fields: mapstr.M{
"append_target": []interface{}{"first", "second"},
"copy_from": "to_copy",
"base64_from": "dmFsdWU=",
// "decompressed data"
"gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}),
"rename_from": "renamed_value",
"replace_test": "something to replace",
"to_truncate": "something very long",
"convert_from": "42",
"csv_from": "1,2,3,4",
"to_dissect": "some words",
"array_test": []string{"first", "second"},
"url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome",
},
}

expFields := mapstr.M{
"append_target": []interface{}{"first", "second", "third", "fourth"},
"copy_from": "to_copy",
"copy": mapstr.M{
"to": "to_copy",
},
"base64_from": "dmFsdWU=",
"base64_to": "value",
"gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}),
"gzip_to": "decompressed data",
"rename": mapstr.M{"to": "renamed_value"},
"replace_test": "something replaced",
"to_truncate": "some",
"convert_from": "42",
"convert": mapstr.M{"to": int32(42)},
"csv_from": "1,2,3,4",
"csv": mapstr.M{"to": []string{"1", "2", "3", "4"}},
"to_dissect": "some words",
"dissect": mapstr.M{
"key1": "some",
"key2": "words",
},
"array_test": []string{"first", "second"},
"array_first": "first",
"array_second": "second",
"url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome",
"url": mapstr.M{"to": "https://www.elastic.co?some"},
}

generateFields(b, event.Meta, fieldCount, depth)
generateFields(b, event.Fields, fieldCount, depth)

var (
result *beat.Event
clone *beat.Event
err error
)

b.Run("run processors that use backups", func(b *testing.B) {
for i := 0; i < b.N; i++ {
clone = event.Clone() // necessary for making and comparing changes
result, err = processors.Run(clone)
}
require.NoError(b, err)
require.NotNil(b, result)
})

require.Equal(b, fmt.Sprintf("%p", clone), fmt.Sprintf("%p", result), "should be the same event")
for key := range expFields {
require.Equal(b, expFields[key], clone.Fields[key], fmt.Sprintf("%s does not match", key))
}
}

func generateFields(t require.TestingT, m mapstr.M, count, nesting int) {
for i := 0; i < count; i++ {
var err error
if nesting == 0 {
_, err = m.Put(fmt.Sprintf("field-%d", i), fmt.Sprintf("value-%d", i))
} else {
nested := mapstr.M{}
generateFields(t, nested, count, nesting-1)
_, err = m.Put(fmt.Sprintf("field-%d", i), nested)
}
require.NoError(t, err)
}
}

0 comments on commit 8982110

Please sign in to comment.