From b0b8e85bce33642de86f2f13a81b3544b3f8e778 Mon Sep 17 00:00:00 2001 From: Tiago Queiroz Date: Wed, 8 Nov 2023 11:04:14 +0100 Subject: [PATCH] Support flattened data_stream.* fields (#3465) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit An input configuration supports flattened fields, however the 'data_stream' field was not being correctly decoded when flattened. This commit fixes this issue. Some small additions and refactoring are also implemented in the integration test framework as well as some more detailed documentation. --------- Co-authored-by: Paolo Chilà --- .buildkite/pipeline.yml | 2 +- ...Support-flattened-data_stream.-fields.yaml | 35 ++ pkg/component/component_test.go | 71 +++ pkg/component/config.go | 80 ++++ pkg/component/config_test.go | 9 +- pkg/testing/fixture.go | 9 +- pkg/testing/tools/estools/elasticsearch.go | 64 ++- testing/integration/logs_ingestion_test.go | 430 ++++++++++++++++++ testing/integration/monitoring_logs_test.go | 213 --------- 9 files changed, 692 insertions(+), 221 deletions(-) create mode 100644 changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml create mode 100644 testing/integration/logs_ingestion_test.go delete mode 100644 testing/integration/monitoring_logs_test.go diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 0f114dcaa4a..cbd0ae35884 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -140,7 +140,7 @@ steps: key: "serverless-integration-tests" env: TEST_INTEG_AUTH_ESS_REGION: us-east-1 - command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite + command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite artifact_paths: - "build/TEST-**" - "build/diagnostics/*" diff --git a/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml new file mode 100644 index 00000000000..1ce991c2c38 --- /dev/null +++ b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml @@ -0,0 +1,35 @@ +# Kind can be one of: +# - breaking-change: a change to previously-documented behavior +# - deprecation: functionality that is being removed in a later release +# - bug-fix: fixes a problem in a previous version +# - enhancement: extends functionality but does not break or fix existing behavior +# - feature: new functionality +# - known-issue: problems that we are aware of in a given version +# - security: impacts on the security of a product or a user’s deployment. +# - upgrade: important information for someone upgrading from a prior version +# - other: does not fit into any of the other categories +kind: feature + +# Change summary; a 80ish characters long description of the change. +summary: Support flattened data_stream.* fields in input configuration + +# Long description; in case the summary is not enough to describe the change +# this field accommodate a description without length limits. +# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. 
+description: >-
+  An input configuration supports flattened fields, however the
+  'data_stream' field was not being correctly decoded when
+  flattened. This commit fixes this issue.
+
+# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc.
+component: elastic-agent
+
+# PR URL; optional; the PR number that added the changeset.
+# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added.
+# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number.
+# Please provide it if you are adding a fragment for a different PR.
+pr: https://github.com/elastic/elastic-agent/pull/3465
+
+# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of).
+# If not present is automatically filled by the tooling with the issue linked to the PR number.
+issue: https://github.com/elastic/elastic-agent/issues/3191
diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go
index 424f3a93147..00c4d1c63cb 100644
--- a/pkg/component/component_test.go
+++ b/pkg/component/component_test.go
@@ -2361,3 +2361,74 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string {
 
 	return gatheredPaths
 }
+
+func TestFlattenedDataStream(t *testing.T) {
+	expectedNamespace := "test-namespace"
+	expectedType := "test-type"
+	expectedDataset := "test-dataset"
+
+	policy := map[string]any{
+		"outputs": map[string]any{
+			"default": map[string]any{
+				"type":    "elasticsearch",
+				"enabled": true,
+			},
+		},
+		"inputs": []any{
+			map[string]any{
+				"type":                "filestream",
+				"id":                  "filestream-0",
+				"enabled":             true,
+				"data_stream.type":    expectedType,
+				"data_stream.dataset": expectedDataset,
+				"data_stream": map[string]any{
+					"namespace": expectedNamespace,
+				},
+			},
+		},
+	}
+	runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck())
+	if err != nil {
+		t.Fatalf("cannot load runtime specs: %s", err)
+	}
+
+	result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil)
+	if err != nil {
+		t.Fatalf("cannot convert policy to component: %s", err)
+	}
+
+	if len(result) != 1 {
+		t.Fatalf("expecting result to have one element, got %d", len(result))
+	}
+
+	if len(result[0].Units) != 2 {
+		t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result[0].Units))
+	}
+
+	// We do not make assumptions about ordering.
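+	// The component is expected to contain one output unit and one input
+	// unit, but their order within Units is not guaranteed, so we search
+	// for the input unit instead of indexing into the slice.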
+	// Get the input Unit
+	var dataStream *proto.DataStream
+	for _, unit := range result[0].Units {
+		if unit.Err != nil {
+			t.Fatalf("unit.Err: %s", unit.Err)
+		}
+		if unit.Type == client.UnitTypeInput {
+			dataStream = unit.Config.DataStream
+			break
+		}
+	}
+
+	if dataStream == nil {
+		t.Fatal("DataStream cannot be nil")
+	}
+
+	if dataStream.Dataset != expectedDataset {
+		t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset)
+	}
+	if dataStream.Type != expectedType {
+		t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type)
+	}
+	if dataStream.Namespace != expectedNamespace {
+		t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace)
+	}
+}
diff --git a/pkg/component/config.go b/pkg/component/config.go
index a0c75d00e32..781e2e7624f 100644
--- a/pkg/component/config.go
+++ b/pkg/component/config.go
@@ -15,6 +15,7 @@ import (
 	"google.golang.org/protobuf/types/known/structpb"
 
 	"github.com/elastic/elastic-agent-client/v7/pkg/proto"
+	"github.com/elastic/elastic-agent-libs/config"
 	"github.com/elastic/elastic-agent/pkg/limits"
 )
 
@@ -100,9 +101,88 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro
 		return nil, err
 	}
 
+	if err := updateDataStreamsFromSource(result); err != nil {
+		return nil, fmt.Errorf("could not dedot 'data_stream': %w", err)
+	}
+
 	return result, nil
 }
 
+func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) {
+	if ds == nil {
+		ds = &proto.DataStream{}
+	}
+
+	cfg, err := config.NewConfigFrom(source.AsMap())
+	if err != nil {
+		return nil, fmt.Errorf("cannot generate config from source field: %w", err)
+	}
+
+	// Create a temporary struct to unpack the configuration.
+	// Unpack correctly handles any flattened fields like
+	// data_stream.type. So all we need to do is to call Unpack,
+	// ensure the DataStream does not have a different value,
+	// then merge them both.
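+	// For example, both of the following forms end up in the same field of
+	// the temporary struct below (this mirrors what TestFlattenedDataStream
+	// asserts):
+	//
+	//   data_stream.dataset: foo
+	//
+	//   data_stream:
+	//     dataset: foo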
+ tmp := struct { + DataStream struct { + Dataset string `config:"dataset" yaml:"dataset"` + Type string `config:"type" yaml:"type"` + Namespace string `config:"namespace" yaml:"namespace"` + } `config:"data_stream" yaml:"data_stream"` + }{} + + if err := cfg.Unpack(&tmp); err != nil { + return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) + } + + if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") { + return nil, errors.New("duplicated key 'datastream.dataset'") + } + + if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") { + return nil, errors.New("duplicated key 'datastream.type'") + } + + if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") { + return nil, errors.New("duplicated key 'datastream.namespace'") + } + + ret := &proto.DataStream{ + Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset), + Type: valueOrDefault(tmp.DataStream.Type, ds.Type), + Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace), + Source: ds.GetSource(), + } + + return ret, nil +} + +// valueOrDefault returns b if a is an empty string +func valueOrDefault(a, b string) string { + if a == "" { + return b + } + return a +} + +func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error { + var err error + unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource()) + if err != nil { + return fmt.Errorf("could not parse data_stream from input: %w", err) + } + + for i, stream := range unitConfig.Streams { + stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource()) + if err != nil { + return fmt.Errorf("could not parse data_stream from stream [%d]: %w", + i, err) + } + } + + return nil +} + func setSource(val interface{}, cfg map[string]interface{}) error { // find the source field on the val resVal := reflect.ValueOf(val).Elem() diff --git a/pkg/component/config_test.go b/pkg/component/config_test.go index 64dcfe3a697..7cdef177829 100644 --- a/pkg/component/config_test.go +++ b/pkg/component/config_test.go @@ -8,8 +8,10 @@ import ( "errors" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -197,7 +199,12 @@ func TestExpectedConfig(t *testing.T) { assert.Equal(t, err.Error(), scenario.Err.Error()) } else { require.NoError(t, err) - assert.EqualValues(t, scenario.Expected, observed) + // protocmp.Transform ensures we do not compare any internal + // protobuf fields + if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) { + t.Errorf("mismatch (-want +got) \n%s", + cmp.Diff(scenario.Expected, observed, protocmp.Transform())) + } } }) } diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 1d852e50277..3774808efaa 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -357,7 +357,14 @@ func (f *Fixture) RunBeat(ctx context.Context) error { // Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started // with `WithAllowErrors()` then `Run` will exit early and return the logged error. // -// If no `states` are provided then the Elastic Agent runs until the context is or the timeout specified with WithRunLength is reached. 
+// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
+//
+// The Elastic Agent is started in test mode (--testing-mode). This mode
+// expects the initial configuration (full YAML config) via gRPC.
+// This configuration should be passed in the State.Configure field.
+//
+// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored
+// when `Run` is called.
 func (f *Fixture) Run(ctx context.Context, states ...State) error {
 	if f.binaryName != "elastic-agent" {
 		return errors.New("Run() can only be used with elastic-agent, use RunBeat()")
diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go
index d6bf69369cd..304e917d7ee 100644
--- a/pkg/testing/tools/estools/elasticsearch.go
+++ b/pkg/testing/tools/estools/elasticsearch.go
@@ -438,9 +438,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
 
 }
 
-// GetLogsForDatastream returns any logs associated with the datastream
-func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) {
-	return GetLogsForDatastreamWithContext(context.Background(), client, index)
+// GetLogsForDataset returns any logs associated with the given dataset
+func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) {
+	return GetLogsForDatasetWithContext(context.Background(), client, index)
 }
 
 // GetLogsForAgentID returns any logs associated with the agent ID
@@ -478,8 +478,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,
 	return handleDocsResponse(res)
 }
 
-// GetLogsForDatastreamWithContext returns any logs associated with the datastream
-func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
+// GetLogsForDatasetWithContext returns any logs associated with the given dataset
+func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) {
 	indexQuery := map[string]interface{}{
 		"query": map[string]interface{}{
 			"match": map[string]interface{}{
@@ -536,6 +536,60 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{
 	return handleDocsResponse(res)
 }
 
+// GetLogsForDatastream returns any logs associated with the given data
+// stream (type, dataset and namespace)
+func GetLogsForDatastream(
+	ctx context.Context,
+	client elastictransport.Interface,
+	dsType, dataset, namespace string) (Documents, error) {
+
+	query := map[string]any{
+		"_source": []string{"message"},
+		"query": map[string]any{
+			"bool": map[string]any{
+				"must": []any{
+					map[string]any{
+						"match": map[string]any{
+							"data_stream.dataset": dataset,
+						},
+					},
+					map[string]any{
+						"match": map[string]any{
+							"data_stream.namespace": namespace,
+						},
+					},
+					map[string]any{
+						"match": map[string]any{
+							"data_stream.type": dsType,
+						},
+					},
+				},
+			},
+		},
+	}
+
+	var buf bytes.Buffer
+	if err := json.NewEncoder(&buf).Encode(query); err != nil {
+		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
+	}
+
+	es := esapi.New(client)
+	res, err := es.Search(
+		es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)),
+		es.Search.WithExpandWildcards("all"),
+		es.Search.WithBody(&buf),
+		es.Search.WithTrackTotalHits(true),
+		es.Search.WithPretty(),
+		es.Search.WithContext(ctx),
+	)
+	if err != nil {
+		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
+	}
+
+	return handleDocsResponse(res)
+}
+
+// handleDocsResponse converts the esapi.Response into 
Documents, +// it closes the response.Body after reading func handleDocsResponse(res *esapi.Response) (Documents, error) { resultBuf, err := handleResponseRaw(res) if err != nil { diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go new file mode 100644 index 00000000000..ba9a84673b0 --- /dev/null +++ b/testing/integration/logs_ingestion_test.go @@ -0,0 +1,430 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/http/httputil" + "os" + "path/filepath" + "regexp" + "strings" + "testing" + "text/template" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/check" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" + "github.com/elastic/elastic-transport-go/v8/elastictransport" +) + +func TestLogIngestionFleetManaged(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Local: false, + Sudo: true, + }) + ctx := context.Background() + + agentFixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + // 1. Create a policy in Fleet with monitoring enabled. + // To ensure there are no conflicts with previous test runs against + // the same ESS stack, we add the current time at the end of the policy + // name. This policy does not contain any integration. + t.Log("Enrolling agent in Fleet with a test policy") + createPolicyReq := kibana.AgentPolicy{ + Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), + Namespace: info.Namespace, + Description: "test policy for agent enrollment", + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + AgentFeatures: []map[string]interface{}{ + { + "name": "test_enroll", + "enabled": true, + }, + }, + } + + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + } + + // 2. Install the Elastic-Agent with the policy that + // was just created. 
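+	// InstallAgentWithPolicy is expected to create the policy through the
+	// Kibana API, then install the Elastic Agent and enroll it with that
+	// policy; the agent is uninstalled as part of the test cleanup.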
+ policy, err := tools.InstallAgentWithPolicy( + ctx, + t, + installOpts, + agentFixture, + info.KibanaClient, + createPolicyReq) + require.NoError(t, err) + t.Logf("created policy: %s", policy.ID) + check.ConnectedToFleet(t, agentFixture, 5*time.Minute) + + t.Run("Monitoring logs are shipped", func(t *testing.T) { + testMonitoringLogsAreShipped(t, ctx, info, agentFixture, policy) + }) + + t.Run("Normal logs with flattened data_stream are shipped", func(t *testing.T) { + testFlattenedDatastreamFleetPolicy(t, ctx, info, agentFixture, policy) + }) +} + +func testMonitoringLogsAreShipped( + t *testing.T, + ctx context.Context, + info *define.Info, + agentFixture *atesting.Fixture, + policy kibana.PolicyResponse, +) { + // Stage 1: Make sure metricbeat logs are populated + t.Log("Making sure metricbeat logs are populated") + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") + }) + t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 2: make sure all components are healthy + t.Log("Making sure all components are healthy") + status, err := agentFixture.ExecStatus(ctx) + require.NoError(t, err, + "could not get agent status to verify all components are healthy") + for _, c := range status.Components { + assert.Equalf(t, client.Healthy, client.State(c.State), + "component %s: want %s, got %s", + c.Name, client.Healthy, client.State(c.State)) + } + + // Stage 3: Make sure there are no errors in logs + t.Log("Making sure there are no error logs") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ + // acceptable error messages (include reason) + "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated + "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues + "Failed to download artifact", + "Failed to initialize artifact", + "Failed to apply initial policy from on disk configuration", + "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart + }) + }) + t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) + for _, doc := range docs.Hits.Hits { + t.Logf("%#v", doc.Source) + } + require.Empty(t, docs.Hits.Hits) + + // Stage 4: Make sure we have message confirming central management is running + t.Log("Making sure we have message confirming central management is running") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.FindMatchingLogLines(info.ESClient, info.Namespace, + "Parsed configuration and determined agent is managed by Fleet") + }) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 5: verify logs from the monitoring components are not sent to the output + t.Log("Check monitoring logs") + hostname, err := os.Hostname() + if err != nil { + t.Fatalf("could not get hostname to filter Agent: %s", err) + } + + agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) + require.NoError(t, err, "could not get Agent ID by hostname") + t.Logf("Agent ID: %q", agentID) + + // We cannot search for `component.id` because at the moment of writing + // this field is not mapped. 
There is an issue for that:
+	// https://github.com/elastic/integrations/issues/6545
+	// TODO: use runtime fields while the above issue is not resolved.
+
+	docs = findESDocs(t, func() (estools.Documents, error) {
+		return estools.GetLogsForAgentID(info.ESClient, agentID)
+	})
+	require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s",
+		agentID, err)
+
+	monRegExp := regexp.MustCompile(".*-monitoring$")
+	for i, d := range docs.Hits.Hits {
+		// Lazy way to navigate a map[string]any: convert to JSON then
+		// decode into a struct.
+		jsonData, err := json.Marshal(d.Source)
+		if err != nil {
+			t.Fatalf("could not encode document source as JSON: %s", err)
+		}
+
+		doc := ESDocument{}
+		if err := json.Unmarshal(jsonData, &doc); err != nil {
+			t.Fatalf("could not unmarshal document source: %s", err)
+		}
+
+		if monRegExp.MatchString(doc.Component.ID) {
+			t.Errorf("[%d] Document on index %q with 'component.id': %q "+
+				"and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+
+				"end in '-monitoring'\n",
+				i, d.Index, doc.Component.ID, doc.ElasticAgent.ID)
+		}
+	}
+}
+
+func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents {
+	var docs estools.Documents
+	require.Eventually(
+		t,
+		func() bool {
+			var err error
+			docs, err = findFn()
+			return err == nil
+		},
+		3*time.Minute,
+		15*time.Second,
+	)
+
+	return docs
+}
+
+func testFlattenedDatastreamFleetPolicy(
+	t *testing.T,
+	ctx context.Context,
+	info *define.Info,
+	agentFixture *atesting.Fixture,
+	policy kibana.PolicyResponse,
+) {
+	dsType := "logs"
+	dsNamespace := cleanString(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64()))
+	dsDataset := cleanString(fmt.Sprintf("%s-dataset", t.Name()))
+	numEvents := 60
+
+	tempDir := t.TempDir()
+	logFilePath := filepath.Join(tempDir, "log.log")
+	generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents)
+
+	agentFixture, err := define.NewFixture(t, define.Version())
+	if err != nil {
+		t.Fatalf("could not create new fixture: %s", err)
+	}
+
+	// 1. Prepare a request to add an integration to the policy
+	tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON)
+	if err != nil {
+		t.Fatalf("cannot parse template: %s", err)
+	}
+
+	// The time here ensures there are no conflicts with the integration name
+	// in Fleet.
+	agentPolicyBuilder := strings.Builder{}
+	err = tmpl.Execute(&agentPolicyBuilder, policyVars{
+		Name:        "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339),
+		PolicyID:    policy.ID,
+		LogFilePath: logFilePath,
+		Namespace:   dsNamespace,
+		Dataset:     dsDataset,
+	})
+	if err != nil {
+		t.Fatalf("could not render template: %s", err)
+	}
+	// We keep a copy of the policy for debugging purposes
+	agentPolicy := agentPolicyBuilder.String()
+
+	// 2. Call Kibana to create the policy.
+	// Docs: https://www.elastic.co/guide/en/fleet/current/fleet-api-docs.html#create-integration-policy-api
+	resp, err := info.KibanaClient.Connection.Send(
+		http.MethodPost,
+		"/api/fleet/package_policies",
+		nil,
+		nil,
+		bytes.NewBufferString(agentPolicy))
+	if err != nil {
+		t.Fatalf("could not execute request to Kibana/Fleet: %s", err)
+	}
+	if resp.StatusCode != http.StatusOK {
+		// On error dump the whole request response so we can easily spot
+		// what went wrong.
+		t.Errorf("received a non 200-OK when adding package to policy. "+
+			"Status code: %d", resp.StatusCode)
+		respDump, err := httputil.DumpResponse(resp, true)
+		if err != nil {
+			t.Fatalf("could not dump error response from Kibana: %s", err)
+		}
+		// Make debugging as easy as possible
+		t.Log("================================================================================")
+		t.Log("Kibana error response:")
+		t.Log(string(respDump))
+		t.Log("================================================================================")
+		t.Log("Rendered policy:")
+		t.Log(agentPolicy)
+		t.Log("================================================================================")
+		t.FailNow()
+	}
+
+	require.Eventually(
+		t,
+		ensureDocumentsInES(t, ctx, info.ESClient, dsType, dsDataset, dsNamespace, numEvents),
+		120*time.Second,
+		time.Second,
+		"could not get all expected documents from ES")
+}
+
+// ensureDocumentsInES asserts the documents were ingested into the correct
+// datastream
+func ensureDocumentsInES(
+	t *testing.T,
+	ctx context.Context,
+	esClient elastictransport.Interface,
+	dsType, dsDataset, dsNamespace string,
+	numEvents int,
+) func() bool {
+
+	f := func() bool {
+		t.Helper()
+
+		docs, err := estools.GetLogsForDatastream(ctx, esClient, dsType, dsDataset, dsNamespace)
+		if err != nil {
+			t.Logf("error querying ES, will retry later: %s", err)
+		}
+
+		if docs.Hits.Total.Value == numEvents {
+			return true
+		}
+
+		return false
+
+	}
+
+	return f
+}
+
+// generateLogFile generates a log file by appending a new line every tick.
+// The lines are composed of the test name and the current time in RFC3339Nano.
+// This function spawns a new goroutine and does not block.
+func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events int) {
+	t.Helper()
+	f, err := os.Create(fullPath)
+	if err != nil {
+		t.Fatalf("could not create file '%s': %s", fullPath, err)
+	}
+
+	go func() {
+		t.Helper()
+		ticker := time.NewTicker(tick)
+		t.Cleanup(ticker.Stop)
+
+		done := make(chan struct{})
+		t.Cleanup(func() { close(done) })
+
+		defer func() {
+			if err := f.Close(); err != nil {
+				t.Errorf("could not close log file '%s': %s", fullPath, err)
+			}
+		}()
+
+		i := 0
+		for {
+			select {
+			case <-done:
+				return
+			case now := <-ticker.C:
+				i++
+				_, err := fmt.Fprintln(f, t.Name(), "Iteration: ", i, now.Format(time.RFC3339Nano))
+				if err != nil {
+					// t.Fatalf must not be called from a goroutine other than
+					// the one running the test, so t.Errorf is our only option
+					t.Errorf("could not write data to log file '%s': %s", fullPath, err)
+					return
+				}
+				// make sure log lines are synced as quickly as possible
+				if err := f.Sync(); err != nil {
+					t.Errorf("could not sync file '%s': %s", fullPath, err)
+				}
+				if i == events {
+					return
+				}
+			}
+		}
+	}()
+}
+
+func cleanString(s string) string {
+	return nonAlphanumericRegex.ReplaceAllString(strings.ToLower(s), "")
+}
+
+var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`)
+
+var policyJSON = `
+{
+  "policy_id": "{{.PolicyID}}",
+  "package": {
+    "name": "log",
+    "version": "2.3.0"
+  },
+  "name": "{{.Name}}",
+  "namespace": "{{.Namespace}}",
+  "inputs": {
+    "logs-logfile": {
+      "enabled": true,
+      "streams": {
+        "log.logs": {
+          "enabled": true,
+          "vars": {
+            "paths": [
+              "{{.LogFilePath | js}}" {{/* we need to escape windows paths */}}
+            ],
+            "data_stream.dataset": "{{.Dataset}}"
+          }
+        }
+      }
+    }
+  }
+}`
+
+type policyVars struct {
+	Name        string
+	PolicyID    string
+	LogFilePath string
+	Namespace   string
+	Dataset     string
+}
+
+type ESDocument struct {
+	ElasticAgent ElasticAgent `json:"elastic_agent"`
+	Component    Component    
`json:"component"` + Host Host `json:"host"` +} +type ElasticAgent struct { + ID string `json:"id"` + Version string `json:"version"` + Snapshot bool `json:"snapshot"` +} +type Component struct { + Binary string `json:"binary"` + ID string `json:"id"` +} +type Host struct { + Hostname string `json:"hostname"` +} diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go deleted file mode 100644 index 5ebce0043de..00000000000 --- a/testing/integration/monitoring_logs_test.go +++ /dev/null @@ -1,213 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. - -//go:build integration - -package integration - -import ( - "context" - "encoding/json" - "fmt" - "os" - "regexp" - "testing" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/elastic-agent-libs/kibana" - "github.com/elastic/elastic-agent/pkg/control/v2/client" - atesting "github.com/elastic/elastic-agent/pkg/testing" - "github.com/elastic/elastic-agent/pkg/testing/define" - "github.com/elastic/elastic-agent/pkg/testing/tools" - "github.com/elastic/elastic-agent/pkg/testing/tools/check" - "github.com/elastic/elastic-agent/pkg/testing/tools/estools" - "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" -) - -func TestMonitoringLogsShipped(t *testing.T) { - info := define.Require(t, define.Requirements{ - Stack: &define.Stack{}, - Local: false, - Sudo: true, - }) - ctx := context.Background() - - t.Logf("got namespace: %s", info.Namespace) - - agentFixture, err := define.NewFixture(t, define.Version()) - require.NoError(t, err) - - t.Log("Enrolling agent in Fleet with a test policy") - createPolicyReq := kibana.AgentPolicy{ - Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), - Namespace: info.Namespace, - Description: "test policy for agent enrollment", - MonitoringEnabled: []kibana.MonitoringEnabledOption{ - kibana.MonitoringEnabledLogs, - kibana.MonitoringEnabledMetrics, - }, - AgentFeatures: []map[string]interface{}{ - { - "name": "test_enroll", - "enabled": true, - }, - }, - } - - // Stage 1: Install - // As part of the cleanup process, we'll uninstall the agent - installOpts := atesting.InstallOpts{ - NonInteractive: true, - Force: true, - } - policy, err := tools.InstallAgentWithPolicy(ctx, t, - installOpts, agentFixture, info.KibanaClient, createPolicyReq) - require.NoError(t, err) - t.Logf("created policy: %s", policy.ID) - - check.ConnectedToFleet(t, agentFixture, 5*time.Minute) - - // Stage 2: check indices - // This is mostly for debugging - resp, err := estools.GetAllindicies(info.ESClient) - require.NoError(t, err) - for _, run := range resp { - t.Logf("%s: %d/%d deleted: %d\n", - run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted) - } - - // Stage 3: Make sure metricbeat logs are populated - t.Log("Making sure metricbeat logs are populated") - docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat") - }) - t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) - require.NotZero(t, len(docs.Hits.Hits)) - - // Stage 4: make sure all components are healthy - t.Log("Making sure all components are healthy") - status, err := agentFixture.ExecStatus(ctx) - require.NoError(t, err, - "could not get agent status to verify 
all components are healthy") - for _, c := range status.Components { - assert.Equalf(t, client.Healthy, client.State(c.State), - "component %s: want %s, got %s", - c.Name, client.Healthy, client.State(c.State)) - } - - // Stage 5: Make sure there are no errors in logs - t.Log("Making sure there are no error logs") - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ - // acceptable error messages (include reason) - "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated - "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues - "Failed to download artifact", - "Failed to initialize artifact", - "Failed to apply initial policy from on disk configuration", - "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart - }) - }) - t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) - for _, doc := range docs.Hits.Hits { - t.Logf("%#v", doc.Source) - } - require.Empty(t, docs.Hits.Hits) - - // Stage 6: Make sure we have message confirming central management is running - t.Log("Making sure we have message confirming central management is running") - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.FindMatchingLogLines(info.ESClient, info.Namespace, - "Parsed configuration and determined agent is managed by Fleet") - }) - require.NotZero(t, len(docs.Hits.Hits)) - - // Stage 7: verify logs from the monitoring components are not sent to the output - t.Log("Check monitoring logs") - hostname, err := os.Hostname() - if err != nil { - t.Fatalf("could not get hostname to filter Agent: %s", err) - } - - agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) - require.NoError(t, err, "could not get Agent ID by hostname") - t.Logf("Agent ID: %q", agentID) - - // We cannot search for `component.id` because at the moment of writing - // this field is not mapped. There is an issue for that: - // https://github.com/elastic/integrations/issues/6545 - - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForAgentID(info.ESClient, agentID) - }) - require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s", - agentID, err) - - monRegExp := regexp.MustCompile(".*-monitoring$") - for i, d := range docs.Hits.Hits { - // Lazy way to navigate a map[string]any: convert to JSON then - // decode into a struct. - jsonData, err := json.Marshal(d.Source) - if err != nil { - t.Fatalf("could not encode document source as JSON: %s", err) - } - - doc := ESDocument{} - if err := json.Unmarshal(jsonData, &doc); err != nil { - t.Fatalf("could not unmarshal document source: %s", err) - } - - if monRegExp.MatchString(doc.Component.ID) { - t.Errorf("[%d] Document on index %q with 'component.id': %q "+ - "and 'elastic_agent.id': %q. 
'elastic_agent.id' must not "+ - "end in '-monitoring'\n", - i, d.Index, doc.Component.ID, doc.ElasticAgent.ID) - } - } -} - -func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { - var docs estools.Documents - - require.Eventually( - t, - func() bool { - var err error - docs, err = findFn() - return err == nil - }, - 3*time.Minute, - 15*time.Second, - ) - - // TODO: remove after debugging - t.Log("--- debugging: results from ES --- START ---") - for _, doc := range docs.Hits.Hits { - t.Logf("%#v", doc.Source) - } - t.Log("--- debugging: results from ES --- END ---") - - return docs -} - -type ESDocument struct { - ElasticAgent ElasticAgent `json:"elastic_agent"` - Component Component `json:"component"` - Host Host `json:"host"` -} -type ElasticAgent struct { - ID string `json:"id"` - Version string `json:"version"` - Snapshot bool `json:"snapshot"` -} -type Component struct { - Binary string `json:"binary"` - ID string `json:"id"` -} -type Host struct { - Hostname string `json:"hostname"` -}