diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 002e0d2243cd..72340b4c16e5 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -106,6 +106,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Added a fix for Crowdstrike pipeline handling process arrays {pull}36496[36496] - Ensure winlog input retains metric collection when handling recoverable errors. {issue}36479[36479] {pull}36483[36483] - Revert error introduced in {pull}35734[35734] when symlinks can't be resolved in filestream. {pull}36557[36557] +- Fix ignoring external input configuration in `take_over: true` mode {issue}36378[36378] {pull}36395[36395] *Heartbeat* diff --git a/filebeat/beater/filebeat.go b/filebeat/beater/filebeat.go index 2fddfd3bb4fb..78084c0fc294 100644 --- a/filebeat/beater/filebeat.go +++ b/filebeat/beater/filebeat.go @@ -502,9 +502,17 @@ func newPipelineLoaderFactory(esConfig *conf.C) fileset.PipelineLoaderFactory { // some of the filestreams might want to take over the loginput state // if their `take_over` flag is set to `true`. func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { + inputs, err := fetchInputConfiguration(config) + if err != nil { + return fmt.Errorf("Failed to fetch input configuration when attempting take over: %w", err) + } + if len(inputs) == 0 { + return nil + } + store, err := stateStore.Access() if err != nil { - return fmt.Errorf("Failed to access state for attempting take over: %w", err) + return fmt.Errorf("Failed to access state when attempting take over: %w", err) } defer store.Close() logger := logp.NewLogger("filestream-takeover") @@ -514,5 +522,49 @@ func processLogInputTakeOver(stateStore StateStore, config *cfg.Config) error { backuper := backup.NewRegistryBackuper(logger, registryHome) - return takeover.TakeOverLogInputStates(logger, store, backuper, config) + return takeover.TakeOverLogInputStates(logger, store, backuper, inputs) +} + +// fetches all the defined input configuration available at Filebeat startup including external files. +func fetchInputConfiguration(config *cfg.Config) (inputs []*conf.C, err error) { + if len(config.Inputs) == 0 { + inputs = []*conf.C{} + } else { + inputs = config.Inputs + } + + // reading external input configuration if defined + var dynamicInputCfg cfgfile.DynamicConfig + if config.ConfigInput != nil { + err = config.ConfigInput.Unpack(&dynamicInputCfg) + if err != nil { + return nil, fmt.Errorf("failed to unpack the dynamic input configuration: %w", err) + } + } + if dynamicInputCfg.Path == "" { + return inputs, nil + } + + cfgPaths, err := filepath.Glob(dynamicInputCfg.Path) + if err != nil { + return nil, fmt.Errorf("failed to resolve external input configuration paths: %w", err) + } + + if len(cfgPaths) == 0 { + return inputs, nil + } + + // making a copy so we can safely extend the slice + inputs = make([]*conf.C, len(config.Inputs)) + copy(inputs, config.Inputs) + + for _, p := range cfgPaths { + externalInputs, err := cfgfile.LoadList(p) + if err != nil { + return nil, fmt.Errorf("failed to load external input configuration: %w", err) + } + inputs = append(inputs, externalInputs...) + } + + return inputs, nil } diff --git a/filebeat/beater/filebeat_test.go b/filebeat/beater/filebeat_test.go new file mode 100644 index 000000000000..66a6620bfca5 --- /dev/null +++ b/filebeat/beater/filebeat_test.go @@ -0,0 +1,163 @@ +// Licensed to Elasticsearch B.V. under one or more contributor +// license agreements. See the NOTICE file distributed with +// this work for additional information regarding copyright +// ownership. Elasticsearch B.V. licenses this file to you under +// the Apache License, Version 2.0 (the "License"); you may +// not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package beater + +import ( + "os" + "path/filepath" + "testing" + + "github.com/elastic/beats/v7/filebeat/config" + conf "github.com/elastic/elastic-agent-libs/config" + + "github.com/stretchr/testify/require" +) + +type inputEntry struct { + ID string `config:"id"` +} + +func TestFetchInputConfiguration(t *testing.T) { + dir := t.TempDir() + err := os.WriteFile(filepath.Join(dir, "config1.yml"), []byte(` +- type: filestream + id: external-1 + paths: + - "/some" +- type: filestream + id: external-2 + paths: + - "/another" +`), 0777) + require.NoError(t, err) + err = os.WriteFile(filepath.Join(dir, "config2.yml.disabled"), []byte(` +- type: filestream + id: disabled + paths: + - "/some" +`), 0777) + require.NoError(t, err) + + cases := []struct { + name string + configFile string + expected []inputEntry + }{ + { + name: "loads mixed configuration", + configFile: ` +filebeat.config.inputs: + enabled: true + path: ` + dir + `/*.yml +filebeat.inputs: + - type: filestream + id: internal + paths: + - "/another" +output.console: + enabled: true +`, + expected: []inputEntry{ + { + ID: "internal", + }, + { + ID: "external-1", + }, + { + ID: "external-2", + }, + }, + }, + { + name: "loads only internal configuration", + configFile: ` +filebeat.inputs: + - type: filestream + id: internal + paths: + - "/another" +output.console: + enabled: true +`, + expected: []inputEntry{ + { + ID: "internal", + }, + }, + }, + { + name: "loads only external configuration", + configFile: ` +filebeat.config.inputs: + enabled: true + path: ` + dir + `/*.yml +output.console: + enabled: true +`, + expected: []inputEntry{ + { + ID: "external-1", + }, + { + ID: "external-2", + }, + }, + }, + { + name: "loads nothing", + configFile: ` +filebeat.config.inputs: + enabled: true + path: ` + dir + `/*.nothing +output.console: + enabled: true +`, + expected: []inputEntry{}, + }, + } + + for _, tc := range cases { + t.Run(tc.name, func(t *testing.T) { + rawConfig, err := conf.NewConfigFrom(tc.configFile) + require.NoError(t, err) + + cfg := struct { + Filebeat config.Config `config:"filebeat"` + }{ + Filebeat: config.DefaultConfig, + } + err = rawConfig.Unpack(&cfg) + require.NoError(t, err) + + inputs, err := fetchInputConfiguration(&cfg.Filebeat) + require.NoError(t, err) + + actual := []inputEntry{} + + for _, i := range inputs { + var entry inputEntry + err := i.Unpack(&entry) + require.NoError(t, err) + actual = append(actual, entry) + } + + require.Equal(t, tc.expected, actual) + }) + } +} diff --git a/filebeat/input/filestream/takeover/takeover.go b/filebeat/input/filestream/takeover/takeover.go index 7f4332efeb42..415ddcb4fdde 100644 --- a/filebeat/input/filestream/takeover/takeover.go +++ b/filebeat/input/filestream/takeover/takeover.go @@ -24,11 +24,11 @@ import ( "strings" "github.com/elastic/beats/v7/filebeat/backup" - cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/filebeat/input/file" "github.com/elastic/beats/v7/filebeat/input/filestream" "github.com/elastic/beats/v7/libbeat/statestore" "github.com/elastic/beats/v7/libbeat/statestore/backend" + conf "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent-libs/mapstr" ) @@ -47,8 +47,8 @@ type filestreamMatchers map[string]func(source string) bool // // This mode is created for a smooth loginput->filestream migration experience, so the filestream // inputs would pick up ingesting files from the same point where a loginput stopped. -func TakeOverLogInputStates(log *logp.Logger, store backend.Store, backuper backup.Backuper, cfg *cfg.Config) error { - filestreamMatchers, err := findFilestreams(log, cfg) +func TakeOverLogInputStates(log *logp.Logger, store backend.Store, backuper backup.Backuper, inputsCfg []*conf.C) error { + filestreamMatchers, err := findFilestreams(log, inputsCfg) if err != nil { return fmt.Errorf("failed to read input configuration: %w", err) } @@ -141,10 +141,10 @@ func takeOverStates(log *logp.Logger, store backend.Store, matchers filestreamMa // findFilestreams finds filestream inputs that are marked as `take_over: true` // and creates a file matcher for each such filestream for the future use in state // processing -func findFilestreams(log *logp.Logger, cfg *cfg.Config) (matchers filestreamMatchers, err error) { +func findFilestreams(log *logp.Logger, inputs []*conf.C) (matchers filestreamMatchers, err error) { matchers = make(filestreamMatchers) - for _, input := range cfg.Inputs { + for _, input := range inputs { inputCfg := defaultInputConfig() err := input.Unpack(&inputCfg) if err != nil { diff --git a/filebeat/input/filestream/takeover/takeover_test.go b/filebeat/input/filestream/takeover/takeover_test.go index d2849ae21390..efc0aebd87d1 100644 --- a/filebeat/input/filestream/takeover/takeover_test.go +++ b/filebeat/input/filestream/takeover/takeover_test.go @@ -21,7 +21,6 @@ import ( "testing" "github.com/elastic/beats/v7/filebeat/backup" - cfg "github.com/elastic/beats/v7/filebeat/config" "github.com/elastic/beats/v7/libbeat/statestore/backend" conf "github.com/elastic/elastic-agent-libs/config" @@ -31,62 +30,74 @@ import ( "github.com/stretchr/testify/require" ) +func newInputConfigFrom(t *testing.T, str ...string) []*conf.C { + results := make([]*conf.C, 0, len(str)) + for _, s := range str { + c, err := conf.NewConfigFrom(s) + require.NoError(t, err) + results = append(results, c) + } + return results +} + func TestTakeOverLogInputStates(t *testing.T) { - empty, err := conf.NewConfigFrom(``) - require.NoError(t, err) + empty := newInputConfigFrom(t) - noTakeOver, err := conf.NewConfigFrom(` -inputs: - - type: log - paths: - - "/path/log*.log" - - type: filestream - id: filestream-id-1 - enabled: true - paths: - - "/path/filestream1-*.log" + noTakeOver := newInputConfigFrom(t, + ` +type: log +paths: + - "/path/log*.log" +`, + ` +type: filestream +id: filestream-id-1 +enabled: true +paths: + - "/path/filestream1-*.log" `) - require.NoError(t, err) - - takeOver, err := conf.NewConfigFrom(` -inputs: - - type: filestream - id: filestream-id-1 - enabled: true - paths: - - "/path/filestream1-*.log" - - type: filestream - id: filestream-id-2 - take_over: true - enabled: true - paths: - - "/path/filestream2-*.log" - - "/path/log*.log" # taking over from the log input + takeOver := newInputConfigFrom(t, + ` +type: filestream +id: filestream-id-1 +enabled: true +paths: + - "/path/filestream1-*.log" +`, + ` +type: filestream +id: filestream-id-2 +take_over: true +enabled: true +paths: + - "/path/filestream2-*.log" + - "/path/log*.log" # taking over from the log input `) - require.NoError(t, err) - noUniqueID, err := conf.NewConfigFrom(` -inputs: - - type: filestream - id: filestream-id-2 - take_over: true - enabled: true - paths: - - "/path/filestream2-*.log" - - type: filestream - id: filestream-id-2 # not unique - take_over: true - enabled: true - paths: - - "/path/filestream3-*.log" - - type: filestream - take_over: true # no ID - enabled: true - paths: - - "/path/filestream-*.log" + noUniqueID := newInputConfigFrom(t, ` +type: filestream +id: filestream-id-2 +take_over: true +enabled: true +paths: + - "/path/filestream2-*.log" +`, + ` +type: filestream +id: filestream-id-2 # not unique +take_over: true +enabled: true +paths: + - "/path/filestream3-*.log" +`, + ` +type: filestream +take_over: true # no ID +enabled: true +paths: + - "/path/filestream-*.log" `) - require.NoError(t, err) states := []state{ // this state is to make sure the filestreams without `take_over` remain untouched @@ -162,7 +173,7 @@ inputs: cases := []struct { name string - cfg *conf.C + cfg []*conf.C states []state mustBackup bool mustRemove []string @@ -236,15 +247,11 @@ inputs: for _, tc := range cases { t.Run(tc.name, func(t *testing.T) { - config := cfg.DefaultConfig - err := tc.cfg.Unpack(&config) - require.NoError(t, err) - store := storeMock{ states: tc.states, } backuper := backuperMock{} - err = TakeOverLogInputStates(log, &store, &backuper, &config) + err := TakeOverLogInputStates(log, &store, &backuper, tc.cfg) if tc.expErr != "" { require.Error(t, err) require.Contains(t, err.Error(), tc.expErr)