diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index 79b6ecdf9e1a..f3193db12cbe 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -80,6 +80,7 @@ func New(cfg *conf.C) (beat.Processor, error) { cancel: cancel, log: log, } + p.log.Infow("initialized cache processor", "details", p.String()) return &p, nil } @@ -88,6 +89,11 @@ type Store interface { Put(key string, val any) error Get(key string) (any, error) Delete(key string) error + + // The string returned from the String method should + // be the backing store ID. Either "file:" or + // "memory:". + fmt.Stringer } type CacheEntry struct { @@ -112,13 +118,13 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { defer storeMu.Unlock() switch { case cfg.Store.Memory != nil: - s, cancel := getMemStore(memStores, cfg.Store.Memory.ID, cfg) + s, cancel := getMemStore(memStores, cfg.Store.Memory.ID, cfg, "memory") return s, cancel, nil case cfg.Store.File != nil: logp.L().Warn("using memory store when file is configured") // TODO: Replace place-holder code with a file-store. - s, cancel := getMemStore(fileStores, cfg.Store.File.ID, cfg) + s, cancel := getMemStore(fileStores, cfg.Store.File.ID, cfg, "file") return s, cancel, nil default: @@ -130,10 +136,12 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { // noop is a no-op context.CancelFunc. func noop() {} -func getMemStore(stores map[string]*memStore, id string, cfg config) (*memStore, context.CancelFunc) { +// TODO: Remove the typ parameter when a file-backed store is available +// and each type knows who they are. +func getMemStore(stores map[string]*memStore, id string, cfg config, typ string) (*memStore, context.CancelFunc) { s, ok := stores[id] if !ok { - s = newMemStore(cfg, id) + s = newMemStore(cfg, id, typ) stores[s.id] = s } @@ -155,6 +163,7 @@ func getMemStore(stores map[string]*memStore, id string, cfg config) (*memStore, func (p *cache) Run(event *beat.Event) (*beat.Event, error) { switch { case p.config.Put != nil: + p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put) err := p.putFrom(event) if err != nil { switch { @@ -169,6 +178,7 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) { return event, nil case p.config.Get != nil: + p.log.Debugw("get", "backend_id", p.store, "config", p.config.Get) result, err := p.getFor(event) if err != nil { switch { @@ -187,6 +197,7 @@ func (p *cache) Run(event *beat.Event) (*beat.Event, error) { return event, ErrNoMatch case p.config.Delete != nil: + p.log.Debugw("delete", "backend_id", p.store, "config", p.config.Delete) err := p.deleteFor(event) if err != nil { return event, fmt.Errorf("error applying %s delete processor: %w", name, err) @@ -289,13 +300,13 @@ func (p *cache) Close() error { func (p *cache) String() string { switch { case p.config.Put != nil: - return fmt.Sprintf("%s=[operation=put, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]", - name, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys) + return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]", + name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys) case p.config.Get != nil: - return fmt.Sprintf("%s=[operation=get, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]", - name, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys) + return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]", + name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys) case p.config.Delete != nil: - return fmt.Sprintf("%s=[operation=delete, key_field=%s]", name, p.config.Delete.Key) + return fmt.Sprintf("%s=[operation=delete, store_id=%s, key_field=%s]", name, p.store, p.config.Delete.Key) default: return fmt.Sprintf("%s=[operation=invalid]", name) } diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 2754f13fd0ee..18cb2189b789 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -33,6 +33,10 @@ type memStore struct { // id is the index into global cache store for the cache. id string + // typ is a temporary tag to differentiate memory stores + // from the mocked file store. + // TODO: Remove when a file-backed store exists. + typ string // cap is the maximum number of elements the cache // will hold. If not positive, no limit. @@ -45,9 +49,10 @@ type memStore struct { // newMemStore returns a new memStore configured to apply the give TTL duration. // The memStore is guaranteed not to grow larger than cap elements. id is the // look-up into the global cache store the memStore is held in. -func newMemStore(cfg config, id string) *memStore { +func newMemStore(cfg config, id, typ string) *memStore { return &memStore{ id: id, + typ: typ, cache: make(map[string]*CacheEntry), // Mark the ttl as invalid until we have had a put operation @@ -58,6 +63,8 @@ func newMemStore(cfg config, id string) *memStore { } } +func (c *memStore) String() string { return c.typ + ":" + c.id } + // setPutOptions allows concurrency-safe updating of the put options. While the shared // backing data store is incomplete, and has no put operation defined, the TTL // will be invalid, but will never be accessed since all time operations outside diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index e11c10f493e8..c66e8caf5477 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -335,14 +335,14 @@ var memStoreTests = []struct { func TestMemStore(t *testing.T) { allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) - ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") for _, test := range memStoreTests { t.Run(test.name, func(t *testing.T) { // Construct the store and put in into the stores map as // we would if we were calling Run. - store := newMemStore(test.cfg, test.cfg.Store.Memory.ID) + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") store.setPutOptions(test.cfg) storeMu.Lock() memStores[store.id] = store