Skip to content

Commit

Permalink
Fix index pattern when querying ES and condition when searching logs (e…
Browse files Browse the repository at this point in the history
…lastic#3765)

Two issues were causing flakiness on integration tests and are fixed by this PR:
1. The pattern used to query ES was not working on serverless, this commit updates it to a pattern that works on both stateful and serverless as well as make it more specific to the indexes/data streams we want to query
2. `findESDocs` did not wait for the data to be indexed, only to a successful query on ES. In some cases the documents the test wanted were not indexed yet, leading to 0 documents being returned and the test failing with no error in the logs/diagnostics. This is fixed by waiting for a document count > 0 and no error.
  • Loading branch information
belimawr authored Nov 28, 2023
1 parent 85325ad commit a03aa9c
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 17 deletions.
9 changes: 4 additions & 5 deletions pkg/testing/tools/estools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,7 +362,7 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client)
return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)

}

Expand Down Expand Up @@ -434,8 +434,7 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
return Documents{}, fmt.Errorf("error creating ES query: %w", err)
}

return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client)

return performQueryForRawQuery(ctx, queryRaw, "logs-elastic_agent*", client)
}

// GetLogsForDataset returns any logs associated with the datastream
Expand All @@ -461,7 +460,7 @@ func GetLogsForAgentID(client elastictransport.Interface, id string) (Documents,

es := esapi.New(client)
res, err := es.Search(
es.Search.WithIndex("*.ds-logs*"),
es.Search.WithIndex("logs-elastic_agent*"),
es.Search.WithExpandWildcards("all"),
es.Search.WithBody(&buf),
es.Search.WithTrackTotalHits(true),
Expand All @@ -488,7 +487,7 @@ func GetLogsForDatasetWithContext(ctx context.Context, client elastictransport.I
},
}

return performQueryForRawQuery(ctx, indexQuery, "*ds-logs*", client)
return performQueryForRawQuery(ctx, indexQuery, "logs-elastic_agent*", client)
}

// GetPing performs a basic ping and returns ES config info
Expand Down
47 changes: 35 additions & 12 deletions testing/integration/logs_ingestion_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,12 @@ func testMonitoringLogsAreShipped(
) {
// Stage 1: Make sure metricbeat logs are populated
t.Log("Making sure metricbeat logs are populated")
require.Eventually(t,
func() bool {
docs := findESDocs(t, func() (estools.Documents, error) {
return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat")
})
return len(docs.Hits.Hits) > 0
},
1*time.Minute, 500*time.Millisecond,
"there should be metricbeats logs by now")
docs := findESDocs(t, func() (estools.Documents, error) {
return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat")
})
t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits))
require.NotZero(t, len(docs.Hits.Hits),
"Looking for logs in dataset 'elastic_agent.metricbeat'")

// Stage 2: make sure all components are healthy
t.Log("Making sure all components are healthy")
Expand All @@ -127,7 +124,7 @@ func testMonitoringLogsAreShipped(

// Stage 3: Make sure there are no errors in logs
t.Log("Making sure there are no error logs")
docs := findESDocs(t, func() (estools.Documents, error) {
docs = queryESDocs(t, func() (estools.Documents, error) {
return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{
// acceptable error messages (include reason)
"Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated
Expand Down Expand Up @@ -167,7 +164,6 @@ func testMonitoringLogsAreShipped(
// this field is not mapped. There is an issue for that:
// https://github.com/elastic/integrations/issues/6545
// TODO: use runtime fields while the above issue is not resolved.

docs = findESDocs(t, func() (estools.Documents, error) {
return estools.GetLogsForAgentID(info.ESClient, agentID)
})
Expand Down Expand Up @@ -197,13 +193,18 @@ func testMonitoringLogsAreShipped(
}
}

func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents {
// queryESDocs runs `findFn` until it returns no error. Zero documents returned
// is considered a success.
func queryESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents {
var docs estools.Documents
require.Eventually(
t,
func() bool {
var err error
docs, err = findFn()
if err != nil {
t.Logf("got an error querying ES, retrying. Error: %s", err)
}
return err == nil
},
3*time.Minute,
Expand All @@ -213,6 +214,28 @@ func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.
return docs
}

// findESDocs runs `findFn` until at least one document is returned and there is no error
func findESDocs(t *testing.T, findFn func() (estools.Documents, error)) estools.Documents {
var docs estools.Documents
require.Eventually(
t,
func() bool {
var err error
docs, err = findFn()
if err != nil {
t.Logf("got an error querying ES, retrying. Error: %s", err)
return false
}

return docs.Hits.Total.Value != 0
},
3*time.Minute,
15*time.Second,
)

return docs
}

func testFlattenedDatastreamFleetPolicy(
t *testing.T,
ctx context.Context,
Expand Down

0 comments on commit a03aa9c

Please sign in to comment.