Skip to content

Commit

Permalink
Add benchmark for processors that create event backups
Browse files Browse the repository at this point in the history
This will come handy once we have another PR with optimizations.

Also, extended a test case in the append processor due to the lack of coverage.
  • Loading branch information
rdner committed Oct 25, 2023
1 parent bbf0111 commit b08e95f
Show file tree
Hide file tree
Showing 2 changed files with 223 additions and 1 deletion.
3 changes: 2 additions & 1 deletion libbeat/processors/actions/append_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,6 +273,7 @@ func Test_appendProcessor_Run(t *testing.T) {
logger: log,
config: appendProcessorConfig{
Fields: []string{"field"},
Values: []interface{}{"value3", "value4"},
TargetField: "target",
},
},
Expand All @@ -281,7 +282,7 @@ func Test_appendProcessor_Run(t *testing.T) {
Meta: mapstr.M{},
Fields: mapstr.M{
"field": "I'm being appended",
"target": []interface{}{"value1", "value2", "I'm being appended"},
"target": []interface{}{"value1", "value2", "I'm being appended", "value3", "value4"},
},
},
},
Expand Down
221 changes: 221 additions & 0 deletions libbeat/processors/processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,23 @@
package processors_test

import (
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/libbeat/processors"
_ "github.com/elastic/beats/v7/libbeat/processors/actions"
_ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/add_process_metadata"
_ "github.com/elastic/beats/v7/libbeat/processors/convert"
_ "github.com/elastic/beats/v7/libbeat/processors/decode_csv_fields"
_ "github.com/elastic/beats/v7/libbeat/processors/dissect"
_ "github.com/elastic/beats/v7/libbeat/processors/extract_array"
_ "github.com/elastic/beats/v7/libbeat/processors/urldecode"
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
Expand Down Expand Up @@ -566,3 +574,216 @@ func TestDropMissingFields(t *testing.T) {

assert.Equal(t, expectedEvent, processedEvent.Fields)
}

const (
fieldCount = 20
depth = 3
)

func BenchmarkEventBackups(b *testing.B) {
// listing all the processors that revert changes in case of an error
yml := []map[string]interface{}{
{
"append": map[string]interface{}{
"target_field": "append_target",
"values": []interface{}{"third", "fourth"},
"fail_on_error": true,
},
},
{
"copy_fields": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "copy_from",
"to": "copy.to",
},
},
"fail_on_error": true,
},
},
{
"decode_base64_field": map[string]interface{}{
"field": map[string]interface{}{
"from": "base64_from",
"to": "base64_to",
},
"fail_on_error": true,
},
},
{
"decompress_gzip_field": map[string]interface{}{
"field": map[string]interface{}{
"from": "gzip_from",
"to": "gzip_to",
},
"fail_on_error": true,
},
},
{
"rename": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "rename_from",
"to": "rename.to",
},
},
"fail_on_error": true,
},
},
{
"replace": map[string]interface{}{
"fields": []map[string]interface{}{
{
"field": "replace_test",
"pattern": "to replace",
"replacement": "replaced",
},
},
"fail_on_error": true,
},
},
{
"truncate_fields": map[string]interface{}{
"fields": []interface{}{"to_truncate"},
"max_characters": 4,
"fail_on_error": true,
},
},
{
"convert": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "convert_from",
"to": "convert.to",
"type": "integer",
},
},
"fail_on_error": true,
},
},
{
"decode_csv_fields": map[string]interface{}{
"fields": map[string]interface{}{
"csv_from": "csv.to",
},
"fail_on_error": true,
},
},
// it creates a backup unless `ignore_failure` is true
{
"dissect": map[string]interface{}{
"tokenizer": "%{key1} %{key2}",
"field": "to_dissect",
},
},
{
"extract_array": map[string]interface{}{
"field": "array_test",
"mappings": map[string]interface{}{
"array_first": 0,
"array_second": 1,
},
"fail_on_error": true,
},
},
{
"urldecode": map[string]interface{}{
"fields": []map[string]interface{}{
{
"from": "url_from",
"to": "url.to",
},
},

"fail_on_error": true,
},
},
}

processors := GetProcessors(b, yml)
event := &beat.Event{
Timestamp: time.Now(),
Meta: mapstr.M{},
Fields: mapstr.M{
"append_target": []interface{}{"first", "second"},
"copy_from": "to_copy",
"base64_from": "dmFsdWU=",
// "decompressed data"
"gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}),
"rename_from": "renamed_value",
"replace_test": "something to replace",
"to_truncate": "something very long",
"convert_from": "42",
"csv_from": "1,2,3,4",
"to_dissect": "some words",
"array_test": []string{"first", "second"},
"url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome",
},
}

expFields := mapstr.M{
"append_target": []interface{}{"first", "second", "third", "fourth"},
"copy_from": "to_copy",
"copy": mapstr.M{
"to": "to_copy",
},
"base64_from": "dmFsdWU=",
"base64_to": "value",
"gzip_from": string([]byte{31, 139, 8, 0, 0, 0, 0, 0, 0, 255, 74, 73, 77, 206, 207, 45, 40, 74, 45, 46, 78, 77, 81, 72, 73, 44, 73, 4, 4, 0, 0, 255, 255, 108, 158, 105, 19, 17, 0, 0, 0}),
"gzip_to": "decompressed data",
"rename": mapstr.M{"to": "renamed_value"},
"replace_test": "something replaced",
"to_truncate": "some",
"convert_from": "42",
"convert": mapstr.M{"to": int32(42)},
"csv_from": "1,2,3,4",
"csv": mapstr.M{"to": []string{"1", "2", "3", "4"}},
"to_dissect": "some words",
"dissect": mapstr.M{
"key1": "some",
"key2": "words",
},
"array_test": []string{"first", "second"},
"array_first": "first",
"array_second": "second",
"url_from": "https%3A%2F%2Fwww.elastic.co%3Fsome",
"url": mapstr.M{"to": "https://www.elastic.co?some"},
}

generateFields(b, event.Meta, fieldCount, depth)
generateFields(b, event.Fields, fieldCount, depth)

var (
result *beat.Event
clone *beat.Event
err error
)

b.Run("run processors that use backups", func(b *testing.B) {
for i := 0; i < b.N; i++ {
clone = event.Clone() // necessary for making and comparing changes
result, err = processors.Run(clone)
}
require.NoError(b, err)
require.NotNil(b, result)
})

require.Equal(b, fmt.Sprintf("%p", clone), fmt.Sprintf("%p", result), "should be the same event")
for key := range expFields {
require.Equal(b, expFields[key], clone.Fields[key], fmt.Sprintf("%s does not match", key))
}
}

func generateFields(t require.TestingT, m mapstr.M, count, nesting int) {
for i := 0; i < count; i++ {
var err error
if nesting == 0 {
_, err = m.Put(fmt.Sprintf("field-%d", i), fmt.Sprintf("value-%d", i))
} else {
nested := mapstr.M{}
generateFields(t, nested, count, nesting-1)
_, err = m.Put(fmt.Sprintf("field-%d", i), nested)
}
require.NoError(t, err)
}
}

0 comments on commit b08e95f

Please sign in to comment.