diff --git a/pkg/testing/multipass/provisioner.go b/pkg/testing/multipass/provisioner.go index 2be04b806fc..7703dc7e335 100644 --- a/pkg/testing/multipass/provisioner.go +++ b/pkg/testing/multipass/provisioner.go @@ -7,6 +7,7 @@ package multipass import ( "bytes" "context" + "encoding/json" "fmt" "os" "os/exec" @@ -122,6 +123,12 @@ func (p *provisioner) Clean(ctx context.Context, _ runner.Config, instances []ru // launch creates an instance. func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runner.OSBatch) error { + // check if instance already exists + err := p.ensureInstanceNotExist(ctx, batch) + if err != nil { + p.logger.Logf( + "could not check multipass instance %q does not exists, moving on anyway. Err: %v", err) + } args := []string{ "launch", "-c", "2", @@ -145,9 +152,14 @@ func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runne return fmt.Errorf("failed to marshal cloud-init configuration: %w", err) } + p.logger.Logf("Launching multipass instance %s", batch.ID) var output bytes.Buffer - p.logger.Logf("Launching multipass image %s", batch.ID) - proc, err := process.Start("multipass", process.WithContext(ctx), process.WithArgs(args), process.WithCmdOptions(runner.AttachOut(&output), runner.AttachErr(&output))) + proc, err := process.Start("multipass", + process.WithContext(ctx), + process.WithArgs(args), + process.WithCmdOptions( + runner.AttachOut(&output), + runner.AttachErr(&output))) if err != nil { return fmt.Errorf("failed to run multipass launch: %w", err) } @@ -162,7 +174,7 @@ func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runne } _ = proc.Stdin.Close() ps := <-proc.Wait() - if ps.ExitCode() != 0 { + if !ps.Success() { // print the output so its clear what went wrong fmt.Fprintf(os.Stdout, "%s\n", output.Bytes()) return fmt.Errorf("failed to run multipass launch: exited with code: %d", ps.ExitCode()) @@ -170,6 +182,76 @@ func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runne return nil } +func (p *provisioner) ensureInstanceNotExist(ctx context.Context, batch runner.OSBatch) error { + var output bytes.Buffer + var stdErr bytes.Buffer + proc, err := process.Start("multipass", + process.WithContext(ctx), + process.WithArgs([]string{"list", "--format", "json"}), + process.WithCmdOptions( + runner.AttachOut(&output), + runner.AttachErr(&stdErr))) + if err != nil { + return fmt.Errorf("multipass list failed to run: %w", err) + } + + state := <-proc.Wait() + if !state.Success() { + msg := fmt.Sprintf("multipass list exited with non-zero status: %s", + state.String()) + p.logger.Logf(msg) + p.logger.Logf("output: %s", output.String()) + p.logger.Logf("stderr: %s", stdErr.String()) + return fmt.Errorf(msg) + } + list := struct { + List []struct { + Ipv4 []string `json:"ipv4"` + Name string `json:"name"` + Release string `json:"release"` + State string `json:"state"` + } `json:"list"` + }{} + err = json.NewDecoder(&output).Decode(&list) + if err != nil { + return fmt.Errorf("could not decode mutipass list output: %w", err) + } + + for _, i := range list.List { + if i.Name == batch.ID { + p.logger.Logf("multipass trying to delete instance %s", batch.ID) + + output.Reset() + stdErr.Reset() + proc, err = process.Start("multipass", + process.WithContext(ctx), + process.WithArgs([]string{"delete", "--purge", batch.ID}), + process.WithCmdOptions( + runner.AttachOut(&output), + runner.AttachErr(&stdErr))) + if err != nil { + return fmt.Errorf( + "multipass instance %q already exist, state %q. Could not delete it: %w", + batch.ID, i.State, err) + } + state = <-proc.Wait() + if !state.Success() { + msg := fmt.Sprintf("failed to delete and purge multipass instance %s: %s", + batch.ID, + state.String()) + p.logger.Logf(msg) + p.logger.Logf("output: %s", output.String()) + p.logger.Logf("stderr: %s", stdErr.String()) + return fmt.Errorf(msg) + } + + break + } + } + + return nil +} + // delete deletes an instance. func (p *provisioner) delete(ctx context.Context, instance runner.Instance) error { args := []string{ diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go index ba9a84673b0..d9fb2f511a8 100644 --- a/testing/integration/logs_ingestion_test.go +++ b/testing/integration/logs_ingestion_test.go @@ -104,11 +104,15 @@ func testMonitoringLogsAreShipped( ) { // Stage 1: Make sure metricbeat logs are populated t.Log("Making sure metricbeat logs are populated") - docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") - }) - t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) - require.NotZero(t, len(docs.Hits.Hits)) + require.Eventually(t, + func() bool { + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") + }) + return len(docs.Hits.Hits) > 0 + }, + 1*time.Minute, 500*time.Millisecond, + "there should be metricbeats logs by now") // Stage 2: make sure all components are healthy t.Log("Making sure all components are healthy") @@ -123,7 +127,7 @@ func testMonitoringLogsAreShipped( // Stage 3: Make sure there are no errors in logs t.Log("Making sure there are no error logs") - docs = findESDocs(t, func() (estools.Documents, error) { + docs := findESDocs(t, func() (estools.Documents, error) { return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ // acceptable error messages (include reason) "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated @@ -134,7 +138,7 @@ func testMonitoringLogsAreShipped( "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart }) }) - t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) + t.Logf("error logs: Got %d documents", len(docs.Hits.Hits)) for _, doc := range docs.Hits.Hits { t.Logf("%#v", doc.Source) }