diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index a0d830bf0b0f..bae4c18fa93a 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -1486,6 +1486,11 @@ func sanitizeIPs(ips []string) []string { return validIPs } +// promoteOutputQueueSettings checks to see if the output +// configuration has queue settings defined and if so it promotes them +// to the top level queue settings. This is done to allow existing +// behavior of specifying queue settings at the top level or like +// elastic-agent that specifies queue settings under the output func promoteOutputQueueSettings(bc *beatConfig) error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { pc := pipeline.Config{} diff --git a/libbeat/outputs/util.go b/libbeat/outputs/util.go index caf9280af151..ce8765b5c2e9 100644 --- a/libbeat/outputs/util.go +++ b/libbeat/outputs/util.go @@ -31,8 +31,10 @@ import ( // loading an output must return an error. func Fail(err error) (Group, error) { return Group{}, err } -// Success create a valid output Group response for a set of client instances. -// The first argument is expected to contain a queue config.Namespace. +// Success create a valid output Group response for a set of client +// instances. The first argument is expected to contain a queue +// config.Namespace. The queue config is passed to assign the queue +// factory when elastic-agent reloads the output. func Success(cfg config.Namespace, batchSize, retry int, clients ...Client) (Group, error) { var q queue.QueueFactory if cfg.IsSet() && cfg.Config().Enabled() { @@ -75,6 +77,8 @@ func NetworkClients(netclients []NetworkClient) []Client { // SuccessNet create a valid output Group and creates client instances // The first argument is expected to contain a queue config.Namespace. +// The queue config is passed to assign the queue factory when +// elastic-agent reloads the output. func SuccessNet(cfg config.Namespace, loadbalance bool, batchSize, retry int, netclients []NetworkClient) (Group, error) { if !loadbalance { diff --git a/libbeat/publisher/pipeline/controller.go b/libbeat/publisher/pipeline/controller.go index 21fb22f45ceb..1c480c01bce2 100644 --- a/libbeat/publisher/pipeline/controller.go +++ b/libbeat/publisher/pipeline/controller.go @@ -60,8 +60,14 @@ type outputController struct { workerChan chan publisher.Batch - consumer *eventConsumer - workers []outputWorker + consumer *eventConsumer + workers []outputWorker + // The InputQueueSize can be set when the Beat is started, in + // libbeat/cmd/instance/Settings we need to preserve that + // value and pass it into the queue factory. The queue + // factory could be made from elastic-agent output + // configuration reloading which doesn't have access to this + // setting. inputQueueSize int }