From f111cbc73ec40b728299cd457e93ae72a8d7a6b9 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Tue, 26 Sep 2023 06:14:57 +0930 Subject: [PATCH 1/8] libbeat/processors/cache: new processor (#36619) This is the initial infrastructure for a caching metadata processor to be added. It currently only supports in-memory caching, though both planned cache types are configurable with the file cache being mocked by an in-memory cache. Additional config options are added relative to the RFC, but these are intentionally not documented at this stage. --- .github/CODEOWNERS | 1 + CHANGELOG-developer.next.asciidoc | 1 + libbeat/processors/cache/cache.go | 290 +++++++++ libbeat/processors/cache/cache_test.go | 625 +++++++++++++++++++ libbeat/processors/cache/config.go | 113 ++++ libbeat/processors/cache/config_test.go | 182 ++++++ libbeat/processors/cache/docs/cache.asciidoc | 107 ++++ libbeat/processors/cache/mem_store.go | 257 ++++++++ libbeat/processors/cache/mem_store_test.go | 379 +++++++++++ 9 files changed, 1955 insertions(+) create mode 100644 libbeat/processors/cache/cache.go create mode 100644 libbeat/processors/cache/cache_test.go create mode 100644 libbeat/processors/cache/config.go create mode 100644 libbeat/processors/cache/config_test.go create mode 100644 libbeat/processors/cache/docs/cache.asciidoc create mode 100644 libbeat/processors/cache/mem_store.go create mode 100644 libbeat/processors/cache/mem_store_test.go diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a41ac990750..8fe748c1bf5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -55,6 +55,7 @@ CHANGELOG* /libbeat/ @elastic/elastic-agent-data-plane /libbeat/docs/processors-list.asciidoc @elastic/ingest-docs /libbeat/management @elastic/elastic-agent-control-plane +/libbeat/processors/cache/ @elastic/security-external-integrations /libbeat/processors/community_id/ @elastic/security-external-integrations /libbeat/processors/decode_xml/ @elastic/security-external-integrations /libbeat/processors/decode_xml_wineventlog/ @elastic/security-external-integrations diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 464afe040f8..d0f4c40ef3b 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -172,6 +172,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158] - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] +- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] ==== Deprecated diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go new file mode 100644 index 00000000000..eea3ed115ea --- /dev/null +++ b/libbeat/processors/cache/cache.go @@ -0,0 +1,290 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. 
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/common/atomic"
+	"github.com/elastic/beats/v7/libbeat/processors"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+)
+
+const name = "cache"
+
+func init() {
+	// We cannot use this as a JS plugin as it is stateful and includes a Close method.
+	processors.RegisterPlugin(name, New)
+}
+
+var (
+	// ErrNoMatch is returned when the event doesn't contain the field
+	// specified in key_field.
+	ErrNoMatch = errors.New("field in key_field not found in the event")
+
+	// ErrNoData is returned when metadata for an event can't be collected.
+	ErrNoData = errors.New("metadata not found")
+
+	instanceID atomic.Uint32
+)
+
+// cache is a caching enrichment processor.
+type cache struct {
+	config config
+	store  Store
+	cancel context.CancelFunc
+	log    *logp.Logger
+}
+
+// New constructs a new cache processor from the provided configuration.
+// The resulting processor implements `Close()` to release the cache resources.
+func New(cfg *conf.C) (beat.Processor, error) {
+	config := defaultConfig()
+	err := cfg.Unpack(&config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err)
+	}
+	src, cancel, err := getStoreFor(config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
+	}
+
+	// Logging (each processor instance has a unique ID).
+	id := int(instanceID.Inc())
+	log := logp.NewLogger(name).With("instance_id", id)
+
+	p := &cache{
+		config: config,
+		store:  src,
+		cancel: cancel,
+		log:    log,
+	}
+	p.log.Infow("initialized cache processor", "details", p)
+	return p, nil
+}
+
+// getStoreFor returns a backing store for the provided configuration,
+// and a context cancellation that releases the cache resource when it
+// is no longer required. The cancellation should be called when the
+// processor is closed.
+func getStoreFor(cfg config) (Store, context.CancelFunc, error) {
+	switch {
+	case cfg.Store.Memory != nil:
+		s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
+		return s, cancel, nil
+
+	case cfg.Store.File != nil:
+		logp.L().Warn("using memory store when file is configured")
+		// TODO: Replace place-holder code with a file-store.
+		s, cancel := fileStores.get(cfg.Store.File.ID, cfg)
+		return s, cancel, nil
+
+	default:
+		// This should have been caught by config validation.
+		return nil, noop, errors.New("no configured store")
+	}
+}
+
+var (
+	memStores  = memStoreSet{stores: map[string]*memStore{}, typ: "memory"}
+	fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock.
+)
+
+// noop is a no-op context.CancelFunc.
+func noop() {}
+
+// Store is the interface implemented by metadata providers.
+type Store interface {
+	Put(key string, val any) error
+	Get(key string) (any, error)
+	Delete(key string) error
+
+	// The string returned from the String method should be the
+	// backing store ID, prefixed with the store type: either
+	// "file:" or "memory:".
+	fmt.Stringer
+}
+
+type CacheEntry struct {
+	key     string
+	value   any
+	expires time.Time
+	index   int
+}
+
+// Run applies the configured cache operation to the given event.
+func (p *cache) Run(event *beat.Event) (*beat.Event, error) {
+	switch {
+	case p.config.Put != nil:
+		p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put)
+		err := p.putFrom(event)
+		if err != nil {
+			switch {
+			case errors.Is(err, mapstr.ErrKeyNotFound):
+				if p.config.IgnoreMissing {
+					return event, nil
+				}
+				return event, err
+			}
+			return event, fmt.Errorf("error applying %s put processor: %w", name, err)
+		}
+		return event, nil
+
+	case p.config.Get != nil:
+		p.log.Debugw("get", "backend_id", p.store, "config", p.config.Get)
+		result, err := p.getFor(event)
+		if err != nil {
+			switch {
+			case errors.Is(err, mapstr.ErrKeyNotFound):
+				if p.config.IgnoreMissing {
+					return event, nil
+				}
+			case errors.Is(err, ErrNoData):
+				return event, err
+			}
+			return event, fmt.Errorf("error applying %s get processor: %w", name, err)
+		}
+		if result != nil {
+			return result, nil
+		}
+		return event, ErrNoMatch
+
+	case p.config.Delete != nil:
+		p.log.Debugw("delete", "backend_id", p.store, "config", p.config.Delete)
+		err := p.deleteFor(event)
+		if err != nil {
+			return event, fmt.Errorf("error applying %s delete processor: %w", name, err)
+		}
+		return event, nil
+
+	default:
+		// This should never happen, but we don't need to flag it.
+		return event, nil
+	}
+}
+
+// putFrom takes the configured value from the event and stores it in the cache
+// if it exists.
+func (p *cache) putFrom(event *beat.Event) error {
+	k, err := event.GetValue(p.config.Put.Key)
+	if err != nil {
+		return err
+	}
+	key, ok := k.(string)
+	if !ok {
+		return fmt.Errorf("key field '%s' not a string: %T", p.config.Put.Key, k)
+	}
+	p.log.Debugw("put", "backend_id", p.store, "key", key)
+
+	val, err := event.GetValue(p.config.Put.Value)
+	if err != nil {
+		return err
+	}
+
+	err = p.store.Put(key, val)
+	if err != nil {
+		return fmt.Errorf("failed to put '%s' into '%s': %w", key, p.config.Put.Value, err)
+	}
+	return nil
+}
+
+// getFor gets the configured value from the cache for the event and inserts
+// it into the configured field if it exists.
+func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) {
+	// Check for clobbering.
+	dst := p.config.Get.Target
+	if !p.config.OverwriteKeys {
+		if _, err := event.GetValue(dst); err == nil {
+			return nil, fmt.Errorf("target field '%s' already exists and overwrite_keys is false", dst)
+		}
+	}
+
+	// Get the key to look up in the metadata store.
+	key := p.config.Get.Key
+	v, err := event.GetValue(key)
+	if err != nil {
+		return nil, err
+	}
+	k, ok := v.(string)
+	if !ok {
+		return nil, fmt.Errorf("key field '%s' not a string: %T", key, v)
+	}
+	p.log.Debugw("get", "backend_id", p.store, "key", k)
+
+	// Get metadata...
+	meta, err := p.store.Get(k)
+	if err != nil {
+		return nil, fmt.Errorf("%w for '%s': %w", ErrNoData, k, err)
+	}
+	if meta == nil {
+		return nil, fmt.Errorf("%w for '%s'", ErrNoData, k)
+	}
+	if m, ok := meta.(map[string]interface{}); ok {
+		meta = mapstr.M(m)
+	}
+	// ... and write it into the event.
+	// The implementation of PutValue currently leaves event
+	// essentially unchanged in the case of an error (in the
+	// case of an @metadata field there may be a mutation,
+	// but at most this will be the addition of a Meta field
+	// value to event). None of this is documented.
+	if _, err = event.PutValue(dst, meta); err != nil {
+		return nil, err
+	}
+	return event, nil
+}
+
+// deleteFor deletes the configured value from the cache based on the value of
+// the configured key.
+func (p *cache) deleteFor(event *beat.Event) error {
+	v, err := event.GetValue(p.config.Delete.Key)
+	if err != nil {
+		return err
+	}
+	k, ok := v.(string)
+	if !ok {
+		return fmt.Errorf("key field '%s' not a string: %T", p.config.Delete.Key, v)
+	}
+	return p.store.Delete(k)
+}
+
+func (p *cache) Close() error {
+	p.cancel()
+	return nil
+}
+
+// String returns the processor representation formatted as a string.
+func (p *cache) String() string {
+	switch {
+	case p.config.Put != nil:
+		return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_keys=%t]",
+			name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys)
+	case p.config.Get != nil:
+		return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_keys=%t]",
+			name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys)
+	case p.config.Delete != nil:
+		return fmt.Sprintf("%s=[operation=delete, store_id=%s, key_field=%s]", name, p.store, p.config.Delete.Key)
+	default:
+		return fmt.Sprintf("%s=[operation=invalid]", name)
+	}
+}
diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go
new file mode 100644
index 00000000000..6fe5847c01f
--- /dev/null
+++ b/libbeat/processors/cache/cache_test.go
@@ -0,0 +1,625 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+ +package cache + +import ( + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/beats/v7/libbeat/beat" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type cacheTestStep struct { + event mapstr.M + want mapstr.M + wantCacheVal map[string]*CacheEntry + wantErr error +} + +var cacheTests = []struct { + name string + configs []testConfig + wantInitErr error + steps []cacheTestStep +}{ + { + name: "invalid_no_backend", + configs: []testConfig{ + { + cfg: mapstr.M{ + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "ttl": "168h", + "value_field": "crowdstrike.metadata", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: missing required field accessing 'backend'"), + }, + { + name: "invalid_no_key_field", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: string value is not set accessing 'put.key_field'"), + }, + { + name: "invalid_no_value_field", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: string value is not set accessing 'put.value_field'"), + }, + { + name: "put_value", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + 
wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_reverse_config", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_with_get_error_no_overwrite", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"), + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + 
when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite_but_get_error", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new.child", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {key: "one", value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: expected map but type is string"), + }, + }, + }, +} + +type testConfig struct { + when func(e mapstr.M) bool + cfg mapstr.M +} + +func TestCache(t *testing.T) { + logp.TestingSetup(logp.WithSelectors(name)) + for _, test := range cacheTests { + t.Run(test.name, func(t *testing.T) { + var processors []beat.Processor + for i, cfg := range test.configs { + config, err := conf.NewConfigFrom(cfg.cfg) + if err != nil { + t.Fatal(err) + } + + p, err := New(config) + if !sameError(err, test.wantInitErr) { + t.Errorf("unexpected error from New: got:%v want:%v", err, test.wantInitErr) + } + if err != nil { + return + } + + t.Log(p) + c, ok := p.(*cache) + if !ok { + t.Fatalf("processor %d is not an *cache", i) + } + + defer func() { + err := c.Close() + if err != nil { + t.Errorf("unexpected error from c.Close(): %v", err) + } + }() + + processors = append(processors, p) + } + + for i, step := range 
test.steps {
+				for j, p := range processors {
+					if test.configs[j].when != nil && !test.configs[j].when(step.event) {
+						continue
+					}
+					got, err := p.Run(&beat.Event{
+						Fields: step.event,
+					})
+					if !sameError(err, step.wantErr) {
+						t.Errorf("unexpected error from Run: got:%v want:%v", err, step.wantErr)
+						return
+					}
+					if !cmp.Equal(step.want, got.Fields) {
+						t.Errorf("unexpected result %d\n--- want\n+++ got\n%s", i, cmp.Diff(step.want, got.Fields))
+					}
+					switch got := p.(*cache).store.(type) {
+					case *memStore:
+						allow := cmp.AllowUnexported(CacheEntry{})
+						ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index")
+						if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) {
+							t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore))
+						}
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go
new file mode 100644
index 00000000000..06dacc5c6ca
--- /dev/null
+++ b/libbeat/processors/cache/config.go
@@ -0,0 +1,113 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"errors"
+	"time"
+)
+
+type config struct {
+	Get    *getConfig `config:"get"`
+	Put    *putConfig `config:"put"`
+	Delete *delConfig `config:"delete"`
+
+	Store *storeConfig `config:"backend" validate:"required"`
+
+	// IgnoreMissing ignores errors if the event has no matching field.
+	IgnoreMissing bool `config:"ignore_missing"`
+
+	// OverwriteKeys allows target fields to overwrite existing fields.
+	OverwriteKeys bool `config:"overwrite_keys"`
+}
+
+func (cfg *config) Validate() error {
+	var ops int
+	if cfg.Put != nil {
+		ops++
+	}
+	if cfg.Get != nil {
+		ops++
+	}
+	if cfg.Delete != nil {
+		ops++
+	}
+	switch ops {
+	case 0:
+		return errors.New("no operation specified for cache processor")
+	case 1:
+		return nil
+	default:
+		return errors.New("cannot specify multiple operations together in a cache processor")
+	}
+}
+
+type getConfig struct {
+	// Key is the field containing the key to look up for matching.
+	Key string `config:"key_field" validate:"required"`
+
+	// Target is the destination field where the cached value will be written.
+	Target string `config:"target_field" validate:"required"`
+}
+
+type putConfig struct {
+	// Key is the field containing the key to look up for matching.
+	Key string `config:"key_field" validate:"required"`
+
+	// Value is the field containing the value to store in the cache.
+	Value string `config:"value_field" validate:"required"`
+
+	TTL *time.Duration `config:"ttl" validate:"required"`
+}
+
+type delConfig struct {
+	// Key is the field containing the key to look up for deletion.
+ Key string `config:"key_field" validate:"required"` +} + +func defaultConfig() config { + return config{ + IgnoreMissing: true, + OverwriteKeys: false, + } +} + +type storeConfig struct { + Memory *id `config:"memory"` + File *id `config:"file"` + + // Capacity and Effort are currently experimental + // and not in public-facing documentation. + Capacity int `config:"capacity"` + Effort int `config:"eviction_effort"` +} + +type id struct { + ID string `config:"id"` +} + +func (cfg *storeConfig) Validate() error { + switch { + case cfg.Memory != nil && cfg.File != nil: + return errors.New("must specify only one of backend.memory.id or backend.file.id") + case cfg.Memory != nil, cfg.File != nil: + default: + return errors.New("must specify one of backend.memory.id or backend.file.id") + } + return nil +} diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go new file mode 100644 index 00000000000..6e8b01d930f --- /dev/null +++ b/libbeat/processors/cache/config_test.go @@ -0,0 +1,182 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package cache
+
+import (
+	"errors"
+	"testing"
+
+	"github.com/elastic/go-ucfg/yaml"
+)
+
+var validateTests = []struct {
+	name string
+	cfg  string
+	want error
+}{
+	{
+		name: "put_file",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
+`,
+		want: nil,
+	},
+	{
+		name: "put_memory",
+		cfg: `
+backend:
+  memory:
+    id: aidmaster
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
+`,
+		want: nil,
+	},
+	{
+		name: "get",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+get:
+  key_field: crowdstrike.aid
+  target_field: crowdstrike.metadata
+`,
+		want: nil,
+	},
+	{
+		name: "delete",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+delete:
+  key_field: crowdstrike.aid
+`,
+		want: nil,
+	},
+	{
+		name: "no_op",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+`,
+		want: errors.New("no operation specified for cache processor accessing config"),
+	},
+	{
+
+		name: "too_many_ops",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
+get:
+  key_field: crowdstrike.aid
+  target_field: crowdstrike.metadata
+`,
+		want: errors.New("cannot specify multiple operations together in a cache processor accessing config"),
+	},
+	{
+
+		name: "no_backend",
+		cfg: `
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
+`,
+		want: errors.New("missing required field accessing 'backend'"),
+	},
+	{
+
+		name: "incomplete_backend",
+		cfg: `
+backend:
+  file: ~
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
+`,
+		want: errors.New("must specify one of backend.memory.id or backend.file.id accessing 'backend'"),
+	},
+	{
+
+		name: "too_many_backends",
+		cfg: `
+backend:
+  file:
+    id: aidmaster_f
+  memory:
+    id: aidmaster_m
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
+`,
+		want: errors.New("must specify only one of backend.memory.id or backend.file.id accessing 'backend'"),
+	},
+}
+
+func TestValidate(t *testing.T) {
+	for _, test := range validateTests {
+		t.Run(test.name, func(t *testing.T) {
+			got := ucfgRigmarole(test.cfg)
+			if !sameError(got, test.want) {
+				t.Errorf("unexpected error: got:%v want:%v", got, test.want)
+			}
+		})
+	}
+}
+
+func ucfgRigmarole(text string) error {
+	c, err := yaml.NewConfig([]byte(text))
+	if err != nil {
+		return err
+	}
+	cfg := defaultConfig()
+	err = c.Unpack(&cfg)
+	if err != nil {
+		return err
+	}
+	return cfg.Validate()
+}
+
+func sameError(a, b error) bool {
+	switch {
+	case a == nil && b == nil:
+		return true
+	case a == nil, b == nil:
+		return false
+	default:
+		return a.Error() == b.Error()
+	}
+}
diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc
new file mode 100644
index 00000000000..94f5d07cece
--- /dev/null
+++ b/libbeat/processors/cache/docs/cache.asciidoc
@@ -0,0 +1,107 @@
+[[add-cached-metadata]]
+=== Add cached metadata
+
+++++
+<titleabbrev>cache</titleabbrev>
+++++
+
+experimental[]
+
+The `cache` processor enriches events with information from previously
+cached events.
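+
+The following configurations show, in turn, the `put`, `get`, and `delete`
+operations, all referring to a shared memory-backed cache: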
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - cache:
+      backend:
+        memory:
+          id: cache_id
+      put:
+        key_field: join_key_field
+        value_field: source_field
+-------------------------------------------------------------------------------
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - cache:
+      backend:
+        memory:
+          id: cache_id
+      get:
+        key_field: join_key_field
+        target_field: destination_field
+-------------------------------------------------------------------------------
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - cache:
+      backend:
+        memory:
+          id: cache_id
+      delete:
+        key_field: join_key_field
+-------------------------------------------------------------------------------
+
+The value written to the target field depends on what was stored in the cache
+by the corresponding `put` operation.
+
+It has the following settings:
+
+One of `backend.memory.id` or `backend.file.id` must be provided.
+
+`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
+
+One of `put`, `get` or `delete` must be provided.
+
+`put.key_field`:: Name of the field containing the key to put into the cache. Required if `put` is present.
+`put.value_field`:: Name of the field containing the value to put into the cache. Required if `put` is present.
+`put.ttl`:: The TTL to associate with the cached key/value. Valid time units are h, m, s, ms, us/µs and ns. Required if `put` is present.
+
+`get.key_field`:: Name of the field containing the key to get. Required if `get` is present.
+`get.target_field`:: Name of the field to which the cached value will be written. Required if `get` is present.
+
+`delete.key_field`:: Name of the field containing the key to delete. Required if `delete` is present.
+
+`ignore_missing`:: (Optional) When set to `false`, events that don't contain the
+field named in `key_field` will be discarded and an error will be generated. By
+default, this condition is ignored.
+
+`overwrite_keys`:: (Optional) By default, if a target field already exists, it
+will not be overwritten and an error will be logged. If `overwrite_keys` is
+set to `true`, this condition will be ignored.
+
+The `cache` processor can be used to perform joins between documents within an
+event stream, within the Beat itself.
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - if:
+      contains:
+        log.file.path: fdrv2/aidmaster
+    then:
+      - cache:
+          backend:
+            memory:
+              id: aidmaster
+          put:
+            ttl: 168h
+            key_field: crowdstrike.aid
+            value_field: crowdstrike.metadata
+    else:
+      - cache:
+          backend:
+            memory:
+              id: aidmaster
+          get:
+            key_field: crowdstrike.aid
+            target_field: crowdstrike.metadata
+-------------------------------------------------------------------------------
+
+This would enrich events with `log.file.path` not equal to
+"fdrv2/aidmaster" with the `crowdstrike.metadata` fields from events
+with `log.file.path` equal to that value, where the `crowdstrike.aid`
+field matches between the source and destination documents.
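+
+As an illustrative sketch of that flow (the shape and value of
+`crowdstrike.metadata` here are assumed for the example, not taken from a
+real data stream), caching a document such as
+
+[source,json]
+-------------------------------------------------------------------------------
+{
+  "log": {"file": {"path": "fdrv2/aidmaster"}},
+  "crowdstrike": {
+    "aid": "one",
+    "metadata": {"agent_version": "6.0.0"}
+  }
+}
+-------------------------------------------------------------------------------
+
+stores `{"agent_version": "6.0.0"}` under the key "one". A later event from
+another path carrying `"crowdstrike": {"aid": "one"}` then has that value
+written into its `crowdstrike.metadata` field.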
diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go
new file mode 100644
index 00000000000..900e91c8e60
--- /dev/null
+++ b/libbeat/processors/cache/mem_store.go
@@ -0,0 +1,257 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"container/heap"
+	"context"
+	"sync"
+	"time"
+)
+
+// memStoreSet is a collection of shared memStore caches.
+type memStoreSet struct {
+	mu     sync.Mutex
+	stores map[string]*memStore
+	typ    string // TODO: Remove when a file-backed store exists.
+}
+
+// get returns a memStore cache with the provided ID based on the config.
+// If a memStore with the ID already exists, its configuration is adjusted
+// and its reference count is increased. The returned context.CancelFunc
+// reduces the reference count and deletes the memStore from the set if the
+// count reaches zero.
+func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	store, ok := s.stores[id]
+	if !ok {
+		store = newMemStore(cfg, id, s.typ)
+		s.stores[store.id] = store
+	}
+	store.add(cfg)
+
+	return store, func() {
+		store.dropFrom(s)
+	}
+}
+
+// free removes the memStore with the given ID from the set. free is safe
+// for concurrent use.
+func (s *memStoreSet) free(id string) {
+	s.mu.Lock()
+	delete(s.stores, id)
+	s.mu.Unlock()
+}
+
+// memStore is a memory-backed cache store.
+type memStore struct {
+	mu       sync.Mutex
+	cache    map[string]*CacheEntry
+	expiries expiryHeap
+	ttl      time.Duration // ttl is the time entries are valid for in the cache.
+	refs     int           // refs is the number of processors referring to this store.
+
+	// id is the index into the global cache store for the cache.
+	id string
+	// typ is a temporary tag to differentiate memory stores
+	// from the mocked file store.
+	// TODO: Remove when a file-backed store exists.
+	typ string
+
+	// cap is the maximum number of elements the cache
+	// will hold. If not positive, no limit.
+	cap int
+	// effort is the number of entries to examine during
+	// expired element eviction. If not positive, full effort.
+	effort int
+}
+
+// newMemStore returns a new memStore configured to apply the given TTL duration.
+// The memStore is guaranteed not to grow larger than cap elements. id is the
+// look-up key into the global cache store set in which the memStore is held.
+func newMemStore(cfg config, id, typ string) *memStore {
+	return &memStore{
+		id:    id,
+		typ:   typ,
+		cache: make(map[string]*CacheEntry),
+
+		// Mark the ttl as invalid until we have had a put
+		// operation configured.
While the shared backing + // data store is incomplete, and has no put operation + // defined, the TTL will be invalid, but will never + // be accessed since all time operations outside put + // refer to absolute times, held by the CacheEntry. + ttl: -1, + cap: -1, + effort: -1, + } +} + +func (c *memStore) String() string { return c.typ + ":" + c.id } + +// add updates the receiver for a new operation. It increases the reference +// count for the receiver, and if the config is a put operation and has no +// previous put operation defined, the TTL, cap and effort will be set from +// cfg. add is safe for concurrent use. +func (c *memStore) add(cfg config) { + c.mu.Lock() + defer c.mu.Unlock() + c.refs++ + + // We may have already constructed the store with + // a get or a delete config, so set the TTL, cap + // and effort if we have a put config. If another + // put config has already been included, we ignore + // the put options now. + if cfg.Put == nil { + return + } + if c.ttl == -1 { + // putConfig.TTL is a required field, so we don't + // need to check for nil-ness. + c.ttl = *cfg.Put.TTL + c.cap = cfg.Store.Capacity + c.effort = cfg.Store.Effort + } +} + +// dropFrom decreases the reference count for the memStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *memStore) dropFrom(stores *memStoreSet) { + c.mu.Lock() + c.refs-- + if c.refs < 0 { + panic("invalid reference count") + } + if c.refs == 0 { + stores.free(c.id) + // GC assists. + c.cache = nil + c.expiries = nil + } + c.mu.Unlock() +} + +// Get returns the cached value associated with the provided key. If there is +// no value for the key, or the value has expired Get returns ErrNoData. Get +// is safe for concurrent use. +func (c *memStore) Get(key string) (any, error) { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.cache[key] + if !ok { + return nil, ErrNoData + } + if time.Now().After(v.expires) { + delete(c.cache, key) + return nil, ErrNoData + } + return v.value, nil +} + +// Put stores the provided value in the cache associated with the given key. +// The value is given an expiry time based on the configured TTL of the cache. +// Put is safe for concurrent use. +func (c *memStore) Put(key string, val any) error { + c.mu.Lock() + defer c.mu.Unlock() + now := time.Now() + c.evictExpired(now) + e := &CacheEntry{ + key: key, + value: val, + expires: now.Add(c.ttl), + } + c.cache[key] = e + heap.Push(&c.expiries, e) + return nil +} + +// evictExpired removes up to effort elements from the cache when the cache +// is below capacity, retaining all elements that have not expired. If the +// cache is at or above capacity, the oldest elements are removed to bring +// it under the capacity limit. +func (c *memStore) evictExpired(now time.Time) { + for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { + if c.expiries[0].expires.After(now) { + break + } + e := c.expiries.pop() + delete(c.cache, e.key) + } + if c.cap <= 0 { + // No cap, so depend on effort. + return + } + for len(c.cache) >= c.cap { + e := c.expiries.pop() + delete(c.cache, e.key) + } +} + +// Delete removes the value associated with the provided key from the cache. +// Delete is safe for concurrent use. 
+func (c *memStore) Delete(key string) error { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.cache[key] + if !ok { + return nil + } + heap.Remove(&c.expiries, v.index) + delete(c.cache, key) + return nil +} + +var _ heap.Interface = (*expiryHeap)(nil) + +// expiryHeap is a min-date heap. +// +// TODO: This could be a queue instead, though deletion becomes more +// complicated in that case. +type expiryHeap []*CacheEntry + +func (h *expiryHeap) pop() *CacheEntry { + e := heap.Pop(h).(*CacheEntry) + e.index = -1 + return e +} + +func (h expiryHeap) Len() int { + return len(h) +} +func (h expiryHeap) Less(i, j int) bool { + return h[i].expires.Before(h[j].expires) +} +func (h expiryHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} +func (h *expiryHeap) Push(v any) { + e := v.(*CacheEntry) + e.index = len(*h) + *h = append(*h, e) +} +func (h *expiryHeap) Pop() any { + v := (*h)[len(*h)-1] + (*h)[len(*h)-1] = nil // Help GC. + *h = (*h)[:len(*h)-1] + return v +} diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go new file mode 100644 index 00000000000..692931854b8 --- /dev/null +++ b/libbeat/processors/cache/mem_store_test.go @@ -0,0 +1,379 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +type memStoreTestSteps struct { + doTo func(*memStore) error + want *memStore +} + +//nolint:errcheck // Paul Hogan was right. +var memStoreTests = []struct { + name string + cfg config + want *memStore + steps []memStoreTestSteps +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. 
+ ttl: -1, + cap: -1, + effort: -1, + }, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &id{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 1: { + doTo: func(s *memStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "two": {key: "two", value: int(2), index: 1}, + "three": {key: "three", value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "two", value: int(2), index: 1}, + {key: "three", value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 2: { + doTo: func(s *memStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "two": {key: "two", value: int(2), index: 1}, + "three": {key: "three", value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "two", value: int(2), index: 1}, + {key: "three", value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 3: { + doTo: func(s *memStore) error { + return s.Delete("two") + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "three": {key: "three", value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "three", value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 4: { + doTo: func(s *memStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "three": {key: "three", value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "three", 
value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 5: { + doTo: func(s *memStore) error { + s.dropFrom(&memStores) + if !memStores.has(s.id) { + return fmt.Errorf("%q memStore not found after single close", s.id) + } + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {key: "one", value: int(1), index: 0}, + "three": {key: "three", value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {key: "one", value: int(1), index: 0}, + {key: "three", value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 6: { + doTo: func(s *memStore) error { + s.dropFrom(&memStores) + if memStores.has(s.id) { + return fmt.Errorf("%q memStore still found after double close", s.id) + } + return nil + }, + want: &memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + }, + }, +} + +func TestMemStore(t *testing.T) { + allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") + + for _, test := range memStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") + store.add(test.cfg) + memStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInMemStore) { + t.Errorf("unexpected new memStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected memStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInMemStore, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set, it is used only for testing. +func (s *memStoreSet) add(store *memStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set, it is used only for testing. +func (s *memStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} + +func ptrTo[T any](v T) *T { return &v } From abfcd96455c5aae7bc08d170ea788f27b9da3b74 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Wed, 27 Sep 2023 06:49:29 +0930 Subject: [PATCH 2/8] x-pack/packetbeat: bump npcap version to v1.76 (#36549) --- CHANGELOG.next.asciidoc | 4 +- x-pack/packetbeat/LICENSE-Npcap.txt | 465 +++++++++++---------- x-pack/packetbeat/magefile.go | 2 +- x-pack/packetbeat/npcap/installer/LICENSE | 4 +- x-pack/packetbeat/tests/system/app_test.go | 2 +- 5 files changed, 242 insertions(+), 235 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 228d201f74f..a8ed9f032c5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -238,12 +238,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Osquerybeat* -*Packetbeat* - - *Packetbeat* - Improve efficiency of sniffers by deduplicating interface configurations. {issue}36574[36574] {pull}36576[36576] +- Bump Windows Npcap version to v1.76. 
{issue}36539[36539] {pull}36549[36549] *Winlogbeat* diff --git a/x-pack/packetbeat/LICENSE-Npcap.txt b/x-pack/packetbeat/LICENSE-Npcap.txt index 893b6ff218a..bb51b2fbd6f 100644 --- a/x-pack/packetbeat/LICENSE-Npcap.txt +++ b/x-pack/packetbeat/LICENSE-Npcap.txt @@ -1,228 +1,237 @@ - -NPCAP COPYRIGHT / END USER LICENSE AGREEMENT - -Npcap is a Windows packet sniffing driver and library and is copyright -(c) 2013-2022 by Insecure.Com LLC ("The Nmap Project"). All rights -reserved. - -Even though Npcap source code is publicly available for review, it is -not open source software and may not be redistributed without special -permission from the Nmap Project. The standard version is also -limited to installation on five systems. We fund the Npcap project by -selling two types of commercial licenses to a special Npcap OEM -edition: - -1) Npcap OEM Redistribution License allows companies to redistribute -Npcap with their products. - -2) Npcap OEM Internal Use License allows companies to use Npcap OEM -internally in excess of the free/demo version's normal 5-system -limitation. - -Both of these licenses include updates and support as well as a -warranty. Npcap OEM also includes a silent installer for unattended -installation. Further details about Npcap OEM are available from -https://nmap.org/npcap/oem/, and you are also welcome to contact us at -sales@nmap.com to ask any questions or set up a license for your -organization. - -Free and open source software producers are also welcome to contact us -for redistribution requests. However, we normally recommend that such -authors instead ask your users to download and install Npcap themselves. - -If the Nmap Project (directly or through one of our commercial -licensing customers) has granted you additional rights to Npcap or -Npcap OEM, those additional rights take precedence where they conflict -with the terms of this license agreement. - -Since the Npcap source code is available for download and review, -users sometimes contribute code patches to fix bugs or add new -features. By sending these changes to the Nmap Project (including -through direct email or our mailing lists or submitting pull requests -through our source code repository), it is understood unless you -specify otherwise that you are offering the Nmap Project the -unlimited, non-exclusive right to reuse, modify, and relicence your -code contribution so that we may (but are not obligated to) -incorporate it into Npcap. If you wish to specify special license -conditions or restrictions on your contributions, just say so when you -send them. - -This copy of Npcap (the "Software") and accompanying documentation is -licensed and not sold. This Software is protected by copyright laws -and treaties, as well as laws and treaties related to other forms of -intellectual property. The Nmap Project owns intellectual property -rights in the Software. The Licensee's ("you" or "your") license to -download, use, copy, or change the Software is subject to these rights -and to all the terms and conditions of this End User License Agreement -("Agreement"). - -ACCEPTANCE - -By accepting this agreement or by downloading, installing, using, or -copying the Software, or by clicking "I Agree", you agree to be bound -by the terms of this EULA. If you do not agree to the terms of this -EULA, do not install, use, or copy the Software. - -LICENSE GRANT - -This Agreement entitles you to install and use five (5) copies of the -Software. 
In addition, you may make archival copies of the Software -which may only be used for the reinstallation of the Software. This -Agreement does not permit the installation or use of more than 5 -copies of the Software, or the installation of the Software on more -than five computer at any given time, on a system that allows shared -used of applications by more than five users, or on any configuration -or system of computers that allows more than five users. A user may -only have one instance of this Agreement active at once. For example, -downloading the software multiple times, downloading multiple versions -of the software, and/or executing the software installer multiple -times do not grant any additional rights such as using the software on -more machines. - -The terms "computer" and "machine" in this license include any -computing device, including software computing instances such as -virtual machines and Docker containers. - -Copies of Npcap do not count toward the five copy, five computer, or -five user limitations imposed by this section if they are installed -and used solely in conjunction with any of the following software: - -o The Nmap Security Scanner, as distributed from https://nmap.org - -o The Wireshark network protocol analyzer, as distributed from - https://www.wireshark.org/ - -o Microsoft Defender for Identity, as distributed from - https://www.microsoft.com/en-us/microsoft-365/security/identity-defender - -Users wishing to redistribute Npcap or exceed the usage limits imposed -by this free license or benefit from commercial support and features -such as a silent installer should contact sales@nmap.com to obtain an -appropriate commercial license agreement. More details on our OEM -edition is also available from https://nmap.org/npcap/oem/. - -DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY - -This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - -RESTRICTIONS ON TRANSFER - -Without first obtaining the express written consent of the Nmap -Project, you may not assign your rights and obligations under this -Agreement, or redistribute, encumber, sell, rent, lease, sublicense, -or otherwise transfer your rights to the Software Product. - -RESTRICTIONS ON USE - -You may not use, copy, or install the Software Product on more than -five computers, or permit the use, copying, or installation of the -Software Product by more than five users or on more than five -computers. - -RESTRICTIONS ON COPYING - -You may not copy any part of the Software except to the extent that -licensed use inherently demands the creation of a temporary copy -stored in computer memory and not permanently affixed on storage -medium. You may make archival copies as well. - -DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY - -UNLESS OTHERWISE EXPLICITLY AGREED TO IN WRITING BY THE NMAP PROJECT, -THE NMAP PROJECT MAKES NO OTHER WARRANTIES, EXPRESS OR IMPLIED, IN -FACT OR IN LAW, INCLUDING, BUT NOT LIMITED TO, ANY IMPLIED WARRANTIES -OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE OTHER THAN AS -SET FORTH IN THIS AGREEMENT OR IN THE LIMITED WARRANTY DOCUMENTS -PROVIDED WITH THE SOFTWARE. - -The Nmap Project makes no warranty that the Software will meet your -requirements or operate under your specific conditions of use. The -Nmap Project makes no warranty that operation of the Software Product -will be secure, error free, or free from interruption. 
YOU MUST -DETERMINE WHETHER THE SOFTWARE SUFFICIENTLY MEETS YOUR REQUIREMENTS -FOR SECURITY AND UNINTERRUPTABILITY. YOU BEAR SOLE RESPONSIBILITY AND -ALL LIABILITY FOR ANY LOSS INCURRED DUE TO FAILURE OF THE SOFTWARE TO -MEET YOUR REQUIREMENTS. THE NMAP PROJECT WILL NOT, UNDER ANY -CIRCUMSTANCES, BE RESPONSIBLE OR LIABLE FOR THE LOSS OF DATA ON ANY -COMPUTER OR INFORMATION STORAGE DEVICE. - -UNDER NO CIRCUMSTANCES SHALL THE NMAP PROJECT, ITS DIRECTORS, -OFFICERS, EMPLOYEES OR AGENTS BE LIABLE TO YOU OR ANY OTHER PARTY FOR -INDIRECT, CONSEQUENTIAL, SPECIAL, INCIDENTAL, PUNITIVE, OR EXEMPLARY -DAMAGES OF ANY KIND (INCLUDING LOST REVENUES OR PROFITS OR LOSS OF -BUSINESS) RESULTING FROM THIS AGREEMENT, OR FROM THE FURNISHING, -PERFORMANCE, INSTALLATION, OR USE OF THE SOFTWARE, WHETHER DUE TO A -BREACH OF CONTRACT, BREACH OF WARRANTY, OR THE NEGLIGENCE OF THE NMAP -PROJECT OR ANY OTHER PARTY, EVEN IF THE NMAP PROJECT IS ADVISED -BEFOREHAND OF THE POSSIBILITY OF SUCH DAMAGES. TO THE EXTENT THAT THE -APPLICABLE JURISDICTION LIMITS THE NMAP PROJECT'S ABILITY TO DISCLAIM -ANY IMPLIED WARRANTIES, THIS DISCLAIMER SHALL BE EFFECTIVE TO THE -MAXIMUM EXTENT PERMITTED. - -LIMITATIONS OF REMEDIES AND DAMAGES - -Your remedy for a breach of this Agreement or of any warranty included -in this Agreement is the correction or replacement of the Software or -a refund of the purchase price of the Software, exclusive of any costs -for shipping and handling. Selection of whether to correct or replace -or refund shall be solely at the discretion of the Nmap Project. The -Nmap Project reserves the right to substitute a functionally -equivalent copy of the Software Product as a replacement. - -Any claim must be made within the applicable warranty period. All -warranties cover only defects arising under normal use and do not -include malfunctions or failure resulting from misuse, abuse, neglect, -alteration, problems with electrical power, acts of nature, unusual -temperatures or humidity, improper installation, or damage determined -by the Nmap Project to have been caused by you. All limited warranties -on the Software Product are granted only to you and are -non-transferable. - -You agree to indemnify and hold the Nmap Project harmless from all -claims, judgments, liabilities, expenses, or costs arising from your -breach of this Agreement and/or acts or omissions. - -GOVERNING LAW, JURISDICTION AND COSTS - -This Agreement is governed by the laws the United States of America -and Washington State, without regard to Washington's conflict or -choice of law provisions. - -SEVERABILITY - -If any provision of this Agreement shall be held to be invalid or -unenforceable, the remainder of this Agreement shall remain in full -force and effect. To the extent any express or implied restrictions -are not permitted by applicable laws, these express or implied -restrictions shall remain in force and effect to the maximum extent -permitted by such applicable laws. - -THIRD PARTY SOFTWARE ATTRIBUTION - -Npcap uses several 3rd party open source software libraries: - -* The libpcap portable packet capturing library from https://tcpdump.org -* The Winpcap packet capturing library. It has been abandoned, but is - currently still available from https://www.winpcap.org/. -* The ieee80211_radiotap.h header file from David Young - -All of these are open source with BSD-style licenses that allow for -unlimited use and royalty-free redistribution within other software -(including commercial/proprietary software). 
Some include a warranty -disclaimer (relating to the original authors) and require a small -amount of acknowledgment text be added somewhere in the documentation -of any software which includes them (including indirect inclusion -through Npcap). - -The required acknowledgement text as well as full license text and -source details for these libraries is available from: -https://npcap.org/src/docs/Npcap-Third-Party-Open-Source.pdf . - -Since Insecure.Com LLC is not the author of this 3rd party code, we -can not waive or modify it’s software copyright or license. Npcap -users and redistributors must comply with the relevant Npcap license -(either the free/demo license or a commercial Npcap OEM license they -may have purchased) as well as the minimal requirements of this 3rd -party open source software. + +NPCAP COPYRIGHT / END USER LICENSE AGREEMENT + +Npcap (https://npcap.com) is a Windows packet sniffing driver and +library and is copyright (c) 2013-2023 by Nmap Software LLC ("The Nmap +Project"). All rights reserved. + +Even though Npcap source code is publicly available for review, it is +not open source software and may not be redistributed or used in other +software without special permission from the Nmap Project. The +standard (free) version is usually limited to installation on five +systems. We fund the Npcap project by selling two types of commercial +licenses to a special Npcap OEM edition: + +1) The Npcap OEM Redistribution License allows companies distribute +Npcap OEM within their products. Licensees generally use the Npcap OEM +silent installer, ensuring a seamless experience for end +users. Licensees may choose between a perpetual unlimited license or +a quarterly term license, along with options for commercial support and +updates. Prices and details: https://npcap.com/oem/redist.html + +2) The Npcap OEM Internal-Use License is for organizations that wish +to use Npcap OEM internally, without redistribution outside their +organization. This allows them to bypass the 5-system usage cap of the +Npcap free edition. It includes commercial support and update options, +and provides the extra Npcap OEM features such as the silent installer +for automated deployment. Prices and details: +https://npcap.com/oem/internal.html + +Both of these licenses include updates and support as well as a +warranty. Npcap OEM also includes a silent installer for unattended +installation. Further details about Npcap OEM are available from +https://npcap.com/oem/, and you are also welcome to contact us at +sales@nmap.com to ask any questions or set up a license for your +organization. + +Free and open source software producers are also welcome to contact us +for redistribution requests. However, we normally recommend that such +authors instead ask your users to download and install Npcap +themselves. It will be free for them if they need 5 or fewer copies. + +If the Nmap Project (directly or through one of our commercial +licensing customers) has granted you additional rights to Npcap or +Npcap OEM, those additional rights take precedence where they conflict +with the terms of this license agreement. + +Since the Npcap source code is available for download and review, +users sometimes contribute code patches to fix bugs or add new +features. 
By sending these changes to the Nmap Project (including +through direct email or our mailing lists or submitting pull requests +through our source code repository), it is understood unless you +specify otherwise that you are offering the Nmap Project the +unlimited, non-exclusive right to reuse, modify, and relicense your +code contribution so that we may (but are not obligated to) +incorporate it into Npcap. If you wish to specify special license +conditions or restrictions on your contributions, just say so when you +send them. + +This copy of Npcap (the "Software") and accompanying documentation is +licensed and not sold. This Software is protected by copyright laws +and treaties, as well as laws and treaties related to other forms of +intellectual property. The Nmap Project owns intellectual property +rights in the Software. The Licensee's ("you" or "your") license to +download, use, copy, or change the Software is subject to these rights +and to all the terms and conditions of this End User License Agreement +("Agreement"). + +ACCEPTANCE + +By accepting this agreement or by downloading, installing, using, or +copying the Software, or by clicking "I Agree", you agree to be bound +by the terms of this EULA. If you do not agree to the terms of this +EULA, do not install, use, or copy the Software. + +LICENSE GRANT + +This Agreement entitles you to install and use five (5) copies of the +Software. In addition, you may make archival copies of the Software +which may only be used for the reinstallation of the Software. This +Agreement does not permit the installation or use of more than 5 +copies of the Software, or the installation of the Software on more +than five computer at any given time, on a system that allows shared +used of applications by more than five users, or on any configuration +or system of computers that allows more than five users. A user may +only have one instance of this Agreement active at once. For example, +downloading the software multiple times, downloading multiple versions +of the software, and/or executing the software installer multiple +times do not grant any additional rights such as using the software on +more machines. + +The terms "computer" and "machine" in this license include any +computing device, including software computing instances such as +virtual machines and Docker containers. + +Copies of Npcap do not count toward the five copy, five computer, or +five user limitations imposed by this section if they are installed +and used solely in conjunction with any of the following software: + +o The Nmap Security Scanner, as distributed from https://nmap.org + +o The Wireshark network protocol analyzer, as distributed from + https://www.wireshark.org/ + +o Microsoft Defender for Identity, as distributed from + https://www.microsoft.com/en-us/microsoft-365/security/identity-defender + +Users wishing to redistribute Npcap or exceed the usage limits imposed +by this free license or benefit from commercial support and features +such as a silent installer should contact sales@nmap.com to obtain an +appropriate commercial license agreement. More details on our OEM +edition is also available from https://npcap.com/oem/. + +DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. 
+ +RESTRICTIONS ON TRANSFER + +Without first obtaining the express written consent of the Nmap +Project, you may not assign your rights and obligations under this +Agreement, or redistribute, encumber, sell, rent, lease, sublicense, +or otherwise transfer your rights to the Software Product. + +RESTRICTIONS ON USE + +You may not use, copy, or install the Software Product on more than +five computers, or permit the use, copying, or installation of the +Software Product by more than five users or on more than five +computers. + +RESTRICTIONS ON COPYING + +You may not copy any part of the Software except to the extent that +licensed use inherently demands the creation of a temporary copy +stored in computer memory and not permanently affixed on storage +medium. You may make archival copies as well. + +DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY + +UNLESS OTHERWISE EXPLICITLY AGREED TO IN WRITING BY THE NMAP PROJECT, +THE NMAP PROJECT MAKES NO OTHER WARRANTIES, EXPRESS OR IMPLIED, IN +FACT OR IN LAW, INCLUDING, BUT NOT LIMITED TO, ANY IMPLIED WARRANTIES +OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE OTHER THAN AS +SET FORTH IN THIS AGREEMENT OR IN THE LIMITED WARRANTY DOCUMENTS +PROVIDED WITH THE SOFTWARE. + +The Nmap Project makes no warranty that the Software will meet your +requirements or operate under your specific conditions of use. The +Nmap Project makes no warranty that operation of the Software Product +will be secure, error free, or free from interruption. YOU MUST +DETERMINE WHETHER THE SOFTWARE SUFFICIENTLY MEETS YOUR REQUIREMENTS +FOR SECURITY AND UNINTERRUPTABILITY. YOU BEAR SOLE RESPONSIBILITY AND +ALL LIABILITY FOR ANY LOSS INCURRED DUE TO FAILURE OF THE SOFTWARE TO +MEET YOUR REQUIREMENTS. THE NMAP PROJECT WILL NOT, UNDER ANY +CIRCUMSTANCES, BE RESPONSIBLE OR LIABLE FOR THE LOSS OF DATA ON ANY +COMPUTER OR INFORMATION STORAGE DEVICE. + +UNDER NO CIRCUMSTANCES SHALL THE NMAP PROJECT, ITS DIRECTORS, +OFFICERS, EMPLOYEES OR AGENTS BE LIABLE TO YOU OR ANY OTHER PARTY FOR +INDIRECT, CONSEQUENTIAL, SPECIAL, INCIDENTAL, PUNITIVE, OR EXEMPLARY +DAMAGES OF ANY KIND (INCLUDING LOST REVENUES OR PROFITS OR LOSS OF +BUSINESS) RESULTING FROM THIS AGREEMENT, OR FROM THE FURNISHING, +PERFORMANCE, INSTALLATION, OR USE OF THE SOFTWARE, WHETHER DUE TO A +BREACH OF CONTRACT, BREACH OF WARRANTY, OR THE NEGLIGENCE OF THE NMAP +PROJECT OR ANY OTHER PARTY, EVEN IF THE NMAP PROJECT IS ADVISED +BEFOREHAND OF THE POSSIBILITY OF SUCH DAMAGES. TO THE EXTENT THAT THE +APPLICABLE JURISDICTION LIMITS THE NMAP PROJECT'S ABILITY TO DISCLAIM +ANY IMPLIED WARRANTIES, THIS DISCLAIMER SHALL BE EFFECTIVE TO THE +MAXIMUM EXTENT PERMITTED. + +LIMITATIONS OF REMEDIES AND DAMAGES + +Your remedy for a breach of this Agreement or of any warranty included +in this Agreement is the correction or replacement of the Software or +a refund of the purchase price of the Software, exclusive of any costs +for shipping and handling. Selection of whether to correct or replace +or refund shall be solely at the discretion of the Nmap Project. The +Nmap Project reserves the right to substitute a functionally +equivalent copy of the Software Product as a replacement. + +Any claim must be made within the applicable warranty period. 
All
+warranties cover only defects arising under normal use and do not
+include malfunctions or failure resulting from misuse, abuse, neglect,
+alteration, problems with electrical power, acts of nature, unusual
+temperatures or humidity, improper installation, or damage determined
+by the Nmap Project to have been caused by you. All limited warranties
+on the Software Product are granted only to you and are
+non-transferable.
+
+You agree to indemnify and hold the Nmap Project harmless from all
+claims, judgments, liabilities, expenses, or costs arising from your
+breach of this Agreement and/or acts or omissions.
+
+GOVERNING LAW, JURISDICTION AND COSTS
+
+This Agreement is governed by the laws of the United States of America
+and Delaware State, without regard to Delaware's conflict or choice of
+law provisions.
+
+SEVERABILITY
+
+If any provision of this Agreement shall be held to be invalid or
+unenforceable, the remainder of this Agreement shall remain in full
+force and effect. To the extent any express or implied restrictions
+are not permitted by applicable laws, these express or implied
+restrictions shall remain in force and effect to the maximum extent
+permitted by such applicable laws.
+
+THIRD PARTY SOFTWARE ATTRIBUTION
+
+Npcap uses several 3rd party open source software libraries:
+
+* The libpcap portable packet capturing library from https://tcpdump.org
+* The Winpcap packet capturing library. It has been abandoned, but is
+  currently still available from https://www.winpcap.org/.
+* The ieee80211_radiotap.h header file from David Young
+
+All of these are open source with BSD-style licenses that allow for
+unlimited use and royalty-free redistribution within other software
+(including commercial/proprietary software). Some include a warranty
+disclaimer (relating to the original authors) and require a small
+amount of acknowledgment text be added somewhere in the documentation
+of any software which includes them (including indirect inclusion
+through Npcap).
+
+The required acknowledgement text as well as full license text and
+source details for these libraries is available from:
+https://npcap.com/src/docs/Npcap-Third-Party-Open-Source.pdf .
+
+Since Nmap Software LLC is not the author of this 3rd party code, we
+can not waive or modify its software copyright or license. Npcap
+users and redistributors must comply with the relevant Npcap license
+(either the free/demo license or a commercial Npcap OEM license they
+may have purchased) as well as the minimal requirements of this 3rd
+party open source software.
diff --git a/x-pack/packetbeat/magefile.go b/x-pack/packetbeat/magefile.go
index daee711156a..2d0d367a460 100644
--- a/x-pack/packetbeat/magefile.go
+++ b/x-pack/packetbeat/magefile.go
@@ -36,7 +36,7 @@ import (
 )
 
 // the packetbeat executable. It is used to specify which npcap builder crossbuild
 // image to use and the installer to obtain from the cloud store for testing.
const ( - NpcapVersion = "1.71" + NpcapVersion = "1.76" installer = "npcap-" + NpcapVersion + "-oem.exe" ) diff --git a/x-pack/packetbeat/npcap/installer/LICENSE b/x-pack/packetbeat/npcap/installer/LICENSE index 0bf8aa4d9b6..61173d3226a 100644 --- a/x-pack/packetbeat/npcap/installer/LICENSE +++ b/x-pack/packetbeat/npcap/installer/LICENSE @@ -1,10 +1,10 @@ -------------------------------------------------------------------------------- Dependency : Npcap (https://nmap.org/npcap/) -Version: 1.71 +Version: 1.76 Licence type: Commercial -------------------------------------------------------------------------------- -Npcap is Copyright (c) 2013-2021 Insecure.Com LLC. All rights reserved. +Npcap is Copyright (c) 2013-2023 Insecure.Com LLC. All rights reserved. See https://npcap.org for details. Portions of Npcap are Copyright (c) 1999 - 2005 NetGroup, Politecnico di diff --git a/x-pack/packetbeat/tests/system/app_test.go b/x-pack/packetbeat/tests/system/app_test.go index f6db2aa2977..b7566e7450b 100644 --- a/x-pack/packetbeat/tests/system/app_test.go +++ b/x-pack/packetbeat/tests/system/app_test.go @@ -29,7 +29,7 @@ import ( ) // Keep in sync with NpcapVersion in magefile.go. -const NpcapVersion = "1.71" +const NpcapVersion = "1.76" func TestWindowsNpcapInstaller(t *testing.T) { if runtime.GOOS != "windows" { From 58db08f2bf76e124b46c28118148afe45f55c577 Mon Sep 17 00:00:00 2001 From: nima Date: Wed, 27 Sep 2023 09:26:19 +1000 Subject: [PATCH 3/8] Update rate_limit.asciidoc (#36562) --- libbeat/processors/ratelimit/docs/rate_limit.asciidoc | 1 - 1 file changed, 1 deletion(-) diff --git a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc index 9a7a2f32322..aa847ea4292 100644 --- a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc +++ b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc @@ -1,6 +1,5 @@ [[rate-limit]] === Rate limit the flow of events -beta[] ++++ rate_limit From d5f659dc080b81960a15d7a9c4b0401da95ecc94 Mon Sep 17 00:00:00 2001 From: apmmachine <58790750+apmmachine@users.noreply.github.com> Date: Wed, 27 Sep 2023 13:39:37 -0400 Subject: [PATCH 4/8] [updatecli] update elastic stack version for testing 8.11.0-b2ac6b4d (#36673) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * chore: Update snapshot.yml Made with ❤️️ by updatecli * chore: Update snapshot.yml Made with ❤️️ by updatecli * chore: Update snapshot.yml Made with ❤️️ by updatecli --------- Co-authored-by: apmmachine Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- testing/environments/snapshot.yml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index 7a387583a4c..20233b29f4c 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -3,7 +3,7 @@ version: '2.3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0-c47419c4-SNAPSHOT + image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0-b2ac6b4d-SNAPSHOT # When extend is used it merges healthcheck.tests, see: # https://github.com/docker/compose/issues/8962 # healthcheck: @@ -31,7 +31,7 @@ services: - "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles" logstash: - image: docker.elastic.co/logstash/logstash:8.11.0-c47419c4-SNAPSHOT + image: docker.elastic.co/logstash/logstash:8.11.0-b2ac6b4d-SNAPSHOT healthcheck: test: ["CMD", "curl", "-f", 
"http://localhost:9600/_node/stats"] retries: 600 @@ -44,7 +44,7 @@ services: - 5055:5055 kibana: - image: docker.elastic.co/kibana/kibana:8.11.0-c47419c4-SNAPSHOT + image: docker.elastic.co/kibana/kibana:8.11.0-b2ac6b4d-SNAPSHOT environment: - "ELASTICSEARCH_USERNAME=kibana_system_user" - "ELASTICSEARCH_PASSWORD=testing" From 9e503b4a8cca23cb0b2428e3b49fb7be763b3ebc Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Thu, 28 Sep 2023 14:00:35 +0930 Subject: [PATCH 5/8] libbeat/processors/cache: add file-backed cache (#36686) This adds a file-backed cache implementation to the cache processor. Caching between put and get operations is done in-memory using the memory cache, but the file cache will load previously written cache state on start-up and will write cache contents to file when the cache is dropped. Depending on user configuration, the file cache will also periodically write the cache state to the backing file to reduce state loss in the event of a crash. For simplicity, the cache state is stored as a JSON stream of objects with fields for the key, value and expiry timestamp of cached entities. --- CHANGELOG-developer.next.asciidoc | 1 + libbeat/processors/cache/cache.go | 33 +- libbeat/processors/cache/cache_test.go | 24 +- libbeat/processors/cache/config.go | 13 +- libbeat/processors/cache/config_test.go | 36 ++ libbeat/processors/cache/docs/cache.asciidoc | 5 +- libbeat/processors/cache/file_store.go | 294 ++++++++++++ libbeat/processors/cache/file_store_test.go | 455 +++++++++++++++++++ libbeat/processors/cache/mem_store.go | 34 +- libbeat/processors/cache/mem_store_test.go | 72 +-- 10 files changed, 877 insertions(+), 90 deletions(-) create mode 100644 libbeat/processors/cache/file_store.go create mode 100644 libbeat/processors/cache/file_store_test.go diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index d0f4c40ef3b..ca46d018247 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -173,6 +173,7 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] - Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] +- Add file-backed cache for cache enrichment processor. {pull}36686[36686] ==== Deprecated diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go index eea3ed115ea..a7ce4876f50 100644 --- a/libbeat/processors/cache/cache.go +++ b/libbeat/processors/cache/cache.go @@ -21,6 +21,7 @@ import ( "context" "errors" "fmt" + "os" "time" "github.com/elastic/beats/v7/libbeat/beat" @@ -29,6 +30,7 @@ import ( conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" + "github.com/elastic/elastic-agent-libs/paths" ) const name = "cache" @@ -64,15 +66,15 @@ func New(cfg *conf.C) (beat.Processor, error) { if err != nil { return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err) } - src, cancel, err := getStoreFor(config) - if err != nil { - return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) - } - // Logging (each processor instance has a unique ID). 
id := int(instanceID.Inc()) log := logp.NewLogger(name).With("instance_id", id) + src, cancel, err := getStoreFor(config, log) + if err != nil { + return nil, fmt.Errorf("failed to get the store for %s: %w", name, err) + } + p := &cache{ config: config, store: src, @@ -87,16 +89,18 @@ func New(cfg *conf.C) (beat.Processor, error) { // and a context cancellation that releases the cache resource when it // is no longer required. The cancellation should be called when the // processor is closed. -func getStoreFor(cfg config) (Store, context.CancelFunc, error) { +func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) { switch { case cfg.Store.Memory != nil: s, cancel := memStores.get(cfg.Store.Memory.ID, cfg) return s, cancel, nil case cfg.Store.File != nil: - logp.L().Warn("using memory store when file is configured") - // TODO: Replace place-holder code with a file-store. - s, cancel := fileStores.get(cfg.Store.File.ID, cfg) + err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700) + if err != nil { + return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err) + } + s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log) return s, cancel, nil default: @@ -105,11 +109,6 @@ func getStoreFor(cfg config) (Store, context.CancelFunc, error) { } } -var ( - memStores = memStoreSet{stores: map[string]*memStore{}, typ: "memory"} - fileStores = memStoreSet{stores: map[string]*memStore{}, typ: "file"} // This is a temporary mock. -) - // noop is a no-op context.CancelFunc. func noop() {} @@ -126,9 +125,9 @@ type Store interface { } type CacheEntry struct { - key string - value any - expires time.Time + Key string `json:"key"` + Value any `json:"val"` + Expires time.Time `json:"expires"` index int } diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go index 6fe5847c01f..8acd22d74d7 100644 --- a/libbeat/processors/cache/cache_test.go +++ b/libbeat/processors/cache/cache_test.go @@ -130,7 +130,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -191,7 +191,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -210,7 +210,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -271,7 +271,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -290,7 +290,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -351,7 +351,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -379,7 +379,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already 
exists and overwrite_keys is false"), }, @@ -441,7 +441,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -465,7 +465,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -527,7 +527,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: nil, }, @@ -547,7 +547,7 @@ var cacheTests = []struct { }, }, wantCacheVal: map[string]*CacheEntry{ - "one": {key: "one", value: "metadata_value"}, + "one": {Key: "one", Value: "metadata_value"}, }, wantErr: errors.New("error applying cache get processor: expected map but type is string"), }, @@ -613,7 +613,7 @@ func TestCache(t *testing.T) { switch got := p.(*cache).store.(type) { case *memStore: allow := cmp.AllowUnexported(CacheEntry{}) - ignore := cmpopts.IgnoreFields(CacheEntry{}, "expires", "index") + ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) { t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore)) } diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go index 06dacc5c6ca..36eeb423fc2 100644 --- a/libbeat/processors/cache/config.go +++ b/libbeat/processors/cache/config.go @@ -88,8 +88,8 @@ func defaultConfig() config { } type storeConfig struct { - Memory *id `config:"memory"` - File *id `config:"file"` + Memory *memConfig `config:"memory"` + File *fileConfig `config:"file"` // Capacity and Effort are currently experimental // and not in public-facing documentation. 
@@ -97,8 +97,13 @@ type storeConfig struct {
 	Effort int `config:"eviction_effort"`
 }
 
-type id struct {
-	ID string `config:"id"`
+type memConfig struct {
+	ID string `config:"id" validate:"required"`
+}
+
+type fileConfig struct {
+	ID            string        `config:"id" validate:"required"`
+	WriteOutEvery time.Duration `config:"write_interval"`
 }
 
 func (cfg *storeConfig) Validate() error {
diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go
index 6e8b01d930f..4a956caef02 100644
--- a/libbeat/processors/cache/config_test.go
+++ b/libbeat/processors/cache/config_test.go
@@ -39,6 +39,20 @@ put:
   ttl: 168h
   key_field: crowdstrike.aid
   value_field: crowdstrike.metadata
+`,
+		want: nil,
+	},
+	{
+		name: "put_file_with_periodic_write_out",
+		cfg: `
+backend:
+  file:
+    id: aidmaster
+    write_interval: 15m
+put:
+  ttl: 168h
+  key_field: crowdstrike.aid
+  value_field: crowdstrike.metadata
 `,
 		want: nil,
 	},
@@ -78,6 +92,28 @@ delete:
 `,
 		want: nil,
 	},
+	{
+		name: "memory_no_id",
+		cfg: `
+backend:
+  memory:
+    id: ''
+delete:
+  key_field: crowdstrike.aid
+`,
+		want: errors.New("string value is not set accessing 'backend.memory.id'"),
+	},
+	{
+		name: "file_no_id",
+		cfg: `
+backend:
+  file:
+    id: ''
+delete:
+  key_field: crowdstrike.aid
+`,
+		want: errors.New("string value is not set accessing 'backend.file.id'"),
+	},
 	{
 		name: "no_op",
 		cfg: `
diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc
index 94f5d07cece..3e17d37e213 100644
--- a/libbeat/processors/cache/docs/cache.asciidoc
+++ b/libbeat/processors/cache/docs/cache.asciidoc
@@ -51,8 +51,9 @@ It has the following settings:
 
 One of `backend.memory.id` or `backend.file.id` must be provided.
 
-`backend.memory.id`:: The ID of a memory based cache. Use the same ID across instance to reference the same cache.
-`backend.file.id`:: The ID of a file based cache. Use the same ID across instance to reference the same cache.
+`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.write_interval`:: The interval between periodic writes of the cache to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.
 
 One of `put`, `get` or `delete` must be provided.
diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go
new file mode 100644
index 00000000000..45ed9d84068
--- /dev/null
+++ b/libbeat/processors/cache/file_store.go
@@ -0,0 +1,294 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.
See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"container/heap"
+	"context"
+	"encoding/json"
+	"errors"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/paths"
+)
+
+var fileStores = fileStoreSet{stores: map[string]*fileStore{}}
+
+// fileStoreSet is a collection of shared fileStore caches.
+type fileStoreSet struct {
+	mu     sync.Mutex
+	stores map[string]*fileStore
+}
+
+// get returns a fileStore cache with the provided ID based on the config.
+// If a fileStore with the ID already exists, its configuration is adjusted
+// and its reference count is increased. The returned context.CancelFunc
+// reduces the reference count and deletes the fileStore from the set if the
+// count reaches zero.
+func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	store, ok := s.stores[id]
+	if !ok {
+		store = newFileStore(cfg, id, pathFromConfig(cfg, log), log)
+		s.stores[store.id] = store
+	}
+	store.add(cfg)
+
+	return store, func() {
+		store.dropFrom(s)
+	}
+}
+
+// pathFromConfig returns the mapping from a config to a file-system path.
+func pathFromConfig(cfg config, log *logp.Logger) string {
+	path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
+	log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path)
+	return path
+}
+
+// cleanFilename replaces illegal printable characters (and space or dot) in
+// filenames with underscore.
+func cleanFilename(s string) string {
+	return pathCleaner.Replace(s)
+}
+
+var pathCleaner = strings.NewReplacer(
+	"/", "_",
+	"<", "_",
+	">", "_",
+	":", "_",
+	`"`, "_",
+	`\`, "_",
+	"|", "_",
+	"?", "_",
+	"*", "_",
+	".", "_",
+	" ", "_",
+)
+
+// free removes the fileStore with the given ID from the set. free is safe
+// for concurrent use.
+func (s *fileStoreSet) free(id string) {
+	s.mu.Lock()
+	delete(s.stores, id)
+	s.mu.Unlock()
+}
+
+// fileStore is a file-backed cache store.
+type fileStore struct {
+	memStore
+
+	path string
+	// cancel stops periodic write out operations.
+	// Write out operations are protected by the
+	// memStore's mutex.
+	cancel context.CancelFunc
+
+	log *logp.Logger
+}
+
+// newFileStore returns a new fileStore configured to apply the given TTL duration.
+// The fileStore is guaranteed not to grow larger than cap elements. id is the
+// look-up into the global cache store the fileStore is held in.
+func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore {
+	s := fileStore{
+		path: path,
+		log:  log,
+		memStore: memStore{
+			id:    id,
+			cache: make(map[string]*CacheEntry),
+
+			// Mark the ttl as invalid until we have had a put
+			// operation configured. While the shared backing
+			// data store is incomplete, and has no put operation
+			// defined, the TTL will be invalid, but will never
+			// be accessed since all time operations outside put
+			// refer to absolute times, held by the CacheEntry.
+ ttl: -1, + cap: -1, + effort: -1, + }, + } + s.cancel = noop + if cfg.Store.File.WriteOutEvery > 0 { + var ctx context.Context + ctx, s.cancel = context.WithCancel(context.Background()) + go s.periodicWriteOut(ctx, cfg.Store.File.WriteOutEvery) + } + s.readState() + return &s +} + +func (c *fileStore) String() string { return "file:" + c.id } + +// dropFrom decreases the reference count for the fileStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *fileStore) dropFrom(stores *fileStoreSet) { + c.mu.Lock() + c.refs-- + if c.refs < 0 { + panic("invalid reference count") + } + if c.refs == 0 { + // Stop periodic writes + c.cancel() + // and do a final write out. + c.writeState(true) + + stores.free(c.id) + // GC assists. + c.cache = nil + c.expiries = nil + } + c.mu.Unlock() +} + +func (c *fileStore) readState() { + f, err := os.Open(c.path) + if err != nil { + if errors.Is(err, fs.ErrNotExist) { + c.log.Debugw("no state on file system", "error", err) + } else { + c.log.Errorw("failed to open file to read state", "error", err) + } + return + } + defer f.Close() + + // It would be nice to be able at this stage to determine + // whether the file is stale past the TTL of the cache, but + // we do not have this information yet. So we must read + // through all the elements. If any survive the filter, we + // were alive, otherwise delete the file. + + dec := json.NewDecoder(f) + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + switch err := err.(type) { + case *json.SyntaxError: + c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset) + default: + c.log.Errorw("failed to read state element", "error", err, "path", c.path) + } + } + break + } + if e.Expires.Before(time.Now()) { + // Don't retain expired elements. + continue + } + c.cache[e.Key] = &e + heap.Push(&c.expiries, &e) + } + + if len(c.cache) != 0 { + return + } + // We had no live entries, so delete the file. + err = os.Remove(c.path) + if err != nil { + c.log.Errorw("failed to delete stale cache file", "error", err) + } +} + +// periodicWriteOut writes the cache contents to the backing file at the +// specified interval until the context is cancelled. periodicWriteOut is +// safe for concurrent use. +func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) { + tick := time.NewTicker(every) + defer tick.Stop() + for { + select { + case <-tick.C: + c.mu.Lock() + c.writeState(false) + c.mu.Unlock() + case <-ctx.Done(): + return + } + } +} + +// writeState writes the current cache state to the backing file. +// If final is true and the cache is empty, the file will be deleted. +func (c *fileStore) writeState(final bool) { + if len(c.cache) == 0 && final { + err := os.Remove(c.path) + if err != nil { + c.log.Errorw("failed to delete write state when empty", "error", err) + } + return + } + f, err := os.CreateTemp(filepath.Dir(c.path), filepath.Base(c.path)+"-*.tmp") + if err != nil { + c.log.Errorw("failed to open file to write state", "error", err) + return + } + // Try to make sure we are private. 
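+	// os.CreateTemp creates files with mode 0o600 on POSIX systems, so
+	// this chmod is primarily a guard for platforms and umask settings
+	// where that may not hold.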
+ err = os.Chmod(f.Name(), 0o600) + if err != nil { + c.log.Errorw("failed to set state file mode", "error", err) + return + } + tmp := f.Name() + defer func() { + err = f.Sync() + if err != nil { + c.log.Errorw("failed to sync file after writing state", "error", err) + return + } + err = f.Close() + if err != nil { + c.log.Errorw("failed to close file after writing state", "error", err) + return + } + // Try to be atomic. + err = os.Rename(tmp, c.path) + if err != nil { + c.log.Errorw("failed to finalize writing state", "error", err) + } + }() + + enc := json.NewEncoder(f) + enc.SetEscapeHTML(false) + now := time.Now() + for c.expiries.Len() != 0 { + e := c.expiries.pop() + if e.Expires.Before(now) { + // Don't write expired elements. + continue + } + err = enc.Encode(e) + if err != nil { + c.log.Errorw("failed to write state element", "error", err) + return + } + } +} diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go new file mode 100644 index 00000000000..557a7eca9a5 --- /dev/null +++ b/libbeat/processors/cache/file_store_test.go @@ -0,0 +1,455 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "encoding/json" + "errors" + "flag" + "fmt" + "io" + "io/fs" + "os" + "path/filepath" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/elastic-agent-libs/logp" +) + +var keep = flag.Bool("keep", false, "keep testdata after test complete") + +type fileStoreTestSteps struct { + doTo func(*fileStore) error + want *fileStore +} + +//nolint:errcheck // Paul Hogan was right. +var fileStoreTests = []struct { + name string + cfg config + want *fileStore + steps []fileStoreTestSteps + wantPersisted []*CacheEntry +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &fileStore{path: "testdata/new_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. 
+ ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &fileStore{path: "testdata/new_delete", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 1: { + doTo: func(s *fileStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 2: { + doTo: func(s *fileStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 3: { + doTo: func(s *fileStore) error { + return s.Delete("two") + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ 
+ id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 4: { + doTo: func(s *fileStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 5: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if !fileStores.has(s.id) { + return fmt.Errorf("%q fileStore not found after single close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 6: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if fileStores.has(s.id) { + return fmt.Errorf("%q fileStore still found after double close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + wantPersisted: []*CacheEntry{ + // Numeric values are float due to JSON round-trip. + {Key: "one", Value: 1.0}, + {Key: "three", Value: 3.0}, + }, + }, +} + +func TestFileStore(t *testing.T) { + err := os.RemoveAll("testdata") + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to clear testdata directory: %v", err) + } + err = os.Mkdir("testdata", 0o755) + if err != nil && !errors.Is(err, fs.ErrExist) { + t.Fatalf("failed to create testdata directory: %v", err) + } + if !*keep { + t.Cleanup(func() { os.RemoveAll("testdata") }) + } + + allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "cancel", "log") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") + + for _, test := range fileStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. 
+ path := filepath.Join("testdata", test.name) + store := newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + store.add(test.cfg) + fileStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInFileStore, ignoreInMemStore) { + t.Errorf("unexpected new fileStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInFileStore, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected fileStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry)) + } + } + if test.wantPersisted == nil { + return + } + + f, err := os.Open(path) + if err != nil { + t.Fatalf("failed to open persisted data: %v", err) + } + defer f.Close() + dec := json.NewDecoder(f) + var got []*CacheEntry + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + t.Fatalf("unexpected error reading persisted cache data: %v", err) + } + break + } + got = append(got, &e) + } + if !cmp.Equal(test.wantPersisted, got, allow, ignoreInCacheEntry) { + t.Errorf("unexpected persisted state:\n--- want\n+++ got\n%s", + cmp.Diff(test.wantPersisted, got, allow, ignoreInCacheEntry)) + } + wantCache := make(map[string]*CacheEntry) + for _, e := range got { + wantCache[e.Key] = e + } + store = newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + // Specialise the in cache entry ignore list to include index. + ignoreMoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") + if !cmp.Equal(wantCache, store.cache, allow, ignoreMoreInCacheEntry) { + t.Errorf("unexpected restored state:\n--- want\n+++ got\n%s", + cmp.Diff(wantCache, store.cache, allow, ignoreMoreInCacheEntry)) + } + for k, e := range store.cache { + if e.index < 0 || len(store.expiries) <= e.index { + t.Errorf("cache entry %s index out of bounds: got:%d [0,%d)", k, e.index, len(store.expiries)) + continue + } + if !cmp.Equal(e, store.expiries[e.index], allow, ignoreInCacheEntry) { + t.Errorf("unexpected mismatched cache/expiry state %s:\n--- want\n+++ got\n%s", + k, cmp.Diff(e, store.expiries[e.index], allow, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set. It is used only for testing. +func (s *fileStoreSet) add(store *fileStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set. It is used only for testing. +func (s *fileStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go index 900e91c8e60..774a019d783 100644 --- a/libbeat/processors/cache/mem_store.go +++ b/libbeat/processors/cache/mem_store.go @@ -24,24 +24,25 @@ import ( "time" ) +var memStores = memStoreSet{stores: map[string]*memStore{}} + // memStoreSet is a collection of shared memStore caches. type memStoreSet struct { mu sync.Mutex stores map[string]*memStore - typ string // TODO: Remove when a file-backed store exists. } // get returns a memStore cache with the provided ID based on the config. // If a memStore with the ID already exist, its configuration is adjusted // and its reference count is increased. 
The returned context.CancelFunc -// reduces the reference count and deletes the memStore from set if the +// reduces the reference count and deletes the memStore from the set if the // count reaches zero. func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { s.mu.Lock() defer s.mu.Unlock() store, ok := s.stores[id] if !ok { - store = newMemStore(cfg, id, s.typ) + store = newMemStore(cfg, id) s.stores[store.id] = store } store.add(cfg) @@ -69,10 +70,6 @@ type memStore struct { // id is the index into global cache store for the cache. id string - // typ is a temporary tag to differentiate memory stores - // from the mocked file store. - // TODO: Remove when a file-backed store exists. - typ string // cap is the maximum number of elements the cache // will hold. If not positive, no limit. @@ -85,10 +82,9 @@ type memStore struct { // newMemStore returns a new memStore configured to apply the give TTL duration. // The memStore is guaranteed not to grow larger than cap elements. id is the // look-up into the global cache store the memStore is held in. -func newMemStore(cfg config, id, typ string) *memStore { +func newMemStore(cfg config, id string) *memStore { return &memStore{ id: id, - typ: typ, cache: make(map[string]*CacheEntry), // Mark the ttl as invalid until we have had a put @@ -103,7 +99,7 @@ func newMemStore(cfg config, id, typ string) *memStore { } } -func (c *memStore) String() string { return c.typ + ":" + c.id } +func (c *memStore) String() string { return "memory:" + c.id } // add updates the receiver for a new operation. It increases the reference // count for the receiver, and if the config is a put operation and has no @@ -158,11 +154,11 @@ func (c *memStore) Get(key string) (any, error) { if !ok { return nil, ErrNoData } - if time.Now().After(v.expires) { + if time.Now().After(v.Expires) { delete(c.cache, key) return nil, ErrNoData } - return v.value, nil + return v.Value, nil } // Put stores the provided value in the cache associated with the given key. @@ -174,9 +170,9 @@ func (c *memStore) Put(key string, val any) error { now := time.Now() c.evictExpired(now) e := &CacheEntry{ - key: key, - value: val, - expires: now.Add(c.ttl), + Key: key, + Value: val, + Expires: now.Add(c.ttl), } c.cache[key] = e heap.Push(&c.expiries, e) @@ -189,11 +185,11 @@ func (c *memStore) Put(key string, val any) error { // it under the capacity limit. func (c *memStore) evictExpired(now time.Time) { for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { - if c.expiries[0].expires.After(now) { + if c.expiries[0].Expires.After(now) { break } e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } if c.cap <= 0 { // No cap, so depend on effort. 
@@ -201,7 +197,7 @@ func (c *memStore) evictExpired(now time.Time) { } for len(c.cache) >= c.cap { e := c.expiries.pop() - delete(c.cache, e.key) + delete(c.cache, e.Key) } } @@ -237,7 +233,7 @@ func (h expiryHeap) Len() int { return len(h) } func (h expiryHeap) Less(i, j int) bool { - return h[i].expires.Before(h[j].expires) + return h[i].Expires.Before(h[j].Expires) } func (h expiryHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go index 692931854b8..6be5eadb357 100644 --- a/libbeat/processors/cache/mem_store_test.go +++ b/libbeat/processors/cache/mem_store_test.go @@ -42,7 +42,7 @@ var memStoreTests = []struct { name: "new_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -63,7 +63,7 @@ var memStoreTests = []struct { name: "new_get", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -83,7 +83,7 @@ var memStoreTests = []struct { name: "new_delete", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -103,7 +103,7 @@ var memStoreTests = []struct { name: "new_get_add_put", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -123,7 +123,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -149,7 +149,7 @@ var memStoreTests = []struct { name: "ensemble", cfg: config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -169,7 +169,7 @@ var memStoreTests = []struct { doTo: func(s *memStore) error { putCfg := config{ Store: &storeConfig{ - Memory: &id{"test"}, + Memory: &memConfig{"test"}, Capacity: 1000, Effort: 10, }, @@ -199,14 +199,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -225,14 +225,14 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "two": {key: "two", value: int(2), index: 1}, - "three": {key: "three", value: int(3), index: 2}, + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "two", value: int(2), index: 1}, - {key: "three", value: int(3), index: 2}, + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, }, refs: 2, ttl: time.Second, @@ -247,12 +247,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: 
map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -271,12 +271,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 2, ttl: time.Second, @@ -295,12 +295,12 @@ var memStoreTests = []struct { want: &memStore{ id: "test", cache: map[string]*CacheEntry{ - "one": {key: "one", value: int(1), index: 0}, - "three": {key: "three", value: int(3), index: 1}, + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, }, expiries: expiryHeap{ - {key: "one", value: int(1), index: 0}, - {key: "three", value: int(3), index: 1}, + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, }, refs: 1, ttl: time.Second, @@ -332,14 +332,14 @@ var memStoreTests = []struct { func TestMemStore(t *testing.T) { allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) - ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu", "typ") - ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "expires") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") for _, test := range memStoreTests { t.Run(test.name, func(t *testing.T) { // Construct the store and put in into the stores map as // we would if we were calling Run. - store := newMemStore(test.cfg, test.cfg.Store.Memory.ID, "memory") + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID) store.add(test.cfg) memStores.add(store) @@ -361,14 +361,14 @@ func TestMemStore(t *testing.T) { } } -// add adds the store to the set, it is used only for testing. +// add adds the store to the set. It is used only for testing. func (s *memStoreSet) add(store *memStore) { s.mu.Lock() s.stores[store.id] = store s.mu.Unlock() } -// has returns whether the store exists in the set, it is used only for testing. +// has returns whether the store exists in the set. It is used only for testing. func (s *memStoreSet) has(id string) bool { s.mu.Lock() _, ok := s.stores[id] From 017b589ca3c87c54e0c41e0bd2d9bdaf0f22e6a4 Mon Sep 17 00:00:00 2001 From: Dan Kortschak <90160302+efd6@users.noreply.github.com> Date: Thu, 28 Sep 2023 19:50:00 +0930 Subject: [PATCH 6/8] x-pack/filebeat/input/httpjson: improve template evaluation logging (#36668) This adds valuable missing context to the debug logging emitted for template evaluation. Previously, only the final result was printed to the log, which failed to provide the information required to be able to determine why any given template was being executed. So add the target name to the logs. 
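As a sketch of the pattern this change moves to, compare an unkeyed Debugf call with a Debugw call that carries the evaluation target as a structured field. This is an illustration rather than code from the patch; the "cursor.last_date" target name is invented for the demo, and only the logp logger API is assumed:

    package main

    import (
    	"errors"

    	"github.com/elastic/elastic-agent-libs/logp"
    )

    func main() {
    	_ = logp.DevelopmentSetup() // route debug logs to stderr for the demo
    	log := logp.NewLogger("value_tpl_demo")

    	err := errors.New("boom")

    	// Before: no way to tell which template produced this line.
    	log.Debugf("template execution failed: %v", err)

    	// After: the evaluation target travels with the message as a
    	// structured field and can be filtered on.
    	log.Debugw("template execution failed", "target", "cursor.last_date", "error", err)
    }

With the target attached, a debug trace can be narrowed to a single cursor field, rate-limit template, or response evaluation instead of being read as an undifferentiated stream of values.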
--- CHANGELOG.next.asciidoc | 1 + x-pack/filebeat/input/httpjson/cursor.go | 2 +- x-pack/filebeat/input/httpjson/input_test.go | 2 ++ x-pack/filebeat/input/httpjson/rate_limiter.go | 6 +++--- x-pack/filebeat/input/httpjson/request.go | 2 +- x-pack/filebeat/input/httpjson/value_tpl.go | 6 +++--- x-pack/filebeat/input/httpjson/value_tpl_test.go | 4 +++- 7 files changed, 14 insertions(+), 9 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index a8ed9f032c5..2064650e19f 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -218,6 +218,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659] - Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427] - Update mito CEL extension library to v1.6.0. {pull}36651[36651] +- Improve template evaluation logging for HTTPJSON input. {pull}36668[36668] *Auditbeat* diff --git a/x-pack/filebeat/input/httpjson/cursor.go b/x-pack/filebeat/input/httpjson/cursor.go index 92cd53a52a2..1a864361ee7 100644 --- a/x-pack/filebeat/input/httpjson/cursor.go +++ b/x-pack/filebeat/input/httpjson/cursor.go @@ -50,7 +50,7 @@ func (c *cursor) update(trCtx *transformContext) { } for k, cfg := range c.cfg { - v, _ := cfg.Value.Execute(trCtx, transformable{}, "", cfg.Default, c.log) + v, _ := cfg.Value.Execute(trCtx, transformable{}, k, cfg.Default, c.log) if v != "" || !cfg.mustIgnoreEmptyValue() { _, _ = c.state.Put(k, v) c.log.Debugf("cursor.%s stored with %s", k, v) diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index e4d94527e72..de4cc3f11e6 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -1248,6 +1248,8 @@ var testCases = []struct { } func TestInput(t *testing.T) { + logp.TestingSetup() + for _, test := range testCases { t.Run(test.name, func(t *testing.T) { if test.skipReason != "" { diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index dcbed7c1ac9..30c50ae3f05 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -104,7 +104,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { ctx := emptyTransformContext() ctx.updateLastResponse(response{header: resp.Header.Clone()}) - remaining, _ := r.remaining.Execute(ctx, tr, "", nil, r.log) + remaining, _ := r.remaining.Execute(ctx, tr, "rate-limit_remaining", nil, r.log) if remaining == "" { return 0, errors.New("remaining value is empty") } @@ -119,7 +119,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { if r.earlyLimit != nil { earlyLimit := *r.earlyLimit if earlyLimit > 0 && earlyLimit < 1 { - limit, _ := r.limit.Execute(ctx, tr, "", nil, r.log) + limit, _ := r.limit.Execute(ctx, tr, "early_limit", nil, r.log) if limit != "" { l, err := strconv.ParseInt(limit, 10, 64) if err == nil { @@ -141,7 +141,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { return 0, nil } - reset, _ := r.reset.Execute(ctx, tr, "", nil, r.log) + reset, _ := r.reset.Execute(ctx, tr, "rate-limit_reset", nil, r.log) if reset == "" { return 0, errors.New("reset value is empty") } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index f70ab4094df..248918e8116 100644 --- 
a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -385,7 +385,7 @@ func evaluateResponse(expression *valueTpl, data []byte, log *logp.Logger) (bool lastResponse: &response{body: dataMap}, } - val, err := expression.Execute(paramCtx, tr, "", nil, log) + val, err := expression.Execute(paramCtx, tr, "response_evaluation", nil, log) if err != nil { return false, fmt.Errorf("error while evaluating expression: %w", err) } diff --git a/x-pack/filebeat/input/httpjson/value_tpl.go b/x-pack/filebeat/input/httpjson/value_tpl.go index 133271e726f..97bc75a62d9 100644 --- a/x-pack/filebeat/input/httpjson/value_tpl.go +++ b/x-pack/filebeat/input/httpjson/value_tpl.go @@ -96,7 +96,7 @@ func (t *valueTpl) Unpack(in string) error { func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, targetName string, defaultVal *valueTpl, log *logp.Logger) (val string, err error) { fallback := func(err error) (string, error) { if defaultVal != nil { - log.Debugf("template execution: falling back to default value") + log.Debugw("template execution: falling back to default value", "target", targetName) return defaultVal.Execute(emptyTransformContext(), transformable{}, targetName, nil, log) } return "", err @@ -107,7 +107,7 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, targetName val, err = fallback(errExecutingTemplate) } if err != nil { - log.Debugf("template execution failed: %v", err) + log.Debugw("template execution failed", "target", targetName, "error", err) } tryDebugTemplateValue(targetName, val, log) }() @@ -142,7 +142,7 @@ func tryDebugTemplateValue(target, val string, log *logp.Logger) { case "Authorization", "Proxy-Authorization": // ignore filtered headers default: - log.Debugf("template execution: evaluated template %q", val) + log.Debugw("evaluated template", "target", target, "value", val) } } diff --git a/x-pack/filebeat/input/httpjson/value_tpl_test.go b/x-pack/filebeat/input/httpjson/value_tpl_test.go index 37589cd8821..487451099ad 100644 --- a/x-pack/filebeat/input/httpjson/value_tpl_test.go +++ b/x-pack/filebeat/input/httpjson/value_tpl_test.go @@ -19,6 +19,8 @@ import ( ) func TestValueTpl(t *testing.T) { + logp.TestingSetup() + cases := []struct { name string value string @@ -764,7 +766,7 @@ func TestValueTpl(t *testing.T) { assert.NoError(t, defTpl.Unpack(tc.paramDefVal)) } - got, err := tpl.Execute(tc.paramCtx, tc.paramTr, "", defTpl, logp.NewLogger("")) + got, err := tpl.Execute(tc.paramCtx, tc.paramTr, tc.name, defTpl, logp.NewLogger("")) assert.Equal(t, tc.expectedVal, got) if tc.expectedError == "" { assert.NoError(t, err) From 8dcf23cb650b6752283fc178a42d75b3b217d26b Mon Sep 17 00:00:00 2001 From: Denis Date: Thu, 28 Sep 2023 14:28:35 +0200 Subject: [PATCH 7/8] Make file system metadata values strings (#36697) To better align with ECS and better store the values in Elasticsearch. 
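The reasoning, briefly: these identifiers are unsigned 64-bit values, JSON consumers that decode numbers as float64 silently lose precision above 2^53, and ECS-style mappings treat opaque identifiers as keyword (string) fields rather than numerics. A minimal sketch of the conversion, using only the standard library and an illustrative value:

    package main

    import (
    	"fmt"
    	"strconv"
    )

    func main() {
    	// A device or inode number near the top of the uint64 range cannot
    	// round-trip through a float64, but its decimal string can.
    	var device uint64 = 18446744073709551615 // illustrative value only
    	fmt.Println(strconv.FormatUint(device, 10))
    }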
--- CHANGELOG.next.asciidoc | 1 + libbeat/reader/readfile/fs_metafields_other.go | 5 +++-- libbeat/reader/readfile/fs_metafields_windows.go | 7 ++++--- libbeat/reader/readfile/metafields_other_test.go | 4 ++-- libbeat/reader/readfile/metafields_windows_test.go | 6 +++--- 5 files changed, 13 insertions(+), 10 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 2064650e19f..21f721492d0 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -15,6 +15,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Filebeat* +- Switch types of `log.file.device`, `log.file.inode`, `log.file.idxhi`, `log.file.idxlo` and `log.file.vol` fields to strings to better align with ECS and integrations. {pull}36697[36697] *Heartbeat* diff --git a/libbeat/reader/readfile/fs_metafields_other.go b/libbeat/reader/readfile/fs_metafields_other.go index 425b7435fe8..cc764c4bbcc 100644 --- a/libbeat/reader/readfile/fs_metafields_other.go +++ b/libbeat/reader/readfile/fs_metafields_other.go @@ -22,6 +22,7 @@ package readfile import ( "fmt" "os" + "strconv" "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/elastic-agent-libs/mapstr" @@ -34,11 +35,11 @@ const ( func setFileSystemMetadata(fi os.FileInfo, fields mapstr.M) error { osstate := file.GetOSState(fi) - _, err := fields.Put(deviceIDKey, osstate.Device) + _, err := fields.Put(deviceIDKey, strconv.FormatUint(osstate.Device, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", deviceIDKey, err) } - _, err = fields.Put(inodeKey, osstate.Inode) + _, err = fields.Put(inodeKey, osstate.InodeString()) if err != nil { return fmt.Errorf("failed to set %q: %w", inodeKey, err) } diff --git a/libbeat/reader/readfile/fs_metafields_windows.go b/libbeat/reader/readfile/fs_metafields_windows.go index 113a74cf829..97bfd5c72de 100644 --- a/libbeat/reader/readfile/fs_metafields_windows.go +++ b/libbeat/reader/readfile/fs_metafields_windows.go @@ -20,6 +20,7 @@ package readfile import ( "fmt" "os" + "strconv" "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/elastic-agent-libs/mapstr" @@ -33,15 +34,15 @@ const ( func setFileSystemMetadata(fi os.FileInfo, fields mapstr.M) error { osstate := file.GetOSState(fi) - _, err := fields.Put(idxhiKey, osstate.IdxHi) + _, err := fields.Put(idxhiKey, strconv.FormatUint(osstate.IdxHi, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", idxhiKey, err) } - _, err = fields.Put(idxloKey, osstate.IdxLo) + _, err = fields.Put(idxloKey, strconv.FormatUint(osstate.IdxLo, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", idxloKey, err) } - _, err = fields.Put(volKey, osstate.Vol) + _, err = fields.Put(volKey, strconv.FormatUint(osstate.Vol, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", volKey, err) } diff --git a/libbeat/reader/readfile/metafields_other_test.go b/libbeat/reader/readfile/metafields_other_test.go index 7874d24d4ae..b9d25b85420 100644 --- a/libbeat/reader/readfile/metafields_other_test.go +++ b/libbeat/reader/readfile/metafields_other_test.go @@ -44,13 +44,13 @@ func checkFields(t *testing.T, expected, actual mapstr.M) { dev, err := actual.GetValue(deviceIDKey) require.NoError(t, err) - require.Equal(t, uint64(17), dev) + require.Equal(t, "17", dev) err = actual.Delete(deviceIDKey) require.NoError(t, err) inode, err := actual.GetValue(inodeKey) require.NoError(t, err) - require.Equal(t, uint64(999), inode) + require.Equal(t, "999", inode) err = actual.Delete(inodeKey) 
require.NoError(t, err) diff --git a/libbeat/reader/readfile/metafields_windows_test.go b/libbeat/reader/readfile/metafields_windows_test.go index 37ff5cb4bda..dce0b8d2161 100644 --- a/libbeat/reader/readfile/metafields_windows_test.go +++ b/libbeat/reader/readfile/metafields_windows_test.go @@ -52,19 +52,19 @@ func checkFields(t *testing.T, expected, actual mapstr.M) { idxhi, err := actual.GetValue(idxhiKey) require.NoError(t, err) - require.Equal(t, uint64(100), idxhi) + require.Equal(t, "100", idxhi) err = actual.Delete(idxhiKey) require.NoError(t, err) idxlo, err := actual.GetValue(idxloKey) require.NoError(t, err) - require.Equal(t, uint64(200), idxlo) + require.Equal(t, "200", idxlo) err = actual.Delete(idxloKey) require.NoError(t, err) vol, err := actual.GetValue(volKey) require.NoError(t, err) - require.Equal(t, uint64(300), vol) + require.Equal(t, "300", vol) err = actual.Delete(volKey) require.NoError(t, err) From 231e93a6d20e35a8fc6f34dc767110b8a27ec645 Mon Sep 17 00:00:00 2001 From: Fae Charlton Date: Thu, 28 Sep 2023 09:29:14 -0400 Subject: [PATCH 8/8] Enable compression by default for Elasticsearch outputs (#36681) * Enable compression by default for Elasticsearch outputs * update config template * update changelog * regenerate config templates * update docs * make check --- CHANGELOG.next.asciidoc | 2 ++ auditbeat/auditbeat.reference.yml | 5 +++-- filebeat/filebeat.reference.yml | 5 +++-- heartbeat/heartbeat.reference.yml | 5 +++-- .../output-elasticsearch.reference.yml.tmpl | 5 +++-- libbeat/outputs/elasticsearch/config.go | 2 +- libbeat/outputs/elasticsearch/config_test.go | 22 +++++++++++++++++++ .../elasticsearch/docs/elasticsearch.asciidoc | 2 +- metricbeat/metricbeat.reference.yml | 5 +++-- packetbeat/packetbeat.reference.yml | 5 +++-- winlogbeat/winlogbeat.reference.yml | 5 +++-- x-pack/auditbeat/auditbeat.reference.yml | 5 +++-- x-pack/filebeat/filebeat.reference.yml | 5 +++-- .../functionbeat/functionbeat.reference.yml | 5 +++-- x-pack/heartbeat/heartbeat.reference.yml | 5 +++-- x-pack/metricbeat/metricbeat.reference.yml | 5 +++-- x-pack/osquerybeat/osquerybeat.reference.yml | 5 +++-- x-pack/packetbeat/packetbeat.reference.yml | 5 +++-- x-pack/winlogbeat/winlogbeat.reference.yml | 5 +++-- 19 files changed, 71 insertions(+), 32 deletions(-) diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 21f721492d0..e5b317b1379 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -9,6 +9,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] ==== Breaking changes *Affecting all Beats* +- The Elasticsearch output now enables compression by default. This decreases network data usage by an average of 70-80%, in exchange for 20-25% increased CPU use and ~10% increased ingestion time. The previous default can be restored by setting the flag `compression_level: 0` under `output.elasticsearch`. {pull}36681[36681] + *Auditbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 892d4e1b2d0..c3b10d03f79 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -450,8 +450,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. 
#escape_html: false diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 2de0bc61f56..def62fbca7c 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1546,8 +1546,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index b8281727026..fdbdcbe7042 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -542,8 +542,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl index edc40e632d7..48f1ba2c007 100644 --- a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl +++ b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl @@ -9,8 +9,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. 
#escape_html: false diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index 7134e9333cb..ca77a44b833 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -65,7 +65,7 @@ var ( Password: "", APIKey: "", MaxRetries: 3, - CompressionLevel: 0, + CompressionLevel: 1, EscapeHTML: false, Kerberos: nil, LoadBalance: true, diff --git a/libbeat/outputs/elasticsearch/config_test.go b/libbeat/outputs/elasticsearch/config_test.go index 22fe4bb4c82..32cb90c904c 100644 --- a/libbeat/outputs/elasticsearch/config_test.go +++ b/libbeat/outputs/elasticsearch/config_test.go @@ -97,6 +97,28 @@ non_indexable_policy.dead_letter_index: } } +func TestCompressionIsOnByDefault(t *testing.T) { + config := "" + c := conf.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, 1, elasticsearchOutputConfig.CompressionLevel, "Default compression level should be 1") +} + +func TestExplicitCompressionLevelOverridesDefault(t *testing.T) { + config := ` +compression_level: 0 +` + c := conf.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, 0, elasticsearchOutputConfig.CompressionLevel, "Explicit compression level should override defaults") +} + func readConfig(cfg *conf.C) (*elasticsearchConfig, error) { c := defaultConfig if err := cfg.Unpack(&c); err != nil { diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index cbe74279dcb..5ea65c16dc4 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -105,7 +105,7 @@ The compression level must be in the range of `1` (best speed) to `9` (best comp Increasing the compression level will reduce the network usage but will increase the cpu usage. -The default value is `0`. +The default value is `1`. ===== `escape_html` diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index eabdcc8e918..a03fbf080b4 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1285,8 +1285,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index 7442b7f6a0f..f1e036707b7 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -916,8 +916,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. 
#escape_html: false diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 64377c0fc6d..1c8ebebc91a 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -332,8 +332,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index e9ecd33ae39..7b575e6b0e8 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -506,8 +506,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index 9ea0dabfb0b..ff9b523f947 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3916,8 +3916,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index 7855538f621..5c42e49b44f 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -574,8 +574,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index b8281727026..fdbdcbe7042 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -542,8 +542,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index fa15aca7fb6..5ea3499cee5 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1846,8 +1846,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. 
+ # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index 416462f3f47..9d2c21c5ad1 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -293,8 +293,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index 7442b7f6a0f..f1e036707b7 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -916,8 +916,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index 3c6799e7329..0ec02b0e5ae 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -334,8 +334,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false