diff --git a/magefile.go b/magefile.go
index 3d0e6de1e9e..b822d518bc0 100644
--- a/magefile.go
+++ b/magefile.go
@@ -1540,6 +1540,31 @@ func (Integration) Single(ctx context.Context, testName string) error {
 	return integRunner(ctx, false, testName)
 }
 
+// Run beat serverless tests
+func (Integration) TestBeatServerless(ctx context.Context, beatname string) error {
+	beatBuildPath := filepath.Join("..", "beats", "x-pack", beatname, "build", "distributions")
+	err := os.Setenv("AGENT_BUILD_DIR", beatBuildPath)
+	if err != nil {
+		return fmt.Errorf("error setting build dir: %w", err)
+	}
+
+	// a bit of bypass logic; run as serverless by default
+	if os.Getenv("STACK_PROVISIONER") == "" {
+		err = os.Setenv("STACK_PROVISIONER", "serverless")
+		if err != nil {
+			return fmt.Errorf("error setting serverless stack var: %w", err)
+		}
+	} else if os.Getenv("STACK_PROVISIONER") == "ess" {
+		fmt.Printf(">>> Warning: running TestBeatServerless as stateful\n")
+	}
+
+	err = os.Setenv("TEST_BINARY_NAME", beatname)
+	if err != nil {
+		return fmt.Errorf("error setting binary name: %w", err)
+	}
+	return integRunner(ctx, false, "TestMetricbeatServerless")
+}
+
 // PrepareOnRemote shouldn't be called locally (called on remote host to prepare it for testing)
 func (Integration) PrepareOnRemote() {
 	mg.Deps(mage.InstallGoTestTools)
@@ -1686,6 +1711,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
 	if err != nil {
 		return nil, err
 	}
+
 	agentStackVersion := os.Getenv("AGENT_STACK_VERSION")
 	agentVersion := os.Getenv("AGENT_VERSION")
 	if agentVersion == "" {
@@ -1703,6 +1729,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
 			agentVersion = fmt.Sprintf("%s-SNAPSHOT", agentVersion)
 		}
 	}
+
 	if agentStackVersion == "" {
 		agentStackVersion = agentVersion
 	}
@@ -1762,24 +1789,35 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
 		extraEnv["AGENT_KEEP_INSTALLED"] = os.Getenv("AGENT_KEEP_INSTALLED")
 	}
 
+	binaryName := os.Getenv("TEST_BINARY_NAME")
+	if binaryName == "" {
+		binaryName = "elastic-agent"
+	}
+
+	repoDir := os.Getenv("TEST_INTEG_REPO_PATH")
+	if repoDir == "" {
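+		// no repo path provided; default to the current working directory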
+		repoDir = "."
+	}
+
 	diagDir := filepath.Join("build", "diagnostics")
 	_ = os.MkdirAll(diagDir, 0755)
 
 	cfg := runner.Config{
-		AgentVersion:      agentVersion,
-		AgentStackVersion: agentStackVersion,
-		BuildDir:          agentBuildDir,
-		GOVersion:         goVersion,
-		RepoDir:           ".",
-		StateDir:          ".integration-cache",
-		DiagnosticsDir:    diagDir,
-		Platforms:         testPlatforms(),
-		Matrix:            matrix,
-		SingleTest:        singleTest,
-		VerboseMode:       mg.Verbose(),
-		Timestamp:         timestamp,
-		TestFlags:         goTestFlags,
-		ExtraEnv:          extraEnv,
+		AgentVersion:   agentVersion,
+		StackVersion:   agentStackVersion,
+		BuildDir:       agentBuildDir,
+		GOVersion:      goVersion,
+		RepoDir:        repoDir,
+		DiagnosticsDir: diagDir,
+		StateDir:       ".integration-cache",
+		Platforms:      testPlatforms(),
+		Matrix:         matrix,
+		SingleTest:     singleTest,
+		VerboseMode:    mg.Verbose(),
+		Timestamp:      timestamp,
+		TestFlags:      goTestFlags,
+		ExtraEnv:       extraEnv,
+		BinaryName:     binaryName,
 	}
 	ogcCfg := ogc.Config{
 		ServiceTokenPath: serviceTokenPath,
diff --git a/pkg/testing/define/define.go b/pkg/testing/define/define.go
index 3cae2c3b7f5..5c3ad715ea9 100644
--- a/pkg/testing/define/define.go
+++ b/pkg/testing/define/define.go
@@ -78,19 +78,29 @@ func NewFixture(t *testing.T, version string, opts ...atesting.FixtureOpt) (*ate
 		buildsDir = filepath.Join(projectDir, "build", "distributions")
 	}
 
+	return NewFixtureWithBinary(t, version, "elastic-agent", buildsDir, opts...)
+}
+
+// NewFixtureWithBinary returns a new testing fixture for the given binary with a LocalFetcher and
+// the binary logging to the test logger.
+func NewFixtureWithBinary(t *testing.T, version string, binary string, buildsDir string, opts ...atesting.FixtureOpt) (*atesting.Fixture, error) {
 	ver, err := semver.ParseVersion(version)
 	if err != nil {
 		return nil, fmt.Errorf("%q is an invalid agent version: %w", version, err)
 	}
 
-	var f atesting.Fetcher
+	var binFetcher atesting.Fetcher
 	if ver.IsSnapshot() {
-		f = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly())
+		binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly(), atesting.WithCustomBinaryName(binary))
 	} else {
-		f = atesting.LocalFetcher(buildsDir)
+		binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithCustomBinaryName(binary))
 	}
 
-	opts = append(opts, atesting.WithFetcher(f), atesting.WithLogOutput())
+	opts = append(opts, atesting.WithFetcher(binFetcher), atesting.WithLogOutput())
+	if binary != "elastic-agent" {
+		opts = append(opts, atesting.WithBinaryName(binary))
+	}
 	return atesting.NewFixture(t, version, opts...)
 }
diff --git a/pkg/testing/ess/serverless.go b/pkg/testing/ess/serverless.go
index 07086635d73..1367eb3fb04 100644
--- a/pkg/testing/ess/serverless.go
+++ b/pkg/testing/ess/serverless.go
@@ -54,6 +54,13 @@ type Project struct {
 	} `json:"endpoints"`
 }
 
+// CredResetResponse contains the new auth details for a
+// stack credential reset
+type CredResetResponse struct {
+	Password string `json:"password"`
+	Username string `json:"username"`
+}
+
 // NewServerlessClient creates a new instance of the serverless client
 func NewServerlessClient(region, projectType, api string, logger runner.Logger) *ServerlessClient {
 	return &ServerlessClient{
@@ -97,6 +104,16 @@ func (srv *ServerlessClient) DeployStack(ctx context.Context, req ServerlessRequ
 		return Project{}, fmt.Errorf("error decoding JSON response: %w", err)
 	}
 	srv.proj = serverlessHandle
+
+	// as of ~8/8, the serverless ESS cloud no longer provides credentials on the first POST request, so we must
+	// send an additional POST to reset the credentials
+	updated, err := srv.ResetCredentials(ctx)
+	if err != nil {
+		return serverlessHandle, fmt.Errorf("error resetting credentials: %w", err)
+	}
+	srv.proj.Credentials.Username = updated.Username
+	srv.proj.Credentials.Password = updated.Password
+
 	return serverlessHandle, nil
 }
 
@@ -181,27 +198,15 @@ func (srv *ServerlessClient) WaitForEndpoints(ctx context.Context) error {
 
 // WaitForElasticsearch waits until the ES endpoint is healthy
 func (srv *ServerlessClient) WaitForElasticsearch(ctx context.Context) error {
-	endpoint := fmt.Sprintf("%s/_cluster/health", srv.proj.Endpoints.Elasticsearch)
-	req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
+	req, err := http.NewRequestWithContext(ctx, "GET", srv.proj.Endpoints.Elasticsearch, nil)
 	if err != nil {
 		return fmt.Errorf("error creating HTTP request: %w", err)
 	}
 	req.SetBasicAuth(srv.proj.Credentials.Username, srv.proj.Credentials.Password)
 
+	// _cluster/health no longer works on serverless; just check the response code
 	readyFunc := func(resp *http.Response) bool {
-		var health struct {
-			Status string `json:"status"`
-		}
-		err = json.NewDecoder(resp.Body).Decode(&health)
-		resp.Body.Close()
-		if err != nil {
-			srv.log.Logf("response decoding error: %v", err)
-			return false
-		}
-		if health.Status == "green" {
-			return true
-		}
-		return false
+		return resp.StatusCode == 200
 	}
 
 	err = srv.waitForRemoteState(ctx, req, time.Second*5, readyFunc)
@@ -243,6 +248,38 @@ func (srv *ServerlessClient) WaitForKibana(ctx context.Context) error {
 	return nil
 }
 
+// ResetCredentials resets the credentials for the given ESS instance
+func (srv *ServerlessClient) ResetCredentials(ctx context.Context) (CredResetResponse, error) {
+	resetURL := fmt.Sprintf("%s/api/v1/serverless/projects/%s/%s/_reset-credentials", serverlessURL, srv.projectType, srv.proj.ID)
+
+	resetHandler, err := http.NewRequestWithContext(ctx, "POST", resetURL, nil)
+	if err != nil {
+		return CredResetResponse{}, fmt.Errorf("error creating HTTP request: %w", err)
+	}
+
+	resetHandler.Header.Set("Content-Type", "application/json")
+	resetHandler.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", srv.api))
+
+	resp, err := http.DefaultClient.Do(resetHandler)
+	if err != nil {
+		return CredResetResponse{}, fmt.Errorf("error performing HTTP request: %w", err)
+	}
+	defer resp.Body.Close()
+
+	if resp.StatusCode != http.StatusOK {
+		p, _ := io.ReadAll(resp.Body)
+		return CredResetResponse{}, fmt.Errorf("non-200 status code returned by server: %d, body: %s", resp.StatusCode, string(p))
+	}
+
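+	// decode the refreshed credentials returned by the reset endpoint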
+	updated := CredResetResponse{}
+	err = json.NewDecoder(resp.Body).Decode(&updated)
+	if err != nil {
+		return CredResetResponse{}, fmt.Errorf("error decoding JSON response: %w", err)
+	}
+
+	return updated, nil
+}
+
 func (srv *ServerlessClient) waitForRemoteState(ctx context.Context, httpHandler *http.Request, tick time.Duration, isReady func(*http.Response) bool) error {
 	timer := time.NewTimer(time.Millisecond)
 	// in cases where we get a timeout, also return the last error returned via HTTP
diff --git a/pkg/testing/ess/serverless_provision.go b/pkg/testing/ess/serverless_provision.go
index ea78fc0eab1..553dd238eb2 100644
--- a/pkg/testing/ess/serverless_provision.go
+++ b/pkg/testing/ess/serverless_provision.go
@@ -80,7 +80,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne
 	for _, req := range requests {
 		client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log)
 		srvReq := ServerlessRequest{Name: req.ID, RegionID: prov.cfg.Region}
-		_, err := client.DeployStack(ctx, srvReq)
+		proj, err := client.DeployStack(ctx, srvReq)
 		if err != nil {
 			return nil, fmt.Errorf("error deploying stack for request %s: %w", req.ID, err)
 		}
@@ -95,6 +95,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne
 			Kibana:   client.proj.Endpoints.Kibana,
 			Username: client.proj.Credentials.Username,
 			Password: client.proj.Credentials.Password,
+			Internal: map[string]interface{}{"ID": proj.ID},
 		}
 		stacks = append(stacks, newStack)
 		prov.stacksMut.Lock()
diff --git a/pkg/testing/fetcher_local.go b/pkg/testing/fetcher_local.go
index 54526903ede..214e63c43a7 100644
--- a/pkg/testing/fetcher_local.go
+++ b/pkg/testing/fetcher_local.go
@@ -17,6 +17,7 @@ import (
 
 type localFetcher struct {
 	dir          string
 	snapshotOnly bool
+	binaryName   string
 }
 
 type localFetcherOpt func(f *localFetcher)
@@ -28,10 +29,18 @@ func WithLocalSnapshotOnly() localFetcherOpt {
 	}
 }
 
+// WithCustomBinaryName sets the fetched binary to a custom name; the default is `elastic-agent`
+func WithCustomBinaryName(name string) localFetcherOpt {
+	return func(f *localFetcher) {
+		f.binaryName = name
+	}
+}
+
 // LocalFetcher returns a fetcher that pulls the binary of the Elastic Agent from a local location.
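+// Use WithCustomBinaryName to fetch a package other than elastic-agent, e.g. a Beat.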
 func LocalFetcher(dir string, opts ...localFetcherOpt) Fetcher {
 	f := &localFetcher{
-		dir: dir,
+		dir:        dir,
+		binaryName: "elastic-agent",
 	}
 	for _, o := range opts {
 		o(f)
@@ -56,7 +65,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec
 		return nil, fmt.Errorf("invalid version: %q: %w", ver, err)
 	}
 
-	mainBuildfmt := "elastic-agent-%s-%s"
+	mainBuildfmt := "%s-%s-%s"
 	if f.snapshotOnly && !ver.IsSnapshot() {
 		if ver.Prerelease() == "" {
 			ver = semver.NewParsedSemVer(ver.Major(), ver.Minor(), ver.Patch(), "SNAPSHOT", ver.BuildMetadata())
@@ -66,7 +75,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec
 
 	}
 
-	mainBuild := fmt.Sprintf(mainBuildfmt, ver, suffix)
+	mainBuild := fmt.Sprintf(mainBuildfmt, f.binaryName, ver, suffix)
 	mainBuildPath := filepath.Join(f.dir, mainBuild)
 	build := mainBuild
 	buildPath := mainBuildPath
diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go
index b1439f7162c..bf03a977ba1 100644
--- a/pkg/testing/fixture.go
+++ b/pkg/testing/fixture.go
@@ -43,6 +43,9 @@ type Fixture struct {
 	logOutput     bool
 	allowErrs     bool
 	connectTimout time.Duration
+	binaryName     string
+	runLength      time.Duration
+	additionalArgs []string
 
 	srcPackage string
 	workDir    string
@@ -104,6 +107,26 @@ func WithConnectTimeout(timeout time.Duration) FixtureOpt {
 	}
 }
 
+// WithBinaryName sets the name of the binary under test, in cases where tests aren't being run against elastic-agent
+func WithBinaryName(name string) FixtureOpt {
+	return func(f *Fixture) {
+		f.binaryName = name
+	}
+}
+
+// WithRunLength sets the total time the binary will run
+func WithRunLength(run time.Duration) FixtureOpt {
+	return func(f *Fixture) {
+		f.runLength = run
+	}
+}
+
+// WithAdditionalArgs sets additional command-line arguments to pass to the binary at run time
+func WithAdditionalArgs(args []string) FixtureOpt {
+	return func(f *Fixture) {
+		f.additionalArgs = args
+	}
+}
+
 // NewFixture creates a new fixture to setup and manage Elastic Agent.
 func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, error) {
 	// we store the caller so the fixture can find the cache directory for the artifacts that
@@ -123,6 +146,8 @@ func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, err
 		operatingSystem: runtime.GOOS,
 		architecture:    runtime.GOARCH,
 		connectTimout:   5 * time.Second,
+		// default to elastic-agent; can be overridden by WithBinaryName below
+		binaryName: "elastic-agent",
 	}
 	for _, o := range opts {
 		o(f)
@@ -179,6 +204,21 @@ func (f *Fixture) Prepare(ctx context.Context, components ...UsableComponent) er
 	return nil
 }
 
+// WriteFileToWorkDir writes a file to the working directory alongside the unpacked tar build.
+func (f *Fixture) WriteFileToWorkDir(ctx context.Context, data string, name string) error {
+	err := f.ensurePrepared(ctx)
+	if err != nil {
+		return fmt.Errorf("error preparing binary: %w", err)
+	}
+
+	err = os.WriteFile(filepath.Join(f.workDir, name), []byte(data), 0644)
+	if err != nil {
+		return fmt.Errorf("error writing file: %w", err)
+	}
+	f.t.Logf("wrote %s to %s", name, f.workDir)
+	return nil
+}
+
 // Configure replaces the default Agent configuration file with the provided
 // configuration. This must be called after `Prepare` is called but before `Run`
 // or `Install` can be called.
@@ -235,15 +275,20 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error {
 	return nil
 }
 
-// Run runs the Elastic Agent.
+// Run runs the provided binary.
 //
-// If `states` are provided then the Elastic Agent runs until each state has been reached. Once reached the
+// If `states` are provided and the binary is the Elastic Agent, it runs until each state has been reached. Once reached the
 // Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started
 // with `WithAllowErrors()` then `Run` will exit early and return the logged error.
 //
-// If no `states` are provided then the Elastic Agent runs until the context is cancelled.
+// If no `states` are provided then the binary runs until the context is cancelled or the timeout specified with WithRunLength is reached.
 func (f *Fixture) Run(ctx context.Context, states ...State) error {
-	if f.installed {
+	isBeat := f.binaryName != "elastic-agent"
+
+	if !isBeat && f.installed {
 		return errors.New("fixture is installed; cannot be run")
 	}
 
@@ -256,14 +301,32 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error {
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
 
-	var sm *stateMachine
+	var smInstance *stateMachine
+	if states != nil && isBeat {
+		return errors.New("states not supported when running against beats")
+	}
 	if states != nil {
-		sm, err = newStateMachine(states)
+		smInstance, err = newStateMachine(states)
 		if err != nil {
 			return err
 		}
 	}
 
+	// agent-specific setup
+	var agentClient client.Client
+	var stateCh chan *client.AgentState
+	var stateErrCh chan error
+	if !isBeat {
+		cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir)
+		if err != nil {
+			return fmt.Errorf("failed to get control protocol address: %w", err)
+		}
+		agentClient = client.New(client.WithAddress(cAddr))
+		f.setClient(agentClient)
+		defer f.setClient(nil)
+		stateCh, stateErrCh = watchState(ctx, agentClient, f.connectTimout)
+	}
+
 	var logProxy Logger
 	if f.logOutput {
 		logProxy = f.t
@@ -271,29 +334,32 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error {
 	stdOut := newLogWatcher(logProxy)
 	stdErr := newLogWatcher(logProxy)
 
-	cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir)
-	if err != nil {
-		return fmt.Errorf("failed to get control protcol address: %w", err)
+	args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}
+	if isBeat {
+		args = []string{"run", "-e", "-c", filepath.Join(f.workDir, fmt.Sprintf("%s.yml", f.binaryName))}
 	}
+	args = append(args, f.additionalArgs...)
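+
+	// launch the binary under test with the argument list assembled above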
 	proc, err := process.Start(
 		f.binaryPath(),
 		process.WithContext(ctx),
-		process.WithArgs([]string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}),
+		process.WithArgs(args),
 		process.WithCmdOptions(attachOutErr(stdOut, stdErr)))
+
 	if err != nil {
-		return fmt.Errorf("failed to spawn elastic-agent: %w", err)
+		return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err)
 	}
+
 	killProc := func() {
 		_ = proc.Kill()
 		<-proc.Wait()
 	}
 
-	c := client.New(client.WithAddress(cAddr))
-	f.setClient(c)
-	defer f.setClient(nil)
+	var doneChan <-chan time.Time
+	if f.runLength != 0 {
+		doneChan = time.After(f.runLength)
+	}
 
-	stateCh, stateErrCh := watchState(ctx, c, f.connectTimout)
 	stopping := false
 	for {
 		select {
@@ -323,9 +389,15 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error {
 				killProc()
 				return fmt.Errorf("elastic-agent client received unexpected error: %w", err)
 			}
+		case <-doneChan:
+			if !stopping {
+				// trigger the stop
+				stopping = true
+				_ = proc.Stop()
+			}
 		case state := <-stateCh:
-			if sm != nil {
-				cfg, cont, err := sm.next(state)
+			if smInstance != nil {
+				cfg, cont, err := smInstance.next(state)
 				if err != nil {
 					killProc()
 					return fmt.Errorf("state management failed with unexpected error: %w", err)
@@ -337,7 +409,7 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error {
 					_ = proc.Stop()
 				}
 			} else if cfg != "" {
-				err := performConfigure(ctx, c, cfg, 3*time.Second)
+				err := performConfigure(ctx, agentClient, cfg, 3*time.Second)
 				if err != nil {
 					killProc()
 					return err
@@ -362,7 +434,7 @@ func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOp
 	if err != nil {
 		return nil, fmt.Errorf("error creating cmd: %w", err)
 	}
-	f.t.Logf(">> running agent with: %v", cmd.Args)
+	f.t.Logf(">> running binary with: %v", cmd.Args)
 	return cmd.CombinedOutput()
 }
 
@@ -484,7 +556,11 @@ func (f *Fixture) binaryPath() string {
 			workDir = filepath.Join(paths.DefaultBasePath, "Elastic", "Agent")
 		}
 	}
-	binary := filepath.Join(workDir, "elastic-agent")
+	defaultBin := "elastic-agent"
+	if f.binaryName != "" {
+		defaultBin = f.binaryName
+	}
+	binary := filepath.Join(workDir, defaultBin)
 	if f.operatingSystem == "windows" {
 		binary += ".exe"
 	}
diff --git a/pkg/testing/runner/config.go b/pkg/testing/runner/config.go
index a65f8a1550e..1448fa197dd 100644
--- a/pkg/testing/runner/config.go
+++ b/pkg/testing/runner/config.go
@@ -14,19 +14,24 @@ import (
 
 // Config provides the configuration for running the runner.
 type Config struct {
-	AgentVersion      string
-	AgentStackVersion string
-	BuildDir          string
-	GOVersion         string
-	RepoDir           string
-	StateDir          string
-	DiagnosticsDir    string
+	AgentVersion   string
+	StateDir       string
+	ReleaseVersion string
+	StackVersion   string
+	BuildDir       string
+	GOVersion      string
+	RepoDir        string
+	DiagnosticsDir string
 
 	// Platforms filters the tests to only run on the provided list
 	// of platforms even if the tests supports more than what is
 	// defined in this list.
 	Platforms []string
 
+	// BinaryName is the name of the binary package under test, e.g. elastic-agent, metricbeat, etc.
+	// This is used to copy the .tar.gz to the remote host.
+	BinaryName string
+
 	// Matrix enables matrix testing. This explodes each test to
 	// run on all supported platforms the runner supports.
 	Matrix bool
@@ -52,8 +57,8 @@ func (c *Config) Validate() error {
 	if c.AgentVersion == "" {
 		return errors.New("field AgentVersion must be set")
 	}
-	if c.AgentStackVersion == "" {
-		return errors.New("field AgentStackVersion must be set")
+	if c.StackVersion == "" {
+		return errors.New("field StackVersion must be set")
 	}
 	if c.BuildDir == "" {
 		return errors.New("field BuildDir must be set")
diff --git a/pkg/testing/runner/debian.go b/pkg/testing/runner/debian.go
index a954ed90d66..79517b948ed 100644
--- a/pkg/testing/runner/debian.go
+++ b/pkg/testing/runner/debian.go
@@ -98,6 +98,16 @@ func (DebianRunner) Copy(ctx context.Context, sshClient *ssh.Client, logger Logg
 		return fmt.Errorf("failed to SCP repo archive %s: %w", repoArchive, err)
 	}
 
+	// remove any existing build artifacts, in cases where the build path differs from the agent's
+	for _, remoteBuildPath := range []string{build.Path, build.SHA512Path} {
+		relativeAgentDir := filepath.Join("agent", remoteBuildPath)
+		_, _, err := sshRunCommand(ctx, sshClient, "sudo", []string{"rm", "-rf", relativeAgentDir}, nil)
+		// doesn't need to be a fatal error
+		if err != nil {
+			logger.Logf("error removing build dir %s: %v", relativeAgentDir, err)
+		}
+	}
+
 	// ensure that agent directory is removed (possible it already exists if instance already used)
 	stdout, stderr, err := sshRunCommand(ctx, sshClient, "sudo", []string{"rm", "-rf", "agent"}, nil)
diff --git a/pkg/testing/runner/runner.go b/pkg/testing/runner/runner.go
index 646b86eb669..03df6dd9d13 100644
--- a/pkg/testing/runner/runner.go
+++ b/pkg/testing/runner/runner.go
@@ -355,7 +355,6 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger
 		logger.Logf("Failed to copy files instance: %s", err)
 		return OSRunnerResult{}, fmt.Errorf("failed to copy files to instance %s: %w", instance.Name, err)
 	}
-
 	// start with the ExtraEnv first preventing the other environment flags below
 	// from being overwritten
 	env := map[string]string{}
@@ -366,6 +365,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger
 	// ensure that we have all the requirements for the stack if required
 	if batch.Batch.Stack != nil {
 		// wait for the stack to be ready before continuing
+		logger.Logf("Waiting for stacks to be ready...")
 		r.stacksReady.Wait()
 		if r.stacksErr != nil {
 			return OSRunnerResult{}, fmt.Errorf("%s unable to continue because stack never became ready: %w", instance.Name, r.stacksErr)
@@ -385,6 +385,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger
 
 	// set the go test flags
 	env["GOTEST_FLAGS"] = r.cfg.TestFlags
+	env["TEST_BINARY_NAME"] = r.cfg.BinaryName
 
 	// run the actual tests on the host
 	result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.AgentVersion, batch.ID, batch.Batch, env)
@@ -446,9 +447,13 @@ func (r *Runner) getBuild(b OSBatch) Build {
 		ext = "zip"
 	}
 	hashExt := ".sha512"
-	packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("elastic-agent-%s-%s-%s.%s", r.cfg.AgentVersion, b.OS.Type, arch, ext))
+	name := "elastic-agent"
+	if r.cfg.BinaryName != "" {
+		name = r.cfg.BinaryName
+	}
+	packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("%s-%s-%s-%s.%s", name, r.cfg.AgentVersion, b.OS.Type, arch, ext))
 	return Build{
-		Version: r.cfg.AgentVersion,
+		Version: r.cfg.ReleaseVersion,
 		Type:    b.OS.Type,
 		Arch:    arch,
 		Path:    packageName,
@@ -577,7 +582,7 @@ func (r *Runner) startStacks(ctx context.Context) error {
 		if !lb.Skip && lb.Batch.Stack != nil {
 			if lb.Batch.Stack.Version == "" {
 				// no version defined on the stack; set it to the defined stack version
-				lb.Batch.Stack.Version = r.cfg.AgentStackVersion
+				lb.Batch.Stack.Version = r.cfg.StackVersion
 			}
 			if !slices.Contains(versions, lb.Batch.Stack.Version) {
 				versions = append(versions, lb.Batch.Stack.Version)
diff --git a/pkg/testing/runner/runner_test.go b/pkg/testing/runner/runner_test.go
index c24e653e31a..d10b9d524d0 100644
--- a/pkg/testing/runner/runner_test.go
+++ b/pkg/testing/runner/runner_test.go
@@ -23,13 +23,13 @@ func TestNewRunner_Clean(t *testing.T) {
 	require.NoError(t, err)
 
 	cfg := Config{
-		AgentVersion:      "8.10.0",
-		AgentStackVersion: "8.10.0-SNAPSHOT",
-		BuildDir:          filepath.Join(tmpdir, "build"),
-		GOVersion:         "1.20.7",
-		RepoDir:           filepath.Join(tmpdir, "repo"),
-		StateDir:          stateDir,
-		ExtraEnv:          nil,
+		AgentVersion: "8.10.0",
+		StackVersion: "8.10.0-SNAPSHOT",
+		BuildDir:     filepath.Join(tmpdir, "build"),
+		GOVersion:    "1.20.7",
+		RepoDir:      filepath.Join(tmpdir, "repo"),
+		StateDir:     stateDir,
+		ExtraEnv:     nil,
 	}
 	ip := &fakeInstanceProvisioner{}
 	sp := &fakeStackProvisioner{}
diff --git a/pkg/testing/tools/elasticsearch.go b/pkg/testing/tools/elasticsearch.go
index 0d0b040d4dd..8b2d554fd98 100644
--- a/pkg/testing/tools/elasticsearch.go
+++ b/pkg/testing/tools/elasticsearch.go
@@ -13,6 +13,7 @@ import (
 	"strconv"
 	"strings"
 
+	"github.com/elastic/elastic-agent-libs/mapstr"
 	"github.com/elastic/elastic-transport-go/v8/elastictransport"
 	"github.com/elastic/go-elasticsearch/v8/esapi"
 )
@@ -72,6 +73,75 @@ type ESDoc struct {
 	Source map[string]interface{} `json:"_source"`
 }
 
+// TemplateResponse is the body of a template data request
+type TemplateResponse struct {
+	IndexTemplates []Template `json:"index_templates"`
+}
+
+// Template is an individual template
+type Template struct {
+	Name          string                 `json:"name"`
+	IndexTemplate map[string]interface{} `json:"index_template"`
+}
+
+// Pipeline is an individual pipeline
+type Pipeline struct {
+	Description string                   `json:"description"`
+	Processors  []map[string]interface{} `json:"processors"`
+}
+
+// Ping contains basic ES info
+type Ping struct {
+	Name        string  `json:"name"`
+	ClusterName string  `json:"cluster_name"`
+	ClusterUUID string  `json:"cluster_uuid"`
+	Version     Version `json:"version"`
+}
+
+// Version contains version and build info from an ES ping
+type Version struct {
+	Number      string `json:"number"`
+	BuildFlavor string `json:"build_flavor"`
+}
+
+// APIKeyRequest contains the needed data to create an API key in Elasticsearch
+type APIKeyRequest struct {
+	Name            string   `json:"name"`
+	Expiration      string   `json:"expiration"`
+	RoleDescriptors mapstr.M `json:"role_descriptors,omitempty"`
+	Metadata        mapstr.M `json:"metadata,omitempty"`
+}
+
+// APIKeyResponse contains the response data for an API key creation request
+type APIKeyResponse struct {
+	Id         string `json:"id"`
+	Name       string `json:"name"`
+	Expiration int    `json:"expiration"`
+	APIKey     string `json:"api_key"`
+	Encoded    string `json:"encoded"`
+}
+
+// DataStreams represents the response from an ES _data_stream API
+type DataStreams struct {
+	DataStreams []DataStream `json:"data_streams"`
+}
+
+// DataStream represents an individual data stream
+type DataStream struct {
+	Name      string              `json:"name"`
+	Indices   []map[string]string `json:"indices"`
+	Status    string              `json:"status"`
+	Template  string              `json:"template"`
+	Lifecycle Lifecycle           `json:"lifecycle"`
+	Hidden    bool                `json:"hidden"`
+	System    bool                `json:"system"`
+}
+
+// Lifecycle describes the lifecycle policy attached to a data stream
+type Lifecycle struct {
+	Enabled       bool   `json:"enabled"`
+	DataRetention string `json:"data_retention"`
+}
+
 // GetAllindicies returns a list of indicies on the target ES instance
 func GetAllindicies(client elastictransport.Interface) ([]Index, error) {
 	return GetIndicesWithContext(context.Background(), client, []string{})
@@ -103,11 +173,158 @@ func GetIndicesWithContext(ctx context.Context, client elastictransport.Interfac
 	return respData, nil
 }
 
+// CreateAPIKey creates an API key with the given request data
+func CreateAPIKey(ctx context.Context, client elastictransport.Interface, req APIKeyRequest) (APIKeyResponse, error) {
+	var buf bytes.Buffer
+	err := json.NewEncoder(&buf).Encode(req)
+	if err != nil {
+		return APIKeyResponse{}, fmt.Errorf("error creating ES query: %w", err)
+	}
+
+	apiReq := esapi.SecurityCreateAPIKeyRequest{Body: &buf}
+	resp, err := apiReq.Do(ctx, client)
+	if err != nil {
+		return APIKeyResponse{}, fmt.Errorf("error creating API key: %w", err)
+	}
+	resultBuf, err := handleResponseRaw(resp)
+	if err != nil {
+		return APIKeyResponse{}, fmt.Errorf("error handling HTTP response: %w", err)
+	}
+
+	parsed := APIKeyResponse{}
+	err = json.Unmarshal(resultBuf, &parsed)
+	if err != nil {
+		return parsed, fmt.Errorf("error unmarshaling json response: %w", err)
+	}
+
+	return parsed, nil
+}
+
 // FindMatchingLogLines returns any logs with message fields that match the given line
 func FindMatchingLogLines(client elastictransport.Interface, namespace, line string) (Documents, error) {
 	return FindMatchingLogLinesWithContext(context.Background(), client, namespace, line)
 }
 
+// GetLatestDocumentMatchingQuery returns the last document that matches the given query.
+// The query field is inserted into a simple `query` POST request.
+func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport.Interface, query map[string]interface{}, indexPattern string) (Documents, error) {
+	queryRaw := map[string]interface{}{
+		"query": query,
+		"sort": map[string]interface{}{
+			"timestamp": "desc",
+		},
+		"size": 1,
+	}
+
+	return performQueryForRawQuery(ctx, queryRaw, indexPattern, client)
+}
+
+// GetIndexTemplatesForPattern returns the index templates matching the given name pattern
+func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.Interface, name string) (TemplateResponse, error) {
+	req := esapi.IndicesGetIndexTemplateRequest{Name: name}
+	resp, err := req.Do(ctx, client)
+	if err != nil {
+		return TemplateResponse{}, fmt.Errorf("error fetching index templates: %w", err)
+	}
+
+	resultBuf, err := handleResponseRaw(resp)
+	if err != nil {
+		return TemplateResponse{}, fmt.Errorf("error handling HTTP response: %w", err)
+	}
+	parsed := TemplateResponse{}
+
+	err = json.Unmarshal(resultBuf, &parsed)
+	if err != nil {
+		return TemplateResponse{}, fmt.Errorf("error unmarshaling json response: %w", err)
+	}
+
+	return parsed, nil
+}
+
+// GetDataStreamsForPattern returns the data streams matching the given name pattern
+func GetDataStreamsForPattern(ctx context.Context, client elastictransport.Interface, namePattern string) (DataStreams, error) {
+	req := esapi.IndicesGetDataStreamRequest{Name: []string{namePattern}, ExpandWildcards: "all,hidden"}
+	resp, err := req.Do(ctx, client)
+	if err != nil {
+		return DataStreams{}, fmt.Errorf("error fetching data streams: %w", err)
+	}
+
+	raw, err := handleResponseRaw(resp)
+	if err != nil {
+		return DataStreams{}, fmt.Errorf("error handling HTTP response for data stream get: %w", err)
+	}
+
+	data := DataStreams{}
+	err = json.Unmarshal(raw, &data)
+	if err != nil {
+		return DataStreams{}, fmt.Errorf("error unmarshalling datastream: %w", err)
+	}
+
+	return data, nil
+}
+
+// DeleteIndexTemplatesDataStreams deletes any data streams, then the associated index templates.
+func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error {
+	req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"}
+	resp, err := req.Do(ctx, client)
+	if err != nil {
+		return fmt.Errorf("error deleting data streams: %w", err)
+	}
+	_, err = handleResponseRaw(resp)
+	if err != nil {
+		return fmt.Errorf("error handling HTTP response for data stream delete: %w", err)
+	}
+
+	patternReq := esapi.IndicesDeleteIndexTemplateRequest{Name: name}
+	resp, err = patternReq.Do(ctx, client)
+	if err != nil {
+		return fmt.Errorf("error deleting index templates: %w", err)
+	}
+	_, err = handleResponseRaw(resp)
+	if err != nil {
+		return fmt.Errorf("error handling HTTP response for index template delete: %w", err)
+	}
+	return nil
+}
+
+// GetPipelines returns a list of installed pipelines that match the given name/pattern
+func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) {
+	req := esapi.IngestGetPipelineRequest{PipelineID: name}
+	resp, err := req.Do(ctx, client)
+	if err != nil {
+		return nil, fmt.Errorf("error fetching pipelines: %w", err)
+	}
+	resultBuf, err := handleResponseRaw(resp)
+	if err != nil {
+		return nil, fmt.Errorf("error handling HTTP response: %w", err)
+	}
+
+	parsed := map[string]Pipeline{}
+	err = json.Unmarshal(resultBuf, &parsed)
+	if err != nil {
+		return nil, fmt.Errorf("error unmarshaling json response: %w", err)
+	}
+	return parsed, nil
+}
+
+// DeletePipelines deletes all pipelines that match the given pattern
+func DeletePipelines(ctx context.Context, client elastictransport.Interface, name string) error {
+	req := esapi.IngestDeletePipelineRequest{PipelineID: name}
+	resp, err := req.Do(ctx, client)
+	if err != nil {
+		return fmt.Errorf("error deleting pipelines: %w", err)
+	}
+	_, err = handleResponseRaw(resp)
+	if err != nil {
+		return fmt.Errorf("error handling HTTP response: %w", err)
+	}
+	return nil
+}
+
 // FindMatchingLogLinesWithContext returns any logs with message fields that match the given line
 func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) {
 	queryRaw := map[string]interface{}{
@@ -137,20 +354,8 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor
 		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
 	}
 
-	es := esapi.New(client)
-	res, err := es.Search(
-		es.Search.WithIndex("*.ds-logs*"),
-		es.Search.WithExpandWildcards("all"),
-		es.Search.WithBody(&buf),
-		es.Search.WithTrackTotalHits(true),
-		es.Search.WithPretty(),
-		es.Search.WithContext(ctx),
-	)
-	if err != nil {
-		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
-	}
+	return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client)
 
-	return handleDocsResponse(res)
 }
 
 // CheckForErrorsInLogs checks to see if any error-level lines exist
@@ -214,20 +419,8 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor
 		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
 	}
 
-	es := esapi.New(client)
-	res, err := es.Search(
-		es.Search.WithIndex("*.ds-logs*"),
-		es.Search.WithExpandWildcards("all"),
-		es.Search.WithBody(&buf),
-		es.Search.WithTrackTotalHits(true),
-		es.Search.WithPretty(),
-		es.Search.WithContext(ctx),
-	)
-	if err != nil {
-		return Documents{}, fmt.Errorf("error performing ES search: %w", err)
-	}
+	return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client)
 
-	return handleDocsResponse(res)
 }
 
 // GetLogsForDatastream returns any logs associated with the datastream
@@ -280,15 +473,40 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor
 		},
 	}
 
+	return performQueryForRawQuery(ctx, indexQuery, "*ds-logs*", client)
+}
+
+// GetPing performs a basic ping and returns ES config info
+func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, error) {
+	req := esapi.InfoRequest{}
+	resp, err := req.Do(ctx, client)
+	if err != nil {
+		return Ping{}, fmt.Errorf("error in ping request: %w", err)
+	}
+
+	respData, err := handleResponseRaw(resp)
+	if err != nil {
+		return Ping{}, fmt.Errorf("error in HTTP response: %w", err)
+	}
+	pingData := Ping{}
+	err = json.Unmarshal(respData, &pingData)
+	if err != nil {
+		return pingData, fmt.Errorf("error unmarshalling JSON: %w", err)
+	}
+	return pingData, nil
+}
+
+func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) {
 	var buf bytes.Buffer
-	err := json.NewEncoder(&buf).Encode(indexQuery)
+	err := json.NewEncoder(&buf).Encode(queryRaw)
 	if err != nil {
 		return Documents{}, fmt.Errorf("error creating ES query: %w", err)
 	}
 
 	es := esapi.New(client)
 	res, err := es.Search(
-		es.Search.WithIndex("*.ds-logs*"),
+		es.Search.WithIndex(index),
 		es.Search.WithExpandWildcards("all"),
 		es.Search.WithBody(&buf),
 		es.Search.WithTrackTotalHits(true),
@@ -303,13 +521,9 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor
 }
 
 func handleDocsResponse(res *esapi.Response) (Documents, error) {
-	if res.StatusCode >= 300 || res.StatusCode < 200 {
-		return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String())
-	}
-
-	resultBuf, err := io.ReadAll(res.Body)
+	resultBuf, err := handleResponseRaw(res)
 	if err != nil {
-		return Documents{}, fmt.Errorf("error reading response body: %w", err)
+		return Documents{}, fmt.Errorf("error in HTTP query: %w", err)
 	}
 
 	respData := Documents{}
@@ -320,3 +534,15 @@ func handleDocsResponse(res *esapi.Response) (Documents, error) {
 
 	return respData, err
 }
+
+func handleResponseRaw(res *esapi.Response) ([]byte, error) {
+	if res.StatusCode >= 300 || res.StatusCode < 200 {
+		return nil, fmt.Errorf("non-2xx status code: %v, response: '%s'", res.StatusCode, res.String())
+	}
+
+	resultBuf, err := io.ReadAll(res.Body)
+	if err != nil {
+		return nil, fmt.Errorf("error reading response body: %w", err)
+	}
+	return resultBuf, nil
+}
diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go
new file mode 100644
index 00000000000..ed9459307b6
--- /dev/null
+++ b/pkg/testing/tools/kibana.go
@@ -0,0 +1,91 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+package tools
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/http"
+	"net/url"
+	"time"
+
+	"github.com/elastic/elastic-agent-libs/kibana"
+)
+
+// DashboardResponse is the response body from the Kibana saved objects find API
+type DashboardResponse struct {
+	Page         int         `json:"page"`
+	PerPage      int         `json:"per_page"`
+	Total        int         `json:"total"`
+	SavedObjects []Dashboard `json:"saved_objects"`
+}
+
+// Dashboard is an individual dashboard saved object
+type Dashboard struct {
+	Type       string    `json:"type"`
+	ID         string    `json:"id"`
+	Namespaces []string  `json:"namespaces"`
+	UpdatedAt  time.Time `json:"updated_at"`
+	CreatedAt  time.Time `json:"created_at"`
+	Version    string    `json:"version"`
+}
+
+// DeleteDashboard removes the selected dashboard
+func DeleteDashboard(ctx context.Context, client *kibana.Client, id string) error {
+	// In the future there should be logic to check if we need this header, waiting for https://github.com/elastic/kibana/pull/164850
+	headers := http.Header{}
+	headers.Add("x-elastic-internal-origin", "integration-tests")
+	status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, headers, nil)
+	if err != nil {
+		return fmt.Errorf("error making API request: %w, response: '%s'", err, string(resp))
+	}
+
+	if status != 200 {
+		return fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp))
+	}
+	return nil
+}
+
+// GetDashboards returns a list of known dashboards on the system
+func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, error) {
+	params := url.Values{}
+	params.Add("type", "dashboard")
+	params.Add("page", "1")
+
+	dashboards := []Dashboard{}
+	page := 1
+	for {
+		headers := http.Header{}
+		headers.Add("x-elastic-internal-origin", "integration-tests")
+		status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, headers, nil)
+		if err != nil {
+			return nil, fmt.Errorf("error making API request: %w", err)
+		}
+
+		if status != 200 {
+			return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp))
+		}
+
+		dashResp := DashboardResponse{}
+		err = json.Unmarshal(resp, &dashResp)
+		if err != nil {
+			return nil, fmt.Errorf("error unmarshalling dashboard response: %w", err)
+		}
+		if len(dashResp.SavedObjects) == 0 {
+			break
+		}
+
+		dashboards = append(dashboards, dashResp.SavedObjects...)
+		// we got all the results in one page
+		if dashResp.Total == dashResp.PerPage {
+			break
+		}
+		// we COULD calculate the number of pages we need to ask for in total, or just keep iterating until we don't get any results
+		page++
+		params.Set("page", fmt.Sprintf("%d", page))
+	}
+
+	return dashboards, nil
+}
diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go
new file mode 100644
index 00000000000..64ef36344cd
--- /dev/null
+++ b/testing/integration/beat_serverless_test.go
@@ -0,0 +1,485 @@
+// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+// or more contributor license agreements. Licensed under the Elastic License;
+// you may not use this file except in compliance with the Elastic License.
+
+//go:build integration
+
+package integration
+
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net/url"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+	"time"
+
+	"github.com/gofrs/uuid"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"github.com/stretchr/testify/suite"
+
+	"github.com/elastic/elastic-agent-libs/mapstr"
+	atesting "github.com/elastic/elastic-agent/pkg/testing"
+	"github.com/elastic/elastic-agent/pkg/testing/define"
+	"github.com/elastic/elastic-agent/pkg/testing/tools"
+)
+
+type BeatRunner struct {
+	suite.Suite
+	requirementsInfo *define.Info
+	agentFixture     *atesting.Fixture
+
+	// connection info
+	ESHost  string
+	user    string
+	pass    string
+	kibHost string
+
+	testUuid     string
+	testbeatName string
+}
+
+func TestMetricbeatServerless(t *testing.T) {
+	info := define.Require(t, define.Requirements{
+		OS: []define.OS{
+			{Type: define.Linux},
+		},
+		Stack: &define.Stack{},
+		Local: false,
+		Sudo:  true,
+	})
+
+	suite.Run(t, &BeatRunner{requirementsInfo: info})
+}
+
+func (runner *BeatRunner) SetupSuite() {
+	runner.T().Logf("In SetupSuite")
+
+	runner.testbeatName = os.Getenv("TEST_BINARY_NAME")
+	if runner.testbeatName == "" {
+		runner.T().Fatalf("TEST_BINARY_NAME must be set")
+	}
+	if runner.testbeatName == "elastic-agent" {
+		runner.T().Skipf("tests must be run against a beat, not elastic-agent")
+	}
+
+	if runner.testbeatName != "filebeat" && runner.testbeatName != "metricbeat" {
+		runner.T().Skip("test only supports metricbeat or filebeat")
+	}
+	runner.T().Logf("running serverless tests with %s", runner.testbeatName)
+
+	agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"}))
+	runner.agentFixture = agentFixture
+	require.NoError(runner.T(), err)
+
+	// the require.* code will fail without these, so assume the values are non-empty
+	runner.ESHost = os.Getenv("ELASTICSEARCH_HOST")
+	runner.user = os.Getenv("ELASTICSEARCH_USERNAME")
+	runner.pass = os.Getenv("ELASTICSEARCH_PASSWORD")
+	runner.kibHost = os.Getenv("KIBANA_HOST")
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	beatOutConfig := `
+output.elasticsearch:
+  hosts: ["%s"]
+  api_key: "%s:%s"
+setup.kibana:
+  host: %s
+metricbeat.config.modules:
+  path: ${path.config}/modules.d/*.yml
+processors:
+  - add_fields:
+      target: host
+      fields:
+        test-id: %s
+`
+	if runner.testbeatName == "filebeat" {
+		beatOutConfig = `
+output.elasticsearch:
+  hosts: ["%s"]
+  api_key: "%s:%s"
+setup.kibana:
+  host: %s
+filebeat.config.modules:
+  - modules: system
+    syslog:
+      enabled: true
+    auth:
+      enabled: true
+processors:
+  - add_fields:
+      target: host
+      fields:
+        test-id: %s
+`
+	}
+
+	apiResp, err := tools.CreateAPIKey(ctx, runner.requirementsInfo.ESClient, tools.APIKeyRequest{Name: "test-api-key", Expiration: "1d"})
+	require.NoError(runner.T(), err)
+
+	// Beats like to add standard ports to URLs that don't have them, and ESS will sometimes return a URL without a port, assuming :443,
+	// so try to fix that here
+	fixedKibanaHost := runner.kibHost
+	parsedKibana, err := url.Parse(runner.kibHost)
+	require.NoError(runner.T(), err)
+	if parsedKibana.Port() == "" {
+		fixedKibanaHost = fmt.Sprintf("%s:443", fixedKibanaHost)
+	}
+
+	fixedESHost := runner.ESHost
+	parsedES, err := url.Parse(runner.ESHost)
+	require.NoError(runner.T(), err)
+	if parsedES.Port() == "" {
+		fixedESHost = fmt.Sprintf("%s:443", fixedESHost)
+	}
+
+	runner.T().Logf("configuring beats with %s / %s", fixedESHost, fixedKibanaHost)
+
+	testUuid, err := uuid.NewV4()
+	require.NoError(runner.T(), err)
+	runner.testUuid = testUuid.String()
+	parsedCfg := fmt.Sprintf(beatOutConfig, fixedESHost, apiResp.Id, apiResp.APIKey, fixedKibanaHost, testUuid.String())
+	err = runner.agentFixture.WriteFileToWorkDir(ctx, parsedCfg, fmt.Sprintf("%s.yml", runner.testbeatName))
+	require.NoError(runner.T(), err)
+}
+
+// run the beat with default metricsets, ensure no errors in logs + data is ingested
+func (runner *BeatRunner) TestRunAndCheckData() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4)
+	defer cancel()
+
+	// in case there's already a running template, delete it, forcing the beat to re-install
+	_ = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
+
+	err := runner.agentFixture.Run(ctx)
+	require.NoError(runner.T(), err)
+
+	docs, err := tools.GetLatestDocumentMatchingQuery(ctx, runner.requirementsInfo.ESClient, map[string]interface{}{
+		"match": map[string]interface{}{
+			"host.test-id": runner.testUuid,
+		},
+	}, fmt.Sprintf("*%s*", runner.testbeatName))
+	require.NoError(runner.T(), err)
+	require.NotEmpty(runner.T(), docs.Hits.Hits)
+}
+
+// tests the [beat] setup --dashboards command
+func (runner *BeatRunner) TestSetupDashboards() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3) // dashboards seem to take a while
+	defer cancel()
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), "setup", "--dashboards"})
+	assert.NoError(runner.T(), err)
+	runner.T().Logf("got response from dashboard setup: %s", string(resp))
+	require.True(runner.T(), strings.Contains(string(resp), "Loaded dashboards"))
+
+	dashList, err := tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient)
+	require.NoError(runner.T(), err)
+
+	// interesting hack in cases where we don't have a clean environment:
+	// check to see if any of the dashboards were created recently
+	found := false
+	for _, dash := range dashList {
+		if time.Since(dash.UpdatedAt) < time.Minute*5 {
+			found = true
+			break
+		}
+	}
+	require.True(runner.T(), found, fmt.Sprintf("could not find dashboard newer than 5 minutes, out of %d dashboards", len(dashList)))
+
+	runner.Run("export dashboards", runner.SubtestExportDashboards)
+
+	// cleanup
+	for _, dash := range dashList {
+		err = tools.DeleteDashboard(ctx, runner.requirementsInfo.KibanaClient, dash.ID)
+		if err != nil {
+			runner.T().Logf("WARNING: could not delete dashboards after test: %s", err)
+			break
+		}
+	}
+}
+
+// tests the [beat] export dashboard command
+func (runner *BeatRunner) SubtestExportDashboards() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
+	defer cancel()
+	outDir := runner.T().TempDir()
+
+	dashlist, err := tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient)
+	require.NoError(runner.T(), err)
+	require.NotEmpty(runner.T(), dashlist)
+
+	exportOut, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"export",
+		"dashboard", "--folder", outDir, "--id", dashlist[0].ID})
+
+	runner.T().Logf("got output: %s", exportOut)
+	assert.NoError(runner.T(), err)
+
+	inFolder, err := os.ReadDir(filepath.Join(outDir, "/_meta/kibana/8/dashboard"))
+	require.NoError(runner.T(), err)
+	runner.T().Logf("got directory contents: %#v", inFolder)
+	require.NotEmpty(runner.T(), inFolder)
+}
+
+// NOTE for the below tests: the testing framework doesn't guarantee a new stack instance each time,
+// which means we might be running against a stack where a previous test has already done setup.
+// perhaps CI should run `mage integration:clean` first?
+
+// tests the [beat] setup --pipelines command
+func (runner *BeatRunner) TestSetupPipelines() {
+	if runner.testbeatName != "filebeat" {
+		runner.T().Skip("pipelines only available on filebeat")
+	}
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// need to actually enable something that has pipelines
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(),
+		"setup", "--pipelines", "--modules", "apache", "-M", "apache.error.enabled=true", "-M", "apache.access.enabled=true"})
+	assert.NoError(runner.T(), err)
+
+	runner.T().Logf("got response from pipeline setup: %s", string(resp))
+
+	pipelines, err := tools.GetPipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*")
+	require.NoError(runner.T(), err)
+	require.NotEmpty(runner.T(), pipelines)
+
+	// cleanup
+	err = tools.DeletePipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*")
+	if err != nil {
+		runner.T().Logf("WARNING: could not clean up pipelines: %s", err)
+	}
+}
+
+// test beat setup --index-management with ILM disabled
+func (runner *BeatRunner) TestIndexManagementNoILM() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"setup",
+		"--index-management",
+		"--E=setup.ilm.enabled=false"})
+	runner.T().Logf("got response from management setup: %s", string(resp))
+	assert.NoError(runner.T(), err)
+	// we should not print a warning if we've explicitly disabled ILM
+	assert.NotContains(runner.T(), string(resp), "not supported")
+
+	tmpls, err := tools.GetIndexTemplatesForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
+	require.NoError(runner.T(), err)
+	for _, tmpl := range tmpls.IndexTemplates {
+		runner.T().Logf("got template: %s", tmpl.Name)
+	}
+	require.NotEmpty(runner.T(), tmpls.IndexTemplates)
+
+	runner.Run("export templates", runner.SubtestExportTemplates)
+	runner.Run("export index patterns", runner.SubtestExportIndexPatterns)
+
+	// cleanup
+	err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
+	if err != nil {
+		runner.T().Logf("WARNING: could not clean up index templates/data streams: %s", err)
+	}
+}
+
+// TestWithCustomLifecyclePolicy uploads a custom DSL policy
+func (runner *BeatRunner) TestWithCustomLifecyclePolicy() {
+	// create a custom policy file
+	dslPolicy := mapstr.M{
+		"data_retention": "1d",
+	}
+
+	lctemp := runner.T().TempDir()
+	raw, err := json.MarshalIndent(dslPolicy, "", " ")
+	require.NoError(runner.T(), err)
+
+	lifecyclePath := filepath.Join(lctemp, "dsl_policy.json")
+
+	err = os.WriteFile(lifecyclePath, raw, 0o744)
+	require.NoError(runner.T(), err)
+
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// pre-delete in case something else missed cleanup
+	_ = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName))
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"setup",
+		"--index-management",
+		"--E=setup.dsl.enabled=true", fmt.Sprintf("--E=setup.dsl.policy_file=%s", lifecyclePath)})
+	runner.T().Logf("got response from management setup: %s", string(resp))
+	require.NoError(runner.T(), err)
+
+	streams, err := tools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName))
+	require.NoError(runner.T(), err)
+
+	foundCustom := false
+	for _, stream := range streams.DataStreams {
+		if stream.Lifecycle.DataRetention == "1d" {
+			foundCustom = true
+			break
+		}
+	}
+	require.True(runner.T(), foundCustom, "did not find our custom lifecycle policy. Found: %#v", streams)
+
+	err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName))
+	require.NoError(runner.T(), err)
+}
+
+// tests beat setup --index-management with ILM explicitly enabled.
+// On serverless, this should fail.
+func (runner *BeatRunner) TestIndexManagementILMEnabledFailure() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient)
+	require.NoError(runner.T(), err)
+
+	if info.Version.BuildFlavor != "serverless" {
+		runner.T().Skip("must run on serverless")
+	}
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"setup",
+		"--index-management",
+		"--E=setup.ilm.enabled=true", "--E=setup.ilm.overwrite=true"})
+	runner.T().Logf("got response from management setup: %s", string(resp))
+	require.Error(runner.T(), err)
+	assert.Contains(runner.T(), string(resp), "error creating")
+}
+
+// tests setup with both ILM and DSL enabled; it should fail
+func (runner *BeatRunner) TestBothLifecyclesEnabled() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"setup",
+		"--index-management",
+		"--E=setup.ilm.enabled=true", "--E=setup.dsl.enabled=true"})
+	runner.T().Logf("got response from management setup: %s", string(resp))
+	require.Error(runner.T(), err)
+}
+
+// disable all lifecycle management, ensure it's actually disabled
+func (runner *BeatRunner) TestAllLifecyclesDisabled() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+
+	// in case there's already a running template, delete it, forcing the beat to re-install
+	_ = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"setup",
+		"--index-management",
+		"--E=setup.ilm.enabled=false", "--E=setup.dsl.enabled=false"})
+	runner.T().Logf("got response from management setup: %s", string(resp))
+	require.NoError(runner.T(), err)
+
+	// make sure we have data streams, but there are no lifecycles
+	streams, err := tools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
+	require.NoError(runner.T(), err)
+
+	require.NotEmpty(runner.T(), streams.DataStreams, "found no datastreams")
+	foundPolicy := false
+	for _, stream := range streams.DataStreams {
+		if stream.Lifecycle.DataRetention != "" {
+			foundPolicy = true
+			break
+		}
+	}
+	require.False(runner.T(), foundPolicy, "Found a lifecycle policy despite disabling lifecycles. Found: %#v", streams)
+
+	err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
+	require.NoError(runner.T(), err)
+}
+
+// the export command doesn't actually make a network connection,
+// so this won't fail
+func (runner *BeatRunner) TestExport() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient)
+	require.NoError(runner.T(), err)
+
+	if info.Version.BuildFlavor != "serverless" {
+		runner.T().Skip("must run on serverless")
+	}
+
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"export", "ilm-policy", "--E=setup.ilm.enabled=true"})
+	runner.T().Logf("got response from export: %s", string(resp))
+	assert.NoError(runner.T(), err)
+	// check to see if we got a valid output
+	policy := map[string]interface{}{}
+	err = json.Unmarshal(resp, &policy)
+	require.NoError(runner.T(), err)
+
+	require.NotEmpty(runner.T(), policy["policy"])
+}
+
+// tests beat export with DSL
+func (runner *BeatRunner) TestExportDSL() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
+	defer cancel()
+	resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"export", "ilm-policy", "--E=setup.dsl.enabled=true"})
+	runner.T().Logf("got response from export: %s", string(resp))
+	assert.NoError(runner.T(), err)
+	// check to see if we got a valid output
+	policy := map[string]interface{}{}
+	err = json.Unmarshal(resp, &policy)
+	require.NoError(runner.T(), err)
+
+	require.NotEmpty(runner.T(), policy["data_retention"])
+}
+
+// SubtestExportTemplates tests the [beat] export template command
+func (runner *BeatRunner) SubtestExportTemplates() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
+	defer cancel()
+	outDir := runner.T().TempDir()
+
+	_, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"export",
+		"template", "--dir", outDir})
+	assert.NoError(runner.T(), err)
+
+	inFolder, err := os.ReadDir(filepath.Join(outDir, "/template"))
+	require.NoError(runner.T(), err)
+	runner.T().Logf("got directory contents: %#v", inFolder)
+	require.NotEmpty(runner.T(), inFolder)
+}
+
+// SubtestExportIndexPatterns tests the [beat] export index-pattern command
+func (runner *BeatRunner) SubtestExportIndexPatterns() {
+	ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2)
+	defer cancel()
+
+	rawPattern, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
+		runner.agentFixture.WorkDir(),
+		"export",
+		"index-pattern"})
+	assert.NoError(runner.T(), err)
+
+	idxPattern := map[string]interface{}{}
+
+	err = json.Unmarshal(rawPattern, &idxPattern)
+	require.NoError(runner.T(), err)
+	require.NotNil(runner.T(), idxPattern["attributes"])
+}
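
Usage sketch: a minimal illustration of how the pieces above compose, written in Go. It is not part of the diff; `t`, `ctx`, and `beatCfg` are assumed to exist in a test, "metricbeat" is an example beat name, and the mage target, fixture options, and WriteFileToWorkDir are the ones added above.

	// The suite is normally driven through the new mage target, which wires up
	// AGENT_BUILD_DIR, STACK_PROVISIONER, and TEST_BINARY_NAME before dispatching
	// the runner (the beat package must already be built under
	// ../beats/x-pack/metricbeat/build/distributions):
	//
	//	mage integration:testBeatServerless metricbeat
	//
	// Driving a beat directly through the new fixture API looks like this:
	fixture, err := define.NewFixtureWithBinary(t, define.Version(), "metricbeat", "/home/ubuntu",
		atesting.WithRunLength(time.Minute), // run for a fixed time instead of waiting on agent states
		atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"}))
	require.NoError(t, err)

	// place a config next to the unpacked package; Run starts the beat with -c <workdir>/metricbeat.yml
	require.NoError(t, fixture.WriteFileToWorkDir(ctx, beatCfg, "metricbeat.yml"))
	require.NoError(t, fixture.Run(ctx)) // returns once the WithRunLength timeout elapses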