diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go index 06dacc5c6ca..019bd8d5840 100644 --- a/libbeat/processors/cache/config.go +++ b/libbeat/processors/cache/config.go @@ -88,8 +88,8 @@ func defaultConfig() config { } type storeConfig struct { - Memory *id `config:"memory"` - File *id `config:"file"` + Memory *memConfig `config:"memory"` + File *fileConfig `config:"file"` // Capacity and Effort are currently experimental // and not in public-facing documentation. @@ -97,10 +97,15 @@ type storeConfig struct { Effort int `config:"eviction_effort"` } -type id struct { +type memConfig struct { ID string `config:"id"` } +type fileConfig struct { + ID string `config:"id"` + WriteOutEvery time.Duration `config:"write_frequency"` +} + func (cfg *storeConfig) Validate() error { switch { case cfg.Memory != nil && cfg.File != nil: diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go index 6e8b01d930f..69d158eb830 100644 --- a/libbeat/processors/cache/config_test.go +++ b/libbeat/processors/cache/config_test.go @@ -39,6 +39,20 @@ put: ttl: 168h key_field: crowdstrike.aid value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "put_file_with_periodic_write_out", + cfg: ` +backend: + file: + id: aidmaster + write_frequency: 15m +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata `, want: nil, }, diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc index 94f5d07cece..3e17d37e213 100644 --- a/libbeat/processors/cache/docs/cache.asciidoc +++ b/libbeat/processors/cache/docs/cache.asciidoc @@ -51,8 +51,9 @@ It has the following settings: One of `backend.memory.id` or `backend.file.id` must be provided. -`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache. -`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache. +`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instance to reference the same cache. +`backend.file.id`:: The ID of a file-based cache. Use the same ID across instance to reference the same cache. +`backend.file.write_frequency`:: The frequency the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_frequency` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes. One of `put`, `get` or `delete` must be provided. diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go index 870d6e028dd..604b7c01cde 100644 --- a/libbeat/processors/cache/file_store.go +++ b/libbeat/processors/cache/file_store.go @@ -33,9 +33,6 @@ import ( "github.com/elastic/elastic-agent-libs/paths" ) -// TODO: Consider having a periodic write-out (per time or per n puts) to -// reduce loss of state due to crashes. - var fileStores = fileStoreSet{stores: map[string]*fileStore{}} // fileStoreSet is a collection of shared fileStore caches. @@ -81,8 +78,14 @@ func (s *fileStoreSet) free(id string) { // fileStore is a file-backed cache store. type fileStore struct { - path string memStore + + path string + // cancel stops periodic write out operations. + // Write out operations are protected by the + // memStore's mutex. + cancel context.CancelFunc + log *logp.Logger } @@ -108,6 +111,12 @@ func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore { effort: -1, }, } + s.cancel = noop + if cfg.Store.File.WriteOutEvery > 0 { + var ctx context.Context + ctx, s.cancel = context.WithCancel(context.Background()) + go s.periodicWriteOut(ctx, cfg.Store.File.WriteOutEvery) + } s.readState() return &s } @@ -123,7 +132,11 @@ func (c *fileStore) dropFrom(stores *fileStoreSet) { panic("invalid reference count") } if c.refs == 0 { - c.writeState() + // Stop periodic writes + c.cancel() + // and do a final write out. + c.writeState(true) + stores.free(c.id) // GC assists. c.cache = nil @@ -178,8 +191,28 @@ func (c *fileStore) readState() { } } -func (c *fileStore) writeState() { - if len(c.cache) == 0 { +// periodicWriteOut writes the cache contents to the backing file at the +// specified interval until the context is cancelled. periodicWriteOut is +// safe for concurrent use. +func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) { + tick := time.NewTicker(every) + defer tick.Stop() + for { + select { + case <-tick.C: + c.mu.Lock() + c.writeState(false) + c.mu.Unlock() + case <-ctx.Done(): + return + } + } +} + +// writeState writes the current cache state to the backing file. +// If final is true and the cache is empty, the file will be deleted. +func (c *fileStore) writeState(final bool) { + if len(c.cache) == 0 && final { err := os.Remove(c.path) if err != nil { c.log.Errorw("failed to delete write state when empty", "error", err) diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go index 737a56a5841..557a7eca9a5 100644 --- a/libbeat/processors/cache/file_store_test.go +++ b/libbeat/processors/cache/file_store_test.go @@ -54,7 +54,7 @@ var fileStoreTests = []struct { name: "new_put", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -75,7 +75,7 @@ var fileStoreTests = []struct { name: "new_get", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -95,7 +95,7 @@ var fileStoreTests = []struct { name: "new_delete", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -115,7 +115,7 @@ var fileStoreTests = []struct { name: "new_get_add_put", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -135,7 +135,7 @@ var fileStoreTests = []struct { doTo: func(s *fileStore) error { putCfg := config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -161,7 +161,7 @@ var fileStoreTests = []struct { name: "ensemble", cfg: config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -181,7 +181,7 @@ var fileStoreTests = []struct { doTo: func(s *fileStore) error { putCfg := config{ Store: &storeConfig{ - File: &id{"test"}, + File: &fileConfig{ID: "test"}, Capacity: 1000, Effort: 10, }, @@ -361,7 +361,7 @@ func TestFileStore(t *testing.T) { } allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) - ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "log") + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "cancel", "log") ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") @@ -391,7 +391,7 @@ func TestFileStore(t *testing.T) { if test.wantPersisted == nil { return } - store = nil + f, err := os.Open(path) if err != nil { t.Fatalf("failed to open persisted data: %v", err) diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 15be85c7678..6be5eadb357 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -42,7 +42,7 @@ var memStoreTests = []struct { name: "new_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -63,7 +63,7 @@ var memStoreTests = []struct { name: "new_get", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -83,7 +83,7 @@ var memStoreTests = []struct { name: "new_delete", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -103,7 +103,7 @@ var memStoreTests = []struct { name: "new_get_add_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -123,7 +123,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -149,7 +149,7 @@ var memStoreTests = []struct { name: "ensemble", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -169,7 +169,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, },