diff --git a/.buildkite/hooks/pre-command b/.buildkite/hooks/pre-command index 292aa6918c0..c8a44505148 100755 --- a/.buildkite/hooks/pre-command +++ b/.buildkite/hooks/pre-command @@ -17,8 +17,7 @@ DOCKER_REGISTRY="docker.elastic.co" DOCKER_REGISTRY_SECRET_PATH="kv/ci-shared/platform-ingest/docker_registry_prod" CI_DRA_ROLE_PATH="kv/ci-shared/release/dra-role" CI_GCP_OBS_PATH="kv/ci-shared/observability-ingest/cloud/gcp" -# CI_AGENT_QA_OBS_PATH="kv/ci-shared/observability-ingest/elastic-agent-ess-qa" -CI_ESS_STAGING_PATH="kv/ci-shared/platform-ingest/platform-ingest-ec-staging" +CI_ESS_PATH="kv/ci-shared/platform-ingest/platform-ingest-ec-prod" CI_DRA_ROLE_PATH="kv/ci-shared/release/dra-role" @@ -55,7 +54,7 @@ if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == export TEST_INTEG_AUTH_GCP_SERVICE_TOKEN_FILE=$(realpath ./gcp.json) # ESS credentials - export API_KEY_TOKEN=$(vault kv get -field apiKey ${CI_ESS_STAGING_PATH}) + export API_KEY_TOKEN=$(vault kv get -field apiKey ${CI_ESS_PATH}) echo ${API_KEY_TOKEN} > ./apiKey export TEST_INTEG_AUTH_ESS_APIKEY_FILE=$(realpath ./apiKey) fi diff --git a/.buildkite/hooks/pre-exit b/.buildkite/hooks/pre-exit index 4d0da50cf73..213f51aff7b 100755 --- a/.buildkite/hooks/pre-exit +++ b/.buildkite/hooks/pre-exit @@ -10,7 +10,11 @@ if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == # Perform cleanup of integration tests resources echo "--- Cleaning up integration test resources" - TEST_INTEG_AUTH_ESS_REGION=us-east-1 SNAPSHOT=true mage integration:clean + if [[ "$BUILDKITE_STEP_KEY" == "serverless-integration-tests" ]]; then + STACK_PROVISIONER=serverless SNAPSHOT=true mage integration:clean + else + SNAPSHOT=true mage integration:clean + fi fi if [ -n "$GOOGLE_APPLICATION_CREDENTIALS" ]; then diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 49123f81295..4cbb89ff8d8 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -201,8 +201,6 @@ steps: - label: "Serverless integration test" key: "serverless-integration-tests" - env: - TEST_INTEG_AUTH_ESS_REGION: us-east-1 command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestLogIngestionFleetManaged" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite artifact_paths: - "build/TEST-**" @@ -213,8 +211,6 @@ steps: - label: "Integration tests" key: "integration-tests" - env: - TEST_INTEG_AUTH_ESS_REGION: us-east-1 command: ".buildkite/scripts/steps/integration_tests.sh stateful" artifact_paths: - "build/TEST-**" diff --git a/dev-tools/mage/manifest/manifest.go b/dev-tools/mage/manifest/manifest.go index a2c377d600c..62f82eb2c87 100644 --- a/dev-tools/mage/manifest/manifest.go +++ b/dev-tools/mage/manifest/manifest.go @@ -87,7 +87,6 @@ func DownloadComponentsFromManifest(manifest string, platforms []string, platfor "beats": {"auditbeat", "filebeat", "heartbeat", "metricbeat", "osquerybeat", "packetbeat"}, "cloud-defend": {"cloud-defend"}, "cloudbeat": {"cloudbeat"}, - "assetbeat": {"assetbeat"}, "elastic-agent-shipper": {"elastic-agent-shipper"}, "endpoint-dev": {"endpoint-security"}, "fleet-server": {"fleet-server"}, diff --git a/magefile.go b/magefile.go index 88e937e40d4..e0ef843a7ca 100644 --- a/magefile.go +++ b/magefile.go @@ -929,7 +929,6 @@ func packageAgent(platforms []string, packagingFn func()) { // https://artifacts-snapshot.elastic.co/endpoint-dev/latest/8.11.0-SNAPSHOT.json // 
https://artifacts-snapshot.elastic.co/fleet-server/latest/8.11.0-SNAPSHOT.json // https://artifacts-snapshot.elastic.co/prodfiler/latest/8.11.0-SNAPSHOT.json - // https://artifacts-snapshot.elastic.co/assetbeat/latest/8.11.0-SNAPSHOT.json externalBinaries := map[string]string{ "auditbeat": "beats", "filebeat": "beats", @@ -945,7 +944,6 @@ func packageAgent(platforms []string, packagingFn func()) { "pf-elastic-collector": "prodfiler", "pf-elastic-symbolizer": "prodfiler", "pf-host-agent": "prodfiler", - "assetbeat": "assetbeat", // only supporting linux/amd64 or linux/arm64 } // Only log fatal logs for logs produced using logrus. This is the global logger @@ -1779,15 +1777,16 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche } datacenter := os.Getenv("TEST_INTEG_AUTH_GCP_DATACENTER") if datacenter == "" { + // us-central1-a is used because T2A instances required for ARM64 testing are only + // available in the central regions datacenter = "us-central1-a" } - // Valid values are gcp-us-central1 (default), azure-eastus2, - // aws-eu-central-1, us-east-1 (which is an AWS region but the - // "aws" CSP prefix is not used by ESS for some reason!) + // Possible to change the region for deployment, default is gcp-us-west2 which is + // the CFT region. essRegion := os.Getenv("TEST_INTEG_AUTH_ESS_REGION") if essRegion == "" { - essRegion = "gcp-us-central1" + essRegion = "gcp-us-west2" } instanceProvisionerMode := os.Getenv("INSTANCE_PROVISIONER") diff --git a/pkg/testing/ess/config.go b/pkg/testing/ess/config.go index c90be94caa5..62ece1be1ef 100644 --- a/pkg/testing/ess/config.go +++ b/pkg/testing/ess/config.go @@ -17,8 +17,13 @@ type Config struct { } func defaultConfig() *Config { + baseURL := os.Getenv("TEST_INTEG_AUTH_ESS_URL") + if baseURL == "" { + baseURL = "https://cloud.elastic.co" + } + url := strings.TrimRight(baseURL, "/") + "/api/v1" return &Config{ - BaseUrl: `https://staging.found.no/api/v1`, + BaseUrl: url, } } diff --git a/pkg/testing/ess/provisioner.go b/pkg/testing/ess/provisioner.go index a051cac39d1..47e8d9dcba2 100644 --- a/pkg/testing/ess/provisioner.go +++ b/pkg/testing/ess/provisioner.go @@ -11,8 +11,6 @@ import ( "strings" "time" - "golang.org/x/sync/errgroup" - "github.com/elastic/elastic-agent/pkg/testing/runner" ) @@ -62,89 +60,77 @@ func (p *provisioner) SetLogger(l runner.Logger) { p.logger = l } -func (p *provisioner) Provision(ctx context.Context, requests []runner.StackRequest) ([]runner.Stack, error) { - results := make(map[runner.StackRequest]*CreateDeploymentResponse) - for _, r := range requests { - // allow up to 2 minutes for each create request - createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute) - resp, err := p.createDeployment(createCtx, r, - map[string]string{ - "division": "engineering", - "org": "ingest", - "team": "elastic-agent", - "project": "elastic-agent", - "integration-tests": "true", - }) - createCancel() - if err != nil { - return nil, err - } - results[r] = resp - } +// Create creates a stack. 
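[Editor's note] The defaultConfig() change above resolves the ESS API base URL from TEST_INTEG_AUTH_ESS_URL, falling back to the production cloud console, trimming a trailing slash, and appending the v1 API prefix. A minimal standalone sketch of that same resolution follows; the envOrDefault helper and the main wrapper are illustrative only, not part of the change.

package main

import (
	"fmt"
	"os"
	"strings"
)

// envOrDefault returns the environment variable value, or def when it is
// unset or empty. (Illustrative helper, not part of the diff.)
func envOrDefault(name, def string) string {
	if v := os.Getenv(name); v != "" {
		return v
	}
	return def
}

func main() {
	// Mirrors defaultConfig(): default to the production cloud console,
	// strip any trailing slash, then append the v1 API prefix.
	baseURL := envOrDefault("TEST_INTEG_AUTH_ESS_URL", "https://cloud.elastic.co")
	apiURL := strings.TrimRight(baseURL, "/") + "/api/v1"
	fmt.Println(apiURL) // prints https://cloud.elastic.co/api/v1 when the variable is unset
}
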
+func (p *provisioner) Create(ctx context.Context, request runner.StackRequest) (runner.Stack, error) { + // allow up to 2 minutes for request + createCtx, createCancel := context.WithTimeout(ctx, 2*time.Minute) + defer createCancel() + resp, err := p.createDeployment(createCtx, request, + map[string]string{ + "division": "engineering", + "org": "ingest", + "team": "elastic-agent", + "project": "elastic-agent", + "integration-tests": "true", + }) + if err != nil { + return runner.Stack{}, err + } + return runner.Stack{ + ID: request.ID, + Version: request.Version, + Elasticsearch: resp.ElasticsearchEndpoint, + Kibana: resp.KibanaEndpoint, + Username: resp.Username, + Password: resp.Password, + Internal: map[string]interface{}{ + "deployment_id": resp.ID, + }, + Ready: false, + }, nil +} - // set a long timeout - // this context travels up to the magefile, clients that want a shorter timeout can set - // it via mage's -t flag - readyCtx, readyCancel := context.WithTimeout(ctx, 25*time.Minute) - defer readyCancel() - - g, gCtx := errgroup.WithContext(readyCtx) - for req, resp := range results { - g.Go(func(req runner.StackRequest, resp *CreateDeploymentResponse) func() error { - return func() error { - ready, err := p.client.DeploymentIsReady(gCtx, resp.ID, 30*time.Second) - if err != nil { - return fmt.Errorf("failed to check for cloud %s to be ready: %w", req.Version, err) - } - if !ready { - return fmt.Errorf("cloud %s never became ready: %w", req.Version, err) - } - return nil - } - }(req, resp)) +// WaitForReady should block until the stack is ready or the context is cancelled. +func (p *provisioner) WaitForReady(ctx context.Context, stack runner.Stack) (runner.Stack, error) { + deploymentID, err := p.getDeploymentID(stack) + if err != nil { + return stack, fmt.Errorf("failed to get deployment ID from the stack: %w", err) } - err := g.Wait() + // allow up to 10 minutes for it to become ready + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + p.logger.Logf("Waiting for cloud stack %s to be ready [stack_id: %s, deployment_id: %s]", stack.Version, stack.ID, deploymentID) + ready, err := p.client.DeploymentIsReady(ctx, deploymentID, 30*time.Second) if err != nil { - return nil, err + return stack, fmt.Errorf("failed to check for cloud %s [stack_id: %s, deployment_id: %s] to be ready: %w", stack.Version, stack.ID, deploymentID, err) } - - var stacks []runner.Stack - for req, resp := range results { - stacks = append(stacks, runner.Stack{ - ID: req.ID, - Version: req.Version, - Elasticsearch: resp.ElasticsearchEndpoint, - Kibana: resp.KibanaEndpoint, - Username: resp.Username, - Password: resp.Password, - Internal: map[string]interface{}{ - "deployment_id": resp.ID, - }, - }) + if !ready { + return stack, fmt.Errorf("cloud %s [stack_id: %s, deployment_id: %s] never became ready: %w", stack.Version, stack.ID, deploymentID, err) } - return stacks, nil + stack.Ready = true + return stack, nil } -// Clean cleans up all provisioned resources. -func (p *provisioner) Clean(ctx context.Context, stacks []runner.Stack) error { - var errs []error - for _, s := range stacks { - err := p.destroyDeployment(ctx, s) - if err != nil { - errs = append(errs, fmt.Errorf("failed to destroy stack %s (%s): %w", s.Version, s.ID, err)) - } - } - if len(errs) > 0 { - return errors.Join(errs...) +// Delete deletes a stack. 
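[Editor's note] The old Provision/Clean pair is replaced here by a three-step lifecycle: Create, then WaitForReady, then Delete (the interface change appears later in this diff, in pkg/testing/runner/provisioner.go). A hedged sketch of how a caller might drive one request through that lifecycle; provisionOne is an illustrative helper, not code from the change, and works against any runner.StackProvisioner implementation.

package example

import (
	"context"
	"fmt"
	"log"

	"github.com/elastic/elastic-agent/pkg/testing/runner"
)

// provisionOne walks a single stack request through the new lifecycle and
// guarantees cleanup via defer. Illustrative only.
func provisionOne(ctx context.Context, prov runner.StackProvisioner, req runner.StackRequest) error {
	stack, err := prov.Create(ctx, req)
	if err != nil {
		return fmt.Errorf("create failed for %s: %w", req.ID, err)
	}
	// Always delete the deployment, even if it never becomes ready.
	defer func() {
		if derr := prov.Delete(context.Background(), stack); derr != nil {
			log.Printf("delete failed for %s: %v", req.ID, derr)
		}
	}()

	// Blocks until the stack reports ready or ctx is cancelled.
	stack, err = prov.WaitForReady(ctx, stack)
	if err != nil {
		return fmt.Errorf("stack %s never became ready: %w", req.ID, err)
	}
	// stack.Ready is now true and the endpoints/credentials can be used.
	_ = stack
	return nil
}
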
+func (p *provisioner) Delete(ctx context.Context, stack runner.Stack) error { + deploymentID, err := p.getDeploymentID(stack) + if err != nil { + return err } - return nil + + // allow up to 1 minute for request + ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) + defer cancel() + + p.logger.Logf("Destroying cloud stack %s [stack_id: %s, deployment_id: %s]", stack.Version, stack.ID, deploymentID) + return p.client.ShutdownDeployment(ctx, deploymentID) } func (p *provisioner) createDeployment(ctx context.Context, r runner.StackRequest, tags map[string]string) (*CreateDeploymentResponse, error) { ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) defer cancel() - p.logger.Logf("Creating stack %s (%s)", r.Version, r.ID) + p.logger.Logf("Creating cloud stack %s [stack_id: %s]", r.Version, r.ID) name := fmt.Sprintf("%s-%s", strings.Replace(p.cfg.Identifier, ".", "-", -1), r.ID) // prepare tags @@ -168,26 +154,21 @@ func (p *provisioner) createDeployment(ctx context.Context, r runner.StackReques p.logger.Logf("Failed to create ESS cloud %s: %s", r.Version, err) return nil, fmt.Errorf("failed to create ESS cloud for version %s: %w", r.Version, err) } - p.logger.Logf("Created stack %s (%s) [id: %s]", r.Version, r.ID, resp.ID) + p.logger.Logf("Created cloud stack %s [stack_id: %s, deployment_id: %s]", r.Version, r.ID, resp.ID) return resp, nil } -func (p *provisioner) destroyDeployment(ctx context.Context, s runner.Stack) error { - if s.Internal == nil { - return fmt.Errorf("missing internal information") +func (p *provisioner) getDeploymentID(stack runner.Stack) (string, error) { + if stack.Internal == nil { + return "", fmt.Errorf("missing internal information") } - deploymentIDRaw, ok := s.Internal["deployment_id"] + deploymentIDRaw, ok := stack.Internal["deployment_id"] if !ok { - return fmt.Errorf("missing internal deployment_id") + return "", fmt.Errorf("missing internal deployment_id") } deploymentID, ok := deploymentIDRaw.(string) if !ok { - return fmt.Errorf("internal deployment_id not a string") + return "", fmt.Errorf("internal deployment_id not a string") } - - ctx, cancel := context.WithTimeout(ctx, 1*time.Minute) - defer cancel() - - p.logger.Logf("Destroying stack %s (%s)", s.Version, s.ID) - return p.client.ShutdownDeployment(ctx, deploymentID) + return deploymentID, nil } diff --git a/pkg/testing/ess/serverless.go b/pkg/testing/ess/serverless.go index 817ee33f03b..df1129e8e97 100644 --- a/pkg/testing/ess/serverless.go +++ b/pkg/testing/ess/serverless.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/elastic-agent/pkg/testing/runner" ) -var serverlessURL = "https://staging.found.no" +var serverlessURL = "https://cloud.elastic.co" // ServerlessClient is the handler the serverless ES instance type ServerlessClient struct { diff --git a/pkg/testing/ess/serverless_provision.go b/pkg/testing/ess/serverless_provision.go index 32ec8f8227b..c9656f628af 100644 --- a/pkg/testing/ess/serverless_provision.go +++ b/pkg/testing/ess/serverless_provision.go @@ -10,7 +10,7 @@ import ( "fmt" "io" "net/http" - "sync" + "time" "github.com/elastic/elastic-agent-libs/logp" "github.com/elastic/elastic-agent/pkg/testing/runner" @@ -18,17 +18,15 @@ import ( // ServerlessProvision contains type ServerlessProvision struct { - stacksMut sync.RWMutex - stacks map[string]stackhandlerData - cfg ProvisionerConfig - log runner.Logger + cfg ProvisionerConfig + log runner.Logger } type defaultLogger struct { wrapped *logp.Logger } -// / implements the runner.Logger interface +// Logf implements the 
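[Editor's note] getDeploymentID in the hunk above reduces to a common pattern: pulling a typed value out of the untyped Internal map and distinguishing the three failure modes (nil map, missing key, wrong type). A standalone sketch of that pattern; the internalString helper name is illustrative, the real code stays in getDeploymentID / getDeploymentInfo.

package example

import "fmt"

// internalString extracts a string value from a stack's Internal map.
// Usage (illustrative): id, err := internalString(stack.Internal, "deployment_id")
func internalString(internal map[string]interface{}, key string) (string, error) {
	if internal == nil {
		return "", fmt.Errorf("missing internal information")
	}
	raw, ok := internal[key]
	if !ok {
		return "", fmt.Errorf("missing internal %s", key)
	}
	s, ok := raw.(string)
	if !ok {
		return "", fmt.Errorf("internal %s not a string", key)
	}
	return s, nil
}
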
runner.Logger interface func (log *defaultLogger) Logf(format string, args ...any) { if len(args) == 0 { @@ -38,12 +36,6 @@ func (log *defaultLogger) Logf(format string, args ...any) { } -// tracks the data that maps to a single serverless deployment -type stackhandlerData struct { - client *ServerlessClient - stackData runner.Stack -} - // ServerlessRegions is the JSON response from the serverless regions API endpoint type ServerlessRegions struct { CSP string `json:"csp"` @@ -55,9 +47,8 @@ type ServerlessRegions struct { // NewServerlessProvisioner creates a new StackProvisioner instance for serverless func NewServerlessProvisioner(cfg ProvisionerConfig) (runner.StackProvisioner, error) { prov := &ServerlessProvision{ - cfg: cfg, - stacks: map[string]stackhandlerData{}, - log: &defaultLogger{wrapped: logp.L()}, + cfg: cfg, + log: &defaultLogger{wrapped: logp.L()}, } err := prov.CheckCloudRegion() if err != nil { @@ -71,114 +62,118 @@ func (prov *ServerlessProvision) SetLogger(l runner.Logger) { prov.log = l } -// Provision a new set of serverless instances -func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runner.StackRequest) ([]runner.Stack, error) { - upWaiter := sync.WaitGroup{} - depErrs := make(chan error, len(requests)) - depUp := make(chan bool, len(requests)) - stacks := []runner.Stack{} - for _, req := range requests { - client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) - srvReq := ServerlessRequest{Name: req.ID, RegionID: prov.cfg.Region} - proj, err := client.DeployStack(ctx, srvReq) - if err != nil { - return nil, fmt.Errorf("error deploying stack for request %s: %w", req.ID, err) - } - err = client.WaitForEndpoints(ctx) - if err != nil { - return nil, fmt.Errorf("error waiting for endpoints to become available for request: %w", err) - } - newStack := runner.Stack{ - ID: req.ID, - Version: req.Version, - Elasticsearch: client.proj.Endpoints.Elasticsearch, - Kibana: client.proj.Endpoints.Kibana, - Username: client.proj.Credentials.Username, - Password: client.proj.Credentials.Password, - Internal: map[string]interface{}{ - "deployment_id": proj.ID, - "deployment_type": proj.Type, - }, - } - stacks = append(stacks, newStack) - prov.stacksMut.Lock() - prov.stacks[req.ID] = stackhandlerData{client: client, stackData: newStack} - prov.stacksMut.Unlock() +// Create creates a stack. 
+func (prov *ServerlessProvision) Create(ctx context.Context, request runner.StackRequest) (runner.Stack, error) { + // allow up to 4 minutes for requests + createCtx, createCancel := context.WithTimeout(ctx, 4*time.Minute) + defer createCancel() - upWaiter.Add(1) - go func() { - isUp, err := client.DeploymentIsReady(ctx) - if err != nil { - depErrs <- err + client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) + srvReq := ServerlessRequest{Name: request.ID, RegionID: prov.cfg.Region} - } - depUp <- isUp - }() + prov.log.Logf("Creating serverless stack %s [stack_id: %s]", request.Version, request.ID) + proj, err := client.DeployStack(createCtx, srvReq) + if err != nil { + return runner.Stack{}, fmt.Errorf("error deploying stack for request %s: %w", request.ID, err) + } + err = client.WaitForEndpoints(createCtx) + if err != nil { + return runner.Stack{}, fmt.Errorf("error waiting for endpoints to become available for serverless stack %s [stack_id: %s, deployment_id: %s]: %w", request.Version, request.ID, proj.ID, err) + } + stack := runner.Stack{ + ID: request.ID, + Version: request.Version, + Elasticsearch: client.proj.Endpoints.Elasticsearch, + Kibana: client.proj.Endpoints.Kibana, + Username: client.proj.Credentials.Username, + Password: client.proj.Credentials.Password, + Internal: map[string]interface{}{ + "deployment_id": proj.ID, + "deployment_type": proj.Type, + }, + Ready: false, } + prov.log.Logf("Created serverless stack %s [stack_id: %s, deployment_id: %s]", request.Version, request.ID, proj.ID) + return stack, nil +} + +// WaitForReady should block until the stack is ready or the context is cancelled. +func (prov *ServerlessProvision) WaitForReady(ctx context.Context, stack runner.Stack) (runner.Stack, error) { + deploymentID, deploymentType, err := prov.getDeploymentInfo(stack) + if err != nil { + return stack, fmt.Errorf("failed to get deployment info from the stack: %w", err) + } + + ctx, cancel := context.WithTimeout(ctx, 10*time.Minute) + defer cancel() + + client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) + client.proj.ID = deploymentID + client.proj.Type = deploymentType + client.proj.Region = prov.cfg.Region + client.proj.Endpoints.Elasticsearch = stack.Elasticsearch + client.proj.Endpoints.Kibana = stack.Kibana + client.proj.Credentials.Username = stack.Username + client.proj.Credentials.Password = stack.Password + + prov.log.Logf("Waiting for serverless stack %s to be ready [stack_id: %s, deployment_id: %s]", stack.Version, stack.ID, deploymentID) + + errCh := make(chan error) + var lastErr error + + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() - gotUp := 0 for { select { case <-ctx.Done(): - return nil, ctx.Err() - case err := <-depErrs: - return nil, fmt.Errorf("error waiting for stacks to become available: %w", err) - case isUp := <-depUp: - if isUp { - gotUp++ + if lastErr == nil { + lastErr = ctx.Err() } - if gotUp >= len(requests) { - return stacks, nil + return stack, fmt.Errorf("serverless stack %s [stack_id: %s, deployment_id: %s] never became ready: %w", stack.Version, stack.ID, deploymentID, lastErr) + case <-ticker.C: + go func() { + statusCtx, statusCancel := context.WithTimeout(ctx, 30*time.Second) + defer statusCancel() + ready, err := client.DeploymentIsReady(statusCtx) + if err != nil { + errCh <- err + } else if !ready { + errCh <- fmt.Errorf("serverless stack %s [stack_id: %s, deployment_id: %s] never became ready", stack.Version, stack.ID, 
deploymentID) + } else { + errCh <- nil + } + }() + case err := <-errCh: + if err == nil { + stack.Ready = true + return stack, nil } + lastErr = err } } - } -// Clean shuts down and removes the deployments -func (prov *ServerlessProvision) Clean(ctx context.Context, stacks []runner.Stack) error { - for _, stack := range stacks { - prov.stacksMut.RLock() - // because of the way the provisioner initializes, - // we can't guarantee that we have a valid client/stack setup, as we might have just re-initialized from a file. - // If that's the case, create a new client - stackRef, ok := prov.stacks[stack.ID] - prov.stacksMut.RUnlock() - // we can't reference the client, it won't be created when we just run mage:clean - // instead, grab the project ID from `stacks`, create a new client - if ok { - err := stackRef.client.DeleteDeployment() - if err != nil { - prov.log.Logf("error removing deployment: %w", err) - } - } else { - // create a new client - client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) - dep_id, ok := stack.Internal["deployment_id"] - if !ok { - return fmt.Errorf("could not find deployment_id for serverless") - } - dep_id_str, ok := dep_id.(string) - if !ok { - return fmt.Errorf("deployment_id is not a string: %v", dep_id) - } - client.proj.ID = dep_id_str - - dep_type, ok := stack.Internal["deployment_type"] - if !ok { - return fmt.Errorf("could not find deployment_type in stack for serverless") - } - dep_type_str, ok := dep_type.(string) - if !ok { - return fmt.Errorf("deployment_type is not a string: %v", dep_id_str) - } - client.proj.Type = dep_type_str - err := client.DeleteDeployment() - if err != nil { - return fmt.Errorf("error removing deployment after re-creating client: %w", err) - } +// Delete deletes a stack. 
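[Editor's note] The serverless WaitForReady loop above combines an overall deadline on the context, a 30-second poll ticker, and a shorter per-attempt check. A simplified sketch of the same shape follows; it checks synchronously instead of spawning a goroutine per tick, and waitUntilReady plus its check callback are illustrative names, not part of the change.

package example

import (
	"context"
	"fmt"
	"time"
)

// waitUntilReady polls check until it reports ready, the overall deadline
// expires, or ctx is cancelled. Each attempt gets its own shorter timeout.
func waitUntilReady(ctx context.Context, check func(context.Context) (bool, error)) error {
	ctx, cancel := context.WithTimeout(ctx, 10*time.Minute)
	defer cancel()

	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	var lastErr error
	for {
		select {
		case <-ctx.Done():
			if lastErr == nil {
				lastErr = ctx.Err()
			}
			return fmt.Errorf("never became ready: %w", lastErr)
		case <-ticker.C:
			attemptCtx, attemptCancel := context.WithTimeout(ctx, 30*time.Second)
			ready, err := check(attemptCtx)
			attemptCancel()
			if err != nil {
				lastErr = err // remember the failure and keep polling
				continue
			}
			if ready {
				return nil
			}
			lastErr = fmt.Errorf("readiness check returned not ready")
		}
	}
}
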
+func (prov *ServerlessProvision) Delete(ctx context.Context, stack runner.Stack) error { + deploymentID, deploymentType, err := prov.getDeploymentInfo(stack) + if err != nil { + return fmt.Errorf("failed to get deployment info from the stack: %w", err) + } - } + client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) + client.proj.ID = deploymentID + client.proj.Type = deploymentType + client.proj.Region = prov.cfg.Region + client.proj.Endpoints.Elasticsearch = stack.Elasticsearch + client.proj.Endpoints.Kibana = stack.Kibana + client.proj.Credentials.Username = stack.Username + client.proj.Credentials.Password = stack.Password + + prov.log.Logf("Destroying serverless stack %s [stack_id: %s, deployment_id: %s]", stack.Version, stack.ID, deploymentID) + err = client.DeleteDeployment() + if err != nil { + return fmt.Errorf("error removing serverless stack %s [stack_id: %s, deployment_id: %s]: %w", stack.Version, stack.ID, deploymentID, err) } return nil } @@ -235,3 +230,26 @@ func (prov *ServerlessProvision) CheckCloudRegion() error { return nil } + +func (prov *ServerlessProvision) getDeploymentInfo(stack runner.Stack) (string, string, error) { + if stack.Internal == nil { + return "", "", fmt.Errorf("missing internal information") + } + deploymentIDRaw, ok := stack.Internal["deployment_id"] + if !ok { + return "", "", fmt.Errorf("missing internal deployment_id") + } + deploymentID, ok := deploymentIDRaw.(string) + if !ok { + return "", "", fmt.Errorf("internal deployment_id not a string") + } + deploymentTypeRaw, ok := stack.Internal["deployment_type"] + if !ok { + return "", "", fmt.Errorf("missing internal deployment_type") + } + deploymentType, ok := deploymentTypeRaw.(string) + if !ok { + return "", "", fmt.Errorf("internal deployment_type is not a string") + } + return deploymentID, deploymentType, nil +} diff --git a/pkg/testing/ess/serverless_test.go b/pkg/testing/ess/serverless_test.go index 7f69e49819f..2fc8e0075b1 100644 --- a/pkg/testing/ess/serverless_test.go +++ b/pkg/testing/ess/serverless_test.go @@ -26,9 +26,8 @@ func TestProvisionGetRegions(t *testing.T) { cfg := ProvisionerConfig{Region: "bad-region-ID", APIKey: key} prov := &ServerlessProvision{ - cfg: cfg, - stacks: map[string]stackhandlerData{}, - log: &defaultLogger{wrapped: logp.L()}, + cfg: cfg, + log: &defaultLogger{wrapped: logp.L()}, } err = prov.CheckCloudRegion() require.NoError(t, err) @@ -48,27 +47,23 @@ func TestStackProvisioner(t *testing.T) { cfg := ProvisionerConfig{Region: "aws-eu-west-1", APIKey: key} provClient, err := NewServerlessProvisioner(cfg) require.NoError(t, err) - stacks := []runner.StackRequest{ - {ID: "stack-test-one", Version: "8.9.0"}, - {ID: "stack-test-two", Version: "8.9.0"}, - } + request := runner.StackRequest{ID: "stack-test-one", Version: "8.9.0"} ctx, cancel := context.WithTimeout(context.Background(), time.Minute*5) defer cancel() - res, err := provClient.Provision(ctx, stacks) + stack, err := provClient.Create(ctx, request) require.NoError(t, err) t.Logf("got results:") - for _, stack := range res { - t.Logf("stack: %#v", stack) - require.NotEmpty(t, stack.Elasticsearch) - require.NotEmpty(t, stack.Kibana) - require.NotEmpty(t, stack.Password) - require.NotEmpty(t, stack.Username) - } + t.Logf("stack: %#v", stack) + require.NotEmpty(t, stack.Elasticsearch) + require.NotEmpty(t, stack.Kibana) + require.NotEmpty(t, stack.Password) + require.NotEmpty(t, stack.Username) + stack, err = provClient.WaitForReady(ctx, stack) + require.NoError(t, err) 
t.Logf("tearing down...") - err = provClient.Clean(ctx, res) + err = provClient.Delete(ctx, stack) require.NoError(t, err) - } func TestStartServerless(t *testing.T) { diff --git a/pkg/testing/multipass/provisioner.go b/pkg/testing/multipass/provisioner.go index 2be04b806fc..7703dc7e335 100644 --- a/pkg/testing/multipass/provisioner.go +++ b/pkg/testing/multipass/provisioner.go @@ -7,6 +7,7 @@ package multipass import ( "bytes" "context" + "encoding/json" "fmt" "os" "os/exec" @@ -122,6 +123,12 @@ func (p *provisioner) Clean(ctx context.Context, _ runner.Config, instances []ru // launch creates an instance. func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runner.OSBatch) error { + // check if instance already exists + err := p.ensureInstanceNotExist(ctx, batch) + if err != nil { + p.logger.Logf( + "could not check multipass instance %q does not exists, moving on anyway. Err: %v", err) + } args := []string{ "launch", "-c", "2", @@ -145,9 +152,14 @@ func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runne return fmt.Errorf("failed to marshal cloud-init configuration: %w", err) } + p.logger.Logf("Launching multipass instance %s", batch.ID) var output bytes.Buffer - p.logger.Logf("Launching multipass image %s", batch.ID) - proc, err := process.Start("multipass", process.WithContext(ctx), process.WithArgs(args), process.WithCmdOptions(runner.AttachOut(&output), runner.AttachErr(&output))) + proc, err := process.Start("multipass", + process.WithContext(ctx), + process.WithArgs(args), + process.WithCmdOptions( + runner.AttachOut(&output), + runner.AttachErr(&output))) if err != nil { return fmt.Errorf("failed to run multipass launch: %w", err) } @@ -162,7 +174,7 @@ func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runne } _ = proc.Stdin.Close() ps := <-proc.Wait() - if ps.ExitCode() != 0 { + if !ps.Success() { // print the output so its clear what went wrong fmt.Fprintf(os.Stdout, "%s\n", output.Bytes()) return fmt.Errorf("failed to run multipass launch: exited with code: %d", ps.ExitCode()) @@ -170,6 +182,76 @@ func (p *provisioner) launch(ctx context.Context, cfg runner.Config, batch runne return nil } +func (p *provisioner) ensureInstanceNotExist(ctx context.Context, batch runner.OSBatch) error { + var output bytes.Buffer + var stdErr bytes.Buffer + proc, err := process.Start("multipass", + process.WithContext(ctx), + process.WithArgs([]string{"list", "--format", "json"}), + process.WithCmdOptions( + runner.AttachOut(&output), + runner.AttachErr(&stdErr))) + if err != nil { + return fmt.Errorf("multipass list failed to run: %w", err) + } + + state := <-proc.Wait() + if !state.Success() { + msg := fmt.Sprintf("multipass list exited with non-zero status: %s", + state.String()) + p.logger.Logf(msg) + p.logger.Logf("output: %s", output.String()) + p.logger.Logf("stderr: %s", stdErr.String()) + return fmt.Errorf(msg) + } + list := struct { + List []struct { + Ipv4 []string `json:"ipv4"` + Name string `json:"name"` + Release string `json:"release"` + State string `json:"state"` + } `json:"list"` + }{} + err = json.NewDecoder(&output).Decode(&list) + if err != nil { + return fmt.Errorf("could not decode mutipass list output: %w", err) + } + + for _, i := range list.List { + if i.Name == batch.ID { + p.logger.Logf("multipass trying to delete instance %s", batch.ID) + + output.Reset() + stdErr.Reset() + proc, err = process.Start("multipass", + process.WithContext(ctx), + process.WithArgs([]string{"delete", "--purge", batch.ID}), 
+ process.WithCmdOptions( + runner.AttachOut(&output), + runner.AttachErr(&stdErr))) + if err != nil { + return fmt.Errorf( + "multipass instance %q already exist, state %q. Could not delete it: %w", + batch.ID, i.State, err) + } + state = <-proc.Wait() + if !state.Success() { + msg := fmt.Sprintf("failed to delete and purge multipass instance %s: %s", + batch.ID, + state.String()) + p.logger.Logf(msg) + p.logger.Logf("output: %s", output.String()) + p.logger.Logf("stderr: %s", stdErr.String()) + return fmt.Errorf(msg) + } + + break + } + } + + return nil +} + // delete deletes an instance. func (p *provisioner) delete(ctx context.Context, instance runner.Instance) error { args := []string{ diff --git a/pkg/testing/runner/provisioner.go b/pkg/testing/runner/provisioner.go index fb474e2f307..2708b0d204d 100644 --- a/pkg/testing/runner/provisioner.go +++ b/pkg/testing/runner/provisioner.go @@ -57,6 +57,9 @@ type Stack struct { // Version is the version of the stack. Version string `yaml:"version"` + // Ready determines if the stack is ready to be used. + Ready bool `yaml:"ready"` + // Elasticsearch is the URL to communicate with elasticsearch. Elasticsearch string `yaml:"elasticsearch"` @@ -89,11 +92,12 @@ type StackProvisioner interface { // SetLogger sets the logger for it to use. SetLogger(l Logger) - // Provision brings up the stacks - // - // The provision should re-use already prepared stacks when possible. - Provision(ctx context.Context, requests []StackRequest) ([]Stack, error) + // Create creates a stack. + Create(ctx context.Context, request StackRequest) (Stack, error) - // Clean cleans up all provisioned resources. - Clean(ctx context.Context, stacks []Stack) error + // WaitForReady should block until the stack is ready or the context is cancelled. + WaitForReady(ctx context.Context, stack Stack) (Stack, error) + + // Delete deletes the stack. 
+ Delete(ctx context.Context, stack Stack) error } diff --git a/pkg/testing/runner/runner.go b/pkg/testing/runner/runner.go index 0541fa785ec..a2c77f77aa0 100644 --- a/pkg/testing/runner/runner.go +++ b/pkg/testing/runner/runner.go @@ -128,10 +128,11 @@ type Runner struct { ip InstanceProvisioner sp StackProvisioner - batches []OSBatch - batchToStack map[string]Stack - stacksReady sync.WaitGroup - stacksErr error + batches []OSBatch + + batchToStack map[string]stackRes + batchToStackCh map[string]chan stackRes + batchToStackMx sync.Mutex stateMx sync.Mutex state State @@ -172,12 +173,13 @@ func NewRunner(cfg Config, ip InstanceProvisioner, sp StackProvisioner, batches osBatches = filterSupportedOS(osBatches, ip) r := &Runner{ - cfg: cfg, - logger: logger, - ip: ip, - sp: sp, - batches: osBatches, - batchToStack: make(map[string]Stack), + cfg: cfg, + logger: logger, + ip: ip, + sp: sp, + batches: osBatches, + batchToStack: make(map[string]stackRes), + batchToStackCh: make(map[string]chan stackRes), } err = r.loadState() @@ -274,11 +276,15 @@ func (r *Runner) Clean() error { defer cancel() return r.ip.Clean(ctx, r.cfg, instances) }) - g.Go(func() error { - ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) - defer cancel() - return r.sp.Clean(ctx, stacks) - }) + for _, stack := range stacks { + g.Go(func(stack Stack) func() error { + return func() error { + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute) + defer cancel() + return r.sp.Delete(ctx, stack) + } + }(stack)) + } return g.Wait() } @@ -366,14 +372,10 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger // ensure that we have all the requirements for the stack if required if batch.Batch.Stack != nil { // wait for the stack to be ready before continuing - logger.Logf("Waiting for stacks to be ready...") - r.stacksReady.Wait() - if r.stacksErr != nil { - return OSRunnerResult{}, fmt.Errorf("%s unable to continue because stack never became ready: %w", instance.Name, r.stacksErr) - } - stack, ok := r.getStackForBatchID(batch.ID) - if !ok { - return OSRunnerResult{}, fmt.Errorf("failed to find stack for batch %s", batch.ID) + logger.Logf("Waiting for stack to be ready...") + stack, err := r.getStackForBatchID(batch.ID) + if err != nil { + return OSRunnerResult{}, err } env["ELASTICSEARCH_HOST"] = stack.Elasticsearch env["ELASTICSEARCH_USERNAME"] = stack.Username @@ -574,9 +576,6 @@ func (r *Runner) createRepoArchive(ctx context.Context, repoDir string, dir stri // startStacks starts the stacks required for the tests to run func (r *Runner) startStacks(ctx context.Context) error { - // stacks never start ready - r.stacksReady.Add(1) - var versions []string batchToVersion := make(map[string]string) for _, lb := range r.batches { @@ -592,59 +591,115 @@ func (r *Runner) startStacks(ctx context.Context) error { } } - var requests []StackRequest + var requests []stackReq for _, version := range versions { id := strings.Replace(version, ".", "", -1) stack, ok := r.findStack(id) if ok { - r.logger.Logf("Reusing stack %s (%s)", version, id) - for batchID, batchVersion := range batchToVersion { - if batchVersion == version { - r.batchToStack[batchID] = stack - } - } + requests = append(requests, stackReq{ + request: StackRequest{ + ID: id, + Version: version, + }, + stack: &stack, + }) } else { - requests = append(requests, StackRequest{ - ID: id, - Version: version, + requests = append(requests, stackReq{ + request: StackRequest{ + ID: id, + Version: version, + }, }) } } 
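[Editor's note] Runner.Clean above now fans each stack deletion out to its own errgroup goroutine, using the `g.Go(func(stack Stack) func() error { ... }(stack))` form so each goroutine captures its own copy of the loop variable. A minimal sketch of that capture pattern; the stack IDs are made up, and with Go 1.22+ loop-variable semantics the explicit capture is no longer strictly required but remains safe and explicit.

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	stacks := []string{"8110-snapshot", "890"}

	var g errgroup.Group
	for _, id := range stacks {
		// Passing the loop variable as a parameter pins one copy per iteration,
		// so each goroutine deletes its own stack rather than whichever value
		// the shared loop variable holds when the goroutine finally runs.
		g.Go(func(id string) func() error {
			return func() error {
				fmt.Println("deleting", id)
				return nil
			}
		}(id))
	}
	if err := g.Wait(); err != nil {
		fmt.Println("cleanup error:", err)
	}
}
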
- if len(requests) == 0 { - // no need to request any other stacks - r.stacksReady.Done() - return nil + + reportResult := func(version string, stack Stack, err error) { + r.batchToStackMx.Lock() + defer r.batchToStackMx.Unlock() + res := stackRes{ + stack: stack, + err: err, + } + for batchID, batchVersion := range batchToVersion { + if batchVersion == version { + r.batchToStack[batchID] = res + ch, ok := r.batchToStackCh[batchID] + if ok { + ch <- res + } + } + } } - // start go routine to provision the needed stacks - go func(ctx context.Context) { - defer r.stacksReady.Done() + // start goroutines to provision the needed stacks + for _, request := range requests { + go func(ctx context.Context, req stackReq) { + var err error + var stack Stack + if req.stack != nil { + stack = *req.stack + } else { + stack, err = r.sp.Create(ctx, req.request) + if err != nil { + reportResult(req.request.Version, stack, err) + return + } + err = r.addOrUpdateStack(stack) + if err != nil { + reportResult(stack.Version, stack, err) + return + } + } + + if stack.Ready { + reportResult(stack.Version, stack, nil) + return + } - stacks, err := r.sp.Provision(ctx, requests) - if err != nil { - r.stacksErr = err - return - } - for _, stack := range stacks { - err := r.addOrUpdateStack(stack) + stack, err = r.sp.WaitForReady(ctx, stack) if err != nil { - r.stacksErr = err + reportResult(stack.Version, stack, err) return } - for batchID, batchVersion := range batchToVersion { - if batchVersion == stack.Version { - r.batchToStack[batchID] = stack - } + + err = r.addOrUpdateStack(stack) + if err != nil { + reportResult(stack.Version, stack, err) + return } - } - }(ctx) + + reportResult(stack.Version, stack, nil) + }(ctx, request) + } return nil } -func (r *Runner) getStackForBatchID(id string) (Stack, bool) { - stack, ok := r.batchToStack[id] - return stack, ok +func (r *Runner) getStackForBatchID(id string) (Stack, error) { + r.batchToStackMx.Lock() + res, ok := r.batchToStack[id] + if ok { + r.batchToStackMx.Unlock() + return res.stack, res.err + } + _, ok = r.batchToStackCh[id] + if ok { + return Stack{}, fmt.Errorf("getStackForBatchID called twice; this is not allowed") + } + ch := make(chan stackRes, 1) + r.batchToStackCh[id] = ch + r.batchToStackMx.Unlock() + + // 12 minutes is because the stack should have been ready after 10 minutes or returned an error + // this only exists to ensure that if that code is not blocking that this doesn't block forever + t := time.NewTimer(12 * time.Minute) + defer t.Stop() + select { + case <-t.C: + return Stack{}, fmt.Errorf("failed waiting for a response after 12 minutes") + case res = <-ch: + return res.stack, res.err + } } func (r *Runner) findInstance(id string) (StateInstance, bool) { @@ -986,3 +1041,13 @@ type batchLogger struct { func (b *batchLogger) Logf(format string, args ...any) { b.wrapped.Logf("(%s) %s", b.prefix, fmt.Sprintf(format, args...)) } + +type stackRes struct { + stack Stack + err error +} + +type stackReq struct { + request StackRequest + stack *Stack +} diff --git a/pkg/testing/runner/runner_test.go b/pkg/testing/runner/runner_test.go index d10b9d524d0..c46b3b53761 100644 --- a/pkg/testing/runner/runner_test.go +++ b/pkg/testing/runner/runner_test.go @@ -8,6 +8,7 @@ import ( "context" "os" "path/filepath" + "sync" "testing" "github.com/stretchr/testify/assert" @@ -86,7 +87,7 @@ func TestNewRunner_Clean(t *testing.T) { require.NoError(t, err) assert.ElementsMatch(t, ip.instances, []Instance{i1, i2}) - assert.ElementsMatch(t, sp.stacks, []Stack{s1, 
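[Editor's note] reportResult and getStackForBatchID above coordinate through a shared result map plus a buffered one-shot channel per batch, so the provisioning goroutine never blocks on send and the waiter gives up after a hard timer. A stripped-down sketch of that handshake; the waiter/result types, newWaiter constructor, and durations are simplified stand-ins, not the runner's actual types.

package example

import (
	"fmt"
	"sync"
	"time"
)

type result struct {
	value string
	err   error
}

type waiter struct {
	mu      sync.Mutex
	results map[string]result
	pending map[string]chan result
}

func newWaiter() *waiter {
	return &waiter{results: map[string]result{}, pending: map[string]chan result{}}
}

// get returns a stored result immediately, or registers a buffered channel and
// waits for the producer, giving up after the timeout. Mirrors the shape of
// getStackForBatchID, not its exact behaviour.
func (w *waiter) get(id string, timeout time.Duration) (string, error) {
	w.mu.Lock()
	if res, ok := w.results[id]; ok {
		w.mu.Unlock()
		return res.value, res.err
	}
	ch := make(chan result, 1) // buffered: the producer never blocks on send
	w.pending[id] = ch
	w.mu.Unlock()

	t := time.NewTimer(timeout)
	defer t.Stop()
	select {
	case <-t.C:
		return "", fmt.Errorf("no result for %s after %s", id, timeout)
	case res := <-ch:
		return res.value, res.err
	}
}

// put stores a result and wakes any registered waiter.
func (w *waiter) put(id, value string, err error) {
	w.mu.Lock()
	defer w.mu.Unlock()
	res := result{value: value, err: err}
	w.results[id] = res
	if ch, ok := w.pending[id]; ok {
		ch <- res
	}
}
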
s2}) + assert.ElementsMatch(t, sp.deletedStacks, []Stack{s1, s2}) } type fakeInstanceProvisioner struct { @@ -123,31 +124,38 @@ func (f *fakeInstanceProvisioner) Clean(_ context.Context, _ Config, instances [ } type fakeStackProvisioner struct { - requests []StackRequest - stacks []Stack + mx sync.Mutex + requests []StackRequest + deletedStacks []Stack } func (f *fakeStackProvisioner) SetLogger(_ Logger) { } -func (f *fakeStackProvisioner) Provision(_ context.Context, requests []StackRequest) ([]Stack, error) { - f.requests = requests - var stacks []Stack - for _, req := range requests { - stacks = append(stacks, Stack{ - ID: req.ID, - Version: req.Version, - Elasticsearch: "http://localhost:9200", - Kibana: "http://localhost:5601", - Username: "elastic", - Password: "changeme", - Internal: nil, - }) - } - return stacks, nil +func (f *fakeStackProvisioner) Create(_ context.Context, request StackRequest) (Stack, error) { + f.mx.Lock() + defer f.mx.Unlock() + f.requests = append(f.requests, request) + return Stack{ + ID: request.ID, + Version: request.Version, + Elasticsearch: "http://localhost:9200", + Kibana: "http://localhost:5601", + Username: "elastic", + Password: "changeme", + Internal: nil, + Ready: false, + }, nil +} + +func (f *fakeStackProvisioner) WaitForReady(_ context.Context, stack Stack) (Stack, error) { + stack.Ready = true + return stack, nil } -func (f *fakeStackProvisioner) Clean(_ context.Context, stacks []Stack) error { - f.stacks = stacks +func (f *fakeStackProvisioner) Delete(_ context.Context, stack Stack) error { + f.mx.Lock() + defer f.mx.Unlock() + f.deletedStacks = append(f.deletedStacks, stack) return nil } diff --git a/testing/integration/logs_ingestion_test.go b/testing/integration/logs_ingestion_test.go index ba9a84673b0..d9fb2f511a8 100644 --- a/testing/integration/logs_ingestion_test.go +++ b/testing/integration/logs_ingestion_test.go @@ -104,11 +104,15 @@ func testMonitoringLogsAreShipped( ) { // Stage 1: Make sure metricbeat logs are populated t.Log("Making sure metricbeat logs are populated") - docs := findESDocs(t, func() (estools.Documents, error) { - return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") - }) - t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) - require.NotZero(t, len(docs.Hits.Hits)) + require.Eventually(t, + func() bool { + docs := findESDocs(t, func() (estools.Documents, error) { + return estools.GetLogsForDataset(info.ESClient, "elastic_agent.metricbeat") + }) + return len(docs.Hits.Hits) > 0 + }, + 1*time.Minute, 500*time.Millisecond, + "there should be metricbeats logs by now") // Stage 2: make sure all components are healthy t.Log("Making sure all components are healthy") @@ -123,7 +127,7 @@ func testMonitoringLogsAreShipped( // Stage 3: Make sure there are no errors in logs t.Log("Making sure there are no error logs") - docs = findESDocs(t, func() (estools.Documents, error) { + docs := findESDocs(t, func() (estools.Documents, error) { return estools.CheckForErrorsInLogs(info.ESClient, info.Namespace, []string{ // acceptable error messages (include reason) "Error dialing dial tcp 127.0.0.1:9200: connect: connection refused", // beat is running default config before its config gets updated @@ -134,7 +138,7 @@ func testMonitoringLogsAreShipped( "elastic-agent-client error: rpc error: code = Canceled desc = context canceled", // can happen on restart }) }) - t.Logf("errors: Got %d documents", len(docs.Hits.Hits)) + t.Logf("error logs: Got %d documents", len(docs.Hits.Hits)) for _, doc := range 
docs.Hits.Hits { t.Logf("%#v", doc.Source) }
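[Editor's note] The logs_ingestion_test.go change above swaps a one-shot assertion for require.Eventually, the usual testify pattern when the data under test arrives asynchronously. A minimal self-contained example of the same call shape; countDocs is a stand-in for the Elasticsearch query, not part of the change.

package example

import (
	"testing"
	"time"

	"github.com/stretchr/testify/require"
)

func TestDocsShowUpEventually(t *testing.T) {
	start := time.Now()

	// countDocs stands in for querying Elasticsearch; it pretends documents
	// appear two seconds after the test starts.
	countDocs := func() int {
		if time.Since(start) > 2*time.Second {
			return 3
		}
		return 0
	}

	// Poll every 500ms for up to one minute, failing the test with the given
	// message if the condition never becomes true.
	require.Eventually(t,
		func() bool { return countDocs() > 0 },
		1*time.Minute, 500*time.Millisecond,
		"there should be documents by now")
}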