diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index ebbefb80898..797684c554a 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -202,7 +202,7 @@ steps: key: "serverless-integration-tests" env: TEST_INTEG_AUTH_ESS_REGION: us-east-1 - command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite + command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite artifact_paths: - "build/TEST-**" - "build/diagnostics/*" diff --git a/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml b/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml deleted file mode 100644 index 1ce991c2c38..00000000000 --- a/changelog/fragments/1695389490-Support-flattened-data_stream.-fields.yaml +++ /dev/null @@ -1,35 +0,0 @@ -# Kind can be one of: -# - breaking-change: a change to previously-documented behavior -# - deprecation: functionality that is being removed in a later release -# - bug-fix: fixes a problem in a previous version -# - enhancement: extends functionality but does not break or fix existing behavior -# - feature: new functionality -# - known-issue: problems that we are aware of in a given version -# - security: impacts on the security of a product or a user’s deployment. -# - upgrade: important information for someone upgrading from a prior version -# - other: does not fit into any of the other categories -kind: feature - -# Change summary; a 80ish characters long description of the change. 
-summary: Support flattened data_stream.* fields in input configuration - -# Long description; in case the summary is not enough to describe the change -# this field accommodate a description without length limits. -# NOTE: This field will be rendered only for breaking-change and known-issue kinds at the moment. -description: >- - An input configuration supports flattened fields, however the - 'data_stream' field was not being correctly decoded when - flattened. This commit fixes this issue. - -# Affected component; usually one of "elastic-agent", "fleet-server", "filebeat", "metricbeat", "auditbeat", "all", etc. -component: elastic-agent - -# PR URL; optional; the PR number that added the changeset. -# If not present is automatically filled by the tooling finding the PR where this changelog fragment has been added. -# NOTE: the tooling supports backports, so it's able to fill the original PR number instead of the backport PR number. -# Please provide it if you are adding a fragment for a different PR. -pr: https://github.com/elastic/elastic-agent/pull/3465 - -# Issue URL; optional; the GitHub issue related to this changeset (either closes or is part of). -# If not present is automatically filled by the tooling with the issue linked to the PR number. 
-issue: https://github.com/elastic/elastic-agent/issues/3191 diff --git a/pkg/component/component_test.go b/pkg/component/component_test.go index 00c4d1c63cb..424f3a93147 100644 --- a/pkg/component/component_test.go +++ b/pkg/component/component_test.go @@ -2361,74 +2361,3 @@ func gatherDurationFieldPaths(s interface{}, pathSoFar string) []string { return gatheredPaths } - -func TestFlattenedDataStream(t *testing.T) { - expectedNamespace := "test-namespace" - expectedType := "test-type" - expectedDataset := "test-dataset" - - policy := map[string]any{ - "outputs": map[string]any{ - "default": map[string]any{ - "type": "elasticsearch", - "enabled": true, - }, - }, - "inputs": []any{ - map[string]any{ - "type": "filestream", - "id": "filestream-0", - "enabled": true, - "data_stream.type": expectedType, - "data_stream.dataset": expectedDataset, - "data_stream": map[string]any{ - "namespace": expectedNamespace, - }, - }, - }, - } - runtime, err := LoadRuntimeSpecs(filepath.Join("..", "..", "specs"), PlatformDetail{}, SkipBinaryCheck()) - if err != nil { - t.Fatalf("cannot load runtime specs: %s", err) - } - - result, err := runtime.ToComponents(policy, nil, logp.DebugLevel, nil) - if err != nil { - t.Fatalf("cannot convert policy to component: %s", err) - } - - if len(result) != 1 { - t.Fatalf("expecting result to have one element, got %d", len(result)) - } - - if len(result[0].Units) != 2 { - t.Fatalf("expecting result[0].Units to have two elements, got %d", len(result)) - } - - // We do not make assumptions about ordering. 
- // Get the input Unit - var dataStream *proto.DataStream - for _, unit := range result[0].Units { - if unit.Err != nil { - t.Fatalf("unit.Err: %s", unit.Err) - } - if unit.Type == client.UnitTypeInput { - dataStream = unit.Config.DataStream - break - } - } - - if dataStream == nil { - t.Fatal("DataStream cannot be nil") - } - - if dataStream.Dataset != expectedDataset { - t.Errorf("expecting DataStream.Dataset: %q, got: %q", expectedDataset, dataStream.Dataset) - } - if dataStream.Type != expectedType { - t.Errorf("expecting DataStream.Type: %q, got: %q", expectedType, dataStream.Type) - } - if dataStream.Namespace != expectedNamespace { - t.Errorf("expecting DataStream.Namespace: %q, got: %q", expectedNamespace, dataStream.Namespace) - } -} diff --git a/pkg/component/config.go b/pkg/component/config.go index 781e2e7624f..a0c75d00e32 100644 --- a/pkg/component/config.go +++ b/pkg/component/config.go @@ -15,7 +15,6 @@ import ( "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" - "github.com/elastic/elastic-agent-libs/config" "github.com/elastic/elastic-agent/pkg/limits" ) @@ -101,88 +100,9 @@ func ExpectedConfig(cfg map[string]interface{}) (*proto.UnitExpectedConfig, erro return nil, err } - if err := updateDataStreamsFromSource(result); err != nil { - return nil, fmt.Errorf("could not dedot 'data_stream': %w", err) - } - return result, nil } -func deDotDataStream(ds *proto.DataStream, source *structpb.Struct) (*proto.DataStream, error) { - if ds == nil { - ds = &proto.DataStream{} - } - - cfg, err := config.NewConfigFrom(source.AsMap()) - if err != nil { - return nil, fmt.Errorf("cannot generate config from source field: %w", err) - } - - // Create a temporary struct to unpack the configuration. - // Unpack correctly handles any flattened fields like - // data_stream.type. So all we need to do is to call Unpack, - // ensure the DataStream does not have a different value, - // them merge them both. 
- tmp := struct { - DataStream struct { - Dataset string `config:"dataset" yaml:"dataset"` - Type string `config:"type" yaml:"type"` - Namespace string `config:"namespace" yaml:"namespace"` - } `config:"data_stream" yaml:"data_stream"` - }{} - - if err := cfg.Unpack(&tmp); err != nil { - return nil, fmt.Errorf("cannot unpack source field into struct: %w", err) - } - - if (ds.Dataset != tmp.DataStream.Dataset) && (ds.Dataset != "" && tmp.DataStream.Dataset != "") { - return nil, errors.New("duplicated key 'datastream.dataset'") - } - - if (ds.Type != tmp.DataStream.Type) && (ds.Type != "" && tmp.DataStream.Type != "") { - return nil, errors.New("duplicated key 'datastream.type'") - } - - if (ds.Namespace != tmp.DataStream.Namespace) && (ds.Namespace != "" && tmp.DataStream.Namespace != "") { - return nil, errors.New("duplicated key 'datastream.namespace'") - } - - ret := &proto.DataStream{ - Dataset: valueOrDefault(tmp.DataStream.Dataset, ds.Dataset), - Type: valueOrDefault(tmp.DataStream.Type, ds.Type), - Namespace: valueOrDefault(tmp.DataStream.Namespace, ds.Namespace), - Source: ds.GetSource(), - } - - return ret, nil -} - -// valueOrDefault returns b if a is an empty string -func valueOrDefault(a, b string) string { - if a == "" { - return b - } - return a -} - -func updateDataStreamsFromSource(unitConfig *proto.UnitExpectedConfig) error { - var err error - unitConfig.DataStream, err = deDotDataStream(unitConfig.GetDataStream(), unitConfig.GetSource()) - if err != nil { - return fmt.Errorf("could not parse data_stream from input: %w", err) - } - - for i, stream := range unitConfig.Streams { - stream.DataStream, err = deDotDataStream(stream.GetDataStream(), stream.GetSource()) - if err != nil { - return fmt.Errorf("could not parse data_stream from stream [%d]: %w", - i, err) - } - } - - return nil -} - func setSource(val interface{}, cfg map[string]interface{}) error { // find the source field on the val resVal := reflect.ValueOf(val).Elem() diff --git 
a/pkg/component/config_test.go b/pkg/component/config_test.go index 7cdef177829..64dcfe3a697 100644 --- a/pkg/component/config_test.go +++ b/pkg/component/config_test.go @@ -8,10 +8,8 @@ import ( "errors" "testing" - "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "google.golang.org/protobuf/testing/protocmp" "google.golang.org/protobuf/types/known/structpb" "github.com/elastic/elastic-agent-client/v7/pkg/proto" @@ -199,12 +197,7 @@ func TestExpectedConfig(t *testing.T) { assert.Equal(t, err.Error(), scenario.Err.Error()) } else { require.NoError(t, err) - // protocmp.Transform ensures we do not compare any internal - // protobuf fields - if !cmp.Equal(scenario.Expected, observed, protocmp.Transform()) { - t.Errorf("mismatch (-want +got) \n%s", - cmp.Diff(scenario.Expected, observed, protocmp.Transform())) - } + assert.EqualValues(t, scenario.Expected, observed) } }) } diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 3774808efaa..1d852e50277 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -357,14 +357,7 @@ func (f *Fixture) RunBeat(ctx context.Context) error { // Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started // with `WithAllowErrors()` then `Run` will exit early and return the logged error. // -// If no `states` are provided then the Elastic Agent runs until the context is cancelled. -// -// The Elastic-Agent is started agent in test mode (--testing-mode) this mode -// expects the initial configuration (full YAML config) via gRPC. -// This configuration should be passed in the State.Configure field. -// -// The `elastic-agent.yml` generated by `Fixture.Configure` is ignored -// when `Run` is called. +// If no `states` are provided then the Elastic Agent runs until the context is cancelled or the timeout specified with WithRunLength is reached. 
func (f *Fixture) Run(ctx context.Context, states ...State) error { if f.binaryName != "elastic-agent" { return errors.New("Run() can only be used with elastic-agent, use RunBeat()") diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index 304e917d7ee..d6bf69369cd 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -438,9 +438,9 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor } -// GetLogsForDataset returns any logs associated with the datastream -func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) { - return GetLogsForDatasetWithContext(context.Background(), client, index) +// GetLogsForDatastream returns any logs associated with the datastream +func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) { + return GetLogsForDatastreamWithContext(context.Background(), client, index) } // GetLogsForAgentID returns any logs associated with the agent ID @@ -478,8 +478,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, return handleDocsResponse(res) } -// GetLogsForDatasetWithContext returns any logs associated with the datastream -func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { +// GetLogsForDatastreamWithContext returns any logs associated with the datastream +func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ @@ -536,60 +536,6 @@ func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{ return handleDocsResponse(res) } -// GetLogsForDatastream returns any logs associated with the datastream -func GetLogsForDatastream( - ctx context.Context, - 
client elastictransport.Interface, - dsType, dataset, namespace string) (Documents, error) { - - query := map[string]any{ - "_source": []string{"message"}, - "query": map[string]any{ - "bool": map[string]any{ - "must": []any{ - map[string]any{ - "match": map[string]any{ - "data_stream.dataset": dataset, - }, - }, - map[string]any{ - "match": map[string]any{ - "data_stream.namespace": namespace, - }, - }, - map[string]any{ - "match": map[string]any{ - "data_stream.type": dsType, - }, - }, - }, - }, - }, - } - - var buf bytes.Buffer - if err := json.NewEncoder(&buf).Encode(query); err != nil { - return Documents{}, fmt.Errorf("error creating ES query: %w", err) - } - - es := esapi.New(client) - res, err := es.Search( - es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } - - return handleDocsResponse(res) -} - -// handleDocsResponse converts the esapi.Response into Documents, -// it closes the response.Body after reading func handleDocsResponse(res *esapi.Response) (Documents, error) { resultBuf, err := handleResponseRaw(res) if err != nil { diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go deleted file mode 100644 index ba9a84673b0..00000000000 --- a/testing/integration/logs_ingestion_test.go +++ /dev/null @@ -1,430 +0,0 @@ -// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one -// or more contributor license agreements. Licensed under the Elastic License; -// you may not use this file except in compliance with the Elastic License. 
- -//go:build integration - -package integration - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "math/rand" - "net/http" - "net/http/httputil" - "os" - "path/filepath" - "regexp" - "strings" - "testing" - "text/template" - "time" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/elastic/elastic-agent-libs/kibana" - "github.com/elastic/elastic-agent/pkg/control/v2/client" - atesting "github.com/elastic/elastic-agent/pkg/testing" - "github.com/elastic/elastic-agent/pkg/testing/define" - "github.com/elastic/elastic-agent/pkg/testing/tools" - "github.com/elastic/elastic-agent/pkg/testing/tools/check" - "github.com/elastic/elastic-agent/pkg/testing/tools/estools" - "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" - "github.com/elastic/elastic-transport-go/v8/elastictransport" -) - -func TestLogIngestionFleetManaged(t *testing.T) { - info := define.Require(t, define.Requirements{ - Stack: &define.Stack{}, - Local: false, - Sudo: true, - }) - ctx := context.Background() - - agentFixture, err := define.NewFixture(t, define.Version()) - require.NoError(t, err) - - // 1. Create a policy in Fleet with monitoring enabled. - // To ensure there are no conflicts with previous test runs against - // the same ESS stack, we add the current time at the end of the policy - // name. This policy does not contain any integration. - t.Log("Enrolling agent in Fleet with a test policy") - createPolicyReq := kibana.AgentPolicy{ - Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), - Namespace: info.Namespace, - Description: "test policy for agent enrollment", - MonitoringEnabled: []kibana.MonitoringEnabledOption{ - kibana.MonitoringEnabledLogs, - kibana.MonitoringEnabledMetrics, - }, - AgentFeatures: []map[string]interface{}{ - { - "name": "test_enroll", - "enabled": true, - }, - }, - } - - installOpts := atesting.InstallOpts{ - NonInteractive: true, - Force: true, - } - - // 2. 
Install the Elastic-Agent with the policy that - // was just created. - policy, err := tools.InstallAgentWithPolicy( - ctx, - t, - installOpts, - agentFixture, - info.KibanaClient, - createPolicyReq) - require.NoError(t, err) - t.Logf("created policy: %s", policy.ID) - check.ConnectedToFleet(t, agentFixture, 5*time.Minute) - - t.Run("Monitoring logs are shipped", func(t *testing.T) { - testMonitoringLogsAreShipped(t, ctx, info, agentFixture, policy) - }) - - t.Run("Normal logs with flattened data_stream are shipped", func(t *testing.T) { - testFlattenedDatastreamFleetPolicy(t, ctx, info, agentFixture, policy) - }) -} - -func testMonitoringLogsAreShipped( - t *testing.T, - ctx context.Context, - info *define.Info, - agentFixture *atesting.Fixture, - policy kibana.PolicyResponse, -) { - // Stage 1: Make sure metricbeat logs are populated - t.Log("Making sure metricbeat logs are populated") - docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") - }) - t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) - require.NotZero(t, len(docs.Hits.Hits)) - - // Stage 2: make sure all components are healthy - t.Log("Making sure all components are healthy") - status, err := agentFixture.ExecStatus(ctx) - require.NoError(t, err, - "could not get agent status to verify all components are healthy") - for _, c := range status.Components { - assert.Equalf(t, client.Healthy, client.State(c.State), - "component %s: want %s, got %s", - c.Name, client.Healthy, client.State(c.State)) - } - - // Stage 3: Make sure there are no errors in logs - t.Log("Making sure there are no error logs") - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ - // acceptable error messages (include reason) - "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets 
updated - "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues - "Failed to download artifact", - "Failed to initialize artifact", - "Failed to apply initial policy from on disk configuration", - "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart - }) - }) - t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) - for _, doc := range docs.Hits.Hits { - t.Logf("%#v", doc.Source) - } - require.Empty(t, docs.Hits.Hits) - - // Stage 4: Make sure we have message confirming central management is running - t.Log("Making sure we have message confirming central management is running") - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.FindMatchingLogLines(info.ESClient, info.Namespace, - "Parsed configuration and determined agent is managed by Fleet") - }) - require.NotZero(t, len(docs.Hits.Hits)) - - // Stage 5: verify logs from the monitoring components are not sent to the output - t.Log("Check monitoring logs") - hostname, err := os.Hostname() - if err != nil { - t.Fatalf("could not get hostname to filter Agent: %s", err) - } - - agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) - require.NoError(t, err, "could not get Agent ID by hostname") - t.Logf("Agent ID: %q", agentID) - - // We cannot search for `component.id` because at the moment of writing - // this field is not mapped. There is an issue for that: - // https://github.com/elastic/integrations/issues/6545 - // TODO: use runtime fields while the above issue is not resolved. 
- - docs = findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForAgentID(info.ESClient, agentID) - }) - require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s", - agentID, err) - - monRegExp := regexp.MustCompile(".*-monitoring$") - for i, d := range docs.Hits.Hits { - // Lazy way to navigate a map[string]any: convert to JSON then - // decode into a struct. - jsonData, err := json.Marshal(d.Source) - if err != nil { - t.Fatalf("could not encode document source as JSON: %s", err) - } - - doc := ESDocument{} - if err := json.Unmarshal(jsonData, &doc); err != nil { - t.Fatalf("could not unmarshal document source: %s", err) - } - - if monRegExp.MatchString(doc.Component.ID) { - t.Errorf("[%d] Document on index %q with 'component.id': %q "+ - "and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+ - "end in '-monitoring'\n", - i, d.Index, doc.Component.ID, doc.ElasticAgent.ID) - } - } -} - -func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { - var docs estools.Documents - require.Eventually( - t, - func() bool { - var err error - docs, err = findFn() - return err == nil - }, - 3*time.Minute, - 15*time.Second, - ) - - return docs -} - -func testFlattenedDatastreamFleetPolicy( - t *testing.T, - ctx context.Context, - info *define.Info, - agentFixture *atesting.Fixture, - policy kibana.PolicyResponse, -) { - dsType := "logs" - dsNamespace := cleanString(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64())) - dsDataset := cleanString(fmt.Sprintf("%s-dataset", t.Name())) - numEvents := 60 - - tempDir := t.TempDir() - logFilePath := filepath.Join(tempDir, "log.log") - generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents) - - agentFixture, err := define.NewFixture(t, define.Version()) - if err != nil { - t.Fatalf("could not create new fixture: %s", err) - } - - // 1. 
Prepare a request to add an integration to the policy - tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON) - if err != nil { - t.Fatalf("cannot parse template: %s", err) - } - - // The time here ensures there are no conflicts with the integration name - // in Fleet. - agentPolicyBuilder := strings.Builder{} - err = tmpl.Execute(&agentPolicyBuilder, policyVars{ - Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339), - PolicyID: policy.ID, - LogFilePath: logFilePath, - Namespace: dsNamespace, - Dataset: dsDataset, - }) - if err != nil { - t.Fatalf("could not render template: %s", err) - } - // We keep a copy of the policy for debugging prurposes - agentPolicy := agentPolicyBuilder.String() - - // 2. Call Kibana to create the policy. - // Docs: https://www.elastic.co/guide/en/fleet/current/fleet-api-docs.html#create-integration-policy-api - resp, err := info.KibanaClient.Connection.Send( - http.MethodPost, - "/api/fleet/package_policies", - nil, - nil, - bytes.NewBufferString(agentPolicy)) - if err != nil { - t.Fatalf("could not execute request to Kibana/Fleet: %s", err) - } - if resp.StatusCode != http.StatusOK { - // On error dump the whole request response so we can easily spot - // what went wrong. - t.Errorf("received a non 200-OK when adding package to policy. 
"+ - "Status code: %d", resp.StatusCode) - respDump, err := httputil.DumpResponse(resp, true) - if err != nil { - t.Fatalf("could not dump error response from Kibana: %s", err) - } - // Make debugging as easy as possible - t.Log("================================================================================") - t.Log("Kibana error response:") - t.Log(string(respDump)) - t.Log("================================================================================") - t.Log("Rendered policy:") - t.Log(agentPolicy) - t.Log("================================================================================") - t.FailNow() - } - - require.Eventually( - t, - ensureDocumentsInES(t, ctx, info.ESClient, dsType, dsDataset, dsNamespace, numEvents), - 120*time.Second, - time.Second, - "could not get all expected documents form ES") -} - -// ensureDocumentsInES asserts the documents were ingested into the correct -// datastream -func ensureDocumentsInES( - t *testing.T, - ctx context.Context, - esClient elastictransport.Interface, - dsType, dsDataset, dsNamespace string, - numEvents int, -) func() bool { - - f := func() bool { - t.Helper() - - docs, err := estools.GetLogsForDatastream(ctx, esClient, dsType, dsDataset, dsNamespace) - if err != nil { - t.Logf("error quering ES, will retry later: %s", err) - } - - if docs.Hits.Total.Value == numEvents { - return true - } - - return false - - } - - return f -} - -// generateLogFile generates a log file by appending new lines every tick -// the lines are composed by the test name and the current time in RFC3339Nano -// This function spans a new goroutine and does not block -func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events int) { - t.Helper() - f, err := os.Create(fullPath) - if err != nil { - t.Fatalf("could not create file '%s: %s", fullPath, err) - } - - go func() { - t.Helper() - ticker := time.NewTicker(tick) - t.Cleanup(ticker.Stop) - - done := make(chan struct{}) - t.Cleanup(func() { close(done) }) - - 
defer func() { - if err := f.Close(); err != nil { - t.Errorf("could not close log file '%s': %s", fullPath, err) - } - }() - - i := 0 - for { - select { - case <-done: - return - case now := <-ticker.C: - i++ - _, err := fmt.Fprintln(f, t.Name(), "Iteration: ", i, now.Format(time.RFC3339Nano)) - if err != nil { - // The Go compiler does not allow me to call t.Fatalf from a non-test - // goroutine, t.Errorf is our only option - t.Errorf("could not write data to log file '%s': %s", fullPath, err) - return - } - // make sure log lines are synced as quickly as possible - if err := f.Sync(); err != nil { - t.Errorf("could not sync file '%s': %s", fullPath, err) - } - if i == events { - return - } - } - } - }() -} - -func cleanString(s string) string { - return nonAlphanumericRegex.ReplaceAllString(strings.ToLower(s), "") -} - -var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`) - -var policyJSON = ` -{ - "policy_id": "{{.PolicyID}}", - "package": { - "name": "log", - "version": "2.3.0" - }, - "name": "{{.Name}}", - "namespace": "{{.Namespace}}", - "inputs": { - "logs-logfile": { - "enabled": true, - "streams": { - "log.logs": { - "enabled": true, - "vars": { - "paths": [ - "{{.LogFilePath | js}}" {{/* we need to escape windows paths */}} - ], - "data_stream.dataset": "{{.Dataset}}" - } - } - } - } - } -}` - -type policyVars struct { - Name string - PolicyID string - LogFilePath string - Namespace string - Dataset string -} - -type ESDocument struct { - ElasticAgent ElasticAgent `json:"elastic_agent"` - Component Component `json:"component"` - Host Host `json:"host"` -} -type ElasticAgent struct { - ID string `json:"id"` - Version string `json:"version"` - Snapshot bool `json:"snapshot"` -} -type Component struct { - Binary string `json:"binary"` - ID string `json:"id"` -} -type Host struct { - Hostname string `json:"hostname"` -} diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go new file mode 100644 index 
00000000000..5ebce0043de --- /dev/null +++ b/testing/integration/monitoring_logs_test.go @@ -0,0 +1,213 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "context" + "encoding/json" + "fmt" + "os" + "regexp" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/check" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" +) + +func TestMonitoringLogsShipped(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Local: false, + Sudo: true, + }) + ctx := context.Background() + + t.Logf("got namespace: %s", info.Namespace) + + agentFixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + t.Log("Enrolling agent in Fleet with a test policy") + createPolicyReq := kibana.AgentPolicy{ + Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), + Namespace: info.Namespace, + Description: "test policy for agent enrollment", + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + AgentFeatures: []map[string]interface{}{ + { + "name": "test_enroll", + "enabled": true, + }, + }, + } + + // Stage 1: Install + // As part of the cleanup process, we'll uninstall the agent + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: 
true, + } + policy, err := tools.InstallAgentWithPolicy(ctx, t, + installOpts, agentFixture, info.KibanaClient, createPolicyReq) + require.NoError(t, err) + t.Logf("created policy: %s", policy.ID) + + check.ConnectedToFleet(t, agentFixture, 5*time.Minute) + + // Stage 2: check indices + // This is mostly for debugging + resp, err := estools.GetAllindicies(info.ESClient) + require.NoError(t, err) + for _, run := range resp { + t.Logf("%s: %d/%d deleted: %d\n", + run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted) + } + + // Stage 3: Make sure metricbeat logs are populated + t.Log("Making sure metricbeat logs are populated") + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat") + }) + t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 4: make sure all components are healthy + t.Log("Making sure all components are healthy") + status, err := agentFixture.ExecStatus(ctx) + require.NoError(t, err, + "could not get agent status to verify all components are healthy") + for _, c := range status.Components { + assert.Equalf(t, client.Healthy, client.State(c.State), + "component %s: want %s, got %s", + c.Name, client.Healthy, client.State(c.State)) + } + + // Stage 5: Make sure there are no errors in logs + t.Log("Making sure there are no error logs") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ + // acceptable error messages (include reason) + "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated + "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues + "Failed to download artifact", + "Failed to initialize artifact", + "Failed to apply initial policy from on disk configuration", + 
"elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart + }) + }) + t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) + for _, doc := range docs.Hits.Hits { + t.Logf("%#v", doc.Source) + } + require.Empty(t, docs.Hits.Hits) + + // Stage 6: Make sure we have message confirming central management is running + t.Log("Making sure we have message confirming central management is running") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.FindMatchingLogLines(info.ESClient, info.Namespace, + "Parsed configuration and determined agent is managed by Fleet") + }) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 7: verify logs from the monitoring components are not sent to the output + t.Log("Check monitoring logs") + hostname, err := os.Hostname() + if err != nil { + t.Fatalf("could not get hostname to filter Agent: %s", err) + } + + agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) + require.NoError(t, err, "could not get Agent ID by hostname") + t.Logf("Agent ID: %q", agentID) + + // We cannot search for `component.id` because at the moment of writing + // this field is not mapped. There is an issue for that: + // https://github.com/elastic/integrations/issues/6545 + + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForAgentID(info.ESClient, agentID) + }) + require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s", + agentID, err) + + monRegExp := regexp.MustCompile(".*-monitoring$") + for i, d := range docs.Hits.Hits { + // Lazy way to navigate a map[string]any: convert to JSON then + // decode into a struct. 
+ jsonData, err := json.Marshal(d.Source) + if err != nil { + t.Fatalf("could not encode document source as JSON: %s", err) + } + + doc := ESDocument{} + if err := json.Unmarshal(jsonData, &doc); err != nil { + t.Fatalf("could not unmarshal document source: %s", err) + } + + if monRegExp.MatchString(doc.Component.ID) { + t.Errorf("[%d] Document on index %q with 'component.id': %q "+ + "and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+ + "end in '-monitoring'\n", + i, d.Index, doc.Component.ID, doc.ElasticAgent.ID) + } + } +} + +func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { + var docs estools.Documents + + require.Eventually( + t, + func() bool { + var err error + docs, err = findFn() + return err == nil + }, + 3*time.Minute, + 15*time.Second, + ) + + // TODO: remove after debugging + t.Log("--- debugging: results from ES --- START ---") + for _, doc := range docs.Hits.Hits { + t.Logf("%#v", doc.Source) + } + t.Log("--- debugging: results from ES --- END ---") + + return docs +} + +type ESDocument struct { + ElasticAgent ElasticAgent `json:"elastic_agent"` + Component Component `json:"component"` + Host Host `json:"host"` +} +type ElasticAgent struct { + ID string `json:"id"` + Version string `json:"version"` + Snapshot bool `json:"snapshot"` +} +type Component struct { + Binary string `json:"binary"` + ID string `json:"id"` +} +type Host struct { + Hostname string `json:"hostname"` +}