Skip to content

Commit

Permalink
Revert "Support flattened data_stream.* fields (#3465)"
Browse files Browse the repository at this point in the history
This reverts commit b0b8e85.
  • Loading branch information
belimawr authored Nov 10, 2023
1 parent c00eddf commit 3238b7c
Show file tree
Hide file tree
Showing 9 changed files with 221 additions and 692 deletions.
2 changes: 1 addition & 1 deletion .buildkite/pipeline.yml
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ steps:
key: "serverless-integration-tests"
env:
TEST_INTEG_AUTH_ESS_REGION: us-east-1
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
artifact_paths:
- "build/TEST-**"
- "build/diagnostics/*"
Expand Down

This file was deleted.

71 changes: 0 additions & 71 deletions pkg/component/component_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2361,74 +2361,3 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {

return gatheredPaths
}

func TestFlattenedDataStream(t *testing.T) {
expectedNamespace := "test-namespace"
expectedType := "test-type"
expectedDataset := "test-dataset"

policy := map[string]any{
"outputs": map[string]any{
"default": map[string]any{
"type": "elasticsearch",
"enabled": true,
},
},
"inputs": []any{
map[string]any{
"type": "filestream",
"id": "filestream-0",
"enabled": true,
"data_stream.type": expectedType,
"data_stream.dataset": expectedDataset,
"data_stream": map[string]any{
"namespace": expectedNamespace,
},
},
},
}
runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
if err != nil {
t.Fatalf("cannot load runtime specs: %s", err)
}

result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
if err != nil {
t.Fatalf("cannot convert policy to component: %s", err)
}

if len(result) != 1 {
t.Fatalf("expecting result to have one element, got %d", len(result))
}

if len(result[0].Units) != 2 {
t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result))
}

// We do not make assumptions about ordering.
// Get the input Unit
var dataStream *proto.DataStream
for _, unit := range result[0].Units {
if unit.Err != nil {
t.Fatalf("unit.Err: %s", unit.Err)
}
if unit.Type == client.UnitTypeInput {
dataStream = unit.Config.DataStream
break
}
}

if dataStream == nil {
t.Fatal("DataStream cannot be nil")
}

if dataStream.Dataset != expectedDataset {
t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
}
if dataStream.Type != expectedType {
t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
}
if dataStream.Namespace != expectedNamespace {
t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
}
}
80 changes: 0 additions & 80 deletions pkg/component/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent/pkg/limits"
)

Expand Down Expand Up @@ -101,88 +100,9 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
return nil, err
}

if err := updateDataStreamsFromSource(result); err != nil {
return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
}

return result, nil
}

func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) {
if ds == nil {
ds = &proto.DataStream{}
}

cfg, err := config.NewConfigFrom(source.AsMap())
if err != nil {
return nil, fmt.Errorf("cannot generate config from source field: %w", err)
}

// Create a temporary struct to unpack the configuration.
// Unpack correctly handles any flattened fields like
// data_stream.type. So all we need to do is to call Unpack,
// ensure the DataStream does not have a different value,
// them merge them both.
tmp := struct {
DataStream struct {
Dataset string `config:"dataset" yaml:"dataset"`
Type string `config:"type" yaml:"type"`
Namespace string `config:"namespace" yaml:"namespace"`
} `config:"data_stream" yaml:"data_stream"`
}{}

if err := cfg.Unpack(&tmp); err != nil {
return nil, fmt.Errorf("cannot unpack source field into struct: %w", err)
}

if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") {
return nil, errors.New("duplicated key 'datastream.dataset'")
}

if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") {
return nil, errors.New("duplicated key 'datastream.type'")
}

if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") {
return nil, errors.New("duplicated key 'datastream.namespace'")
}

ret := &proto.DataStream{
Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset),
Type: valueOrDefault(tmp.DataStream.Type, ds.Type),
Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace),
Source: ds.GetSource(),
}

return ret, nil
}

// valueOrDefault returns b if a is an empty string
func valueOrDefault(a, b string) string {
if a == "" {
return b
}
return a
}

func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error {
var err error
unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource())
if err != nil {
return fmt.Errorf("could not parse data_stream from input: %w", err)
}

for i, stream := range unitConfig.Streams {
stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource())
if err != nil {
return fmt.Errorf("could not parse data_stream from stream [%d]: %w",
i, err)
}
}

return nil
}

func setSource(val interface{}, cfg map[string]interface{}) error {
// find the source field on the val
resVal := reflect.ValueOf(val).Elem()
Expand Down
9 changes: 1 addition & 8 deletions pkg/component/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,8 @@ import (
"errors"
"testing"

"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/protobuf/testing/protocmp"
"google.golang.org/protobuf/types/known/structpb"

"github.com/elastic/elastic-agent-client/v7/pkg/proto"
Expand Down Expand Up @@ -199,12 +197,7 @@ func TestExpectedConfig(t *testing.T) {
assert.Equal(t, err.Error(), scenario.Err.Error())
} else {
require.NoError(t, err)
// protocmp.Transform ensures we do not compare any internal
// protobuf fields
if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) {
t.Errorf("mismatch (-want +got) \n%s",
cmp.Diff(scenario.Expected, observed, protocmp.Transform()))
}
assert.EqualValues(t, scenario.Expected, observed)
}
})
}
Expand Down
9 changes: 1 addition & 8 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -357,14 +357,7 @@ func (f *Fixture) RunBeat(ctx context.Context) error {
// Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started
// with `WithAllowErrors()` then `Run` will exit early and return the logged error.
//
// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
//
// The Elastic-Agent is started agent in test mode (--testing-mode) this mode
// expects the initial configuration (full YAML config) via gRPC.
// This configuration should be passed in the State.Configure field.
//
// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
// when `Run` is called.
// If no `states` are provided then the Elastic Agent runs until the context is or the timeout specified with WithRunLength is reached.
func (f *Fixture) Run(ctx context.Context, states ...State) error {
if f.binaryName != "elastic-agent" {
return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
Expand Down
64 changes: 5 additions & 59 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,9 +438,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor

}

// GetLogsForDataset returns any logs associated with the datastream
func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatasetWithContext(context.Background(), client, index)
// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) {
return GetLogsForDatastreamWithContext(context.Background(), client, index)
}

// GetLogsForAgentID returns any logs associated with the agent ID
Expand Down Expand Up @@ -478,8 +478,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,
return handleDocsResponse(res)
}

// GetLogsForDatasetWithContext returns any logs associated with the datastream
func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
// GetLogsForDatastreamWithContext returns any logs associated with the datastream
func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
indexQuery := map[string]interface{}{
"query": map[string]interface{}{
"match": map[string]interface{}{
Expand Down Expand Up @@ -536,60 +536,6 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
return handleDocsResponse(res)
}

// GetLogsForDatastream returns any logs associated with the datastream
func GetLogsForDatastream(
ctx context.Context,
client elastictransport.Interface,
dsType, dataset, namespace string) (Documents, error) {

query := map[string]any{
"_source": []string{"message"},
"query": map[string]any{
"bool": map[string]any{
"must": []any{
map[string]any{
"match": map[string]any{
"data_stream.dataset": dataset,
},
},
map[string]any{
"match": map[string]any{
"data_stream.namespace": namespace,
},
},
map[string]any{
"match": map[string]any{
"data_stream.type": dsType,
},
},
},
},
},
}

var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(query); err != nil {
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

es := esapi.New(client)
res, err := es.Search(
es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)),
es.Search.WithExpandWildcards("all"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
es.Search.WithPretty(),
es.Search.WithContext(ctx),
)
if err != nil {
return Documents{}, fmt.Errorf("error performing ES search: %w", err)
}

return handleDocsResponse(res)
}

// handleDocsResponse converts the esapi.Response into Documents,
// it closes the response.Body after reading
func handleDocsResponse(res *esapi.Response) (Documents, error) {
resultBuf, err := handleResponseRaw(res)
if err != nil {
Expand Down
Loading

0 comments on commit 3238b7c

Please sign in to comment.