From 0cd0db21877ddb9f2d2f0e052716237531cd2795 Mon Sep 17 00:00:00 2001 From: Dan Kortschak Date: Thu, 28 Sep 2023 15:53:15 +0930 Subject: [PATCH] libbeat/processors/cache: don't write cache states that have not been altered Add a dirty flag to memStore to be used by fileStore so that needless writes are not performed. --- CHANGELOG-developer.next.asciidoc | 2 +- libbeat/processors/cache/file_store.go | 6 ++++++ libbeat/processors/cache/file_store_test.go | 7 +++++++ libbeat/processors/cache/mem_store.go | 5 +++++ libbeat/processors/cache/mem_store_test.go | 7 +++++++ 5 files changed, 26 insertions(+), 1 deletion(-) diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index ca46d0182476..cc84570d5d77 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -173,7 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] - Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] -- Add file-backed cache for cache enrichment processor. {pull}36686[36686] +- Add file-backed cache for cache enrichment processor. {pull}36686[36686] {pull}36696[36696] ==== Deprecated diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index 45ed9d840689..1ab4ab21ae4e 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -202,6 +202,7 @@ func (c *fileStore) readState() { } if e.Expires.Before(time.Now()) { // Don't retain expired elements. + c.dirty = true // The cache now does not reflect the file. continue } c.cache[e.Key] = &e @@ -239,6 +240,9 @@ func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) { // writeState writes the current cache state to the backing file. // If final is true and the cache is empty, the file will be deleted. func (c *fileStore) writeState(final bool) { + if !c.dirty { + return + } if len(c.cache) == 0 && final { err := os.Remove(c.path) if err != nil { @@ -291,4 +295,6 @@ func (c *fileStore) writeState(final bool) { return } } + // Only mark as not dirty if we succeeded in the write. + c.dirty = false } diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go index 557a7eca9a54..22d43083e041 100644 --- a/libbeat/processors/cache/file_store_test.go +++ b/libbeat/processors/cache/file_store_test.go @@ -196,6 +196,7 @@ var fileStoreTests = []struct { id: "test", cache: map[string]*CacheEntry{}, refs: 2, + dirty: false, ttl: time.Second, cap: 1000, effort: 10, @@ -221,6 +222,7 @@ var fileStoreTests = []struct { {Key: "three", Value: int(3), index: 2}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -247,6 +249,7 @@ var fileStoreTests = []struct { {Key: "three", Value: int(3), index: 2}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -267,6 +270,7 @@ var fileStoreTests = []struct { {Key: "three", Value: int(3), index: 1}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -291,6 +295,7 @@ var fileStoreTests = []struct { {Key: "three", Value: int(3), index: 1}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -315,6 +320,7 @@ var fileStoreTests = []struct { {Key: "three", Value: int(3), index: 1}, }, refs: 1, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -333,6 +339,7 @@ var fileStoreTests = []struct { cache: nil, // assistively nil-ed. expiries: nil, // assistively nil-ed. refs: 0, + dirty: false, ttl: time.Second, cap: 1000, effort: 10, diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 774a019d783d..a2fec841d272 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -67,6 +67,9 @@ type memStore struct { expiries expiryHeap ttl time.Duration // ttl is the time entries are valid for in the cache. refs int // refs is the number of processors referring to this store. + // dirty marks the cache as changed from the + // state in a backing file if it exists. + dirty bool // id is the index into global cache store for the cache. id string @@ -176,6 +179,7 @@ func (c *memStore) Put(key string, val any) error { } c.cache[key] = e heap.Push(&c.expiries, e) + c.dirty = true return nil } @@ -212,6 +216,7 @@ func (c *memStore) Delete(key string) error { } heap.Remove(&c.expiries, v.index) delete(c.cache, key) + c.dirty = true return nil } diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 6be5eadb3577..4a6cf500e05b 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -184,6 +184,7 @@ var memStoreTests = []struct { id: "test", cache: map[string]*CacheEntry{}, refs: 2, + dirty: false, ttl: time.Second, cap: 1000, effort: 10, @@ -209,6 +210,7 @@ var memStoreTests = []struct { {Key: "three", Value: int(3), index: 2}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -235,6 +237,7 @@ var memStoreTests = []struct { {Key: "three", Value: int(3), index: 2}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -255,6 +258,7 @@ var memStoreTests = []struct { {Key: "three", Value: int(3), index: 1}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -279,6 +283,7 @@ var memStoreTests = []struct { {Key: "three", Value: int(3), index: 1}, }, refs: 2, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -303,6 +308,7 @@ var memStoreTests = []struct { {Key: "three", Value: int(3), index: 1}, }, refs: 1, + dirty: true, ttl: time.Second, cap: 1000, effort: 10, @@ -321,6 +327,7 @@ var memStoreTests = []struct { cache: nil, // assistively nil-ed. expiries: nil, // assistively nil-ed. refs: 0, + dirty: true, ttl: time.Second, cap: 1000, effort: 10,