From f111cbc73ec40b728299cd457e93ae72a8d7a6b9 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Tue, 26 Sep 2023 06:14:57 +0930 Subject: [PATCH] libbeat/processors/cache: new processor (#36619) This is the initial infrastructure for a caching metadata processor to be added. It currently only supports in-memory caching, though both planned cache types are configurable with the file cache being mocked by an in-memory cache. Additional config options are added relative to the RFC, but these are intentionally not documented at this stage. --- .github/CODEOWNERS | 1 + CHANGELOG-developer.next.asciidoc | 1 + libbeat/processors/cache/cache.go | 290 +++++++++ libbeat/processors/cache/cache_test.go | 625 +++++++++++++++++++ libbeat/processors/cache/config.go | 113 ++++ libbeat/processors/cache/config_test.go | 182 ++++++ libbeat/processors/cache/docs/cache.asciidoc | 107 ++++ libbeat/processors/cache/mem_store.go | 257 ++++++++ libbeat/processors/cache/mem_store_test.go | 379 +++++++++++ 9 files changed, 1955 insertions(+) create mode 100644 libbeat/processors/cache/cache.go create mode 100644 libbeat/processors/cache/cache_test.go create mode 100644 libbeat/processors/cache/config.go create mode 100644 libbeat/processors/cache/config_test.go create mode 100644 libbeat/processors/cache/docs/cache.asciidoc create mode 100644 libbeat/processors/cache/mem_store.go create mode 100644 libbeat/processors/cache/mem_store_test.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a41ac9907501..8fe748c1bf50 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -55,6 +55,7 @@ CHANGELOG* /libbeat/ @elastic/elastic-agent-data-plane /libbeat/docs/processors-list.asciidoc @elastic/ingest-docs /libbeat/management @elastic/elastic-agent-control-plane +/libbeat/processors/cache/ @elastic/security-external-integrations /libbeat/processors/community_id/ @elastic/security-external-integrations /libbeat/processors/decode_xml/ @elastic/security-external-integrations /libbeat/processors/decode_xml_wineventlog/ @elastic/security-external-integrations diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 464afe040f82..d0f4c40ef3ba 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -172,6 +172,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158] - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] +- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] ==== Deprecated diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go new file mode 100644 index 000000000000..eea3ed115eae --- /dev/null +++ b/libbeat/processors/cache/cache.go @@ -0,0 +1,290 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "context" + "errors" + "fmt" + "time" + + "github.com/elastic/beats/v7/libbeat/beat" + "github.com/elastic/beats/v7/libbeat/common/atomic" + "github.com/elastic/beats/v7/libbeat/processors" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +const name = "cache" + +func init() { + // We cannot use this as a JS plugin as it is stateful and includes a Close method. + processors.RegisterPlugin(name, New) +} + +var ( + // ErrNoMatch is returned when the event doesn't contain the field + // specified in key_field. + ErrNoMatch = errors.New("field in key_field not found in the event") + + // ErrNoData is returned when metadata for an event can't be collected. + ErrNoData = errors.New("metadata not found") + + instanceID atomic.Uint32 +) + +// cache is a caching enrichment processor. +type cache struct { + config config + store Store + cancel context.CancelFunc + log *logp.Logger +} + +// Resulting processor implements `Close()` to release the cache resources. +func New(cfg *conf.C) (beat.Processor, error) { + config := defaultConfig() + err := cfg.Unpack(&config) + if err != nil { + return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err) + } + src, cancel, err := getStoreFor(config) + if err != nil { + return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) + } + + // Logging (each processor instance has a unique ID). + id := int(instanceID.Inc()) + log := logp.NewLogger(name).With("instance_id", id) + + p := &cache{ + config: config, + store: src, + cancel: cancel, + log: log, + } + p.log.Infow("initialized cache processor", "details", p) + return p, nil +} + +// getStoreFor returns a backing store for the provided configuration, +// and a context cancellation that releases the cache resource when it +// is no longer required. The cancellation should be called when the +// processor is closed. +func getStoreFor(cfg config) (Store, context.CancelFunc, error) { + switch { + case cfg.Store.Memory != nil: + s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) + return s, cancel, nil + + case cfg.Store.File != nil: + logp.L().Warn("using memory store when file is configured") + // TODO: Replace place-holder code with a file-store. + s, cancel := fileStores.get(cfg.Store.File.ID, cfg) + return s, cancel, nil + + default: + // This should have been caught by config validation. + return nil, noop, errors.New("no configured store") + } +} + +var ( + memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"} + fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock. +) + +// noop is a no-op context.CancelFunc. +func noop() {} + +// Store is the interface implemented by metadata providers. +type Store interface { + Put(key string, val any) error + Get(key string) (any, error) + Delete(key string) error + + // The string returned from the String method should + // be the backing store ID. Either "file:" or + // "memory:". + fmt.Stringer +} + +type CacheEntry struct { + key string + value any + expires time.Time + index int +} + +// Run enriches the given event with the host metadata. +func (p *cache) Run(event *beat.Event) (*beat.Event, error) { + switch { + case p.config.Put != nil: + p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put) + err := p.putFrom(event) + if err != nil { + switch { + case errors.Is(err, mapstr.ErrKeyNotFound): + if p.config.IgnoreMissing { + return event, nil + } + return event, err + } + return event, fmt.Errorf("error applying %s put processor: %w", name, err) + } + return event, nil + + case p.config.Get != nil: + p.log.Debugw("get", "backend_id", p.store, "config", p.config.Get) + result, err := p.getFor(event) + if err != nil { + switch { + case errors.Is(err, mapstr.ErrKeyNotFound): + if p.config.IgnoreMissing { + return event, nil + } + case errors.Is(err, ErrNoData): + return event, err + } + return event, fmt.Errorf("error applying %s get processor: %w", name, err) + } + if result != nil { + return result, nil + } + return event, ErrNoMatch + + case p.config.Delete != nil: + p.log.Debugw("delete", "backend_id", p.store, "config", p.config.Delete) + err := p.deleteFor(event) + if err != nil { + return event, fmt.Errorf("error applying %s delete processor: %w", name, err) + } + return event, nil + + default: + // This should never happen, but we don't need to flag it. + return event, nil + } +} + +// putFrom takes the configured value from the event and stores it in the cache +// if it exists. +func (p *cache) putFrom(event *beat.Event) error { + k, err := event.GetValue(p.config.Put.Key) + if err != nil { + return err + } + key, ok := k.(string) + if !ok { + return fmt.Errorf("key field '%s' not a string: %T", p.config.Put.Key, k) + } + p.log.Debugw("put", "backend_id", p.store, "key", key) + + val, err := event.GetValue(p.config.Put.Value) + if err != nil { + return err + } + + err = p.store.Put(key, val) + if err != nil { + return fmt.Errorf("failed to put '%s' into '%s': %w", key, p.config.Put.Value, err) + } + return nil +} + +// getFor gets the configured value from the cache for the event and inserts +// it into the configured field if it exists. +func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) { + // Check for clobbering. + dst := p.config.Get.Target + if !p.config.OverwriteKeys { + if _, err := event.GetValue(dst); err == nil { + return nil, fmt.Errorf("target field '%s' already exists and overwrite_keys is false", dst) + } + } + + // Get key into store for metadata. + key := p.config.Get.Key + v, err := event.GetValue(key) + if err != nil { + return nil, err + } + k, ok := v.(string) + if !ok { + return nil, fmt.Errorf("key field '%s' not a string: %T", key, v) + } + p.log.Debugw("get", "backend_id", p.store, "key", k) + + // Get metadata... + meta, err := p.store.Get(k) + if err != nil { + return nil, fmt.Errorf("%w for '%s': %w", ErrNoData, k, err) + } + if meta == nil { + return nil, fmt.Errorf("%w for '%s'", ErrNoData, k) + } + if m, ok := meta.(map[string]interface{}); ok { + meta = mapstr.M(m) + } + // ... and write it into the event. + // The implementation of PutValue currently leaves event + // essentially unchanged in the case of an error (in the + // case of an @metadata field there may be a mutation, + // but at most this will be the addition of a Meta field + // value to event). None of this is documented. + if _, err = event.PutValue(dst, meta); err != nil { + return nil, err + } + return event, nil +} + +// deleteFor deletes the configured value from the cache based on the value of +// the configured key. +func (p *cache) deleteFor(event *beat.Event) error { + v, err := event.GetValue(p.config.Delete.Key) + if err != nil { + return err + } + k, ok := v.(string) + if !ok { + return fmt.Errorf("key field '%s' not a string: %T", p.config.Delete.Key, v) + } + return p.store.Delete(k) +} + +func (p *cache) Close() error { + p.cancel() + return nil +} + +// String returns the processor representation formatted as a string +func (p *cache) String() string { + switch { + case p.config.Put != nil: + return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]", + name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys) + case p.config.Get != nil: + return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]", + name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys) + case p.config.Delete != nil: + return fmt.Sprintf("%s=[operation=delete, store_id=%s, key_field=%s]", name, p.store, p.config.Delete.Key) + default: + return fmt.Sprintf("%s=[operation=invalid]", name) + } +} diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go new file mode 100644 index 000000000000..6fe5847c01fa --- /dev/null +++ b/libbeat/processors/cache/cache_test.go @@ -0,0 +1,625 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/beats/v7/libbeat/beat" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type cacheTestStep struct { + event mapstr.M + want mapstr.M + wantCacheVal map[string]*CacheEntry + wantErr error +} + +var cacheTests = []struct { + name string + configs []testConfig + wantInitErr error + steps []cacheTestStep +}{ + { + name: "invalid_no_backend", + configs: []testConfig{ + { + cfg: mapstr.M{ + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "ttl": "168h", + "value_field": "crowdstrike.metadata", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: missing required field accessing 'backend'"), + }, + { + name: "invalid_no_key_field", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: string value is not set accessing 'put.key_field'"), + }, + { + name: "invalid_no_value_field", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: string value is not set accessing 'put.value_field'"), + }, + { + name: "put_value", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_reverse_config", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_with_get_error_no_overwrite", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"), + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite_but_get_error", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new.child", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: expected map but type is string"), + }, + }, + }, +} + +type testConfig struct { + when func(e mapstr.M) bool + cfg mapstr.M +} + +func TestCache(t *testing.T) { + logp.TestingSetup(logp.WithSelectors(name)) + for _, test := range cacheTests { + t.Run(test.name, func(t *testing.T) { + var processors []beat.Processor + for i, cfg := range test.configs { + config, err := conf.NewConfigFrom(cfg.cfg) + if err != nil { + t.Fatal(err) + } + + p, err := New(config) + if !sameError(err, test.wantInitErr) { + t.Errorf("unexpected error from New: got:%v want:%v", err, test.wantInitErr) + } + if err != nil { + return + } + + t.Log(p) + c, ok := p.(*cache) + if !ok { + t.Fatalf("processor %d is not an *cache", i) + } + + defer func() { + err := c.Close() + if err != nil { + t.Errorf("unexpected error from c.Close(): %v", err) + } + }() + + processors = append(processors, p) + } + + for i, step := range test.steps { + for j, p := range processors { + if test.configs[j].when != nil && !test.configs[j].when(step.event) { + continue + } + got, err := p.Run(&beat.Event{ + Fields: step.event, + }) + if !sameError(err, step.wantErr) { + t.Errorf("unexpected error from Run: got:%v want:%v", err, step.wantErr) + return + } + if !cmp.Equal(step.want, got.Fields) { + t.Errorf("unexpected result %d\n--- want\n+++ got\n%s", i, cmp.Diff(step.want, got.Fields)) + } + switch got := p.(*cache).store.(type) { + case *memStore: + allow := cmp.AllowUnexported(CacheEntry{}) + ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index") + if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) { + t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore)) + } + } + } + } + }) + } +} diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go new file mode 100644 index 000000000000..06dacc5c6ca4 --- /dev/null +++ b/libbeat/processors/cache/config.go @@ -0,0 +1,113 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "errors" + "time" +) + +type config struct { + Get *getConfig `config:"get"` + Put *putConfig `config:"put"` + Delete *delConfig `config:"delete"` + + Store *storeConfig `config:"backend" validate:"required"` + + // IgnoreMissing: Ignore errors if event has no matching field. + IgnoreMissing bool `config:"ignore_missing"` + + // OverwriteKeys allow target_fields to overwrite existing fields. + OverwriteKeys bool `config:"overwrite_keys"` +} + +func (cfg *config) Validate() error { + var ops int + if cfg.Put != nil { + ops++ + } + if cfg.Get != nil { + ops++ + } + if cfg.Delete != nil { + ops++ + } + switch ops { + case 0: + return errors.New("no operation specified for cache processor") + case 1: + return nil + default: + return errors.New("cannot specify multiple operations together in a cache processor") + } +} + +type getConfig struct { + // Key is the field containing the key to lookup for matching. + Key string `config:"key_field" validate:"required"` + + // Target is the destination field where fields will be added. + Target string `config:"target_field" validate:"required"` +} + +type putConfig struct { + // Key is the field containing the key to lookup for matching. + Key string `config:"key_field" validate:"required"` + + // Target is the destination field where fields will be added. + Value string `config:"value_field" validate:"required"` + + TTL *time.Duration `config:"ttl" validate:"required"` +} + +type delConfig struct { + // Key is the field containing the key to lookup for deletion. + Key string `config:"key_field" validate:"required"` +} + +func defaultConfig() config { + return config{ + IgnoreMissing: true, + OverwriteKeys: false, + } +} + +type storeConfig struct { + Memory *id `config:"memory"` + File *id `config:"file"` + + // Capacity and Effort are currently experimental + // and not in public-facing documentation. + Capacity int `config:"capacity"` + Effort int `config:"eviction_effort"` +} + +type id struct { + ID string `config:"id"` +} + +func (cfg *storeConfig) Validate() error { + switch { + case cfg.Memory != nil && cfg.File != nil: + return errors.New("must specify only one of backend.memory.id or backend.file.id") + case cfg.Memory != nil, cfg.File != nil: + default: + return errors.New("must specify one of backend.memory.id or backend.file.id") + } + return nil +} diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go new file mode 100644 index 000000000000..6e8b01d930f3 --- /dev/null +++ b/libbeat/processors/cache/config_test.go @@ -0,0 +1,182 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "errors" + "testing" + + "github.com/elastic/go-ucfg/yaml" +) + +var validateTests = []struct { + name string + cfg string + want error +}{ + { + name: "put_file", + cfg: ` +backend: + file: + id: aidmaster +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "put_memory", + cfg: ` +backend: + memory: + id: aidmaster +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "get", + cfg: ` +backend: + file: + id: aidmaster +get: + key_field: crowdstrike.aid + target_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "delete", + cfg: ` +backend: + file: + id: aidmaster +delete: + key_field: crowdstrike.aid +`, + want: nil, + }, + { + name: "no_op", + cfg: ` +backend: + file: + id: aidmaster +`, + want: errors.New("no operation specified for cache processor accessing config"), + }, + { + + name: "too_many_ops", + cfg: ` +backend: + file: + id: aidmaster +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +get: + key_field: crowdstrike.aid + target_field: crowdstrike.metadata +`, + want: errors.New("cannot specify multiple operations together in a cache processor accessing config"), + }, + { + + name: "no_backend", + cfg: ` +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: errors.New("missing required field accessing 'backend'"), + }, + { + + name: "incomplete_backend", + cfg: ` +backend: + file: ~ +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: errors.New("must specify one of backend.memory.id or backend.file.id accessing 'backend'"), + }, + { + + name: "too_many_backends", + cfg: ` +backend: + file: + id: aidmaster_f + memory: + id: aidmaster_m +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: errors.New("must specify only one of backend.memory.id or backend.file.id accessing 'backend'"), + }, +} + +func TestValidate(t *testing.T) { + for _, test := range validateTests { + t.Run(test.name, func(t *testing.T) { + got := ucfgRigmarole(test.cfg) + if !sameError(got, test.want) { + t.Errorf("unexpected error: got:%v want:%v", got, test.want) + } + }) + } +} + +func ucfgRigmarole(text string) error { + c, err := yaml.NewConfig([]byte(text)) + if err != nil { + return err + } + cfg := defaultConfig() + err = c.Unpack(&cfg) + if err != nil { + return err + } + return cfg.Validate() +} + +func sameError(a, b error) bool { + switch { + case a == nil && b == nil: + return true + case a == nil, b == nil: + return false + default: + return a.Error() == b.Error() + } +} diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc new file mode 100644 index 000000000000..94f5d07cece1 --- /dev/null +++ b/libbeat/processors/cache/docs/cache.asciidoc @@ -0,0 +1,107 @@ +[[add-cached-metadata]] +=== Add cached metadata + +++++ +cache +++++ + +experimental[] + +The `cache` processor enriches events with information from a previously +cached events. + +[source,yaml] +------------------------------------------------------------------------------- +processors: + - cache: + backend: + memory: + id: cache_id + put: + key_field: join_key_field + value_field: source_field +------------------------------------------------------------------------------- + +[source,yaml] +------------------------------------------------------------------------------- +processors: + - cache: + backend: + memory: + id: cache_id + get: + key_field: join_key_field + target_field: destination_field +------------------------------------------------------------------------------- + +[source,yaml] +------------------------------------------------------------------------------- +processors: + - cache: + backend: + memory: + id: cache_id + delete: + key_field: join_key_field +------------------------------------------------------------------------------- + +The fields added to the target field will depend on the provider. + +It has the following settings: + +One of `backend.memory.id` or `backend.file.id` must be provided. + +`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache. +`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache. + +One of `put`, `get` or `delete` must be provided. + +`put.key_field`:: Name of the field containing the key to put into the cache. Required if `put` is present. +`put.value_field`:: Name of the field containing the value to put into the cache. Required if `put` is present. +`put.ttl`:: The TTL to associate with the cached key/value. Valid time units are h, m, s, ms, us/µs and ns. Required if `put` is present. + +`get.key_field`:: Name of the field containing the key to get. Required if `get` is present. +`get.target_field`:: Name of the field to which the cached value will be written. Required if `get` is present. + +`delete.key_field`:: Name of the field containing the key to delete. Required if `delete` is present. + +`ignore_missing`:: (Optional) When set to `false`, events that don't contain any +of the fields in `match_keys` will be discarded and an error will be generated. By +default, this condition is ignored. + +`overwrite_keys`:: (Optional) By default, if a target field already exists, it +will not be overwritten and an error will be logged. If `overwrite_keys` is +set to `true`, this condition will be ignored. + +The `cache` processor can be used to perform joins within the Beat between +documents within an event stream. + +[source,yaml] +------------------------------------------------------------------------------- +processors: + - if: + contains: + log.file.path: fdrv2/aidmaster + then: + - cache: + backend: + memory: + id: aidmaster + put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata + else: + - cache: + backend: + memory: + id: aidmaster + get: + key_field: crowdstrike.aid + target_field: crowdstrike.metadata +------------------------------------------------------------------------------- + +This would enrich an event events with `log.file.path` not equal to +"fdrv2/aidmaster" with the `crowdstrike.metadata` fields from events +with `log.file.path` equal to that value where the `crowdstrike.aid` +field matches between the source and destination documents. diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go new file mode 100644 index 000000000000..900e91c8e600 --- /dev/null +++ b/libbeat/processors/cache/mem_store.go @@ -0,0 +1,257 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "container/heap" + "context" + "sync" + "time" +) + +// memStoreSet is a collection of shared memStore caches. +type memStoreSet struct { + mu sync.Mutex + stores map[string]*memStore + typ string // TODO: Remove when a file-backed store exists. +} + +// get returns a memStore cache with the provided ID based on the config. +// If a memStore with the ID already exist, its configuration is adjusted +// and its reference count is increased. The returned context.CancelFunc +// reduces the reference count and deletes the memStore from set if the +// count reaches zero. +func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { + s.mu.Lock() + defer s.mu.Unlock() + store, ok := s.stores[id] + if !ok { + store = newMemStore(cfg, id, s.typ) + s.stores[store.id] = store + } + store.add(cfg) + + return store, func() { + store.dropFrom(s) + } +} + +// free removes the memStore with the given ID from the set. free is safe +// for concurrent use. +func (s *memStoreSet) free(id string) { + s.mu.Lock() + delete(s.stores, id) + s.mu.Unlock() +} + +// memStore is a memory-backed cache store. +type memStore struct { + mu sync.Mutex + cache map[string]*CacheEntry + expiries expiryHeap + ttl time.Duration // ttl is the time entries are valid for in the cache. + refs int // refs is the number of processors referring to this store. + + // id is the index into global cache store for the cache. + id string + // typ is a temporary tag to differentiate memory stores + // from the mocked file store. + // TODO: Remove when a file-backed store exists. + typ string + + // cap is the maximum number of elements the cache + // will hold. If not positive, no limit. + cap int + // effort is the number of entries to examine during + // expired element eviction. If not positive, full effort. + effort int +} + +// newMemStore returns a new memStore configured to apply the give TTL duration. +// The memStore is guaranteed not to grow larger than cap elements. id is the +// look-up into the global cache store the memStore is held in. +func newMemStore(cfg config, id, typ string) *memStore { + return &memStore{ + id: id, + typ: typ, + cache: make(map[string]*CacheEntry), + + // Mark the ttl as invalid until we have had a put + // operation configured. While the shared backing + // data store is incomplete, and has no put operation + // defined, the TTL will be invalid, but will never + // be accessed since all time operations outside put + // refer to absolute times, held by the CacheEntry. + ttl: -1, + cap: -1, + effort: -1, + } +} + +func (c *memStore) String() string { return c.typ + ":" + c.id } + +// add updates the receiver for a new operation. It increases the reference +// count for the receiver, and if the config is a put operation and has no +// previous put operation defined, the TTL, cap and effort will be set from +// cfg. add is safe for concurrent use. +func (c *memStore) add(cfg config) { + c.mu.Lock() + defer c.mu.Unlock() + c.refs++ + + // We may have already constructed the store with + // a get or a delete config, so set the TTL, cap + // and effort if we have a put config. If another + // put config has already been included, we ignore + // the put options now. + if cfg.Put == nil { + return + } + if c.ttl == -1 { + // putConfig.TTL is a required field, so we don't + // need to check for nil-ness. + c.ttl = *cfg.Put.TTL + c.cap = cfg.Store.Capacity + c.effort = cfg.Store.Effort + } +} + +// dropFrom decreases the reference count for the memStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *memStore) dropFrom(stores *memStoreSet) { + c.mu.Lock() + c.refs-- + if c.refs < 0 { + panic("invalid reference count") + } + if c.refs == 0 { + stores.free(c.id) + // GC assists. + c.cache = nil + c.expiries = nil + } + c.mu.Unlock() +} + +// Get returns the cached value associated with the provided key. If there is +// no value for the key, or the value has expired Get returns ErrNoData. Get +// is safe for concurrent use. +func (c *memStore) Get(key string) (any, error) { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.cache[key] + if !ok { + return nil, ErrNoData + } + if time.Now().After(v.expires) { + delete(c.cache, key) + return nil, ErrNoData + } + return v.value, nil +} + +// Put stores the provided value in the cache associated with the given key. +// The value is given an expiry time based on the configured TTL of the cache. +// Put is safe for concurrent use. +func (c *memStore) Put(key string, val any) error { + c.mu.Lock() + defer c.mu.Unlock() + now := time.Now() + c.evictExpired(now) + e := &CacheEntry{ + key: key, + value: val, + expires: now.Add(c.ttl), + } + c.cache[key] = e + heap.Push(&c.expiries, e) + return nil +} + +// evictExpired removes up to effort elements from the cache when the cache +// is below capacity, retaining all elements that have not expired. If the +// cache is at or above capacity, the oldest elements are removed to bring +// it under the capacity limit. +func (c *memStore) evictExpired(now time.Time) { + for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { + if c.expiries[0].expires.After(now) { + break + } + e := c.expiries.pop() + delete(c.cache, e.key) + } + if c.cap <= 0 { + // No cap, so depend on effort. + return + } + for len(c.cache) >= c.cap { + e := c.expiries.pop() + delete(c.cache, e.key) + } +} + +// Delete removes the value associated with the provided key from the cache. +// Delete is safe for concurrent use. +func (c *memStore) Delete(key string) error { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.cache[key] + if !ok { + return nil + } + heap.Remove(&c.expiries, v.index) + delete(c.cache, key) + return nil +} + +var _ heap.Interface = (*expiryHeap)(nil) + +// expiryHeap is a min-date heap. +// +// TODO: This could be a queue instead, though deletion becomes more +// complicated in that case. +type expiryHeap []*CacheEntry + +func (h *expiryHeap) pop() *CacheEntry { + e := heap.Pop(h).(*CacheEntry) + e.index = -1 + return e +} + +func (h expiryHeap) Len() int { + return len(h) +} +func (h expiryHeap) Less(i, j int) bool { + return h[i].expires.Before(h[j].expires) +} +func (h expiryHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} +func (h *expiryHeap) Push(v any) { + e := v.(*CacheEntry) + e.index = len(*h) + *h = append(*h, e) +} +func (h *expiryHeap) Pop() any { + v := (*h)[len(*h)-1] + (*h)[len(*h)-1] = nil // Help GC. + *h = (*h)[:len(*h)-1] + return v +} diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go new file mode 100644 index 000000000000..692931854b81 --- /dev/null +++ b/libbeat/processors/cache/mem_store_test.go @@ -0,0 +1,379 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +type memStoreTestSteps struct { + doTo func(*memStore) error + want *memStore +} + +//nolint:errcheck // Paul Hogan was right. +var memStoreTests = []struct { + name string + cfg config + want *memStore + steps []memStoreTestSteps +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 1: { + doTo: func(s *memStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "two": {key: "two", value: int(2), index: 1}, + "three": {key: "three", value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "two", value: int(2), index: 1}, + {key: "three", value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 2: { + doTo: func(s *memStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "two": {key: "two", value: int(2), index: 1}, + "three": {key: "three", value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "two", value: int(2), index: 1}, + {key: "three", value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 3: { + doTo: func(s *memStore) error { + return s.Delete("two") + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "three": {key: "three", value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "three", value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 4: { + doTo: func(s *memStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "three": {key: "three", value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "three", value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 5: { + doTo: func(s *memStore) error { + s.dropFrom(&memStores) + if !memStores.has(s.id) { + return fmt.Errorf("%q memStore not found after single close", s.id) + } + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "three": {key: "three", value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "three", value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 6: { + doTo: func(s *memStore) error { + s.dropFrom(&memStores) + if memStores.has(s.id) { + return fmt.Errorf("%q memStore still found after double close", s.id) + } + return nil + }, + want: &memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + }, + }, +} + +func TestMemStore(t *testing.T) { + allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") + + for _, test := range memStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") + store.add(test.cfg) + memStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInMemStore) { + t.Errorf("unexpected new memStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected memStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInMemStore, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set, it is used only for testing. +func (s *memStoreSet) add(store *memStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set, it is used only for testing. +func (s *memStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} + +func ptrTo[T any](v T) *T { return &v }