From 1ef15b92559b6f40a42ba32f3ac861b47271fa9d Mon Sep 17 00:00:00 2001 From: Carson Ip Date: Mon, 21 Oct 2024 04:39:37 +0100 Subject: [PATCH] [exporter/elasticsearch] Enable gzip compression by default (#35865) #### Description Enable gzip compression by default, at hardcoded level BestSpeed. To disable compression, set `compression` to `none`. #### Link to tracking issue #### Testing #### Documentation --- ...lasticsearchexporter_compression-gzip.yaml | 27 ++++++ exporter/elasticsearchexporter/README.md | 2 +- exporter/elasticsearchexporter/bulkindexer.go | 7 ++ .../elasticsearchexporter/bulkindexer_test.go | 94 ++++++++++++++++--- exporter/elasticsearchexporter/config.go | 6 +- exporter/elasticsearchexporter/config_test.go | 30 +++++- exporter/elasticsearchexporter/esclient.go | 10 +- exporter/elasticsearchexporter/factory.go | 2 + exporter/elasticsearchexporter/go.mod | 2 +- .../testdata/config.yaml | 6 ++ exporter/elasticsearchexporter/utils_test.go | 7 +- 11 files changed, 169 insertions(+), 24 deletions(-) create mode 100644 .chloggen/elasticsearchexporter_compression-gzip.yaml diff --git a/.chloggen/elasticsearchexporter_compression-gzip.yaml b/.chloggen/elasticsearchexporter_compression-gzip.yaml new file mode 100644 index 000000000000..207d8f40322a --- /dev/null +++ b/.chloggen/elasticsearchexporter_compression-gzip.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: breaking + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: elasticsearchexporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enable gzip compression by default + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [35865] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: To disable compression, set config `compression` to `none`. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/elasticsearchexporter/README.md b/exporter/elasticsearchexporter/README.md index b620b81158e9..eadb1e309803 100644 --- a/exporter/elasticsearchexporter/README.md +++ b/exporter/elasticsearchexporter/README.md @@ -71,7 +71,7 @@ service: ### HTTP settings -The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp], except for `compression` (all requests are uncompressed). +The Elasticsearch exporter supports common [HTTP Configuration Settings][confighttp]. Gzip compression is enabled by default. To disable compression, set `compression` to `none`. As a consequence of supporting [confighttp], the Elasticsearch exporter also supports common [TLS Configuration Settings][configtls]. The Elasticsearch exporter sets `timeout` (HTTP request timeout) to 90s by default. diff --git a/exporter/elasticsearchexporter/bulkindexer.go b/exporter/elasticsearchexporter/bulkindexer.go index 21b48814914d..1700b03619d6 100644 --- a/exporter/elasticsearchexporter/bulkindexer.go +++ b/exporter/elasticsearchexporter/bulkindexer.go @@ -4,6 +4,7 @@ package elasticsearchexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/elasticsearchexporter" import ( + "compress/gzip" "context" "errors" "io" @@ -15,6 +16,7 @@ import ( "github.com/elastic/go-docappender/v2" "github.com/elastic/go-elasticsearch/v7" + "go.opentelemetry.io/collector/config/configcompression" "go.uber.org/zap" ) @@ -68,12 +70,17 @@ func bulkIndexerConfig(client *elasticsearch.Client, config *Config) docappender maxDocRetries = config.Retry.MaxRetries } } + var compressionLevel int + if config.Compression == configcompression.TypeGzip { + compressionLevel = gzip.BestSpeed + } return docappender.BulkIndexerConfig{ Client: client, MaxDocumentRetries: maxDocRetries, Pipeline: config.Pipeline, RetryOnDocumentStatus: config.Retry.RetryOnStatus, RequireDataStream: config.MappingMode() == MappingOTel, + CompressionLevel: compressionLevel, } } diff --git a/exporter/elasticsearchexporter/bulkindexer_test.go b/exporter/elasticsearchexporter/bulkindexer_test.go index 7a75c6f5a0f1..194890e69aa6 100644 --- a/exporter/elasticsearchexporter/bulkindexer_test.go +++ b/exporter/elasticsearchexporter/bulkindexer_test.go @@ -15,6 +15,7 @@ import ( "github.com/elastic/go-elasticsearch/v7" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/config/confighttp" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -62,13 +63,8 @@ func TestAsyncBulkIndexer_flushOnClose(t *testing.T) { }}) require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &cfg) - require.NoError(t, err) - session, err := bulkIndexer.StartSession(context.Background()) - require.NoError(t, err) + bulkIndexer := runBulkIndexerOnce(t, &cfg, client) - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) - assert.NoError(t, bulkIndexer.Close(context.Background())) assert.Equal(t, int64(1), bulkIndexer.stats.docsIndexed.Load()) } @@ -157,13 +153,7 @@ func TestAsyncBulkIndexer_requireDataStream(t *testing.T) { }}) require.NoError(t, err) - bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, &tt.config) - require.NoError(t, err) - session, err := bulkIndexer.StartSession(context.Background()) - require.NoError(t, err) - - assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) - assert.NoError(t, bulkIndexer.Close(context.Background())) + runBulkIndexerOnce(t, &tt.config, client) assert.Equal(t, tt.wantRequireDataStream, <-requireDataStreamCh) }) @@ -234,6 +224,8 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { bulkIndexer, err := newAsyncBulkIndexer(zap.New(core), client, &cfg) require.NoError(t, err) + defer bulkIndexer.Close(context.Background()) + session, err := bulkIndexer.StartSession(context.Background()) require.NoError(t, err) @@ -241,7 +233,6 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { // should flush time.Sleep(100 * time.Millisecond) assert.Equal(t, int64(0), bulkIndexer.stats.docsIndexed.Load()) - assert.NoError(t, bulkIndexer.Close(context.Background())) messages := observed.FilterMessage(tt.wantMessage) require.Equal(t, 1, messages.Len(), "message not found; observed.All()=%v", observed.All()) for _, wantField := range tt.wantFields { @@ -250,3 +241,78 @@ func TestAsyncBulkIndexer_flush_error(t *testing.T) { }) } } + +func TestAsyncBulkIndexer_logRoundTrip(t *testing.T) { + tests := []struct { + name string + config Config + }{ + { + name: "compression none", + config: Config{ + NumWorkers: 1, + ClientConfig: confighttp.ClientConfig{Compression: "none"}, + }, + }, + { + name: "compression gzip", + config: Config{ + NumWorkers: 1, + ClientConfig: confighttp.ClientConfig{Compression: "gzip"}, + }, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + loggerCore, logObserver := observer.New(zap.DebugLevel) + + esLogger := clientLogger{ + Logger: zap.New(loggerCore), + logRequestBody: true, + logResponseBody: true, + } + + client, err := elasticsearch.NewClient(elasticsearch.Config{ + Transport: &mockTransport{ + RoundTripFunc: func(*http.Request) (*http.Response, error) { + return &http.Response{ + Header: http.Header{"X-Elastic-Product": []string{"Elasticsearch"}}, + Body: io.NopCloser(strings.NewReader(successResp)), + }, nil + }, + }, + Logger: &esLogger, + }) + require.NoError(t, err) + + runBulkIndexerOnce(t, &tt.config, client) + + records := logObserver.AllUntimed() + assert.Len(t, records, 2) + + assert.Equal(t, "/", records[0].ContextMap()["path"]) + assert.Nil(t, records[0].ContextMap()["request_body"]) + assert.JSONEq(t, successResp, records[0].ContextMap()["response_body"].(string)) + + assert.Equal(t, "/_bulk", records[1].ContextMap()["path"]) + assert.Equal(t, "{\"create\":{\"_index\":\"foo\"}}\n{\"foo\": \"bar\"}\n", records[1].ContextMap()["request_body"]) + assert.JSONEq(t, successResp, records[1].ContextMap()["response_body"].(string)) + }) + } +} + +func runBulkIndexerOnce(t *testing.T, config *Config, client *elasticsearch.Client) *asyncBulkIndexer { + bulkIndexer, err := newAsyncBulkIndexer(zap.NewNop(), client, config) + require.NoError(t, err) + session, err := bulkIndexer.StartSession(context.Background()) + require.NoError(t, err) + + assert.NoError(t, session.Add(context.Background(), "foo", strings.NewReader(`{"foo": "bar"}`), nil)) + assert.NoError(t, bulkIndexer.Close(context.Background())) + + return bulkIndexer +} diff --git a/exporter/elasticsearchexporter/config.go b/exporter/elasticsearchexporter/config.go index fe794d6db430..0835396d928f 100644 --- a/exporter/elasticsearchexporter/config.go +++ b/exporter/elasticsearchexporter/config.go @@ -12,6 +12,7 @@ import ( "strings" "time" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/config/configopaque" "go.opentelemetry.io/collector/exporter/exporterbatcher" @@ -273,9 +274,8 @@ func (cfg *Config) Validate() error { return fmt.Errorf("unknown mapping mode %q", cfg.Mapping.Mode) } - if cfg.Compression != "" { - // TODO support confighttp.ClientConfig.Compression - return errors.New("compression is not currently configurable") + if cfg.Compression != "none" && cfg.Compression != configcompression.TypeGzip { + return errors.New("compression must be one of [none, gzip]") } if cfg.Retry.MaxRequests != 0 && cfg.Retry.MaxRetries != 0 { diff --git a/exporter/elasticsearchexporter/config_test.go b/exporter/elasticsearchexporter/config_test.go index 9934dbb7365b..baec2bd9646a 100644 --- a/exporter/elasticsearchexporter/config_test.go +++ b/exporter/elasticsearchexporter/config_test.go @@ -6,6 +6,7 @@ package elasticsearchexporter import ( "net/http" "path/filepath" + "strings" "testing" "time" @@ -38,6 +39,7 @@ func TestConfig(t *testing.T) { defaultMaxIdleConns := 100 defaultIdleConnTimeout := 90 * time.Second + defaultCompression := configcompression.TypeGzip tests := []struct { configFile string @@ -80,6 +82,7 @@ func TestConfig(t *testing.T) { cfg.Headers = map[string]configopaque.String{ "myheader": "test", } + cfg.Compression = defaultCompression }), Authentication: AuthenticationSettings{ User: "elastic", @@ -150,6 +153,7 @@ func TestConfig(t *testing.T) { cfg.Headers = map[string]configopaque.String{ "myheader": "test", } + cfg.Compression = defaultCompression }), Authentication: AuthenticationSettings{ User: "elastic", @@ -220,6 +224,7 @@ func TestConfig(t *testing.T) { cfg.Headers = map[string]configopaque.String{ "myheader": "test", } + cfg.Compression = defaultCompression }), Authentication: AuthenticationSettings{ User: "elastic", @@ -301,10 +306,29 @@ func TestConfig(t *testing.T) { cfg.Batcher.Enabled = &enabled }), }, + { + id: component.NewIDWithName(metadata.Type, "compression_none"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + + cfg.Compression = "none" + }), + }, + { + id: component.NewIDWithName(metadata.Type, "compression_gzip"), + configFile: "config.yaml", + expected: withDefaultConfig(func(cfg *Config) { + cfg.Endpoint = "https://elastic.example.com:9200" + + cfg.Compression = "gzip" + }), + }, } for _, tt := range tests { - t.Run(tt.id.String(), func(t *testing.T) { + tt := tt + t.Run(strings.ReplaceAll(tt.id.String(), "/", "_"), func(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig() @@ -387,9 +411,9 @@ func TestConfig_Validate(t *testing.T) { "compression unsupported": { config: withDefaultConfig(func(cfg *Config) { cfg.Endpoints = []string{"http://test:9200"} - cfg.Compression = configcompression.TypeGzip + cfg.Compression = configcompression.TypeSnappy }), - err: `compression is not currently configurable`, + err: `compression must be one of [none, gzip]`, }, "both max_retries and max_requests specified": { config: withDefaultConfig(func(cfg *Config) { diff --git a/exporter/elasticsearchexporter/esclient.go b/exporter/elasticsearchexporter/esclient.go index 556718242bbf..927f62844ec3 100644 --- a/exporter/elasticsearchexporter/esclient.go +++ b/exporter/elasticsearchexporter/esclient.go @@ -11,6 +11,7 @@ import ( "github.com/cenkalti/backoff/v4" "github.com/elastic/go-elasticsearch/v7" + "github.com/klauspost/compress/gzip" "go.opentelemetry.io/collector/component" "go.uber.org/zap" @@ -32,7 +33,14 @@ func (cl *clientLogger) LogRoundTrip(requ *http.Request, resp *http.Response, cl var fields []zap.Field if cl.logRequestBody && requ != nil && requ.Body != nil { - if b, err := io.ReadAll(requ.Body); err == nil { + body := requ.Body + if requ.Header.Get("Content-Encoding") == "gzip" { + if r, err := gzip.NewReader(body); err == nil { + defer r.Close() + body = r + } + } + if b, err := io.ReadAll(body); err == nil { fields = append(fields, zap.ByteString("request_body", b)) } } diff --git a/exporter/elasticsearchexporter/factory.go b/exporter/elasticsearchexporter/factory.go index 61af38d5cee6..31f9a17895f7 100644 --- a/exporter/elasticsearchexporter/factory.go +++ b/exporter/elasticsearchexporter/factory.go @@ -11,6 +11,7 @@ import ( "time" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/config/configcompression" "go.opentelemetry.io/collector/config/confighttp" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/exporter" @@ -44,6 +45,7 @@ func createDefaultConfig() component.Config { httpClientConfig := confighttp.NewDefaultClientConfig() httpClientConfig.Timeout = 90 * time.Second + httpClientConfig.Compression = configcompression.TypeGzip return &Config{ QueueSettings: qs, diff --git a/exporter/elasticsearchexporter/go.mod b/exporter/elasticsearchexporter/go.mod index c9a675710622..470e56ec3e4d 100644 --- a/exporter/elasticsearchexporter/go.mod +++ b/exporter/elasticsearchexporter/go.mod @@ -8,6 +8,7 @@ require ( github.com/elastic/go-elasticsearch/v7 v7.17.10 github.com/elastic/go-structform v0.0.12 github.com/json-iterator/go v1.1.12 + github.com/klauspost/compress v1.17.10 github.com/lestrrat-go/strftime v1.1.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.111.0 github.com/open-telemetry/opentelemetry-collector-contrib/internal/coreinternal v0.111.0 @@ -44,7 +45,6 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/google/uuid v1.6.0 // indirect github.com/joeshaw/multierror v0.0.0-20140124173710-69b34d4ec901 // indirect - github.com/klauspost/compress v1.17.10 // indirect github.com/knadh/koanf/maps v0.1.1 // indirect github.com/knadh/koanf/providers/confmap v0.1.0 // indirect github.com/knadh/koanf/v2 v2.1.1 // indirect diff --git a/exporter/elasticsearchexporter/testdata/config.yaml b/exporter/elasticsearchexporter/testdata/config.yaml index d76d300a51c1..e3f7ffc67fa9 100644 --- a/exporter/elasticsearchexporter/testdata/config.yaml +++ b/exporter/elasticsearchexporter/testdata/config.yaml @@ -86,3 +86,9 @@ elasticsearch/batcher_disabled: endpoint: https://elastic.example.com:9200 batcher: enabled: false +elasticsearch/compression_none: + endpoint: https://elastic.example.com:9200 + compression: none +elasticsearch/compression_gzip: + endpoint: https://elastic.example.com:9200 + compression: gzip diff --git a/exporter/elasticsearchexporter/utils_test.go b/exporter/elasticsearchexporter/utils_test.go index 05e9d8576a57..94c475219ffb 100644 --- a/exporter/elasticsearchexporter/utils_test.go +++ b/exporter/elasticsearchexporter/utils_test.go @@ -16,6 +16,7 @@ import ( "testing" "time" + "github.com/klauspost/compress/gzip" "github.com/stretchr/testify/assert" "go.opentelemetry.io/collector/pdata/pcommon" "go.opentelemetry.io/collector/pdata/plog" @@ -160,7 +161,11 @@ func newESTestServer(t *testing.T, bulkHandler bulkHandler) *httptest.Server { tsStart := time.Now() var items []itemRequest - dec := json.NewDecoder(req.Body) + body := req.Body + if req.Header.Get("Content-Encoding") == "gzip" { + body, _ = gzip.NewReader(req.Body) + } + dec := json.NewDecoder(body) for dec.More() { var action, doc json.RawMessage if err := dec.Decode(&action); err != nil {