Skip to content

Commit

Permalink
Support flattened data_stream.* fields
Browse files Browse the repository at this point in the history
An input configuration supports flattened fields, however the
'data_stream' field was not being correctly decoded when
flattened. This commit fixes this issue.
  • Loading branch information
belimawr committed Sep 22, 2023
1 parent 735a8fb commit a91af36
Show file tree
Hide file tree
Showing 3 changed files with 188 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
# Kind can be one of:
# - breaking-change: a change to previously-documented behavior
# - deprecation: functionality that is being removed in a later release
# - bug-fix: fixes a problem in a previous version
# - enhancement: extends functionality but does not break or fix existing behavior
# - feature: new functionality
# - known-issue: problems that we are aware of in a given version
# - security: impacts on the security of a product or a user’s deployment.
# - upgrade: important information for someone upgrading from a prior version
# - other: does not fit into any of the other categories
kind: feature

# Change summary; a 80ish characters long description of the change.
summary: Support flattened data_stream.* fields

# Long description; in case the summary is not enough to describe the change
# this field accommodate a description without length limits.
# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment.
description: >-
An input configuration supports flattened fields, however the
'data_stream' field was not being correctly decoded when
flattened. This commit fixes this issue.
# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
component: elastic-agent

# PR URL; optional; the PR number that added the changeset.
# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
# Please provide it if you are adding a fragment for a different PR.
pr: https://github.com/elastic/elastic-agent/pull/3465

# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
# If not present is automatically filled by the tooling with the issue linked to the PR number.
issue: https://github.com/elastic/elastic-agent/issues/3191
71 changes: 71 additions & 0 deletions pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {

return gatheredPaths
}

func TestFlattenedDataStream(t *testing.T) {
expectedNamespace := "test-namespace"
expectedType := "test-type"
expectedDataset := "test-dataset"

policy := map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"enabled": true,
},
},
"inputs": []any{
map[string]any{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
"data_stream.type": expectedType,
"data_stream.dataset": expectedDataset,
"data_stream": map[string]any{
"namespace": expectedNamespace,
},
},
},
}
runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
if err != nil {
t.Fatalf("cannot load runtime specs: %s", err)
}

result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
if err != nil {
t.Fatalf("cannot convert policy to component: %s", err)
}

if len(result) != 1 {
t.Fatalf("expecting result to have one element, got %d", len(result))
}

if len(result[0].Units) != 2 {
t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result))
}

// We do not make assumptions about ordering.
// Get the input Unit
var dataStream *proto.DataStream
for _, unit := range result[0].Units {
if unit.Err != nil {
t.Fatalf("unit.Err: %s", unit.Err)
}
if unit.Type == client.UnitTypeInput {
dataStream = unit.Config.DataStream
break
}
}

if dataStream == nil {
t.Fatal("DataStream cannot be nil")
}

if dataStream.Dataset != expectedDataset {
t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
}
if dataStream.Type != expectedType {
t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
}
if dataStream.Namespace != expectedNamespace {
t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
}
}
82 changes: 82 additions & 0 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/pkg/limits"
)

Expand Down Expand Up @@ -100,9 +101,90 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return nil, err
}

if err := updateDataStreamsFromSource(result); err != nil {
return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
}

return result, nil
}

// dataStreamAndSource is a generic way to represent proto mesages
// that contain a source field and a datastream field.
type dataStreamAndSource interface {
GetDataStream() *proto.DataStream
GetSource() *structpb.Struct
}

func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) {
ds := raw.GetDataStream()
if ds == nil {
ds = &proto.DataStream{}
}

tmp := struct {
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Type string `config:"type" yaml:"type"`
Namespace string `config:"namespace" yaml:"namespace"`
} `config:"data_stream" yaml:"data_stream"`
}{}

cfg, err := config.NewConfigFrom(raw.GetSource().AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

if err := cfg.Unpack(&tmp); err != nil {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}

if ds.Dataset != "" && tmp.DataStream.Dataset != "" {
return nil, errors.New("duplicated key 'datastream.dataset'")
}

if ds.Type != "" && tmp.DataStream.Type != "" {
return nil, errors.New("duplicated key 'datastream.type'")
}

if ds.Namespace != "" && tmp.DataStream.Namespace != "" {
return nil, errors.New("duplicated key 'datastream.namespace'")
}

ret := &proto.DataStream{
Dataset: merge(tmp.DataStream.Dataset, ds.Dataset),
Type: merge(tmp.DataStream.Type, ds.Type),
Namespace: merge(tmp.DataStream.Namespace, ds.Namespace),
}

return ret, nil
}

// merge returns b if a is an empty string
func merge(a, b string) string {
if a == "" {
return b
}
return a
}

func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
var err error
unitConfig.DataStream, err = deDotDataStream(unitConfig)
if err != nil {
return fmt.Errorf("could not parse data_stream from input: %w", err)
}

for i, stream := range unitConfig.Streams {
stream.DataStream, err = deDotDataStream(stream)
if err != nil {
return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
i, err)
}
}

return nil
}

func setSource(val interface{}, cfg map[string]interface{}) error {
// find the source field on the val
resVal := reflect.ValueOf(val).Elem()
Expand Down

0 comments on commit a91af36

Please sign in to comment.