From 32011508d41acdd905e2011e22a908a470063886 Mon Sep 17 00:00:00 2001 From: michel-laterman Date: Thu, 18 Jan 2024 12:25:56 -0600 Subject: [PATCH] Add e2e test for timeout and 429 on startup --- testing/e2e/stand_alone_test.go | 95 +++++++++++++++++++ testing/e2e/statusProxy.go | 73 ++++++++++++++ .../e2e/testdata/stand-alone-http-proxy.tpl | 13 +++ 3 files changed, 181 insertions(+) create mode 100644 testing/e2e/statusProxy.go create mode 100644 testing/e2e/testdata/stand-alone-http-proxy.tpl diff --git a/testing/e2e/stand_alone_test.go b/testing/e2e/stand_alone_test.go index 82dde008f..e91c0303e 100644 --- a/testing/e2e/stand_alone_test.go +++ b/testing/e2e/stand_alone_test.go @@ -10,6 +10,7 @@ import ( "context" "fmt" "html/template" + "net/http/httptest" "os" "os/exec" "path/filepath" @@ -242,3 +243,97 @@ func (suite *StandAloneSuite) TestStaticTokenAuthentication() { ) tester.Enroll(ctx, enrollmentToken) } + +// TestElasticsearch429OnStartup will check to ensure fleet-server functions as expected (does not crash) +// if Elasticsearch returns 429s on startup. +func (suite *StandAloneSuite) TestElasticsearch429OnStartup() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + + // Create a proxy that returns 429s + proxy := NewStatusProxy(suite.T(), 429) + proxy.Enable() + server := httptest.NewServer(proxy) + + // Create a config file from a template in the test temp dir + dir := suite.T().TempDir() + tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-http-proxy.tpl")) + suite.Require().NoError(err) + f, err := os.Create(filepath.Join(dir, "config.yml")) + suite.Require().NoError(err) + err = tpl.Execute(f, map[string]string{ + "Hosts": suite.ESHosts, + "ServiceToken": suite.ServiceToken, + "Proxy": server.URL, + }) + f.Close() + suite.Require().NoError(err) + + // Run the fleet-server binary + cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml")) + //cmd.Stderr = os.Stderr // NOTE: This can be uncommented to put out logs + cmd.Cancel = func() error { + return cmd.Process.Signal(syscall.SIGTERM) + } + cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath} + suite.T().Log("Starting fleet-server") + err = cmd.Start() + suite.Require().NoError(err) + + // FIXME timeout to make sure fleet-server has started + time.Sleep(5 * time.Second) + suite.T().Log("Checking fleet-server status") + // Wait to check that it is Starting. + suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateStarting) // fleet-server returns 503:starting if upstream ES returns 429. + + // Disable proxy and ensure fleet-server recovers + suite.T().Log("Disable proxy") + proxy.Disable() + suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateHealthy) + + cancel() + cmd.Wait() +} + +// TestElasticsearchTimeoutOnStartup will check to ensure fleet-server functions as expected (does not crash) +// if Elasticsearch times out on startup. +func (suite *StandAloneSuite) TestElasticsearchTimeoutOnStartup() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + + proxy, err := suite.StartToxiproxy(ctx).CreateProxy("es", "localhost:0", suite.ESHosts) + suite.Require().NoError(err) + _, err = proxy.AddToxic("force_timeout", "timeout", "upstream", 1.0, toxiproxy.Attributes{}) + suite.Require().NoError(err) + + // Create a config file from a template in the test temp dir + dir := suite.T().TempDir() + tpl, err := template.ParseFiles(filepath.Join("testdata", "stand-alone-http.tpl")) + suite.Require().NoError(err) + f, err := os.Create(filepath.Join(dir, "config.yml")) + suite.Require().NoError(err) + err = tpl.Execute(f, map[string]string{ + "Hosts": "http://" + proxy.Listen, + "ServiceToken": suite.ServiceToken, + }) + f.Close() + suite.Require().NoError(err) + + // Run the fleet-server binary + cmd := exec.CommandContext(ctx, suite.binaryPath, "-c", filepath.Join(dir, "config.yml")) + cmd.Cancel = func() error { + return cmd.Process.Signal(syscall.SIGTERM) + } + cmd.Env = []string{"GOCOVERDIR=" + suite.CoverPath} + err = cmd.Start() + suite.Require().NoError(err) + + // Provoke timeouts, fleet-server should be stuck in the starting state + suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateStarting) + + // Recover the network and wait for the healthcheck to be healthy again. + err = proxy.RemoveToxic("force_timeout") + suite.Require().NoError(err) + suite.FleetServerStatusIs(ctx, "http://localhost:8220", client.UnitStateHealthy) + + cancel() + cmd.Wait() +} diff --git a/testing/e2e/statusProxy.go b/testing/e2e/statusProxy.go new file mode 100644 index 000000000..e348fc1e5 --- /dev/null +++ b/testing/e2e/statusProxy.go @@ -0,0 +1,73 @@ +// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one +// or more contributor license agreements. Licensed under the Elastic License; +// you may not use this file except in compliance with the Elastic License. + +//go:build e2e + +package e2e + +import ( + "context" + "errors" + "io" + "net" + "net/http" + "sync/atomic" + "testing" +) + +type statusProxy struct { + t testing.TB + enabled *atomic.Bool + status int +} + +// Create a new Status Proxy +// If the proxy is enabled, the passed status is returned for all requiests +// If it's disabled requests are made to upstream instead. +func NewStatusProxy(t testing.TB, status int) *statusProxy { + t.Helper() + s := &statusProxy{ + t: t, + enabled: new(atomic.Bool), + status: status, + } + return s +} + +func (s *statusProxy) Enable() { + s.enabled.Store(true) +} + +func (s *statusProxy) Disable() { + s.enabled.Store(false) +} + +func (s *statusProxy) ServeHTTP(wr http.ResponseWriter, req *http.Request) { + if s.enabled.Load() { + wr.WriteHeader(s.status) + return + } + req.RequestURI = "" + + if cIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil { + req.Header.Set("X-Forwarded-For", cIP) + } + + resp, err := (&http.Client{}).Do(req) + if err != nil { + if errors.Is(err, context.Canceled) { + return + } + s.t.Fatal(err) + return + } + defer resp.Body.Close() + for name, values := range resp.Header { + for _, value := range values { + wr.Header().Add(name, value) + } + } + wr.WriteHeader(resp.StatusCode) + io.Copy(wr, resp.Body) +} diff --git a/testing/e2e/testdata/stand-alone-http-proxy.tpl b/testing/e2e/testdata/stand-alone-http-proxy.tpl new file mode 100644 index 000000000..0a4cdc41f --- /dev/null +++ b/testing/e2e/testdata/stand-alone-http-proxy.tpl @@ -0,0 +1,13 @@ +output: + elasticsearch: + hosts: {{ .Hosts }} + service_token: {{ .ServiceToken }} + proxy_url: {{ .Proxy }} + +fleet.agent.id: e2e-test-id + +inputs: +- type: fleet-server + +logging: + to_stderr: true