From b2d5c98db28a9a4ddbf613f79b11e854f5b53ff1 Mon Sep 17 00:00:00 2001 From: "Lee E. Hinman" Date: Thu, 28 Sep 2023 11:35:59 -0500 Subject: [PATCH] add config validation --- libbeat/cmd/instance/beat.go | 21 ++++++++++++++++++--- libbeat/cmd/instance/beat_test.go | 15 ++++++++++----- 2 files changed, 28 insertions(+), 8 deletions(-) diff --git a/libbeat/cmd/instance/beat.go b/libbeat/cmd/instance/beat.go index 5abde84c79a8..c6e78295b9d4 100644 --- a/libbeat/cmd/instance/beat.go +++ b/libbeat/cmd/instance/beat.go @@ -773,8 +773,8 @@ func (b *Beat) configure(settings Settings) error { return fmt.Errorf("error unpacking config data: %w", err) } - if err := mergeOutputQueueSettings(&b.Config); err != nil { - return fmt.Errorf("could not merge output queue settings: %w", err) + if err := promoteOutputQueueSettings(&b.Config); err != nil { + return fmt.Errorf("could not promote output queue settings: %w", err) } if err := features.UpdateFromConfig(b.RawConfig); err != nil { @@ -1471,7 +1471,7 @@ func sanitizeIPs(ips []string) []string { return validIPs } -func mergeOutputQueueSettings(bc *beatConfig) error { +func promoteOutputQueueSettings(bc *beatConfig) error { if bc.Output.IsSet() && bc.Output.Config().Enabled() { pc := pipeline.Config{} err := bc.Output.Config().Unpack(&pc) @@ -1479,8 +1479,23 @@ func mergeOutputQueueSettings(bc *beatConfig) error { return fmt.Errorf("error unpacking output queue settings: %w", err) } if pc.Queue.IsSet() { + logp.Info("global queue settings replaced with output queue settings") bc.Pipeline.Queue = pc.Queue } } return nil } + +func (bc *beatConfig) Validate() error { + if bc.Output.IsSet() && bc.Output.Config().Enabled() { + pc := pipeline.Config{} + err := bc.Output.Config().Unpack(&pc) + if err != nil { + return fmt.Errorf("error unpacking output queue settings: %w", err) + } + if bc.Pipeline.Queue.IsSet() && pc.Queue.IsSet() { + return fmt.Errorf("top level queue and output level queue settings defined, only one is allowed") + } + } + return nil +} diff --git a/libbeat/cmd/instance/beat_test.go b/libbeat/cmd/instance/beat_test.go index bd7bfba39ee2..4c8443dfb72b 100644 --- a/libbeat/cmd/instance/beat_test.go +++ b/libbeat/cmd/instance/beat_test.go @@ -272,10 +272,11 @@ func (r *outputReloaderMock) Reload( return nil } -func TestMergeOutputQueueSettings(t *testing.T) { +func TestPromoteOutputQueueSettings(t *testing.T) { tests := map[string]struct { - input []byte - memEvents int + input []byte + memEvents int + expectValidationError bool }{ "blank": {input: []byte(""), memEvents: 4096}, @@ -322,7 +323,7 @@ output: mem: events: 8096 `), - memEvents: 8096}, + expectValidationError: true}, } for name, tc := range tests { t.Run(name, func(t *testing.T) { @@ -331,9 +332,13 @@ output: config := beatConfig{} err = cfg.Unpack(&config) + if tc.expectValidationError { + require.Error(t, err) + return + } require.NoError(t, err) - err = mergeOutputQueueSettings(&config) + err = promoteOutputQueueSettings(&config) require.NoError(t, err) ms, err := memqueue.SettingsForUserConfig(config.Pipeline.Queue.Config())