-
Notifications
You must be signed in to change notification settings - Fork 14
/
Copy pathconfig.go
355 lines (313 loc) · 13 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
// Licensed to Elasticsearch B.V. under one or more contributor
// license agreements. See the NOTICE file distributed with
// this work for additional information regarding copyright
// ownership. Elasticsearch B.V. licenses this file to you under
// the Apache License, Version 2.0 (the "License"); you may
// not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package docappender
import (
"errors"
"fmt"
"time"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"
"github.com/elastic/elastic-transport-go/v8/elastictransport"
)
type Value int
const (
Unset Value = iota
True
False
)
// Config holds configuration for Appender.
type Config struct {
// Logger holds an optional Logger to use for logging indexing requests.
//
// All Elasticsearch errors will be logged at error level, so in cases
// where the indexer is used for high throughput indexing, is recommended
// that a rate-limited logger is used.
//
// If Logger is nil, logging will be disabled.
Logger *zap.Logger
// TracerProvider holds an optional otel TracerProvider for tracing
// flush requests.
//
// If TracerProvider is nil, requests will not be traced.
// To use this provider Tracer must be nil.
TracerProvider trace.TracerProvider
// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
// default compression level.
CompressionLevel int
// MaxRequests holds the maximum number of bulk index requests to execute concurrently.
// The maximum memory usage of Appender is thus approximately MaxRequests*FlushBytes.
//
// If MaxRequests is less than or equal to zero, the default of 10 will be used.
MaxRequests int
// BulkIndexerPool holds an optional pool that is used for creating new BulkIndexers.
// If not set/nil, a new BulkIndexerPool will be created with MaxRequests as the
// guaranteed, local and total maximum number of indexers.
//
// A BulkIndexerPool may be shared between multiple Appender instances. Each has its
// own unique ID to guarantee per Appender limits.
//
// For more information, see [NewBulkIndexerPool].
BulkIndexerPool *BulkIndexerPool
// MaxDocumentRetries holds the maximum number of document retries
MaxDocumentRetries int
// RetryOnDocumentStatus holds the document level statuses that will trigger a document retry.
//
// If RetryOnDocumentStatus is empty or nil, the default of [429] will be used.
RetryOnDocumentStatus []int
// FlushBytes holds the flush threshold in bytes. If Compression is enabled,
// The number of documents that can be buffered will be greater.
//
// If FlushBytes is zero, the default of 1MB will be used.
FlushBytes int
// FlushInterval holds the flush threshold as a duration.
//
// If FlushInterval is zero, the default of 30 seconds will be used.
FlushInterval time.Duration
// FlushTimeout holds the flush timeout as a duration.
//
// If FlushTimeout is zero, no timeout will be used.
FlushTimeout time.Duration
// DocumentBufferSize sets the number of documents that can be buffered before
// they are stored in the active indexer buffer.
//
// If DocumentBufferSize is zero, the default 1024 will be used.
DocumentBufferSize int
// Pipeline holds the ingest pipeline ID.
//
// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
Pipeline string
// RequireDataStream, If set to true, an index will be created only if a
// matching index template is found and it contains a data stream template.
// When true, `require_data_stream=true` is set in the bulk request.
// When false or not set, `require_data_stream` is not set in the bulk request.
// Which could cause a classic index to be created if no data stream template
// matches the index in the request.
//
// RequireDataStream is disabled by default.
RequireDataStream bool
// IncludeSourceOnError, if set to True, the response body of a Bulk Index request
// might contain the part of source document on error.
// If Unset the error reason will be dropped.
// Requires Elasticsearch 8.18+ if value is True or False.
// WARNING: if set to True, user is responsible for sanitizing the error as it may contain
// sensitive data.
//
// IncludeSourceOnError is Unset by default
IncludeSourceOnError Value
// PopulateFailedDocsInput controls whether each BulkIndexerResponseItem.Input
// in BulkIndexerResponseStat.FailedDocs is populated with the input of the item,
// which includes the action line and the document line.
//
// WARNING: this is provided for testing and debugging only.
// Use with caution as it may expose sensitive data; any clients
// of go-docappender enabling this should relay this warning to
// their users. Setting this will also add memory overhead.
PopulateFailedDocsInput bool
// Scaling configuration for the docappender.
//
// If unspecified, scaling is enabled by default.
Scaling ScalingConfig
// MeterProvider holds the OTel MeterProvider to be used to create and
// record appender metrics.
//
// If unset, the global OTel MeterProvider will be used, if that is unset,
// no metrics will be recorded.
MeterProvider metric.MeterProvider
// MetricAttributes holds any extra attributes to set in the recorded
// metrics.
MetricAttributes attribute.Set
}
// DefaultConfig returns a copy of cfg with any zero values set to their
// default values.
func DefaultConfig(cl elastictransport.Interface, cfg Config) Config {
if cfg.MaxRequests <= 0 {
cfg.MaxRequests = 10
}
if cfg.FlushBytes <= 0 {
cfg.FlushBytes = 1 * 1024 * 1024
}
if cfg.FlushInterval <= 0 {
cfg.FlushInterval = 30 * time.Second
}
if cfg.DocumentBufferSize <= 0 {
cfg.DocumentBufferSize = 1024
}
if !cfg.Scaling.Disabled {
if cfg.Scaling.ScaleDown.Threshold == 0 {
cfg.Scaling.ScaleDown.Threshold = 30
}
if cfg.Scaling.ScaleDown.CoolDown <= 0 {
cfg.Scaling.ScaleDown.CoolDown = 30 * time.Second
}
if cfg.Scaling.ScaleUp.Threshold == 0 {
cfg.Scaling.ScaleUp.Threshold = 60
}
if cfg.Scaling.ScaleUp.CoolDown <= 0 {
cfg.Scaling.ScaleUp.CoolDown = time.Minute
}
if cfg.Scaling.IdleInterval <= 0 {
cfg.Scaling.IdleInterval = 30 * time.Second
}
if cfg.Scaling.ActiveRatio <= 0 {
cfg.Scaling.ActiveRatio = 0.25
}
}
if cfg.Logger == nil {
cfg.Logger = zap.NewNop()
}
if cfg.BulkIndexerPool == nil {
cfg.BulkIndexerPool = NewBulkIndexerPool(
cfg.MaxRequests, cfg.MaxRequests, cfg.MaxRequests,
BulkIndexerConfigFrom(cl, cfg),
)
}
return cfg
}
// ScalingConfig holds the docappender autoscaling configuration.
type ScalingConfig struct {
// Disabled toggles active indexer scaling on.
//
// It is enabled by default.
Disabled bool
// ActiveRatio defines the threshold for (potential) active indexers to
// GOMAXPROCS. The higher the number, the more potential active indexers
// there will be actively pulling from the BulkIndexerItem channel.
// For example, when ActiveRatio:1 and GOMAXPROCS:2, there can be a max
// of 2 active indexers, or 1 per GOMAXPROCS.
// If set to 0.5, the maximum number of active indexers is 1, since.
// The value must be between 0 and 1.
//
// It defaults to 0.25 by default.
ActiveRatio float64
// ScaleDown configures the Threshold and CoolDown for the scale down
// action. In order to scale down an active indexer, the Threshold has
// to be met after the CoolDown has elapsed. Scale down will only take
// place if there are more than 1 active indexer.
// Active indexers will be destroyed when they aren't needed anymore,
// when enough timed flushes (FlushInterval) are performed by an active
// indexer (controlled by Threshold), or when an active indexer is idle
// for (IdleInterval * Threshold) as long as CoolDown allows it.
//
// When unset, the default of 30 is used for Threshold, and 30 seconds for
// CoolDown.
ScaleDown ScaleActionConfig
// ScaleUp configures the Threshold and CoolDown for the scale up action.
//
// In order for a scale up to occur, the Threshold has to be met after
// the CoolDown has elapsed. By default, a single active indexer is created
// which actively pulls items from the internal buffered queue. When enough
// full flushes (FlushBytes) are performed by an active indexer (controlled
// by Threshold), a new active indexer will be created until GOMAXPROCS / 4
// is reached (25% of CPU capacity) if the CoolDown allows it.
//
// When unspecified, the default of 60 is used for Threshold, and 60 seconds
// for CoolDown.
ScaleUp ScaleActionConfig
// IdleInterval defines how long an active indexer performs an inactivity
// check. The ScaleDown.Threshold and ScaleDown.CoolDown needs to be met
// for an active indexer to be destroyed.
//
// When unspecified, the default of 30 seconds will be used.
IdleInterval time.Duration
}
// ScaleActionConfig holds the configuration for a scaling action
type ScaleActionConfig struct {
// Threshold is the number of consecutive times a scale up/down condition
// has to happen for the scaling action will be triggered.
Threshold uint
// CoolDown is the amount of time needed to elapse between scaling actions
// to trigger it.
CoolDown time.Duration
}
// BulkIndexerConfig holds configuration for BulkIndexer.
type BulkIndexerConfig struct {
// Client holds the Elasticsearch client.
Client elastictransport.Interface
// MaxDocumentRetries holds the maximum number of document retries
MaxDocumentRetries int
// RetryOnDocumentStatus holds the document level statuses that will trigger a document retry.
//
// If RetryOnDocumentStatus is empty or nil, the default of [429] will be used.
RetryOnDocumentStatus []int
// CompressionLevel holds the gzip compression level, from 0 (gzip.NoCompression)
// to 9 (gzip.BestCompression). Higher values provide greater compression, at a
// greater cost of CPU. The special value -1 (gzip.DefaultCompression) selects the
// default compression level.
CompressionLevel int
// Pipeline holds the ingest pipeline ID.
//
// If Pipeline is empty, no ingest pipeline will be specified in the Bulk request.
Pipeline string
// RequireDataStream, If set to true, an index will be created only if a
// matching index template is found and it contains a data stream template.
// When true, `require_data_stream=true` is set in the bulk request.
// When false or not set, `require_data_stream` is not set in the bulk request.
// Which could cause a classic index to be created if no data stream template
// matches the index in the request.
//
// RequireDataStream is disabled by default.
RequireDataStream bool
// IncludeSourceOnError, if set to True, the response body of a Bulk Index request
// might contain the part of source document on error.
// If Unset the error reason will be dropped.
// Requires Elasticsearch 8.18+ if value is True or False.
// WARNING: if set to True, user is responsible for sanitizing the error as it may contain
// sensitive data.
//
// IncludeSourceOnError is Unset by default
IncludeSourceOnError Value
// PopulateFailedDocsInput controls whether each BulkIndexerResponseItem.Input
// in BulkIndexerResponseStat.FailedDocs is populated with the input of the item,
// which includes the action line and the document line.
//
// WARNING: this is provided for testing and debugging only.
// Use with caution as it may expose sensitive data; any clients
// of go-docappender enabling this should relay this warning to
// their users. Setting this will also add memory overhead.
PopulateFailedDocsInput bool
}
// Validate checks the configuration for errors.
func (cfg BulkIndexerConfig) Validate() error {
var errs []error
if cfg.Client == nil {
errs = append(errs, fmt.Errorf("bulk_indexer: client is required"))
}
if cfg.CompressionLevel < -1 || cfg.CompressionLevel > 9 {
errs = append(errs, fmt.Errorf("expected CompressionLevel in range [-1,9], got %d",
cfg.CompressionLevel,
))
}
return errors.Join(errs...)
}
// BulkIndexerConfigFrom creates a BulkIndexerConfig from the provided Config,
// with additional information included as necessary.
func BulkIndexerConfigFrom(cl elastictransport.Interface, cfg Config) BulkIndexerConfig {
return BulkIndexerConfig{
Client: cl,
MaxDocumentRetries: cfg.MaxDocumentRetries,
RetryOnDocumentStatus: cfg.RetryOnDocumentStatus,
CompressionLevel: cfg.CompressionLevel,
Pipeline: cfg.Pipeline,
RequireDataStream: cfg.RequireDataStream,
IncludeSourceOnError: cfg.IncludeSourceOnError,
PopulateFailedDocsInput: cfg.PopulateFailedDocsInput,
}
}