diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index a41ac990750..8fe748c1bf5 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -55,6 +55,7 @@ CHANGELOG* /libbeat/ @elastic/elastic-agent-data-plane /libbeat/docs/processors-list.asciidoc @elastic/ingest-docs /libbeat/management @elastic/elastic-agent-control-plane +/libbeat/processors/cache/ @elastic/security-external-integrations /libbeat/processors/community_id/ @elastic/security-external-integrations /libbeat/processors/decode_xml/ @elastic/security-external-integrations /libbeat/processors/decode_xml_wineventlog/ @elastic/security-external-integrations diff --git a/CHANGELOG-developer.next.asciidoc b/CHANGELOG-developer.next.asciidoc index 464afe040f8..ca46d018247 100644 --- a/CHANGELOG-developer.next.asciidoc +++ b/CHANGELOG-developer.next.asciidoc @@ -172,6 +172,8 @@ The list below covers the major changes between 7.0.0-rc2 and main only. - Skip dependabot updates for github.com/elastic/mito. {pull}36158[36158] - Add device handling to Okta API package for entity analytics. {pull}35980[35980] - Make Filebeat HTTPJSON input process responses sequentially. {pull}36493[36493] +- Add initial infrastructure for a caching enrichment processor. {pull}36619[36619] +- Add file-backed cache for cache enrichment processor. {pull}36686[36686] ==== Deprecated diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 228d201f74f..e5b317b1379 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -9,12 +9,15 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] ==== Breaking changes *Affecting all Beats* +- The Elasticsearch output now enables compression by default. This decreases network data usage by an average of 70-80%, in exchange for 20-25% increased CPU use and ~10% increased ingestion time. The previous default can be restored by setting the flag `compression_level: 0` under `output.elasticsearch`. {pull}36681[36681] + *Auditbeat* *Filebeat* +- Switch types of `log.file.device`, `log.file.inode`, `log.file.idxhi`, `log.file.idxlo` and `log.file.vol` fields to strings to better align with ECS and integrations. {pull}36697[36697] *Heartbeat* @@ -218,6 +221,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Disable warning message about ingest pipeline loading when running under Elastic Agent. {pull}36659[36659] - Add input metrics to http_endpoint input. {issue}36402[36402] {pull}36427[36427] - Update mito CEL extension library to v1.6.0. {pull}36651[36651] +- Improve template evaluation logging for HTTPJSON input. {pull}36668[36668] *Auditbeat* @@ -238,12 +242,10 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] *Osquerybeat* -*Packetbeat* - - *Packetbeat* - Improve efficiency of sniffers by deduplicating interface configurations. {issue}36574[36574] {pull}36576[36576] +- Bump Windows Npcap version to v1.76. {issue}36539[36539] {pull}36549[36549] *Winlogbeat* diff --git a/auditbeat/auditbeat.reference.yml b/auditbeat/auditbeat.reference.yml index 7d63bed19ff..8ed00adc44d 100644 --- a/auditbeat/auditbeat.reference.yml +++ b/auditbeat/auditbeat.reference.yml @@ -450,8 +450,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. 
#escape_html: false diff --git a/filebeat/filebeat.reference.yml b/filebeat/filebeat.reference.yml index 7b70826e493..9043a552cf8 100644 --- a/filebeat/filebeat.reference.yml +++ b/filebeat/filebeat.reference.yml @@ -1546,8 +1546,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/heartbeat/heartbeat.reference.yml b/heartbeat/heartbeat.reference.yml index 15ca545740e..d3e124e9f34 100644 --- a/heartbeat/heartbeat.reference.yml +++ b/heartbeat/heartbeat.reference.yml @@ -542,8 +542,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl index edc40e632d7..48f1ba2c007 100644 --- a/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl +++ b/libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl @@ -9,8 +9,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. 
#escape_html: false diff --git a/libbeat/outputs/elasticsearch/config.go b/libbeat/outputs/elasticsearch/config.go index 7134e9333cb..ca77a44b833 100644 --- a/libbeat/outputs/elasticsearch/config.go +++ b/libbeat/outputs/elasticsearch/config.go @@ -65,7 +65,7 @@ var ( Password: "", APIKey: "", MaxRetries: 3, - CompressionLevel: 0, + CompressionLevel: 1, EscapeHTML: false, Kerberos: nil, LoadBalance: true, diff --git a/libbeat/outputs/elasticsearch/config_test.go b/libbeat/outputs/elasticsearch/config_test.go index 22fe4bb4c82..32cb90c904c 100644 --- a/libbeat/outputs/elasticsearch/config_test.go +++ b/libbeat/outputs/elasticsearch/config_test.go @@ -97,6 +97,28 @@ non_indexable_policy.dead_letter_index: } } +func TestCompressionIsOnByDefault(t *testing.T) { + config := "" + c := conf.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, 1, elasticsearchOutputConfig.CompressionLevel, "Default compression level should be 1") +} + +func TestExplicitCompressionLevelOverridesDefault(t *testing.T) { + config := ` +compression_level: 0 +` + c := conf.MustNewConfigFrom(config) + elasticsearchOutputConfig, err := readConfig(c) + if err != nil { + t.Fatalf("Can't create test configuration from valid input") + } + assert.Equal(t, 0, elasticsearchOutputConfig.CompressionLevel, "Explicit compression level should override defaults") +} + func readConfig(cfg *conf.C) (*elasticsearchConfig, error) { c := defaultConfig if err := cfg.Unpack(&c); err != nil { diff --git a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc index cbe74279dcb..5ea65c16dc4 100644 --- a/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc +++ b/libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc @@ -105,7 +105,7 @@ The compression level must be in the range of `1` (best speed) to `9` (best comp Increasing the compression level will reduce the network usage but will increase the cpu usage. -The default value is `0`. +The default value is `1`. ===== `escape_html` diff --git a/libbeat/processors/cache/cache.go b/libbeat/processors/cache/cache.go new file mode 100644 index 00000000000..a7ce4876f50 --- /dev/null +++ b/libbeat/processors/cache/cache.go @@ -0,0 +1,289 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
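Since this change flips the Elasticsearch output to compressed requests by default (as `TestCompressionIsOnByDefault` above asserts), operators who want the previous behavior back can pin the level explicitly. A minimal sketch, host value illustrative:

[source,yaml]
----
output.elasticsearch:
  hosts: ["localhost:9200"]
  # Restore the previous default (no compression); the new default is 1.
  compression_level: 0
----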
+
+package cache
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"os"
+	"time"
+
+	"github.com/elastic/beats/v7/libbeat/beat"
+	"github.com/elastic/beats/v7/libbeat/common/atomic"
+	"github.com/elastic/beats/v7/libbeat/processors"
+	conf "github.com/elastic/elastic-agent-libs/config"
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/mapstr"
+	"github.com/elastic/elastic-agent-libs/paths"
+)
+
+const name = "cache"
+
+func init() {
+	// We cannot use this as a JS plugin as it is stateful and includes a Close method.
+	processors.RegisterPlugin(name, New)
+}
+
+var (
+	// ErrNoMatch is returned when the event doesn't contain the field
+	// specified in key_field.
+	ErrNoMatch = errors.New("field in key_field not found in the event")
+
+	// ErrNoData is returned when metadata for an event can't be collected.
+	ErrNoData = errors.New("metadata not found")
+
+	instanceID atomic.Uint32
+)
+
+// cache is a caching enrichment processor.
+type cache struct {
+	config config
+	store  Store
+	cancel context.CancelFunc
+	log    *logp.Logger
+}
+
+// New returns a new cache processor from the provided configuration. The
+// resulting processor implements `Close()` to release the cache resources.
+func New(cfg *conf.C) (beat.Processor, error) {
+	config := defaultConfig()
+	err := cfg.Unpack(&config)
+	if err != nil {
+		return nil, fmt.Errorf("failed to unpack the %s configuration: %w", name, err)
+	}
+	// Logging (each processor instance has a unique ID).
+	id := int(instanceID.Inc())
+	log := logp.NewLogger(name).With("instance_id", id)
+
+	src, cancel, err := getStoreFor(config, log)
+	if err != nil {
+		return nil, fmt.Errorf("failed to get the store for %s: %w", name, err)
+	}
+
+	p := &cache{
+		config: config,
+		store:  src,
+		cancel: cancel,
+		log:    log,
+	}
+	p.log.Infow("initialized cache processor", "details", p)
+	return p, nil
+}
+
+// getStoreFor returns a backing store for the provided configuration,
+// and a context cancellation that releases the cache resource when it
+// is no longer required. The cancellation should be called when the
+// processor is closed.
+func getStoreFor(cfg config, log *logp.Logger) (Store, context.CancelFunc, error) {
+	switch {
+	case cfg.Store.Memory != nil:
+		s, cancel := memStores.get(cfg.Store.Memory.ID, cfg)
+		return s, cancel, nil
+
+	case cfg.Store.File != nil:
+		err := os.MkdirAll(paths.Resolve(paths.Data, "cache_processor"), 0o700)
+		if err != nil {
+			return nil, noop, fmt.Errorf("cache processor could not create store directory: %w", err)
+		}
+		s, cancel := fileStores.get(cfg.Store.File.ID, cfg, log)
+		return s, cancel, nil
+
+	default:
+		// This should have been caught by config validation.
+		return nil, noop, errors.New("no configured store")
+	}
+}
+
+// noop is a no-op context.CancelFunc.
+func noop() {}
+
+// Store is the interface implemented by the cache backing stores.
+type Store interface {
+	Put(key string, val any) error
+	Get(key string) (any, error)
+	Delete(key string) error
+
+	// The string returned from the String method should
+	// be the backing store ID, prefixed with either
+	// "file:" or "memory:".
+	fmt.Stringer
+}
+
+type CacheEntry struct {
+	Key     string    `json:"key"`
+	Value   any       `json:"val"`
+	Expires time.Time `json:"expires"`
+	index   int
+}
+
+// Run applies the configured put, get or delete operation to the event.
+func (p *cache) Run(event *beat.Event) (*beat.Event, error) { + switch { + case p.config.Put != nil: + p.log.Debugw("put", "backend_id", p.store, "config", p.config.Put) + err := p.putFrom(event) + if err != nil { + switch { + case errors.Is(err, mapstr.ErrKeyNotFound): + if p.config.IgnoreMissing { + return event, nil + } + return event, err + } + return event, fmt.Errorf("error applying %s put processor: %w", name, err) + } + return event, nil + + case p.config.Get != nil: + p.log.Debugw("get", "backend_id", p.store, "config", p.config.Get) + result, err := p.getFor(event) + if err != nil { + switch { + case errors.Is(err, mapstr.ErrKeyNotFound): + if p.config.IgnoreMissing { + return event, nil + } + case errors.Is(err, ErrNoData): + return event, err + } + return event, fmt.Errorf("error applying %s get processor: %w", name, err) + } + if result != nil { + return result, nil + } + return event, ErrNoMatch + + case p.config.Delete != nil: + p.log.Debugw("delete", "backend_id", p.store, "config", p.config.Delete) + err := p.deleteFor(event) + if err != nil { + return event, fmt.Errorf("error applying %s delete processor: %w", name, err) + } + return event, nil + + default: + // This should never happen, but we don't need to flag it. + return event, nil + } +} + +// putFrom takes the configured value from the event and stores it in the cache +// if it exists. +func (p *cache) putFrom(event *beat.Event) error { + k, err := event.GetValue(p.config.Put.Key) + if err != nil { + return err + } + key, ok := k.(string) + if !ok { + return fmt.Errorf("key field '%s' not a string: %T", p.config.Put.Key, k) + } + p.log.Debugw("put", "backend_id", p.store, "key", key) + + val, err := event.GetValue(p.config.Put.Value) + if err != nil { + return err + } + + err = p.store.Put(key, val) + if err != nil { + return fmt.Errorf("failed to put '%s' into '%s': %w", key, p.config.Put.Value, err) + } + return nil +} + +// getFor gets the configured value from the cache for the event and inserts +// it into the configured field if it exists. +func (p *cache) getFor(event *beat.Event) (result *beat.Event, err error) { + // Check for clobbering. + dst := p.config.Get.Target + if !p.config.OverwriteKeys { + if _, err := event.GetValue(dst); err == nil { + return nil, fmt.Errorf("target field '%s' already exists and overwrite_keys is false", dst) + } + } + + // Get key into store for metadata. + key := p.config.Get.Key + v, err := event.GetValue(key) + if err != nil { + return nil, err + } + k, ok := v.(string) + if !ok { + return nil, fmt.Errorf("key field '%s' not a string: %T", key, v) + } + p.log.Debugw("get", "backend_id", p.store, "key", k) + + // Get metadata... + meta, err := p.store.Get(k) + if err != nil { + return nil, fmt.Errorf("%w for '%s': %w", ErrNoData, k, err) + } + if meta == nil { + return nil, fmt.Errorf("%w for '%s'", ErrNoData, k) + } + if m, ok := meta.(map[string]interface{}); ok { + meta = mapstr.M(m) + } + // ... and write it into the event. + // The implementation of PutValue currently leaves event + // essentially unchanged in the case of an error (in the + // case of an @metadata field there may be a mutation, + // but at most this will be the addition of a Meta field + // value to event). None of this is documented. + if _, err = event.PutValue(dst, meta); err != nil { + return nil, err + } + return event, nil +} + +// deleteFor deletes the configured value from the cache based on the value of +// the configured key. 
+func (p *cache) deleteFor(event *beat.Event) error { + v, err := event.GetValue(p.config.Delete.Key) + if err != nil { + return err + } + k, ok := v.(string) + if !ok { + return fmt.Errorf("key field '%s' not a string: %T", p.config.Delete.Key, v) + } + return p.store.Delete(k) +} + +func (p *cache) Close() error { + p.cancel() + return nil +} + +// String returns the processor representation formatted as a string +func (p *cache) String() string { + switch { + case p.config.Put != nil: + return fmt.Sprintf("%s=[operation=put, store_id=%s, key_field=%s, value_field=%s, ttl=%v, ignore_missing=%t, overwrite_fields=%t]", + name, p.store, p.config.Put.Key, p.config.Put.Value, p.config.Put.TTL, p.config.IgnoreMissing, p.config.OverwriteKeys) + case p.config.Get != nil: + return fmt.Sprintf("%s=[operation=get, store_id=%s, key_field=%s, target_field=%s, ignore_missing=%t, overwrite_fields=%t]", + name, p.store, p.config.Get.Key, p.config.Get.Target, p.config.IgnoreMissing, p.config.OverwriteKeys) + case p.config.Delete != nil: + return fmt.Sprintf("%s=[operation=delete, store_id=%s, key_field=%s]", name, p.store, p.config.Delete.Key) + default: + return fmt.Sprintf("%s=[operation=invalid]", name) + } +} diff --git a/libbeat/processors/cache/cache_test.go b/libbeat/processors/cache/cache_test.go new file mode 100644 index 00000000000..8acd22d74d7 --- /dev/null +++ b/libbeat/processors/cache/cache_test.go @@ -0,0 +1,625 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
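To see how the pieces above fit together, here is a minimal standalone sketch of driving the processor directly, modeled on the unit tests that follow. In a real Beat the processor is constructed from the `processors` section of the configuration rather than called like this:

[source,go]
----
package main

import (
	"fmt"

	"github.com/elastic/beats/v7/libbeat/beat"
	"github.com/elastic/beats/v7/libbeat/processors/cache"
	conf "github.com/elastic/elastic-agent-libs/config"
	"github.com/elastic/elastic-agent-libs/mapstr"
)

func main() {
	// A put-configured instance backed by the shared memory store.
	cfg := conf.MustNewConfigFrom(mapstr.M{
		"backend": mapstr.M{"memory": mapstr.M{"id": "aidmaster"}},
		"put": mapstr.M{
			"key_field":   "crowdstrike.aid",
			"value_field": "crowdstrike.metadata",
			"ttl":         "168h",
		},
	})
	p, err := cache.New(cfg)
	if err != nil {
		panic(err)
	}
	// Close releases the processor's reference to the shared store.
	defer p.(interface{ Close() error }).Close()

	// Running an event with both fields present caches the value.
	evt, err := p.Run(&beat.Event{Fields: mapstr.M{
		"crowdstrike": mapstr.M{"aid": "one", "metadata": "metadata_value"},
	}})
	fmt.Println(evt.Fields, err)
}
----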
+ +package cache + +import ( + "errors" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" + + "github.com/elastic/beats/v7/libbeat/beat" + conf "github.com/elastic/elastic-agent-libs/config" + "github.com/elastic/elastic-agent-libs/logp" + "github.com/elastic/elastic-agent-libs/mapstr" +) + +type cacheTestStep struct { + event mapstr.M + want mapstr.M + wantCacheVal map[string]*CacheEntry + wantErr error +} + +var cacheTests = []struct { + name string + configs []testConfig + wantInitErr error + steps []cacheTestStep +}{ + { + name: "invalid_no_backend", + configs: []testConfig{ + { + cfg: mapstr.M{ + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "ttl": "168h", + "value_field": "crowdstrike.metadata", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: missing required field accessing 'backend'"), + }, + { + name: "invalid_no_key_field", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: string value is not set accessing 'put.key_field'"), + }, + { + name: "invalid_no_value_field", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: errors.New("failed to unpack the cache configuration: string value is not set accessing 'put.value_field'"), + }, + { + name: "put_value", + configs: []testConfig{ + { + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + 
wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_reverse_config", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_with_get_error_no_overwrite", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: target field 'crowdstrike.metadata_new' already exists and overwrite_keys is false"), + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + 
when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": mapstr.M{ + "someone_is_already_here": mapstr.M{ + "another_key": "value", + }, + }, + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + }, + }, + { + name: "put_and_get_value_allow_overwrite_but_get_error", + configs: []testConfig{ + { + when: func(e mapstr.M) bool { + return e["put"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "put": mapstr.M{ + "key_field": "crowdstrike.aid", + "value_field": "crowdstrike.metadata", + "ttl": "168h", + }, + }, + }, + { + when: func(e mapstr.M) bool { + return e["get"] == true + }, + cfg: mapstr.M{ + "backend": mapstr.M{ + "memory": mapstr.M{ + "id": "aidmaster", + }, + }, + "overwrite_keys": true, + "get": mapstr.M{ + "key_field": "crowdstrike.aid", + "target_field": "crowdstrike.metadata_new.child", + }, + }, + }, + }, + wantInitErr: nil, + steps: []cacheTestStep{ + { + event: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + want: mapstr.M{ + "put": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata": "metadata_value", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: nil, + }, + { + event: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + want: mapstr.M{ + "get": true, + "crowdstrike": mapstr.M{ + "aid": "one", + "metadata_new": "someone_is_already_here", + }, + }, + wantCacheVal: map[string]*CacheEntry{ + "one": {Key: "one", Value: "metadata_value"}, + }, + wantErr: errors.New("error applying cache get processor: expected map but type is string"), + }, + }, + }, +} + +type testConfig struct { + when func(e mapstr.M) bool + cfg mapstr.M +} + +func TestCache(t *testing.T) { + logp.TestingSetup(logp.WithSelectors(name)) + for _, test := range cacheTests { + t.Run(test.name, func(t *testing.T) { + var processors []beat.Processor + for i, cfg := range test.configs { + config, err := conf.NewConfigFrom(cfg.cfg) + if err != nil { + t.Fatal(err) + } + + p, err := New(config) + if !sameError(err, test.wantInitErr) { + t.Errorf("unexpected error from New: got:%v want:%v", err, test.wantInitErr) + } + if err != nil { + return + } + + t.Log(p) + c, ok := p.(*cache) + if !ok { + t.Fatalf("processor %d is not an *cache", i) + } + + defer func() { + err := c.Close() + if err != nil { + t.Errorf("unexpected error from c.Close(): %v", err) + } + }() + + processors = append(processors, p) + } + + for i, step := range 
test.steps {
+				for j, p := range processors {
+					if test.configs[j].when != nil && !test.configs[j].when(step.event) {
+						continue
+					}
+					got, err := p.Run(&beat.Event{
+						Fields: step.event,
+					})
+					if !sameError(err, step.wantErr) {
+						t.Errorf("unexpected error from Run: got:%v want:%v", err, step.wantErr)
+						return
+					}
+					if !cmp.Equal(step.want, got.Fields) {
+						t.Errorf("unexpected result %d\n--- want\n+++ got\n%s", i, cmp.Diff(step.want, got.Fields))
+					}
+					switch got := p.(*cache).store.(type) {
+					case *memStore:
+						allow := cmp.AllowUnexported(CacheEntry{})
+						ignore := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index")
+						if !cmp.Equal(step.wantCacheVal, got.cache, allow, ignore) {
+							t.Errorf("unexpected cache state result %d:\n--- want\n+++ got\n%s", i, cmp.Diff(step.wantCacheVal, got.cache, allow, ignore))
+						}
+					}
+				}
+			}
+		})
+	}
+}
diff --git a/libbeat/processors/cache/config.go b/libbeat/processors/cache/config.go
new file mode 100644
index 00000000000..36eeb423fc2
--- /dev/null
+++ b/libbeat/processors/cache/config.go
@@ -0,0 +1,118 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"errors"
+	"time"
+)
+
+type config struct {
+	Get    *getConfig `config:"get"`
+	Put    *putConfig `config:"put"`
+	Delete *delConfig `config:"delete"`
+
+	Store *storeConfig `config:"backend" validate:"required"`
+
+	// IgnoreMissing ignores errors if the event has no matching field.
+	IgnoreMissing bool `config:"ignore_missing"`
+
+	// OverwriteKeys allows the target field to overwrite existing fields.
+	OverwriteKeys bool `config:"overwrite_keys"`
+}
+
+func (cfg *config) Validate() error {
+	var ops int
+	if cfg.Put != nil {
+		ops++
+	}
+	if cfg.Get != nil {
+		ops++
+	}
+	if cfg.Delete != nil {
+		ops++
+	}
+	switch ops {
+	case 0:
+		return errors.New("no operation specified for cache processor")
+	case 1:
+		return nil
+	default:
+		return errors.New("cannot specify multiple operations together in a cache processor")
+	}
+}
+
+type getConfig struct {
+	// Key is the field containing the key to look up for matching.
+	Key string `config:"key_field" validate:"required"`
+
+	// Target is the destination field the cached value is written to.
+	Target string `config:"target_field" validate:"required"`
+}
+
+type putConfig struct {
+	// Key is the field containing the key to look up for matching.
+	Key string `config:"key_field" validate:"required"`
+
+	// Value is the source field containing the value to cache.
+	Value string `config:"value_field" validate:"required"`
+
+	// TTL is the time to live to apply to cached entries.
+	TTL *time.Duration `config:"ttl" validate:"required"`
+}
+
+type delConfig struct {
+	// Key is the field containing the key to look up for deletion.
+ Key string `config:"key_field" validate:"required"` +} + +func defaultConfig() config { + return config{ + IgnoreMissing: true, + OverwriteKeys: false, + } +} + +type storeConfig struct { + Memory *memConfig `config:"memory"` + File *fileConfig `config:"file"` + + // Capacity and Effort are currently experimental + // and not in public-facing documentation. + Capacity int `config:"capacity"` + Effort int `config:"eviction_effort"` +} + +type memConfig struct { + ID string `config:"id" validate:"required"` +} + +type fileConfig struct { + ID string `config:"id" validate:"required"` + WriteOutEvery time.Duration `config:"write_interval"` +} + +func (cfg *storeConfig) Validate() error { + switch { + case cfg.Memory != nil && cfg.File != nil: + return errors.New("must specify only one of backend.memory.id or backend.file.id") + case cfg.Memory != nil, cfg.File != nil: + default: + return errors.New("must specify one of backend.memory.id or backend.file.id") + } + return nil +} diff --git a/libbeat/processors/cache/config_test.go b/libbeat/processors/cache/config_test.go new file mode 100644 index 00000000000..4a956caef02 --- /dev/null +++ b/libbeat/processors/cache/config_test.go @@ -0,0 +1,218 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
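Putting the schema above together, a representative file-backed configuration with periodic write out, mirroring the `put_file_with_periodic_write_out` case in the tests that follow (field values illustrative):

[source,yaml]
----
processors:
  - cache:
      backend:
        file:
          id: aidmaster
          write_interval: 15m
      put:
        ttl: 168h
        key_field: crowdstrike.aid
        value_field: crowdstrike.metadata
----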
+ +package cache + +import ( + "errors" + "testing" + + "github.com/elastic/go-ucfg/yaml" +) + +var validateTests = []struct { + name string + cfg string + want error +}{ + { + name: "put_file", + cfg: ` +backend: + file: + id: aidmaster +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "put_file_with_periodic_write_out", + cfg: ` +backend: + file: + id: aidmaster + write_interval: 15m +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "put_memory", + cfg: ` +backend: + memory: + id: aidmaster +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "get", + cfg: ` +backend: + file: + id: aidmaster +get: + key_field: crowdstrike.aid + target_field: crowdstrike.metadata +`, + want: nil, + }, + { + name: "delete", + cfg: ` +backend: + file: + id: aidmaster +delete: + key_field: crowdstrike.aid +`, + want: nil, + }, + { + name: "memory_no_id", + cfg: ` +backend: + memory: + id: '' +delete: + key_field: crowdstrike.aid +`, + want: errors.New("string value is not set accessing 'backend.memory.id'"), + }, + { + name: "file_no_id", + cfg: ` +backend: + file: + id: '' +delete: + key_field: crowdstrike.aid +`, + want: errors.New("string value is not set accessing 'backend.file.id'"), + }, + { + name: "no_op", + cfg: ` +backend: + file: + id: aidmaster +`, + want: errors.New("no operation specified for cache processor accessing config"), + }, + { + + name: "too_many_ops", + cfg: ` +backend: + file: + id: aidmaster +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +get: + key_field: crowdstrike.aid + target_field: crowdstrike.metadata +`, + want: errors.New("cannot specify multiple operations together in a cache processor accessing config"), + }, + { + + name: "no_backend", + cfg: ` +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: errors.New("missing required field accessing 'backend'"), + }, + { + + name: "incomplete_backend", + cfg: ` +backend: + file: ~ +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: errors.New("must specify one of backend.memory.id or backend.file.id accessing 'backend'"), + }, + { + + name: "too_many_backends", + cfg: ` +backend: + file: + id: aidmaster_f + memory: + id: aidmaster_m +put: + ttl: 168h + key_field: crowdstrike.aid + value_field: crowdstrike.metadata +`, + want: errors.New("must specify only one of backend.memory.id or backend.file.id accessing 'backend'"), + }, +} + +func TestValidate(t *testing.T) { + for _, test := range validateTests { + t.Run(test.name, func(t *testing.T) { + got := ucfgRigmarole(test.cfg) + if !sameError(got, test.want) { + t.Errorf("unexpected error: got:%v want:%v", got, test.want) + } + }) + } +} + +func ucfgRigmarole(text string) error { + c, err := yaml.NewConfig([]byte(text)) + if err != nil { + return err + } + cfg := defaultConfig() + err = c.Unpack(&cfg) + if err != nil { + return err + } + return cfg.Validate() +} + +func sameError(a, b error) bool { + switch { + case a == nil && b == nil: + return true + case a == nil, b == nil: + return false + default: + return a.Error() == b.Error() + } +} diff --git a/libbeat/processors/cache/docs/cache.asciidoc b/libbeat/processors/cache/docs/cache.asciidoc new file mode 100644 index 00000000000..3e17d37e213 --- /dev/null +++ b/libbeat/processors/cache/docs/cache.asciidoc @@ -0,0 
+1,108 @@
+[[add-cached-metadata]]
+=== Add cached metadata
+
+++++
+cache
+++++
+
+experimental[]
+
+The `cache` processor enriches events with information from previously
+cached events.
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - cache:
+      backend:
+        memory:
+          id: cache_id
+      put:
+        key_field: join_key_field
+        value_field: source_field
+-------------------------------------------------------------------------------
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - cache:
+      backend:
+        memory:
+          id: cache_id
+      get:
+        key_field: join_key_field
+        target_field: destination_field
+-------------------------------------------------------------------------------
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - cache:
+      backend:
+        memory:
+          id: cache_id
+      delete:
+        key_field: join_key_field
+-------------------------------------------------------------------------------
+
+The value written to the target field is the value previously stored in the
+cache for the key.
+
+It has the following settings:
+
+One of `backend.memory.id` or `backend.file.id` must be provided.
+
+`backend.memory.id`:: The ID of a memory-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.id`:: The ID of a file-based cache. Use the same ID across instances to reference the same cache.
+`backend.file.write_interval`:: The interval at which the cache is periodically written to the backing file. Valid time units are h, m, s, ms, us/µs and ns. Periodic writes are only made if `backend.file.write_interval` is greater than zero. The contents are always written out to the backing file when the processor is closed. Default is zero, no periodic writes.
+
+One of `put`, `get` or `delete` must be provided.
+
+`put.key_field`:: Name of the field containing the key to put into the cache. Required if `put` is present.
+`put.value_field`:: Name of the field containing the value to put into the cache. Required if `put` is present.
+`put.ttl`:: The TTL to associate with the cached key/value. Valid time units are h, m, s, ms, us/µs and ns. Required if `put` is present.
+
+`get.key_field`:: Name of the field containing the key to get. Required if `get` is present.
+`get.target_field`:: Name of the field to which the cached value will be written. Required if `get` is present.
+
+`delete.key_field`:: Name of the field containing the key to delete. Required if `delete` is present.
+
+`ignore_missing`:: (Optional) When set to `false`, events that don't contain the
+field specified in `key_field` will be discarded and an error will be generated.
+By default, this condition is ignored.
+
+`overwrite_keys`:: (Optional) By default, if a target field already exists, it
+will not be overwritten and an error will be logged. If `overwrite_keys` is
+set to `true`, this condition will be ignored.
+
+The `cache` processor can be used to perform joins between documents within
+an event stream, inside the Beat.
+
+[source,yaml]
+-------------------------------------------------------------------------------
+processors:
+  - if:
+      contains:
+        log.file.path: fdrv2/aidmaster
+    then:
+      - cache:
+          backend:
+            memory:
+              id: aidmaster
+          put:
+            ttl: 168h
+            key_field: crowdstrike.aid
+            value_field: crowdstrike.metadata
+    else:
+      - cache:
+          backend:
+            memory:
+              id: aidmaster
+          get:
+            key_field: crowdstrike.aid
+            target_field: crowdstrike.metadata
+-------------------------------------------------------------------------------
+
+This would enrich events with `log.file.path` not equal to
+"fdrv2/aidmaster" with the `crowdstrike.metadata` fields from events
+with `log.file.path` equal to that value, where the `crowdstrike.aid`
+field matches between the source and destination documents.
diff --git a/libbeat/processors/cache/file_store.go b/libbeat/processors/cache/file_store.go
new file mode 100644
index 00000000000..45ed9d84068
--- /dev/null
+++ b/libbeat/processors/cache/file_store.go
@@ -0,0 +1,294 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"container/heap"
+	"context"
+	"encoding/json"
+	"errors"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"strings"
+	"sync"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+	"github.com/elastic/elastic-agent-libs/paths"
+)
+
+var fileStores = fileStoreSet{stores: map[string]*fileStore{}}
+
+// fileStoreSet is a collection of shared fileStore caches.
+type fileStoreSet struct {
+	mu     sync.Mutex
+	stores map[string]*fileStore
+}
+
+// get returns a fileStore cache with the provided ID based on the config.
+// If a fileStore with the ID already exists, its configuration is adjusted
+// and its reference count is increased. The returned context.CancelFunc
+// reduces the reference count and deletes the fileStore from the set if the
+// count reaches zero.
+func (s *fileStoreSet) get(id string, cfg config, log *logp.Logger) (*fileStore, context.CancelFunc) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+	store, ok := s.stores[id]
+	if !ok {
+		store = newFileStore(cfg, id, pathFromConfig(cfg, log), log)
+		s.stores[store.id] = store
+	}
+	store.add(cfg)
+
+	return store, func() {
+		store.dropFrom(s)
+	}
+}
+
+// pathFromConfig returns the mapping from a config to a file-system path.
+func pathFromConfig(cfg config, log *logp.Logger) string {
+	path := filepath.Join(paths.Resolve(paths.Data, "cache_processor"), cleanFilename(cfg.Store.File.ID))
+	log.Infow("mapping file-backed cache processor config to file path", "id", cfg.Store.File.ID, "path", path)
+	return path
+}
+
+// cleanFilename replaces illegal printable characters (and space or dot) in
+// filenames with underscore.
+func cleanFilename(s string) string {
+	return pathCleaner.Replace(s)
+}
+
+var pathCleaner = strings.NewReplacer(
+	"/", "_",
+	"<", "_",
+	">", "_",
+	":", "_",
+	`"`, "_",
+	`\`, "_",
+	"|", "_",
+	"?", "_",
+	"*", "_",
+	".", "_",
+	" ", "_",
+)
+
+// free removes the fileStore with the given ID from the set. free is safe
+// for concurrent use.
+func (s *fileStoreSet) free(id string) {
+	s.mu.Lock()
+	delete(s.stores, id)
+	s.mu.Unlock()
+}
+
+// fileStore is a file-backed cache store.
+type fileStore struct {
+	memStore
+
+	path string
+	// cancel stops periodic write out operations.
+	// Write out operations are protected by the
+	// memStore's mutex.
+	cancel context.CancelFunc
+
+	log *logp.Logger
+}
+
+// newFileStore returns a new fileStore configured to apply the given TTL duration.
+// The fileStore is guaranteed not to grow larger than cap elements. id is the
+// look-up into the global cache store the fileStore is held in.
+func newFileStore(cfg config, id, path string, log *logp.Logger) *fileStore {
+	s := fileStore{
+		path: path,
+		log:  log,
+		memStore: memStore{
+			id:    id,
+			cache: make(map[string]*CacheEntry),
+
+			// Mark the ttl as invalid until we have had a put
+			// operation configured. While the shared backing
+			// data store is incomplete, and has no put operation
+			// defined, the TTL will be invalid, but will never
+			// be accessed since all time operations outside put
+			// refer to absolute times, held by the CacheEntry.
+			ttl:    -1,
+			cap:    -1,
+			effort: -1,
+		},
+	}
+	s.cancel = noop
+	if cfg.Store.File.WriteOutEvery > 0 {
+		var ctx context.Context
+		ctx, s.cancel = context.WithCancel(context.Background())
+		go s.periodicWriteOut(ctx, cfg.Store.File.WriteOutEvery)
+	}
+	s.readState()
+	return &s
+}
+
+func (c *fileStore) String() string { return "file:" + c.id }
+
+// dropFrom decreases the reference count for the fileStore and removes it from
+// the stores map if the count is zero. dropFrom is safe for concurrent use.
+func (c *fileStore) dropFrom(stores *fileStoreSet) {
+	c.mu.Lock()
+	c.refs--
+	if c.refs < 0 {
+		panic("invalid reference count")
+	}
+	if c.refs == 0 {
+		// Stop periodic writes
+		c.cancel()
+		// and do a final write out.
+		c.writeState(true)
+
+		stores.free(c.id)
+		// GC assists.
+		c.cache = nil
+		c.expiries = nil
+	}
+	c.mu.Unlock()
+}
+
+func (c *fileStore) readState() {
+	f, err := os.Open(c.path)
+	if err != nil {
+		if errors.Is(err, fs.ErrNotExist) {
+			c.log.Debugw("no state on file system", "error", err)
+		} else {
+			c.log.Errorw("failed to open file to read state", "error", err)
+		}
+		return
+	}
+	defer f.Close()
+
+	// It would be nice to be able at this stage to determine
+	// whether the file is stale past the TTL of the cache, but
+	// we do not have this information yet. So we must read
+	// through all the elements. If any survive the expiry filter,
+	// the cache was live; otherwise we delete the file.
+
+	dec := json.NewDecoder(f)
+	for {
+		var e CacheEntry
+		err = dec.Decode(&e)
+		if err != nil {
+			if err != io.EOF {
+				switch err := err.(type) {
+				case *json.SyntaxError:
+					c.log.Errorw("failed to read state element", "error", err, "path", c.path, "offset", err.Offset)
+				default:
+					c.log.Errorw("failed to read state element", "error", err, "path", c.path)
+				}
+			}
+			break
+		}
+		if e.Expires.Before(time.Now()) {
+			// Don't retain expired elements.
+			continue
+		}
+		c.cache[e.Key] = &e
+		heap.Push(&c.expiries, &e)
+	}
+
+	if len(c.cache) != 0 {
+		return
+	}
+	// We had no live entries, so delete the file.
+	err = os.Remove(c.path)
+	if err != nil {
+		c.log.Errorw("failed to delete stale cache file", "error", err)
+	}
+}
+
+// periodicWriteOut writes the cache contents to the backing file at the
+// specified interval until the context is cancelled. periodicWriteOut is
+// safe for concurrent use.
+func (c *fileStore) periodicWriteOut(ctx context.Context, every time.Duration) {
+	tick := time.NewTicker(every)
+	defer tick.Stop()
+	for {
+		select {
+		case <-tick.C:
+			c.mu.Lock()
+			c.writeState(false)
+			c.mu.Unlock()
+		case <-ctx.Done():
+			return
+		}
+	}
+}
+
+// writeState writes the current cache state to the backing file.
+// If final is true and the cache is empty, the file will be deleted.
+func (c *fileStore) writeState(final bool) {
+	if len(c.cache) == 0 && final {
+		err := os.Remove(c.path)
+		if err != nil {
+			c.log.Errorw("failed to delete state file for empty cache", "error", err)
+		}
+		return
+	}
+	f, err := os.CreateTemp(filepath.Dir(c.path), filepath.Base(c.path)+"-*.tmp")
+	if err != nil {
+		c.log.Errorw("failed to open file to write state", "error", err)
+		return
+	}
+	// Try to make sure we are private.
+	err = os.Chmod(f.Name(), 0o600)
+	if err != nil {
+		c.log.Errorw("failed to set state file mode", "error", err)
+		return
+	}
+	tmp := f.Name()
+	defer func() {
+		err = f.Sync()
+		if err != nil {
+			c.log.Errorw("failed to sync file after writing state", "error", err)
+			return
+		}
+		err = f.Close()
+		if err != nil {
+			c.log.Errorw("failed to close file after writing state", "error", err)
+			return
+		}
+		// Try to be atomic.
+		err = os.Rename(tmp, c.path)
+		if err != nil {
+			c.log.Errorw("failed to finalize writing state", "error", err)
+		}
+	}()
+
+	// The state file is newline-delimited JSON, one CacheEntry per line.
+	enc := json.NewEncoder(f)
+	enc.SetEscapeHTML(false)
+	now := time.Now()
+	// Range over the heap's backing slice rather than popping elements;
+	// draining the heap on a periodic write out would leave live cache
+	// entries without their expiry records.
+	for _, e := range c.expiries {
+		if e.Expires.Before(now) {
+			// Don't write expired elements.
+			continue
+		}
+		err = enc.Encode(e)
+		if err != nil {
+			c.log.Errorw("failed to write state element", "error", err)
+			return
+		}
+	}
+}
diff --git a/libbeat/processors/cache/file_store_test.go b/libbeat/processors/cache/file_store_test.go
new file mode 100644
index 00000000000..557a7eca9a5
--- /dev/null
+++ b/libbeat/processors/cache/file_store_test.go
@@ -0,0 +1,455 @@
+// Licensed to Elasticsearch B.V. under one or more contributor
+// license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright
+// ownership. Elasticsearch B.V. licenses this file to you under
+// the Apache License, Version 2.0 (the "License"); you may
+// not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package cache
+
+import (
+	"encoding/json"
+	"errors"
+	"flag"
+	"fmt"
+	"io"
+	"io/fs"
+	"os"
+	"path/filepath"
+	"testing"
+	"time"
+
+	"github.com/google/go-cmp/cmp"
+	"github.com/google/go-cmp/cmp/cmpopts"
+
+	"github.com/elastic/elastic-agent-libs/logp"
+)
+
+var keep = flag.Bool("keep", false, "keep testdata after test complete")
+
+type fileStoreTestSteps struct {
+	doTo func(*fileStore) error
+	want *fileStore
+}
+
+//nolint:errcheck // Paul Hogan was right.
+var fileStoreTests = []struct { + name string + cfg config + want *fileStore + steps []fileStoreTestSteps + wantPersisted []*CacheEntry +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &fileStore{path: "testdata/new_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &fileStore{path: "testdata/new_delete", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }}, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/new_get_add_put", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. 
+ ttl: -1, + cap: -1, + effort: -1, + }}, + steps: []fileStoreTestSteps{ + 0: { + doTo: func(s *fileStore) error { + putCfg := config{ + Store: &storeConfig{ + File: &fileConfig{ID: "test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 1: { + doTo: func(s *fileStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 2: { + doTo: func(s *fileStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 3: { + doTo: func(s *fileStore) error { + return s.Delete("two") + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 4: { + doTo: func(s *fileStore) error { + got, _ := s.Get("two") + if got != nil { + return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 5: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if !fileStores.has(s.id) { + return fmt.Errorf("%q fileStore not found after single close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + 6: { + doTo: func(s *fileStore) error { + s.dropFrom(&fileStores) + if fileStores.has(s.id) { + return 
fmt.Errorf("%q fileStore still found after double close", s.id) + } + return nil + }, + want: &fileStore{path: "testdata/ensemble", memStore: memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }}, + }, + }, + wantPersisted: []*CacheEntry{ + // Numeric values are float due to JSON round-trip. + {Key: "one", Value: 1.0}, + {Key: "three", Value: 3.0}, + }, + }, +} + +func TestFileStore(t *testing.T) { + err := os.RemoveAll("testdata") + if err != nil && !errors.Is(err, fs.ErrNotExist) { + t.Fatalf("failed to clear testdata directory: %v", err) + } + err = os.Mkdir("testdata", 0o755) + if err != nil && !errors.Is(err, fs.ErrExist) { + t.Fatalf("failed to create testdata directory: %v", err) + } + if !*keep { + t.Cleanup(func() { os.RemoveAll("testdata") }) + } + + allow := cmp.AllowUnexported(fileStore{}, memStore{}, CacheEntry{}) + ignoreInFileStore := cmpopts.IgnoreFields(fileStore{}, "cancel", "log") + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") + + for _, test := range fileStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put in into the stores map as + // we would if we were calling Run. + path := filepath.Join("testdata", test.name) + store := newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + store.add(test.cfg) + fileStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInFileStore, ignoreInMemStore) { + t.Errorf("unexpected new fileStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInFileStore, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected fileStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInFileStore, ignoreInMemStore, ignoreInCacheEntry)) + } + } + if test.wantPersisted == nil { + return + } + + f, err := os.Open(path) + if err != nil { + t.Fatalf("failed to open persisted data: %v", err) + } + defer f.Close() + dec := json.NewDecoder(f) + var got []*CacheEntry + for { + var e CacheEntry + err = dec.Decode(&e) + if err != nil { + if err != io.EOF { + t.Fatalf("unexpected error reading persisted cache data: %v", err) + } + break + } + got = append(got, &e) + } + if !cmp.Equal(test.wantPersisted, got, allow, ignoreInCacheEntry) { + t.Errorf("unexpected persisted state:\n--- want\n+++ got\n%s", + cmp.Diff(test.wantPersisted, got, allow, ignoreInCacheEntry)) + } + wantCache := make(map[string]*CacheEntry) + for _, e := range got { + wantCache[e.Key] = e + } + store = newFileStore(test.cfg, test.cfg.Store.File.ID, path, logp.L()) + // Specialise the in cache entry ignore list to include index. 
+ ignoreMoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires", "index") + if !cmp.Equal(wantCache, store.cache, allow, ignoreMoreInCacheEntry) { + t.Errorf("unexpected restored state:\n--- want\n+++ got\n%s", + cmp.Diff(wantCache, store.cache, allow, ignoreMoreInCacheEntry)) + } + for k, e := range store.cache { + if e.index < 0 || len(store.expiries) <= e.index { + t.Errorf("cache entry %s index out of bounds: got:%d [0,%d)", k, e.index, len(store.expiries)) + continue + } + if !cmp.Equal(e, store.expiries[e.index], allow, ignoreInCacheEntry) { + t.Errorf("unexpected mismatched cache/expiry state %s:\n--- want\n+++ got\n%s", + k, cmp.Diff(e, store.expiries[e.index], allow, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set. It is used only for testing. +func (s *fileStoreSet) add(store *fileStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set. It is used only for testing. +func (s *fileStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} diff --git a/libbeat/processors/cache/mem_store.go b/libbeat/processors/cache/mem_store.go new file mode 100644 index 00000000000..774a019d783 --- /dev/null +++ b/libbeat/processors/cache/mem_store.go @@ -0,0 +1,253 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "container/heap" + "context" + "sync" + "time" +) + +var memStores = memStoreSet{stores: map[string]*memStore{}} + +// memStoreSet is a collection of shared memStore caches. +type memStoreSet struct { + mu sync.Mutex + stores map[string]*memStore +} + +// get returns a memStore cache with the provided ID based on the config. +// If a memStore with the ID already exists, its configuration is adjusted +// and its reference count is increased. The returned context.CancelFunc +// reduces the reference count and deletes the memStore from the set if the +// count reaches zero. +func (s *memStoreSet) get(id string, cfg config) (*memStore, context.CancelFunc) { + s.mu.Lock() + defer s.mu.Unlock() + store, ok := s.stores[id] + if !ok { + store = newMemStore(cfg, id) + s.stores[store.id] = store + } + store.add(cfg) + + return store, func() { + store.dropFrom(s) + } +} + +// free removes the memStore with the given ID from the set. free is safe +// for concurrent use. +func (s *memStoreSet) free(id string) { + s.mu.Lock() + delete(s.stores, id) + s.mu.Unlock() +} + +// memStore is a memory-backed cache store. +type memStore struct { + mu sync.Mutex + cache map[string]*CacheEntry + expiries expiryHeap + ttl time.Duration // ttl is the time entries are valid for in the cache. + refs int // refs is the number of processors referring to this store.
+ + // id is the index into the global cache store for the cache. + id string + + // cap is the maximum number of elements the cache + // will hold. If not positive, no limit. + cap int + // effort is the number of entries to examine during + // expired element eviction. If not positive, full effort. + effort int +} + +// newMemStore returns a new memStore configured to apply the given TTL duration. +// The memStore is guaranteed not to grow larger than cap elements. id is the +// look-up into the global cache store the memStore is held in. +func newMemStore(cfg config, id string) *memStore { + return &memStore{ + id: id, + cache: make(map[string]*CacheEntry), + + // Mark the ttl as invalid until we have had a put + // operation configured. While the shared backing + // data store is incomplete, and has no put operation + // defined, the TTL will be invalid, but will never + // be accessed since all time operations outside put + // refer to absolute times, held by the CacheEntry. + ttl: -1, + cap: -1, + effort: -1, + } +} + +func (c *memStore) String() string { return "memory:" + c.id } + +// add updates the receiver for a new operation. It increases the reference +// count for the receiver, and if the config is a put operation and has no +// previous put operation defined, the TTL, cap and effort will be set from +// cfg. add is safe for concurrent use. +func (c *memStore) add(cfg config) { + c.mu.Lock() + defer c.mu.Unlock() + c.refs++ + + // We may have already constructed the store with + // a get or a delete config, so set the TTL, cap + // and effort if we have a put config. If another + // put config has already been included, we ignore + // the put options now. + if cfg.Put == nil { + return + } + if c.ttl == -1 { + // putConfig.TTL is a required field, so we don't + // need to check for nil-ness. + c.ttl = *cfg.Put.TTL + c.cap = cfg.Store.Capacity + c.effort = cfg.Store.Effort + } +} + +// dropFrom decreases the reference count for the memStore and removes it from +// the stores map if the count is zero. dropFrom is safe for concurrent use. +func (c *memStore) dropFrom(stores *memStoreSet) { + c.mu.Lock() + c.refs-- + if c.refs < 0 { + panic("invalid reference count") + } + if c.refs == 0 { + stores.free(c.id) + // GC assists. + c.cache = nil + c.expiries = nil + } + c.mu.Unlock() +} + +// Get returns the cached value associated with the provided key. If there is +// no value for the key, or the value has expired, Get returns ErrNoData. Get +// is safe for concurrent use. +func (c *memStore) Get(key string) (any, error) { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.cache[key] + if !ok { + return nil, ErrNoData + } + if time.Now().After(v.Expires) { + delete(c.cache, key) + return nil, ErrNoData + } + return v.Value, nil +} + +// Put stores the provided value in the cache associated with the given key. +// The value is given an expiry time based on the configured TTL of the cache. +// Put is safe for concurrent use. +func (c *memStore) Put(key string, val any) error { + c.mu.Lock() + defer c.mu.Unlock() + now := time.Now() + c.evictExpired(now) + e := &CacheEntry{ + Key: key, + Value: val, + Expires: now.Add(c.ttl), + } + c.cache[key] = e + heap.Push(&c.expiries, e) + return nil +} + +// evictExpired removes up to effort elements from the cache when the cache +// is below capacity, retaining all elements that have not expired. If the +// cache is at or above capacity, the oldest elements are removed to bring +// it under the capacity limit.
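+// +// For example, with effort=2 and three expired entries, a single Put +// removes only the two earliest-expiring of them; stragglers are collected +// by later Put calls or dropped lazily by Get. With cap=3 and a full cache, +// the capacity pass pops the earliest-expiring entries, expired or not, +// until a slot is free for the incoming entry.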
+func (c *memStore) evictExpired(now time.Time) { + for n := 0; (c.effort <= 0 || n < c.effort) && len(c.cache) != 0; n++ { + if c.expiries[0].Expires.After(now) { + break + } + e := c.expiries.pop() + delete(c.cache, e.Key) + } + if c.cap <= 0 { + // No cap, so depend on effort. + return + } + for len(c.cache) >= c.cap { + e := c.expiries.pop() + delete(c.cache, e.Key) + } +} + +// Delete removes the value associated with the provided key from the cache. +// Delete is safe for concurrent use. +func (c *memStore) Delete(key string) error { + c.mu.Lock() + defer c.mu.Unlock() + v, ok := c.cache[key] + if !ok { + return nil + } + heap.Remove(&c.expiries, v.index) + delete(c.cache, key) + return nil +} + +var _ heap.Interface = (*expiryHeap)(nil) + +// expiryHeap is a min-date heap. +// +// TODO: This could be a queue instead, though deletion becomes more +// complicated in that case. +type expiryHeap []*CacheEntry + +func (h *expiryHeap) pop() *CacheEntry { + e := heap.Pop(h).(*CacheEntry) + e.index = -1 + return e +} + +func (h expiryHeap) Len() int { + return len(h) +} +func (h expiryHeap) Less(i, j int) bool { + return h[i].Expires.Before(h[j].Expires) +} +func (h expiryHeap) Swap(i, j int) { + h[i], h[j] = h[j], h[i] + h[i].index = i + h[j].index = j +} +func (h *expiryHeap) Push(v any) { + e := v.(*CacheEntry) + e.index = len(*h) + *h = append(*h, e) +} +func (h *expiryHeap) Pop() any { + v := (*h)[len(*h)-1] + (*h)[len(*h)-1] = nil // Help GC. + *h = (*h)[:len(*h)-1] + return v +} diff --git a/libbeat/processors/cache/mem_store_test.go b/libbeat/processors/cache/mem_store_test.go new file mode 100644 index 00000000000..6be5eadb357 --- /dev/null +++ b/libbeat/processors/cache/mem_store_test.go @@ -0,0 +1,379 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package cache + +import ( + "fmt" + "testing" + "time" + + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +type memStoreTestSteps struct { + doTo func(*memStore) error + want *memStore +} + +//nolint:errcheck // Paul Hogan was right. +var memStoreTests = []struct { + name string + cfg config + want *memStore + steps []memStoreTestSteps +}{ + { + name: "new_put", + cfg: config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + { + name: "new_get", + cfg: config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. 
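+ // A negative value is the unset sentinel applied by newMemStore.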
+ ttl: -1, + cap: -1, + effort: -1, + }, + }, + { + name: "new_delete", + cfg: config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Delete: &delConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + }, + { + name: "new_get_add_put", + cfg: config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + // TTL, capacity and effort are set only by put. + refs: 1, + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + }, + }, + { + name: "ensemble", + cfg: config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Get: &getConfig{}, + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 1, + // TTL, capacity and effort are set only by put. + ttl: -1, + cap: -1, + effort: -1, + }, + steps: []memStoreTestSteps{ + 0: { + doTo: func(s *memStore) error { + putCfg := config{ + Store: &storeConfig{ + Memory: &memConfig{"test"}, + Capacity: 1000, + Effort: 10, + }, + Put: &putConfig{ + TTL: ptrTo(time.Second), + }, + } + s.add(putCfg) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{}, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 1: { + doTo: func(s *memStore) error { + s.Put("one", 1) + s.Put("two", 2) + s.Put("three", 3) + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 2: { + doTo: func(s *memStore) error { + got, err := s.Get("two") + if got != 2 { + return fmt.Errorf(`unexpected result from Get("two"): got:%v want:2`, got) + } + return err + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "two": {Key: "two", Value: int(2), index: 1}, + "three": {Key: "three", Value: int(3), index: 2}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "two", Value: int(2), index: 1}, + {Key: "three", Value: int(3), index: 2}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 3: { + doTo: func(s *memStore) error { + return s.Delete("two") + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 4: { + doTo: func(s *memStore) error { + got, _ := s.Get("two") + if got != nil { + 
return fmt.Errorf(`unexpected result from Get("two") after deletion: got:%v want:nil`, got) + } + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 2, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 5: { + doTo: func(s *memStore) error { + s.dropFrom(&memStores) + if !memStores.has(s.id) { + return fmt.Errorf("%q memStore not found after single close", s.id) + } + return nil + }, + want: &memStore{ + id: "test", + cache: map[string]*CacheEntry{ + "one": {Key: "one", Value: int(1), index: 0}, + "three": {Key: "three", Value: int(3), index: 1}, + }, + expiries: expiryHeap{ + {Key: "one", Value: int(1), index: 0}, + {Key: "three", Value: int(3), index: 1}, + }, + refs: 1, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + 6: { + doTo: func(s *memStore) error { + s.dropFrom(&memStores) + if memStores.has(s.id) { + return fmt.Errorf("%q memStore still found after double close", s.id) + } + return nil + }, + want: &memStore{ + id: "test", + cache: nil, // assistively nil-ed. + expiries: nil, // assistively nil-ed. + refs: 0, + ttl: time.Second, + cap: 1000, + effort: 10, + }, + }, + }, + }, +} + +func TestMemStore(t *testing.T) { + allow := cmp.AllowUnexported(memStore{}, CacheEntry{}) + ignoreInMemStore := cmpopts.IgnoreFields(memStore{}, "mu") + ignoreInCacheEntry := cmpopts.IgnoreFields(CacheEntry{}, "Expires") + + for _, test := range memStoreTests { + t.Run(test.name, func(t *testing.T) { + // Construct the store and put it into the stores map as + // we would if we were calling Run. + store := newMemStore(test.cfg, test.cfg.Store.Memory.ID) + store.add(test.cfg) + memStores.add(store) + + if !cmp.Equal(test.want, store, allow, ignoreInMemStore) { + t.Errorf("unexpected new memStore result:\n--- want\n+++ got\n%s", + cmp.Diff(test.want, store, allow, ignoreInMemStore)) + } + for i, step := range test.steps { + err := step.doTo(store) + if err != nil { + t.Errorf("unexpected error at step %d: %v", i, err) + } + if !cmp.Equal(step.want, store, allow, ignoreInMemStore, ignoreInCacheEntry) { + t.Errorf("unexpected memStore step %d result:\n--- want\n+++ got\n%s", + i, cmp.Diff(step.want, store, allow, ignoreInMemStore, ignoreInCacheEntry)) + } + } + }) + } +} + +// add adds the store to the set. It is used only for testing. +func (s *memStoreSet) add(store *memStore) { + s.mu.Lock() + s.stores[store.id] = store + s.mu.Unlock() +} + +// has returns whether the store exists in the set. It is used only for testing.
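+// has reports membership without altering any store's reference count.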
+func (s *memStoreSet) has(id string) bool { + s.mu.Lock() + _, ok := s.stores[id] + s.mu.Unlock() + return ok +} + +func ptrTo[T any](v T) *T { return &v } diff --git a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc index 9a7a2f32322..aa847ea4292 100644 --- a/libbeat/processors/ratelimit/docs/rate_limit.asciidoc +++ b/libbeat/processors/ratelimit/docs/rate_limit.asciidoc @@ -1,6 +1,5 @@ [[rate-limit]] === Rate limit the flow of events -beta[] ++++ rate_limit diff --git a/libbeat/reader/readfile/fs_metafields_other.go b/libbeat/reader/readfile/fs_metafields_other.go index 425b7435fe8..cc764c4bbcc 100644 --- a/libbeat/reader/readfile/fs_metafields_other.go +++ b/libbeat/reader/readfile/fs_metafields_other.go @@ -22,6 +22,7 @@ package readfile import ( "fmt" "os" + "strconv" "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/elastic-agent-libs/mapstr" @@ -34,11 +35,11 @@ const ( func setFileSystemMetadata(fi os.FileInfo, fields mapstr.M) error { osstate := file.GetOSState(fi) - _, err := fields.Put(deviceIDKey, osstate.Device) + _, err := fields.Put(deviceIDKey, strconv.FormatUint(osstate.Device, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", deviceIDKey, err) } - _, err = fields.Put(inodeKey, osstate.Inode) + _, err = fields.Put(inodeKey, osstate.InodeString()) if err != nil { return fmt.Errorf("failed to set %q: %w", inodeKey, err) } diff --git a/libbeat/reader/readfile/fs_metafields_windows.go b/libbeat/reader/readfile/fs_metafields_windows.go index 113a74cf829..97bfd5c72de 100644 --- a/libbeat/reader/readfile/fs_metafields_windows.go +++ b/libbeat/reader/readfile/fs_metafields_windows.go @@ -20,6 +20,7 @@ package readfile import ( "fmt" "os" + "strconv" "github.com/elastic/beats/v7/libbeat/common/file" "github.com/elastic/elastic-agent-libs/mapstr" @@ -33,15 +34,15 @@ const ( func setFileSystemMetadata(fi os.FileInfo, fields mapstr.M) error { osstate := file.GetOSState(fi) - _, err := fields.Put(idxhiKey, osstate.IdxHi) + _, err := fields.Put(idxhiKey, strconv.FormatUint(osstate.IdxHi, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", idxhiKey, err) } - _, err = fields.Put(idxloKey, osstate.IdxLo) + _, err = fields.Put(idxloKey, strconv.FormatUint(osstate.IdxLo, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", idxloKey, err) } - _, err = fields.Put(volKey, osstate.Vol) + _, err = fields.Put(volKey, strconv.FormatUint(osstate.Vol, 10)) if err != nil { return fmt.Errorf("failed to set %q: %w", volKey, err) } diff --git a/libbeat/reader/readfile/metafields_other_test.go b/libbeat/reader/readfile/metafields_other_test.go index 7874d24d4ae..b9d25b85420 100644 --- a/libbeat/reader/readfile/metafields_other_test.go +++ b/libbeat/reader/readfile/metafields_other_test.go @@ -44,13 +44,13 @@ func checkFields(t *testing.T, expected, actual mapstr.M) { dev, err := actual.GetValue(deviceIDKey) require.NoError(t, err) - require.Equal(t, uint64(17), dev) + require.Equal(t, "17", dev) err = actual.Delete(deviceIDKey) require.NoError(t, err) inode, err := actual.GetValue(inodeKey) require.NoError(t, err) - require.Equal(t, uint64(999), inode) + require.Equal(t, "999", inode) err = actual.Delete(inodeKey) require.NoError(t, err) diff --git a/libbeat/reader/readfile/metafields_windows_test.go b/libbeat/reader/readfile/metafields_windows_test.go index 37ff5cb4bda..dce0b8d2161 100644 --- a/libbeat/reader/readfile/metafields_windows_test.go +++ 
b/libbeat/reader/readfile/metafields_windows_test.go @@ -52,19 +52,19 @@ func checkFields(t *testing.T, expected, actual mapstr.M) { idxhi, err := actual.GetValue(idxhiKey) require.NoError(t, err) - require.Equal(t, uint64(100), idxhi) + require.Equal(t, "100", idxhi) err = actual.Delete(idxhiKey) require.NoError(t, err) idxlo, err := actual.GetValue(idxloKey) require.NoError(t, err) - require.Equal(t, uint64(200), idxlo) + require.Equal(t, "200", idxlo) err = actual.Delete(idxloKey) require.NoError(t, err) vol, err := actual.GetValue(volKey) require.NoError(t, err) - require.Equal(t, uint64(300), vol) + require.Equal(t, "300", vol) err = actual.Delete(volKey) require.NoError(t, err) diff --git a/metricbeat/metricbeat.reference.yml b/metricbeat/metricbeat.reference.yml index e147e07e73b..a9b800a9012 100644 --- a/metricbeat/metricbeat.reference.yml +++ b/metricbeat/metricbeat.reference.yml @@ -1285,8 +1285,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/packetbeat/packetbeat.reference.yml b/packetbeat/packetbeat.reference.yml index fae2d02a15d..19db7ba8dd2 100644 --- a/packetbeat/packetbeat.reference.yml +++ b/packetbeat/packetbeat.reference.yml @@ -916,8 +916,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/testing/environments/snapshot.yml b/testing/environments/snapshot.yml index 7a387583a4c..20233b29f4c 100644 --- a/testing/environments/snapshot.yml +++ b/testing/environments/snapshot.yml @@ -3,7 +3,7 @@ version: '2.3' services: elasticsearch: - image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0-c47419c4-SNAPSHOT + image: docker.elastic.co/elasticsearch/elasticsearch:8.11.0-b2ac6b4d-SNAPSHOT # When extend is used it merges healthcheck.tests, see: # https://github.com/docker/compose/issues/8962 # healthcheck: @@ -31,7 +31,7 @@ services: - "./docker/elasticsearch/users_roles:/usr/share/elasticsearch/config/users_roles" logstash: - image: docker.elastic.co/logstash/logstash:8.11.0-c47419c4-SNAPSHOT + image: docker.elastic.co/logstash/logstash:8.11.0-b2ac6b4d-SNAPSHOT healthcheck: test: ["CMD", "curl", "-f", "http://localhost:9600/_node/stats"] retries: 600 @@ -44,7 +44,7 @@ services: - 5055:5055 kibana: - image: docker.elastic.co/kibana/kibana:8.11.0-c47419c4-SNAPSHOT + image: docker.elastic.co/kibana/kibana:8.11.0-b2ac6b4d-SNAPSHOT environment: - "ELASTICSEARCH_USERNAME=kibana_system_user" - "ELASTICSEARCH_PASSWORD=testing" diff --git a/winlogbeat/winlogbeat.reference.yml b/winlogbeat/winlogbeat.reference.yml index 6f5f4a2fdac..1e1744f10ec 100644 --- a/winlogbeat/winlogbeat.reference.yml +++ b/winlogbeat/winlogbeat.reference.yml @@ -332,8 +332,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. 
+ #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/auditbeat/auditbeat.reference.yml b/x-pack/auditbeat/auditbeat.reference.yml index bd54d62b799..ca269243e9e 100644 --- a/x-pack/auditbeat/auditbeat.reference.yml +++ b/x-pack/auditbeat/auditbeat.reference.yml @@ -506,8 +506,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/filebeat/filebeat.reference.yml b/x-pack/filebeat/filebeat.reference.yml index b2d7185e4a2..0adff5d9ddb 100644 --- a/x-pack/filebeat/filebeat.reference.yml +++ b/x-pack/filebeat/filebeat.reference.yml @@ -3916,8 +3916,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/filebeat/input/httpjson/cursor.go b/x-pack/filebeat/input/httpjson/cursor.go index 92cd53a52a2..1a864361ee7 100644 --- a/x-pack/filebeat/input/httpjson/cursor.go +++ b/x-pack/filebeat/input/httpjson/cursor.go @@ -50,7 +50,7 @@ func (c *cursor) update(trCtx *transformContext) { } for k, cfg := range c.cfg { - v, _ := cfg.Value.Execute(trCtx, transformable{}, "", cfg.Default, c.log) + v, _ := cfg.Value.Execute(trCtx, transformable{}, k, cfg.Default, c.log) if v != "" || !cfg.mustIgnoreEmptyValue() { _, _ = c.state.Put(k, v) c.log.Debugf("cursor.%s stored with %s", k, v) diff --git a/x-pack/filebeat/input/httpjson/input_test.go b/x-pack/filebeat/input/httpjson/input_test.go index e4d94527e72..de4cc3f11e6 100644 --- a/x-pack/filebeat/input/httpjson/input_test.go +++ b/x-pack/filebeat/input/httpjson/input_test.go @@ -1248,6 +1248,8 @@ var testCases = []struct { } func TestInput(t *testing.T) { + logp.TestingSetup() + for _, test := range testCases { t.Run(test.name, func(t *testing.T) { if test.skipReason != "" { diff --git a/x-pack/filebeat/input/httpjson/rate_limiter.go b/x-pack/filebeat/input/httpjson/rate_limiter.go index dcbed7c1ac9..30c50ae3f05 100644 --- a/x-pack/filebeat/input/httpjson/rate_limiter.go +++ b/x-pack/filebeat/input/httpjson/rate_limiter.go @@ -104,7 +104,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { ctx := emptyTransformContext() ctx.updateLastResponse(response{header: resp.Header.Clone()}) - remaining, _ := r.remaining.Execute(ctx, tr, "", nil, r.log) + remaining, _ := r.remaining.Execute(ctx, tr, "rate-limit_remaining", nil, r.log) if remaining == "" { return 0, errors.New("remaining value is empty") } @@ -119,7 +119,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { if r.earlyLimit != nil { earlyLimit := *r.earlyLimit if earlyLimit > 0 && earlyLimit < 1 { - limit, _ := r.limit.Execute(ctx, tr, "", nil, r.log) + limit, _ := r.limit.Execute(ctx, tr, "early_limit", nil, r.log) if limit != "" { l, err := strconv.ParseInt(limit, 10, 64) if err == nil { @@ -141,7 +141,7 @@ func (r *rateLimiter) getRateLimit(resp *http.Response) (int64, error) { return 0, nil } - reset, _ := r.reset.Execute(ctx, tr, "", nil, r.log) + reset, _ := 
r.reset.Execute(ctx, tr, "rate-limit_reset", nil, r.log) if reset == "" { return 0, errors.New("reset value is empty") } diff --git a/x-pack/filebeat/input/httpjson/request.go b/x-pack/filebeat/input/httpjson/request.go index f70ab4094df..248918e8116 100644 --- a/x-pack/filebeat/input/httpjson/request.go +++ b/x-pack/filebeat/input/httpjson/request.go @@ -385,7 +385,7 @@ func evaluateResponse(expression *valueTpl, data []byte, log *logp.Logger) (bool lastResponse: &response{body: dataMap}, } - val, err := expression.Execute(paramCtx, tr, "", nil, log) + val, err := expression.Execute(paramCtx, tr, "response_evaluation", nil, log) if err != nil { return false, fmt.Errorf("error while evaluating expression: %w", err) } diff --git a/x-pack/filebeat/input/httpjson/value_tpl.go b/x-pack/filebeat/input/httpjson/value_tpl.go index 133271e726f..97bc75a62d9 100644 --- a/x-pack/filebeat/input/httpjson/value_tpl.go +++ b/x-pack/filebeat/input/httpjson/value_tpl.go @@ -96,7 +96,7 @@ func (t *valueTpl) Unpack(in string) error { func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, targetName string, defaultVal *valueTpl, log *logp.Logger) (val string, err error) { fallback := func(err error) (string, error) { if defaultVal != nil { - log.Debugf("template execution: falling back to default value") + log.Debugw("template execution: falling back to default value", "target", targetName) return defaultVal.Execute(emptyTransformContext(), transformable{}, targetName, nil, log) } return "", err @@ -107,7 +107,7 @@ func (t *valueTpl) Execute(trCtx *transformContext, tr transformable, targetName val, err = fallback(errExecutingTemplate) } if err != nil { - log.Debugf("template execution failed: %v", err) + log.Debugw("template execution failed", "target", targetName, "error", err) } tryDebugTemplateValue(targetName, val, log) }() @@ -142,7 +142,7 @@ func tryDebugTemplateValue(target, val string, log *logp.Logger) { case "Authorization", "Proxy-Authorization": // ignore filtered headers default: - log.Debugf("template execution: evaluated template %q", val) + log.Debugw("evaluated template", "target", target, "value", val) } } diff --git a/x-pack/filebeat/input/httpjson/value_tpl_test.go b/x-pack/filebeat/input/httpjson/value_tpl_test.go index 37589cd8821..487451099ad 100644 --- a/x-pack/filebeat/input/httpjson/value_tpl_test.go +++ b/x-pack/filebeat/input/httpjson/value_tpl_test.go @@ -19,6 +19,8 @@ import ( ) func TestValueTpl(t *testing.T) { + logp.TestingSetup() + cases := []struct { name string value string @@ -764,7 +766,7 @@ func TestValueTpl(t *testing.T) { assert.NoError(t, defTpl.Unpack(tc.paramDefVal)) } - got, err := tpl.Execute(tc.paramCtx, tc.paramTr, "", defTpl, logp.NewLogger("")) + got, err := tpl.Execute(tc.paramCtx, tc.paramTr, tc.name, defTpl, logp.NewLogger("")) assert.Equal(t, tc.expectedVal, got) if tc.expectedError == "" { assert.NoError(t, err) diff --git a/x-pack/functionbeat/functionbeat.reference.yml b/x-pack/functionbeat/functionbeat.reference.yml index cc1f2984e71..e83498588a8 100644 --- a/x-pack/functionbeat/functionbeat.reference.yml +++ b/x-pack/functionbeat/functionbeat.reference.yml @@ -574,8 +574,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. 
#escape_html: false diff --git a/x-pack/heartbeat/heartbeat.reference.yml b/x-pack/heartbeat/heartbeat.reference.yml index 15ca545740e..d3e124e9f34 100644 --- a/x-pack/heartbeat/heartbeat.reference.yml +++ b/x-pack/heartbeat/heartbeat.reference.yml @@ -542,8 +542,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/metricbeat/metricbeat.reference.yml b/x-pack/metricbeat/metricbeat.reference.yml index 033998d91e3..acf8e279516 100644 --- a/x-pack/metricbeat/metricbeat.reference.yml +++ b/x-pack/metricbeat/metricbeat.reference.yml @@ -1846,8 +1846,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/osquerybeat/osquerybeat.reference.yml b/x-pack/osquerybeat/osquerybeat.reference.yml index 525c62d86e3..e6fbc1c2ece 100644 --- a/x-pack/osquerybeat/osquerybeat.reference.yml +++ b/x-pack/osquerybeat/osquerybeat.reference.yml @@ -293,8 +293,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/packetbeat/LICENSE-Npcap.txt b/x-pack/packetbeat/LICENSE-Npcap.txt index 893b6ff218a..bb51b2fbd6f 100644 --- a/x-pack/packetbeat/LICENSE-Npcap.txt +++ b/x-pack/packetbeat/LICENSE-Npcap.txt @@ -1,228 +1,237 @@ - -NPCAP COPYRIGHT / END USER LICENSE AGREEMENT - -Npcap is a Windows packet sniffing driver and library and is copyright -(c) 2013-2022 by Insecure.Com LLC ("The Nmap Project"). All rights -reserved. - -Even though Npcap source code is publicly available for review, it is -not open source software and may not be redistributed without special -permission from the Nmap Project. The standard version is also -limited to installation on five systems. We fund the Npcap project by -selling two types of commercial licenses to a special Npcap OEM -edition: - -1) Npcap OEM Redistribution License allows companies to redistribute -Npcap with their products. - -2) Npcap OEM Internal Use License allows companies to use Npcap OEM -internally in excess of the free/demo version's normal 5-system -limitation. - -Both of these licenses include updates and support as well as a -warranty. Npcap OEM also includes a silent installer for unattended -installation. Further details about Npcap OEM are available from -https://nmap.org/npcap/oem/, and you are also welcome to contact us at -sales@nmap.com to ask any questions or set up a license for your -organization. - -Free and open source software producers are also welcome to contact us -for redistribution requests. However, we normally recommend that such -authors instead ask your users to download and install Npcap themselves. 
- -If the Nmap Project (directly or through one of our commercial -licensing customers) has granted you additional rights to Npcap or -Npcap OEM, those additional rights take precedence where they conflict -with the terms of this license agreement. - -Since the Npcap source code is available for download and review, -users sometimes contribute code patches to fix bugs or add new -features. By sending these changes to the Nmap Project (including -through direct email or our mailing lists or submitting pull requests -through our source code repository), it is understood unless you -specify otherwise that you are offering the Nmap Project the -unlimited, non-exclusive right to reuse, modify, and relicence your -code contribution so that we may (but are not obligated to) -incorporate it into Npcap. If you wish to specify special license -conditions or restrictions on your contributions, just say so when you -send them. - -This copy of Npcap (the "Software") and accompanying documentation is -licensed and not sold. This Software is protected by copyright laws -and treaties, as well as laws and treaties related to other forms of -intellectual property. The Nmap Project owns intellectual property -rights in the Software. The Licensee's ("you" or "your") license to -download, use, copy, or change the Software is subject to these rights -and to all the terms and conditions of this End User License Agreement -("Agreement"). - -ACCEPTANCE - -By accepting this agreement or by downloading, installing, using, or -copying the Software, or by clicking "I Agree", you agree to be bound -by the terms of this EULA. If you do not agree to the terms of this -EULA, do not install, use, or copy the Software. - -LICENSE GRANT - -This Agreement entitles you to install and use five (5) copies of the -Software. In addition, you may make archival copies of the Software -which may only be used for the reinstallation of the Software. This -Agreement does not permit the installation or use of more than 5 -copies of the Software, or the installation of the Software on more -than five computer at any given time, on a system that allows shared -used of applications by more than five users, or on any configuration -or system of computers that allows more than five users. A user may -only have one instance of this Agreement active at once. For example, -downloading the software multiple times, downloading multiple versions -of the software, and/or executing the software installer multiple -times do not grant any additional rights such as using the software on -more machines. - -The terms "computer" and "machine" in this license include any -computing device, including software computing instances such as -virtual machines and Docker containers. - -Copies of Npcap do not count toward the five copy, five computer, or -five user limitations imposed by this section if they are installed -and used solely in conjunction with any of the following software: - -o The Nmap Security Scanner, as distributed from https://nmap.org - -o The Wireshark network protocol analyzer, as distributed from - https://www.wireshark.org/ - -o Microsoft Defender for Identity, as distributed from - https://www.microsoft.com/en-us/microsoft-365/security/identity-defender - -Users wishing to redistribute Npcap or exceed the usage limits imposed -by this free license or benefit from commercial support and features -such as a silent installer should contact sales@nmap.com to obtain an -appropriate commercial license agreement. 
More details on our OEM -edition is also available from https://nmap.org/npcap/oem/. - -DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY - -This program is distributed in the hope that it will be useful, but -WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - -RESTRICTIONS ON TRANSFER - -Without first obtaining the express written consent of the Nmap -Project, you may not assign your rights and obligations under this -Agreement, or redistribute, encumber, sell, rent, lease, sublicense, -or otherwise transfer your rights to the Software Product. - -RESTRICTIONS ON USE - -You may not use, copy, or install the Software Product on more than -five computers, or permit the use, copying, or installation of the -Software Product by more than five users or on more than five -computers. - -RESTRICTIONS ON COPYING - -You may not copy any part of the Software except to the extent that -licensed use inherently demands the creation of a temporary copy -stored in computer memory and not permanently affixed on storage -medium. You may make archival copies as well. - -DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY - -UNLESS OTHERWISE EXPLICITLY AGREED TO IN WRITING BY THE NMAP PROJECT, -THE NMAP PROJECT MAKES NO OTHER WARRANTIES, EXPRESS OR IMPLIED, IN -FACT OR IN LAW, INCLUDING, BUT NOT LIMITED TO, ANY IMPLIED WARRANTIES -OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE OTHER THAN AS -SET FORTH IN THIS AGREEMENT OR IN THE LIMITED WARRANTY DOCUMENTS -PROVIDED WITH THE SOFTWARE. - -The Nmap Project makes no warranty that the Software will meet your -requirements or operate under your specific conditions of use. The -Nmap Project makes no warranty that operation of the Software Product -will be secure, error free, or free from interruption. YOU MUST -DETERMINE WHETHER THE SOFTWARE SUFFICIENTLY MEETS YOUR REQUIREMENTS -FOR SECURITY AND UNINTERRUPTABILITY. YOU BEAR SOLE RESPONSIBILITY AND -ALL LIABILITY FOR ANY LOSS INCURRED DUE TO FAILURE OF THE SOFTWARE TO -MEET YOUR REQUIREMENTS. THE NMAP PROJECT WILL NOT, UNDER ANY -CIRCUMSTANCES, BE RESPONSIBLE OR LIABLE FOR THE LOSS OF DATA ON ANY -COMPUTER OR INFORMATION STORAGE DEVICE. - -UNDER NO CIRCUMSTANCES SHALL THE NMAP PROJECT, ITS DIRECTORS, -OFFICERS, EMPLOYEES OR AGENTS BE LIABLE TO YOU OR ANY OTHER PARTY FOR -INDIRECT, CONSEQUENTIAL, SPECIAL, INCIDENTAL, PUNITIVE, OR EXEMPLARY -DAMAGES OF ANY KIND (INCLUDING LOST REVENUES OR PROFITS OR LOSS OF -BUSINESS) RESULTING FROM THIS AGREEMENT, OR FROM THE FURNISHING, -PERFORMANCE, INSTALLATION, OR USE OF THE SOFTWARE, WHETHER DUE TO A -BREACH OF CONTRACT, BREACH OF WARRANTY, OR THE NEGLIGENCE OF THE NMAP -PROJECT OR ANY OTHER PARTY, EVEN IF THE NMAP PROJECT IS ADVISED -BEFOREHAND OF THE POSSIBILITY OF SUCH DAMAGES. TO THE EXTENT THAT THE -APPLICABLE JURISDICTION LIMITS THE NMAP PROJECT'S ABILITY TO DISCLAIM -ANY IMPLIED WARRANTIES, THIS DISCLAIMER SHALL BE EFFECTIVE TO THE -MAXIMUM EXTENT PERMITTED. - -LIMITATIONS OF REMEDIES AND DAMAGES - -Your remedy for a breach of this Agreement or of any warranty included -in this Agreement is the correction or replacement of the Software or -a refund of the purchase price of the Software, exclusive of any costs -for shipping and handling. Selection of whether to correct or replace -or refund shall be solely at the discretion of the Nmap Project. The -Nmap Project reserves the right to substitute a functionally -equivalent copy of the Software Product as a replacement. 
- -Any claim must be made within the applicable warranty period. All -warranties cover only defects arising under normal use and do not -include malfunctions or failure resulting from misuse, abuse, neglect, -alteration, problems with electrical power, acts of nature, unusual -temperatures or humidity, improper installation, or damage determined -by the Nmap Project to have been caused by you. All limited warranties -on the Software Product are granted only to you and are -non-transferable. - -You agree to indemnify and hold the Nmap Project harmless from all -claims, judgments, liabilities, expenses, or costs arising from your -breach of this Agreement and/or acts or omissions. - -GOVERNING LAW, JURISDICTION AND COSTS - -This Agreement is governed by the laws the United States of America -and Washington State, without regard to Washington's conflict or -choice of law provisions. - -SEVERABILITY - -If any provision of this Agreement shall be held to be invalid or -unenforceable, the remainder of this Agreement shall remain in full -force and effect. To the extent any express or implied restrictions -are not permitted by applicable laws, these express or implied -restrictions shall remain in force and effect to the maximum extent -permitted by such applicable laws. - -THIRD PARTY SOFTWARE ATTRIBUTION - -Npcap uses several 3rd party open source software libraries: - -* The libpcap portable packet capturing library from https://tcpdump.org -* The Winpcap packet capturing library. It has been abandoned, but is - currently still available from https://www.winpcap.org/. -* The ieee80211_radiotap.h header file from David Young - -All of these are open source with BSD-style licenses that allow for -unlimited use and royalty-free redistribution within other software -(including commercial/proprietary software). Some include a warranty -disclaimer (relating to the original authors) and require a small -amount of acknowledgment text be added somewhere in the documentation -of any software which includes them (including indirect inclusion -through Npcap). - -The required acknowledgement text as well as full license text and -source details for these libraries is available from: -https://npcap.org/src/docs/Npcap-Third-Party-Open-Source.pdf . - -Since Insecure.Com LLC is not the author of this 3rd party code, we -can not waive or modify it’s software copyright or license. Npcap -users and redistributors must comply with the relevant Npcap license -(either the free/demo license or a commercial Npcap OEM license they -may have purchased) as well as the minimal requirements of this 3rd -party open source software. + +NPCAP COPYRIGHT / END USER LICENSE AGREEMENT + +Npcap (https://npcap.com) is a Windows packet sniffing driver and +library and is copyright (c) 2013-2023 by Nmap Software LLC ("The Nmap +Project"). All rights reserved. + +Even though Npcap source code is publicly available for review, it is +not open source software and may not be redistributed or used in other +software without special permission from the Nmap Project. The +standard (free) version is usually limited to installation on five +systems. We fund the Npcap project by selling two types of commercial +licenses to a special Npcap OEM edition: + +1) The Npcap OEM Redistribution License allows companies distribute +Npcap OEM within their products. Licensees generally use the Npcap OEM +silent installer, ensuring a seamless experience for end +users. 
Licensees may choose between a perpetual unlimited license or +a quarterly term license, along with options for commercial support and +updates. Prices and details: https://npcap.com/oem/redist.html + +2) The Npcap OEM Internal-Use License is for organizations that wish +to use Npcap OEM internally, without redistribution outside their +organization. This allows them to bypass the 5-system usage cap of the +Npcap free edition. It includes commercial support and update options, +and provides the extra Npcap OEM features such as the silent installer +for automated deployment. Prices and details: +https://npcap.com/oem/internal.html + +Both of these licenses include updates and support as well as a +warranty. Npcap OEM also includes a silent installer for unattended +installation. Further details about Npcap OEM are available from +https://npcap.com/oem/, and you are also welcome to contact us at +sales@nmap.com to ask any questions or set up a license for your +organization. + +Free and open source software producers are also welcome to contact us +for redistribution requests. However, we normally recommend that such +authors instead ask your users to download and install Npcap +themselves. It will be free for them if they need 5 or fewer copies. + +If the Nmap Project (directly or through one of our commercial +licensing customers) has granted you additional rights to Npcap or +Npcap OEM, those additional rights take precedence where they conflict +with the terms of this license agreement. + +Since the Npcap source code is available for download and review, +users sometimes contribute code patches to fix bugs or add new +features. By sending these changes to the Nmap Project (including +through direct email or our mailing lists or submitting pull requests +through our source code repository), it is understood unless you +specify otherwise that you are offering the Nmap Project the +unlimited, non-exclusive right to reuse, modify, and relicense your +code contribution so that we may (but are not obligated to) +incorporate it into Npcap. If you wish to specify special license +conditions or restrictions on your contributions, just say so when you +send them. + +This copy of Npcap (the "Software") and accompanying documentation is +licensed and not sold. This Software is protected by copyright laws +and treaties, as well as laws and treaties related to other forms of +intellectual property. The Nmap Project owns intellectual property +rights in the Software. The Licensee's ("you" or "your") license to +download, use, copy, or change the Software is subject to these rights +and to all the terms and conditions of this End User License Agreement +("Agreement"). + +ACCEPTANCE + +By accepting this agreement or by downloading, installing, using, or +copying the Software, or by clicking "I Agree", you agree to be bound +by the terms of this EULA. If you do not agree to the terms of this +EULA, do not install, use, or copy the Software. + +LICENSE GRANT + +This Agreement entitles you to install and use five (5) copies of the +Software. In addition, you may make archival copies of the Software +which may only be used for the reinstallation of the Software. This +Agreement does not permit the installation or use of more than 5 +copies of the Software, or the installation of the Software on more +than five computer at any given time, on a system that allows shared +used of applications by more than five users, or on any configuration +or system of computers that allows more than five users. 
A user may +only have one instance of this Agreement active at once. For example, +downloading the software multiple times, downloading multiple versions +of the software, and/or executing the software installer multiple +times do not grant any additional rights such as using the software on +more machines. + +The terms "computer" and "machine" in this license include any +computing device, including software computing instances such as +virtual machines and Docker containers. + +Copies of Npcap do not count toward the five copy, five computer, or +five user limitations imposed by this section if they are installed +and used solely in conjunction with any of the following software: + +o The Nmap Security Scanner, as distributed from https://nmap.org + +o The Wireshark network protocol analyzer, as distributed from + https://www.wireshark.org/ + +o Microsoft Defender for Identity, as distributed from + https://www.microsoft.com/en-us/microsoft-365/security/identity-defender + +Users wishing to redistribute Npcap or exceed the usage limits imposed +by this free license or benefit from commercial support and features +such as a silent installer should contact sales@nmap.com to obtain an +appropriate commercial license agreement. More details on our OEM +edition is also available from https://npcap.com/oem/. + +DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY + +This program is distributed in the hope that it will be useful, but +WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + +RESTRICTIONS ON TRANSFER + +Without first obtaining the express written consent of the Nmap +Project, you may not assign your rights and obligations under this +Agreement, or redistribute, encumber, sell, rent, lease, sublicense, +or otherwise transfer your rights to the Software Product. + +RESTRICTIONS ON USE + +You may not use, copy, or install the Software Product on more than +five computers, or permit the use, copying, or installation of the +Software Product by more than five users or on more than five +computers. + +RESTRICTIONS ON COPYING + +You may not copy any part of the Software except to the extent that +licensed use inherently demands the creation of a temporary copy +stored in computer memory and not permanently affixed on storage +medium. You may make archival copies as well. + +DISCLAIMER OF WARRANTIES AND LIMITATION OF LIABILITY + +UNLESS OTHERWISE EXPLICITLY AGREED TO IN WRITING BY THE NMAP PROJECT, +THE NMAP PROJECT MAKES NO OTHER WARRANTIES, EXPRESS OR IMPLIED, IN +FACT OR IN LAW, INCLUDING, BUT NOT LIMITED TO, ANY IMPLIED WARRANTIES +OF MERCHANTABILITY OR FITNESS FOR A PARTICULAR PURPOSE OTHER THAN AS +SET FORTH IN THIS AGREEMENT OR IN THE LIMITED WARRANTY DOCUMENTS +PROVIDED WITH THE SOFTWARE. + +The Nmap Project makes no warranty that the Software will meet your +requirements or operate under your specific conditions of use. The +Nmap Project makes no warranty that operation of the Software Product +will be secure, error free, or free from interruption. YOU MUST +DETERMINE WHETHER THE SOFTWARE SUFFICIENTLY MEETS YOUR REQUIREMENTS +FOR SECURITY AND UNINTERRUPTABILITY. YOU BEAR SOLE RESPONSIBILITY AND +ALL LIABILITY FOR ANY LOSS INCURRED DUE TO FAILURE OF THE SOFTWARE TO +MEET YOUR REQUIREMENTS. THE NMAP PROJECT WILL NOT, UNDER ANY +CIRCUMSTANCES, BE RESPONSIBLE OR LIABLE FOR THE LOSS OF DATA ON ANY +COMPUTER OR INFORMATION STORAGE DEVICE. 
+ +UNDER NO CIRCUMSTANCES SHALL THE NMAP PROJECT, ITS DIRECTORS, +OFFICERS, EMPLOYEES OR AGENTS BE LIABLE TO YOU OR ANY OTHER PARTY FOR +INDIRECT, CONSEQUENTIAL, SPECIAL, INCIDENTAL, PUNITIVE, OR EXEMPLARY +DAMAGES OF ANY KIND (INCLUDING LOST REVENUES OR PROFITS OR LOSS OF +BUSINESS) RESULTING FROM THIS AGREEMENT, OR FROM THE FURNISHING, +PERFORMANCE, INSTALLATION, OR USE OF THE SOFTWARE, WHETHER DUE TO A +BREACH OF CONTRACT, BREACH OF WARRANTY, OR THE NEGLIGENCE OF THE NMAP +PROJECT OR ANY OTHER PARTY, EVEN IF THE NMAP PROJECT IS ADVISED +BEFOREHAND OF THE POSSIBILITY OF SUCH DAMAGES. TO THE EXTENT THAT THE +APPLICABLE JURISDICTION LIMITS THE NMAP PROJECT'S ABILITY TO DISCLAIM +ANY IMPLIED WARRANTIES, THIS DISCLAIMER SHALL BE EFFECTIVE TO THE +MAXIMUM EXTENT PERMITTED. + +LIMITATIONS OF REMEDIES AND DAMAGES + +Your remedy for a breach of this Agreement or of any warranty included +in this Agreement is the correction or replacement of the Software or +a refund of the purchase price of the Software, exclusive of any costs +for shipping and handling. Selection of whether to correct or replace +or refund shall be solely at the discretion of the Nmap Project. The +Nmap Project reserves the right to substitute a functionally +equivalent copy of the Software Product as a replacement. + +Any claim must be made within the applicable warranty period. All +warranties cover only defects arising under normal use and do not +include malfunctions or failure resulting from misuse, abuse, neglect, +alteration, problems with electrical power, acts of nature, unusual +temperatures or humidity, improper installation, or damage determined +by the Nmap Project to have been caused by you. All limited warranties +on the Software Product are granted only to you and are +non-transferable. + +You agree to indemnify and hold the Nmap Project harmless from all +claims, judgments, liabilities, expenses, or costs arising from your +breach of this Agreement and/or acts or omissions. + +GOVERNING LAW, JURISDICTION AND COSTS + +This Agreement is governed by the laws of the United States of America +and Deleware State, without regard to Delaware's conflict or choice of +law provisions. + +SEVERABILITY + +If any provision of this Agreement shall be held to be invalid or +unenforceable, the remainder of this Agreement shall remain in full +force and effect. To the extent any express or implied restrictions +are not permitted by applicable laws, these express or implied +restrictions shall remain in force and effect to the maximum extent +permitted by such applicable laws. + +THIRD PARTY SOFTWARE ATTRIBUTION + +Npcap uses several 3rd party open source software libraries: + +* The libpcap portable packet capturing library from https://tcpdump.org +* The Winpcap packet capturing library. It has been abandoned, but is + currently still available from https://www.winpcap.org/. +* The ieee80211_radiotap.h header file from David Young + +All of these are open source with BSD-style licenses that allow for +unlimited use and royalty-free redistribution within other software +(including commercial/proprietary software). Some include a warranty +disclaimer (relating to the original authors) and require a small +amount of acknowledgment text be added somewhere in the documentation +of any software which includes them (including indirect inclusion +through Npcap). 
+ +The required acknowledgement text as well as full license text and +source details for these libraries is available from: +https://npcap.com/src/docs/Npcap-Third-Party-Open-Source.pdf . + +Since Nmap Software LLC is not the author of this 3rd party code, we +can not waive or modify its software copyright or license. Npcap +users and redistributors must comply with the relevant Npcap license +(either the free/demo license or a commercial Npcap OEM license they +may have purchased) as well as the minimal requirements of this 3rd +party open source software. diff --git a/x-pack/packetbeat/magefile.go b/x-pack/packetbeat/magefile.go index daee711156a..2d0d367a460 100644 --- a/x-pack/packetbeat/magefile.go +++ b/x-pack/packetbeat/magefile.go @@ -36,7 +36,7 @@ import ( // the packetbeat executable. It is used to specify which npcap builder crossbuild // image to use and the installer to obtain from the cloud store for testing. const ( - NpcapVersion = "1.71" + NpcapVersion = "1.76" installer = "npcap-" + NpcapVersion + "-oem.exe" ) diff --git a/x-pack/packetbeat/npcap/installer/LICENSE b/x-pack/packetbeat/npcap/installer/LICENSE index 0bf8aa4d9b6..61173d3226a 100644 --- a/x-pack/packetbeat/npcap/installer/LICENSE +++ b/x-pack/packetbeat/npcap/installer/LICENSE @@ -1,10 +1,10 @@ -------------------------------------------------------------------------------- Dependency : Npcap (https://nmap.org/npcap/) -Version: 1.71 +Version: 1.76 Licence type: Commercial -------------------------------------------------------------------------------- -Npcap is Copyright (c) 2013-2021 Insecure.Com LLC. All rights reserved. +Npcap is Copyright (c) 2013-2023 Insecure.Com LLC. All rights reserved. See https://npcap.org for details. Portions of Npcap are Copyright (c) 1999 - 2005 NetGroup, Politecnico di diff --git a/x-pack/packetbeat/packetbeat.reference.yml b/x-pack/packetbeat/packetbeat.reference.yml index fae2d02a15d..19db7ba8dd2 100644 --- a/x-pack/packetbeat/packetbeat.reference.yml +++ b/x-pack/packetbeat/packetbeat.reference.yml @@ -916,8 +916,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false diff --git a/x-pack/packetbeat/tests/system/app_test.go b/x-pack/packetbeat/tests/system/app_test.go index f6db2aa2977..b7566e7450b 100644 --- a/x-pack/packetbeat/tests/system/app_test.go +++ b/x-pack/packetbeat/tests/system/app_test.go @@ -29,7 +29,7 @@ import ( ) // Keep in sync with NpcapVersion in magefile.go. -const NpcapVersion = "1.71" +const NpcapVersion = "1.76" func TestWindowsNpcapInstaller(t *testing.T) { if runtime.GOOS != "windows" { diff --git a/x-pack/winlogbeat/winlogbeat.reference.yml b/x-pack/winlogbeat/winlogbeat.reference.yml index dce59ef1e76..988a43380af 100644 --- a/x-pack/winlogbeat/winlogbeat.reference.yml +++ b/x-pack/winlogbeat/winlogbeat.reference.yml @@ -334,8 +334,9 @@ output.elasticsearch: # IPv6 addresses should always be defined as: https://[2001:db8::1]:9200 hosts: ["localhost:9200"] - # Set gzip compression level. - #compression_level: 0 + # Set gzip compression level. Set to 0 to disable compression. + # The default is 1. + #compression_level: 1 # Configure escaping HTML symbols in strings. #escape_html: false