From 19dc9e51406734ce5ed6d946c35ea23ffd733549 Mon Sep 17 00:00:00 2001 From: Denis Rechkunov <denis.rechkunov@elastic.co> Date: Thu, 12 Oct 2023 15:44:27 +0200 Subject: [PATCH] Optimize memory allocations in the event processing pipeline Previously, every time a processor ran on an event we made a clone of the entire event for two reasons: 1. this event could have some nested maps that are shared among multiple events. 2. in case a processor fails to make a change it should be able to revert its partial changes. This change added a new `EventEditor` wrapper that is used for collecting pending event changes in processors with an option to `Apply` or `Reset` them. Additionally, this `EventEditor` takes care of the efficient memory management when making changes to an event by cloning only the nested maps that processors access or modify. Most of the processors just put new keys or delete existing keys on the root-level, so most of the time the nested maps in the event remain untouched and it does not require the whole event to be cloned. --- libbeat/beat/event.go | 132 +++++++++---- libbeat/beat/event_editor.go | 280 +++++++++++++++++++++++++++ libbeat/beat/event_editor_test.go | 219 +++++++++++++++++++++ libbeat/processors/processor_test.go | 145 ++++++++++++++ 4 files changed, 733 insertions(+), 43 deletions(-) create mode 100644 libbeat/beat/event_editor.go create mode 100644 libbeat/beat/event_editor_test.go diff --git a/libbeat/beat/event.go b/libbeat/beat/event.go index 54fc3e27cc39..968a282ddc7d 100644 --- a/libbeat/beat/event.go +++ b/libbeat/beat/event.go @@ -32,6 +32,8 @@ const FlagField = "log.flags" const ( timestampFieldKey = "@timestamp" metadataFieldKey = "@metadata" + metadataKeyPrefix = metadataFieldKey + "." + metadataKeyOffset = len(metadataKeyPrefix) ) // Event is the common event format shared by all beats. @@ -47,28 +49,50 @@ type Event struct { } var ( - errNoTimestamp = errors.New("value is no timestamp") - errNoMapStr = errors.New("value is no map[string]interface{} type") + errNoTimestamp = errors.New("value is no timestamp") + errNoMapStr = errors.New("value is no map[string]interface{} type") ) // SetID overwrites the "id" field in the events metadata. // If Meta is nil, a new Meta dictionary is created. func (e *Event) SetID(id string) { - if e.Meta == nil { - e.Meta = mapstr.M{} + _, _ = e.PutValue("@metadata._id", id) +} + +func (e *Event) HasKey(key string) (bool, error) { + if key == timestampFieldKey { + return true, nil + } + + if subKey, ok := metadataKey(key); ok { + if subKey == "" || e.Meta == nil { + return e.Meta != nil, nil + } + return e.Meta.HasKey(subKey) } - e.Meta["_id"] = id + + return e.Fields.HasKey(key) } func (e *Event) GetValue(key string) (interface{}, error) { if key == timestampFieldKey { return e.Timestamp, nil - } else if subKey, ok := metadataKey(key); ok { - if subKey == "" || e.Meta == nil { + } + + if subKey, ok := metadataKey(key); ok { + if e.Meta == nil { + return nil, mapstr.ErrKeyNotFound + } + if subKey == "" { return e.Meta, nil } return e.Meta.GetValue(subKey) } + + if e.Fields == nil { + return nil, mapstr.ErrKeyNotFound + } + return e.Fields.GetValue(key) } @@ -106,6 +130,36 @@ func (e *Event) DeepUpdateNoOverwrite(d mapstr.M) { e.deepUpdate(d, false) } +func (e *Event) PutValue(key string, v interface{}) (interface{}, error) { + if key == timestampFieldKey { + return e.setTimestamp(v) + } + + if subKey, ok := metadataKey(key); ok { + return e.putMetadataValue(subKey, v) + } + + if e.Fields == nil { + e.Fields = mapstr.M{} + } + + return e.Fields.Put(key, v) +} + +func (e *Event) Delete(key string) error { + if subKey, ok := metadataKey(key); ok { + if subKey == "" { + e.Meta = nil + return nil + } + if e.Meta == nil { + return nil + } + return e.Meta.Delete(subKey) + } + return e.Fields.Delete(key) +} + func (e *Event) deepUpdate(d mapstr.M, overwrite bool) { if len(d) == 0 { return @@ -116,7 +170,7 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) { timestampValue, timestampExists := d[timestampFieldKey] if timestampExists { if overwrite { - _ = e.setTimestamp(timestampValue) + _, _ = e.setTimestamp(timestampValue) } // Temporary delete it from the update map, @@ -127,6 +181,7 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) { // It's supported to update the metadata using this function. // However, we must handle it separately since it's a separate field of the event. + // !!!BUG!!!, no `@metadata` key prefix support metaValue, metaExists := d[metadataFieldKey] if metaExists { var metaUpdate mapstr.M @@ -180,54 +235,45 @@ func (e *Event) deepUpdate(d mapstr.M, overwrite bool) { } } -func (e *Event) setTimestamp(v interface{}) error { +func (e *Event) setTimestamp(v interface{}) (interface{}, error) { + // to satisfy the PutValue interface, this function + // must return the overwritten value + prevValue := e.Timestamp + switch ts := v.(type) { case time.Time: e.Timestamp = ts + return prevValue, nil case common.Time: e.Timestamp = time.Time(ts) + return prevValue, nil default: - return errNoTimestamp + return nil, errNoTimestamp } - - return nil } -func (e *Event) PutValue(key string, v interface{}) (interface{}, error) { - if key == timestampFieldKey { - err := e.setTimestamp(v) - return nil, err - } else if subKey, ok := metadataKey(key); ok { - if subKey == "" { - switch meta := v.(type) { - case mapstr.M: - e.Meta = meta - case map[string]interface{}: - e.Meta = meta - default: - return nil, errNoMapStr - } - } else if e.Meta == nil { - e.Meta = mapstr.M{} - } - return e.Meta.Put(subKey, v) +func (e *Event) putMetadataValue(subKey string, v interface{}) (interface{}, error) { + if e.Meta == nil { + e.Meta = mapstr.M{} } + if subKey == "" { + // to satisfy the PutValue interface, this function + // must return the overwritten value + prevValue := e.Meta - return e.Fields.Put(key, v) -} - -func (e *Event) Delete(key string) error { - if subKey, ok := metadataKey(key); ok { - if subKey == "" { - e.Meta = nil - return nil - } - if e.Meta == nil { - return nil + switch meta := v.(type) { + case mapstr.M: + e.Meta = meta + return prevValue, nil + case map[string]interface{}: + e.Meta = mapstr.M(meta) + return prevValue, nil + default: + return nil, errNoMapStr } - return e.Meta.Delete(subKey) } - return e.Fields.Delete(key) + + return e.Meta.Put(subKey, v) } func metadataKey(key string) (string, bool) { diff --git a/libbeat/beat/event_editor.go b/libbeat/beat/event_editor.go new file mode 100644 index 000000000000..280b37a485c0 --- /dev/null +++ b/libbeat/beat/event_editor.go @@ -0,0 +1,280 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beat + +import ( + "errors" + "fmt" + "strings" + + "github.com/elastic/elastic-agent-libs/mapstr" +) + +var ( + ErrMetadataAccess = fmt.Errorf("accessing %q key directly is not supported, try nested keys", metadataFieldKey) +) + +type EventAccessor interface { + // GetValue gets a value from the map. The key can be expressed in dot-notation (e.g. x.y). + // If the key does not exist then `mapstr.ErrKeyNotFound` error is returned. + GetValue(key string) (interface{}, error) + + // PutValue associates the specified value with the specified key. If the event + // previously contained a mapping for the key, the old value is replaced and + // returned. The key can be expressed in dot-notation (e.g. x.y) to put a value + // into a nested map. + // + // If you need insert keys containing dots then you must use bracket notation + // to insert values (e.g. m[key] = value). + PutValue(key string, v interface{}) (interface{}, error) + + // Delete value with the given key. + // The key can be expressed in dot-notation (e.g. x.y) + Delete(key string) error + + // DeepUpdate recursively copies the key-value pairs from `d` to various properties of the event. + // When the key equals `@timestamp` it's set as the `Timestamp` property of the event. + // When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields` + // The rest of the keys are set to the `Fields` map. + // If the key is present and the value is a map as well, the sub-map will be updated recursively + // via `DeepUpdate`. + // `DeepUpdateNoOverwrite` is a version of this function that does not + // overwrite existing values.p + DeepUpdate(d mapstr.M) + + // DeepUpdateNoOverwrite recursively copies the key-value pairs from `d` to various properties of the event. + // The `@timestamp` update is ignored due to "no overwrite" behavior. + // When the key equals `@metadata` the update is routed into the `Meta` map instead of `Fields`. + // The rest of the keys are set to the `Fields` map. + // If the key is present and the value is a map as well, the sub-map will be updated recursively + // via `DeepUpdateNoOverwrite`. + // `DeepUpdate` is a version of this function that overwrites existing values. + DeepUpdateNoOverwrite(d mapstr.M) +} + +// EventEditor is a wrapper that allows to make changes to the wrapped event +// preserving states of the original nested maps by cloning them on demand. +// +// The first time a nested map gets updated it's cloned and, from that moment on, only the copy is modified. +// Once all the changes are collected, users should call `Apply` to copy pending changes to the original event. +// When the changes get applied the pointers to originally referenced nested maps get replaced with pointers to +// modified copies. +// +// This allows us to: +// * avoid cloning the entire event and be more efficient in memory management +// * collect multiple changes and apply them at once, using a transaction-like mechanism (`Apply`/`Reset` functions). +type EventEditor struct { + original *Event + pending *Event + deletions map[string]struct{} +} + +func NewEventEditor(e *Event) *EventEditor { + return &EventEditor{ + original: e, + } +} + +// GetValue implements the `EventAccessor` interface. +func (e *EventEditor) GetValue(key string) (interface{}, error) { + if key == metadataFieldKey { + return nil, ErrMetadataAccess + } + + dotIdx := e.dotIdx(key) + if dotIdx == -1 && e.checkDeleted(key) { + return nil, mapstr.ErrKeyNotFound + } + + if e.pending == nil { + e.pending = &Event{} + } + + // we try to retrieve from the pending changes first, + // since it should have the most recent data + pendingVal, err := e.pending.GetValue(key) + if err == nil { + return pendingVal, nil + } + if !errors.Is(err, mapstr.ErrKeyNotFound) { + return nil, err + } + + value, err := e.original.GetValue(key) + if err != nil { + return nil, err + } + + // if the end value is not a map, we can just return it, + // value types will be copied automatically + switch value.(type) { + case mapstr.M, map[string]interface{}: + default: + return value, nil + } + + // if the key leads to a map value or it's in a nested map, + // we must check it out before returning, so we return a clone + // that the consumer can modify + if e.checkout(key) { + return e.pending.GetValue(key) + } + + // otherwise, we failed to fetch the value + return nil, mapstr.ErrKeyNotFound +} + +// PutValue implements the `EventAccessor` interface. +func (e *EventEditor) PutValue(key string, v interface{}) (interface{}, error) { + panic("not implemented") +} + +// Delete implements the `EventAccessor` interface. +func (e *EventEditor) Delete(key string) error { + // in case the key is present in the pending event + // we delete the key from there as usual + exists, _ := e.pending.HasKey(key) + if exists { + // in case the original event also originally + // had this key, we mark it for deletion as well + _ = e.markAsDeleted(key) + return e.pending.Delete(key) + } + + // following actions make sense only if the original event + // has the key we're trying to delete + has, _ := e.original.HasKey(key) + if !has { + return mapstr.ErrKeyNotFound + } + + // this function successfully marks only root-level fields + // returns `false` if the value is in a nested map. + marked := e.markAsDeleted(key) + if marked { + return nil + } + + // if it's a dot-separated key in a nested map we must check it out + // and then delete the key from the pending event. + checkedOut := e.checkout(key) + if checkedOut { + // recursive call that should now fall into the first condition + return e.Delete(key) + } + + // should never happen if the implementation is correct + return fmt.Errorf("failed to delete %q key from the event", key) +} + +// DeepUpdate implements the `EventAccessor` interface. +func (e *EventEditor) DeepUpdate(d mapstr.M) { panic("not implemented") } + +// DeepUpdateNoOverwrite implements the `EventAccessor` interface. +func (e *EventEditor) DeepUpdateNoOverwrite(d mapstr.M) { panic("not implemented") } + +// markAsDeleted marks a key for deletion when `Apply` is called +// if it's a root-level key in the original event. +// Returns `true` if the key was successfully marked. +func (e *EventEditor) markAsDeleted(key string) bool { + dotIdx := e.dotIdx(key) + // nested keys are not marked since nested maps + // are cloned into the `pending` event and altered there + if dotIdx != -1 { + return false + } + has, _ := e.original.HasKey(key) + if !has { + return false + } + if e.deletions == nil { + e.deletions = make(map[string]struct{}) + } + e.deletions[key] = struct{}{} + return true +} + +// checkout clones a nested map of the original event to the event with pending changes. +// Only nested maps at the root-level of the key are cloned which leads to a recursive +// clone of the entire nested sub-tree even if the key pointed to a nested map on a deeper level. +// Returns `true` if the value was indeed a nested map and needed to be checked out. +func (e *EventEditor) checkout(key string) bool { + // only nested maps need to be checked out, so we can preserve their original state + rootKey := e.rootKey(key) + + // it might be already checked out + checkedOut, _ := e.pending.HasKey(rootKey) + if checkedOut { + return false + } + + value, err := e.original.GetValue(rootKey) + if err != nil { + return false + } + + // we check out only nested maps + switch typedVal := value.(type) { + case mapstr.M: + _, _ = e.pending.PutValue(rootKey, typedVal.Clone()) + case map[string]interface{}: + _, _ = e.pending.PutValue(rootKey, mapstr.M(typedVal).Clone()) + } + return true +} + +// dotIdx returns index of the first `.` character or `-1` if there is no `.` character. +// Accounts for the `@metadata` subkeys, since it's stored in a separate map, +// root level keys will be in the `@metadata.*` namespace. +func (e *EventEditor) dotIdx(key string) int { + // metadata keys are special, since they're stored in a separate map + // we don't want to copy the whole map with all metadata, we want + // to checkout only nested maps one by one for efficiency + if strings.HasPrefix(key, metadataKeyPrefix) { + // we start looking after the `@metadata` prefix + dotIdx := strings.Index(key[metadataKeyOffset:], ".") + // if there is no dot in the subkey, then the second segment + // is considered to be a root-level key + if dotIdx == -1 { + return -1 + } + // otherwise we need to offset the dot index by the prefix we removed + return dotIdx + metadataKeyOffset + } + + return strings.Index(key, ".") +} + +// rootKey reduces the key to its root-level. +func (e *EventEditor) rootKey(key string) string { + dotIdx := e.dotIdx(key) + if dotIdx == -1 { + return key + } else { + return key[:dotIdx] + } +} + +// checkDeleted returns `true` if the key was marked for deletion. +// The key can be expressed in dot-notation (e.g. x.y) and if the root-level prefix on +// the key path is deleted the function returns `true`. +func (e *EventEditor) checkDeleted(key string) bool { + rootKey := e.rootKey(key) + _, deleted := e.deletions[rootKey] + return deleted +} diff --git a/libbeat/beat/event_editor_test.go b/libbeat/beat/event_editor_test.go new file mode 100644 index 000000000000..ed045cb2dd25 --- /dev/null +++ b/libbeat/beat/event_editor_test.go @@ -0,0 +1,219 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beat + +import ( + "fmt" + "testing" + "time" + + "github.com/elastic/elastic-agent-libs/mapstr" + + "github.com/stretchr/testify/require" +) + +func TestEventEditor(t *testing.T) { + level2MetadataMap := mapstr.M{ + "metaLevel2Value": "metavalue3", + } + level1MetadataMap := mapstr.M{ + "metaLevel1Map": level2MetadataMap, + } + + level2FieldsMap := mapstr.M{ + "fieldsLevel2Value": "fieldsvalue3", + } + level1FieldsMap := mapstr.M{ + "fieldsLevel1Map": level2FieldsMap, + } + + event := &Event{ + Timestamp: time.Now(), + Meta: mapstr.M{ + "metaLevel0Map": level1MetadataMap, + "metaLevel0Value": "metavalue1", + }, + Fields: mapstr.M{ + "fieldsLevel0Map": level1FieldsMap, + "fieldsLevel0Value": "fieldsvalue1", + }, + } + + t.Run("rootKey", func(t *testing.T) { + cases := []struct { + val string + exp string + }{ + { + val: "@metadata.some", + exp: "@metadata.some", + }, + { + val: "@metadata.some.nested", + exp: "@metadata.some", + }, + { + val: "some.nested.key", + exp: "some", + }, + { + val: "key", + exp: "key", + }, + } + + for _, tc := range cases { + e := NewEventEditor(event) + t.Run(tc.val, func(t *testing.T) { + require.Equal(t, tc.exp, e.rootKey(tc.val)) + }) + } + }) + + t.Run("metadata", func(t *testing.T) { + t.Run("no acess to @metadata key", func(t *testing.T) { + editor := NewEventEditor(event) + metadata, err := editor.GetValue("@metadata") + require.Nil(t, metadata) + require.Error(t, err) + require.ErrorIs(t, err, ErrMetadataAccess) + }) + + t.Run("non-existing key", func(t *testing.T) { + editor := NewEventEditor(event) + val, err := editor.GetValue("@metadata.none") + require.Nil(t, val) + require.Error(t, err) + require.ErrorIs(t, err, mapstr.ErrKeyNotFound) + }) + + t.Run("gets a value type", func(t *testing.T) { + editor := NewEventEditor(event) + requireEqualMapValues(t, editor, map[string]interface{}{ + "@metadata.metaLevel0Value": "metavalue1", + }) + }) + + t.Run("gets a nested map", func(t *testing.T) { + editor := NewEventEditor(event) + nested, err := editor.GetValue("@metadata.metaLevel0Map") + require.NoError(t, err) + requireClonedMaps(t, level1MetadataMap, nested) + }) + + t.Run("gets a deeper nested map", func(t *testing.T) { + editor := NewEventEditor(event) + + nested, err := editor.GetValue("@metadata.metaLevel0Map.metaLevel1Map") + require.NoError(t, err) + requireClonedMaps(t, level2MetadataMap, nested) + + // the higher level map should be also cloned by accessing the inner map + higher, err := editor.GetValue("@metadata.metaLevel0Map") + require.NoError(t, err) + requireClonedMaps(t, level1MetadataMap, higher) + + // the nested map we got previously should be a part of this higher level map + require.IsType(t, mapstr.M{}, higher) + higherMap := higher.(mapstr.M) + nested2, err := higherMap.GetValue("metaLevel1Map") + require.NoError(t, err) + requireSameMap(t, nested, nested2) + }) + }) + + t.Run("fields", func(t *testing.T) { + t.Run("non-existing key", func(t *testing.T) { + editor := NewEventEditor(event) + val, err := editor.GetValue("none") + require.Nil(t, val) + require.Error(t, err) + require.ErrorIs(t, err, mapstr.ErrKeyNotFound) + }) + + t.Run("gets a value type", func(t *testing.T) { + editor := NewEventEditor(event) + requireEqualMapValues(t, editor, map[string]interface{}{ + "fieldsLevel0Value": "fieldsvalue1", + }) + }) + + t.Run("gets a nested map", func(t *testing.T) { + editor := NewEventEditor(event) + nested, err := editor.GetValue("fieldsLevel0Map") + require.NoError(t, err) + requireClonedMaps(t, level1FieldsMap, nested) + }) + + t.Run("gets a deeper nested map", func(t *testing.T) { + editor := NewEventEditor(event) + + nested, err := editor.GetValue("fieldsLevel0Map.fieldsLevel1Map") + require.NoError(t, err) + requireClonedMaps(t, level2FieldsMap, nested) + + // the higher level map should be also cloned by accessing the inner map + higher, err := editor.GetValue("fieldsLevel0Map") + require.NoError(t, err) + requireClonedMaps(t, level1FieldsMap, higher) + + // the nested map we got previously should be a part of this higher level map + require.IsType(t, mapstr.M{}, higher) + higherMap := higher.(mapstr.M) + nested2, err := higherMap.GetValue("fieldsLevel1Map") + require.NoError(t, err) + requireSameMap(t, nested, nested2) + }) + }) +} + +func requireClonedMaps(t *testing.T, expected, actual interface{}) { + t.Helper() + requireNotSameMap(t, expected, actual) + require.IsType(t, mapstr.M{}, expected) + require.IsType(t, mapstr.M{}, actual) + + expectedMap := expected.(mapstr.M) + expectedJSON := expectedMap.String() + + actualMap := actual.(mapstr.M) + actualJSON := actualMap.String() + + require.JSONEq(t, expectedJSON, actualJSON) +} + +func requireSameMap(t *testing.T, expected, actual interface{}) { + expectedAddr := fmt.Sprintf("%p", expected) + actualAddr := fmt.Sprintf("%p", actual) + require.Equalf(t, expectedAddr, actualAddr, "should reference the same map: %s != %s", expectedAddr, actualAddr) +} + +func requireNotSameMap(t *testing.T, expected, actual interface{}) { + expectedAddr := fmt.Sprintf("%p", expected) + actualAddr := fmt.Sprintf("%p", actual) + require.NotEqualf(t, expectedAddr, actualAddr, "should reference different maps: %s == %s", expectedAddr, actualAddr) +} + +func requireEqualMapValues(t *testing.T, e EventAccessor, expected map[string]interface{}) { + t.Helper() + for key := range expected { + val, err := e.GetValue(key) + require.NoError(t, err) + require.Equal(t, expected[key], val) + } +} diff --git a/libbeat/processors/processor_test.go b/libbeat/processors/processor_test.go index 41ed628fbfb9..644b27d16af4 100644 --- a/libbeat/processors/processor_test.go +++ b/libbeat/processors/processor_test.go @@ -18,13 +18,16 @@ package processors_test import ( + "fmt" "testing" "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/elastic/beats/v7/libbeat/beat" "github.com/elastic/beats/v7/libbeat/processors" + "github.com/elastic/beats/v7/libbeat/processors/actions" _ "github.com/elastic/beats/v7/libbeat/processors/actions" _ "github.com/elastic/beats/v7/libbeat/processors/add_cloud_metadata" conf "github.com/elastic/elastic-agent-libs/config" @@ -566,3 +569,145 @@ func TestDropMissingFields(t *testing.T) { assert.Equal(t, expectedEvent, processedEvent.Fields) } + +func TestProcessorsRun(t *testing.T) { + cfg := []map[string]interface{}{ + { + "drop_fields": mapstr.M{ + "fields": []string{"proc.drop"}, + }, + }, + { + "add_fields": mapstr.M{ + "target": "added", + "fields": mapstr.M{ + "field": "value", + }, + }, + }, + } + processors := GetProcessors(t, cfg) + + event := &beat.Event{ + Timestamp: time.Now(), + Fields: mapstr.M{ + "beat": mapstr.M{ + "hostname": "mar", + "name": "my-shipper-1", + }, + + "proc": mapstr.M{ + "drop": mapstr.M{ + "start_time": "Jan14", + "system": 26027, + "total": 79390, + "total_p": 0, + "user": 53363, + }, + "cmdline": "/sbin/launchd", + }, + "mem": mapstr.M{ + "rss": 11194368, + "rss_p": 0, + "share": 0, + "size": int64(2555572224), + }, + "type": "process", + }, + } + + var ( + processed *beat.Event + err error + ) + + t.Run("processed and not dropped", func(t *testing.T) { + processed, err = processors.Run(event) + require.NoError(t, err) + require.NotNil(t, processed) + }) + + t.Run("field was added", func(t *testing.T) { + added, err := processed.GetValue("added.field") + require.NoError(t, err) + require.Equal(t, "value", added) + }) + + t.Run("field was not added to original event", func(t *testing.T) { + _, err = event.GetValue("added.field") + require.Error(t, err) + require.ErrorIs(t, err, mapstr.ErrKeyNotFound) + }) + + t.Run("field was dropped", func(t *testing.T) { + _, err = processed.GetValue("proc.drop") + require.Error(t, err) + require.ErrorIs(t, err, mapstr.ErrKeyNotFound) + }) + + t.Run("field was not dropped from original event", func(t *testing.T) { + value, err := processed.GetValue("proc.drop.system") + require.NoError(t, err) + require.Equal(t, 26027, value) + }) +} + +const ( + fieldCount = 10000 + nestingLevel = 3 +) + +func BenchmarkProcessorsRun(b *testing.B) { + processors := processors.NewList(nil) + key1 := "added.field" + proc1 := actions.NewAddFields(mapstr.M{key1: "value"}, true, true) + processors.AddProcessor(proc1) + key2 := "field-0.field-0" + proc2 := actions.NewAddFields(mapstr.M{key2: "value"}, true, true) + processors.AddProcessor(proc2) + + event := &beat.Event{ + Timestamp: time.Now(), + Meta: mapstr.M{}, + Fields: mapstr.M{}, + } + + generateFields(b, event.Meta, 100, 2) + generateFields(b, event.Fields, 100, 2) + + var ( + processed *beat.Event + err error + ) + + b.Run("processors.Run", func(b *testing.B) { + for i := 0; i < b.N; i++ { + processed, err = processors.Run(event) + require.NoError(b, err) + require.NotNil(b, processed) + } + }) + + added, err := processed.GetValue(key1) + require.NoError(b, err) + require.Equal(b, "value", added) + + added, err = processed.GetValue(key2) + require.NoError(b, err) + require.Equal(b, "value", added) + +} + +func generateFields(t require.TestingT, m mapstr.M, count, nesting int) { + for i := 0; i < count; i++ { + var err error + if nesting == 0 { + _, err = m.Put(fmt.Sprintf("field-%d", i), i) + } else { + nested := mapstr.M{} + generateFields(t, nested, count, nesting-1) + _, err = m.Put(fmt.Sprintf("field-%d", i), nested) + } + require.NoError(t, err) + } +}