diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index 3bdb1f29c6a..1866fba66f0 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -118,15 +118,15 @@ func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) } - if ds.Dataset != "" && tmp.DataStream.Dataset != "" { + if ds.Dataset != "" && tmp.DataStream.Dataset != "" && (ds.Dataset != tmp.DataStream.Dataset) { return nil, errors.New("duplicated key 'datastream.dataset'") } - if ds.Type != "" && tmp.DataStream.Type != "" { + if ds.Type != "" && tmp.DataStream.Type != "" && (ds.Type != tmp.DataStream.Type) { return nil, errors.New("duplicated key 'datastream.type'") } - if ds.Namespace != "" && tmp.DataStream.Namespace != "" { + if ds.Namespace != "" && tmp.DataStream.Namespace != "" && (ds.Namespace != tmp.DataStream.Namespace) { return nil, errors.New("duplicated key 'datastream.namespace'") } diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 217e54c8fe3..9b8f2d9d69f 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -531,6 +531,8 @@ func TestFlattenedDataStreams(t *testing.T) { inputs := &mockReloadable{ ReloadFn: func(configs []*reload.ConfigWithMeta) error { + // The datea_stream fields are used to generate the `index` key in + // the config, so we check for it. for _, input := range configs { tmp := struct { Index string `config:"index" yaml:"index"` @@ -584,6 +586,16 @@ func TestFlattenedDataStreams(t *testing.T) { "data_stream.namespace": expectedNamespace, "data_stream.type": expectedType, }), + // If the data_stream is not flattened, Beats will receive + // it twice, in the Config.Source and as DataStream, that is the + // normal behaviour when the Elastic-Agent is managed by Fleet. + // So we also add the DataStream here to make sure there will be + // no conflict. + DataStream: &proto.DataStream{ + Dataset: expectedDataset, + Namespace: expectedNamespace, + Type: expectedType, + }, Streams: []*proto.Stream{ { Id: "filestream-id", @@ -638,7 +650,8 @@ func TestFlattenedDataStreams(t *testing.T) { require.Eventually(t, func() bool { return stateReached.Load() }, 10*time.Second, 100*time.Millisecond, - "did not find expected 'index' field on input final config") + "did not find expected 'index' field on input final config"+ + " or reload was never called.") } type reloadable struct {