From a10d7134e8c9b192f93f721315f6c2bd1fbe0e06 Mon Sep 17 00:00:00 2001 From: "mergify[bot]" <37929162+mergify[bot]@users.noreply.github.com> Date: Thu, 30 Nov 2023 20:10:39 +0100 Subject: [PATCH] [8.11](backport #3765) Fix index pattern when querying ES and condition when searching logs (#3829) Two issues were causing flakiness on integration tests and are fixed by this PR: 1. The pattern used to query ES was not working on serverless, this commit updates it to a pattern that works on both stateful and serverless as well as make it more specific to the indexes/data streams we want to query 2. `findESDocs` did not wait for the data to be indexed, only to a successful query on ES. In some cases the documents the test wanted were not indexed yet, leading to 0 documents being returned and the test failing with no error in the logs/diagnostics. This is fixed by waiting for a document count > 0 and no error. (cherry picked from commit a03aa9cf199c9e34b9d4afee8b4441dab012f1d6) # Conflicts: # pkg/testing/tools/estools/elasticsearch.go # testing/integration/logs_ingestion_test.go The `add_cloud_metadata` and some other `add_*_metadata` processors are expected to log some errors if they cannot fetch the necessary information. It is normal to find their error logs in pretty much any deployment, some examples: - When Docker is not installed/running `add_docker_metadata` will log some errors - When the Elastic-Agent is deployed in a non-cloud VM `add_cloud_metadata` will log some errors. This commit removes all those processors from the queries for log errors, as they're expected. Add a 2h timeout for `go test` when run on remote hosts. 
--------- Co-authored-by: Tiago Queiroz --- magefile.go | 2 +- pkg/testing/tools/estools/elasticsearch.go | 437 ++++++++++++++++--- testing/integration/logs_ingestion_test.go | 457 ++++++++++++++++++++ testing/integration/monitoring_logs_test.go | 217 ---------- 4 files changed, 823 insertions(+), 290 deletions(-) create mode 100644 testing/integration/logs_ingestion_test.go delete mode 100644 testing/integration/monitoring_logs_test.go diff --git a/magefile.go b/magefile.go index 6954b7135fe..f48bfef4034 100644 --- a/magefile.go +++ b/magefile.go @@ -1601,7 +1601,7 @@ func (Integration) TestOnRemote(ctx context.Context) error { extraFlags = append(extraFlags, goTestFlags...) } extraFlags = append(extraFlags, "-test.shuffle", "on", - "-test.timeout", "0", "-test.run", "^("+strings.Join(packageTests, "|")+")$") + "-test.timeout", "2h", "-test.run", "^("+strings.Join(packageTests, "|")+")$") params := mage.GoTestArgs{ LogName: testName, OutputFile: fileName + ".out", diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index ca6dad2dba4..2bd419bba47 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-transport-go/v8/elastictransport" "github.com/elastic/go-elasticsearch/v8/esapi" ) @@ -72,6 +73,75 @@ type ESDoc struct { Source map[string]interface{} `json:"_source"` } +// TemplateResponse is the body of a template data request +type TemplateResponse struct { + IndexTemplates []Template `json:"index_templates"` +} + +// Template is an individual template +type Template struct { + Name string `json:"name"` + IndexTemplate map[string]interface{} `json:"index_template"` +} + +// Pipeline is an individual pipeline +type Pipeline struct { + Description string `json:"description"` + Processors []map[string]interface{} `json:"processors"` +} + +// Ping 
returns basic ES info +type Ping struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + Version Version `json:"version"` +} + +// Version contains version and build info from an ES ping +type Version struct { + Number string `json:"number"` + BuildFlavor string `json:"build_flavor"` +} + +// APIKeyRequest contains the needed data to create an API key in Elasticsearch +type APIKeyRequest struct { + Name string `json:"name"` + Expiration string `json:"expiration"` + RoleDescriptors mapstr.M `json:"role_descriptors,omitempty"` + Metadata mapstr.M `json:"metadata,omitempty"` +} + +// APIKeyResponse contains the response data for an API request +type APIKeyResponse struct { + Id string `json:"id"` + Name string `json:"name"` + Expiration int `json:"expiration"` + APIKey string `json:"api_key"` + Encoded string `json:"encoded"` +} + +// DataStreams represents the response from an ES _data_stream API +type DataStreams struct { + DataStreams []DataStream `json:"data_streams"` +} + +// DataStream represents a data stream template +type DataStream struct { + Name string `json:"name"` + Indicies []map[string]string `json:"indicies"` + Status string `json:"status"` + Template string `json:"template"` + Lifecycle Lifecycle `json:"lifecycle"` + Hidden bool `json:"hidden"` + System bool `json:"system"` +} + +type Lifecycle struct { + Enabled bool `json:"enabled"` + DataRetention string `json:"data_retention"` +} + // GetAllindicies returns a list of indicies on the target ES instance func GetAllindicies(client elastictransport.Interface) ([]Index, error) { return GetIndicesWithContext(context.Background(), client, []string{}) @@ -103,11 +173,166 @@ func GetIndicesWithContext(ctx context.Context, client elastictransport.Interfac return respData, nil } +// CreateAPIKey creates an API key with the given request data +func CreateAPIKey(ctx context.Context, client elastictransport.Interface, req APIKeyRequest) 
(APIKeyResponse, error) { + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(req) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error creating ES query: %w", err) + } + + apiReq := esapi.SecurityCreateAPIKeyRequest{Body: &buf} + resp, err := apiReq.Do(ctx, client) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error creating API key: %w", err) + } + defer resp.Body.Close() + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error handling HTTP response: %w", err) + } + + parsed := APIKeyResponse{} + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return parsed, fmt.Errorf("error unmarshaling json response: %w", err) + } + + return parsed, nil +} + // FindMatchingLogLines returns any logs with message fields that match the given line func FindMatchingLogLines(client elastictransport.Interface, namespace, line string) (Documents, error) { return FindMatchingLogLinesWithContext(context.Background(), client, namespace, line) } +// GetLatestDocumentMatchingQuery returns the last document that matches the given query. 
+// the query field is inserted into a simple `query` POST request +func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport.Interface, query map[string]interface{}, indexPattern string) (Documents, error) { + queryRaw := map[string]interface{}{ + "query": query, + "sort": map[string]interface{}{ + "@timestamp": "desc", + }, + "size": 1, + } + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(queryRaw) + if err != nil { + return Documents{}, fmt.Errorf("error creating ES query: %w", err) + } + + return performQueryForRawQuery(ctx, queryRaw, indexPattern, client) +} + +// GetIndexTemplatesForPattern lists all index templates on the system +func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.Interface, name string) (TemplateResponse, error) { + req := esapi.IndicesGetIndexTemplateRequest{Name: name} + resp, err := req.Do(ctx, client) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error fetching index templates: %w", err) + } + defer resp.Body.Close() + + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error handling HTTP response: %w", err) + } + parsed := TemplateResponse{} + + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error unmarshaling json response: %w", err) + } + + return parsed, nil +} + +func GetDataStreamsForPattern(ctx context.Context, client elastictransport.Interface, namePattern string) (DataStreams, error) { + req := esapi.IndicesGetDataStreamRequest{Name: []string{namePattern}, ExpandWildcards: "all,hidden"} + resp, err := req.Do(ctx, client) + if err != nil { + return DataStreams{}, fmt.Errorf("error fetching data streams") + } + defer resp.Body.Close() + + raw, err := handleResponseRaw(resp) + if err != nil { + return DataStreams{}, fmt.Errorf("error handling HTTP response for data stream get: %w", err) + } + + data := DataStreams{} + err = json.Unmarshal(raw, &data) + if 
err != nil { + return DataStreams{}, fmt.Errorf("error unmarshalling datastream: %w", err) + } + + return data, nil +} + +// DeleteIndexTemplatesDataStreams deletes any data streams, then associated index templates. +func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error { + req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"} + resp, err := req.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting data streams: %w", err) + } + defer resp.Body.Close() + + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response for data stream delete: %w", err) + } + + patternReq := esapi.IndicesDeleteIndexTemplateRequest{Name: name} + resp, err = patternReq.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting index templates: %w", err) + } + defer resp.Body.Close() + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response for index template delete: %w", err) + } + return nil +} + +// GetPipelines returns a list of installed pipelines that match the given name/pattern +func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) { + req := esapi.IngestGetPipelineRequest{PipelineID: name} + resp, err := req.Do(ctx, client) + if err != nil { + return nil, fmt.Errorf("error fetching index templates: %w", err) + } + defer resp.Body.Close() + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return nil, fmt.Errorf("error handling HTTP response: %w", err) + } + + parsed := map[string]Pipeline{} + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return nil, fmt.Errorf("error unmarshaling json response: %w", err) + } + return parsed, nil +} + +// DeletePipelines deletes all pipelines that match the given pattern +func DeletePipelines(ctx context.Context, client elastictransport.Interface, 
name string) error { + req := esapi.IngestDeletePipelineRequest{PipelineID: name} + resp, err := req.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting index template") + } + defer resp.Body.Close() + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response: %w", err) + } + return nil +} + // FindMatchingLogLinesWithContext returns any logs with message fields that match the given line func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) { queryRaw := map[string]interface{}{ @@ -137,20 +362,8 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - es := esapi.New(client) - res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } + return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) - return handleDocsResponse(res) } // CheckForErrorsInLogs checks to see if any error-level lines exist @@ -162,7 +375,43 @@ func CheckForErrorsInLogs(client elastictransport.Interface, namespace string, e // CheckForErrorsInLogsWithContext checks to see if any error-level lines exist // excludeStrings can be used to remove any particular error strings from logs func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictransport.Interface, namespace string, excludeStrings []string) (Documents, error) { + excludeStatements := []map[string]interface{}{} + + // Some if not all of those processors are likely to always log some error + // just because they cannot fetch the information, like if docker is not running + // or the Beat is not running on 
Kubernetes, so we already exclude all errors from them. + processors := []string{ + "add_host_metadata", + "add_cloud_metadata", + "add_docker_metadata", + "add_kubernetes_metadata", + } + for _, p := range processors { + excludeStatements = append(excludeStatements, map[string]interface{}{ + "match": map[string]interface{}{ + "log.logger": p, + }, + }) + } + + if len(excludeStrings) > 0 { + for _, ex := range excludeStrings { + excludeStatements = append(excludeStatements, map[string]interface{}{ + "match_phrase": map[string]interface{}{ + "message": ex, + }, + }) + } + } queryRaw := map[string]interface{}{ + // We need runtime mappings until we have all log.* fields mapped. + // https://github.com/elastic/integrations/issues/6545 + "runtime_mappings": map[string]interface{}{ + "log.logger": map[string]interface{}{ + "type": "keyword", + }, + }, + "query": map[string]interface{}{ "bool": map[string]interface{}{ "must": []map[string]interface{}{ @@ -179,67 +428,23 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor }, }, }, + "must_not": excludeStatements, }, }, } - if len(excludeStrings) > 0 { - excludeStatements := []map[string]interface{}{} - for _, ex := range excludeStrings { - excludeStatements = append(excludeStatements, map[string]interface{}{ - "match_phrase": map[string]interface{}{ - "message": ex, - }, - }) - } - queryRaw = map[string]interface{}{ - "query": map[string]interface{}{ - "bool": map[string]interface{}{ - "must": []map[string]interface{}{ - { - "match": map[string]interface{}{ - "log.level": "error", - }, - }, - { - "term": map[string]interface{}{ - "data_stream.namespace": map[string]interface{}{ - "value": namespace, - }, - }, - }, - }, - "must_not": excludeStatements, - }, - }, - } - } - var buf bytes.Buffer err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - es := esapi.New(client) - res, err := es.Search( - 
es.Search.WithIndex("*.ds-logs*"), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } - - return handleDocsResponse(res) + return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) } -// GetLogsForDatastream returns any logs associated with the datastream -func GetLogsForDatastream(client elastictransport.Interface, index string) (Documents, error) { - return GetLogsForDatastreamWithContext(context.Background(), client, index) +// GetLogsForDataset returns any logs associated with the datastream +func GetLogsForDataset(client elastictransport.Interface, index string) (Documents, error) { + return GetLogsForDatasetWithContext(context.Background(), client, index) } // GetLogsForAgentID returns any logs associated with the agent ID @@ -260,7 +465,7 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, es := esapi.New(client) res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), + es.Search.WithIndex("logs-elastic_agent*"), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), @@ -277,8 +482,8 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, return handleDocsResponse(res) } -// GetLogsForDatastreamWithContext returns any logs associated with the datastream -func GetLogsForDatastreamWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { +// GetLogsForDatasetWithContext returns any logs associated with the datastream +func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.Interface, index string) (Documents, error) { indexQuery := map[string]interface{}{ "query": map[string]interface{}{ "match": map[string]interface{}{ @@ -287,15 +492,41 @@ func 
GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor }, } + return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client) +} + +// GetPing performs a basic ping and returns ES config info +func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, error) { + req := esapi.InfoRequest{} + resp, err := req.Do(ctx, client) + if err != nil { + return Ping{}, fmt.Errorf("error in ping request") + } + defer resp.Body.Close() + + respData, err := handleResponseRaw(resp) + if err != nil { + return Ping{}, fmt.Errorf("error in HTTP response: %w", err) + } + pingData := Ping{} + err = json.Unmarshal(respData, &pingData) + if err != nil { + return pingData, fmt.Errorf("error unmarshalling JSON: %w", err) + } + return pingData, nil + +} + +func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) { var buf bytes.Buffer - err := json.NewEncoder(&buf).Encode(indexQuery) + err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), + es.Search.WithIndex(index), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), @@ -309,14 +540,64 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor return handleDocsResponse(res) } -func handleDocsResponse(res *esapi.Response) (Documents, error) { - if res.StatusCode >= 300 || res.StatusCode < 200 { - return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) +// GetLogsForDatastream returns any logs associated with the datastream +func GetLogsForDatastream( + ctx context.Context, + client elastictransport.Interface, + dsType, dataset, namespace string) (Documents, error) { + + query := map[string]any{ + "_source": 
[]string{"message"}, + "query": map[string]any{ + "bool": map[string]any{ + "must": []any{ + map[string]any{ + "match": map[string]any{ + "data_stream.dataset": dataset, + }, + }, + map[string]any{ + "match": map[string]any{ + "data_stream.namespace": namespace, + }, + }, + map[string]any{ + "match": map[string]any{ + "data_stream.type": dsType, + }, + }, + }, + }, + }, } - resultBuf, err := io.ReadAll(res.Body) + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(query); err != nil { + return Documents{}, fmt.Errorf("error creating ES query: %w", err) + } + + es := esapi.New(client) + res, err := es.Search( + es.Search.WithIndex(fmt.Sprintf(".ds-%s*", dsType)), + es.Search.WithExpandWildcards("all"), + es.Search.WithBody(&buf), + es.Search.WithTrackTotalHits(true), + es.Search.WithPretty(), + es.Search.WithContext(ctx), + ) if err != nil { - return Documents{}, fmt.Errorf("error reading response body: %w", err) + return Documents{}, fmt.Errorf("error performing ES search: %w", err) + } + + return handleDocsResponse(res) +} + +// handleDocsResponse converts the esapi.Response into Documents, +// it closes the response.Body after reading +func handleDocsResponse(res *esapi.Response) (Documents, error) { + resultBuf, err := handleResponseRaw(res) + if err != nil { + return Documents{}, fmt.Errorf("error in HTTP query: %w", err) } respData := Documents{} @@ -327,3 +608,15 @@ func handleDocsResponse(res *esapi.Response) (Documents, error) { return respData, err } + +func handleResponseRaw(res *esapi.Response) ([]byte, error) { + if res.StatusCode >= 300 || res.StatusCode < 200 { + return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) + } + + resultBuf, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w", err) + } + return resultBuf, nil +} diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go new file mode 100644 index 
00000000000..b0dcbc9b4c3 --- /dev/null +++ b/testing/integration/logs_ingestion_test.go @@ -0,0 +1,457 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build integration + +package integration + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "math/rand" + "net/http" + "net/http/httputil" + "os" + "path/filepath" + "regexp" + "strings" + "testing" + "text/template" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/elastic/elastic-agent-libs/kibana" + "github.com/elastic/elastic-agent/pkg/control/v2/client" + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" + "github.com/elastic/elastic-agent/pkg/testing/tools/check" + "github.com/elastic/elastic-agent/pkg/testing/tools/estools" + "github.com/elastic/elastic-agent/pkg/testing/tools/fleettools" + "github.com/elastic/elastic-transport-go/v8/elastictransport" +) + +func TestLogIngestionFleetManaged(t *testing.T) { + info := define.Require(t, define.Requirements{ + Stack: &define.Stack{}, + Local: false, + Sudo: true, + }) + ctx := context.Background() + + agentFixture, err := define.NewFixture(t, define.Version()) + require.NoError(t, err) + + // 1. Create a policy in Fleet with monitoring enabled. + // To ensure there are no conflicts with previous test runs against + // the same ESS stack, we add the current time at the end of the policy + // name. This policy does not contain any integration. 
+ t.Log("Enrolling agent in Fleet with a test policy") + createPolicyReq := kibana.AgentPolicy{ + Name: fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()), + Namespace: info.Namespace, + Description: "test policy for agent enrollment", + MonitoringEnabled: []kibana.MonitoringEnabledOption{ + kibana.MonitoringEnabledLogs, + kibana.MonitoringEnabledMetrics, + }, + AgentFeatures: []map[string]interface{}{ + { + "name": "test_enroll", + "enabled": true, + }, + }, + } + + installOpts := atesting.InstallOpts{ + NonInteractive: true, + Force: true, + } + + // 2. Install the Elastic-Agent with the policy that + // was just created. + policy, err := tools.InstallAgentWithPolicy( + ctx, + t, + installOpts, + agentFixture, + info.KibanaClient, + createPolicyReq) + require.NoError(t, err) + t.Logf("created policy: %s", policy.ID) + check.ConnectedToFleet(t, agentFixture, 5*time.Minute) + + t.Run("Monitoring logs are shipped", func(t *testing.T) { + testMonitoringLogsAreShipped(t, ctx, info, agentFixture, policy) + }) + + t.Run("Normal logs with flattened data_stream are shipped", func(t *testing.T) { + testFlattenedDatastreamFleetPolicy(t, ctx, info, agentFixture, policy) + }) +} + +func testMonitoringLogsAreShipped( + t *testing.T, + ctx context.Context, + info *define.Info, + agentFixture *atesting.Fixture, + policy kibana.PolicyResponse, +) { + // Stage 1: Make sure metricbeat logs are populated + t.Log("Making sure metricbeat logs are populated") + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") + }) + t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) + require.NotZero(t, len(docs.Hits.Hits), + "Looking for logs in dataset 'elastic_agent.metricbeat'") + + // Stage 2: make sure all components are healthy + t.Log("Making sure all components are healthy") + status, err := agentFixture.ExecStatus(ctx) + require.NoError(t, err, + "could not get agent status to verify all 
components are healthy") + for _, c := range status.Components { + assert.Equalf(t, client.Healthy, client.State(c.State), + "component %s: want %s, got %s", + c.Name, client.Healthy, client.State(c.State)) + } + + // Stage 3: Make sure there are no errors in logs + t.Log("Making sure there are no error logs") + docs = queryESDocs(t, func() (estools.Documents, error) { + return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ + // acceptable error messages (include reason) + "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated + "Global configuration artifact is not available", // Endpoint: failed to load user artifact due to connectivity issues + "Failed to download artifact", + "Failed to initialize artifact", + "Failed to apply initial policy from on disk configuration", + "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart + }) + }) + t.Logf("error logs: Got %d documents", len(docs.Hits.Hits)) + for _, doc := range docs.Hits.Hits { + t.Logf("%#v", doc.Source) + } + require.Empty(t, docs.Hits.Hits) + + // Stage 4: Make sure we have message confirming central management is running + t.Log("Making sure we have message confirming central management is running") + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.FindMatchingLogLines(info.ESClient, info.Namespace, + "Parsed configuration and determined agent is managed by Fleet") + }) + require.NotZero(t, len(docs.Hits.Hits)) + + // Stage 5: verify logs from the monitoring components are not sent to the output + t.Log("Check monitoring logs") + hostname, err := os.Hostname() + if err != nil { + t.Fatalf("could not get hostname to filter Agent: %s", err) + } + + agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname) + require.NoError(t, err, "could not get Agent ID by hostname") + t.Logf("Agent ID: %q", 
agentID) + + // We cannot search for `component.id` because at the moment of writing + // this field is not mapped. There is an issue for that: + // https://github.com/elastic/integrations/issues/6545 + // TODO: use runtime fields while the above issue is not resolved. + docs = findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForAgentID(info.ESClient, agentID) + }) + require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s", + agentID, err) + + monRegExp := regexp.MustCompile(".*-monitoring$") + for i, d := range docs.Hits.Hits { + // Lazy way to navigate a map[string]any: convert to JSON then + // decode into a struct. + jsonData, err := json.Marshal(d.Source) + if err != nil { + t.Fatalf("could not encode document source as JSON: %s", err) + } + + doc := ESDocument{} + if err := json.Unmarshal(jsonData, &doc); err != nil { + t.Fatalf("could not unmarshal document source: %s", err) + } + + if monRegExp.MatchString(doc.Component.ID) { + t.Errorf("[%d] Document on index %q with 'component.id': %q "+ + "and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+ + "end in '-monitoring'\n", + i, d.Index, doc.Component.ID, doc.ElasticAgent.ID) + } + } +} + +// queryESDocs runs `findFn` until it returns no error. Zero documents returned +// is considered a success. +func queryESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { + var docs estools.Documents + require.Eventually( + t, + func() bool { + var err error + docs, err = findFn() + if err != nil { + t.Logf("got an error querying ES, retrying. 
Error: %s", err) + } + return err == nil + }, + 3*time.Minute, + 15*time.Second, + ) + + return docs +} + +// findESDocs runs `findFn` until at least one document is returned and there is no error +func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { + var docs estools.Documents + require.Eventually( + t, + func() bool { + var err error + docs, err = findFn() + if err != nil { + t.Logf("got an error querying ES, retrying. Error: %s", err) + return false + } + + return docs.Hits.Total.Value != 0 + }, + 3*time.Minute, + 15*time.Second, + ) + + return docs +} + +func testFlattenedDatastreamFleetPolicy( + t *testing.T, + ctx context.Context, + info *define.Info, + agentFixture *atesting.Fixture, + policy kibana.PolicyResponse, +) { + dsType := "logs" + dsNamespace := cleanString(fmt.Sprintf("%snamespace%d", t.Name(), rand.Uint64())) + dsDataset := cleanString(fmt.Sprintf("%s-dataset", t.Name())) + numEvents := 60 + + tempDir := t.TempDir() + logFilePath := filepath.Join(tempDir, "log.log") + generateLogFile(t, logFilePath, 2*time.Millisecond, numEvents) + + agentFixture, err := define.NewFixture(t, define.Version()) + if err != nil { + t.Fatalf("could not create new fixture: %s", err) + } + + // 1. Prepare a request to add an integration to the policy + tmpl, err := template.New(t.Name() + "custom-log-policy").Parse(policyJSON) + if err != nil { + t.Fatalf("cannot parse template: %s", err) + } + + // The time here ensures there are no conflicts with the integration name + // in Fleet. 
+ agentPolicyBuilder := strings.Builder{} + err = tmpl.Execute(&agentPolicyBuilder, policyVars{ + Name: "Log-Input-" + t.Name() + "-" + time.Now().Format(time.RFC3339), + PolicyID: policy.ID, + LogFilePath: logFilePath, + Namespace: dsNamespace, + Dataset: dsDataset, + }) + if err != nil { + t.Fatalf("could not render template: %s", err) + } + // We keep a copy of the policy for debugging prurposes + agentPolicy := agentPolicyBuilder.String() + + // 2. Call Kibana to create the policy. + // Docs: https://www.elastic.co/guide/en/fleet/current/fleet-api-docs.html#create-integration-policy-api + resp, err := info.KibanaClient.Connection.Send( + http.MethodPost, + "/api/fleet/package_policies", + nil, + nil, + bytes.NewBufferString(agentPolicy)) + if err != nil { + t.Fatalf("could not execute request to Kibana/Fleet: %s", err) + } + if resp.StatusCode != http.StatusOK { + // On error dump the whole request response so we can easily spot + // what went wrong. + t.Errorf("received a non 200-OK when adding package to policy. 
"+ + "Status code: %d", resp.StatusCode) + respDump, err := httputil.DumpResponse(resp, true) + if err != nil { + t.Fatalf("could not dump error response from Kibana: %s", err) + } + // Make debugging as easy as possible + t.Log("================================================================================") + t.Log("Kibana error response:") + t.Log(string(respDump)) + t.Log("================================================================================") + t.Log("Rendered policy:") + t.Log(agentPolicy) + t.Log("================================================================================") + t.FailNow() + } + + require.Eventually( + t, + ensureDocumentsInES(t, ctx, info.ESClient, dsType, dsDataset, dsNamespace, numEvents), + 120*time.Second, + time.Second, + "could not get all expected documents form ES") +} + +// ensureDocumentsInES asserts the documents were ingested into the correct +// datastream +func ensureDocumentsInES( + t *testing.T, + ctx context.Context, + esClient elastictransport.Interface, + dsType, dsDataset, dsNamespace string, + numEvents int, +) func() bool { + + f := func() bool { + t.Helper() + + docs, err := estools.GetLogsForDatastream(ctx, esClient, dsType, dsDataset, dsNamespace) + if err != nil { + t.Logf("error quering ES, will retry later: %s", err) + } + + if docs.Hits.Total.Value == numEvents { + return true + } + + return false + + } + + return f +} + +// generateLogFile generates a log file by appending new lines every tick +// the lines are composed by the test name and the current time in RFC3339Nano +// This function spans a new goroutine and does not block +func generateLogFile(t *testing.T, fullPath string, tick time.Duration, events int) { + t.Helper() + f, err := os.Create(fullPath) + if err != nil { + t.Fatalf("could not create file '%s: %s", fullPath, err) + } + + go func() { + t.Helper() + ticker := time.NewTicker(tick) + t.Cleanup(ticker.Stop) + + done := make(chan struct{}) + t.Cleanup(func() { close(done) }) + + 
defer func() { + if err := f.Close(); err != nil { + t.Errorf("could not close log file '%s': %s", fullPath, err) + } + }() + + i := 0 + for { + select { + case <-done: + return + case now := <-ticker.C: + i++ + _, err := fmt.Fprintln(f, t.Name(), "Iteration: ", i, now.Format(time.RFC3339Nano)) + if err != nil { + // The Go compiler does not allow me to call t.Fatalf from a non-test + // goroutine, t.Errorf is our only option + t.Errorf("could not write data to log file '%s': %s", fullPath, err) + return + } + // make sure log lines are synced as quickly as possible + if err := f.Sync(); err != nil { + t.Errorf("could not sync file '%s': %s", fullPath, err) + } + if i == events { + return + } + } + } + }() +} + +func cleanString(s string) string { + return nonAlphanumericRegex.ReplaceAllString(strings.ToLower(s), "") +} + +var nonAlphanumericRegex = regexp.MustCompile(`[^a-zA-Z0-9 ]+`) + +var policyJSON = ` +{ + "policy_id": "{{.PolicyID}}", + "package": { + "name": "log", + "version": "2.3.0" + }, + "name": "{{.Name}}", + "namespace": "{{.Namespace}}", + "inputs": { + "logs-logfile": { + "enabled": true, + "streams": { + "log.logs": { + "enabled": true, + "vars": { + "paths": [ + "{{.LogFilePath | js}}" {{/* we need to escape windows paths */}} + ], + "data_stream.dataset": "{{.Dataset}}" + } + } + } + } + } +}` + +type policyVars struct { + Name string + PolicyID string + LogFilePath string + Namespace string + Dataset string +} + +type ESDocument struct { + ElasticAgent ElasticAgent `json:"elastic_agent"` + Component Component `json:"component"` + Host Host `json:"host"` +} +type ElasticAgent struct { + ID string `json:"id"` + Version string `json:"version"` + Snapshot bool `json:"snapshot"` +} +type Component struct { + Binary string `json:"binary"` + ID string `json:"id"` +} +type Host struct { + Hostname string `json:"hostname"` +} diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go deleted file mode 100644 
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

//go:build integration

package integration

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"regexp"
	"testing"
	"time"

	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"

	"github.com/elastic/elastic-agent-libs/kibana"
	"github.com/elastic/elastic-agent/pkg/control/v2/client"
	atesting "github.com/elastic/elastic-agent/pkg/testing"
	"github.com/elastic/elastic-agent/pkg/testing/define"
	"github.com/elastic/elastic-agent/pkg/testing/tools"
	"github.com/elastic/elastic-agent/pkg/testing/tools/check"
	"github.com/elastic/elastic-agent/pkg/testing/tools/estools"
	"github.com/elastic/elastic-agent/pkg/testing/tools/fleettools"
)

// TestMonitoringLogsShipped installs an agent enrolled in Fleet with
// monitoring enabled, then verifies in sequence: metricbeat monitoring
// logs reach ES, all components report healthy, no unexpected error logs
// exist, the Fleet-managed startup message is present, and logs from the
// self-monitoring components themselves are not shipped to the output.
func TestMonitoringLogsShipped(t *testing.T) {
	info := define.Require(t, define.Requirements{
		OS:    []define.OS{{Type: define.Linux}},
		Stack: &define.Stack{},
		Local: false,
		Sudo:  true,
	})
	ctx := context.Background()

	t.Logf("got namespace: %s", info.Namespace)

	agentFixture, err := define.NewFixture(t, define.Version())
	require.NoError(t, err)

	t.Log("Enrolling agent in Fleet with a test policy")
	createPolicyReq := kibana.AgentPolicy{
		Name:        fmt.Sprintf("test-policy-enroll-%d", time.Now().Unix()),
		Namespace:   info.Namespace,
		Description: "test policy for agent enrollment",
		MonitoringEnabled: []kibana.MonitoringEnabledOption{
			kibana.MonitoringEnabledLogs,
			kibana.MonitoringEnabledMetrics,
		},
		AgentFeatures: []map[string]interface{}{
			{
				"name":    "test_enroll",
				"enabled": true,
			},
		},
	}

	// Stage 1: Install
	// As part of the cleanup process, we'll uninstall the agent
	installOpts := atesting.InstallOpts{
		NonInteractive: true,
		Force:          true,
	}
	policy, err := tools.InstallAgentWithPolicy(ctx, t,
		installOpts, agentFixture, info.KibanaClient, createPolicyReq)
	require.NoError(t, err)
	t.Logf("created policy: %s", policy.ID)

	check.ConnectedToFleet(t, agentFixture, 5*time.Minute)

	// Stage 2: check indices
	// This is mostly for debugging
	resp, err := estools.GetAllindicies(info.ESClient)
	require.NoError(t, err)
	for _, run := range resp {
		t.Logf("%s: %d/%d deleted: %d\n",
			run.Index, run.DocsCount, run.StoreSizeBytes, run.DocsDeleted)
	}

	// Stage 3: Make sure metricbeat logs are populated
	t.Log("Making sure metricbeat logs are populated")
	docs := findESDocs(t, func() (estools.Documents, error) {
		return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat")
	})
	require.NotZero(t, len(docs.Hits.Hits))
	t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))

	// Stage 4: make sure all components are healthy
	t.Log("Making sure all components are healthy")
	status, err := agentFixture.ExecStatus(ctx)
	require.NoError(t, err,
		"could not get agent status to verify all components are healthy")
	for _, c := range status.Components {
		assert.Equalf(t, client.Healthy, client.State(c.State),
			"component %s: want %s, got %s",
			c.Name, client.Healthy, client.State(c.State))
	}

	// Stage 5: Make sure there are no errors in logs
	t.Log("Making sure there are no error logs")
	docs = findESDocs(t, func() (estools.Documents, error) {
		return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{
			// acceptable error messages (include reason)
			"Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated
			"Global configuration artifact is not available",                     // Endpoint: failed to load user artifact due to connectivity issues
			"Failed to download artifact",
			"Failed to initialize artifact",
			"Failed to apply initial policy from on disk configuration",
			"elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart
			"add_cloud_metadata: received error failed requesting openstack metadata: Get \\\"https://169.254.169.254/2009-04-04/meta-data/instance-id\\\": dial tcp 169.254.169.254:443: connect: connection refused", // okay for the openstack metadata to not work
			"add_cloud_metadata: received error failed requesting openstack metadata: Get \\\"https://169.254.169.254/2009-04-04/meta-data/hostname\\\": dial tcp 169.254.169.254:443: connect: connection refused",    // okay for the cloud metadata to not work
			"add_cloud_metadata: received error failed with http status code 404", // okay for the cloud metadata to not work
		})
	})
	t.Logf("errors: Got %d documents", len(docs.Hits.Hits))
	for _, doc := range docs.Hits.Hits {
		t.Logf("%#v", doc.Source)
	}
	require.Empty(t, docs.Hits.Hits)

	// Stage 6: Make sure we have message confirming central management is running
	t.Log("Making sure we have message confirming central management is running")
	docs = findESDocs(t, func() (estools.Documents, error) {
		return estools.FindMatchingLogLines(info.ESClient, info.Namespace,
			"Parsed configuration and determined agent is managed by Fleet")
	})
	require.NotZero(t, len(docs.Hits.Hits))

	// Stage 7: verify logs from the monitoring components are not sent to the output
	t.Log("Check monitoring logs")
	hostname, err := os.Hostname()
	if err != nil {
		t.Fatalf("could not get hostname to filter Agent: %s", err)
	}

	agentID, err := fleettools.GetAgentIDByHostname(info.KibanaClient, policy.ID, hostname)
	require.NoError(t, err, "could not get Agent ID by hostname")
	t.Logf("Agent ID: %q", agentID)

	// We cannot search for `component.id` because at the moment of writing
	// this field is not mapped. There is an issue for that:
	// https://github.com/elastic/integrations/issues/6545

	docs = findESDocs(t, func() (estools.Documents, error) {
		return estools.GetLogsForAgentID(info.ESClient, agentID)
	})
	// NOTE(review): `err` here is the earlier GetAgentIDByHostname error;
	// findESDocs does not return one, so this assertion is a no-op.
	require.NoError(t, err, "could not get logs from Agent ID: %q, err: %s",
		agentID, err)

	monRegExp := regexp.MustCompile(".*-monitoring$")
	for i, d := range docs.Hits.Hits {
		// Lazy way to navigate a map[string]any: convert to JSON then
		// decode into a struct.
		jsonData, err := json.Marshal(d.Source)
		if err != nil {
			t.Fatalf("could not encode document source as JSON: %s", err)
		}

		doc := ESDocument{}
		if err := json.Unmarshal(jsonData, &doc); err != nil {
			t.Fatalf("could not unmarshal document source: %s", err)
		}

		// A component ID ending in "-monitoring" means the document came
		// from a self-monitoring component and should not be in the output.
		if monRegExp.MatchString(doc.Component.ID) {
			t.Errorf("[%d] Document on index %q with 'component.id': %q "+
				"and 'elastic_agent.id': %q. 'elastic_agent.id' must not "+
				"end in '-monitoring'\n",
				i, d.Index, doc.Component.ID, doc.ElasticAgent.ID)
		}
	}
}

// findESDocs repeatedly invokes findFn until it returns without error
// (up to 3 minutes, every 15 seconds) and returns the last result.
// NOTE(review): this only waits for a *successful* query, not for any
// hits — callers may observe zero documents if indexing lags the query.
func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents {
	var docs estools.Documents

	require.Eventually(
		t,
		func() bool {
			var err error
			docs, err = findFn()
			return err == nil
		},
		3*time.Minute,
		15*time.Second,
	)

	// TODO: remove after debugging
	t.Log("--- debugging: results from ES --- START ---")
	for _, doc := range docs.Hits.Hits {
		t.Logf("%#v", doc.Source)
	}
	t.Log("--- debugging: results from ES --- END ---")

	return docs
}

// ESDocument is the subset of an Elasticsearch log document the test
// inspects.
type ESDocument struct {
	ElasticAgent ElasticAgent `json:"elastic_agent"`
	Component    Component    `json:"component"`
	Host         Host         `json:"host"`
}

// ElasticAgent mirrors the `elastic_agent` field of a log document.
type ElasticAgent struct {
	ID       string `json:"id"`
	Version  string `json:"version"`
	Snapshot bool   `json:"snapshot"`
}

// Component mirrors the `component` field of a log document.
type Component struct {
	Binary string `json:"binary"`
	ID     string `json:"id"`
}

// Host mirrors the `host` field of a log document.
type Host struct {
	Hostname string `json:"hostname"`
}