Skip to content

Commit

Permalink
Fix flattened data_stream.* fields under Elastic-Agent
Browse files Browse the repository at this point in the history
If the data_stream is not flattened, Beats will receive it twice, in
the Config.Source and as DataStream, that is the normal behaviour when
the Elastic-Agent is managed by Fleet and was causing a duplicated key
error.

This commit fixes that by only returning the error if the values in
Config.Source and DataStream are different.

The tests now cover this case.
  • Loading branch information
belimawr committed Sep 18, 2023
1 parent 73774d5 commit 1b02af4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 4 deletions.
6 changes: 3 additions & 3 deletions x-pack/libbeat/management/generate.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,15 +118,15 @@ func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}

if ds.Dataset != "" && tmp.DataStream.Dataset != "" {
if ds.Dataset != "" && tmp.DataStream.Dataset != "" && (ds.Dataset != tmp.DataStream.Dataset) {
return nil, errors.New("duplicated key 'datastream.dataset'")
}

if ds.Type != "" && tmp.DataStream.Type != "" {
if ds.Type != "" && tmp.DataStream.Type != "" && (ds.Type != tmp.DataStream.Type) {
return nil, errors.New("duplicated key 'datastream.type'")
}

if ds.Namespace != "" && tmp.DataStream.Namespace != "" {
if ds.Namespace != "" && tmp.DataStream.Namespace != "" && (ds.Namespace != tmp.DataStream.Namespace) {
return nil, errors.New("duplicated key 'datastream.namespace'")
}

Expand Down
15 changes: 14 additions & 1 deletion x-pack/libbeat/management/managerV2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -531,6 +531,8 @@ func TestFlattenedDataStreams(t *testing.T) {

inputs := &mockReloadable{
ReloadFn: func(configs []*reload.ConfigWithMeta) error {
// The datea_stream fields are used to generate the `index` key in
// the config, so we check for it.
for _, input := range configs {
tmp := struct {
Index string `config:"index" yaml:"index"`
Expand Down Expand Up @@ -584,6 +586,16 @@ func TestFlattenedDataStreams(t *testing.T) {
"data_stream.namespace": expectedNamespace,
"data_stream.type": expectedType,
}),
// If the data_stream is not flattened, Beats will receive
// it twice, in the Config.Source and as DataStream, that is the
// normal behaviour when the Elastic-Agent is managed by Fleet.
// So we also add the DataStream here to make sure there will be
// no conflict.
DataStream: &proto.DataStream{
Dataset: expectedDataset,
Namespace: expectedNamespace,
Type: expectedType,
},
Streams: []*proto.Stream{
{
Id: "filestream-id",
Expand Down Expand Up @@ -638,7 +650,8 @@ func TestFlattenedDataStreams(t *testing.T) {
require.Eventually(t, func() bool {
return stateReached.Load()
}, 10*time.Second, 100*time.Millisecond,
"did not find expected 'index' field on input final config")
"did not find expected 'index' field on input final config"+
" or reload was never called.")
}

type reloadable struct {
Expand Down

0 comments on commit 1b02af4

Please sign in to comment.