Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/serverless-tests' into serverles…
Browse files Browse the repository at this point in the history
…s-tests
  • Loading branch information
fearful-symmetry committed Sep 21, 2023
2 parents 27f8814 + e312d0b commit 5611ff9
Show file tree
Hide file tree
Showing 5 changed files with 134 additions and 16 deletions.
3 changes: 2 additions & 1 deletion pkg/testing/ess/serverless_provision.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne
for _, req := range requests {
client := NewServerlessClient(prov.cfg.Region, "observability", prov.cfg.APIKey, prov.log)
srvReq := ServerlessRequest{Name: req.ID, RegionID: prov.cfg.Region}
_, err := client.DeployStack(ctx, srvReq)
proj, err := client.DeployStack(ctx, srvReq)
if err != nil {
return nil, fmt.Errorf("error deploying stack for request %s: %w", req.ID, err)
}
Expand All @@ -95,6 +95,7 @@ func (prov *ServerlessProvision) Provision(ctx context.Context, requests []runne
Kibana: client.proj.Endpoints.Kibana,
Username: client.proj.Credentials.Username,
Password: client.proj.Credentials.Password,
Internal: map[string]interface{}{"ID": proj.ID},
}
stacks = append(stacks, newStack)
prov.stacksMut.Lock()
Expand Down
2 changes: 2 additions & 0 deletions pkg/testing/fixture.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ func NewFixture(t *testing.T, version string, opts ...FixtureOpt) (*Fixture, err
operatingSystem: runtime.GOOS,
architecture: runtime.GOARCH,
connectTimout: 5 * time.Second,
// default to elastic-agent, can be changed by a set FixtureOpt below
binaryName: "elastic-agent",
}
for _, o := range opts {
o(f)
Expand Down
38 changes: 38 additions & 0 deletions pkg/testing/tools/elasticsearch.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,30 @@ func GetIndexTemplatesForPattern(ctx context.Context, client elastictransport.In
return parsed, nil
}

// DeleteIndexTemplatesDataStreams deletes any data streams, then associcated index templates.
func DeleteIndexTemplatesDataStreams(ctx context.Context, client elastictransport.Interface, name string) error {
req := esapi.IndicesDeleteDataStreamRequest{Name: []string{name}, ExpandWildcards: "all,hidden"}
resp, err := req.Do(ctx, client)
if err != nil {
return fmt.Errorf("error deleting data streams: %w", err)
}
_, err = handleResponseRaw(resp)
if err != nil {
return fmt.Errorf("error handling HTTP response for data stream delete: %w", err)
}

patternReq := esapi.IndicesDeleteIndexTemplateRequest{Name: name}
resp, err = patternReq.Do(ctx, client)
if err != nil {
return fmt.Errorf("error deleting index templates: %w", err)
}
_, err = handleResponseRaw(resp)
if err != nil {
return fmt.Errorf("error handling HTTP response for index template delete: %w", err)
}
return nil
}

// GetPipelines returns a list of installed pipelines that match the given name/pattern
func GetPipelines(ctx context.Context, client elastictransport.Interface, name string) (map[string]Pipeline, error) {
req := esapi.IngestGetPipelineRequest{PipelineID: name}
Expand All @@ -200,6 +224,20 @@ func GetPipelines(ctx context.Context, client elastictransport.Interface, name s
return parsed, nil
}

// DeletePipelines deletes all pipelines that match the given pattern
func DeletePipelines(ctx context.Context, client elastictransport.Interface, name string) error {
req := esapi.IngestDeletePipelineRequest{PipelineID: name}
resp, err := req.Do(ctx, client)
if err != nil {
return fmt.Errorf("error deleting index template")
}
_, err = handleResponseRaw(resp)
if err != nil {
return fmt.Errorf("error handling HTTP response: %w", err)
}
return nil
}

// FindMatchingLogLinesWithContext returns any logs with message fields that match the given line
func FindMatchingLogLinesWithContext(ctx context.Context, client elastictransport.Interface, namespace, line string) (Documents, error) {
queryRaw := map[string]interface{}{
Expand Down
22 changes: 21 additions & 1 deletion pkg/testing/tools/kibana.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"time"

Expand All @@ -30,6 +31,23 @@ type Dashboard struct {
Version string `json:"version"`
}

// DeleteDashboard removes the selected dashboard
func DeleteDashboard(ctx context.Context, client *kibana.Client, id string) error {
// In the future there should be logic to check if we need this header, waiting for https://github.com/elastic/kibana/pull/164850
headers := http.Header{}
headers.Add("x-elastic-internal-origin", "integration-tests")
status, resp, err := client.Connection.Request("DELETE", fmt.Sprintf("/api/saved_objects/dashboard/%s", id), nil, headers, nil)
if err != nil {
return fmt.Errorf("error making API request: %w, response: '%s'", err, string(resp))
}

if status != 200 {
return fmt.Errorf("non-200 return code: %v, response: '%s'", status, string(resp))
}
return nil

}

// GetDashboards returns a list of known dashboards on the system
func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, error) {
params := url.Values{}
Expand All @@ -39,7 +57,9 @@ func GetDashboards(ctx context.Context, client *kibana.Client) ([]Dashboard, err
dashboards := []Dashboard{}
page := 1
for {
status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, nil, nil)
headers := http.Header{}
headers.Add("x-elastic-internal-origin", "integration-tests")
status, resp, err := client.Connection.Request("GET", "/api/saved_objects/_find", params, headers, nil)
if err != nil {
return nil, fmt.Errorf("error making api request: %w", err)
}
Expand Down
85 changes: 71 additions & 14 deletions testing/integration/beat_serverless_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,13 @@ func (runner *BeatRunner) SetupSuite() {
if runner.testbeatName == "" {
runner.T().Fatalf("TEST_BINARY_NAME must be set")
}
if runner.testbeatName == "elastic-agent" {
runner.T().Skipf("tests must be run against a beat, not elastic-agent")
}

if runner.testbeatName != "filebeat" && runner.testbeatName != "metricbeat" {
runner.T().Skip("test only supports metricbeat or filebeat")
}
runner.T().Logf("running serverless tests with %s", runner.testbeatName)

agentFixture, err := define.NewFixtureWithBinary(runner.T(), define.Version(), runner.testbeatName, "/home/ubuntu", atesting.WithRunLength(time.Minute), atesting.WithAdditionalArgs([]string{"-E", "output.elasticsearch.allow_older_versions=true"}))
Expand All @@ -77,7 +84,7 @@ func (runner *BeatRunner) SetupSuite() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()

mbOutConfig := `
beatOutConfig := `
output.elasticsearch:
hosts: ["%s"]
username: %s
Expand All @@ -92,6 +99,27 @@ processors:
fields:
test-id: %s
`
if runner.testbeatName == "filebeat" {
beatOutConfig = `
output.elasticsearch:
hosts: ["%s"]
username: %s
password: %s
setup.kibana:
host: %s
filebeat.config.modules:
- modules: system
syslog:
enabled: true
auth:
enabled: true
processors:
- add_fields:
target: host
fields:
test-id: %s
`
}

// beats likes to add standard ports to URLs that don't have them, and ESS will sometimes return a URL without a port, assuming :443
// so try to fix that here
Expand All @@ -114,14 +142,13 @@ processors:
testUuid, err := uuid.NewV4()
require.NoError(runner.T(), err)
runner.testUuid = testUuid.String()
parsedCfg := fmt.Sprintf(mbOutConfig, fixedESHost, runner.user, runner.pass, fixedKibanaHost, testUuid.String())
parsedCfg := fmt.Sprintf(beatOutConfig, fixedESHost, runner.user, runner.pass, fixedKibanaHost, testUuid.String())
err = runner.agentFixture.WriteFileToWorkDir(ctx, parsedCfg, fmt.Sprintf("%s.yml", runner.testbeatName))
require.NoError(runner.T(), err)
}

// run the beat with default metricsets, ensure no errors in logs + data is ingested
func (runner *BeatRunner) TestRunAndCheckData() {

ctx, cancel := context.WithTimeout(context.Background(), time.Minute*4)
defer cancel()
err := runner.agentFixture.Run(ctx)
Expand Down Expand Up @@ -161,6 +188,15 @@ func (runner *BeatRunner) TestSetupDashboards() {
require.True(runner.T(), found, fmt.Sprintf("could not find dashboard newer than 5 minutes, out of %d dashboards", len(dashList)))

runner.Run("export dashboards", runner.SubtestExportDashboards)

// cleanup
for _, dash := range dashList {
err = tools.DeleteDashboard(ctx, runner.requirementsInfo.KibanaClient, dash.ID)
if err != nil {
runner.T().Logf("WARNING: could not delete dashboards after test: %s", err)
break
}
}
}

// tests the [beat] export dashboard command
Expand All @@ -173,10 +209,12 @@ func (runner *BeatRunner) SubtestExportDashboards() {
require.NoError(runner.T(), err)
require.NotEmpty(runner.T(), dashlist)

_, err = runner.agentFixture.Exec(ctx, []string{"--path.home",
exportOut, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
runner.agentFixture.WorkDir(),
"export",
"dashboard", "--folder", outDir, "--id", dashlist[0].ID})

runner.T().Logf("got output: %s", exportOut)
assert.NoError(runner.T(), err)

inFolder, err := os.ReadDir(filepath.Join(outDir, "/_meta/kibana/8/dashboard"))
Expand Down Expand Up @@ -208,6 +246,11 @@ func (runner *BeatRunner) TestSetupPipelines() {
require.NoError(runner.T(), err)
require.NotEmpty(runner.T(), pipelines)

/// cleanup
err = tools.DeletePipelines(ctx, runner.requirementsInfo.ESClient, "*filebeat*")
if err != nil {
runner.T().Logf("WARNING: could not clean up pipelines: %s", err)
}
}

// test beat setup --index-management with ILM disabled
Expand All @@ -219,9 +262,11 @@ func (runner *BeatRunner) TestIndexManagementNoILM() {
runner.agentFixture.WorkDir(),
"setup",
"--index-management",
"-E", "setup.ilm.enabled=false"})
"--E=setup.ilm.enabled=false"})
runner.T().Logf("got response from management setup: %s", string(resp))
assert.NoError(runner.T(), err)
// we should not print a warning if we've explicitly disabled ILM
assert.NotContains(runner.T(), string(resp), "not supported")

tmpls, err := tools.GetIndexTemplatesForPattern(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
require.NoError(runner.T(), err)
Expand All @@ -232,12 +277,18 @@ func (runner *BeatRunner) TestIndexManagementNoILM() {

runner.Run("export templates", runner.SubtestExportTemplates)
runner.Run("export index patterns", runner.SubtestExportIndexPatterns)

// cleanup
err = tools.DeleteIndexTemplatesDataStreams(ctx, runner.requirementsInfo.ESClient, fmt.Sprintf("*%s*", runner.testbeatName))
if err != nil {
runner.T().Logf("WARNING: could not clean up index templates/data streams: %s", err)
}
}

// tests beat setup --index-management with ILM explicitly set
// On serverless, this should fail.
// Will not pass right now, may need to change
func (runner *BeatRunner) TestIndexManagementILMEnabledFail() {
func (runner *BeatRunner) TestIndexManagementILMEnabledWarning() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient)
Expand All @@ -251,15 +302,16 @@ func (runner *BeatRunner) TestIndexManagementILMEnabledFail() {
runner.agentFixture.WorkDir(),
"setup",
"--index-management",
"-E", "setup.ilm.enabled=true"})
"--E=setup.ilm.enabled=true", "--E=setup.ilm.overwrite=true"})
runner.T().Logf("got response from management setup: %s", string(resp))
assert.Error(runner.T(), err)
require.NoError(runner.T(), err)
assert.Contains(runner.T(), string(resp), "not supported")
}

// tests beat setup ilm-policy
// On serverless, this should fail
func (runner *BeatRunner) TestExportILMFail() {
// the export command doesn't actually make a network connection,
// so this won't fail
func (runner *BeatRunner) TestExport() {
ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
defer cancel()
info, err := tools.GetPing(ctx, runner.requirementsInfo.ESClient)
Expand All @@ -271,10 +323,15 @@ func (runner *BeatRunner) TestExportILMFail() {

resp, err := runner.agentFixture.Exec(ctx, []string{"--path.home",
runner.agentFixture.WorkDir(),
"export", "ilm-policy"})
runner.T().Logf("got response from management setup: %s", string(resp))
assert.Error(runner.T(), err)
assert.Contains(runner.T(), string(resp), "not supported")
"export", "ilm-policy", "--E=setup.ilm.overwrite=true"})
runner.T().Logf("got response from export: %s", string(resp))
assert.NoError(runner.T(), err)
// check to see if we got a valid output
policy := map[string]interface{}{}
err = json.Unmarshal(resp, &policy)
require.NoError(runner.T(), err)

require.NotEmpty(runner.T(), policy["policy"])

}

Expand Down

0 comments on commit 5611ff9

Please sign in to comment.