Skip to content

Commit

Permalink
libbeat/processors/cache: add file-backed cache (#36686)
Browse files Browse the repository at this point in the history
This adds a file-backed cache implementation to the cache processor. Caching
between put and get operations is done in-memory using the memory cache, but
the file cache will load previously written cache state on start-up and will
write cache contents to file when the cache is dropped. Depending on user
configuration, the file cache will also periodically write the cache state to
the backing file to reduce state loss in the event of a crash.

For simplicity, the cache state is stored as a JSON stream of objects with
fields for the key, value and expiry timestamp of cached entities.
  • Loading branch information
efd6 authored Sep 28, 2023
1 parent d5f659d commit 9e503b4
Show file tree
Hide file tree
Showing 10 changed files with 877 additions and 90 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686]

==== Deprecated

Expand Down
33 changes: 16 additions & 17 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -29,6 +30,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

const name = "cache"
Expand Down Expand Up @@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) {
if err != nil {
return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err)
}
src, cancel, err := getStoreFor(config)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

// Logging (each processor instance has a unique ID).
id := int(instanceID.Inc())
log := logp.NewLogger(name).With("instance_id", id)

src, cancel, err := getStoreFor(config, log)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

p := &cache{
config: config,
store: src,
Expand All @@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) {
// and a context cancellation that releases the cache resource when it
// is no longer required. The cancellation should be called when the
// processor is closed.
func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
switch {
case cfg.Store.Memory != nil:
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
return s, cancel, nil

case cfg.Store.File != nil:
logp.L().Warn("using memory store when file is configured")
// TODO: Replace place-holder code with a file-store.
s, cancel := fileStores.get(cfg.Store.File.ID, cfg)
err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
if err != nil {
return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
}
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
return s, cancel, nil

default:
Expand All @@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
}
}

var (
memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"}
fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock.
)

// noop is a no-op context.CancelFunc.
func noop() {}

Expand All @@ -126,9 +125,9 @@ type Store interface {
}

type CacheEntry struct {
key string
value any
expires time.Time
Key string `json:"key"`
Value any `json:"val"`
Expires time.Time `json:"expires"`
index int
}

Expand Down
24 changes: 12 additions & 12 deletions libbeat/processors/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -191,7 +191,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -210,7 +210,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -271,7 +271,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -290,7 +290,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -351,7 +351,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -379,7 +379,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"),
},
Expand Down Expand Up @@ -441,7 +441,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -465,7 +465,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -527,7 +527,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -547,7 +547,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: expected map but type is string"),
},
Expand Down Expand Up @@ -613,7 +613,7 @@ func TestCache(t *testing.T) {
switch got := p.(*cache).store.(type) {
case *memStore:
allow := cmp.AllowUnexported(CacheEntry{})
ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index")
ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index")
if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) {
t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore))
}
Expand Down
13 changes: 9 additions & 4 deletions libbeat/processors/cache/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,17 +88,22 @@ func defaultConfig() config {
}

type storeConfig struct {
Memory *id `config:"memory"`
File *id `config:"file"`
Memory *memConfig `config:"memory"`
File *fileConfig `config:"file"`

// Capacity and Effort are currently experimental
// and not in public-facing documentation.
Capacity int `config:"capacity"`
Effort int `config:"eviction_effort"`
}

type id struct {
ID string `config:"id"`
type memConfig struct {
ID string `config:"id" validate:"required"`
}

type fileConfig struct {
ID string `config:"id" validate:"required"`
WriteOutEvery time.Duration `config:"write_interval"`
}

func (cfg *storeConfig) Validate() error {
Expand Down
36 changes: 36 additions & 0 deletions libbeat/processors/cache/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,20 @@ put:
ttl: 168h
key_field: crowdstrike.aid
value_field: crowdstrike.metadata
`,
want: nil,
},
{
name: "put_file_with_periodic_write_out",
cfg: `
backend:
file:
id: aidmaster
write_interval: 15m
put:
ttl: 168h
key_field: crowdstrike.aid
value_field: crowdstrike.metadata
`,
want: nil,
},
Expand Down Expand Up @@ -78,6 +92,28 @@ delete:
`,
want: nil,
},
{
name: "memory_no_id",
cfg: `
backend:
memory:
id: ''
delete:
key_field: crowdstrike.aid
`,
want: errors.New("string value is not set accessing 'backend.memory.id'"),
},
{
name: "file_no_id",
cfg: `
backend:
file:
id: ''
delete:
key_field: crowdstrike.aid
`,
want: errors.New("string value is not set accessing 'backend.file.id'"),
},
{
name: "no_op",
cfg: `
Expand Down
5 changes: 3 additions & 2 deletions libbeat/processors/cache/docs/cache.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,9 @@ It has the following settings:

One of `backend.memory.id` or `backend.file.id` must be provided.

`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache.
`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache.
`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instance to reference the same cache.
`backend.file.id`:: The ID of a file-based cache. Use the same ID across instance to reference the same cache.
`backend.file.write_frequency`:: The frequency the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_frequency` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.

One of `put`, `get` or `delete` must be provided.

Expand Down
Loading

0 comments on commit 9e503b4

Please sign in to comment.