Skip to content

Commit

Permalink
add periodic writes to file-backed cache
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 27, 2023
1 parent d6fcef4 commit e0327b6
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 28 deletions.
11 changes: 8 additions & 3 deletions libbeat/processors/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,19 +88,24 @@ func defaultConfig() config {
}

type storeConfig struct {
Memory *id `config:"memory"`
File *id `config:"file"`
Memory *memConfig `config:"memory"`
File *fileConfig `config:"file"`

// Capacity and Effort are currently experimental
// and not in public-facing documentation.
Capacity int `config:"capacity"`
Effort int `config:"eviction_effort"`
}

type id struct {
type memConfig struct {
ID string `config:"id"`
}

type fileConfig struct {
ID string `config:"id"`
WriteOutEvery time.Duration `config:"write_frequency"`
}

func (cfg *storeConfig) Validate() error {
switch {
case cfg.Memory != nil && cfg.File != nil:
Expand Down
14 changes: 14 additions & 0 deletions libbeat/processors/cache/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ put:
ttl: 168h
key_field: crowdstrike.aid
value_field: crowdstrike.metadata
`,
want: nil,
},
{
name: "put_file_with_periodic_write_out",
cfg: `
backend:
file:
id: aidmaster
write_frequency: 15m
put:
ttl: 168h
key_field: crowdstrike.aid
value_field: crowdstrike.metadata
`,
want: nil,
},
Expand Down
5 changes: 3 additions & 2 deletions libbeat/processors/cache/docs/cache.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ It has the following settings:

One of `backend.memory.id` or `backend.file.id` must be provided.

`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache.
`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache.
`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instance to reference the same cache.
`backend.file.id`:: The ID of a file-based cache. Use the same ID across instance to reference the same cache.
`backend.file.write_frequency`:: The frequency the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_frequency` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.

One of `put`, `get` or `delete` must be provided.

Expand Down
47 changes: 40 additions & 7 deletions libbeat/processors/cache/file_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ import (
"github.com/elastic/elastic-agent-libs/paths"
)

// TODO: Consider having a periodic write-out (per time or per n puts) to
// reduce loss of state due to crashes.

var fileStores = fileStoreSet{stores: map[string]*fileStore{}}

// fileStoreSet is a collection of shared fileStore caches.
Expand Down Expand Up @@ -81,8 +78,14 @@ func (s *fileStoreSet) free(id string) {

// fileStore is a file-backed cache store.
type fileStore struct {
path string
memStore

path string
// cancel stops periodic write out operations.
// Write out operations are protected by the
// memStore's mutex.
cancel context.CancelFunc

log *logp.Logger
}

Expand All @@ -108,6 +111,12 @@ func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore {
effort: -1,
},
}
s.cancel = noop
if cfg.Store.File.WriteOutEvery > 0 {
var ctx context.Context
ctx, s.cancel = context.WithCancel(context.Background())
go s.periodicWriteOut(ctx, cfg.Store.File.WriteOutEvery)
}
s.readState()
return &s
}
Expand All @@ -123,7 +132,11 @@ func (c *fileStore) dropFrom(stores *fileStoreSet) {
panic("invalid reference count")
}
if c.refs == 0 {
c.writeState()
// Stop periodic writes
c.cancel()
// and do a final write out.
c.writeState(true)

stores.free(c.id)
// GC assists.
c.cache = nil
Expand Down Expand Up @@ -178,8 +191,28 @@ func (c *fileStore) readState() {
}
}

func (c *fileStore) writeState() {
if len(c.cache) == 0 {
// periodicWriteOut writes the cache contents to the backing file at the
// specified interval until the context is cancelled. periodicWriteOut is
// safe for concurrent use.
func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) {
tick := time.NewTicker(every)
defer tick.Stop()
for {
select {
case <-tick.C:
c.mu.Lock()
c.writeState(false)
c.mu.Unlock()
case <-ctx.Done():
return
}
}
}

// writeState writes the current cache state to the backing file.
// If final is true and the cache is empty, the file will be deleted.
func (c *fileStore) writeState(final bool) {
if len(c.cache) == 0 && final {
err := os.Remove(c.path)
if err != nil {
c.log.Errorw("failed to delete write state when empty", "error", err)
Expand Down
18 changes: 9 additions & 9 deletions libbeat/processors/cache/file_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ var fileStoreTests = []struct {
name: "new_put",
cfg: config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -75,7 +75,7 @@ var fileStoreTests = []struct {
name: "new_get",
cfg: config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -95,7 +95,7 @@ var fileStoreTests = []struct {
name: "new_delete",
cfg: config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -115,7 +115,7 @@ var fileStoreTests = []struct {
name: "new_get_add_put",
cfg: config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -135,7 +135,7 @@ var fileStoreTests = []struct {
doTo: func(s *fileStore) error {
putCfg := config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -161,7 +161,7 @@ var fileStoreTests = []struct {
name: "ensemble",
cfg: config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -181,7 +181,7 @@ var fileStoreTests = []struct {
doTo: func(s *fileStore) error {
putCfg := config{
Store: &storeConfig{
File: &id{"test"},
File: &fileConfig{ID: "test"},
Capacity: 1000,
Effort: 10,
},
Expand Down Expand Up @@ -361,7 +361,7 @@ func TestFileStore(t *testing.T) {
}

allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{})
ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "log")
ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "cancel", "log")
ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu")
ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires")

Expand Down Expand Up @@ -391,7 +391,7 @@ func TestFileStore(t *testing.T) {
if test.wantPersisted == nil {
return
}
store = nil

f, err := os.Open(path)
if err != nil {
t.Fatalf("failed to open persisted data: %v", err)
Expand Down
14 changes: 7 additions & 7 deletions libbeat/processors/cache/mem_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ var memStoreTests = []struct {
name: "new_put",
cfg: config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -63,7 +63,7 @@ var memStoreTests = []struct {
name: "new_get",
cfg: config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -83,7 +83,7 @@ var memStoreTests = []struct {
name: "new_delete",
cfg: config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -103,7 +103,7 @@ var memStoreTests = []struct {
name: "new_get_add_put",
cfg: config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -123,7 +123,7 @@ var memStoreTests = []struct {
doTo: func(s *memStore) error {
putCfg := config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -149,7 +149,7 @@ var memStoreTests = []struct {
name: "ensemble",
cfg: config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand All @@ -169,7 +169,7 @@ var memStoreTests = []struct {
doTo: func(s *memStore) error {
putCfg := config{
Store: &storeConfig{
Memory: &id{"test"},
Memory: &memConfig{"test"},
Capacity: 1000,
Effort: 10,
},
Expand Down

0 comments on commit e0327b6

Please sign in to comment.