diff --git a/CHANGELOG.next.asciidoc b/CHANGELOG.next.asciidoc index 72340b4c16e5..f9523f522fd7 100644 --- a/CHANGELOG.next.asciidoc +++ b/CHANGELOG.next.asciidoc @@ -63,7 +63,6 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff] - Support build of projects outside of beats directory {pull}36126[36126] - Add default cgroup regex for add_process_metadata processor {pull}36484[36484] {issue}32961[32961] - Fix environment capture by `add_process_metadata` processor. {issue}36469[36469] {pull}36471[36471] -- Support fattened `data_stream` object when running under Elastic-Agent {pr}36516[36516] *Auditbeat* diff --git a/x-pack/libbeat/management/generate.go b/x-pack/libbeat/management/generate.go index 3bdb1f29c6ae..59537e066862 100644 --- a/x-pack/libbeat/management/generate.go +++ b/x-pack/libbeat/management/generate.go @@ -5,11 +5,8 @@ package management import ( - "errors" "fmt" - "google.golang.org/protobuf/types/known/structpb" - "github.com/elastic/beats/v7/libbeat/common/reload" "github.com/elastic/elastic-agent-client/v7/pkg/client" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -85,78 +82,10 @@ func handleSimpleConfig(raw *proto.UnitExpectedConfig) (map[string]any, error) { return m, nil } -// dataStreamAndSource is a generic way to represent proto mesages -// that contain a source field and a datastream field. -type dataStreamAndSource interface { - GetDataStream() *proto.DataStream - GetSource() *structpb.Struct -} - -// deDotDataStream reads any datastream value from the dotted notation -// (data_stream.*) and returns it as a *proto.DataStream. If raw already -// contains a DataStream but no fields are duplicated, then the values are merged. -func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { - ds := raw.GetDataStream() - if ds == nil { - ds = &proto.DataStream{} - } - - tmp := struct { - DataStream struct { - Dataset string `config:"dataset" yaml:"dataset"` - Type string `config:"type" yaml:"type"` - Namespace string `config:"namespace" yaml:"namespace"` - } `config:"data_stream" yaml:"data_stream"` - }{} - - cfg, err := conf.NewConfigFrom(raw.GetSource().AsMap()) - if err != nil { - return nil, fmt.Errorf("cannot generate config from source field: %w", err) - } - - if err := cfg.Unpack(&tmp); err != nil { - return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) - } - - if ds.Dataset != "" && tmp.DataStream.Dataset != "" { - return nil, errors.New("duplicated key 'datastream.dataset'") - } - - if ds.Type != "" && tmp.DataStream.Type != "" { - return nil, errors.New("duplicated key 'datastream.type'") - } - - if ds.Namespace != "" && tmp.DataStream.Namespace != "" { - return nil, errors.New("duplicated key 'datastream.namespace'") - } - - ret := &proto.DataStream{ - Dataset: merge(tmp.DataStream.Dataset, ds.Dataset), - Type: merge(tmp.DataStream.Type, ds.Type), - Namespace: merge(tmp.DataStream.Namespace, ds.Namespace), - } - - return ret, nil -} - -// merge returns b if a is an empty string -func merge(a, b string) string { - if a == "" { - return b - } - return a -} - // CreateInputsFromStreams breaks down the raw Expected config into an array of individual inputs/modules from the Streams values // that can later be formatted into the reloader's ConfigWithMetaData and sent to an indvidual beat/ // This also performs the basic task of inserting module-level add_field processors into the inputs/modules. func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamType string, agentInfo *client.AgentInfo, defaultProcessors ...mapstr.M) ([]map[string]interface{}, error) { - ds, err := deDotDataStream(raw) - if err != nil { - return nil, fmt.Errorf("could not read 'data_stream': %w", err) - } - raw.DataStream = ds - // If there are no streams, we fall into the 'simple input config' case, // this means the key configuration values are on the root level instead of // an element in the `streams` array. @@ -177,14 +106,8 @@ func CreateInputsFromStreams(raw *proto.UnitExpectedConfig, defaultDataStreamTyp inputs := make([]map[string]interface{}, len(raw.GetStreams())) for iter, stream := range raw.GetStreams() { - ds, err := deDotDataStream(stream) - if err != nil { - return nil, fmt.Errorf("could not read 'data_stream' from stream ID '%s': %w", - stream.GetId(), err) - } - stream.DataStream = ds streamSource := raw.GetStreams()[iter].GetSource().AsMap() - streamSource, err = createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) + streamSource, err := createStreamRules(raw, streamSource, stream, defaultDataStreamType, agentInfo, defaultProcessors...) if err != nil { return nil, fmt.Errorf("error creating stream rules: %w", err) } diff --git a/x-pack/libbeat/management/generate_test.go b/x-pack/libbeat/management/generate_test.go index 9c0c7df72a16..fb7f88ff7759 100644 --- a/x-pack/libbeat/management/generate_test.go +++ b/x-pack/libbeat/management/generate_test.go @@ -7,10 +7,8 @@ package management import ( "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/client" @@ -232,85 +230,3 @@ func buildConfigMap(t *testing.T, unitRaw *proto.UnitExpectedConfig, agentInfo * require.NoError(t, err, "error in unpack for config %#v", reloadCfg[0].Config) return cfgMap } - -func TestDeDotDataStream(t *testing.T) { - testCases := map[string]struct { - source map[string]any - dataStream *proto.DataStream - wantError bool - expectedDataStream *proto.DataStream - }{ - "all data is flattened": { - source: map[string]any{ - "data_stream.dataset": "my dataset", - "data_stream.namespace": "my namespace", - "data_stream.type": "my type", - }, - expectedDataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - }, - "no data is flattened": { - dataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - expectedDataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - }, - "mix of flattened and data_stream": { - dataStream: &proto.DataStream{ - Dataset: "my dataset", - Type: "my type", - }, - source: map[string]any{ - "data_stream.namespace": "my namespace", - }, - expectedDataStream: &proto.DataStream{ - Dataset: "my dataset", - Namespace: "my namespace", - Type: "my type", - }, - }, - "duplicated keys generate error": { - dataStream: &proto.DataStream{ - Dataset: "my dataset", - }, - source: map[string]any{ - "data_stream.dataset": "another dataset", - }, - wantError: true, - }, - } - - for name, tc := range testCases { - t.Run(name, func(t *testing.T) { - raw := &proto.UnitExpectedConfig{ - Source: requireNewStruct(t, tc.source), - DataStream: tc.dataStream, - } - - final, err := deDotDataStream(raw) - if tc.wantError { - if err == nil { - t.Error("expecting an error") - } - return - } - if err != nil { - t.Fatalf("deDotDataStream returned an error: %s", err) - } - - if !cmp.Equal(final, tc.expectedDataStream, protocmp.Transform()) { - t.Errorf("expecting a different value: --got/++want\n'%s'", - cmp.Diff(final, tc.expectedDataStream, protocmp.Transform())) - } - }) - } -} diff --git a/x-pack/libbeat/management/managerV2_test.go b/x-pack/libbeat/management/managerV2_test.go index 217e54c8fe31..9fe238605b49 100644 --- a/x-pack/libbeat/management/managerV2_test.go +++ b/x-pack/libbeat/management/managerV2_test.go @@ -511,136 +511,6 @@ func TestErrorPerUnit(t *testing.T) { }, 10*time.Second, 100*time.Millisecond, "desired state, was not reached") } -func TestFlattenedDataStreams(t *testing.T) { - stateReached := atomic.Bool{} - - expectedDataset := "my-dataset" - expectedNamespace := "my-namespace" - expectedType := "my-type" - expectedIndex := fmt.Sprintf("%s-%s-%s", - expectedType, expectedDataset, expectedNamespace) - - r := reload.NewRegistry() - - output := &mockOutput{ - ReloadFn: func(config *reload.ConfigWithMeta) error { - return nil - }, - } - r.MustRegisterOutput(output) - - inputs := &mockReloadable{ - ReloadFn: func(configs []*reload.ConfigWithMeta) error { - for _, input := range configs { - tmp := struct { - Index string `config:"index" yaml:"index"` - }{} - - if err := input.Config.Unpack(&tmp); err != nil { - t.Fatalf("error unpacking config: %s", err) - } - - if tmp.Index != expectedIndex { - t.Fatalf("expecting index %q, got %q", expectedIndex, tmp.Index) - } - - stateReached.Store(true) - } - return nil - }, - } - r.MustRegisterInput(inputs) - - outputUnit := proto.UnitExpected{ - Id: "output-unit", - Type: proto.UnitType_OUTPUT, - State: proto.State_HEALTHY, - ConfigStateIdx: 1, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "default", - Type: "mock", - Name: "mock", - Source: integration.RequireNewStruct(t, - map[string]interface{}{ - "Is": "this", - "required?": "Yes!", - }), - }, - } - - inputUnit1 := proto.UnitExpected{ - Id: "input-unit1", - Type: proto.UnitType_INPUT, - State: proto.State_HEALTHY, - ConfigStateIdx: 1, - LogLevel: proto.UnitLogLevel_DEBUG, - Config: &proto.UnitExpectedConfig{ - Id: "input-unit-config-id", - Type: "filestream", - Name: "foo", - Source: requireNewStruct(t, map[string]any{ - "data_stream.dataset": expectedDataset, - "data_stream.namespace": expectedNamespace, - "data_stream.type": expectedType, - }), - Streams: []*proto.Stream{ - { - Id: "filestream-id", - Source: integration.RequireNewStruct(t, map[string]interface{}{ - "id": "input-unit1", - }), - }, - }, - }, - } - units := []*proto.UnitExpected{ - &outputUnit, - &inputUnit1, - } - server := &mock.StubServerV2{ - CheckinV2Impl: func(observed *proto.CheckinObserved) *proto.CheckinExpected { - // Nothing to do here, just keep sending the same units. - return &proto.CheckinExpected{ - Units: units, - } - }, - ActionImpl: func(response *proto.ActionResponse) error { return nil }, - } - - if err := server.Start(); err != nil { - t.Fatalf("could not start mock Elastic-Agent server: %s", err) - } - defer server.Stop() - - client := client.NewV2( - fmt.Sprintf(":%d", server.Port), - "", - client.VersionInfo{}, - grpc.WithTransportCredentials(insecure.NewCredentials())) - - m, err := NewV2AgentManagerWithClient( - &Config{ - Enabled: true, - }, - r, - client, - ) - if err != nil { - t.Fatalf("could not instantiate ManagerV2: %s", err) - } - - if err := m.Start(); err != nil { - t.Fatalf("could not start ManagerV2: %s", err) - } - defer m.Stop() - - require.Eventually(t, func() bool { - return stateReached.Load() - }, 10*time.Second, 100*time.Millisecond, - "did not find expected 'index' field on input final config") -} - type reloadable struct { mx sync.Mutex config *reload.ConfigWithMeta