From d6fcef4a762856a4228fd4cf9de76325e9dd16fc Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Wed, 27 Sep 2023 16:07:46 +0930 Subject: [PATCH] libbeat/processors/cache: add file-backed cache --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/processors/cache/cache.go | 33 +- libbeat/processors/cache/cache_test.go | 24 +- libbeat/processors/cache/file_store.go | 229 ++++++++++ libbeat/processors/cache/file_store_test.go | 455 ++++++++++++++++++++ libbeat/processors/cache/mem_store.go | 34 +- libbeat/processors/cache/mem_store_test.go | 58 +-- 7 files changed, 757 insertions(+), 77 deletions(-) create mode 100644 libbeat/processors/cache/file_store.go create mode 100644 libbeat/processors/cache/file_store_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index d0f4c40ef3ba..ca46d0182476 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] - Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] +- Add file-backed cache for cache enrichment processor. {pull}36686[36686] ==== Deprecated diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index eea3ed115eae..a7ce4876f505 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -29,6 +30,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) const name = "cache" @@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) { if err != nil { return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err) } - src, cancel, err := getStoreFor(config) - if err != nil { - return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) - } - // Logging (each processor instance has a unique ID). id := int(instanceID.Inc()) log := logp.NewLogger(name).With("instance_id", id) + src, cancel, err := getStoreFor(config, log) + if err != nil { + return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) + } + p := &cache{ config: config, store: src, @@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) { // and a context cancellation that releases the cache resource when it // is no longer required. The cancellation should be called when the // processor is closed. -func getStoreFor(cfg config) (Store, context.CancelFunc, error) { +func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) { switch { case cfg.Store.Memory != nil: s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) return s, cancel, nil case cfg.Store.File != nil: - logp.L().Warn("using memory store when file is configured") - // TODO: Replace place-holder code with a file-store. - s, cancel := fileStores.get(cfg.Store.File.ID, cfg) + err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700) + if err != nil { + return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err) + } + s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log) return s, cancel, nil default: @@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { } } -var ( - memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"} - fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock. -) - // noop is a no-op context.CancelFunc. func noop() {} @@ -126,9 +125,9 @@ type Store interface { } type CacheEntry struct { - key string - value any - expires time.Time + Key string `json:"key"` + Value any `json:"val"` + Expires time.Time `json:"expires"` index int } diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go index 6fe5847c01fa..8acd22d74d70 100644 --- a/libbeat/processors/cache/cache_test.go +++ b/libbeat/processors/cache/cache_test.go @@ -130,7 +130,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -191,7 +191,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -210,7 +210,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -271,7 +271,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -290,7 +290,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -351,7 +351,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -379,7 +379,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"), }, @@ -441,7 +441,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -465,7 +465,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -527,7 +527,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -547,7 +547,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: expected map but type is string"), }, @@ -613,7 +613,7 @@ func TestCache(t *testing.T) { switch got := p.(*cache).store.(type) { case *memStore: allow := cmp.AllowUnexported(CacheEntry{}) - ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index") + ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) { t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore)) } diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go new file mode 100644 index 000000000000..870d6e028ddb --- /dev/null +++ b/libbeat/processors/cache/file_store.go @@ -0,0 +1,229 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "container/heap" + "context" + "encoding/json" + "errors" + "io" + "io/fs" + "os" + "path/filepath" + "sync" + "time" + + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/paths" +) + +// TODO: Consider having a periodic write-out (per time or per n puts) to +// reduce loss of state due to crashes. + +var fileStores = fileStoreSet{stores: map[string]*fileStore{}} + +// fileStoreSet is a collection of shared fileStore caches. +type fileStoreSet struct { + mu sync.Mutex + stores map[string]*fileStore +} + +// get returns a fileStore cache with the provided ID based on the config. +// If a fileStore with the ID already exist, its configuration is adjusted +// and its reference count is increased. The returned context.CancelFunc +// reduces the reference count and deletes the fileStore from the set if the +// count reaches zero. +func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) { + s.mu.Lock() + defer s.mu.Unlock() + store, ok := s.stores[id] + if !ok { + store = newFileStore(cfg, id, pathFromConfig(cfg, log), log) + s.stores[store.id] = store + } + store.add(cfg) + + return store, func() { + store.dropFrom(s) + } +} + +// pathFromConfig returns the mapping form a config to a file-system path. +func pathFromConfig(cfg config, log *logp.Logger) string { + path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cfg.Store.File.ID) + log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path) + return path +} + +// free removes the fileStore with the given ID from the set. free is safe +// for concurrent use. +func (s *fileStoreSet) free(id string) { + s.mu.Lock() + delete(s.stores, id) + s.mu.Unlock() +} + +// fileStore is a file-backed cache store. +type fileStore struct { + path string + memStore + log *logp.Logger +} + +// newFileStore returns a new fileStore configured to apply the give TTL duration. +// The fileStore is guaranteed not to grow larger than cap elements. id is the +// look-up into the global cache store the fileStore is held in. +func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore { + s := fileStore{ + path: path, + log: log, + memStore: memStore{ + id: id, + cache: make(map[string]*CacheEntry), + + // Mark the ttl as invalid until we have had a put + // operation configured. While the shared backing + // data store is incomplete, and has no put operation + // defined, the TTL will be invalid, but will never + // be accessed since all time operations outside put + // refer to absolute times, held by the CacheEntry. + ttl: -1, + cap: -1, + effort: -1, + }, + } + s.readState() + return &s +} + +func (c *fileStore) String() string { return "file:" + c.id } + +// dropFrom decreases the reference count for the fileStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *fileStore) dropFrom(stores *fileStoreSet) { + c.mu.Lock() + c.refs-- + if c.refs < 0 { + panic("invalid reference count") + } + if c.refs == 0 { + c.writeState() + stores.free(c.id) + // GC assists. + c.cache = nil + c.expiries = nil + } + c.mu.Unlock() +} + +func (c *fileStore) readState() { + f, err := os.Open(c.path) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + c.log.Debugw("no state on file system", "error", err) + } else { + c.log.Errorw("failed to open file to read state", "error", err) + } + return + } + defer f.Close() + + // It would be nice to be able at this stage to determine + // whether the file is stale past the TTL of the cache, but + // we do not have this information yet. So we must read + // through all the elements. If any survive the filter, we + // were alive, otherwise delete the file. + + dec := json.NewDecoder(f) + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + c.log.Errorw("failed to read state element", "error", err) + } + break + } + if e.Expires.Before(time.Now()) { + // Don't retain expired elements. + continue + } + c.cache[e.Key] = &e + heap.Push(&c.expiries, &e) + } + + if len(c.cache) != 0 { + return + } + // We had no live entries, so delete the file. + err = os.Remove(c.path) + if err != nil { + c.log.Errorw("failed to delete stale cache file", "error", err) + } +} + +func (c *fileStore) writeState() { + if len(c.cache) == 0 { + err := os.Remove(c.path) + if err != nil { + c.log.Errorw("failed to delete write state when empty", "error", err) + } + return + } + f, err := os.CreateTemp("", "") + if err != nil { + c.log.Errorw("failed to open file to write state", "error", err) + return + } + // We are writing into tmp, so make sure we are private. + err = os.Chmod(f.Name(), 0o600) + if err != nil { + c.log.Errorw("failed to set state file mode", "error", err) + return + } + tmp := f.Name() + defer func() { + err = f.Close() + if err != nil { + c.log.Errorw("failed to close file after writing state", "error", err) + return + } + // Try to be atomic. + err = os.Rename(tmp, c.path) + if err != nil { + c.log.Errorw("failed to finalize writing state", "error", err) + } + }() + + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + now := time.Now() + for c.expiries.Len() != 0 { + e := c.expiries.pop() + if e.Expires.Before(now) { + // Don't write expired elements. + continue + } + err = enc.Encode(e) + if err != nil { + c.log.Errorw("failed to write state element", "error", err) + return + } + } +} diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go new file mode 100644 index 000000000000..737a56a5841e --- /dev/null +++ b/libbeat/processors/cache/file_store_test.go @@ -0,0 +1,455 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/elastic-agent-libs/logp" +) + +var keep = flag.Bool("keep", false, "keep testdata after test complete") + +type fileStoreTestSteps struct { + doTo func(*fileStore) error + want *fileStore +} + +//nolint:errcheck // Paul Hogan was right. +var fileStoreTests = []struct { + name string + cfg config + want *fileStore + steps []fileStoreTestSteps + wantPersisted []*CacheEntry +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &fileStore{path: "testdata/new_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &fileStore{path: "testdata/new_delete", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 1: { + doTo: func(s *fileStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 2: { + doTo: func(s *fileStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 3: { + doTo: func(s *fileStore) error { + return s.Delete("two") + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 4: { + doTo: func(s *fileStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 5: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if !fileStores.has(s.id) { + return fmt.Errorf("%q fileStore not found after single close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 6: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if fileStores.has(s.id) { + return fmt.Errorf("%q fileStore still found after double close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + wantPersisted: []*CacheEntry{ + // Numeric values are float due to JSON round-trip. + {Key: "one", Value: 1.0}, + {Key: "three", Value: 3.0}, + }, + }, +} + +func TestFileStore(t *testing.T) { + err := os.RemoveAll("testdata") + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to clear testdata directory: %v", err) + } + err = os.Mkdir("testdata", 0o755) + if err != nil && !errors.Is(err, fs.ErrExist) { + t.Fatalf("failed to create testdata directory: %v", err) + } + if !*keep { + t.Cleanup(func() { os.RemoveAll("testdata") }) + } + + allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "log") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") + + for _, test := range fileStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. + path := filepath.Join("testdata", test.name) + store := newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + store.add(test.cfg) + fileStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInFileStore, ignoreInMemStore) { + t.Errorf("unexpected new fileStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInFileStore, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected fileStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry)) + } + } + if test.wantPersisted == nil { + return + } + store = nil + f, err := os.Open(path) + if err != nil { + t.Fatalf("failed to open persisted data: %v", err) + } + defer f.Close() + dec := json.NewDecoder(f) + var got []*CacheEntry + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + t.Fatalf("unexpected error reading persisted cache data: %v", err) + } + break + } + got = append(got, &e) + } + if !cmp.Equal(test.wantPersisted, got, allow, ignoreInCacheEntry) { + t.Errorf("unexpected persisted state:\n--- want\n+++ got\n%s", + cmp.Diff(test.wantPersisted, got, allow, ignoreInCacheEntry)) + } + wantCache := make(map[string]*CacheEntry) + for _, e := range got { + wantCache[e.Key] = e + } + store = newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + // Specialise the in cache entry ignore list to include index. + ignoreMoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") + if !cmp.Equal(wantCache, store.cache, allow, ignoreMoreInCacheEntry) { + t.Errorf("unexpected restored state:\n--- want\n+++ got\n%s", + cmp.Diff(wantCache, store.cache, allow, ignoreMoreInCacheEntry)) + } + for k, e := range store.cache { + if e.index < 0 || len(store.expiries) <= e.index { + t.Errorf("cache entry %s index out of bounds: got:%d [0,%d)", k, e.index, len(store.expiries)) + continue + } + if !cmp.Equal(e, store.expiries[e.index], allow, ignoreInCacheEntry) { + t.Errorf("unexpected mismatched cache/expiry state %s:\n--- want\n+++ got\n%s", + k, cmp.Diff(e, store.expiries[e.index], allow, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set. It is used only for testing. +func (s *fileStoreSet) add(store *fileStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set. It is used only for testing. +func (s *fileStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 900e91c8e600..774a019d783d 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -24,24 +24,25 @@ import ( "time" ) +var memStores = memStoreSet{stores: map[string]*memStore{}} + // memStoreSet is a collection of shared memStore caches. type memStoreSet struct { mu sync.Mutex stores map[string]*memStore - typ string // TODO: Remove when a file-backed store exists. } // get returns a memStore cache with the provided ID based on the config. // If a memStore with the ID already exist, its configuration is adjusted // and its reference count is increased. The returned context.CancelFunc -// reduces the reference count and deletes the memStore from set if the +// reduces the reference count and deletes the memStore from the set if the // count reaches zero. func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { s.mu.Lock() defer s.mu.Unlock() store, ok := s.stores[id] if !ok { - store = newMemStore(cfg, id, s.typ) + store = newMemStore(cfg, id) s.stores[store.id] = store } store.add(cfg) @@ -69,10 +70,6 @@ type memStore struct { // id is the index into global cache store for the cache. id string - // typ is a temporary tag to differentiate memory stores - // from the mocked file store. - // TODO: Remove when a file-backed store exists. - typ string // cap is the maximum number of elements the cache // will hold. If not positive, no limit. @@ -85,10 +82,9 @@ type memStore struct { // newMemStore returns a new memStore configured to apply the give TTL duration. // The memStore is guaranteed not to grow larger than cap elements. id is the // look-up into the global cache store the memStore is held in. -func newMemStore(cfg config, id, typ string) *memStore { +func newMemStore(cfg config, id string) *memStore { return &memStore{ id: id, - typ: typ, cache: make(map[string]*CacheEntry), // Mark the ttl as invalid until we have had a put @@ -103,7 +99,7 @@ func newMemStore(cfg config, id, typ string) *memStore { } } -func (c *memStore) String() string { return c.typ + ":" + c.id } +func (c *memStore) String() string { return "memory:" + c.id } // add updates the receiver for a new operation. It increases the reference // count for the receiver, and if the config is a put operation and has no @@ -158,11 +154,11 @@ func (c *memStore) Get(key string) (any, error) { if !ok { return nil, ErrNoData } - if time.Now().After(v.expires) { + if time.Now().After(v.Expires) { delete(c.cache, key) return nil, ErrNoData } - return v.value, nil + return v.Value, nil } // Put stores the provided value in the cache associated with the given key. @@ -174,9 +170,9 @@ func (c *memStore) Put(key string, val any) error { now := time.Now() c.evictExpired(now) e := &CacheEntry{ - key: key, - value: val, - expires: now.Add(c.ttl), + Key: key, + Value: val, + Expires: now.Add(c.ttl), } c.cache[key] = e heap.Push(&c.expiries, e) @@ -189,11 +185,11 @@ func (c *memStore) Put(key string, val any) error { // it under the capacity limit. func (c *memStore) evictExpired(now time.Time) { for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { - if c.expiries[0].expires.After(now) { + if c.expiries[0].Expires.After(now) { break } e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } if c.cap <= 0 { // No cap, so depend on effort. @@ -201,7 +197,7 @@ func (c *memStore) evictExpired(now time.Time) { } for len(c.cache) >= c.cap { e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } } @@ -237,7 +233,7 @@ func (h expiryHeap) Len() int { return len(h) } func (h expiryHeap) Less(i, j int) bool { - return h[i].expires.Before(h[j].expires) + return h[i].Expires.Before(h[j].Expires) } func (h expiryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 692931854b81..15be85c76785 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -199,14 +199,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -225,14 +225,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -247,12 +247,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -271,12 +271,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -295,12 +295,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 1, ttl: time.Second, @@ -332,14 +332,14 @@ var memStoreTests = []struct { func TestMemStore(t *testing.T) { allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) - ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") - ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") for _, test := range memStoreTests { t.Run(test.name, func(t *testing.T) { // Construct the store and put in into the stores map as // we would if we were calling Run. - store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID) store.add(test.cfg) memStores.add(store) @@ -361,14 +361,14 @@ func TestMemStore(t *testing.T) { } } -// add adds the store to the set, it is used only for testing. +// add adds the store to the set. It is used only for testing. func (s *memStoreSet) add(store *memStore) { s.mu.Lock() s.stores[store.id] = store s.mu.Unlock() } -// has returns whether the store exists in the set, it is used only for testing. +// has returns whether the store exists in the set. It is used only for testing. func (s *memStoreSet) has(id string) bool { s.mu.Lock() _, ok := s.stores[id]