From 9e503b4a8cca23cb0b2428e3b49fb7be763b3ebc Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Thu, 28 Sep 2023 14:00:35 +0930 Subject: [PATCH] libbeat/processors/cache: add file-backed cache (#36686) This adds a file-backed cache implementation to the cache processor. Caching between put and get operations is done in-memory using the memory cache, but the file cache will load previously written cache state on start-up and will write cache contents to file when the cache is dropped. Depending on user configuration, the file cache will also periodically write the cache state to the backing file to reduce state loss in the event of a crash. For simplicity, the cache state is stored as a JSON stream of objects with fields for the key, value and expiry timestamp of cached entities. --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/processors/cache/cache.go | 33 +- libbeat/processors/cache/cache_test.go | 24 +- libbeat/processors/cache/config.go | 13 +- libbeat/processors/cache/config_test.go | 36 ++ libbeat/processors/cache/docs/cache.asciidoc | 5 +- libbeat/processors/cache/file_store.go | 294 ++++++++++++ libbeat/processors/cache/file_store_test.go | 455 +++++++++++++++++++ libbeat/processors/cache/mem_store.go | 34 +- libbeat/processors/cache/mem_store_test.go | 72 +-- 10 files changed, 877 insertions(+), 90 deletions(-) create mode 100644 libbeat/processors/cache/file_store.go create mode 100644 libbeat/processors/cache/file_store_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index d0f4c40ef3b..ca46d018247 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] - Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] +- Add file-backed cache for cache enrichment processor. {pull}36686[36686] ==== Deprecated diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index eea3ed115ea..a7ce4876f50 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -29,6 +30,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) const name = "cache" @@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) { if err != nil { return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err) } - src, cancel, err := getStoreFor(config) - if err != nil { - return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) - } - // Logging (each processor instance has a unique ID). id := int(instanceID.Inc()) log := logp.NewLogger(name).With("instance_id", id) + src, cancel, err := getStoreFor(config, log) + if err != nil { + return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) + } + p := &cache{ config: config, store: src, @@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) { // and a context cancellation that releases the cache resource when it // is no longer required. The cancellation should be called when the // processor is closed. 
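+// For illustration, a processor configuration that would select the new
+// file-backed store might look like the following (the IDs and fields
+// mirror the examples in config_test.go and are not special values):
+//
+//	processors:
+//	  - cache:
+//	      backend:
+//	        file:
+//	          id: aidmaster
+//	          write_interval: 15m
+//	      put:
+//	        ttl: 168h
+//	        key_field: crowdstrike.aid
+//	        value_field: crowdstrike.metadata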
-func getStoreFor(cfg config) (Store, context.CancelFunc, error) { +func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) { switch { case cfg.Store.Memory != nil: s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) return s, cancel, nil case cfg.Store.File != nil: - logp.L().Warn("using memory store when file is configured") - // TODO: Replace place-holder code with a file-store. - s, cancel := fileStores.get(cfg.Store.File.ID, cfg) + err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700) + if err != nil { + return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err) + } + s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log) return s, cancel, nil default: @@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { } } -var ( - memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"} - fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock. -) - // noop is a no-op context.CancelFunc. func noop() {} @@ -126,9 +125,9 @@ type Store interface { } type CacheEntry struct { - key string - value any - expires time.Time + Key string `json:"key"` + Value any `json:"val"` + Expires time.Time `json:"expires"` index int } diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go index 6fe5847c01f..8acd22d74d7 100644 --- a/libbeat/processors/cache/cache_test.go +++ b/libbeat/processors/cache/cache_test.go @@ -130,7 +130,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -191,7 +191,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -210,7 +210,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -271,7 +271,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -290,7 +290,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -351,7 +351,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -379,7 +379,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"), }, @@ -441,7 +441,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -465,7 +465,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -527,7 +527,7 @@ var cacheTests = 
[]struct {
 				},
 			},
 			wantCacheVal: map[string]*CacheEntry{
-				"one": {key: "one", value: "metadata_value"},
+				"one": {Key: "one", Value: "metadata_value"},
 			},
 			wantErr: nil,
 		},
@@ -547,7 +547,7 @@ var cacheTests = []struct {
 				},
 			},
 			wantCacheVal: map[string]*CacheEntry{
-				"one": {key: "one", value: "metadata_value"},
+				"one": {Key: "one", Value: "metadata_value"},
 			},
 			wantErr: errors.New("error applying cache get processor: expected map but type is string"),
 		},
@@ -613,7 +613,7 @@ func TestCache(t *testing.T) {
 		switch got := p.(*cache).store.(type) {
 		case *memStore:
 			allow := cmp.AllowUnexported(CacheEntry{})
-			ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index")
+			ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index")
 			if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) {
 				t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore))
 			}
diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go
index 06dacc5c6ca..36eeb423fc2 100644
--- a/libbeat/processors/cache/config.go
+++ b/libbeat/processors/cache/config.go
@@ -88,8 +88,8 @@ func defaultConfig() config {
 }
 
 type storeConfig struct {
-	Memory *id `config:"memory"`
-	File   *id `config:"file"`
+	Memory *memConfig  `config:"memory"`
+	File   *fileConfig `config:"file"`
 
 	// Capacity and Effort are currently experimental
 	// and not in public-facing documentation.
@@ -97,8 +97,13 @@ type storeConfig struct {
 	Effort   int `config:"eviction_effort"`
 }
 
-type id struct {
-	ID string `config:"id"`
+type memConfig struct {
+	ID string `config:"id" validate:"required"`
+}
+
+type fileConfig struct {
+	ID            string        `config:"id" validate:"required"`
+	WriteOutEvery time.Duration `config:"write_interval"`
 }
 
 func (cfg *storeConfig) Validate() error {
diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go
index 6e8b01d930f..4a956caef02 100644
--- a/libbeat/processors/cache/config_test.go
+++ b/libbeat/processors/cache/config_test.go
@@ -39,6 +39,20 @@ put:
   ttl: 168h
   key_field: crowdstrike.aid
   value_field: crowdstrike.metadata
+`,
+		want: nil,
+	},
+	{
+		name: "put_file_with_periodic_write_out",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+    write_interval: 15m
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
 `,
 		want: nil,
 	},
@@ -78,6 +92,28 @@ delete:
 `,
 		want: nil,
 	},
+	{
+		name: "memory_no_id",
+		cfg: `
+backend:
+  memory:
+    id: ''
+delete:
+  key_field: crowdstrike.aid
+`,
+		want: errors.New("string value is not set accessing 'backend.memory.id'"),
+	},
+	{
+		name: "file_no_id",
+		cfg: `
+backend:
+  file:
+    id: ''
+delete:
+  key_field: crowdstrike.aid
+`,
+		want: errors.New("string value is not set accessing 'backend.file.id'"),
+	},
 	{
 		name: "no_op",
 		cfg: `
diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc
index 94f5d07cece..3e17d37e213 100644
--- a/libbeat/processors/cache/docs/cache.asciidoc
+++ b/libbeat/processors/cache/docs/cache.asciidoc
@@ -51,8 +51,9 @@ It has the following settings:
 
 One of `backend.memory.id` or `backend.file.id` must be provided.
 
-`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache.
-`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache.
+`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.write_interval`:: The interval at which the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. The default is zero, meaning no periodic writes.
 
 One of `put`, `get` or `delete` must be provided.
 
diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go
new file mode 100644
index 00000000000..45ed9d84068
--- /dev/null
+++ b/libbeat/processors/cache/file_store.go
@@ -0,0 +1,294 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"container/heap"
+	"context"
+	"encoding/json"
+	"errors"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/paths"
+)
+
+var fileStores = fileStoreSet{stores: map[string]*fileStore{}}
+
+// fileStoreSet is a collection of shared fileStore caches.
+type fileStoreSet struct {
+	mu     sync.Mutex
+	stores map[string]*fileStore
+}
+
+// get returns a fileStore cache with the provided ID based on the config.
+// If a fileStore with the ID already exists, its configuration is adjusted
+// and its reference count is increased. The returned context.CancelFunc
+// reduces the reference count and deletes the fileStore from the set if the
+// count reaches zero.
+func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	store, ok := s.stores[id]
+	if !ok {
+		store = newFileStore(cfg, id, pathFromConfig(cfg, log), log)
+		s.stores[store.id] = store
+	}
+	store.add(cfg)
+
+	return store, func() {
+		store.dropFrom(s)
+	}
+}
+
+// pathFromConfig returns the mapping from a config to a file-system path.
+func pathFromConfig(cfg config, log *logp.Logger) string {
+	path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
+	log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path)
+	return path
+}
+
+// cleanFilename replaces illegal printable characters (and space or dot) in
+// filenames with underscores.
+func cleanFilename(s string) string {
+	return pathCleaner.Replace(s)
+}
+
+var pathCleaner = strings.NewReplacer(
+	"/", "_",
+	"<", "_",
+	">", "_",
+	":", "_",
+	`"`, "_",
+	`\`, "_",
+	"|", "_",
+	"?", "_",
+	"*", "_",
+	".", "_",
+	" ", "_",
+)
+
+// free removes the fileStore with the given ID from the set. free is safe
+// for concurrent use.
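+// Note that free only drops the set's reference to the store; the final
+// write out and the release of the store's resources are handled by the
+// caller in (*fileStore).dropFrom below.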
+func (s *fileStoreSet) free(id string) {
+	s.mu.Lock()
+	delete(s.stores, id)
+	s.mu.Unlock()
+}
+
+// fileStore is a file-backed cache store.
+type fileStore struct {
+	memStore
+
+	path string
+	// cancel stops periodic write out operations.
+	// Write out operations are protected by the
+	// memStore's mutex.
+	cancel context.CancelFunc
+
+	log *logp.Logger
+}
+
+// newFileStore returns a new fileStore configured to apply the given TTL duration.
+// The fileStore is guaranteed not to grow larger than cap elements. id is the
+// look-up into the global cache store the fileStore is held in.
+func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore {
+	s := fileStore{
+		path: path,
+		log:  log,
+		memStore: memStore{
+			id:    id,
+			cache: make(map[string]*CacheEntry),
+
+			// Mark the ttl as invalid until we have had a put
+			// operation configured. While the shared backing
+			// data store is incomplete, and has no put operation
+			// defined, the TTL will be invalid, but will never
+			// be accessed since all time operations outside put
+			// refer to absolute times, held by the CacheEntry.
+			ttl:    -1,
+			cap:    -1,
+			effort: -1,
+		},
+	}
+	s.cancel = noop
+	if cfg.Store.File.WriteOutEvery > 0 {
+		var ctx context.Context
+		ctx, s.cancel = context.WithCancel(context.Background())
+		go s.periodicWriteOut(ctx, cfg.Store.File.WriteOutEvery)
+	}
+	s.readState()
+	return &s
+}
+
+func (c *fileStore) String() string { return "file:" + c.id }
+
+// dropFrom decreases the reference count for the fileStore and removes it from
+// the stores map if the count is zero. dropFrom is safe for concurrent use.
+func (c *fileStore) dropFrom(stores *fileStoreSet) {
+	c.mu.Lock()
+	c.refs--
+	if c.refs < 0 {
+		panic("invalid reference count")
+	}
+	if c.refs == 0 {
+		// Stop periodic writes
+		c.cancel()
+		// and do a final write out.
+		c.writeState(true)
+
+		stores.free(c.id)
+		// GC assists.
+		c.cache = nil
+		c.expiries = nil
+	}
+	c.mu.Unlock()
+}
+
+func (c *fileStore) readState() {
+	f, err := os.Open(c.path)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			c.log.Debugw("no state on file system", "error", err)
+		} else {
+			c.log.Errorw("failed to open file to read state", "error", err)
+		}
+		return
+	}
+	defer f.Close()
+
+	// It would be nice to be able at this stage to determine
+	// whether the file is stale past the TTL of the cache, but
+	// we do not have this information yet. So we must read
+	// through all the elements. If any survive the expiry
+	// filter, the cache is still live; otherwise delete the file.
+
+	dec := json.NewDecoder(f)
+	for {
+		var e CacheEntry
+		err = dec.Decode(&e)
+		if err != nil {
+			if err != io.EOF {
+				switch err := err.(type) {
+				case *json.SyntaxError:
+					c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset)
+				default:
+					c.log.Errorw("failed to read state element", "error", err, "path", c.path)
+				}
+			}
+			break
+		}
+		if e.Expires.Before(time.Now()) {
+			// Don't retain expired elements.
+			continue
+		}
+		c.cache[e.Key] = &e
+		heap.Push(&c.expiries, &e)
+	}
+
+	if len(c.cache) != 0 {
+		return
+	}
+	// We had no live entries, so delete the file.
+	err = os.Remove(c.path)
+	if err != nil {
+		c.log.Errorw("failed to delete stale cache file", "error", err)
+	}
+}
+
+// periodicWriteOut writes the cache contents to the backing file at the
+// specified interval until the context is cancelled. periodicWriteOut is
+// safe for concurrent use.
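+// A write_interval of zero (the default) or less means this goroutine is
+// never started; in that case cancel remains the no-op set in newFileStore
+// and state is only written out when the store is finally dropped.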
+func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) {
+	tick := time.NewTicker(every)
+	defer tick.Stop()
+	for {
+		select {
+		case <-tick.C:
+			c.mu.Lock()
+			c.writeState(false)
+			c.mu.Unlock()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// writeState writes the current cache state to the backing file.
+// If final is true and the cache is empty, the file will be deleted.
+func (c *fileStore) writeState(final bool) {
+	if len(c.cache) == 0 && final {
+		err := os.Remove(c.path)
+		if err != nil {
+			c.log.Errorw("failed to delete write state when empty", "error", err)
+		}
+		return
+	}
+	f, err := os.CreateTemp(filepath.Dir(c.path), filepath.Base(c.path)+"-*.tmp")
+	if err != nil {
+		c.log.Errorw("failed to open file to write state", "error", err)
+		return
+	}
+	// Try to make sure we are private.
+	err = os.Chmod(f.Name(), 0o600)
+	if err != nil {
+		c.log.Errorw("failed to set state file mode", "error", err)
+		return
+	}
+	tmp := f.Name()
+	defer func() {
+		err = f.Sync()
+		if err != nil {
+			c.log.Errorw("failed to sync file after writing state", "error", err)
+			return
+		}
+		err = f.Close()
+		if err != nil {
+			c.log.Errorw("failed to close file after writing state", "error", err)
+			return
+		}
+		// Try to be atomic.
+		err = os.Rename(tmp, c.path)
+		if err != nil {
+			c.log.Errorw("failed to finalize writing state", "error", err)
+		}
+	}()
+
+	enc := json.NewEncoder(f)
+	enc.SetEscapeHTML(false)
+	now := time.Now()
+	// Iterate over the expiry heap's underlying slice rather than popping
+	// elements so that periodic write outs do not drain the live heap.
+	for i := 0; i < c.expiries.Len(); i++ {
+		e := c.expiries[i]
+		if e.Expires.Before(now) {
+			// Don't write expired elements.
+			continue
+		}
+		err = enc.Encode(e)
+		if err != nil {
+			c.log.Errorw("failed to write state element", "error", err)
+			return
+		}
+	}
+}
diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go
new file mode 100644
index 00000000000..557a7eca9a5
--- /dev/null
+++ b/libbeat/processors/cache/file_store_test.go
@@ -0,0 +1,455 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"encoding/json"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+var keep = flag.Bool("keep", false, "keep testdata after test complete")
+
+type fileStoreTestSteps struct {
+	doTo func(*fileStore) error
+	want *fileStore
+}
+
+//nolint:errcheck // Paul Hogan was right.
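+// The persisted state that these tests read and write is a stream of JSON
+// objects, one per live cache entry, using the CacheEntry field tags; for
+// example (timestamps are illustrative only):
+//
+//	{"key":"one","val":1,"expires":"2023-09-28T14:00:35+09:30"}
+//	{"key":"three","val":3,"expires":"2023-09-28T14:00:35+09:30"}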
+var fileStoreTests = []struct { + name string + cfg config + want *fileStore + steps []fileStoreTestSteps + wantPersisted []*CacheEntry +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &fileStore{path: "testdata/new_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &fileStore{path: "testdata/new_delete", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. 
+ ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 1: { + doTo: func(s *fileStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 2: { + doTo: func(s *fileStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 3: { + doTo: func(s *fileStore) error { + return s.Delete("two") + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 4: { + doTo: func(s *fileStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 5: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if !fileStores.has(s.id) { + return fmt.Errorf("%q fileStore not found after single close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 6: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if fileStores.has(s.id) { + return 
fmt.Errorf("%q fileStore still found after double close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + wantPersisted: []*CacheEntry{ + // Numeric values are float due to JSON round-trip. + {Key: "one", Value: 1.0}, + {Key: "three", Value: 3.0}, + }, + }, +} + +func TestFileStore(t *testing.T) { + err := os.RemoveAll("testdata") + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to clear testdata directory: %v", err) + } + err = os.Mkdir("testdata", 0o755) + if err != nil && !errors.Is(err, fs.ErrExist) { + t.Fatalf("failed to create testdata directory: %v", err) + } + if !*keep { + t.Cleanup(func() { os.RemoveAll("testdata") }) + } + + allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "cancel", "log") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") + + for _, test := range fileStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. + path := filepath.Join("testdata", test.name) + store := newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + store.add(test.cfg) + fileStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInFileStore, ignoreInMemStore) { + t.Errorf("unexpected new fileStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInFileStore, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected fileStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry)) + } + } + if test.wantPersisted == nil { + return + } + + f, err := os.Open(path) + if err != nil { + t.Fatalf("failed to open persisted data: %v", err) + } + defer f.Close() + dec := json.NewDecoder(f) + var got []*CacheEntry + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + t.Fatalf("unexpected error reading persisted cache data: %v", err) + } + break + } + got = append(got, &e) + } + if !cmp.Equal(test.wantPersisted, got, allow, ignoreInCacheEntry) { + t.Errorf("unexpected persisted state:\n--- want\n+++ got\n%s", + cmp.Diff(test.wantPersisted, got, allow, ignoreInCacheEntry)) + } + wantCache := make(map[string]*CacheEntry) + for _, e := range got { + wantCache[e.Key] = e + } + store = newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + // Specialise the in cache entry ignore list to include index. 
+ ignoreMoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") + if !cmp.Equal(wantCache, store.cache, allow, ignoreMoreInCacheEntry) { + t.Errorf("unexpected restored state:\n--- want\n+++ got\n%s", + cmp.Diff(wantCache, store.cache, allow, ignoreMoreInCacheEntry)) + } + for k, e := range store.cache { + if e.index < 0 || len(store.expiries) <= e.index { + t.Errorf("cache entry %s index out of bounds: got:%d [0,%d)", k, e.index, len(store.expiries)) + continue + } + if !cmp.Equal(e, store.expiries[e.index], allow, ignoreInCacheEntry) { + t.Errorf("unexpected mismatched cache/expiry state %s:\n--- want\n+++ got\n%s", + k, cmp.Diff(e, store.expiries[e.index], allow, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set. It is used only for testing. +func (s *fileStoreSet) add(store *fileStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set. It is used only for testing. +func (s *fileStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 900e91c8e60..774a019d783 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -24,24 +24,25 @@ import ( "time" ) +var memStores = memStoreSet{stores: map[string]*memStore{}} + // memStoreSet is a collection of shared memStore caches. type memStoreSet struct { mu sync.Mutex stores map[string]*memStore - typ string // TODO: Remove when a file-backed store exists. } // get returns a memStore cache with the provided ID based on the config. // If a memStore with the ID already exist, its configuration is adjusted // and its reference count is increased. The returned context.CancelFunc -// reduces the reference count and deletes the memStore from set if the +// reduces the reference count and deletes the memStore from the set if the // count reaches zero. func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { s.mu.Lock() defer s.mu.Unlock() store, ok := s.stores[id] if !ok { - store = newMemStore(cfg, id, s.typ) + store = newMemStore(cfg, id) s.stores[store.id] = store } store.add(cfg) @@ -69,10 +70,6 @@ type memStore struct { // id is the index into global cache store for the cache. id string - // typ is a temporary tag to differentiate memory stores - // from the mocked file store. - // TODO: Remove when a file-backed store exists. - typ string // cap is the maximum number of elements the cache // will hold. If not positive, no limit. @@ -85,10 +82,9 @@ type memStore struct { // newMemStore returns a new memStore configured to apply the give TTL duration. // The memStore is guaranteed not to grow larger than cap elements. id is the // look-up into the global cache store the memStore is held in. -func newMemStore(cfg config, id, typ string) *memStore { +func newMemStore(cfg config, id string) *memStore { return &memStore{ id: id, - typ: typ, cache: make(map[string]*CacheEntry), // Mark the ttl as invalid until we have had a put @@ -103,7 +99,7 @@ func newMemStore(cfg config, id, typ string) *memStore { } } -func (c *memStore) String() string { return c.typ + ":" + c.id } +func (c *memStore) String() string { return "memory:" + c.id } // add updates the receiver for a new operation. 
It increases the reference // count for the receiver, and if the config is a put operation and has no @@ -158,11 +154,11 @@ func (c *memStore) Get(key string) (any, error) { if !ok { return nil, ErrNoData } - if time.Now().After(v.expires) { + if time.Now().After(v.Expires) { delete(c.cache, key) return nil, ErrNoData } - return v.value, nil + return v.Value, nil } // Put stores the provided value in the cache associated with the given key. @@ -174,9 +170,9 @@ func (c *memStore) Put(key string, val any) error { now := time.Now() c.evictExpired(now) e := &CacheEntry{ - key: key, - value: val, - expires: now.Add(c.ttl), + Key: key, + Value: val, + Expires: now.Add(c.ttl), } c.cache[key] = e heap.Push(&c.expiries, e) @@ -189,11 +185,11 @@ func (c *memStore) Put(key string, val any) error { // it under the capacity limit. func (c *memStore) evictExpired(now time.Time) { for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { - if c.expiries[0].expires.After(now) { + if c.expiries[0].Expires.After(now) { break } e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } if c.cap <= 0 { // No cap, so depend on effort. @@ -201,7 +197,7 @@ func (c *memStore) evictExpired(now time.Time) { } for len(c.cache) >= c.cap { e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } } @@ -237,7 +233,7 @@ func (h expiryHeap) Len() int { return len(h) } func (h expiryHeap) Less(i, j int) bool { - return h[i].expires.Before(h[j].expires) + return h[i].Expires.Before(h[j].Expires) } func (h expiryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 692931854b8..6be5eadb357 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -42,7 +42,7 @@ var memStoreTests = []struct { name: "new_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -63,7 +63,7 @@ var memStoreTests = []struct { name: "new_get", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -83,7 +83,7 @@ var memStoreTests = []struct { name: "new_delete", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -103,7 +103,7 @@ var memStoreTests = []struct { name: "new_get_add_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -123,7 +123,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -149,7 +149,7 @@ var memStoreTests = []struct { name: "ensemble", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -169,7 +169,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -199,14 +199,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: 
"three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -225,14 +225,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -247,12 +247,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -271,12 +271,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -295,12 +295,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 1, ttl: time.Second, @@ -332,14 +332,14 @@ var memStoreTests = []struct { func TestMemStore(t *testing.T) { allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) - ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") - ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") for _, test := range memStoreTests { t.Run(test.name, func(t *testing.T) { // Construct the store and put in into the stores map as // we would if we were calling Run. - store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID) store.add(test.cfg) memStores.add(store) @@ -361,14 +361,14 @@ func TestMemStore(t *testing.T) { } } -// add adds the store to the set, it is used only for testing. 
+// add adds the store to the set. It is used only for testing. func (s *memStoreSet) add(store *memStore) { s.mu.Lock() s.stores[store.id] = store s.mu.Unlock() } -// has returns whether the store exists in the set, it is used only for testing. +// has returns whether the store exists in the set. It is used only for testing. func (s *memStoreSet) has(id string) bool { s.mu.Lock() _, ok := s.stores[id]