Skip to content

Commit

Permalink
add support for queue settings under outputs
Browse files Browse the repository at this point in the history
- add support for `idle_connection_timeout` for ES output
- add support for queue settings under output

Closes #35615
  • Loading branch information
leehinman committed Sep 27, 2023
1 parent d5f659d commit d219432
Show file tree
Hide file tree
Showing 25 changed files with 185 additions and 8 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,8 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Add support for AWS external IDs. {issue}36321[36321] {pull}36322[36322]
- [Enhanncement for host.ip and host.mac] Disabling netinfo.enabled option of add-host-metadata processor {pull}36506[36506]
Setting environmental variable ELASTIC_NETINFO:false in Elastic Agent pod will disable the netinfo.enabled option of add_host_metadata processor
- allow `queue` configuration settings to be set under the output. {issue}35615[35615] {pull}99999[99999]
- elasticsearch output now supports `idle_connection_timeout`. {issue}35615[35615] {pull}99999[99999]

*Auditbeat*

Expand Down
4 changes: 2 additions & 2 deletions NOTICE.txt
Original file line number Diff line number Diff line change
Expand Up @@ -12712,11 +12712,11 @@ SOFTWARE

--------------------------------------------------------------------------------
Dependency : github.com/elastic/elastic-agent-libs
Version: v0.3.15-0.20230913212237-dbdaf18c898b
Version: v0.4.0
Licence type (autodetected): Apache-2.0
--------------------------------------------------------------------------------

Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.3.15-0.20230913212237-dbdaf18c898b/LICENSE:
Contents of probable licence file $GOMODCACHE/github.com/elastic/elastic-agent-libs@v0.4.0/LICENSE:

Apache License
Version 2.0, January 2004
Expand Down
5 changes: 5 additions & 0 deletions auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -521,6 +521,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1617,6 +1617,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ require (
github.com/awslabs/kinesis-aggregation/go/v2 v2.0.0-20220623125934-28468a6701b5
github.com/elastic/bayeux v1.0.5
github.com/elastic/elastic-agent-autodiscover v0.6.2
github.com/elastic/elastic-agent-libs v0.3.15-0.20230913212237-dbdaf18c898b
github.com/elastic/elastic-agent-libs v0.4.0
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3
github.com/elastic/elastic-agent-system-metrics v0.6.1
github.com/elastic/go-elasticsearch/v8 v8.10.0
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -653,8 +653,8 @@ github.com/elastic/elastic-agent-autodiscover v0.6.2 h1:7P3cbMBWXjbzA80rxitQjc+P
github.com/elastic/elastic-agent-autodiscover v0.6.2/go.mod h1:yXYKFAG+Py+TcE4CCR8EAbJiYb+6Dz9sCDoWgOveqtU=
github.com/elastic/elastic-agent-client/v7 v7.4.0 h1:h75oTkkvIjgiKVm61NpvTZP4cy6QbQ3zrIpXKGigyjo=
github.com/elastic/elastic-agent-client/v7 v7.4.0/go.mod h1:9/amG2K2y2oqx39zURcc+hnqcX+nyJ1cZrLgzsgo5c0=
github.com/elastic/elastic-agent-libs v0.3.15-0.20230913212237-dbdaf18c898b h1:a2iuOokwld+D7VhyFymVtsPoqxZ8fkkOCOOjeYU9CDM=
github.com/elastic/elastic-agent-libs v0.3.15-0.20230913212237-dbdaf18c898b/go.mod h1:mpSfrigixx8x+uMxWKl4LtdlrKIhZbA4yT2eIeIazUQ=
github.com/elastic/elastic-agent-libs v0.4.0 h1:P0b+xcvYK+dEwldvRXObO1dj3rjjR5qEXAl6TwRCAy0=
github.com/elastic/elastic-agent-libs v0.4.0/go.mod h1:mpSfrigixx8x+uMxWKl4LtdlrKIhZbA4yT2eIeIazUQ=
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3 h1:sb+25XJn/JcC9/VL8HX4r4QXSUq4uTNzGS2kxOE7u1U=
github.com/elastic/elastic-agent-shipper-client v0.5.1-0.20230228231646-f04347b666f3/go.mod h1:rWarFM7qYxJKsi9WcV6ONcFjH/NA3niDNpTxO+8/GVI=
github.com/elastic/elastic-agent-system-metrics v0.6.1 h1:LCN1lvQTkdUuU/rKlpKyVMDU/G/I8/iZWCaW6K+mo4o=
Expand Down
5 changes: 5 additions & 0 deletions heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions libbeat/_meta/config/output-elasticsearch.reference.yml.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
18 changes: 18 additions & 0 deletions libbeat/cmd/instance/beat.go
Original file line number Diff line number Diff line change
Expand Up @@ -773,6 +773,10 @@ func (b *Beat) configure(settings Settings) error {
return fmt.Errorf("error unpacking config data: %w", err)
}

if err := mergeOutputQueueSettings(&b.Config); err != nil {
return fmt.Errorf("could not merge output queue settings: %w", err)
}

if err := features.UpdateFromConfig(b.RawConfig); err != nil {
return fmt.Errorf("could not parse features: %w", err)
}
Expand Down Expand Up @@ -1466,3 +1470,17 @@ func sanitizeIPs(ips []string) []string {
}
return validIPs
}

func mergeOutputQueueSettings(bc *beatConfig) error {
if bc.Output.IsSet() && bc.Output.Config().Enabled() {
pc := pipeline.Config{}
err := bc.Output.Config().Unpack(&pc)
if err != nil {
return fmt.Errorf("error unpacking output queue settings: %w", err)
}
if pc.Queue.IsSet() {
bc.Pipeline.Queue = pc.Queue
}
}
return nil
}
73 changes: 73 additions & 0 deletions libbeat/cmd/instance/beat_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@ import (
"github.com/elastic/beats/v7/libbeat/cfgfile"
"github.com/elastic/beats/v7/libbeat/common/reload"
"github.com/elastic/beats/v7/libbeat/outputs"
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/go-ucfg/yaml"

"github.com/gofrs/uuid"
"github.com/stretchr/testify/assert"
Expand Down Expand Up @@ -269,3 +271,74 @@ func (r *outputReloaderMock) Reload(
r.cfg = cfg
return nil
}

func TestMergeOutputQueueSettings(t *testing.T) {
tests := map[string]struct {
input []byte
memEvents int
}{
"blank": {input: []byte(""),
memEvents: 4096},
"defaults": {input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 4096},
"topLevelQueue": {input: []byte(`
name: mockbeat
queue:
mem:
events: 8096
output:
elasticsearch:
hosts:
- "localhost:9200"
`),
memEvents: 8096},
"outputLevelQueue": {input: []byte(`
name: mockbeat
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
memEvents: 8096},
"topAndOutputLevelQueue": {input: []byte(`
name: mockbeat
queue:
mem:
events: 2048
output:
elasticsearch:
hosts:
- "localhost:9200"
queue:
mem:
events: 8096
`),
memEvents: 8096},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
cfg, err := yaml.NewConfig(tc.input)
require.NoError(t, err)

config := beatConfig{}
err = cfg.Unpack(&config)
require.NoError(t, err)

err = mergeOutputQueueSettings(&config)
require.NoError(t, err)

ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())
require.NoError(t, err)
require.Equalf(t, tc.memEvents, ms.Events, "config was: %v", config.Pipeline.Queue.Config())
})
}
}
9 changes: 6 additions & 3 deletions libbeat/docs/queueconfig.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,12 @@ queue is responsible for buffering and combining events into batches that can
be consumed by the outputs. The outputs will use bulk operations to send a
batch of events in one transaction.

You can configure the type and behavior of the internal queue by setting
options in the `queue` section of the +{beatname_lc}.yml+ config file. Only one
queue type can be configured.
You can configure the type and behavior of the internal queue by
setting options in the `queue` section of the +{beatname_lc}.yml+
config file or by setting options in the `queue` section of the
output. Only one queue type can be configured. If both the top level
queue section and the output section are specified the output section
takes precedence.


This sample configuration sets the memory queue to buffer up to 4096 events:
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ func NewClient(
CompressionLevel: s.CompressionLevel,
EscapeHTML: s.EscapeHTML,
Transport: s.Transport,
IdleConnTimeout: s.IdleConnTimeout,
})
if err != nil {
return nil, err
Expand Down
4 changes: 4 additions & 0 deletions libbeat/outputs/elasticsearch/docs/elasticsearch.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -689,6 +689,10 @@ default is `1s`.
The maximum number of seconds to wait before attempting to connect to
Elasticsearch after a network error. The default is `60s`.

===== `idle_connection_timeout`

The maximum amount of time an idle connection will remain idle before closing itself. Zero means no limit. The format is a Go language duration (example 60s is 60 seconds). The default is 0.

===== `timeout`

The http request timeout in seconds for the Elasticsearch request. The default is 90.
Expand Down
1 change: 1 addition & 0 deletions libbeat/outputs/elasticsearch/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func makeES(
Observer: observer,
EscapeHTML: config.EscapeHTML,
Transport: config.Transport,
IdleConnTimeout: config.Transport.IdleConnTimeout,
},
Index: index,
Pipeline: pipeline,
Expand Down
5 changes: 5 additions & 0 deletions metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1356,6 +1356,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -403,6 +403,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/auditbeat/auditbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3987,6 +3987,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/functionbeat/functionbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -645,6 +645,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/heartbeat/heartbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -613,6 +613,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/metricbeat/metricbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -1917,6 +1917,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/osquerybeat/osquerybeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -364,6 +364,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/packetbeat/packetbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -987,6 +987,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down
5 changes: 5 additions & 0 deletions x-pack/winlogbeat/winlogbeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -405,6 +405,11 @@ output.elasticsearch:
# Elasticsearch after a network error. The default is 60s.
#backoff.max: 60s

# The maximum amount of time an idle connection will remain idle
# before closing itself. Zero means no limit. The format is a Go
# language duration (example 60s is 60 seconds). The default is 0.
#idle_connection_timeout: 60s

# Configure HTTP request timeout before failing a request to Elasticsearch.
#timeout: 90

Expand Down

0 comments on commit d219432

Please sign in to comment.