diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index 1c85ed788f3..c87172ff011 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -362,7 +362,7 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client) + return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) } @@ -434,8 +434,7 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client) - + return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client) } // GetLogsForDataset returns any logs associated with the datastream @@ -461,7 +460,7 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents, es := esapi.New(client) res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), + es.Search.WithIndex("logs-elastic_agent*"), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), @@ -488,7 +487,7 @@ func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.I }, } - return performQueryForRawQuery(ctx, indexQuery, "*ds-logs*", client) + return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client) } // GetPing performs a basic ping and returns ES config info diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go index d9fb2f511a8..b0dcbc9b4c3 100644 --- a/testing/integration/logs_ingestion_test.go +++ b/testing/integration/logs_ingestion_test.go @@ -104,15 +104,12 @@ func testMonitoringLogsAreShipped( ) { // Stage 1: Make sure metricbeat logs are populated t.Log("Making sure metricbeat logs are populated") - require.Eventually(t, - func() bool { - docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") - }) - return len(docs.Hits.Hits) > 0 - }, - 1*time.Minute, 500*time.Millisecond, - "there should be metricbeats logs by now") + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") + }) + t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) + require.NotZero(t, len(docs.Hits.Hits), + "Looking for logs in dataset 'elastic_agent.metricbeat'") // Stage 2: make sure all components are healthy t.Log("Making sure all components are healthy") @@ -127,7 +124,7 @@ func testMonitoringLogsAreShipped( // Stage 3: Make sure there are no errors in logs t.Log("Making sure there are no error logs") - docs := findESDocs(t, func() (estools.Documents, error) { + docs = queryESDocs(t, func() (estools.Documents, error) { return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ // acceptable error messages (include reason) "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated @@ -167,7 +164,6 @@ func testMonitoringLogsAreShipped( // this field is not mapped. There is an issue for that: // https://github.com/elastic/integrations/issues/6545 // TODO: use runtime fields while the above issue is not resolved. - docs = findESDocs(t, func() (estools.Documents, error) { return estools.GetLogsForAgentID(info.ESClient, agentID) }) @@ -197,13 +193,18 @@ func testMonitoringLogsAreShipped( } } -func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { +// queryESDocs runs `findFn` until it returns no error. Zero documents returned +// is considered a success. +func queryESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { var docs estools.Documents require.Eventually( t, func() bool { var err error docs, err = findFn() + if err != nil { + t.Logf("got an error querying ES, retrying. Error: %s", err) + } return err == nil }, 3*time.Minute, @@ -213,6 +214,28 @@ func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools. return docs } +// findESDocs runs `findFn` until at least one document is returned and there is no error +func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents { + var docs estools.Documents + require.Eventually( + t, + func() bool { + var err error + docs, err = findFn() + if err != nil { + t.Logf("got an error querying ES, retrying. Error: %s", err) + return false + } + + return docs.Hits.Total.Value != 0 + }, + 3*time.Minute, + 15*time.Second, + ) + + return docs +} + func testFlattenedDatastreamFleetPolicy( t *testing.T, ctx context.Context,