From ac43beffbd8eed05361107f4662fd658f0f3d22e Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 21 Sep 2023 06:51:36 +0930 Subject: [PATCH] address pr comments --- .github/CODEOWNERS | 1 + libbeat/processors/cache/cache.go | 73 +++++++-------------- libbeat/processors/cache/mem_store.go | 74 ++++++++++++++++++---- libbeat/processors/cache/mem_store_test.go | 37 +++++++---- 4 files changed, 108 insertions(+), 77 deletions(-) diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a41ac990750..8fe748c1bf5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -55,6 +55,7 @@ CHANGELOG* /libbeat/ @elastic/elastic-agent-data-plane /libbeat/docs/processors-list.asciidoc @elastic/ingest-docs /libbeat/management @elastic/elastic-agent-control-plane +/libbeat/processors/cache/ @elastic/security-external-integrations /libbeat/processors/community_id/ @elastic/security-external-integrations /libbeat/processors/decode_xml/ @elastic/security-external-integrations /libbeat/processors/decode_xml_wineventlog/ @elastic/security-external-integrations diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index 6f0111ab6f6..eea3ed115ea 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -21,7 +21,6 @@ import ( "context" "errors" "fmt" - "sync" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -84,47 +83,20 @@ func New(cfg *conf.C) (beat.Processor, error) { return p, nil } -// Store is the interface implemented by metadata providers. -type Store interface { - Put(key string, val any) error - Get(key string) (any, error) - Delete(key string) error - - // The string returned from the String method should - // be the backing store ID. Either "file:" or - // "memory:". - fmt.Stringer -} - -type CacheEntry struct { - key string - value any - expires time.Time - index int -} - -var ( - storeMu sync.Mutex - memStores = map[string]*memStore{} - fileStores = map[string]*memStore{} -) - // getStoreFor returns a backing store for the provided configuration, // and a context cancellation that releases the cache resource when it // is no longer required. The cancellation should be called when the // processor is closed. func getStoreFor(cfg config) (Store, context.CancelFunc, error) { - storeMu.Lock() - defer storeMu.Unlock() switch { case cfg.Store.Memory != nil: - s, cancel := getMemStore(memStores, cfg.Store.Memory.ID, cfg, "memory") + s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) return s, cancel, nil case cfg.Store.File != nil: logp.L().Warn("using memory store when file is configured") // TODO: Replace place-holder code with a file-store. - s, cancel := getMemStore(fileStores, cfg.Store.File.ID, cfg, "file") + s, cancel := fileStores.get(cfg.Store.File.ID, cfg) return s, cancel, nil default: @@ -133,30 +105,31 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { } } +var ( + memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"} + fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock. +) + // noop is a no-op context.CancelFunc. func noop() {} -// TODO: Remove the typ parameter when a file-backed store is available -// and each type knows who they are. -func getMemStore(stores map[string]*memStore, id string, cfg config, typ string) (*memStore, context.CancelFunc) { - s, ok := stores[id] - if !ok { - s = newMemStore(cfg, id, typ) - stores[s.id] = s - } +// Store is the interface implemented by metadata providers. +type Store interface { + Put(key string, val any) error + Get(key string) (any, error) + Delete(key string) error - // We may have already constructed the store with - // a get or a delete config, so set the TTL, cap - // and effort if we have a put config. If another - // put config has already been included, we ignore - // the put options now. - s.setPutOptions(cfg) - - return s, func() { - storeMu.Lock() - s.close(stores) - storeMu.Unlock() - } + // The string returned from the String method should + // be the backing store ID. Either "file:" or + // "memory:". + fmt.Stringer +} + +type CacheEntry struct { + key string + value any + expires time.Time + index int } // Run enriches the given event with the host metadata. diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 18cb2189b78..39f70a9ed53 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -19,10 +19,46 @@ package cache import ( "container/heap" + "context" "sync" "time" ) +// memStoreSet is a collection of shared memStore caches. +type memStoreSet struct { + mu sync.Mutex + stores map[string]*memStore + typ string // TODO: Remove when a file-backed store exists. +} + +// get returns a memStore cache with the provided ID based on the config. +// If a memStore with the ID already exist, its configuration is adjusted +// and its reference count is increased. The returned context.CancelFunc +// reduces the reference count and deletes the memStore from set if the +// count reaches zero. +func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { + s.mu.Lock() + defer s.mu.Unlock() + store, ok := s.stores[id] + if !ok { + store = newMemStore(cfg, id, s.typ) + s.stores[store.id] = store + } + store.add(cfg) + + return store, func() { + store.dropFrom(s) + } +} + +// free removes the memStore with the given ID from the set. free is safe +// for concurrent use. +func (s *memStoreSet) free(id string) { + s.mu.Lock() + delete(s.stores, id) + s.mu.Unlock() +} + // memStore is a memory-backed cache store. type memStore struct { mu sync.Mutex @@ -55,8 +91,12 @@ func newMemStore(cfg config, id, typ string) *memStore { typ: typ, cache: make(map[string]*CacheEntry), - // Mark the ttl as invalid until we have had a put operation - // configured. + // Mark the ttl as invalid until we have had a put + // operation configured. While the shared backing + // data store is incomplete, and has no put operation + // defined, the TTL will be invalid, but will never + // be accessed since all time operations outside put + // refer to absolute times, held by the CacheEntry. ttl: -1, cap: -1, effort: -1, @@ -65,15 +105,20 @@ func newMemStore(cfg config, id, typ string) *memStore { func (c *memStore) String() string { return c.typ + ":" + c.id } -// setPutOptions allows concurrency-safe updating of the put options. While the shared -// backing data store is incomplete, and has no put operation defined, the TTL -// will be invalid, but will never be accessed since all time operations outside -// put refer to absolute times. setPutOptions also increases the reference count -// for the memStore for all operation types. -func (c *memStore) setPutOptions(cfg config) { +// add updates a the receiver for a new operation. It increases the reference +// count for the receiver, and if the config is a put operation and has no +// previous put operation defined, the TTL, cap and effort will be set from +// cfg. add is safe for concurrent use. +func (c *memStore) add(cfg config) { c.mu.Lock() defer c.mu.Unlock() c.refs++ + + // We may have already constructed the store with + // a get or a delete config, so set the TTL, cap + // and effort if we have a put config. If another + // put config has already been included, we ignore + // the put options now. if cfg.Put == nil { return } @@ -86,16 +131,16 @@ func (c *memStore) setPutOptions(cfg config) { } } -// close decreases the reference count for the memStore and removes it from the -// stores map if the count is zero. -func (c *memStore) close(stores map[string]*memStore) { +// dropFrom decreases the reference count for the memStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *memStore) dropFrom(stores *memStoreSet) { c.mu.Lock() c.refs-- if c.refs < 0 { panic("invalid reference count") } if c.refs == 0 { - delete(stores, c.id) + stores.free(c.id) // GC assists. c.cache = nil c.expiries = nil @@ -104,7 +149,8 @@ func (c *memStore) close(stores map[string]*memStore) { } // Get return the cached value associated with the provided key. If there is -// no value for the key, or the value has expired Get returns ErrNoData. +// no value for the key, or the value has expired Get returns ErrNoData. Get +// is safe for concurrent use. func (c *memStore) Get(key string) (any, error) { c.mu.Lock() defer c.mu.Unlock() @@ -121,6 +167,7 @@ func (c *memStore) Get(key string) (any, error) { // Put stores the provided value in the cache associated with the given key. // The value is given an expiry time based on the configured TTL of the cache. +// Put is safe for concurrent use. func (c *memStore) Put(key string, val any) error { c.mu.Lock() defer c.mu.Unlock() @@ -159,6 +206,7 @@ func (c *memStore) evictExpired(now time.Time) { } // Delete removes the value associated with the provided key from the cache. +// Delete is safe for concurrent use. func (c *memStore) Delete(key string) error { c.mu.Lock() defer c.mu.Unlock() diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index c66e8caf547..65d9727c895 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -130,7 +130,7 @@ var memStoreTests = []struct { TTL: ptrTo(time.Second), }, } - s.setPutOptions(putCfg) + s.add(putCfg) return nil }, want: &memStore{ @@ -176,7 +176,7 @@ var memStoreTests = []struct { TTL: ptrTo(time.Second), }, } - s.setPutOptions(putCfg) + s.add(putCfg) return nil }, want: &memStore{ @@ -285,10 +285,8 @@ var memStoreTests = []struct { }, 5: { doTo: func(s *memStore) error { - storeMu.Lock() - s.close(memStores) - defer storeMu.Unlock() - if _, ok := memStores[s.id]; !ok { + s.dropFrom(&memStores) + if !memStores.has(s.id) { return fmt.Errorf("%q memStore not found after single close", s.id) } return nil @@ -311,10 +309,8 @@ var memStoreTests = []struct { }, 6: { doTo: func(s *memStore) error { - storeMu.Lock() - s.close(memStores) - defer storeMu.Unlock() - if _, ok := memStores[s.id]; ok { + s.dropFrom(&memStores) + if memStores.has(s.id) { return fmt.Errorf("%q memStore still found after double close", s.id) } return nil @@ -343,10 +339,8 @@ func TestMemStore(t *testing.T) { // Construct the store and put in into the stores map as // we would if we were calling Run. store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") - store.setPutOptions(test.cfg) - storeMu.Lock() - memStores[store.id] = store - storeMu.Unlock() + store.add(test.cfg) + memStores.add(store) if !cmp.Equal(test.want, store, allow, ignoreInMemStore) { t.Errorf("unexpected new memStore result:\n--- want\n+++ got\n%s", @@ -366,4 +360,19 @@ func TestMemStore(t *testing.T) { } } +// add adds the store to the set, it is used only for testing. +func (s *memStoreSet) add(store *memStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set, it is used only for testing. +func (s *memStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} + func ptrTo[T any](v T) *T { return &v }