diff --git a/.buildkite/hooks/pre-command b/.buildkite/hooks/pre-command index bb4db5f0948..292aa6918c0 100755 --- a/.buildkite/hooks/pre-command +++ b/.buildkite/hooks/pre-command @@ -47,7 +47,7 @@ if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent-package" ]]; then fi fi -if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == "integration-tests" ]]; then +if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == *"integration-tests"* ]]; then # Set GCP credentials export GOOGLE_APPLICATION_GCP_SECRET=$(retry 5 vault kv get -format=json -field=data ${CI_GCP_OBS_PATH}) echo "${GOOGLE_APPLICATION_GCP_SECRET}" > ./gcp.json diff --git a/.buildkite/hooks/pre-exit b/.buildkite/hooks/pre-exit index 381669053c5..c6747e4023f 100755 --- a/.buildkite/hooks/pre-exit +++ b/.buildkite/hooks/pre-exit @@ -2,7 +2,7 @@ set -eo pipefail -if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == "integration-tests" ]]; then +if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == *"integration-tests"* ]]; then if [[ -z "${WORKSPACE-""}" ]]; then WORKSPACE=$(git rev-parse --show-toplevel) fi diff --git a/.buildkite/pipeline.yml b/.buildkite/pipeline.yml index 8ff198708fd..4803aff42df 100644 --- a/.buildkite/pipeline.yml +++ b/.buildkite/pipeline.yml @@ -31,9 +31,19 @@ steps: manual: allowed: true + - label: "Serverless integration test" + key: "serverless-integration-tests" + command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite + artifact_paths: + - "build/TEST-**" + - "build/diagnostics/*" + agents: + provider: "gcp" + machineType: "n1-standard-8" + - label: "Integration tests" key: "integration-tests" - command: ".buildkite/scripts/steps/integration_tests.sh" + command: ".buildkite/scripts/steps/integration_tests.sh stateful" artifact_paths: - "build/TEST-**" - "build/diagnostics/*" diff --git a/.buildkite/scripts/steps/integration_tests.sh b/.buildkite/scripts/steps/integration_tests.sh index d4ebfc03a2e..2a129193267 100755 --- a/.buildkite/scripts/steps/integration_tests.sh +++ b/.buildkite/scripts/steps/integration_tests.sh @@ -3,6 +3,11 @@ set -euo pipefail source .buildkite/scripts/common.sh +STACK_PROVISIONER="${1:-"stateful"}" +MAGE_TARGET="${2:-"integration:test"}" +MAGE_SUBTARGET="${3:-""}" + + # Override the agent package version using a string with format .. # NOTE: use only after version bump when the new version is not yet available, for example: # OVERRIDE_AGENT_PACKAGE_VERSION="8.10.3" otherwise OVERRIDE_AGENT_PACKAGE_VERSION="". @@ -18,7 +23,7 @@ AGENT_PACKAGE_VERSION="${OVERRIDE_AGENT_PACKAGE_VERSION}" DEV=true EXTERNAL=true # Run integration tests set +e -AGENT_VERSION="${OVERRIDE_TEST_AGENT_VERSION}" TEST_INTEG_CLEAN_ON_EXIT=true SNAPSHOT=true mage integration:test +AGENT_VERSION="${OVERRIDE_TEST_AGENT_VERSION}" TEST_INTEG_CLEAN_ON_EXIT=true STACK_PROVISIONER="$STACK_PROVISIONER" SNAPSHOT=true mage $MAGE_TARGET $MAGE_SUBTARGET TESTS_EXIT_STATUS=$? set -e diff --git a/magefile.go b/magefile.go index 75b0f3ef93a..083e91185e3 100644 --- a/magefile.go +++ b/magefile.go @@ -1461,6 +1461,7 @@ func majorMinor() string { // Clean cleans up the integration testing leftovers func (Integration) Clean() error { + fmt.Println("--- Clean mage artifacts") _ = os.RemoveAll(".agent-testing") // Clean out .integration-cache/.ogc-cache always @@ -1702,6 +1703,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche if err != nil { return nil, err } + agentStackVersion := os.Getenv("AGENT_STACK_VERSION") agentVersion := os.Getenv("AGENT_VERSION") if agentVersion == "" { @@ -1719,6 +1721,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche agentVersion = fmt.Sprintf("%s-SNAPSHOT", agentVersion) } } + if agentStackVersion == "" { agentStackVersion = agentVersion } @@ -1756,15 +1759,15 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche instanceProvisionerMode = "ogc" } if instanceProvisionerMode != "ogc" && instanceProvisionerMode != "multipass" { - return nil, errors.New("INSTANCE_PROVISIONER environment variable must be one of 'ogc' or 'multipass'") + return nil, fmt.Errorf("INSTANCE_PROVISIONER environment variable must be one of 'ogc' or 'multipass', not %s", instanceProvisionerMode) } fmt.Printf(">>>> Using %s instance provisioner\n", instanceProvisionerMode) stackProvisionerMode := os.Getenv("STACK_PROVISIONER") if stackProvisionerMode == "" { - stackProvisionerMode = "ess" + stackProvisionerMode = "stateful" } - if stackProvisionerMode != "ess" && stackProvisionerMode != "serverless" { - return nil, errors.New("STACK_PROVISIONER environment variable must be one of 'serverless' or 'ess'") + if stackProvisionerMode != "stateful" && stackProvisionerMode != "serverless" { + return nil, fmt.Errorf("STACK_PROVISIONER environment variable must be one of 'serverless' or 'stateful', not %s", stackProvisionerMode) } fmt.Printf(">>>> Using %s stack provisioner\n", stackProvisionerMode) @@ -1778,24 +1781,37 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche extraEnv["AGENT_KEEP_INSTALLED"] = os.Getenv("AGENT_KEEP_INSTALLED") } + // these following two env vars are currently not used by anything, but can be used in the future to test beats or + // other binaries, see https://github.com/elastic/elastic-agent/pull/3258 + binaryName := os.Getenv("TEST_BINARY_NAME") + if binaryName == "" { + binaryName = "elastic-agent" + } + + repoDir := os.Getenv("TEST_INTEG_REPO_PATH") + if repoDir == "" { + repoDir = "." + } + diagDir := filepath.Join("build", "diagnostics") _ = os.MkdirAll(diagDir, 0755) cfg := runner.Config{ - AgentVersion: agentVersion, - AgentStackVersion: agentStackVersion, - BuildDir: agentBuildDir, - GOVersion: goVersion, - RepoDir: ".", - StateDir: ".integration-cache", - DiagnosticsDir: diagDir, - Platforms: testPlatforms(), - Matrix: matrix, - SingleTest: singleTest, - VerboseMode: mg.Verbose(), - Timestamp: timestamp, - TestFlags: goTestFlags, - ExtraEnv: extraEnv, + AgentVersion: agentVersion, + StackVersion: agentStackVersion, + BuildDir: agentBuildDir, + GOVersion: goVersion, + RepoDir: repoDir, + DiagnosticsDir: diagDir, + StateDir: ".integration-cache", + Platforms: testPlatforms(), + Matrix: matrix, + SingleTest: singleTest, + VerboseMode: mg.Verbose(), + Timestamp: timestamp, + TestFlags: goTestFlags, + ExtraEnv: extraEnv, + BinaryName: binaryName, } ogcCfg := ogc.Config{ ServiceTokenPath: serviceTokenPath, @@ -1824,7 +1840,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche Region: essRegion, } var stackProvisioner runner.StackProvisioner - if stackProvisionerMode == "ess" { + if stackProvisionerMode == "stateful" { stackProvisioner, err = ess.NewProvisioner(provisionCfg) if err != nil { return nil, err diff --git a/pkg/testing/define/define.go b/pkg/testing/define/define.go index 3cae2c3b7f5..5c3ad715ea9 100644 --- a/pkg/testing/define/define.go +++ b/pkg/testing/define/define.go @@ -78,19 +78,29 @@ func NewFixture(t *testing.T, version string, opts ...atesting.FixtureOpt) (*ate buildsDir = filepath.Join(projectDir, "build", "distributions") } + return NewFixtureWithBinary(t, version, "elastic-agent", buildsDir) + +} + +// NewFixture returns a new Elastic Agent testing fixture with a LocalFetcher and +// the agent logging to the test logger. +func NewFixtureWithBinary(t *testing.T, version string, binary string, buildsDir string, opts ...atesting.FixtureOpt) (*atesting.Fixture, error) { ver, err := semver.ParseVersion(version) if err != nil { return nil, fmt.Errorf("%q is an invalid agent version: %w", version, err) } - var f atesting.Fetcher + var binFetcher atesting.Fetcher if ver.IsSnapshot() { - f = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly()) + binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly(), atesting.WithCustomBinaryName(binary)) } else { - f = atesting.LocalFetcher(buildsDir) + binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithCustomBinaryName(binary)) } - opts = append(opts, atesting.WithFetcher(f), atesting.WithLogOutput()) + opts = append(opts, atesting.WithFetcher(binFetcher), atesting.WithLogOutput()) + if binary != "elastic-agent" { + opts = append(opts, atesting.WithBinaryName(binary)) + } return atesting.NewFixture(t, version, opts...) } diff --git a/pkg/testing/ess/provisioner.go b/pkg/testing/ess/provisioner.go index 081b4100869..a051cac39d1 100644 --- a/pkg/testing/ess/provisioner.go +++ b/pkg/testing/ess/provisioner.go @@ -82,8 +82,10 @@ func (p *provisioner) Provision(ctx context.Context, requests []runner.StackRequ results[r] = resp } - // wait 15 minutes for all stacks to be ready - readyCtx, readyCancel := context.WithTimeout(ctx, 15*time.Minute) + // set a long timeout + // this context travels up to the magefile, clients that want a shorter timeout can set + // it via mage's -t flag + readyCtx, readyCancel := context.WithTimeout(ctx, 25*time.Minute) defer readyCancel() g, gCtx := errgroup.WithContext(readyCtx) diff --git a/pkg/testing/ess/serverless.go b/pkg/testing/ess/serverless.go index 07086635d73..817ee33f03b 100644 --- a/pkg/testing/ess/serverless.go +++ b/pkg/testing/ess/serverless.go @@ -17,7 +17,7 @@ import ( "github.com/elastic/elastic-agent/pkg/testing/runner" ) -var serverlessURL = "https://global.qa.cld.elstc.co" +var serverlessURL = "https://staging.found.no" // ServerlessClient is the handler the serverless ES instance type ServerlessClient struct { @@ -54,6 +54,13 @@ type Project struct { } `json:"endpoints"` } +// CredResetResponse contains the new auth details for a +// stack credential reset +type CredResetResponse struct { + Password string `json:"password"` + Username string `json:"username"` +} + // NewServerlessClient creates a new instance of the serverless client func NewServerlessClient(region, projectType, api string, logger runner.Logger) *ServerlessClient { return &ServerlessClient{ @@ -97,6 +104,16 @@ func (srv *ServerlessClient) DeployStack(ctx context.Context, req ServerlessRequ return Project{}, fmt.Errorf("error decoding JSON response: %w", err) } srv.proj = serverlessHandle + + // as of 8/8-ish, the serverless ESS cloud no longer provides credentials on the first POST request, we must send an additional POST + // to reset the credentials + updated, err := srv.ResetCredentials(ctx) + if err != nil { + return serverlessHandle, fmt.Errorf("error resetting credentials: %w", err) + } + srv.proj.Credentials.Username = updated.Username + srv.proj.Credentials.Password = updated.Password + return serverlessHandle, nil } @@ -181,27 +198,15 @@ func (srv *ServerlessClient) WaitForEndpoints(ctx context.Context) error { // WaitForElasticsearch waits until the ES endpoint is healthy func (srv *ServerlessClient) WaitForElasticsearch(ctx context.Context) error { - endpoint := fmt.Sprintf("%s/_cluster/health", srv.proj.Endpoints.Elasticsearch) - req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + req, err := http.NewRequestWithContext(ctx, "GET", srv.proj.Endpoints.Elasticsearch, nil) if err != nil { return fmt.Errorf("error creating HTTP request: %w", err) } req.SetBasicAuth(srv.proj.Credentials.Username, srv.proj.Credentials.Password) + // _cluster/health no longer works on serverless, just check response code readyFunc := func(resp *http.Response) bool { - var health struct { - Status string `json:"status"` - } - err = json.NewDecoder(resp.Body).Decode(&health) - resp.Body.Close() - if err != nil { - srv.log.Logf("response decoding error: %v", err) - return false - } - if health.Status == "green" { - return true - } - return false + return resp.StatusCode == 200 } err = srv.waitForRemoteState(ctx, req, time.Second*5, readyFunc) @@ -243,6 +248,38 @@ func (srv *ServerlessClient) WaitForKibana(ctx context.Context) error { return nil } +// ResetCredentials resets the credentials for the given ESS instance +func (srv *ServerlessClient) ResetCredentials(ctx context.Context) (CredResetResponse, error) { + resetURL := fmt.Sprintf("%s/api/v1/serverless/projects/%s/%s/_reset-credentials", serverlessURL, srv.projectType, srv.proj.ID) + + resetHandler, err := http.NewRequestWithContext(ctx, "POST", resetURL, nil) + if err != nil { + return CredResetResponse{}, fmt.Errorf("error creating new httpRequest: %w", err) + } + + resetHandler.Header.Set("Content-Type", "application/json") + resetHandler.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", srv.api)) + + resp, err := http.DefaultClient.Do(resetHandler) + if err != nil { + return CredResetResponse{}, fmt.Errorf("error performing HTTP request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + p, _ := io.ReadAll(resp.Body) + return CredResetResponse{}, fmt.Errorf("Non-200 status code returned by server: %d, body: %s", resp.StatusCode, string(p)) + } + + updated := CredResetResponse{} + err = json.NewDecoder(resp.Body).Decode(&updated) + if err != nil { + return CredResetResponse{}, fmt.Errorf("error decoding JSON response: %w", err) + } + + return updated, nil +} + func (srv *ServerlessClient) waitForRemoteState(ctx context.Context, httpHandler *http.Request, tick time.Duration, isReady func(*http.Response) bool) error { timer := time.NewTimer(time.Millisecond) // in cases where we get a timeout, also return the last error returned via HTTP diff --git a/pkg/testing/ess/serverless_provision.go b/pkg/testing/ess/serverless_provision.go index ea78fc0eab1..f203105b1ee 100644 --- a/pkg/testing/ess/serverless_provision.go +++ b/pkg/testing/ess/serverless_provision.go @@ -80,7 +80,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne for _, req := range requests { client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) srvReq := ServerlessRequest{Name: req.ID, RegionID: prov.cfg.Region} - _, err := client.DeployStack(ctx, srvReq) + proj, err := client.DeployStack(ctx, srvReq) if err != nil { return nil, fmt.Errorf("error deploying stack for request %s: %w", req.ID, err) } @@ -95,6 +95,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne Kibana: client.proj.Endpoints.Kibana, Username: client.proj.Credentials.Username, Password: client.proj.Credentials.Password, + Internal: map[string]interface{}{"deployment_id": proj.ID}, } stacks = append(stacks, newStack) prov.stacksMut.Lock() diff --git a/pkg/testing/fetcher_local.go b/pkg/testing/fetcher_local.go index 54526903ede..214e63c43a7 100644 --- a/pkg/testing/fetcher_local.go +++ b/pkg/testing/fetcher_local.go @@ -17,6 +17,7 @@ import ( type localFetcher struct { dir string snapshotOnly bool + binaryName string } type localFetcherOpt func(f *localFetcher) @@ -28,10 +29,18 @@ func WithLocalSnapshotOnly() localFetcherOpt { } } +// WithCustomBinaryName sets the binary to a custom name, the default is `elastic-agent` +func WithCustomBinaryName(name string) localFetcherOpt { + return func(f *localFetcher) { + f.binaryName = name + } +} + // LocalFetcher returns a fetcher that pulls the binary of the Elastic Agent from a local location. func LocalFetcher(dir string, opts ...localFetcherOpt) Fetcher { f := &localFetcher{ - dir: dir, + dir: dir, + binaryName: "elastic-agent", } for _, o := range opts { o(f) @@ -56,7 +65,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec return nil, fmt.Errorf("invalid version: %q: %w", ver, err) } - mainBuildfmt := "elastic-agent-%s-%s" + mainBuildfmt := "%s-%s-%s" if f.snapshotOnly && !ver.IsSnapshot() { if ver.Prerelease() == "" { ver = semver.NewParsedSemVer(ver.Major(), ver.Minor(), ver.Patch(), "SNAPSHOT", ver.BuildMetadata()) @@ -66,7 +75,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec } - mainBuild := fmt.Sprintf(mainBuildfmt, ver, suffix) + mainBuild := fmt.Sprintf(mainBuildfmt, f.binaryName, ver, suffix) mainBuildPath := filepath.Join(f.dir, mainBuild) build := mainBuild buildPath := mainBuildPath diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index f713a581f80..1d852e50277 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -43,6 +43,9 @@ type Fixture struct { logOutput bool allowErrs bool connectTimout time.Duration + binaryName string + runLength time.Duration + additionalArgs []string srcPackage string workDir string @@ -104,6 +107,26 @@ func WithConnectTimeout(timeout time.Duration) FixtureOpt { } } +// WithBinaryName sets the name of the binary under test, in cases where tests aren't being run against elastic-agent +func WithBinaryName(name string) FixtureOpt { + return func(f *Fixture) { + f.binaryName = name + } +} + +// WithRunLength sets the total time the binary will run +func WithRunLength(run time.Duration) FixtureOpt { + return func(f *Fixture) { + f.runLength = run + } +} + +func WithAdditionalArgs(args []string) FixtureOpt { + return func(f *Fixture) { + f.additionalArgs = args + } +} + // NewFixture creates a new fixture to setup and manage Elastic Agent. func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, error) { // we store the caller so the fixture can find the cache directory for the artifacts that @@ -123,6 +146,8 @@ func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, err operatingSystem: runtime.GOOS, architecture: runtime.GOARCH, connectTimout: 5 * time.Second, + // default to elastic-agent, can be changed by a set FixtureOpt below + binaryName: "elastic-agent", } for _, o := range opts { o(f) @@ -179,6 +204,21 @@ func (f *Fixture) Prepare(ctx context.Context, components ...UsableComponent) er return nil } +// WriteFileToWorkDir sends a file to the working directory alongside the unpacked tar build. +func (f *Fixture) WriteFileToWorkDir(ctx context.Context, data string, name string) error { + err := f.EnsurePrepared(ctx) + if err != nil { + return fmt.Errorf("error preparing binary: %w", err) + } + + err = os.WriteFile(filepath.Join(f.workDir, name), []byte(data), 0644) + if err != nil { + return fmt.Errorf("error writing file: %w", err) + } + f.t.Logf("wrote %s to %s", name, f.workDir) + return nil +} + // Configure replaces the default Agent configuration file with the provided // configuration. This must be called after `Prepare` is called but before `Run` // or `Install` can be called. @@ -235,14 +275,93 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error { return nil } -// Run runs the Elastic Agent. +// RunBeat runs the given given beat +// the beat will run until an error, or the given timeout is reached +func (f *Fixture) RunBeat(ctx context.Context) error { + if f.binaryName == "elastic-agent" { + return errors.New("RunBeat() can't be run against elastic-agent") + } + + var err error + err = f.EnsurePrepared(ctx) + if err != nil { + return fmt.Errorf("error preparing beat: %w", err) + } + + var logProxy Logger + if f.logOutput { + logProxy = f.t + } + stdOut := newLogWatcher(logProxy) + stdErr := newLogWatcher(logProxy) + args := []string{"run", "-e", "-c", filepath.Join(f.workDir, fmt.Sprintf("%s.yml", f.binaryName))} + + args = append(args, f.additionalArgs...) + + proc, err := process.Start( + f.binaryPath(), + process.WithContext(ctx), + process.WithArgs(args), + process.WithCmdOptions(attachOutErr(stdOut, stdErr))) + + if err != nil { + return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err) + } + + killProc := func() { + _ = proc.Kill() + <-proc.Wait() + } + + var doneChan <-chan time.Time + if f.runLength != 0 { + doneChan = time.After(f.runLength) + } + + stopping := false + for { + select { + case <-ctx.Done(): + killProc() + return ctx.Err() + case ps := <-proc.Wait(): + if stopping { + return nil + } + return fmt.Errorf("elastic-agent exited unexpectedly with exit code: %d", ps.ExitCode()) + case err := <-stdOut.Watch(): + if !f.allowErrs { + // no errors allowed + killProc() + return fmt.Errorf("elastic-agent logged an unexpected error: %w", err) + } + case err := <-stdErr.Watch(): + if !f.allowErrs { + // no errors allowed + killProc() + return fmt.Errorf("elastic-agent logged an unexpected error: %w", err) + } + case <-doneChan: + if !stopping { + // trigger the stop + stopping = true + _ = proc.Stop() + } + } + } +} + +// Run runs the provided binary. // -// If `states` are provided then the Elastic Agent runs until each state has been reached. Once reached the +// If `states` are provided, agent runs until each state has been reached. Once reached the // Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started // with `WithAllowErrors()` then `Run` will exit early and return the logged error. // -// If no `states` are provided then the Elastic Agent runs until the context is cancelled. +// If no `states` are provided then the Elastic Agent runs until the context is or the timeout specified with WithRunLength is reached. func (f *Fixture) Run(ctx context.Context, states ...State) error { + if f.binaryName != "elastic-agent" { + return errors.New("Run() can only be used with elastic-agent, use RunBeat()") + } if f.installed { return errors.New("fixture is installed; cannot be run") } @@ -256,14 +375,28 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - var sm *stateMachine + var smInstance *stateMachine if states != nil { - sm, err = newStateMachine(states) + smInstance, err = newStateMachine(states) if err != nil { return err } } + // agent-specific setup + var agentClient client.Client + var stateCh chan *client.AgentState + var stateErrCh chan error + + cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir) + if err != nil { + return fmt.Errorf("failed to get control protcol address: %w", err) + } + agentClient = client.New(client.WithAddress(cAddr)) + f.setClient(agentClient) + defer f.setClient(nil) + stateCh, stateErrCh = watchState(ctx, agentClient, f.connectTimout) + var logProxy Logger if f.logOutput { logProxy = f.t @@ -271,29 +404,30 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { stdOut := newLogWatcher(logProxy) stdErr := newLogWatcher(logProxy) - cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir) - if err != nil { - return fmt.Errorf("failed to get control protcol address: %w", err) - } + args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"} + + args = append(args, f.additionalArgs...) proc, err := process.Start( f.binaryPath(), process.WithContext(ctx), - process.WithArgs([]string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}), + process.WithArgs(args), process.WithCmdOptions(attachOutErr(stdOut, stdErr))) + if err != nil { - return fmt.Errorf("failed to spawn elastic-agent: %w", err) + return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err) } + killProc := func() { _ = proc.Kill() <-proc.Wait() } - c := client.New(client.WithAddress(cAddr)) - f.setClient(c) - defer f.setClient(nil) + var doneChan <-chan time.Time + if f.runLength != 0 { + doneChan = time.After(f.runLength) + } - stateCh, stateErrCh := watchState(ctx, c, f.connectTimout) stopping := false for { select { @@ -323,9 +457,15 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { killProc() return fmt.Errorf("elastic-agent client received unexpected error: %w", err) } + case <-doneChan: + if !stopping { + // trigger the stop + stopping = true + _ = proc.Stop() + } case state := <-stateCh: - if sm != nil { - cfg, cont, err := sm.next(state) + if smInstance != nil { + cfg, cont, err := smInstance.next(state) if err != nil { killProc() return fmt.Errorf("state management failed with unexpected error: %w", err) @@ -337,7 +477,7 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { _ = proc.Stop() } } else if cfg != "" { - err := performConfigure(ctx, c, cfg, 3*time.Second) + err := performConfigure(ctx, agentClient, cfg, 3*time.Second) if err != nil { killProc() return err @@ -362,7 +502,7 @@ func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOp if err != nil { return nil, fmt.Errorf("error creating cmd: %w", err) } - f.t.Logf(">> running agent with: %v", cmd.Args) + f.t.Logf(">> running binary with: %v", cmd.Args) return cmd.CombinedOutput() } @@ -502,7 +642,11 @@ func (f *Fixture) binaryPath() string { workDir = filepath.Join(paths.DefaultBasePath, "Elastic", "Agent") } } - binary := filepath.Join(workDir, "elastic-agent") + defaultBin := "elastic-agent" + if f.binaryName != "" { + defaultBin = f.binaryName + } + binary := filepath.Join(workDir, defaultBin) if f.operatingSystem == "windows" { binary += ".exe" } diff --git a/pkg/testing/runner/config.go b/pkg/testing/runner/config.go index a65f8a1550e..1448fa197dd 100644 --- a/pkg/testing/runner/config.go +++ b/pkg/testing/runner/config.go @@ -14,19 +14,24 @@ import ( // Config provides the configuration for running the runner. type Config struct { - AgentVersion string - AgentStackVersion string - BuildDir string - GOVersion string - RepoDir string - StateDir string - DiagnosticsDir string + AgentVersion string + StateDir string + ReleaseVersion string + StackVersion string + BuildDir string + GOVersion string + RepoDir string + DiagnosticsDir string // Platforms filters the tests to only run on the provided list // of platforms even if the tests supports more than what is // defined in this list. Platforms []string + // BinaryName is the name of the binary package under test, i.e, elastic-agent, metricbeat, etc + // this is used to copy the .tar.gz to the remote host + BinaryName string + // Matrix enables matrix testing. This explodes each test to // run on all supported platforms the runner supports. Matrix bool @@ -52,8 +57,8 @@ func (c *Config) Validate() error { if c.AgentVersion == "" { return errors.New("field AgentVersion must be set") } - if c.AgentStackVersion == "" { - return errors.New("field AgentStackVersion must be set") + if c.StackVersion == "" { + return errors.New("field StackVersion must be set") } if c.BuildDir == "" { return errors.New("field BuildDir must be set") diff --git a/pkg/testing/runner/debian.go b/pkg/testing/runner/debian.go index f0d4c837431..268d42fb604 100644 --- a/pkg/testing/runner/debian.go +++ b/pkg/testing/runner/debian.go @@ -96,6 +96,16 @@ func (DebianRunner) Copy(ctx context.Context, sshClient SSHClient, logger Logger return fmt.Errorf("failed to SCP repo archive %s: %w", repoArchive, err) } + // remove build paths, on cases where the build path is different from agent. + for _, remoteBuildPath := range []string{build.Path, build.SHA512Path} { + relativeAgentDir := filepath.Join("agent", remoteBuildPath) + _, _, err := sshClient.Exec(ctx, "sudo", []string{"rm", "-rf", relativeAgentDir}, nil) + // doesn't need to be a fatal error. + if err != nil { + logger.Logf("error removing build dir %s: %w", relativeAgentDir, err) + } + } + // ensure that agent directory is removed (possible it already exists if instance already used) stdout, stderr, err := sshClient.Exec(ctx, "sudo", []string{"rm", "-rf", "agent"}, nil) diff --git a/pkg/testing/runner/runner.go b/pkg/testing/runner/runner.go index 6a3709d6641..af7758caa16 100644 --- a/pkg/testing/runner/runner.go +++ b/pkg/testing/runner/runner.go @@ -356,7 +356,6 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger logger.Logf("Failed to copy files instance: %s", err) return OSRunnerResult{}, fmt.Errorf("failed to copy files to instance %s: %w", instance.Name, err) } - // start with the ExtraEnv first preventing the other environment flags below // from being overwritten env := map[string]string{} @@ -367,6 +366,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger // ensure that we have all the requirements for the stack if required if batch.Batch.Stack != nil { // wait for the stack to be ready before continuing + logger.Logf("Waiting for stacks to be ready...") r.stacksReady.Wait() if r.stacksErr != nil { return OSRunnerResult{}, fmt.Errorf("%s unable to continue because stack never became ready: %w", instance.Name, r.stacksErr) @@ -386,6 +386,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger // set the go test flags env["GOTEST_FLAGS"] = r.cfg.TestFlags + env["TEST_BINARY_NAME"] = r.cfg.BinaryName // run the actual tests on the host result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.AgentVersion, batch.ID, batch.Batch, env) @@ -447,9 +448,13 @@ func (r *Runner) getBuild(b OSBatch) Build { ext = "zip" } hashExt := ".sha512" - packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("elastic-agent-%s-%s-%s.%s", r.cfg.AgentVersion, b.OS.Type, arch, ext)) + name := "elastic-agent" + if r.cfg.BinaryName != "" { + name = r.cfg.BinaryName + } + packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("%s-%s-%s-%s.%s", name, r.cfg.AgentVersion, b.OS.Type, arch, ext)) return Build{ - Version: r.cfg.AgentVersion, + Version: r.cfg.ReleaseVersion, Type: b.OS.Type, Arch: arch, Path: packageName, @@ -578,7 +583,7 @@ func (r *Runner) startStacks(ctx context.Context) error { if !lb.Skip && lb.Batch.Stack != nil { if lb.Batch.Stack.Version == "" { // no version defined on the stack; set it to the defined stack version - lb.Batch.Stack.Version = r.cfg.AgentStackVersion + lb.Batch.Stack.Version = r.cfg.StackVersion } if !slices.Contains(versions, lb.Batch.Stack.Version) { versions = append(versions, lb.Batch.Stack.Version) diff --git a/pkg/testing/runner/runner_test.go b/pkg/testing/runner/runner_test.go index c24e653e31a..d10b9d524d0 100644 --- a/pkg/testing/runner/runner_test.go +++ b/pkg/testing/runner/runner_test.go @@ -23,13 +23,13 @@ func TestNewRunner_Clean(t *testing.T) { require.NoError(t, err) cfg := Config{ - AgentVersion: "8.10.0", - AgentStackVersion: "8.10.0-SNAPSHOT", - BuildDir: filepath.Join(tmpdir, "build"), - GOVersion: "1.20.7", - RepoDir: filepath.Join(tmpdir, "repo"), - StateDir: stateDir, - ExtraEnv: nil, + AgentVersion: "8.10.0", + StackVersion: "8.10.0-SNAPSHOT", + BuildDir: filepath.Join(tmpdir, "build"), + GOVersion: "1.20.7", + RepoDir: filepath.Join(tmpdir, "repo"), + StateDir: stateDir, + ExtraEnv: nil, } ip := &fakeInstanceProvisioner{} sp := &fakeStackProvisioner{} diff --git a/pkg/testing/tools/estools/elasticsearch.go b/pkg/testing/tools/estools/elasticsearch.go index ca6dad2dba4..d6bf69369cd 100644 --- a/pkg/testing/tools/estools/elasticsearch.go +++ b/pkg/testing/tools/estools/elasticsearch.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-transport-go/v8/elastictransport" "github.com/elastic/go-elasticsearch/v8/esapi" ) @@ -72,6 +73,75 @@ type ESDoc struct { Source map[string]interface{} `json:"_source"` } +// TemplateResponse is the body of a template data request +type TemplateResponse struct { + IndexTemplates []Template `json:"index_templates"` +} + +// Template is an individual template +type Template struct { + Name string `json:"name"` + IndexTemplate map[string]interface{} `json:"index_template"` +} + +// Pipeline is an individual pipeline +type Pipeline struct { + Description string `json:"description"` + Processors []map[string]interface{} `json:"processors"` +} + +// Ping returns basic ES info +type Ping struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + Version Version `json:"version"` +} + +// Version contains version and build info from an ES ping +type Version struct { + Number string `json:"number"` + BuildFlavor string `json:"build_flavor"` +} + +// APIKeyRequest contains the needed data to create an API key in Elasticsearch +type APIKeyRequest struct { + Name string `json:"name"` + Expiration string `json:"expiration"` + RoleDescriptors mapstr.M `json:"role_descriptors,omitempty"` + Metadata mapstr.M `json:"metadata,omitempty"` +} + +// APIKeyResponse contains the response data for an API request +type APIKeyResponse struct { + Id string `json:"id"` + Name string `json:"name"` + Expiration int `json:"expiration"` + APIKey string `json:"api_key"` + Encoded string `json:"encoded"` +} + +// DataStreams represents the response from an ES _data_stream API +type DataStreams struct { + DataStreams []DataStream `json:"data_streams"` +} + +// DataStream represents a data stream template +type DataStream struct { + Name string `json:"name"` + Indicies []map[string]string `json:"indicies"` + Status string `json:"status"` + Template string `json:"template"` + Lifecycle Lifecycle `json:"lifecycle"` + Hidden bool `json:"hidden"` + System bool `json:"system"` +} + +type Lifecycle struct { + Enabled bool `json:"enabled"` + DataRetention string `json:"data_retention"` +} + // GetAllindicies returns a list of indicies on the target ES instance func GetAllindicies(client elastictransport.Interface) ([]Index, error) { return GetIndicesWithContext(context.Background(), client, []string{}) @@ -103,11 +173,166 @@ func GetIndicesWithContext(ctx context.Context, client elastictransport.Interfac return respData, nil } +// CreateAPIKey creates an API key with the given request data +func CreateAPIKey(ctx context.Context, client elastictransport.Interface, req APIKeyRequest) (APIKeyResponse, error) { + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(req) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error creating ES query: %w", err) + } + + apiReq := esapi.SecurityCreateAPIKeyRequest{Body: &buf} + resp, err := apiReq.Do(ctx, client) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error creating API key: %w", err) + } + defer resp.Body.Close() + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error handling HTTP response: %w", err) + } + + parsed := APIKeyResponse{} + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return parsed, fmt.Errorf("error unmarshaling json response: %w", err) + } + + return parsed, nil +} + // FindMatchingLogLines returns any logs with message fields that match the given line func FindMatchingLogLines(client elastictransport.Interface, namespace, line string) (Documents, error) { return FindMatchingLogLinesWithContext(context.Background(), client, namespace, line) } +// GetLatestDocumentMatchingQuery returns the last document that matches the given query. +// the query field is inserted into a simple `query` POST request +func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport.Interface, query map[string]interface{}, indexPattern string) (Documents, error) { + queryRaw := map[string]interface{}{ + "query": query, + "sort": map[string]interface{}{ + "timestamp": "desc", + }, + "size": 1, + } + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(queryRaw) + if err != nil { + return Documents{}, fmt.Errorf("error creating ES query: %w", err) + } + + return performQueryForRawQuery(ctx, queryRaw, indexPattern, client) +} + +// GetIndexTemplatesForPattern lists all index templates on the system +func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.Interface, name string) (TemplateResponse, error) { + req := esapi.IndicesGetIndexTemplateRequest{Name: name} + resp, err := req.Do(ctx, client) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error fetching index templates: %w", err) + } + defer resp.Body.Close() + + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error handling HTTP response: %w", err) + } + parsed := TemplateResponse{} + + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error unmarshaling json response: %w", err) + } + + return parsed, nil +} + +func GetDataStreamsForPattern(ctx context.Context, client elastictransport.Interface, namePattern string) (DataStreams, error) { + req := esapi.IndicesGetDataStreamRequest{Name: []string{namePattern}, ExpandWildcards: "all,hidden"} + resp, err := req.Do(ctx, client) + if err != nil { + return DataStreams{}, fmt.Errorf("error fetching data streams") + } + defer resp.Body.Close() + + raw, err := handleResponseRaw(resp) + if err != nil { + return DataStreams{}, fmt.Errorf("error handling HTTP response for data stream get: %w", err) + } + + data := DataStreams{} + err = json.Unmarshal(raw, &data) + if err != nil { + return DataStreams{}, fmt.Errorf("error unmarshalling datastream: %w", err) + } + + return data, nil +} + +// DeleteIndexTemplatesDataStreams deletes any data streams, then associcated index templates. +func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error { + req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"} + resp, err := req.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting data streams: %w", err) + } + defer resp.Body.Close() + + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response for data stream delete: %w", err) + } + + patternReq := esapi.IndicesDeleteIndexTemplateRequest{Name: name} + resp, err = patternReq.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting index templates: %w", err) + } + defer resp.Body.Close() + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response for index template delete: %w", err) + } + return nil +} + +// GetPipelines returns a list of installed pipelines that match the given name/pattern +func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) { + req := esapi.IngestGetPipelineRequest{PipelineID: name} + resp, err := req.Do(ctx, client) + if err != nil { + return nil, fmt.Errorf("error fetching index templates: %w", err) + } + defer resp.Body.Close() + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return nil, fmt.Errorf("error handling HTTP response: %w", err) + } + + parsed := map[string]Pipeline{} + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return nil, fmt.Errorf("error unmarshaling json response: %w", err) + } + return parsed, nil +} + +// DeletePipelines deletes all pipelines that match the given pattern +func DeletePipelines(ctx context.Context, client elastictransport.Interface, name string) error { + req := esapi.IngestDeletePipelineRequest{PipelineID: name} + resp, err := req.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting index template") + } + defer resp.Body.Close() + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response: %w", err) + } + return nil +} + // FindMatchingLogLinesWithContext returns any logs with message fields that match the given line func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) { queryRaw := map[string]interface{}{ @@ -137,20 +362,8 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - es := esapi.New(client) - res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } + return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client) - return handleDocsResponse(res) } // CheckForErrorsInLogs checks to see if any error-level lines exist @@ -221,20 +434,8 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - es := esapi.New(client) - res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } + return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client) - return handleDocsResponse(res) } // GetLogsForDatastream returns any logs associated with the datastream @@ -287,15 +488,41 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor }, } + return performQueryForRawQuery(ctx, indexQuery, "*ds-logs*", client) +} + +// GetPing performs a basic ping and returns ES config info +func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, error) { + req := esapi.InfoRequest{} + resp, err := req.Do(ctx, client) + if err != nil { + return Ping{}, fmt.Errorf("error in ping request") + } + defer resp.Body.Close() + + respData, err := handleResponseRaw(resp) + if err != nil { + return Ping{}, fmt.Errorf("error in HTTP response: %w", err) + } + pingData := Ping{} + err = json.Unmarshal(respData, &pingData) + if err != nil { + return pingData, fmt.Errorf("error unmarshalling JSON: %w", err) + } + return pingData, nil + +} + +func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) { var buf bytes.Buffer - err := json.NewEncoder(&buf).Encode(indexQuery) + err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), + es.Search.WithIndex(index), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), @@ -310,13 +537,9 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor } func handleDocsResponse(res *esapi.Response) (Documents, error) { - if res.StatusCode >= 300 || res.StatusCode < 200 { - return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) - } - - resultBuf, err := io.ReadAll(res.Body) + resultBuf, err := handleResponseRaw(res) if err != nil { - return Documents{}, fmt.Errorf("error reading response body: %w", err) + return Documents{}, fmt.Errorf("error in HTTP query: %w", err) } respData := Documents{} @@ -327,3 +550,15 @@ func handleDocsResponse(res *esapi.Response) (Documents, error) { return respData, err } + +func handleResponseRaw(res *esapi.Response) ([]byte, error) { + if res.StatusCode >= 300 || res.StatusCode < 200 { + return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) + } + + resultBuf, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w", err) + } + return resultBuf, nil +} diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go new file mode 100644 index 00000000000..bfb804c6177 --- /dev/null +++ b/pkg/testing/tools/kibana.go @@ -0,0 +1,91 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +package tools + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/url" + "time" + + "github.com/elastic/elastic-agent-libs/kibana" +) + +type DashboardResponse struct { + Page int `json:"page"` + PerPage int `json:"per_page"` + Total int `json:"total"` + SavedObjects []Dashboard `json:"saved_objects"` +} + +type Dashboard struct { + Type string `json:"type"` + ID string `json:"id"` + Namespaces []string `json:"namespaces"` + UpdatedAt time.Time `json:"updated_at"` + CreatedAt time.Time `json:"created_at"` + Version string `json:"version"` +} + +// DeleteDashboard removes the selected dashboard +func DeleteDashboard(ctx context.Context, client *kibana.Client, id string) error { + // In the future there should be logic to check if we need this header, waiting for https://github.com/elastic/kibana/pull/164850 + headers := http.Header{} + headers.Add("x-elastic-internal-origin", "integration-tests") + status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, headers, nil) + if err != nil { + return fmt.Errorf("error making API request: %w, response: '%s'", err, string(resp)) + } + + if status != http.StatusOK { + return fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp)) + } + return nil + +} + +// GetDashboards returns a list of known dashboards on the system +func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, error) { + params := url.Values{} + params.Add("type", "dashboard") + params.Add("page", "1") + + dashboards := []Dashboard{} + page := 1 + for { + headers := http.Header{} + headers.Add("x-elastic-internal-origin", "integration-tests") + status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, headers, nil) + if err != nil { + return nil, fmt.Errorf("error making api request: %w", err) + } + + if status != http.StatusOK { + return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp)) + } + + dashResp := DashboardResponse{} + err = json.Unmarshal(resp, &dashResp) + if err != nil { + return nil, fmt.Errorf("error unmarshalling dashboard response: %w", err) + } + if len(dashResp.SavedObjects) == 0 { + break + } + + dashboards = append(dashboards, dashResp.SavedObjects...) + // we got all the results in one page + if dashResp.Total == dashResp.PerPage { + break + } + // we COULD calculate the number of pages we need to ask for in total, or just keep iterating until we don't get any results + page++ + params.Set("page", fmt.Sprintf("%d", page)) + } + + return dashboards, nil +} diff --git a/testing/integration/monitoring_logs_test.go b/testing/integration/monitoring_logs_test.go index c52b2150d47..5ebce0043de 100644 --- a/testing/integration/monitoring_logs_test.go +++ b/testing/integration/monitoring_logs_test.go @@ -30,7 +30,6 @@ import ( func TestMonitoringLogsShipped(t *testing.T) { info := define.Require(t, define.Requirements{ - OS: []define.OS{{Type: define.Linux}}, Stack: &define.Stack{}, Local: false, Sudo: true, @@ -86,8 +85,8 @@ func TestMonitoringLogsShipped(t *testing.T) { docs := findESDocs(t, func() (estools.Documents, error) { return estools.GetLogsForDatastream(info.ESClient, "elastic_agent.metricbeat") }) - require.NotZero(t, len(docs.Hits.Hits)) t.Logf("metricbeat: Got %d documents", len(docs.Hits.Hits)) + require.NotZero(t, len(docs.Hits.Hits)) // Stage 4: make sure all components are healthy t.Log("Making sure all components are healthy")