Update serverless support in integration tests, expand test tooling, add support for beats testing (elastic#3486)

* cleaning up
* final bit of cleanup
* fix magefile, cleanup docs
* clean up errors, make linter happy
* fix headers
* fix fields in runner config
* add dashboard checks
* clean up, refactor
* clean up
* tinker with env vars
* fix defaults in fixture
* check binary name in test setup
* allow ilm override in tests
* fix filebeat tests, add cleanup
* tinker with dashboards
* fix ilm tests
* use API keys for auth
* add additional integration tests
* remove beats-specific code
* hack in serverless tests
* tinker with tests
* change env var naming
* actually use correct provisioner name
* tinker with buildkite again
* fix things after refactor
* fix buildkite
* fix my bash scripts
* my bash is a tad rusty
* tinker with script hooks
* not sure what ci role I broke
* clean up es handlers
* deal with recent refactor
* fix my broken refactor
* change url, see what happens
* tinker with tests more
* swap pipelines, see what happens
* break apart beat runners
* fix a few serverless bugs, timeout
* remove os restriction
* increase timeouts
* revert timeout change
fearful-symmetry authored Oct 24, 2023
1 parent 9ae9a4c commit cc61764
Showing 18 changed files with 704 additions and 125 deletions.
2 changes: 1 addition & 1 deletion .buildkite/hooks/pre-command
@@ -47,7 +47,7 @@ if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent-package" ]]; then
fi
fi

if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == "integration-tests" ]]; then
if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == *"integration-tests"* ]]; then
# Set GCP credentials
export GOOGLE_APPLICATION_GCP_SECRET=$(retry 5 vault kv get -format=json -field=data ${CI_GCP_OBS_PATH})
echo "${GOOGLE_APPLICATION_GCP_SECRET}" > ./gcp.json
2 changes: 1 addition & 1 deletion .buildkite/hooks/pre-exit
@@ -2,7 +2,7 @@

set -eo pipefail

if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == "integration-tests" ]]; then
if [[ "$BUILDKITE_PIPELINE_SLUG" == "elastic-agent" && "$BUILDKITE_STEP_KEY" == *"integration-tests"* ]]; then
if [[ -z "${WORKSPACE-""}" ]]; then
WORKSPACE=$(git rev-parse --show-toplevel)
fi
12 changes: 11 additions & 1 deletion .buildkite/pipeline.yml
@@ -31,9 +31,19 @@ steps:
manual:
allowed: true

- label: "Serverless integration test"
key: "serverless-integration-tests"
command: ".buildkite/scripts/steps/integration_tests.sh serverless integration:single TestMonitoringLogsShipped" #right now, run a single test in serverless mode as a sort of smoke test, instead of re-running the entire suite
artifact_paths:
- "build/TEST-**"
- "build/diagnostics/*"
agents:
provider: "gcp"
machineType: "n1-standard-8"

- label: "Integration tests"
key: "integration-tests"
command: ".buildkite/scripts/steps/integration_tests.sh"
command: ".buildkite/scripts/steps/integration_tests.sh stateful"
artifact_paths:
- "build/TEST-**"
- "build/diagnostics/*"
7 changes: 6 additions & 1 deletion .buildkite/scripts/steps/integration_tests.sh
@@ -3,6 +3,11 @@ set -euo pipefail

source .buildkite/scripts/common.sh

+STACK_PROVISIONER="${1:-"stateful"}"
+MAGE_TARGET="${2:-"integration:test"}"
+MAGE_SUBTARGET="${3:-""}"


# Override the agent package version using a string with format <major>.<minor>.<patch>
# NOTE: use only after version bump when the new version is not yet available, for example:
# OVERRIDE_AGENT_PACKAGE_VERSION="8.10.3" otherwise OVERRIDE_AGENT_PACKAGE_VERSION="".
@@ -18,7 +23,7 @@ AGENT_PACKAGE_VERSION="${OVERRIDE_AGENT_PACKAGE_VERSION}" DEV=true EXTERNAL=true

# Run integration tests
set +e
AGENT_VERSION="${OVERRIDE_TEST_AGENT_VERSION}" TEST_INTEG_CLEAN_ON_EXIT=true SNAPSHOT=true mage integration:test
AGENT_VERSION="${OVERRIDE_TEST_AGENT_VERSION}" TEST_INTEG_CLEAN_ON_EXIT=true STACK_PROVISIONER="$STACK_PROVISIONER" SNAPSHOT=true mage $MAGE_TARGET $MAGE_SUBTARGET
TESTS_EXIT_STATUS=$?
set -e

54 changes: 35 additions & 19 deletions magefile.go
@@ -1461,6 +1461,7 @@ func majorMinor() string {

// Clean cleans up the integration testing leftovers
func (Integration) Clean() error {
+fmt.Println("--- Clean mage artifacts")
_ = os.RemoveAll(".agent-testing")

// Clean out .integration-cache/.ogc-cache always
@@ -1702,6 +1703,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
if err != nil {
return nil, err
}

agentStackVersion := os.Getenv("AGENT_STACK_VERSION")
agentVersion := os.Getenv("AGENT_VERSION")
if agentVersion == "" {
@@ -1719,6 +1721,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
agentVersion = fmt.Sprintf("%s-SNAPSHOT", agentVersion)
}
}

if agentStackVersion == "" {
agentStackVersion = agentVersion
}
@@ -1756,15 +1759,15 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
instanceProvisionerMode = "ogc"
}
if instanceProvisionerMode != "ogc" && instanceProvisionerMode != "multipass" {
return nil, errors.New("INSTANCE_PROVISIONER environment variable must be one of 'ogc' or 'multipass'")
return nil, fmt.Errorf("INSTANCE_PROVISIONER environment variable must be one of 'ogc' or 'multipass', not %s", instanceProvisionerMode)
}
fmt.Printf(">>>> Using %s instance provisioner\n", instanceProvisionerMode)
stackProvisionerMode := os.Getenv("STACK_PROVISIONER")
if stackProvisionerMode == "" {
stackProvisionerMode = "ess"
stackProvisionerMode = "stateful"
}
if stackProvisionerMode != "ess" && stackProvisionerMode != "serverless" {
return nil, errors.New("STACK_PROVISIONER environment variable must be one of 'serverless' or 'ess'")
if stackProvisionerMode != "stateful" && stackProvisionerMode != "serverless" {
return nil, fmt.Errorf("STACK_PROVISIONER environment variable must be one of 'serverless' or 'stateful', not %s", stackProvisionerMode)
}
fmt.Printf(">>>> Using %s stack provisioner\n", stackProvisionerMode)

@@ -1778,24 +1781,37 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
extraEnv["AGENT_KEEP_INSTALLED"] = os.Getenv("AGENT_KEEP_INSTALLED")
}

+// these following two env vars are currently not used by anything, but can be used in the future to test beats or
+// other binaries, see https://github.com/elastic/elastic-agent/pull/3258
+binaryName := os.Getenv("TEST_BINARY_NAME")
+if binaryName == "" {
+binaryName = "elastic-agent"
+}
+
+repoDir := os.Getenv("TEST_INTEG_REPO_PATH")
+if repoDir == "" {
+repoDir = "."
+}

diagDir := filepath.Join("build", "diagnostics")
_ = os.MkdirAll(diagDir, 0755)

cfg := runner.Config{
-AgentVersion: agentVersion,
-AgentStackVersion: agentStackVersion,
-BuildDir: agentBuildDir,
-GOVersion: goVersion,
-RepoDir: ".",
-StateDir: ".integration-cache",
-DiagnosticsDir: diagDir,
-Platforms: testPlatforms(),
-Matrix: matrix,
-SingleTest: singleTest,
-VerboseMode: mg.Verbose(),
-Timestamp: timestamp,
-TestFlags: goTestFlags,
-ExtraEnv: extraEnv,
+AgentVersion: agentVersion,
+StackVersion: agentStackVersion,
+BuildDir: agentBuildDir,
+GOVersion: goVersion,
+RepoDir: repoDir,
+DiagnosticsDir: diagDir,
+StateDir: ".integration-cache",
+Platforms: testPlatforms(),
+Matrix: matrix,
+SingleTest: singleTest,
+VerboseMode: mg.Verbose(),
+Timestamp: timestamp,
+TestFlags: goTestFlags,
+ExtraEnv: extraEnv,
+BinaryName: binaryName,
}
ogcCfg := ogc.Config{
ServiceTokenPath: serviceTokenPath,
@@ -1824,7 +1840,7 @@ func createTestRunner(matrix bool, singleTest string, goTestFlags string, batche
Region: essRegion,
}
var stackProvisioner runner.StackProvisioner
if stackProvisionerMode == "ess" {
if stackProvisionerMode == "stateful" {
stackProvisioner, err = ess.NewProvisioner(provisionCfg)
if err != nil {
return nil, err
18 changes: 14 additions & 4 deletions pkg/testing/define/define.go
@@ -78,19 +78,29 @@ func NewFixture(t *testing.T, version string, opts ...atesting.FixtureOpt) (*ate
buildsDir = filepath.Join(projectDir, "build", "distributions")
}

+return NewFixtureWithBinary(t, version, "elastic-agent", buildsDir)
+
+}
+
+// NewFixtureWithBinary returns a new testing fixture for the named binary, with a LocalFetcher and
+// the binary logging to the test logger.
+func NewFixtureWithBinary(t *testing.T, version string, binary string, buildsDir string, opts ...atesting.FixtureOpt) (*atesting.Fixture, error) {
ver, err := semver.ParseVersion(version)
if err != nil {
return nil, fmt.Errorf("%q is an invalid agent version: %w", version, err)
}

-var f atesting.Fetcher
+var binFetcher atesting.Fetcher
if ver.IsSnapshot() {
-f = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly())
+binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithLocalSnapshotOnly(), atesting.WithCustomBinaryName(binary))
} else {
-f = atesting.LocalFetcher(buildsDir)
+binFetcher = atesting.LocalFetcher(buildsDir, atesting.WithCustomBinaryName(binary))
}

-opts = append(opts, atesting.WithFetcher(f), atesting.WithLogOutput())
+opts = append(opts, atesting.WithFetcher(binFetcher), atesting.WithLogOutput())
+if binary != "elastic-agent" {
+opts = append(opts, atesting.WithBinaryName(binary))
+}
return atesting.NewFixture(t, version, opts...)
}

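For context, a minimal sketch of how a test could use the new NewFixtureWithBinary helper (not part of this diff; the binary name, version, and directory are illustrative placeholders):

// Hypothetical usage sketch: build a fixture around a Beat binary instead of elastic-agent.
package example_test

import (
	"path/filepath"
	"testing"

	"github.com/elastic/elastic-agent/pkg/testing/define"
)

func TestBeatFixtureSketch(t *testing.T) {
	buildsDir := filepath.Join("build", "distributions")
	// The fixture's local fetcher will look for a metricbeat-<version>-<platform> artifact in buildsDir.
	fixture, err := define.NewFixtureWithBinary(t, "8.12.0-SNAPSHOT", "metricbeat", buildsDir)
	if err != nil {
		t.Fatalf("creating fixture: %v", err)
	}
	_ = fixture // installed and driven the same way an elastic-agent fixture would be
}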
6 changes: 4 additions & 2 deletions pkg/testing/ess/provisioner.go
@@ -82,8 +82,10 @@ func (p *provisioner) Provision(ctx context.Context, requests []runner.StackRequ
results[r] = resp
}

-// wait 15 minutes for all stacks to be ready
-readyCtx, readyCancel := context.WithTimeout(ctx, 15*time.Minute)
+// set a long timeout
+// this context travels up to the magefile, clients that want a shorter timeout can set
+// it via mage's -t flag
+readyCtx, readyCancel := context.WithTimeout(ctx, 25*time.Minute)
defer readyCancel()

g, gCtx := errgroup.WithContext(readyCtx)
69 changes: 53 additions & 16 deletions pkg/testing/ess/serverless.go
@@ -17,7 +17,7 @@ import (
"github.com/elastic/elastic-agent/pkg/testing/runner"
)

var serverlessURL = "https://global.qa.cld.elstc.co"
var serverlessURL = "https://staging.found.no"

// ServerlessClient is the handler for the serverless ES instance
type ServerlessClient struct {
@@ -54,6 +54,13 @@ type Project struct {
} `json:"endpoints"`
}

+// CredResetResponse contains the new auth details for a
+// stack credential reset
+type CredResetResponse struct {
+Password string `json:"password"`
+Username string `json:"username"`
+}

// NewServerlessClient creates a new instance of the serverless client
func NewServerlessClient(region, projectType, api string, logger runner.Logger) *ServerlessClient {
return &ServerlessClient{
@@ -97,6 +104,16 @@ func (srv *ServerlessClient) DeployStack(ctx context.Context, req ServerlessRequ
return Project{}, fmt.Errorf("error decoding JSON response: %w", err)
}
srv.proj = serverlessHandle

+// as of 8/8-ish, the serverless ESS cloud no longer provides credentials on the first POST request, we must send an additional POST
+// to reset the credentials
+updated, err := srv.ResetCredentials(ctx)
+if err != nil {
+return serverlessHandle, fmt.Errorf("error resetting credentials: %w", err)
+}
+srv.proj.Credentials.Username = updated.Username
+srv.proj.Credentials.Password = updated.Password

return serverlessHandle, nil
}

@@ -181,27 +198,15 @@ func (srv *ServerlessClient) WaitForEndpoints(ctx context.Context) error {

// WaitForElasticsearch waits until the ES endpoint is healthy
func (srv *ServerlessClient) WaitForElasticsearch(ctx context.Context) error {
endpoint := fmt.Sprintf("%s/_cluster/health", srv.proj.Endpoints.Elasticsearch)
req, err := http.NewRequestWithContext(ctx, "GET", endpoint, nil)
req, err := http.NewRequestWithContext(ctx, "GET", srv.proj.Endpoints.Elasticsearch, nil)
if err != nil {
return fmt.Errorf("error creating HTTP request: %w", err)
}
req.SetBasicAuth(srv.proj.Credentials.Username, srv.proj.Credentials.Password)

+// _cluster/health no longer works on serverless, just check response code
readyFunc := func(resp *http.Response) bool {
-var health struct {
-Status string `json:"status"`
-}
-err = json.NewDecoder(resp.Body).Decode(&health)
-resp.Body.Close()
-if err != nil {
-srv.log.Logf("response decoding error: %v", err)
-return false
-}
-if health.Status == "green" {
-return true
-}
-return false
+return resp.StatusCode == 200
}

err = srv.waitForRemoteState(ctx, req, time.Second*5, readyFunc)
@@ -243,6 +248,38 @@ func (srv *ServerlessClient) WaitForKibana(ctx context.Context) error {
return nil
}

+// ResetCredentials resets the credentials for the given ESS instance
+func (srv *ServerlessClient) ResetCredentials(ctx context.Context) (CredResetResponse, error) {
+resetURL := fmt.Sprintf("%s/api/v1/serverless/projects/%s/%s/_reset-credentials", serverlessURL, srv.projectType, srv.proj.ID)
+
+resetHandler, err := http.NewRequestWithContext(ctx, "POST", resetURL, nil)
+if err != nil {
+return CredResetResponse{}, fmt.Errorf("error creating new httpRequest: %w", err)
+}
+
+resetHandler.Header.Set("Content-Type", "application/json")
+resetHandler.Header.Set("Authorization", fmt.Sprintf("ApiKey %s", srv.api))
+
+resp, err := http.DefaultClient.Do(resetHandler)
+if err != nil {
+return CredResetResponse{}, fmt.Errorf("error performing HTTP request: %w", err)
+}
+defer resp.Body.Close()
+
+if resp.StatusCode != http.StatusOK {
+p, _ := io.ReadAll(resp.Body)
+return CredResetResponse{}, fmt.Errorf("Non-200 status code returned by server: %d, body: %s", resp.StatusCode, string(p))
+}
+
+updated := CredResetResponse{}
+err = json.NewDecoder(resp.Body).Decode(&updated)
+if err != nil {
+return CredResetResponse{}, fmt.Errorf("error decoding JSON response: %w", err)
+}
+
+return updated, nil
+}

func (srv *ServerlessClient) waitForRemoteState(ctx context.Context, httpHandler *http.Request, tick time.Duration, isReady func(*http.Response) bool) error {
timer := time.NewTimer(time.Millisecond)
// in cases where we get a timeout, also return the last error returned via HTTP
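For reference, a rough sketch of how the serverless client above is driven end to end (not part of this diff; the region, project name, and package layout are illustrative):

// Hypothetical sketch: deploy a serverless observability project, then wait for
// Elasticsearch to answer. DeployStack already resets credentials internally, so the
// returned Project carries usable endpoints and auth. Values are placeholders.
package example

import (
	"context"
	"fmt"

	"github.com/elastic/elastic-agent/pkg/testing/ess"
	"github.com/elastic/elastic-agent/pkg/testing/runner"
)

func deployServerlessSketch(ctx context.Context, apiKey string, log runner.Logger) error {
	client := ess.NewServerlessClient("aws-us-east-1", "observability", apiKey, log)
	proj, err := client.DeployStack(ctx, ess.ServerlessRequest{Name: "agent-it", RegionID: "aws-us-east-1"})
	if err != nil {
		return fmt.Errorf("deploying serverless project: %w", err)
	}
	if err := client.WaitForElasticsearch(ctx); err != nil {
		return fmt.Errorf("waiting for %s: %w", proj.Endpoints.Elasticsearch, err)
	}
	return nil
}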
3 changes: 2 additions & 1 deletion pkg/testing/ess/serverless_provision.go
@@ -80,7 +80,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne
for _, req := range requests {
client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log)
srvReq := ServerlessRequest{Name: req.ID, RegionID: prov.cfg.Region}
-_, err := client.DeployStack(ctx, srvReq)
+proj, err := client.DeployStack(ctx, srvReq)
if err != nil {
return nil, fmt.Errorf("error deploying stack for request %s: %w", req.ID, err)
}
@@ -95,6 +95,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne
Kibana: client.proj.Endpoints.Kibana,
Username: client.proj.Credentials.Username,
Password: client.proj.Credentials.Password,
+Internal: map[string]interface{}{"deployment_id": proj.ID},
}
stacks = append(stacks, newStack)
prov.stacksMut.Lock()
15 changes: 12 additions & 3 deletions pkg/testing/fetcher_local.go
@@ -17,6 +17,7 @@ import (
type localFetcher struct {
dir string
snapshotOnly bool
+binaryName string
}

type localFetcherOpt func(f *localFetcher)
@@ -28,10 +29,18 @@ func WithLocalSnapshotOnly() localFetcherOpt {
}
}

+// WithCustomBinaryName sets the binary to a custom name, the default is `elastic-agent`
+func WithCustomBinaryName(name string) localFetcherOpt {
+return func(f *localFetcher) {
+f.binaryName = name
+}
+}

// LocalFetcher returns a fetcher that pulls the binary of the Elastic Agent from a local location.
func LocalFetcher(dir string, opts ...localFetcherOpt) Fetcher {
f := &localFetcher{
-dir: dir,
+dir:        dir,
+binaryName: "elastic-agent",
}
for _, o := range opts {
o(f)
@@ -56,7 +65,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec
return nil, fmt.Errorf("invalid version: %q: %w", ver, err)
}

mainBuildfmt := "elastic-agent-%s-%s"
mainBuildfmt := "%s-%s-%s"
if f.snapshotOnly && !ver.IsSnapshot() {
if ver.Prerelease() == "" {
ver = semver.NewParsedSemVer(ver.Major(), ver.Minor(), ver.Patch(), "SNAPSHOT", ver.BuildMetadata())
@@ -66,7 +75,7 @@ func (f *localFetcher) Fetch(_ context.Context, operatingSystem string, architec

}

-mainBuild := fmt.Sprintf(mainBuildfmt, ver, suffix)
+mainBuild := fmt.Sprintf(mainBuildfmt, f.binaryName, ver, suffix)
mainBuildPath := filepath.Join(f.dir, mainBuild)
build := mainBuild
buildPath := mainBuildPath
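And a small sketch of the fetcher change in isolation (not part of this diff; the binary name is a placeholder):

// Hypothetical sketch: a local fetcher that looks for a Beat artifact named
// metricbeat-<version>-<platform suffix> instead of the default elastic-agent-<version>-<suffix>.
package example

import atesting "github.com/elastic/elastic-agent/pkg/testing"

func newBeatFetcher(buildsDir string) atesting.Fetcher {
	return atesting.LocalFetcher(
		buildsDir,
		atesting.WithLocalSnapshotOnly(),            // accept only -SNAPSHOT builds
		atesting.WithCustomBinaryName("metricbeat"), // default remains "elastic-agent"
	)
}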