From c55bbde20f8d3551ea62737ad1344ec036a2c11c Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Tue, 15 Aug 2023 15:01:41 -0700 Subject: [PATCH 01/18] cleaning up --- magefile.go | 56 +++- pkg/testing/define/define.go | 20 +- pkg/testing/ess/serverless.go | 67 ++++- pkg/testing/fetcher_local.go | 10 +- pkg/testing/fetcher_local_test.go | 4 +- pkg/testing/fixture.go | 99 ++++++- pkg/testing/runner/config.go | 20 +- pkg/testing/runner/debian.go | 3 +- pkg/testing/runner/runner.go | 15 +- pkg/testing/tools/elasticsearch.go | 168 ++++++++--- pkg/testing/tools/kibana.go | 63 +++++ testing/integration/beat_serverless_test.go | 296 ++++++++++++++++++++ 12 files changed, 733 insertions(+), 88 deletions(-) create mode 100644 pkg/testing/tools/kibana.go create mode 100644 testing/integration/beat_serverless_test.go diff --git a/magefile.go b/magefile.go index e739963c857..10aacba7640 100644 --- a/magefile.go +++ b/magefile.go @@ -1527,6 +1527,25 @@ func (Integration) Single(ctx context.Context, testName string) error { return integRunner(ctx, false, testName) } +// Run beat serverless tests +func (Integration) TestBeatServerless(ctx context.Context, beatname string) error { + beatBuildPath := filepath.Join("..", "beats", "x-pack", beatname, "build", "distributions") + err := os.Setenv("AGENT_BUILD_DIR", beatBuildPath) + if err != nil { + return fmt.Errorf("error setting build dir: %s", err) + } + // err = os.Setenv("STACK", "serverless") + // if err != nil { + // return fmt.Errorf("error setting serverless stack var: %w", err) + // } + + err = os.Setenv("TEST_BINARY_NAME", beatname) + if err != nil { + return fmt.Errorf("error setting binary name: %w", err) + } + return integRunner(ctx, false, "TestMetricbeatSeverless") +} + // PrepareOnRemote shouldn't be called locally (called on remote host to prepare it for testing) func (Integration) PrepareOnRemote() { mg.Deps(mage.InstallGoTestTools) @@ -1732,23 +1751,34 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche extraEnv["AGENT_KEEP_INSTALLED"] = os.Getenv("AGENT_KEEP_INSTALLED") } + binaryName := os.Getenv("TEST_BINARY_NAME") + if binaryName == "" { + binaryName = "elastic-agent" + } + + repoDir := os.Getenv("TEST_INTEG_REPO_PATH") + if repoDir == "" { + repoDir = "." + } + diagDir := filepath.Join("build", "diagnostics") _ = os.MkdirAll(diagDir, 0755) cfg := runner.Config{ - AgentVersion: agentVersion, - AgentStackVersion: agentStackVersion, - BuildDir: agentBuildDir, - GOVersion: goVersion, - RepoDir: ".", - DiagnosticsDir: diagDir, - Platforms: testPlatforms(), - Matrix: matrix, - SingleTest: singleTest, - VerboseMode: mg.Verbose(), - Timestamp: timestamp, - TestFlags: goTestFlags, - ExtraEnv: extraEnv, + ReleaseVersion: agentVersion, + StackVersion: agentStackVersion, + BuildDir: agentBuildDir, + GOVersion: goVersion, + RepoDir: repoDir, + DiagnosticsDir: diagDir, + Platforms: testPlatforms(), + Matrix: matrix, + SingleTest: singleTest, + VerboseMode: mg.Verbose(), + Timestamp: timestamp, + TestFlags: goTestFlags, + ExtraEnv: extraEnv, + BinaryName: binaryName, } ogcCfg := ogc.Config{ ServiceTokenPath: serviceTokenPath, diff --git a/pkg/testing/define/define.go b/pkg/testing/define/define.go index 3cae2c3b7f5..6c45e18f2a3 100644 --- a/pkg/testing/define/define.go +++ b/pkg/testing/define/define.go @@ -21,6 +21,7 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" + "github.com/stretchr/testify/require" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/utils" @@ -78,19 +79,30 @@ func NewFixture(t *testing.T, version string, opts ...atesting.FixtureOpt) (*ate buildsDir = filepath.Join(projectDir, "build", "distributions") } + fixture, err := NewFixtureWithBinary(t, version, "elastic-agent", buildsDir) + require.NoError(t, err) + return fixture, nil +} + +// NewFixture returns a new Elastic Agent testing fixture with a LocalFetcher and +// the agent logging to the test logger. +func NewFixtureWithBinary(t *testing.T, version string, binary string, buildsDir string, opts ...atesting.FixtureOpt) (*atesting.Fixture, error) { ver, err := semver.ParseVersion(version) if err != nil { return nil, fmt.Errorf("%q is an invalid agent version: %w", version, err) } - var f atesting.Fetcher + var binFetcher atesting.Fetcher if ver.IsSnapshot() { - f = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly()) + binFetcher = atesting.LocalFetcher(buildsDir, binary, atesting.WithLocalSnapshotOnly()) } else { - f = atesting.LocalFetcher(buildsDir) + binFetcher = atesting.LocalFetcher(buildsDir, binary) } - opts = append(opts, atesting.WithFetcher(f), atesting.WithLogOutput()) + opts = append(opts, atesting.WithFetcher(binFetcher), atesting.WithLogOutput()) + if binary != "elastic-agent" { + opts = append(opts, atesting.WithBinaryName(binary)) + } return atesting.NewFixture(t, version, opts...) } diff --git a/pkg/testing/ess/serverless.go b/pkg/testing/ess/serverless.go index 07086635d73..1367eb3fb04 100644 --- a/pkg/testing/ess/serverless.go +++ b/pkg/testing/ess/serverless.go @@ -54,6 +54,13 @@ type Project struct { } `json:"endpoints"` } +// CredResetResponse contains the new auth details for a +// stack credential reset +type CredResetResponse struct { + Password string `json:"password"` + Username string `json:"username"` +} + // NewServerlessClient creates a new instance of the serverless client func NewServerlessClient(region, projectType, api string, logger runner.Logger) *ServerlessClient { return &ServerlessClient{ @@ -97,6 +104,16 @@ func (srv *ServerlessClient) DeployStack(ctx context.Context, req ServerlessRequ return Project{}, fmt.Errorf("error decoding JSON response: %w", err) } srv.proj = serverlessHandle + + // as of 8/8-ish, the serverless ESS cloud no longer provides credentials on the first POST request, we must send an additional POST + // to reset the credentials + updated, err := srv.ResetCredentials(ctx) + if err != nil { + return serverlessHandle, fmt.Errorf("error resetting credentials: %w", err) + } + srv.proj.Credentials.Username = updated.Username + srv.proj.Credentials.Password = updated.Password + return serverlessHandle, nil } @@ -181,27 +198,15 @@ func (srv *ServerlessClient) WaitForEndpoints(ctx context.Context) error { // WaitForElasticsearch waits until the ES endpoint is healthy func (srv *ServerlessClient) WaitForElasticsearch(ctx context.Context) error { - endpoint := fmt.Sprintf("%s/_cluster/health", srv.proj.Endpoints.Elasticsearch) - req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil) + req, err := http.NewRequestWithContext(ctx, "GET", srv.proj.Endpoints.Elasticsearch, nil) if err != nil { return fmt.Errorf("error creating HTTP request: %w", err) } req.SetBasicAuth(srv.proj.Credentials.Username, srv.proj.Credentials.Password) + // _cluster/health no longer works on serverless, just check response code readyFunc := func(resp *http.Response) bool { - var health struct { - Status string `json:"status"` - } - err = json.NewDecoder(resp.Body).Decode(&health) - resp.Body.Close() - if err != nil { - srv.log.Logf("response decoding error: %v", err) - return false - } - if health.Status == "green" { - return true - } - return false + return resp.StatusCode == 200 } err = srv.waitForRemoteState(ctx, req, time.Second*5, readyFunc) @@ -243,6 +248,38 @@ func (srv *ServerlessClient) WaitForKibana(ctx context.Context) error { return nil } +// ResetCredentials resets the credentials for the given ESS instance +func (srv *ServerlessClient) ResetCredentials(ctx context.Context) (CredResetResponse, error) { + resetURL := fmt.Sprintf("%s/api/v1/serverless/projects/%s/%s/_reset-credentials", serverlessURL, srv.projectType, srv.proj.ID) + + resetHandler, err := http.NewRequestWithContext(ctx, "POST", resetURL, nil) + if err != nil { + return CredResetResponse{}, fmt.Errorf("error creating new httpRequest: %w", err) + } + + resetHandler.Header.Set("Content-Type", "application/json") + resetHandler.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", srv.api)) + + resp, err := http.DefaultClient.Do(resetHandler) + if err != nil { + return CredResetResponse{}, fmt.Errorf("error performing HTTP request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + p, _ := io.ReadAll(resp.Body) + return CredResetResponse{}, fmt.Errorf("Non-201 status code returned by server: %d, body: %s", resp.StatusCode, string(p)) + } + + updated := CredResetResponse{} + err = json.NewDecoder(resp.Body).Decode(&updated) + if err != nil { + return CredResetResponse{}, fmt.Errorf("error decoding JSON response: %w", err) + } + + return updated, nil +} + func (srv *ServerlessClient) waitForRemoteState(ctx context.Context, httpHandler *http.Request, tick time.Duration, isReady func(*http.Response) bool) error { timer := time.NewTimer(time.Millisecond) // in cases where we get a timeout, also return the last error returned via HTTP diff --git a/pkg/testing/fetcher_local.go b/pkg/testing/fetcher_local.go index 54526903ede..4c4996517af 100644 --- a/pkg/testing/fetcher_local.go +++ b/pkg/testing/fetcher_local.go @@ -17,6 +17,7 @@ import ( type localFetcher struct { dir string snapshotOnly bool + binaryName string } type localFetcherOpt func(f *localFetcher) @@ -29,9 +30,10 @@ func WithLocalSnapshotOnly() localFetcherOpt { } // LocalFetcher returns a fetcher that pulls the binary of the Elastic Agent from a local location. -func LocalFetcher(dir string, opts ...localFetcherOpt) Fetcher { +func LocalFetcher(dir string, binary string, opts ...localFetcherOpt) Fetcher { f := &localFetcher{ - dir: dir, + dir: dir, + binaryName: binary, } for _, o := range opts { o(f) @@ -56,7 +58,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec return nil, fmt.Errorf("invalid version: %q: %w", ver, err) } - mainBuildfmt := "elastic-agent-%s-%s" + mainBuildfmt := "%s-%s-%s" if f.snapshotOnly && !ver.IsSnapshot() { if ver.Prerelease() == "" { ver = semver.NewParsedSemVer(ver.Major(), ver.Minor(), ver.Patch(), "SNAPSHOT", ver.BuildMetadata()) @@ -66,7 +68,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec } - mainBuild := fmt.Sprintf(mainBuildfmt, ver, suffix) + mainBuild := fmt.Sprintf(mainBuildfmt, f.binaryName, ver, suffix) mainBuildPath := filepath.Join(f.dir, mainBuild) build := mainBuild buildPath := mainBuildPath diff --git a/pkg/testing/fetcher_local_test.go b/pkg/testing/fetcher_local_test.go index a97c5db1c9e..21c68dbc01c 100644 --- a/pkg/testing/fetcher_local_test.go +++ b/pkg/testing/fetcher_local_test.go @@ -17,7 +17,7 @@ import ( ) func TestLocalFetcher_Name(t *testing.T) { - f := LocalFetcher(t.TempDir()) + f := LocalFetcher(t.TempDir(), "elastic-agent") require.Equal(t, "local", f.Name()) } @@ -77,7 +77,7 @@ func TestLocalFetcher(t *testing.T) { for _, tc := range tcs { tmp := t.TempDir() - f := LocalFetcher(testdata, tc.opts...) + f := LocalFetcher(testdata, "elastic-agent", tc.opts...) got, err := f.Fetch( context.Background(), runtime.GOOS, runtime.GOARCH, tc.version) require.NoError(t, err) diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index b03290cd508..dcc1754bf3c 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -43,6 +43,7 @@ type Fixture struct { logOutput bool allowErrs bool connectTimout time.Duration + binaryName string srcPackage string workDir string @@ -101,6 +102,13 @@ func WithConnectTimeout(timeout time.Duration) FixtureOpt { } } +// WithBinaryName sets the name of the binary under test, in cases where tests aren't being run against elastic-agent +func WithBinaryName(name string) FixtureOpt { + return func(f *Fixture) { + f.binaryName = name + } +} + // NewFixture creates a new fixture to setup and manage Elastic Agent. func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, error) { // we store the caller so the fixture can find the cache directory for the artifacts that @@ -176,6 +184,21 @@ func (f *Fixture) Prepare(ctx context.Context, components ...UsableComponent) er return nil } +// WriteFileToWorkDir sends a file to the working directory alongside the unpacked tar build. +func (f *Fixture) WriteFileToWorkDir(ctx context.Context, data string, name string) error { + err := f.ensurePrepared(ctx) + if err != nil { + return fmt.Errorf("error preparing binary: %w", err) + } + + err = os.WriteFile(filepath.Join(f.workDir, name), []byte(data), 644) + if err != nil { + return fmt.Errorf("error writing file: %w", err) + } + f.t.Logf("wrote %s to %s", name, f.workDir) + return nil +} + // Configure replaces the default Agent configuration file with the provided // configuration. This must be called after `Prepare` is called but before `Run` // or `Install` can be called. @@ -227,6 +250,74 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error { return nil } +// Runs the given beat for a specified period of time. Will fail if an error appears in the logs, +// Will refuse to run if the given binary is elastic-agent +func (f *Fixture) RunBeat(ctx context.Context, runFor time.Duration) error { + if f.binaryName == "elastic-agent" { + return fmt.Errorf("binary %s is not a beat", f.binaryName) + } + + err := f.ensurePrepared(ctx) + if err != nil { + return err + } + + var logProxy Logger + if f.logOutput { + logProxy = f.t + } + stdOut := newLogWatcher(logProxy) + stdErr := newLogWatcher(logProxy) + + proc, err := process.Start( + f.binaryPath(), + process.WithContext(ctx), + process.WithArgs([]string{"run", "-e", "-c", filepath.Join(f.workDir, "metricbeat.yml")}), + process.WithCmdOptions(attachOutErr(stdOut, stdErr))) + if err != nil { + return fmt.Errorf("failed to spawn beat: %w", err) + } + + killProc := func() { + _ = proc.Kill() + <-proc.Wait() + } + doneChan := time.After(runFor) + + stopping := false + for { + select { + case <-ctx.Done(): + killProc() + return ctx.Err() + case pstate := <-proc.Wait(): + if stopping { + return nil + } + return fmt.Errorf("beat exited unexpectedly with exit code: %d", pstate.ExitCode()) + case err := <-stdOut.Watch(): + if !f.allowErrs { + // no errors allowed + killProc() + return fmt.Errorf("beat logged an unexpected error: %w", err) + } + case err := <-stdErr.Watch(): + if !f.allowErrs { + // no errors allowed + killProc() + return fmt.Errorf("beat logged an unexpected error: %w", err) + } + case <-doneChan: + if !stopping { + // trigger the stop + stopping = true + _ = proc.Stop() + } + } + } + +} + // Run runs the Elastic Agent. // // If `states` are provided then the Elastic Agent runs until each state has been reached. Once reached the @@ -354,7 +445,7 @@ func (f *Fixture) Exec(ctx context.Context, args []string, opts ...process.CmdOp if err != nil { return nil, fmt.Errorf("error creating cmd: %w", err) } - f.t.Logf(">> running agent with: %v", cmd.Args) + f.t.Logf(">> running binary with: %v", cmd.Args) return cmd.CombinedOutput() } @@ -476,7 +567,11 @@ func (f *Fixture) binaryPath() string { workDir = filepath.Join(paths.DefaultBasePath, "Elastic", "Agent") } } - binary := filepath.Join(workDir, "elastic-agent") + defaultBin := "elastic-agent" + if f.binaryName != "" { + defaultBin = f.binaryName + } + binary := filepath.Join(workDir, defaultBin) if f.operatingSystem == "windows" { binary += ".exe" } diff --git a/pkg/testing/runner/config.go b/pkg/testing/runner/config.go index 2e66ffe47b3..944a7f89189 100644 --- a/pkg/testing/runner/config.go +++ b/pkg/testing/runner/config.go @@ -14,18 +14,22 @@ import ( // Config provides the configuration for running the runner. type Config struct { - AgentVersion string - AgentStackVersion string - BuildDir string - GOVersion string - RepoDir string - DiagnosticsDir string + ReleaseVersion string + StackVersion string + BuildDir string + GOVersion string + RepoDir string + DiagnosticsDir string // Platforms filters the tests to only run on the provided list // of platforms even if the tests supports more than what is // defined in this list. Platforms []string + // BinaryName is the name of the binary package under test, i.e, elastic-agent, metricbeat, etc + // this is used to copy the .tar.gz to the remote host + BinaryName string + // Matrix enables matrix testing. This explodes each test to // run on all supported platforms the runner supports. Matrix bool @@ -48,10 +52,10 @@ type Config struct { // Validate returns an error if the information is invalid. func (c *Config) Validate() error { - if c.AgentVersion == "" { + if c.ReleaseVersion == "" { return errors.New("field AgentVersion must be set") } - if c.AgentStackVersion == "" { + if c.StackVersion == "" { return errors.New("field AgentStackVersion must be set") } if c.BuildDir == "" { diff --git a/pkg/testing/runner/debian.go b/pkg/testing/runner/debian.go index d91c5c8179c..f0a7d058c13 100644 --- a/pkg/testing/runner/debian.go +++ b/pkg/testing/runner/debian.go @@ -97,8 +97,9 @@ func (DebianRunner) Copy(ctx context.Context, sshClient *ssh.Client, logger Logg if err != nil { return fmt.Errorf("failed to SCP repo archive %s: %w", repoArchive, err) } - // ensure that agent directory is removed (possible it already exists if instance already used) + // ensure that base directories are removed (possible it already exists if instance already used) _, _, _ = sshRunCommand(ctx, sshClient, "rm", []string{"-rf", "agent"}, nil) + _, _, _ = sshRunCommand(ctx, sshClient, "rm", []string{"-rf", "beats"}, nil) stdOut, errOut, err := sshRunCommand(ctx, sshClient, "unzip", []string{destRepoName, "-d", "agent"}, nil) if err != nil { return fmt.Errorf("failed to unzip %s to agent directory: %w (stdout: %s, stderr: %s)", destRepoName, err, stdOut, errOut) diff --git a/pkg/testing/runner/runner.go b/pkg/testing/runner/runner.go index f0adb3733c1..29858e42c87 100644 --- a/pkg/testing/runner/runner.go +++ b/pkg/testing/runner/runner.go @@ -355,7 +355,6 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger logger.Logf("Failed to copy files instance: %s", err) return OSRunnerResult{}, fmt.Errorf("failed to copy files to instance %s: %w", instance.Name, err) } - // start with the ExtraEnv first preventing the other environment flags below // from being overwritten env := map[string]string{} @@ -366,6 +365,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger // ensure that we have all the requirements for the stack if required if batch.Batch.Stack != nil { // wait for the stack to be ready before continuing + logger.Logf("Waiting for stacks to be ready...") r.stacksReady.Wait() if r.stacksErr != nil { return OSRunnerResult{}, fmt.Errorf("%s unable to continue because stack never became ready: %w", instance.Name, r.stacksErr) @@ -385,9 +385,10 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger // set the go test flags env["GOTEST_FLAGS"] = r.cfg.TestFlags + env["TEST_BINARY"] = r.cfg.BinaryName // run the actual tests on the host - result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.AgentVersion, batch.ID, batch.Batch, env) + result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.ReleaseVersion, batch.ID, batch.Batch, env) if err != nil { logger.Logf("Failed to execute tests on instance: %s", err) return OSRunnerResult{}, fmt.Errorf("failed to execute tests on instance %s: %w", instance.Name, err) @@ -446,9 +447,13 @@ func (r *Runner) getBuild(b OSBatch) Build { ext = "zip" } hashExt := ".sha512" - packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("elastic-agent-%s-%s-%s.%s", r.cfg.AgentVersion, b.OS.Type, arch, ext)) + name := "elastic-agent" + if r.cfg.BinaryName != "" { + name = r.cfg.BinaryName + } + packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("%s-%s-%s-%s.%s", name, r.cfg.ReleaseVersion, b.OS.Type, arch, ext)) return Build{ - Version: r.cfg.AgentVersion, + Version: r.cfg.ReleaseVersion, Type: b.OS.Type, Arch: arch, Path: packageName, @@ -577,7 +582,7 @@ func (r *Runner) startStacks(ctx context.Context) error { if !lb.Skip && lb.Batch.Stack != nil { if lb.Batch.Stack.Version == "" { // no version defined on the stack; set it to the defined stack version - lb.Batch.Stack.Version = r.cfg.AgentStackVersion + lb.Batch.Stack.Version = r.cfg.StackVersion } if !slices.Contains(versions, lb.Batch.Stack.Version) { versions = append(versions, lb.Batch.Stack.Version) diff --git a/pkg/testing/tools/elasticsearch.go b/pkg/testing/tools/elasticsearch.go index 0d0b040d4dd..76b46411d73 100644 --- a/pkg/testing/tools/elasticsearch.go +++ b/pkg/testing/tools/elasticsearch.go @@ -72,6 +72,37 @@ type ESDoc struct { Source map[string]interface{} `json:"_source"` } +// TemplateResponse is the body of a template data request +type TemplateResponse struct { + IndexTemplates []Template `json:"index_templates"` +} + +// Template is an individual template +type Template struct { + Name string `json:"name"` + IndexTemplate map[string]interface{} `json:"index_template"` +} + +// Pipeline is an individual pipeline +type Pipeline struct { + Description string `json:"description"` + Processors []map[string]interface{} `json:"processors"` +} + +// Ping returns basic ES info +type Ping struct { + Name string `json:"name"` + ClusterName string `json:"cluster_name"` + ClusterUUID string `json:"cluster_uuid"` + Version Version `json:"version"` +} + +// Version contains version and build info from an ES ping +type Version struct { + Number string `json:"number"` + BuildFlavor string `json:"build_flavor"` +} + // GetAllindicies returns a list of indicies on the target ES instance func GetAllindicies(client elastictransport.Interface) ([]Index, error) { return GetIndicesWithContext(context.Background(), client, []string{}) @@ -108,6 +139,67 @@ func FindMatchingLogLines(client elastictransport.Interface, namespace, line str return FindMatchingLogLinesWithContext(context.Background(), client, namespace, line) } +// GetLatestDocumentMatchingQuery returns the last document that matches the given query. +// the query field is inserted into a simple `query` POST request +func GetLatestDocumentMatchingQuery(ctx context.Context, client elastictransport.Interface, query map[string]interface{}, indexPattern string) (Documents, error) { + queryRaw := map[string]interface{}{ + "query": query, + "sort": map[string]interface{}{ + "timestamp": "desc", + }, + "size": 1, + } + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(queryRaw) + if err != nil { + return Documents{}, fmt.Errorf("error creating ES query: %w", err) + } + + return performQueryForRawQuery(ctx, queryRaw, indexPattern, client) +} + +// GetIndexTemplatesForPattern lists all index templates on the system +func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.Interface, name string) (TemplateResponse, error) { + req := esapi.IndicesGetIndexTemplateRequest{Name: name} + resp, err := req.Do(ctx, client) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error fetching index templates: %w", err) + } + + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error handling HTTP response: %w", err) + } + parsed := TemplateResponse{} + + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return TemplateResponse{}, fmt.Errorf("error unmarshaling json response: %w", err) + } + + return parsed, nil +} + +// GetPipelines returns a list of installed pipelines that match the given name/pattern +func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) { + req := esapi.IngestGetPipelineRequest{PipelineID: name} + resp, err := req.Do(ctx, client) + if err != nil { + return nil, fmt.Errorf("error fetching index templates: %w", err) + } + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return nil, fmt.Errorf("error handling HTTP response: %w", err) + } + + parsed := map[string]Pipeline{} + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return nil, fmt.Errorf("error unmarshaling json response: %w", err) + } + return parsed, nil +} + // FindMatchingLogLinesWithContext returns any logs with message fields that match the given line func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) { queryRaw := map[string]interface{}{ @@ -137,20 +229,8 @@ func FindMatchingLogLinesWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - es := esapi.New(client) - res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } + return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client) - return handleDocsResponse(res) } // CheckForErrorsInLogs checks to see if any error-level lines exist @@ -214,20 +294,8 @@ func CheckForErrorsInLogsWithContext(ctx context.Context, client elastictranspor return Documents{}, fmt.Errorf("error creating ES query: %w", err) } - es := esapi.New(client) - res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), - es.Search.WithExpandWildcards("all"), - es.Search.WithBody(&buf), - es.Search.WithTrackTotalHits(true), - es.Search.WithPretty(), - es.Search.WithContext(ctx), - ) - if err != nil { - return Documents{}, fmt.Errorf("error performing ES search: %w", err) - } + return performQueryForRawQuery(ctx, queryRaw, "*ds-logs*", client) - return handleDocsResponse(res) } // GetLogsForDatastream returns any logs associated with the datastream @@ -280,15 +348,39 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor }, } + return performQueryForRawQuery(ctx, indexQuery, "*ds-logs*", client) +} + +func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, error) { + req := esapi.InfoRequest{} + resp, err := req.Do(ctx, client) + if err != nil { + return Ping{}, fmt.Errorf("error in ping request") + } + + respData, err := handleResponseRaw(resp) + if err != nil { + return Ping{}, fmt.Errorf("error in HTTP response: %w", err) + } + pingData := Ping{} + err = json.Unmarshal(respData, &pingData) + if err != nil { + return pingData, fmt.Errorf("error unmarshalling JSON: %w", err) + } + return pingData, nil + +} + +func performQueryForRawQuery(ctx context.Context, queryRaw map[string]interface{}, index string, client elastictransport.Interface) (Documents, error) { var buf bytes.Buffer - err := json.NewEncoder(&buf).Encode(indexQuery) + err := json.NewEncoder(&buf).Encode(queryRaw) if err != nil { return Documents{}, fmt.Errorf("error creating ES query: %w", err) } es := esapi.New(client) res, err := es.Search( - es.Search.WithIndex("*.ds-logs*"), + es.Search.WithIndex(index), es.Search.WithExpandWildcards("all"), es.Search.WithBody(&buf), es.Search.WithTrackTotalHits(true), @@ -303,13 +395,9 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor } func handleDocsResponse(res *esapi.Response) (Documents, error) { - if res.StatusCode >= 300 || res.StatusCode < 200 { - return Documents{}, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) - } - - resultBuf, err := io.ReadAll(res.Body) + resultBuf, err := handleResponseRaw(res) if err != nil { - return Documents{}, fmt.Errorf("error reading response body: %w", err) + return Documents{}, fmt.Errorf("error in HTTP query: %w", err) } respData := Documents{} @@ -320,3 +408,15 @@ func handleDocsResponse(res *esapi.Response) (Documents, error) { return respData, err } + +func handleResponseRaw(res *esapi.Response) ([]byte, error) { + if res.StatusCode >= 300 || res.StatusCode < 200 { + return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", res.StatusCode, res.String()) + } + + resultBuf, err := io.ReadAll(res.Body) + if err != nil { + return nil, fmt.Errorf("error reading response body: %w", err) + } + return resultBuf, nil +} diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go new file mode 100644 index 00000000000..1e04785d54c --- /dev/null +++ b/pkg/testing/tools/kibana.go @@ -0,0 +1,63 @@ +package tools + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + + "github.com/elastic/elastic-agent-libs/kibana" +) + +type DashboardResponse struct { + Page int `json:"page"` + PerPage int `json:"per_page"` + Total int `json:"total"` + SavedObjects []Dashboard `json:"saved_objects"` +} + +type Dashboard struct { + Type string `json:"type"` + ID string `json:"id"` + Namespaces []string `json:"namespaces"` +} + +// GetDashboards returns a list of known dashboards on the system +func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, error) { + params := url.Values{} + params.Add("type", "dashboard") + params.Add("page", "1") + + dashboards := []Dashboard{} + page := 1 + for { + status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, nil, nil) + if err != nil { + return nil, fmt.Errorf("error making api request: %w", err) + } + + if status != 200 { + return nil, fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp)) + } + + dashResp := DashboardResponse{} + err = json.Unmarshal(resp, &dashResp) + if err != nil { + return nil, fmt.Errorf("error unmarshalling dashboard response: %w", err) + } + if len(dashResp.SavedObjects) == 0 { + break + } + + dashboards = append(dashboards, dashResp.SavedObjects...) + // we got all the results in one page + if dashResp.Total == dashResp.PerPage { + break + } + // we COULD calculate the number of pages we need to ask for in total, or just keep iterating until we don't get any results + page++ + params.Set("page", fmt.Sprintf("%d", page)) + } + + return dashboards, nil +} diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go new file mode 100644 index 00000000000..1929aff9f3a --- /dev/null +++ b/testing/integration/beat_serverless_test.go @@ -0,0 +1,296 @@ +// //go:build integration + +package integration + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "os" + "path/filepath" + "strings" + "testing" + "time" + + "github.com/gofrs/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "github.com/stretchr/testify/suite" + + atesting "github.com/elastic/elastic-agent/pkg/testing" + "github.com/elastic/elastic-agent/pkg/testing/define" + "github.com/elastic/elastic-agent/pkg/testing/tools" +) + +type BeatRunner struct { + suite.Suite + requirementsInfo *define.Info + agentFixture *atesting.Fixture + + // connection info + ESHost string + user string + pass string + kibHost string + + testUuid string + testbeatName string +} + +func TestMetricbeatSeverless(t *testing.T) { + info := define.Require(t, define.Requirements{ + OS: []define.OS{ + {Type: define.Linux}, + }, + Stack: &define.Stack{}, + Local: false, + Sudo: true, + }) + + suite.Run(t, &BeatRunner{requirementsInfo: info}) +} + +func (runner *BeatRunner) SetupSuite() { + runner.T().Logf("In SetupSuite") + + runner.testbeatName = os.Getenv("TEST_BINARY") + + agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu") + runner.agentFixture = agentFixture + require.NoError(runner.T(), err) + + // the require.* code will fail without these, so assume the values are non-nil + runner.ESHost = os.Getenv("ELASTICSEARCH_HOST") + runner.user = os.Getenv("ELASTICSEARCH_USERNAME") + runner.pass = os.Getenv("ELASTICSEARCH_PASSWORD") + runner.kibHost = os.Getenv("KIBANA_HOST") + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + mbOutConfig := ` +output.elasticsearch: + hosts: ["%s"] + username: %s + password: %s +setup.kibana: + host: %s +metricbeat.config.modules: + path: ${path.config}/modules.d/*.yml +processors: + - add_fields: + target: host + fields: + test-id: %s +` + + // beats likes to add standard ports to URLs that don't have them, and ESS will sometimes return a URL without a port, assuming :443 + // so try to fix that here + fixedKibanaHost := runner.kibHost + parsedKibana, err := url.Parse(runner.kibHost) + require.NoError(runner.T(), err) + if parsedKibana.Port() == "" { + fixedKibanaHost = fmt.Sprintf("%s:443", fixedKibanaHost) + } + + fixedESHost := runner.ESHost + parsedES, err := url.Parse(runner.ESHost) + require.NoError(runner.T(), err) + if parsedES.Port() == "" { + fixedESHost = fmt.Sprintf("%s:443", fixedESHost) + } + + runner.T().Logf("configuring beats with %s / %s", fixedESHost, fixedKibanaHost) + + testUuid, err := uuid.NewV4() + require.NoError(runner.T(), err) + runner.testUuid = testUuid.String() + parsedCfg := fmt.Sprintf(mbOutConfig, fixedESHost, runner.user, runner.pass, fixedKibanaHost, testUuid.String()) + err = runner.agentFixture.WriteFileToWorkDir(ctx, parsedCfg, fmt.Sprintf("%s.yml", runner.testbeatName)) + require.NoError(runner.T(), err) +} + +// run the beat with default metricsets, ensure no errors in logs + data is ingested +func (runner *BeatRunner) TestRunAndCheckData() { + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4) + defer cancel() + err := runner.agentFixture.RunBeat(ctx, time.Minute) + require.NoError(runner.T(), err) + + docs, err := tools.GetLatestDocumentMatchingQuery(ctx, runner.requirementsInfo.ESClient, map[string]interface{}{ + "match": map[string]interface{}{ + "host.test-id": runner.testUuid, + }, + }, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), docs.Hits.Hits) +} + +// NOTE for the below tests: the testing framework doesn't guarantee a new stack instance each time, +// which means we might be running against a stack where a previous test has already done setup. +// perhaps CI should run `mage integration:clean` first? + +// tests the [beat] setup --pipelines command +func (runner *BeatRunner) TestSetupPipelines() { + if runner.testbeatName != "filebeat" { + runner.T().Skip("pipelines only available on filebeat") + } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // need to actually enable something that has pipelines + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), + "setup", "--pipelines", "--modules", "apache", "-M", "apache.error.enabled=true", "-M", "apache.access.enabled=true"}) + assert.NoError(runner.T(), err) + + runner.T().Logf("got response from pipeline setup: %s", string(resp)) + + pipelines, err := tools.GetPipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*") + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), pipelines) + +} + +// tests the [beat] setup --dashboards command +func (runner *BeatRunner) TestSetupDashboards() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3) //dashboards seem to take a while + defer cancel() + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), "setup", "--dashboards"}) + assert.NoError(runner.T(), err) + runner.T().Logf("got response from dashboard setup: %s", string(resp)) + require.True(runner.T(), strings.Contains(string(resp), "Loaded dashboards")) + + //TODO: actually check + _, err = tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient) + require.NoError(runner.T(), err) + + runner.Run("export dashboards", runner.SubtestExportDashboards) +} + +// tests the [beat] export dashboard command +func (runner *BeatRunner) SubtestExportDashboards() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + outDir := runner.T().TempDir() + + dashlist, err := tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient) + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), dashlist) + + _, err = runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", + "dashboard", "--folder", outDir, "--id", dashlist[0].ID}) + assert.NoError(runner.T(), err) + + inFolder, err := os.ReadDir(filepath.Join(outDir, "/_meta/kibana/8/dashboard")) + require.NoError(runner.T(), err) + runner.T().Logf("got log contents: %#v", inFolder) + require.NotEmpty(runner.T(), inFolder) + +} + +// test beat setup --index-management with ILM disabled +func (runner *BeatRunner) TestIndexManagementNoILM() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "-E", "setup.ilm.enabled=false"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + assert.NoError(runner.T(), err) + + tmpls, err := tools.GetIndexTemplatesForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + for _, tmpl := range tmpls.IndexTemplates { + runner.T().Logf("got template: %s", tmpl.Name) + } + require.NotEmpty(runner.T(), tmpls.IndexTemplates) + + runner.Run("export templates", runner.SubtestExportTemplates) + runner.Run("export index patterns", runner.SubtestExportIndexPatterns) +} + +// tests beat setup --index-management with ILM explicitly set +// On serverless, this should fail. +// Will not pass right now, may need to change +func (runner *BeatRunner) TestIndexManagementILMEnabledFail() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient) + require.NoError(runner.T(), err) + + if info.Version.BuildFlavor != "serverless" { + runner.T().Skip("must run on serverless") + } + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "-E", "setup.ilm.enabled=true"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + assert.Error(runner.T(), err) + assert.Contains(runner.T(), resp, "not supported") +} + +// tests beat setup ilm-policy +// On serverless, this should fail +func (runner *BeatRunner) TestExportILMFail() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient) + require.NoError(runner.T(), err) + + if info.Version.BuildFlavor != "serverless" { + runner.T().Skip("must run on serverless") + } + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", "ilm-policy"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + assert.Error(runner.T(), err) + assert.Contains(runner.T(), resp, "not supported") + +} + +func (runner *BeatRunner) SubtestExportTemplates() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + outDir := runner.T().TempDir() + + _, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", + "template", "--dir", outDir}) + assert.NoError(runner.T(), err) + + inFolder, err := os.ReadDir(filepath.Join(outDir, "/template")) + require.NoError(runner.T(), err) + runner.T().Logf("got log contents: %#v", inFolder) + require.NotEmpty(runner.T(), inFolder) +} + +func (runner *BeatRunner) SubtestExportIndexPatterns() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute*2) + defer cancel() + + rawPattern, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", + "index-pattern"}) + assert.NoError(runner.T(), err) + + idxPattern := map[string]interface{}{} + + err = json.Unmarshal(rawPattern, &idxPattern) + require.NoError(runner.T(), err) + require.NotNil(runner.T(), idxPattern["attributes"]) +} From e93938348cb3b0ef8797566a21ae343ef482aae3 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 16 Aug 2023 14:04:54 -0700 Subject: [PATCH 02/18] final bit of cleanup --- magefile.go | 12 +++++++----- pkg/testing/runner/config.go | 2 +- pkg/testing/runner/runner.go | 4 ++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/magefile.go b/magefile.go index 9bc632def14..788d0c5df73 100644 --- a/magefile.go +++ b/magefile.go @@ -1535,10 +1535,10 @@ func (Integration) TestBeatServerless(ctx context.Context, beatname string) erro if err != nil { return fmt.Errorf("error setting build dir: %s", err) } - // err = os.Setenv("STACK", "serverless") - // if err != nil { - // return fmt.Errorf("error setting serverless stack var: %w", err) - // } + err = os.Setenv("STACK_PROVISIONER", "serverless") + if err != nil { + return fmt.Errorf("error setting serverless stack var: %w", err) + } err = os.Setenv("TEST_BINARY_NAME", beatname) if err != nil { @@ -1693,7 +1693,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche if err != nil { return nil, err } - agentStackVersion := os.Getenv("AGENT_STACK_VERSION") + agentVersion := os.Getenv("AGENT_VERSION") if agentVersion == "" { agentVersion, err = mage.DefaultBeatBuildVariableSources.GetBeatVersion() @@ -1710,6 +1710,8 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche agentVersion = fmt.Sprintf("%s-SNAPSHOT", agentVersion) } } + + agentStackVersion := os.Getenv("AGENT_STACK_VERSION") if agentStackVersion == "" { agentStackVersion = agentVersion } diff --git a/pkg/testing/runner/config.go b/pkg/testing/runner/config.go index 114290861cb..ab06a8b0009 100644 --- a/pkg/testing/runner/config.go +++ b/pkg/testing/runner/config.go @@ -55,7 +55,7 @@ type Config struct { // Validate returns an error if the information is invalid. func (c *Config) Validate() error { - if c.ReleaseVersion == "" { + if c.AgentVersion == "" { return errors.New("field AgentVersion must be set") } if c.StackVersion == "" { diff --git a/pkg/testing/runner/runner.go b/pkg/testing/runner/runner.go index 952b5b14b8e..2565872c65e 100644 --- a/pkg/testing/runner/runner.go +++ b/pkg/testing/runner/runner.go @@ -388,7 +388,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger env["TEST_BINARY"] = r.cfg.BinaryName // run the actual tests on the host - result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.ReleaseVersion, batch.ID, batch.Batch, env) + result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.AgentVersion, batch.ID, batch.Batch, env) if err != nil { logger.Logf("Failed to execute tests on instance: %s", err) return OSRunnerResult{}, fmt.Errorf("failed to execute tests on instance %s: %w", instance.Name, err) @@ -451,7 +451,7 @@ func (r *Runner) getBuild(b OSBatch) Build { if r.cfg.BinaryName != "" { name = r.cfg.BinaryName } - packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("%s-%s-%s-%s.%s", name, r.cfg.ReleaseVersion, b.OS.Type, arch, ext)) + packageName := filepath.Join(r.cfg.BuildDir, fmt.Sprintf("%s-%s-%s-%s.%s", name, r.cfg.AgentVersion, b.OS.Type, arch, ext)) return Build{ Version: r.cfg.ReleaseVersion, Type: b.OS.Type, From 2afe99853ecac3211d0794ed08be9e245d68811d Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 16 Aug 2023 14:37:25 -0700 Subject: [PATCH 03/18] fix magefile, cleanup docs --- magefile.go | 14 ++++++++++---- pkg/testing/fixture.go | 5 ++++- pkg/testing/tools/elasticsearch.go | 1 + 3 files changed, 15 insertions(+), 5 deletions(-) diff --git a/magefile.go b/magefile.go index 788d0c5df73..fed3ce41701 100644 --- a/magefile.go +++ b/magefile.go @@ -1535,9 +1535,15 @@ func (Integration) TestBeatServerless(ctx context.Context, beatname string) erro if err != nil { return fmt.Errorf("error setting build dir: %s", err) } - err = os.Setenv("STACK_PROVISIONER", "serverless") - if err != nil { - return fmt.Errorf("error setting serverless stack var: %w", err) + + // a bit of bypass logic; run as serverless by default + if os.Getenv("STACK_PROVISIONER") == "" { + err = os.Setenv("STACK_PROVISIONER", "serverless") + if err != nil { + return fmt.Errorf("error setting serverless stack var: %w", err) + } + } else if os.Getenv("STACK_PROVISIONER") == "ess" { + fmt.Printf(">>> Warning: running TestBeatServerless as stateful") } err = os.Setenv("TEST_BINARY_NAME", beatname) @@ -1694,6 +1700,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche return nil, err } + agentStackVersion := os.Getenv("AGENT_STACK_VERSION") agentVersion := os.Getenv("AGENT_VERSION") if agentVersion == "" { agentVersion, err = mage.DefaultBeatBuildVariableSources.GetBeatVersion() @@ -1711,7 +1718,6 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche } } - agentStackVersion := os.Getenv("AGENT_STACK_VERSION") if agentStackVersion == "" { agentStackVersion = agentVersion } diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index 22b557f0399..ad887d6a675 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -258,7 +258,7 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error { return nil } -// Runs the given beat for a specified period of time. Will fail if an error appears in the logs, +// Runs the given beat for a specified period of time. Will fail if an error appears in the logs. // Will refuse to run if the given binary is elastic-agent func (f *Fixture) RunBeat(ctx context.Context, runFor time.Duration) error { if f.binaryName == "elastic-agent" { @@ -337,6 +337,9 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { if f.installed { return errors.New("fixture is installed; cannot be run") } + if f.binaryName != "elastic-agent" { + return fmt.Errorf("Run() can only be run against elastic-agent, got %s", f.binaryName) + } var err error err = f.ensurePrepared(ctx) diff --git a/pkg/testing/tools/elasticsearch.go b/pkg/testing/tools/elasticsearch.go index 76b46411d73..5ddae293abd 100644 --- a/pkg/testing/tools/elasticsearch.go +++ b/pkg/testing/tools/elasticsearch.go @@ -351,6 +351,7 @@ func GetLogsForDatastreamWithContext(ctx context.Context, client elastictranspor return performQueryForRawQuery(ctx, indexQuery, "*ds-logs*", client) } +// GetPing performs a basic ping and returns ES config info func GetPing(ctx context.Context, client elastictransport.Interface) (Ping, error) { req := esapi.InfoRequest{} resp, err := req.Do(ctx, client) From 413c0d732dd488944ca716326c6c4652198cab92 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 16 Aug 2023 14:51:18 -0700 Subject: [PATCH 04/18] clean up errors, make linter happy --- pkg/testing/define/define.go | 6 ++---- pkg/testing/fixture.go | 2 +- 2 files changed, 3 insertions(+), 5 deletions(-) diff --git a/pkg/testing/define/define.go b/pkg/testing/define/define.go index 6c45e18f2a3..b1c850dde84 100644 --- a/pkg/testing/define/define.go +++ b/pkg/testing/define/define.go @@ -21,7 +21,6 @@ import ( "github.com/elastic/go-elasticsearch/v8" "github.com/elastic/go-sysinfo" "github.com/elastic/go-sysinfo/types" - "github.com/stretchr/testify/require" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/utils" @@ -79,9 +78,8 @@ func NewFixture(t *testing.T, version string, opts ...atesting.FixtureOpt) (*ate buildsDir = filepath.Join(projectDir, "build", "distributions") } - fixture, err := NewFixtureWithBinary(t, version, "elastic-agent", buildsDir) - require.NoError(t, err) - return fixture, nil + return NewFixtureWithBinary(t, version, "elastic-agent", buildsDir) + } // NewFixture returns a new Elastic Agent testing fixture with a LocalFetcher and diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index ad887d6a675..e39994d08d3 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -194,7 +194,7 @@ func (f *Fixture) WriteFileToWorkDir(ctx context.Context, data string, name stri return fmt.Errorf("error preparing binary: %w", err) } - err = os.WriteFile(filepath.Join(f.workDir, name), []byte(data), 644) + err = os.WriteFile(filepath.Join(f.workDir, name), []byte(data), 0644) if err != nil { return fmt.Errorf("error writing file: %w", err) } From a16767d9e7d0e1eaa94f0e26ac3d32027c52ddc9 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 16 Aug 2023 15:01:40 -0700 Subject: [PATCH 05/18] fix headers --- pkg/testing/tools/kibana.go | 4 ++++ testing/integration/beat_serverless_test.go | 4 ++++ 2 files changed, 8 insertions(+) diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go index 1e04785d54c..4a1f6d11e1c 100644 --- a/pkg/testing/tools/kibana.go +++ b/pkg/testing/tools/kibana.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + package tools import ( diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 1929aff9f3a..d0d95d6acda 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -1,3 +1,7 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + // //go:build integration package integration From 7e8970afe99970debd4a57d1762d521102113028 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Wed, 16 Aug 2023 15:21:09 -0700 Subject: [PATCH 06/18] fix fields in runner config --- pkg/testing/runner/config.go | 19 +++++++++---------- pkg/testing/runner/runner_test.go | 14 +++++++------- 2 files changed, 16 insertions(+), 17 deletions(-) diff --git a/pkg/testing/runner/config.go b/pkg/testing/runner/config.go index ab06a8b0009..1448fa197dd 100644 --- a/pkg/testing/runner/config.go +++ b/pkg/testing/runner/config.go @@ -14,15 +14,14 @@ import ( // Config provides the configuration for running the runner. type Config struct { - AgentVersion string - AgentStackVersion string - StateDir string - ReleaseVersion string - StackVersion string - BuildDir string - GOVersion string - RepoDir string - DiagnosticsDir string + AgentVersion string + StateDir string + ReleaseVersion string + StackVersion string + BuildDir string + GOVersion string + RepoDir string + DiagnosticsDir string // Platforms filters the tests to only run on the provided list // of platforms even if the tests supports more than what is @@ -59,7 +58,7 @@ func (c *Config) Validate() error { return errors.New("field AgentVersion must be set") } if c.StackVersion == "" { - return errors.New("field AgentStackVersion must be set") + return errors.New("field StackVersion must be set") } if c.BuildDir == "" { return errors.New("field BuildDir must be set") diff --git a/pkg/testing/runner/runner_test.go b/pkg/testing/runner/runner_test.go index c24e653e31a..d10b9d524d0 100644 --- a/pkg/testing/runner/runner_test.go +++ b/pkg/testing/runner/runner_test.go @@ -23,13 +23,13 @@ func TestNewRunner_Clean(t *testing.T) { require.NoError(t, err) cfg := Config{ - AgentVersion: "8.10.0", - AgentStackVersion: "8.10.0-SNAPSHOT", - BuildDir: filepath.Join(tmpdir, "build"), - GOVersion: "1.20.7", - RepoDir: filepath.Join(tmpdir, "repo"), - StateDir: stateDir, - ExtraEnv: nil, + AgentVersion: "8.10.0", + StackVersion: "8.10.0-SNAPSHOT", + BuildDir: filepath.Join(tmpdir, "build"), + GOVersion: "1.20.7", + RepoDir: filepath.Join(tmpdir, "repo"), + StateDir: stateDir, + ExtraEnv: nil, } ip := &fakeInstanceProvisioner{} sp := &fakeStackProvisioner{} From 9e26b7caab331522dc166c3ae5f32554de479855 Mon Sep 17 00:00:00 2001 From: Alex Kristiansen Date: Thu, 17 Aug 2023 12:32:33 -0700 Subject: [PATCH 07/18] add dashboard checks --- pkg/testing/tools/kibana.go | 10 +++- testing/integration/beat_serverless_test.go | 63 ++++++++++++--------- 2 files changed, 43 insertions(+), 30 deletions(-) diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go index 4a1f6d11e1c..6c26798710a 100644 --- a/pkg/testing/tools/kibana.go +++ b/pkg/testing/tools/kibana.go @@ -9,6 +9,7 @@ import ( "encoding/json" "fmt" "net/url" + "time" "github.com/elastic/elastic-agent-libs/kibana" ) @@ -21,9 +22,12 @@ type DashboardResponse struct { } type Dashboard struct { - Type string `json:"type"` - ID string `json:"id"` - Namespaces []string `json:"namespaces"` + Type string `json:"type"` + ID string `json:"id"` + Namespaces []string `json:"namespaces"` + UpdatedAt time.Time `json:"updated_at"` + CreatedAt time.Time `json:"created_at"` + Version string `json:"version"` } // GetDashboards returns a list of known dashboards on the system diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index d0d95d6acda..872c0335fe3 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -132,31 +132,6 @@ func (runner *BeatRunner) TestRunAndCheckData() { require.NotEmpty(runner.T(), docs.Hits.Hits) } -// NOTE for the below tests: the testing framework doesn't guarantee a new stack instance each time, -// which means we might be running against a stack where a previous test has already done setup. -// perhaps CI should run `mage integration:clean` first? - -// tests the [beat] setup --pipelines command -func (runner *BeatRunner) TestSetupPipelines() { - if runner.testbeatName != "filebeat" { - runner.T().Skip("pipelines only available on filebeat") - } - ctx, cancel := context.WithTimeout(context.Background(), time.Minute) - defer cancel() - - // need to actually enable something that has pipelines - resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), - "setup", "--pipelines", "--modules", "apache", "-M", "apache.error.enabled=true", "-M", "apache.access.enabled=true"}) - assert.NoError(runner.T(), err) - - runner.T().Logf("got response from pipeline setup: %s", string(resp)) - - pipelines, err := tools.GetPipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*") - require.NoError(runner.T(), err) - require.NotEmpty(runner.T(), pipelines) - -} - // tests the [beat] setup --dashboards command func (runner *BeatRunner) TestSetupDashboards() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*3) //dashboards seem to take a while @@ -167,10 +142,20 @@ func (runner *BeatRunner) TestSetupDashboards() { runner.T().Logf("got response from dashboard setup: %s", string(resp)) require.True(runner.T(), strings.Contains(string(resp), "Loaded dashboards")) - //TODO: actually check - _, err = tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient) + dashList, err := tools.GetDashboards(ctx, runner.requirementsInfo.KibanaClient) require.NoError(runner.T(), err) + // interesting hack in cases where we don't have a clean environment + // check to see if any of the dashboards were created recently + found := false + for _, dash := range dashList { + if time.Since(dash.UpdatedAt) < time.Minute*5 { + found = true + break + } + } + require.True(runner.T(), found, fmt.Sprintf("could not find dashboard newer than 5 minutes, out of %d dashboards", len(dashList))) + runner.Run("export dashboards", runner.SubtestExportDashboards) } @@ -194,6 +179,30 @@ func (runner *BeatRunner) SubtestExportDashboards() { require.NoError(runner.T(), err) runner.T().Logf("got log contents: %#v", inFolder) require.NotEmpty(runner.T(), inFolder) +} + +// NOTE for the below tests: the testing framework doesn't guarantee a new stack instance each time, +// which means we might be running against a stack where a previous test has already done setup. +// perhaps CI should run `mage integration:clean` first? + +// tests the [beat] setup --pipelines command +func (runner *BeatRunner) TestSetupPipelines() { + if runner.testbeatName != "filebeat" { + runner.T().Skip("pipelines only available on filebeat") + } + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // need to actually enable something that has pipelines + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), + "setup", "--pipelines", "--modules", "apache", "-M", "apache.error.enabled=true", "-M", "apache.access.enabled=true"}) + assert.NoError(runner.T(), err) + + runner.T().Logf("got response from pipeline setup: %s", string(resp)) + + pipelines, err := tools.GetPipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*") + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), pipelines) } From 369f62e5f231fd619c19e28931a864d6b3c52494 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Fri, 25 Aug 2023 15:47:49 -0700 Subject: [PATCH 08/18] clean up, refactor --- magefile.go | 2 +- pkg/testing/define/define.go | 4 +- pkg/testing/fetcher_local.go | 11 ++- pkg/testing/fetcher_local_test.go | 4 +- pkg/testing/fixture.go | 88 +++++++++++++++------ pkg/testing/runner/debian.go | 11 ++- testing/integration/beat_serverless_test.go | 8 +- 7 files changed, 94 insertions(+), 34 deletions(-) diff --git a/magefile.go b/magefile.go index fed3ce41701..28b0eb20ae3 100644 --- a/magefile.go +++ b/magefile.go @@ -1543,7 +1543,7 @@ func (Integration) TestBeatServerless(ctx context.Context, beatname string) erro return fmt.Errorf("error setting serverless stack var: %w", err) } } else if os.Getenv("STACK_PROVISIONER") == "ess" { - fmt.Printf(">>> Warning: running TestBeatServerless as stateful") + fmt.Printf(">>> Warning: running TestBeatServerless as stateful\n") } err = os.Setenv("TEST_BINARY_NAME", beatname) diff --git a/pkg/testing/define/define.go b/pkg/testing/define/define.go index b1c850dde84..5c3ad715ea9 100644 --- a/pkg/testing/define/define.go +++ b/pkg/testing/define/define.go @@ -92,9 +92,9 @@ func NewFixtureWithBinary(t *testing.T, version string, binary string, buildsDir var binFetcher atesting.Fetcher if ver.IsSnapshot() { - binFetcher = atesting.LocalFetcher(buildsDir, binary, atesting.WithLocalSnapshotOnly()) + binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly(), atesting.WithCustomBinaryName(binary)) } else { - binFetcher = atesting.LocalFetcher(buildsDir, binary) + binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithCustomBinaryName(binary)) } opts = append(opts, atesting.WithFetcher(binFetcher), atesting.WithLogOutput()) diff --git a/pkg/testing/fetcher_local.go b/pkg/testing/fetcher_local.go index 4c4996517af..214e63c43a7 100644 --- a/pkg/testing/fetcher_local.go +++ b/pkg/testing/fetcher_local.go @@ -29,11 +29,18 @@ func WithLocalSnapshotOnly() localFetcherOpt { } } +// WithCustomBinaryName sets the binary to a custom name, the default is `elastic-agent` +func WithCustomBinaryName(name string) localFetcherOpt { + return func(f *localFetcher) { + f.binaryName = name + } +} + // LocalFetcher returns a fetcher that pulls the binary of the Elastic Agent from a local location. -func LocalFetcher(dir string, binary string, opts ...localFetcherOpt) Fetcher { +func LocalFetcher(dir string, opts ...localFetcherOpt) Fetcher { f := &localFetcher{ dir: dir, - binaryName: binary, + binaryName: "elastic-agent", } for _, o := range opts { o(f) diff --git a/pkg/testing/fetcher_local_test.go b/pkg/testing/fetcher_local_test.go index 21c68dbc01c..a97c5db1c9e 100644 --- a/pkg/testing/fetcher_local_test.go +++ b/pkg/testing/fetcher_local_test.go @@ -17,7 +17,7 @@ import ( ) func TestLocalFetcher_Name(t *testing.T) { - f := LocalFetcher(t.TempDir(), "elastic-agent") + f := LocalFetcher(t.TempDir()) require.Equal(t, "local", f.Name()) } @@ -77,7 +77,7 @@ func TestLocalFetcher(t *testing.T) { for _, tc := range tcs { tmp := t.TempDir() - f := LocalFetcher(testdata, "elastic-agent", tc.opts...) + f := LocalFetcher(testdata, tc.opts...) got, err := f.Fetch( context.Background(), runtime.GOOS, runtime.GOARCH, tc.version) require.NoError(t, err) diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index e39994d08d3..fb1a732cf68 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -44,6 +44,8 @@ type Fixture struct { allowErrs bool connectTimout time.Duration binaryName string + runLength time.Duration + additionalArgs []string srcPackage string workDir string @@ -112,6 +114,19 @@ func WithBinaryName(name string) FixtureOpt { } } +// WithRunLength sets the total time the binary will run +func WithRunLength(run time.Duration) FixtureOpt { + return func(f *Fixture) { + f.runLength = run + } +} + +func WithAdditionalArgs(args []string) FixtureOpt { + return func(f *Fixture) { + f.additionalArgs = args + } +} + // NewFixture creates a new fixture to setup and manage Elastic Agent. func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, error) { // we store the caller so the fixture can find the cache directory for the artifacts that @@ -280,7 +295,7 @@ func (f *Fixture) RunBeat(ctx context.Context, runFor time.Duration) error { proc, err := process.Start( f.binaryPath(), process.WithContext(ctx), - process.WithArgs([]string{"run", "-e", "-c", filepath.Join(f.workDir, "metricbeat.yml")}), + process.WithArgs([]string{"run", "-e", "-c", filepath.Join(f.workDir, fmt.Sprintf("%s.yml", f.binaryName))}), process.WithCmdOptions(attachOutErr(stdOut, stdErr))) if err != nil { return fmt.Errorf("failed to spawn beat: %w", err) @@ -326,19 +341,21 @@ func (f *Fixture) RunBeat(ctx context.Context, runFor time.Duration) error { } -// Run runs the Elastic Agent. +// Run runs the provided binary. // -// If `states` are provided then the Elastic Agent runs until each state has been reached. Once reached the +// If `states` are provided and the binary is Elastic Agent, agent runs until each state has been reached. Once reached the // Elastic Agent is stopped. If at any time the Elastic Agent logs an error log and the Fixture is not started // with `WithAllowErrors()` then `Run` will exit early and return the logged error. // -// If no `states` are provided then the Elastic Agent runs until the context is cancelled. +// If no `states` are provided then the Elastic Agent runs until the context is or the timeout specified with WithRunLength is reached. func (f *Fixture) Run(ctx context.Context, states ...State) error { - if f.installed { - return errors.New("fixture is installed; cannot be run") - } + isBeat := false if f.binaryName != "elastic-agent" { - return fmt.Errorf("Run() can only be run against elastic-agent, got %s", f.binaryName) + isBeat = true + } + + if !isBeat && f.installed { + return errors.New("fixture is installed; cannot be run") } var err error @@ -350,14 +367,32 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { ctx, cancel := context.WithCancel(ctx) defer cancel() - var sm *stateMachine + var smInstance *stateMachine + if states != nil && isBeat { + return errors.New("states not supported when running against beats") + } if states != nil { - sm, err = newStateMachine(states) + smInstance, err = newStateMachine(states) if err != nil { return err } } + // agent-specific setup + var agentClient client.Client + var stateCh chan *client.AgentState + var stateErrCh chan error + if !isBeat { + cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir) + if err != nil { + return fmt.Errorf("failed to get control protcol address: %w", err) + } + agentClient = client.New(client.WithAddress(cAddr)) + f.setClient(agentClient) + defer f.setClient(nil) + stateCh, stateErrCh = watchState(ctx, agentClient, f.connectTimout) + } + var logProxy Logger if f.logOutput { logProxy = f.t @@ -365,29 +400,32 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { stdOut := newLogWatcher(logProxy) stdErr := newLogWatcher(logProxy) - cAddr, err := control.AddressFromPath(f.operatingSystem, f.workDir) - if err != nil { - return fmt.Errorf("failed to get control protcol address: %w", err) + args := []string{"run", "-e", "--disable-encrypted-store", "--testing-mode"} + if isBeat { + args = []string{"run", "-e", "-c", filepath.Join(f.workDir, fmt.Sprintf("%s.yml", f.binaryName))} } + args = append(args, f.additionalArgs...) proc, err := process.Start( f.binaryPath(), process.WithContext(ctx), - process.WithArgs([]string{"run", "-e", "--disable-encrypted-store", "--testing-mode"}), + process.WithArgs(args), process.WithCmdOptions(attachOutErr(stdOut, stdErr))) + if err != nil { - return fmt.Errorf("failed to spawn elastic-agent: %w", err) + return fmt.Errorf("failed to spawn %s: %w", f.binaryName, err) } + killProc := func() { _ = proc.Kill() <-proc.Wait() } - c := client.New(client.WithAddress(cAddr)) - f.setClient(c) - defer f.setClient(nil) + var doneChan <-chan time.Time + if f.runLength != 0 { + doneChan = time.After(f.runLength) + } - stateCh, stateErrCh := watchState(ctx, c, f.connectTimout) stopping := false for { select { @@ -417,9 +455,15 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { killProc() return fmt.Errorf("elastic-agent client received unexpected error: %w", err) } + case <-doneChan: + if !stopping { + // trigger the stop + stopping = true + _ = proc.Stop() + } case state := <-stateCh: - if sm != nil { - cfg, cont, err := sm.next(state) + if smInstance != nil { + cfg, cont, err := smInstance.next(state) if err != nil { killProc() return fmt.Errorf("state management failed with unexpected error: %w", err) @@ -431,7 +475,7 @@ func (f *Fixture) Run(ctx context.Context, states ...State) error { _ = proc.Stop() } } else if cfg != "" { - err := performConfigure(ctx, c, cfg, 3*time.Second) + err := performConfigure(ctx, agentClient, cfg, 3*time.Second) if err != nil { killProc() return err diff --git a/pkg/testing/runner/debian.go b/pkg/testing/runner/debian.go index 5fe3ea067f4..73c337a56cf 100644 --- a/pkg/testing/runner/debian.go +++ b/pkg/testing/runner/debian.go @@ -98,6 +98,16 @@ func (DebianRunner) Copy(ctx context.Context, sshClient *ssh.Client, logger Logg return fmt.Errorf("failed to SCP repo archive %s: %w", repoArchive, err) } + // remove build paths, on cases where the build path is different from agent. + for _, remoteBuildPath := range []string{build.Path, build.SHA512Path} { + relativeAgentDir := filepath.Join("agent", remoteBuildPath) + _, _, err := sshRunCommand(ctx, sshClient, "sudo", []string{"rm", "-rf", relativeAgentDir}, nil) + // doesn't need to be a fatal error. + if err != nil { + logger.Logf("error removing build dir %s: %w", relativeAgentDir, err) + } + } + // ensure that agent directory is removed (possible it already exists if instance already used) stdout, stderr, err := sshRunCommand(ctx, sshClient, "sudo", []string{"rm", "-rf", "agent"}, nil) @@ -106,7 +116,6 @@ func (DebianRunner) Copy(ctx context.Context, sshClient *ssh.Client, logger Logg "failed to remove agent directory before unziping new one: %w. stdout: %q, stderr: %q", err, stdout, stderr) } - _, _, _ = sshRunCommand(ctx, sshClient, "sudo", []string{"rm", "-rf", "beats"}, nil) stdOut, errOut, err := sshRunCommand(ctx, sshClient, "unzip", []string{destRepoName, "-d", "agent"}, nil) if err != nil { diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 872c0335fe3..587a0599927 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -60,7 +60,7 @@ func (runner *BeatRunner) SetupSuite() { runner.testbeatName = os.Getenv("TEST_BINARY") - agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu") + agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"})) runner.agentFixture = agentFixture require.NoError(runner.T(), err) @@ -120,7 +120,7 @@ func (runner *BeatRunner) TestRunAndCheckData() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4) defer cancel() - err := runner.agentFixture.RunBeat(ctx, time.Minute) + err := runner.agentFixture.Run(ctx) require.NoError(runner.T(), err) docs, err := tools.GetLatestDocumentMatchingQuery(ctx, runner.requirementsInfo.ESClient, map[string]interface{}{ @@ -250,7 +250,7 @@ func (runner *BeatRunner) TestIndexManagementILMEnabledFail() { "-E", "setup.ilm.enabled=true"}) runner.T().Logf("got response from management setup: %s", string(resp)) assert.Error(runner.T(), err) - assert.Contains(runner.T(), resp, "not supported") + assert.Contains(runner.T(), string(resp), "not supported") } // tests beat setup ilm-policy @@ -270,7 +270,7 @@ func (runner *BeatRunner) TestExportILMFail() { "export", "ilm-policy"}) runner.T().Logf("got response from management setup: %s", string(resp)) assert.Error(runner.T(), err) - assert.Contains(runner.T(), resp, "not supported") + assert.Contains(runner.T(), string(resp), "not supported") } From 0b7afaecafc70652613fadef7f26629c249f57fd Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Fri, 25 Aug 2023 16:45:17 -0700 Subject: [PATCH 09/18] clean up --- pkg/testing/fixture.go | 68 ------------------------------------------ 1 file changed, 68 deletions(-) diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index fb1a732cf68..bad2c84c9fe 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -273,74 +273,6 @@ func ExtractArtifact(l Logger, artifactFile, outputDir string) error { return nil } -// Runs the given beat for a specified period of time. Will fail if an error appears in the logs. -// Will refuse to run if the given binary is elastic-agent -func (f *Fixture) RunBeat(ctx context.Context, runFor time.Duration) error { - if f.binaryName == "elastic-agent" { - return fmt.Errorf("binary %s is not a beat", f.binaryName) - } - - err := f.ensurePrepared(ctx) - if err != nil { - return err - } - - var logProxy Logger - if f.logOutput { - logProxy = f.t - } - stdOut := newLogWatcher(logProxy) - stdErr := newLogWatcher(logProxy) - - proc, err := process.Start( - f.binaryPath(), - process.WithContext(ctx), - process.WithArgs([]string{"run", "-e", "-c", filepath.Join(f.workDir, fmt.Sprintf("%s.yml", f.binaryName))}), - process.WithCmdOptions(attachOutErr(stdOut, stdErr))) - if err != nil { - return fmt.Errorf("failed to spawn beat: %w", err) - } - - killProc := func() { - _ = proc.Kill() - <-proc.Wait() - } - doneChan := time.After(runFor) - - stopping := false - for { - select { - case <-ctx.Done(): - killProc() - return ctx.Err() - case pstate := <-proc.Wait(): - if stopping { - return nil - } - return fmt.Errorf("beat exited unexpectedly with exit code: %d", pstate.ExitCode()) - case err := <-stdOut.Watch(): - if !f.allowErrs { - // no errors allowed - killProc() - return fmt.Errorf("beat logged an unexpected error: %w", err) - } - case err := <-stdErr.Watch(): - if !f.allowErrs { - // no errors allowed - killProc() - return fmt.Errorf("beat logged an unexpected error: %w", err) - } - case <-doneChan: - if !stopping { - // trigger the stop - stopping = true - _ = proc.Stop() - } - } - } - -} - // Run runs the provided binary. // // If `states` are provided and the binary is Elastic Agent, agent runs until each state has been reached. Once reached the From ef23b3a844567b93d3b96c4a525797c1f4a7ece2 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 28 Aug 2023 08:55:43 -0700 Subject: [PATCH 10/18] tinker with env vars --- pkg/testing/runner/runner.go | 2 +- testing/integration/beat_serverless_test.go | 6 +++++- 2 files changed, 6 insertions(+), 2 deletions(-) diff --git a/pkg/testing/runner/runner.go b/pkg/testing/runner/runner.go index 2565872c65e..a303f0f3368 100644 --- a/pkg/testing/runner/runner.go +++ b/pkg/testing/runner/runner.go @@ -385,7 +385,7 @@ func (r *Runner) runInstance(ctx context.Context, sshAuth ssh.AuthMethod, logger // set the go test flags env["GOTEST_FLAGS"] = r.cfg.TestFlags - env["TEST_BINARY"] = r.cfg.BinaryName + env["TEST_BINARY_NAME"] = r.cfg.BinaryName // run the actual tests on the host result, err := batch.OS.Runner.Run(ctx, r.cfg.VerboseMode, client, logger, r.cfg.AgentVersion, batch.ID, batch.Batch, env) diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 587a0599927..4883316906a 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -58,7 +58,11 @@ func TestMetricbeatSeverless(t *testing.T) { func (runner *BeatRunner) SetupSuite() { runner.T().Logf("In SetupSuite") - runner.testbeatName = os.Getenv("TEST_BINARY") + runner.testbeatName = os.Getenv("TEST_BINARY_NAME") + if runner.testbeatName == "" { + runner.T().Fatalf("TEST_BINARY_NAME must be set") + } + runner.T().Logf("running serverless tests with %s", runner.testbeatName) agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"})) runner.agentFixture = agentFixture From a20930df7562e1060fcdc41ac83fddb7a7077f2f Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 29 Aug 2023 10:08:30 -0700 Subject: [PATCH 11/18] fix defaults in fixture --- pkg/testing/fixture.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/testing/fixture.go b/pkg/testing/fixture.go index bad2c84c9fe..bf03a977ba1 100644 --- a/pkg/testing/fixture.go +++ b/pkg/testing/fixture.go @@ -146,6 +146,8 @@ func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, err operatingSystem: runtime.GOOS, architecture: runtime.GOARCH, connectTimout: 5 * time.Second, + // default to elastic-agent, can be changed by a set FixtureOpt below + binaryName: "elastic-agent", } for _, o := range opts { o(f) From 7d5da83fc73b20fb44570dbe3e8dc484ba61e612 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 29 Aug 2023 14:16:30 -0700 Subject: [PATCH 12/18] check binary name in test setup --- testing/integration/beat_serverless_test.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 4883316906a..ab46d689452 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -// //go:build integration +//go:build integration package integration @@ -62,6 +62,9 @@ func (runner *BeatRunner) SetupSuite() { if runner.testbeatName == "" { runner.T().Fatalf("TEST_BINARY_NAME must be set") } + if runner.testbeatName == "elastic-agent" { + runner.T().Skipf("tests must be run against a beat, not elastic-agent") + } runner.T().Logf("running serverless tests with %s", runner.testbeatName) agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"})) From 85b4ba5ac10a72c4ed03b26a7a9b84219bde8df0 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 6 Sep 2023 07:43:40 -0700 Subject: [PATCH 13/18] allow ilm override in tests --- testing/integration/beat_serverless_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index ab46d689452..c258632d124 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -254,7 +254,7 @@ func (runner *BeatRunner) TestIndexManagementILMEnabledFail() { runner.agentFixture.WorkDir(), "setup", "--index-management", - "-E", "setup.ilm.enabled=true"}) + "-E", "setup.ilm.enabled=true", "-E", "setup.ilm.overwrite=true"}) runner.T().Logf("got response from management setup: %s", string(resp)) assert.Error(runner.T(), err) assert.Contains(runner.T(), string(resp), "not supported") @@ -274,7 +274,7 @@ func (runner *BeatRunner) TestExportILMFail() { resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), - "export", "ilm-policy"}) + "export", "ilm-policy", "-E", "setup.ilm.overwrite=true"}) runner.T().Logf("got response from management setup: %s", string(resp)) assert.Error(runner.T(), err) assert.Contains(runner.T(), string(resp), "not supported") From 0051120dc6e8144093012e5893a083ca552a2bac Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 6 Sep 2023 15:07:55 -0700 Subject: [PATCH 14/18] fix filebeat tests, add cleanup --- pkg/testing/tools/elasticsearch.go | 38 +++++++++++++++ pkg/testing/tools/kibana.go | 14 ++++++ testing/integration/beat_serverless_test.go | 52 +++++++++++++++++++-- 3 files changed, 100 insertions(+), 4 deletions(-) diff --git a/pkg/testing/tools/elasticsearch.go b/pkg/testing/tools/elasticsearch.go index 5ddae293abd..1cdb197ee35 100644 --- a/pkg/testing/tools/elasticsearch.go +++ b/pkg/testing/tools/elasticsearch.go @@ -180,6 +180,30 @@ func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.In return parsed, nil } +// DeleteIndexTemplatesDataStreams deletes any data streams, then associcated index templates. +func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error { + req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"} + resp, err := req.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting data streams: %w", err) + } + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response for data stream delete: %w", err) + } + + patternReq := esapi.IndicesDeleteIndexTemplateRequest{Name: name} + resp, err = patternReq.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting index templates: %w", err) + } + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response for index template delete: %w", err) + } + return nil +} + // GetPipelines returns a list of installed pipelines that match the given name/pattern func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) { req := esapi.IngestGetPipelineRequest{PipelineID: name} @@ -200,6 +224,20 @@ func GetPipelines(ctx context.Context, client elastictransport.Interface, name s return parsed, nil } +// DeletePipelines deletes all pipelines that match the given pattern +func DeletePipelines(ctx context.Context, client elastictransport.Interface, name string) error { + req := esapi.IngestDeletePipelineRequest{PipelineID: name} + resp, err := req.Do(ctx, client) + if err != nil { + return fmt.Errorf("error deleting index template") + } + _, err = handleResponseRaw(resp) + if err != nil { + return fmt.Errorf("error handling HTTP response: %w", err) + } + return nil +} + // FindMatchingLogLinesWithContext returns any logs with message fields that match the given line func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) { queryRaw := map[string]interface{}{ diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go index 6c26798710a..e02e91948be 100644 --- a/pkg/testing/tools/kibana.go +++ b/pkg/testing/tools/kibana.go @@ -30,6 +30,20 @@ type Dashboard struct { Version string `json:"version"` } +// DeleteDashboard removes the selected dashboard +func DeleteDashboard(ctx context.Context, client *kibana.Client, id string) error { + status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, nil, nil) + if err != nil { + return fmt.Errorf("error making API request: %w, response: '%s'", err, string(resp)) + } + + if status != 200 { + return fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp)) + } + return nil + +} + // GetDashboards returns a list of known dashboards on the system func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, error) { params := url.Values{} diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index c258632d124..7af021d98a9 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -2,7 +2,7 @@ // or more contributor license agreements. Licensed under the Elastic License; // you may not use this file except in compliance with the Elastic License. -//go:build integration +// //go:build integration package integration @@ -65,6 +65,10 @@ func (runner *BeatRunner) SetupSuite() { if runner.testbeatName == "elastic-agent" { runner.T().Skipf("tests must be run against a beat, not elastic-agent") } + + if runner.testbeatName != "filebeat" && runner.testbeatName != "metricbeat" { + runner.T().Skip("test only supports metricbeat or filebeat") + } runner.T().Logf("running serverless tests with %s", runner.testbeatName) agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"})) @@ -80,7 +84,7 @@ func (runner *BeatRunner) SetupSuite() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() - mbOutConfig := ` + beatOutConfig := ` output.elasticsearch: hosts: ["%s"] username: %s @@ -95,6 +99,27 @@ processors: fields: test-id: %s ` + if runner.testbeatName == "filebeat" { + beatOutConfig = ` +output.elasticsearch: + hosts: ["%s"] + username: %s + password: %s +setup.kibana: + host: %s +filebeat.config.modules: + - modules: system + syslog: + enabled: true + auth: + enabled: true +processors: + - add_fields: + target: host + fields: + test-id: %s +` + } // beats likes to add standard ports to URLs that don't have them, and ESS will sometimes return a URL without a port, assuming :443 // so try to fix that here @@ -117,14 +142,13 @@ processors: testUuid, err := uuid.NewV4() require.NoError(runner.T(), err) runner.testUuid = testUuid.String() - parsedCfg := fmt.Sprintf(mbOutConfig, fixedESHost, runner.user, runner.pass, fixedKibanaHost, testUuid.String()) + parsedCfg := fmt.Sprintf(beatOutConfig, fixedESHost, runner.user, runner.pass, fixedKibanaHost, testUuid.String()) err = runner.agentFixture.WriteFileToWorkDir(ctx, parsedCfg, fmt.Sprintf("%s.yml", runner.testbeatName)) require.NoError(runner.T(), err) } // run the beat with default metricsets, ensure no errors in logs + data is ingested func (runner *BeatRunner) TestRunAndCheckData() { - ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4) defer cancel() err := runner.agentFixture.Run(ctx) @@ -164,6 +188,15 @@ func (runner *BeatRunner) TestSetupDashboards() { require.True(runner.T(), found, fmt.Sprintf("could not find dashboard newer than 5 minutes, out of %d dashboards", len(dashList))) runner.Run("export dashboards", runner.SubtestExportDashboards) + + // cleanup + for _, dash := range dashList { + err = tools.DeleteDashboard(ctx, runner.requirementsInfo.KibanaClient, dash.ID) + if err != nil { + runner.T().Logf("WARNING: could not delete dashboards after test: %s", err) + break + } + } } // tests the [beat] export dashboard command @@ -211,6 +244,11 @@ func (runner *BeatRunner) TestSetupPipelines() { require.NoError(runner.T(), err) require.NotEmpty(runner.T(), pipelines) + /// cleanup + err = tools.DeletePipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*") + if err != nil { + runner.T().Logf("WARNING: could not clean up pipelines: %s", err) + } } // test beat setup --index-management with ILM disabled @@ -235,6 +273,12 @@ func (runner *BeatRunner) TestIndexManagementNoILM() { runner.Run("export templates", runner.SubtestExportTemplates) runner.Run("export index patterns", runner.SubtestExportIndexPatterns) + + // cleanup + err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + if err != nil { + runner.T().Logf("WARNING: could not clean up index templates/data streams: %s", err) + } } // tests beat setup --index-management with ILM explicitly set From fd88bf25b47ebe7e8b266ebee3e8bf94a750dc60 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Mon, 11 Sep 2023 14:27:31 -0700 Subject: [PATCH 15/18] tinker with dashboards --- pkg/testing/tools/kibana.go | 9 +++++++-- testing/integration/beat_serverless_test.go | 4 +++- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go index e02e91948be..df88a3d2691 100644 --- a/pkg/testing/tools/kibana.go +++ b/pkg/testing/tools/kibana.go @@ -8,6 +8,7 @@ import ( "context" "encoding/json" "fmt" + "net/http" "net/url" "time" @@ -32,7 +33,9 @@ type Dashboard struct { // DeleteDashboard removes the selected dashboard func DeleteDashboard(ctx context.Context, client *kibana.Client, id string) error { - status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, nil, nil) + headers := http.Header{} + headers.Add("x-elastic-internal-origin", "integration-tests") + status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, headers, nil) if err != nil { return fmt.Errorf("error making API request: %w, response: '%s'", err, string(resp)) } @@ -53,7 +56,9 @@ func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, err dashboards := []Dashboard{} page := 1 for { - status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, nil, nil) + headers := http.Header{} + headers.Add("x-elastic-internal-origin", "integration-tests") + status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, headers, nil) if err != nil { return nil, fmt.Errorf("error making api request: %w", err) } diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 7af021d98a9..a2460685fc4 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -209,10 +209,12 @@ func (runner *BeatRunner) SubtestExportDashboards() { require.NoError(runner.T(), err) require.NotEmpty(runner.T(), dashlist) - _, err = runner.agentFixture.Exec(ctx, []string{"--path.home", + exportOut, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), "export", "dashboard", "--folder", outDir, "--id", dashlist[0].ID}) + + runner.T().Logf("got output: %s", exportOut) assert.NoError(runner.T(), err) inFolder, err := os.ReadDir(filepath.Join(outDir, "/_meta/kibana/8/dashboard")) From ec9c8c590639adacf6f47096d11371a8da6243cb Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 13 Sep 2023 10:53:09 -0700 Subject: [PATCH 16/18] fix ilm tests --- pkg/testing/ess/serverless_provision.go | 3 ++- pkg/testing/tools/kibana.go | 1 + testing/integration/beat_serverless_test.go | 28 +++++++++++++-------- 3 files changed, 21 insertions(+), 11 deletions(-) diff --git a/pkg/testing/ess/serverless_provision.go b/pkg/testing/ess/serverless_provision.go index ea78fc0eab1..553dd238eb2 100644 --- a/pkg/testing/ess/serverless_provision.go +++ b/pkg/testing/ess/serverless_provision.go @@ -80,7 +80,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne for _, req := range requests { client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log) srvReq := ServerlessRequest{Name: req.ID, RegionID: prov.cfg.Region} - _, err := client.DeployStack(ctx, srvReq) + proj, err := client.DeployStack(ctx, srvReq) if err != nil { return nil, fmt.Errorf("error deploying stack for request %s: %w", req.ID, err) } @@ -95,6 +95,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne Kibana: client.proj.Endpoints.Kibana, Username: client.proj.Credentials.Username, Password: client.proj.Credentials.Password, + Internal: map[string]interface{}{"ID": proj.ID}, } stacks = append(stacks, newStack) prov.stacksMut.Lock() diff --git a/pkg/testing/tools/kibana.go b/pkg/testing/tools/kibana.go index df88a3d2691..ed9459307b6 100644 --- a/pkg/testing/tools/kibana.go +++ b/pkg/testing/tools/kibana.go @@ -33,6 +33,7 @@ type Dashboard struct { // DeleteDashboard removes the selected dashboard func DeleteDashboard(ctx context.Context, client *kibana.Client, id string) error { + // In the future there should be logic to check if we need this header, waiting for https://github.com/elastic/kibana/pull/164850 headers := http.Header{} headers.Add("x-elastic-internal-origin", "integration-tests") status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, headers, nil) diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index a2460685fc4..11c4f1b7a0f 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -262,9 +262,11 @@ func (runner *BeatRunner) TestIndexManagementNoILM() { runner.agentFixture.WorkDir(), "setup", "--index-management", - "-E", "setup.ilm.enabled=false"}) + "--E=setup.ilm.enabled=false"}) runner.T().Logf("got response from management setup: %s", string(resp)) assert.NoError(runner.T(), err) + // we should not print a warning if we've explicitly disabled ILM + assert.NotContains(runner.T(), string(resp), "not supported") tmpls, err := tools.GetIndexTemplatesForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) require.NoError(runner.T(), err) @@ -286,7 +288,7 @@ func (runner *BeatRunner) TestIndexManagementNoILM() { // tests beat setup --index-management with ILM explicitly set // On serverless, this should fail. // Will not pass right now, may need to change -func (runner *BeatRunner) TestIndexManagementILMEnabledFail() { +func (runner *BeatRunner) TestIndexManagementILMEnabledWarning() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient) @@ -300,15 +302,16 @@ func (runner *BeatRunner) TestIndexManagementILMEnabledFail() { runner.agentFixture.WorkDir(), "setup", "--index-management", - "-E", "setup.ilm.enabled=true", "-E", "setup.ilm.overwrite=true"}) + "--E=setup.ilm.enabled=true", "--E=setup.ilm.overwrite=true"}) runner.T().Logf("got response from management setup: %s", string(resp)) - assert.Error(runner.T(), err) + require.NoError(runner.T(), err) assert.Contains(runner.T(), string(resp), "not supported") } // tests beat setup ilm-policy -// On serverless, this should fail -func (runner *BeatRunner) TestExportILMFail() { +// the export command doesn't actually make a network connection, +// so this won't fail +func (runner *BeatRunner) TestExport() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient) @@ -320,10 +323,15 @@ func (runner *BeatRunner) TestExportILMFail() { resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), - "export", "ilm-policy", "-E", "setup.ilm.overwrite=true"}) - runner.T().Logf("got response from management setup: %s", string(resp)) - assert.Error(runner.T(), err) - assert.Contains(runner.T(), string(resp), "not supported") + "export", "ilm-policy", "--E=setup.ilm.overwrite=true"}) + runner.T().Logf("got response from export: %s", string(resp)) + assert.NoError(runner.T(), err) + // check to see if we got a valid output + policy := map[string]interface{}{} + err = json.Unmarshal(resp, &policy) + require.NoError(runner.T(), err) + + require.NotEmpty(runner.T(), policy["policy"]) } From 8267e473d987ffcc24c76d604dc756edd2eaa7c0 Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Tue, 26 Sep 2023 16:31:10 -0700 Subject: [PATCH 17/18] use API keys for auth --- pkg/testing/tools/elasticsearch.go | 45 +++++++++++++++++++++ testing/integration/beat_serverless_test.go | 15 +++---- 2 files changed, 53 insertions(+), 7 deletions(-) diff --git a/pkg/testing/tools/elasticsearch.go b/pkg/testing/tools/elasticsearch.go index 1cdb197ee35..92b0c1c1b4d 100644 --- a/pkg/testing/tools/elasticsearch.go +++ b/pkg/testing/tools/elasticsearch.go @@ -13,6 +13,7 @@ import ( "strconv" "strings" + "github.com/elastic/elastic-agent-libs/mapstr" "github.com/elastic/elastic-transport-go/v8/elastictransport" "github.com/elastic/go-elasticsearch/v8/esapi" ) @@ -103,6 +104,23 @@ type Version struct { BuildFlavor string `json:"build_flavor"` } +// APIKeyRequest contains the needed data to create an API key in Elasticsearch +type APIKeyRequest struct { + Name string `json:"name"` + Expiration string `json:"expiration"` + RoleDescriptors mapstr.M `json:"role_descriptors,omitempty"` + Metadata mapstr.M `json:"metadata,omitempty"` +} + +// APIKeyResponse contains the response data for an API request +type APIKeyResponse struct { + Id string `json:"id"` + Name string `json:"name"` + Expiration int `json:"expiration"` + APIKey string `json:"api_key"` + Encoded string `json:"encoded"` +} + // GetAllindicies returns a list of indicies on the target ES instance func GetAllindicies(client elastictransport.Interface) ([]Index, error) { return GetIndicesWithContext(context.Background(), client, []string{}) @@ -134,6 +152,33 @@ func GetIndicesWithContext(ctx context.Context, client elastictransport.Interfac return respData, nil } +// CreateAPIKey creates an API key with the given request data +func CreateAPIKey(ctx context.Context, client elastictransport.Interface, req APIKeyRequest) (APIKeyResponse, error) { + var buf bytes.Buffer + err := json.NewEncoder(&buf).Encode(req) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error creating ES query: %w", err) + } + + apiReq := esapi.SecurityCreateAPIKeyRequest{Body: &buf} + resp, err := apiReq.Do(ctx, client) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error creating API key: %w", err) + } + resultBuf, err := handleResponseRaw(resp) + if err != nil { + return APIKeyResponse{}, fmt.Errorf("error handling HTTP response: %w", err) + } + + parsed := APIKeyResponse{} + err = json.Unmarshal(resultBuf, &parsed) + if err != nil { + return parsed, fmt.Errorf("error unmarshaling json response: %w", err) + } + + return parsed, nil +} + // FindMatchingLogLines returns any logs with message fields that match the given line func FindMatchingLogLines(client elastictransport.Interface, namespace, line string) (Documents, error) { return FindMatchingLogLinesWithContext(context.Background(), client, namespace, line) diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 11c4f1b7a0f..4de2390853a 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -87,8 +87,7 @@ func (runner *BeatRunner) SetupSuite() { beatOutConfig := ` output.elasticsearch: hosts: ["%s"] - username: %s - password: %s + api_key: "%s:%s" setup.kibana: host: %s metricbeat.config.modules: @@ -121,6 +120,9 @@ processors: ` } + apiResp, err := tools.CreateAPIKey(ctx, runner.requirementsInfo.ESClient, tools.APIKeyRequest{Name: "test-api-key", Expiration: "1d"}) + require.NoError(runner.T(), err) + // beats likes to add standard ports to URLs that don't have them, and ESS will sometimes return a URL without a port, assuming :443 // so try to fix that here fixedKibanaHost := runner.kibHost @@ -142,7 +144,7 @@ processors: testUuid, err := uuid.NewV4() require.NoError(runner.T(), err) runner.testUuid = testUuid.String() - parsedCfg := fmt.Sprintf(beatOutConfig, fixedESHost, runner.user, runner.pass, fixedKibanaHost, testUuid.String()) + parsedCfg := fmt.Sprintf(beatOutConfig, fixedESHost, apiResp.Id, apiResp.APIKey, fixedKibanaHost, testUuid.String()) err = runner.agentFixture.WriteFileToWorkDir(ctx, parsedCfg, fmt.Sprintf("%s.yml", runner.testbeatName)) require.NoError(runner.T(), err) } @@ -287,8 +289,7 @@ func (runner *BeatRunner) TestIndexManagementNoILM() { // tests beat setup --index-management with ILM explicitly set // On serverless, this should fail. -// Will not pass right now, may need to change -func (runner *BeatRunner) TestIndexManagementILMEnabledWarning() { +func (runner *BeatRunner) TestIndexManagementILMEnabledFailure() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute) defer cancel() info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient) @@ -304,8 +305,8 @@ func (runner *BeatRunner) TestIndexManagementILMEnabledWarning() { "--index-management", "--E=setup.ilm.enabled=true", "--E=setup.ilm.overwrite=true"}) runner.T().Logf("got response from management setup: %s", string(resp)) - require.NoError(runner.T(), err) - assert.Contains(runner.T(), string(resp), "not supported") + require.Error(runner.T(), err) + assert.Contains(runner.T(), string(resp), "error creating") } // tests beat setup ilm-policy From acff625546f0c81575518c469bf9ad71827631db Mon Sep 17 00:00:00 2001 From: fearful-symmetry Date: Wed, 27 Sep 2023 14:35:49 -0700 Subject: [PATCH 18/18] add additional integration tests --- pkg/testing/tools/elasticsearch.go | 42 +++++++ testing/integration/beat_serverless_test.go | 118 +++++++++++++++++++- 2 files changed, 158 insertions(+), 2 deletions(-) diff --git a/pkg/testing/tools/elasticsearch.go b/pkg/testing/tools/elasticsearch.go index 92b0c1c1b4d..8b2d554fd98 100644 --- a/pkg/testing/tools/elasticsearch.go +++ b/pkg/testing/tools/elasticsearch.go @@ -121,6 +121,27 @@ type APIKeyResponse struct { Encoded string `json:"encoded"` } +// DataStreams represents the response from an ES _data_stream API +type DataStreams struct { + DataStreams []DataStream `json:"data_streams"` +} + +// DataStream represents a data stream template +type DataStream struct { + Name string `json:"name"` + Indicies []map[string]string `json:"indicies"` + Status string `json:"status"` + Template string `json:"template"` + Lifecycle Lifecycle `json:"lifecycle"` + Hidden bool `json:"hidden"` + System bool `json:"system"` +} + +type Lifecycle struct { + Enabled bool `json:"enabled"` + DataRetention string `json:"data_retention"` +} + // GetAllindicies returns a list of indicies on the target ES instance func GetAllindicies(client elastictransport.Interface) ([]Index, error) { return GetIndicesWithContext(context.Background(), client, []string{}) @@ -225,6 +246,27 @@ func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.In return parsed, nil } +func GetDataStreamsForPattern(ctx context.Context, client elastictransport.Interface, namePattern string) (DataStreams, error) { + req := esapi.IndicesGetDataStreamRequest{Name: []string{namePattern}, ExpandWildcards: "all,hidden"} + resp, err := req.Do(ctx, client) + if err != nil { + return DataStreams{}, fmt.Errorf("error fetching data streams") + } + + raw, err := handleResponseRaw(resp) + if err != nil { + return DataStreams{}, fmt.Errorf("error handling HTTP response for data stream get: %w", err) + } + + data := DataStreams{} + err = json.Unmarshal(raw, &data) + if err != nil { + return DataStreams{}, fmt.Errorf("error unmarshalling datastream: %w", err) + } + + return data, nil +} + // DeleteIndexTemplatesDataStreams deletes any data streams, then associcated index templates. func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error { req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"} diff --git a/testing/integration/beat_serverless_test.go b/testing/integration/beat_serverless_test.go index 4de2390853a..64ef36344cd 100644 --- a/testing/integration/beat_serverless_test.go +++ b/testing/integration/beat_serverless_test.go @@ -22,6 +22,7 @@ import ( "github.com/stretchr/testify/require" "github.com/stretchr/testify/suite" + "github.com/elastic/elastic-agent-libs/mapstr" atesting "github.com/elastic/elastic-agent/pkg/testing" "github.com/elastic/elastic-agent/pkg/testing/define" "github.com/elastic/elastic-agent/pkg/testing/tools" @@ -153,6 +154,10 @@ processors: func (runner *BeatRunner) TestRunAndCheckData() { ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4) defer cancel() + + // in case there's already a running template, delete it, forcing the beat to re-install + _ = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + err := runner.agentFixture.Run(ctx) require.NoError(runner.T(), err) @@ -287,6 +292,52 @@ func (runner *BeatRunner) TestIndexManagementNoILM() { } } +// TestWithCustomLifecyclePolicy uploads a custom DSL policy +func (runner *BeatRunner) TestWithCustomLifecyclePolicy() { + //create a custom policy file + dslPolicy := mapstr.M{ + "data_retention": "1d", + } + + lctemp := runner.T().TempDir() + raw, err := json.MarshalIndent(dslPolicy, "", " ") + require.NoError(runner.T(), err) + + lifecyclePath := filepath.Join(lctemp, "dsl_policy.json") + + err = os.WriteFile(lifecyclePath, raw, 0o744) + require.NoError(runner.T(), err) + + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // pre-delete in case something else missed cleanup + _ = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName)) + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.dsl.enabled=true", fmt.Sprintf("--E=setup.dsl.policy_file=%s", lifecyclePath)}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + streams, err := tools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + + foundCustom := false + for _, stream := range streams.DataStreams { + if stream.Lifecycle.DataRetention == "1d" { + foundCustom = true + break + } + } + require.True(runner.T(), foundCustom, "did not find our custom lifecycle policy. Found: %#v", streams) + + err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("%s*", runner.testbeatName)) + require.NoError(runner.T(), err) +} + // tests beat setup --index-management with ILM explicitly set // On serverless, this should fail. func (runner *BeatRunner) TestIndexManagementILMEnabledFailure() { @@ -309,7 +360,54 @@ func (runner *BeatRunner) TestIndexManagementILMEnabledFailure() { assert.Contains(runner.T(), string(resp), "error creating") } -// tests beat setup ilm-policy +// tests setup with both ILM and DSL enabled, should fail +func (runner *BeatRunner) TestBothLifecyclesEnabled() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.ilm.enabled=true", "--E=setup.dsl.enabled=true"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.Error(runner.T(), err) +} + +// disable all lifecycle management, ensure it's actually disabled +func (runner *BeatRunner) TestAllLifecyclesDisabled() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + + // in case there's already a running template, delete it, forcing the beat to re-install + _ = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "setup", + "--index-management", + "--E=setup.ilm.enabled=false", "--E=setup.dsl.enabled=false"}) + runner.T().Logf("got response from management setup: %s", string(resp)) + require.NoError(runner.T(), err) + + // make sure we have data streams, but there's no lifecycles + streams, err := tools.GetDataStreamsForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) + + require.NotEmpty(runner.T(), streams.DataStreams, "found no datastreams") + foundPolicy := false + for _, stream := range streams.DataStreams { + if stream.Lifecycle.DataRetention != "" { + foundPolicy = true + break + } + } + require.False(runner.T(), foundPolicy, "Found a lifecycle policy despite disabling lifecycles. Found: %#v", streams) + + err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName)) + require.NoError(runner.T(), err) +} + // the export command doesn't actually make a network connection, // so this won't fail func (runner *BeatRunner) TestExport() { @@ -324,7 +422,7 @@ func (runner *BeatRunner) TestExport() { resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", runner.agentFixture.WorkDir(), - "export", "ilm-policy", "--E=setup.ilm.overwrite=true"}) + "export", "ilm-policy", "--E=setup.ilm.enabled=true"}) runner.T().Logf("got response from export: %s", string(resp)) assert.NoError(runner.T(), err) // check to see if we got a valid output @@ -333,7 +431,23 @@ func (runner *BeatRunner) TestExport() { require.NoError(runner.T(), err) require.NotEmpty(runner.T(), policy["policy"]) +} + +// tests beat export with DSL +func (runner *BeatRunner) TestExportDSL() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home", + runner.agentFixture.WorkDir(), + "export", "ilm-policy", "--E=setup.dsl.enabled=true"}) + runner.T().Logf("got response from export: %s", string(resp)) + assert.NoError(runner.T(), err) + // check to see if we got a valid output + policy := map[string]interface{}{} + err = json.Unmarshal(resp, &policy) + require.NoError(runner.T(), err) + require.NotEmpty(runner.T(), policy["data_retention"]) } func (runner *BeatRunner) SubtestExportTemplates() {