Skip to content

Commit

Permalink
libbeat/processors/cache: add file-backed cache
Browse files Browse the repository at this point in the history
  • Loading branch information
efd6 committed Sep 27, 2023
1 parent 58db08f commit d6fcef4
Show file tree
Hide file tree
Showing 7 changed files with 757 additions and 77 deletions.
1 change: 1 addition & 0 deletions CHANGELOG-developer.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only.
- Add device handling to Okta API package for entity analytics. {pull}35980[35980]
- Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493]
- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619]
- Add file-backed cache for cache enrichment processor. {pull}36686[36686]

==== Deprecated

Expand Down
33 changes: 16 additions & 17 deletions libbeat/processors/cache/cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
"errors"
"fmt"
"os"
"time"

"github.com/elastic/beats/v7/libbeat/beat"
Expand All @@ -29,6 +30,7 @@ import (
conf "github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/paths"
)

const name = "cache"
Expand Down Expand Up @@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) {
if err != nil {
return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err)
}
src, cancel, err := getStoreFor(config)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

// Logging (each processor instance has a unique ID).
id := int(instanceID.Inc())
log := logp.NewLogger(name).With("instance_id", id)

src, cancel, err := getStoreFor(config, log)
if err != nil {
return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
}

p := &cache{
config: config,
store: src,
Expand All @@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) {
// and a context cancellation that releases the cache resource when it
// is no longer required. The cancellation should be called when the
// processor is closed.
func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
switch {
case cfg.Store.Memory != nil:
s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
return s, cancel, nil

case cfg.Store.File != nil:
logp.L().Warn("using memory store when file is configured")
// TODO: Replace place-holder code with a file-store.
s, cancel := fileStores.get(cfg.Store.File.ID, cfg)
err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
if err != nil {
return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
}
s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
return s, cancel, nil

default:
Expand All @@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
}
}

var (
memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"}
fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock.
)

// noop is a no-op context.CancelFunc.
func noop() {}

Expand All @@ -126,9 +125,9 @@ type Store interface {
}

type CacheEntry struct {
key string
value any
expires time.Time
Key string `json:"key"`
Value any `json:"val"`
Expires time.Time `json:"expires"`
index int
}

Expand Down
24 changes: 12 additions & 12 deletions libbeat/processors/cache/cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -191,7 +191,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -210,7 +210,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -271,7 +271,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -290,7 +290,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -351,7 +351,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -379,7 +379,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"),
},
Expand Down Expand Up @@ -441,7 +441,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -465,7 +465,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand Down Expand Up @@ -527,7 +527,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: nil,
},
Expand All @@ -547,7 +547,7 @@ var cacheTests = []struct {
},
},
wantCacheVal: map[string]*CacheEntry{
"one": {key: "one", value: "metadata_value"},
"one": {Key: "one", Value: "metadata_value"},
},
wantErr: errors.New("error applying cache get processor: expected map but type is string"),
},
Expand Down Expand Up @@ -613,7 +613,7 @@ func TestCache(t *testing.T) {
switch got := p.(*cache).store.(type) {
case *memStore:
allow := cmp.AllowUnexported(CacheEntry{})
ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index")
ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index")
if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) {
t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore))
}
Expand Down
Loading

0 comments on commit d6fcef4

Please sign in to comment.