From f9e021c37065c74cbe241df4a47c63faffcb9733 Mon Sep 17 00:00:00 2001
From: Dan Kortschak
Date: Wed, 20 Sep 2023 07:41:49 +0930
Subject: [PATCH] address pr comments

---
 libbeat/processors/cache/cache.go          |  48 ++--
 libbeat/processors/cache/cache_test.go     | 285 ++++++++++++++++++++-
 libbeat/processors/cache/mem_store.go      |  56 ++--
 libbeat/processors/cache/mem_store_test.go |  77 +++++-
 4 files changed, 406 insertions(+), 60 deletions(-)

diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go
index 58b4a5f028c4..79b6ecdf9e1a 100644
--- a/libbeat/processors/cache/cache.go
+++ b/libbeat/processors/cache/cache.go
@@ -127,36 +127,30 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
 	}
 }
 
+// noop is a no-op context.CancelFunc.
+func noop() {}
+
 func getMemStore(stores map[string]*memStore, id string, cfg config) (*memStore, context.CancelFunc) {
 	s, ok := stores[id]
-	if ok {
-		// We may have already constructed the store with
-		// a get or a delete config, so set the TTL, cap
-		// and effort if we have a put config. If another
-		// put config has already been included, we ignore
-		// the put options now.
-		s.setPutOptions(cfg)
-		return s, noop
+	if !ok {
+		s = newMemStore(cfg, id)
+		stores[s.id] = s
 	}
-	s = newMemStore(cfg)
-	stores[id] = s
+
+	// We may have already constructed the store with
+	// a get or a delete config, so set the TTL, cap
+	// and effort if we have a put config. If another
+	// put config has already been included, we ignore
+	// the put options now.
+	s.setPutOptions(cfg)
+
 	return s, func() {
-		// TODO: Consider making this reference counted.
-		// Currently, what we have is essentially an
-		// ownership model, where the put operation is
-		// owner. This could conceivably be problematic
-		// if a processor were shared between different
-		// inputs and the put is closed due to a config
-		// change.
 		storeMu.Lock()
-		delete(stores, id)
+		s.close(stores)
 		storeMu.Unlock()
 	}
 }
 
-// noop is a no-op context.CancelFunc.
-func noop() {}
-
 // Run enriches the given event with the host metadata.
 func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
 	switch {
@@ -260,12 +254,16 @@ func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {
 	if m, ok := meta.(map[string]interface{}); ok {
 		meta = mapstr.M(m)
 	}
-	// ... and write it into the cloned event.
-	result = event.Clone()
-	if _, err = result.PutValue(dst, meta); err != nil {
+	// ... and write it into the event.
+	// The implementation of PutValue currently leaves event
+	// essentially unchanged in the case of an error (in the
+	// case of an @metadata field there may be a mutation,
+	// but at most this will be the addition of a Meta field
+	// value to event). None of this is documented.
+	if _, err = event.PutValue(dst, meta); err != nil {
 		return nil, err
 	}
-	return result, nil
+	return event, nil
 }
 
 // deleteFor deletes the configured value from the cache based on the value of
diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go
index e29363a924b4..6fe5847c01fa 100644
--- a/libbeat/processors/cache/cache_test.go
+++ b/libbeat/processors/cache/cache_test.go
@@ -19,7 +19,6 @@ package cache
 
 import (
 	"errors"
-	"io"
 	"testing"
 
 	"github.com/google/go-cmp/cmp"
@@ -297,6 +296,263 @@ var cacheTests = []struct {
 			},
 		},
 	},
+	{
+		name: "put_and_get_value_with_get_error_no_overwrite",
+		configs: []testConfig{
+			{
+				when: func(e mapstr.M) bool {
+					return e["put"] == true
+				},
+				cfg: mapstr.M{
+					"backend": mapstr.M{
+						"memory": mapstr.M{
+							"id": "aidmaster",
+						},
+					},
+					"put": mapstr.M{
+						"key_field":   "crowdstrike.aid",
+						"value_field": "crowdstrike.metadata",
+						"ttl":         "168h",
+					},
+				},
+			},
+			{
+				when: func(e mapstr.M) bool {
+					return e["get"] == true
+				},
+				cfg: mapstr.M{
+					"backend": mapstr.M{
+						"memory": mapstr.M{
+							"id": "aidmaster",
+						},
+					},
+					"get": mapstr.M{
+						"key_field":    "crowdstrike.aid",
+						"target_field": "crowdstrike.metadata_new",
+					},
+				},
+			},
+		},
+		wantInitErr: nil,
+		steps: []cacheTestStep{
+			{
+				event: mapstr.M{
+					"put": true,
+					"crowdstrike": mapstr.M{
+						"aid":      "one",
+						"metadata": "metadata_value",
+					},
+				},
+				want: mapstr.M{
+					"put": true,
+					"crowdstrike": mapstr.M{
+						"aid":      "one",
+						"metadata": "metadata_value",
+					},
+				},
+				wantCacheVal: map[string]*CacheEntry{
+					"one": {key: "one", value: "metadata_value"},
+				},
+				wantErr: nil,
+			},
+			{
+				event: mapstr.M{
+					"get": true,
+					"crowdstrike": mapstr.M{
+						"aid": "one",
+						"metadata_new": mapstr.M{
+							"someone_is_already_here": mapstr.M{
+								"another_key": "value",
+							},
+						},
+					},
+				},
+				want: mapstr.M{
+					"get": true,
+					"crowdstrike": mapstr.M{
+						"aid": "one",
+						"metadata_new": mapstr.M{
+							"someone_is_already_here": mapstr.M{
+								"another_key": "value",
+							},
+						},
+					},
+				},
+				wantCacheVal: map[string]*CacheEntry{
+					"one": {key: "one", value: "metadata_value"},
+				},
+				wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"),
+			},
+		},
+	},
+	{
+		name: "put_and_get_value_allow_overwrite",
+		configs: []testConfig{
+			{
+				when: func(e mapstr.M) bool {
+					return e["put"] == true
+				},
+				cfg: mapstr.M{
+					"backend": mapstr.M{
+						"memory": mapstr.M{
+							"id": "aidmaster",
+						},
+					},
+					"put": mapstr.M{
+						"key_field":   "crowdstrike.aid",
+						"value_field": "crowdstrike.metadata",
+						"ttl":         "168h",
+					},
+				},
+			},
+			{
+				when: func(e mapstr.M) bool {
+					return e["get"] == true
+				},
+				cfg: mapstr.M{
+					"backend": mapstr.M{
+						"memory": mapstr.M{
+							"id": "aidmaster",
+						},
+					},
+					"overwrite_keys": true,
+					"get": mapstr.M{
+						"key_field":    "crowdstrike.aid",
+						"target_field": "crowdstrike.metadata_new",
+					},
+				},
+			},
+		},
+		wantInitErr: nil,
+		steps: []cacheTestStep{
+			{
+				event: mapstr.M{
+					"put": true,
+					"crowdstrike": mapstr.M{
+						"aid":      "one",
+						"metadata": "metadata_value",
+					},
+				},
+				want: mapstr.M{
+					"put": true,
+					"crowdstrike": mapstr.M{
+						"aid":      "one",
+						"metadata": "metadata_value",
+					},
+				},
+				wantCacheVal: map[string]*CacheEntry{
+					"one": {key: "one", value: "metadata_value"},
+				},
+				wantErr: nil,
+			},
+			{
+				event: mapstr.M{
+					"get": true,
+					"crowdstrike": mapstr.M{
+						"aid": "one",
+						"metadata_new": mapstr.M{
+							"someone_is_already_here": mapstr.M{
+								"another_key": "value",
+							},
+						},
+					},
+				},
+				want: mapstr.M{
+					"get": true,
+ "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite_but_get_error", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new.child", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: expected map but type is string"), + }, + }, + }, } type testConfig struct { @@ -309,7 +565,7 @@ func TestCache(t *testing.T) { for _, test := range cacheTests { t.Run(test.name, func(t *testing.T) { var processors []beat.Processor - for _, cfg := range test.configs { + for i, cfg := range test.configs { config, err := conf.NewConfigFrom(cfg.cfg) if err != nil { t.Fatal(err) @@ -322,7 +578,20 @@ func TestCache(t *testing.T) { if err != nil { return } + t.Log(p) + c, ok := p.(*cache) + if !ok { + t.Fatalf("processor %d is not an *cache", i) + } + + defer func() { + err := c.Close() + if err != nil { + t.Errorf("unexpected error from c.Close(): %v", err) + } + }() + processors = append(processors, p) } @@ -351,18 +620,6 @@ func TestCache(t *testing.T) { } } } - - for i, p := range processors { - p, ok := p.(io.Closer) - if !ok { - t.Errorf("processor %d is not an io.Closer", i) - continue - } - err := p.Close() - if err != nil { - t.Errorf("unexpected error from p.Close(): %v", err) - } - } }) } } diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 1c832699bd79..2754f13fd0ee 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -29,6 +29,10 @@ type memStore struct { cache map[string]*CacheEntry expiries expiryHeap ttl time.Duration // ttl is the time entries are valid for in the cache. + refs int // refs is the number of processors referring to this store. + + // id is the index into global cache store for the cache. + id string // cap is the maximum number of elements the cache // will hold. If not positive, no limit. @@ -39,37 +43,33 @@ type memStore struct { } // newMemStore returns a new memStore configured to apply the give TTL duration. -// The memStore is guaranteed not to grow larger than cap elements. 
-func newMemStore(cfg config) *memStore {
-	// Mark the ttl as invalid until we have had a put operation
-	// configured.
-	ttl := time.Duration(-1)
-	cap := -1
-	effort := -1
-	if cfg.Put != nil {
-		// putConfig.TTL is a required field, so we don't
-		// need to check for nil-ness.
-		ttl = *cfg.Put.TTL
-		cap = cfg.Store.Capacity
-		effort = cfg.Store.Effort
-	}
+// The memStore is guaranteed not to grow larger than cap elements. id is the
+// look-up into the global cache store in which the memStore is held.
+func newMemStore(cfg config, id string) *memStore {
 	return &memStore{
-		cache:  make(map[string]*CacheEntry),
-		ttl:    ttl,
-		cap:    cap,
-		effort: effort,
+		id:    id,
+		cache: make(map[string]*CacheEntry),
+
+		// Mark the ttl as invalid until we have had a put operation
+		// configured.
+		ttl:    -1,
+		cap:    -1,
+		effort: -1,
 	}
 }
 
 // setPutOptions allows concurrency-safe updating of the put options. While the shared
 // backing data store is incomplete, and has no put operation defined, the TTL
 // will be invalid, but will never be accessed since all time operations outside
-// put refer to absolute times.
+// put refer to absolute times. setPutOptions also increases the reference count
+// for the memStore for all operation types.
 func (c *memStore) setPutOptions(cfg config) {
+	c.mu.Lock()
+	defer c.mu.Unlock()
+	c.refs++
 	if cfg.Put == nil {
 		return
 	}
-	c.mu.Lock()
 	if c.ttl == -1 {
 		// putConfig.TTL is a required field, so we don't
 		// need to check for nil-ness.
@@ -77,6 +77,22 @@ func (c *memStore) setPutOptions(cfg config) {
 		c.cap = cfg.Store.Capacity
 		c.effort = cfg.Store.Effort
 	}
+}
+
+// close decreases the reference count for the memStore and removes it from the
+// stores map if the count is zero.
+func (c *memStore) close(stores map[string]*memStore) {
+	c.mu.Lock()
+	c.refs--
+	if c.refs < 0 {
+		panic("invalid reference count")
+	}
+	if c.refs == 0 {
+		delete(stores, c.id)
+		// GC assists.
+		c.cache = nil
+		c.expiries = nil
+	}
 	c.mu.Unlock()
 }
diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go
index 7e0cde8b2dca..e11c10f493e8 100644
--- a/libbeat/processors/cache/mem_store_test.go
+++ b/libbeat/processors/cache/mem_store_test.go
@@ -50,7 +50,9 @@ var memStoreTests = []struct {
 			},
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
+			refs:  1,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
@@ -67,7 +69,9 @@
 			Get: &getConfig{},
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
+			refs:  1,
 			// TTL, capacity and effort are set only by put.
 			ttl:    -1,
 			cap:    -1,
@@ -85,7 +89,9 @@
 			Delete: &delConfig{},
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
+			refs:  1,
 			// TTL, capacity and effort are set only by put.
 			ttl:    -1,
 			cap:    -1,
@@ -103,8 +109,10 @@
 			Get: &getConfig{},
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
 			// TTL, capacity and effort are set only by put.
+			refs:   1,
 			ttl:    -1,
 			cap:    -1,
 			effort: -1,
@@ -126,7 +134,9 @@
 			return nil
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
+			refs:  2,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
@@ -145,7 +155,9 @@
 			Get: &getConfig{},
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
+			refs:  1,
 			// TTL, capacity and effort are set only by put.
 			ttl:    -1,
 			cap:    -1,
@@ -168,7 +180,9 @@
 			return nil
 		},
 		want: &memStore{
+			id:    "test",
 			cache: map[string]*CacheEntry{},
+			refs:  2,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
@@ -182,6 +196,7 @@
 			return nil
 		},
 		want: &memStore{
+			id: "test",
 			cache: map[string]*CacheEntry{
 				"one": {key: "one", value: int(1), index: 0},
 				"two": {key: "two", value: int(2), index: 1},
@@ -192,6 +207,7 @@
 				{key: "two", value: int(2), index: 1},
 				{key: "three", value: int(3), index: 2},
 			},
+			refs:   2,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
@@ -206,6 +222,7 @@
 			return err
 		},
 		want: &memStore{
+			id: "test",
 			cache: map[string]*CacheEntry{
 				"one": {key: "one", value: int(1), index: 0},
 				"two": {key: "two", value: int(2), index: 1},
@@ -216,6 +233,7 @@
 				{key: "two", value: int(2), index: 1},
 				{key: "three", value: int(3), index: 2},
 			},
+			refs:   2,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
@@ -226,6 +244,7 @@
 			return s.Delete("two")
 		},
 		want: &memStore{
+			id: "test",
 			cache: map[string]*CacheEntry{
 				"one":   {key: "one", value: int(1), index: 0},
 				"three": {key: "three", value: int(3), index: 1},
@@ -234,6 +253,7 @@
 				{key: "one", value: int(1), index: 0},
 				{key: "three", value: int(3), index: 1},
 			},
+			refs:   2,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
@@ -248,6 +268,7 @@
 			return nil
 		},
 		want: &memStore{
+			id: "test",
 			cache: map[string]*CacheEntry{
 				"one":   {key: "one", value: int(1), index: 0},
 				"three": {key: "three", value: int(3), index: 1},
@@ -256,11 +277,58 @@
 				{key: "one", value: int(1), index: 0},
 				{key: "three", value: int(3), index: 1},
 			},
+			refs:   2,
 			ttl:    time.Second,
 			cap:    1000,
 			effort: 10,
 		},
 	},
+	5: {
+		doTo: func(s *memStore) error {
+			storeMu.Lock()
+			s.close(memStores)
+			defer storeMu.Unlock()
+			if _, ok := memStores[s.id]; !ok {
+				return fmt.Errorf("%q memStore not found after single close", s.id)
+			}
+			return nil
+		},
+		want: &memStore{
+			id: "test",
+			cache: map[string]*CacheEntry{
+				"one":   {key: "one", value: int(1), index: 0},
+				"three": {key: "three", value: int(3), index: 1},
+			},
+			expiries: expiryHeap{
+				{key: "one", value: int(1), index: 0},
+				{key: "three", value: int(3), index: 1},
+			},
+			refs:   1,
+			ttl:    time.Second,
+			cap:    1000,
+			effort: 10,
+		},
+	},
+	6: {
+		doTo: func(s *memStore) error {
+			storeMu.Lock()
+			s.close(memStores)
+			defer storeMu.Unlock()
+			if _, ok := memStores[s.id]; ok {
+				return fmt.Errorf("%q memStore still found after double close", s.id)
+			}
+			return nil
+		},
+		want: &memStore{
+			id:       "test",
+			cache:    nil, // assistively nil-ed.
+			expiries: nil, // assistively nil-ed.
+			refs:     0,
+			ttl:      time.Second,
+			cap:      1000,
+			effort:   10,
+		},
+	},
 		},
 	},
 }
@@ -272,7 +340,14 @@ func TestMemStore(t *testing.T) {
 	for _, test := range memStoreTests {
 		t.Run(test.name, func(t *testing.T) {
-			store := newMemStore(test.cfg)
+			// Construct the store and put it into the stores map as
+			// we would if we were calling Run.
+			store := newMemStore(test.cfg, test.cfg.Store.Memory.ID)
+			store.setPutOptions(test.cfg)
+			storeMu.Lock()
+			memStores[store.id] = store
+			storeMu.Unlock()
+
 			if !cmp.Equal(test.want, store, allow, ignoreInMemStore) {
 				t.Errorf("unexpected new memStore result:\n--- want\n+++ got\n%s", cmp.Diff(test.want, store, allow, ignoreInMemStore))
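
The TODO removed above suggested replacing the ownership model, in which the put processor owned the shared store, with reference counting, and that is what the patch does: setPutOptions takes a reference for every processor that uses the store, and close releases it, deleting the store from the global map only when the last holder is done. The following self-contained sketch shows that pattern in isolation; store, registry, and open are illustrative names, not the Beats API.

package main

import (
	"fmt"
	"sync"
)

// store is a minimal stand-in for the processor's memStore: it counts
// the processors referring to it and knows its key in the registry.
type store struct {
	mu   sync.Mutex
	refs int
	id   string
}

var (
	registryMu sync.Mutex
	registry   = map[string]*store{}
)

// open returns the shared store for id, creating it if needed, and takes
// a reference. The returned func drops the reference and removes the
// store from the registry only when the last reference is released.
func open(id string) (*store, func()) {
	registryMu.Lock()
	defer registryMu.Unlock()
	s, ok := registry[id]
	if !ok {
		s = &store{id: id}
		registry[id] = s
	}
	s.mu.Lock()
	s.refs++
	s.mu.Unlock()
	return s, func() {
		registryMu.Lock()
		defer registryMu.Unlock()
		s.mu.Lock()
		defer s.mu.Unlock()
		s.refs--
		if s.refs < 0 {
			panic("invalid reference count")
		}
		if s.refs == 0 {
			delete(registry, s.id)
		}
	}
}

func main() {
	a, closeA := open("aidmaster") // e.g. the put processor
	b, closeB := open("aidmaster") // e.g. the get processor
	fmt.Println(a == b)            // true: both share one store

	closeA()
	_, ok := registry["aidmaster"]
	fmt.Println(ok) // true: b still holds a reference

	closeB()
	_, ok = registry["aidmaster"]
	fmt.Println(ok) // false: last reference released
}

With this shape, a config reload that tears down the put processor no longer invalidates the store for a get processor that still holds a reference, which is exactly the failure mode the removed TODO worried about.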
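The other behavioral change, writing the cached value into the event itself rather than into a clone, leans on the observation recorded in the new comment: PutValue leaves the event essentially unchanged when it fails. The sketch below models that property with a hypothetical putValue helper over plain nested maps; it is not the mapstr implementation, only an illustration of the error behavior that the new put_and_get_value_allow_overwrite_but_get_error test pins down.

package main

import (
	"fmt"
	"strings"
)

// putValue walks a dotted key into nested maps, creating missing
// intermediate maps, and fails when an existing intermediate segment is
// not a map. Because failure can only happen at an existing non-map
// value, which is checked before anything is created, a failed put
// leaves the input map unmutated.
func putValue(m map[string]any, key string, v any) error {
	parts := strings.Split(key, ".")
	cur := m
	for _, p := range parts[:len(parts)-1] {
		switch child := cur[p].(type) {
		case nil: // missing: create the intermediate map.
			next := map[string]any{}
			cur[p] = next
			cur = next
		case map[string]any: // descend.
			cur = child
		default: // existing non-map blocks the put; nothing mutated yet.
			return fmt.Errorf("expected map but type is %T", child)
		}
	}
	cur[parts[len(parts)-1]] = v
	return nil
}

func main() {
	evt := map[string]any{
		"crowdstrike": map[string]any{
			"aid":          "one",
			"metadata_new": "someone_is_already_here",
		},
	}
	// Fails because crowdstrike.metadata_new is a string, not a map ...
	err := putValue(evt, "crowdstrike.metadata_new.child", "metadata_value")
	fmt.Println(err) // expected map but type is string
	fmt.Println(evt) // ... and evt is unchanged by the failed put.
}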