diff --git a/pkg/component/config.go b/pkg/component/config.go index 50b5e590e6b..781e2e7624f 100644 --- a/pkg/component/config.go +++ b/pkg/component/config.go @@ -108,19 +108,21 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro return result, nil } -// dataStreamAndSource is a generic way to represent proto mesages -// that contain a source field and a datastream field. -type dataStreamAndSource interface { - GetDataStream() *proto.DataStream - GetSource() *structpb.Struct -} - -func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { - ds := raw.GetDataStream() +func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) { if ds == nil { ds = &proto.DataStream{} } + cfg, err := config.NewConfigFrom(source.AsMap()) + if err != nil { + return nil, fmt.Errorf("cannot generate config from source field: %w", err) + } + + // Create a temporary struct to unpack the configuration. + // Unpack correctly handles any flattened fields like + // data_stream.type. So all we need to do is to call Unpack, + // ensure the DataStream does not have a different value, + // them merge them both. tmp := struct { DataStream struct { Dataset string `config:"dataset" yaml:"dataset"` @@ -129,11 +131,6 @@ func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { } `config:"data_stream" yaml:"data_stream"` }{} - cfg, err := config.NewConfigFrom(raw.GetSource().AsMap()) - if err != nil { - return nil, fmt.Errorf("cannot generate config from source field: %w", err) - } - if err := cfg.Unpack(&tmp); err != nil { return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) } @@ -151,17 +148,17 @@ func deDotDataStream(raw dataStreamAndSource) (*proto.DataStream, error) { } ret := &proto.DataStream{ - Dataset: merge(tmp.DataStream.Dataset, ds.Dataset), - Type: merge(tmp.DataStream.Type, ds.Type), - Namespace: merge(tmp.DataStream.Namespace, ds.Namespace), - Source: raw.GetDataStream().GetSource(), + Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset), + Type: valueOrDefault(tmp.DataStream.Type, ds.Type), + Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace), + Source: ds.GetSource(), } return ret, nil } -// merge returns b if a is an empty string -func merge(a, b string) string { +// valueOrDefault returns b if a is an empty string +func valueOrDefault(a, b string) string { if a == "" { return b } @@ -170,13 +167,13 @@ func merge(a, b string) string { func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error { var err error - unitConfig.DataStream, err = deDotDataStream(unitConfig) + unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource()) if err != nil { return fmt.Errorf("could not parse data_stream from input: %w", err) } for i, stream := range unitConfig.Streams { - stream.DataStream, err = deDotDataStream(stream) + stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource()) if err != nil { return fmt.Errorf("could not parse data_stream from stream [%d]: %w", i, err)