From 6d45760933fab27db8829f0d36999205b39c2dd0 Mon Sep 17 00:00:00 2001 From: Luke Winikates Date: Thu, 12 Oct 2023 09:21:40 -0700 Subject: [PATCH] refactor: simplify Reporter interface - add integration test for sending events - add live tests --- internal/interfaces.go | 1 - internal/lines.go | 10 +---- internal/reporter.go | 19 +++++----- senders/integration_test.go | 27 ++++++++++++-- senders/live_test.go | 28 ++++++++++++++ senders/test_server_test.go | 74 +++++++++++++++++++++++++------------ 6 files changed, 113 insertions(+), 46 deletions(-) diff --git a/internal/interfaces.go b/internal/interfaces.go index f821144..9f001bd 100644 --- a/internal/interfaces.go +++ b/internal/interfaces.go @@ -7,7 +7,6 @@ import "net/http" // Reporter is an interface for reporting data to a Wavefront service. type Reporter interface { Report(format string, pointLines string) (*http.Response, error) - ReportEvent(event string) (*http.Response, error) } type Flusher interface { diff --git a/internal/lines.go b/internal/lines.go index b2b0cba..b8c6efa 100644 --- a/internal/lines.go +++ b/internal/lines.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "log" - "net/http" "strings" "sync" "sync/atomic" @@ -180,14 +179,7 @@ func (lh *RealLineHandler) FlushAll() error { func (lh *RealLineHandler) report(lines []string) error { strLines := strings.Join(lines, "") - var resp *http.Response - var err error - - if lh.Format == eventFormat { - resp, err = lh.Reporter.ReportEvent(strLines) - } else { - resp, err = lh.Reporter.Report(lh.Format, strLines) - } + resp, err := lh.Reporter.Report(lh.Format, strLines) if err != nil { if shouldRetry(err) { diff --git a/internal/reporter.go b/internal/reporter.go index bd35578..e297099 100644 --- a/internal/reporter.go +++ b/internal/reporter.go @@ -32,14 +32,18 @@ func (reporter reporter) Report(format string, pointLines string) (*http.Respons return nil, formatError } + if format == eventFormat { + return reporter.reportEvent(pointLines) + } + requestBody, err := linesToGzippedBytes(pointLines) if err != nil { - return &http.Response{}, err + return nil, err } req, err := reporter.buildRequest(format, requestBody) if err != nil { - return &http.Response{}, err + return nil, err } return reporter.execute(req) @@ -80,7 +84,7 @@ func (reporter reporter) buildRequest(format string, body []byte) (*http.Request return req, nil } -func (reporter reporter) ReportEvent(event string) (*http.Response, error) { +func (reporter reporter) reportEvent(event string) (*http.Response, error) { if event == "" { return nil, formatError } @@ -88,15 +92,12 @@ func (reporter reporter) ReportEvent(event string) (*http.Response, error) { apiURL := reporter.serverURL + eventEndpoint req, err := http.NewRequest("POST", apiURL, strings.NewReader(event)) if err != nil { - return &http.Response{}, err + return nil, err } - req.Header.Set(contentType, applicationJSON) - if reporter.IsDirect() { - req.Header.Set(contentEncoding, gzipFormat) + req.Header.Set(contentType, applicationJSON) } - err = reporter.tokenService.Authorize(req) if err != nil { return nil, err @@ -108,7 +109,7 @@ func (reporter reporter) ReportEvent(event string) (*http.Response, error) { func (reporter reporter) execute(req *http.Request) (*http.Response, error) { resp, err := reporter.client.Do(req) if err != nil { - return resp, err + return nil, err } _, _ = io.Copy(io.Discard, resp.Body) defer resp.Body.Close() diff --git a/senders/integration_test.go b/senders/integration_test.go index 801a2b7..9784ba6 100644 --- a/senders/integration_test.go +++ b/senders/integration_test.go @@ -14,11 +14,14 @@ func TestEndToEnd(t *testing.T) { sender, err := NewSender(testServer.URL) require.NoError(t, err) require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil)) + require.NoError(t, sender.SendEvent("dramatic event", 20, 0, "localhost", nil)) require.NoError(t, sender.Flush()) assert.Equal(t, 1, len(testServer.MetricLines)) assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0]) - assert.Equal(t, "/report?f=wavefront", testServer.LastRequestURL) + assert.Equal(t, "@Event 20000 20001 \"dramatic event\" host=\"localhost\"", testServer.EventLines[0]) + assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0]) + assert.Equal(t, "/api/v2/event", testServer.RequestURLs[1]) } func TestEndToEndWithPath(t *testing.T) { @@ -31,7 +34,7 @@ func TestEndToEndWithPath(t *testing.T) { assert.Equal(t, 1, len(testServer.MetricLines)) assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0]) - assert.Equal(t, "/test-path/report?f=wavefront", testServer.LastRequestURL) + assert.Equal(t, "/test-path/report?f=wavefront", testServer.RequestURLs[0]) } func TestTLSEndToEnd(t *testing.T) { @@ -63,7 +66,7 @@ func TestEndToEndWithInternalMetrics(t *testing.T) { assert.Equal(t, true, testServer.hasReceivedLine("points.valid")) assert.Equal(t, 12, len(metricLines)) assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", metricLines[0]) - assert.Equal(t, "/report?f=wavefront", testServer.LastRequestURL) + assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0]) } func TestEndToEndWithoutInternalMetrics(t *testing.T) { @@ -80,5 +83,21 @@ func TestEndToEndWithoutInternalMetrics(t *testing.T) { assert.Equal(t, false, testServer.hasReceivedLine("points.valid")) assert.Equal(t, 1, len(metricLines)) assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", metricLines[0]) - assert.Equal(t, "/report?f=wavefront", testServer.LastRequestURL) + assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0]) +} + +func TestEndToEndDirect(t *testing.T) { + testServer := startTestServer(false) + defer testServer.Close() + sender, err := NewSender(testServer.URL, APIToken("just make it look direct")) + require.NoError(t, err) + require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil)) + require.NoError(t, sender.SendEvent("dramatic event", 20, 0, "localhost", nil)) + require.NoError(t, sender.Flush()) + + assert.Equal(t, 1, len(testServer.MetricLines)) + assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0]) + assert.Equal(t, `{"annotations":{},"endTime":20001,"hosts":["localhost"],"name":"dramatic event","startTime":20000}`, testServer.EventLines[0]) + assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0]) + assert.Equal(t, "/api/v2/event", testServer.RequestURLs[1]) } diff --git a/senders/live_test.go b/senders/live_test.go index f911549..5294566 100644 --- a/senders/live_test.go +++ b/senders/live_test.go @@ -3,6 +3,7 @@ package senders import ( "os" "testing" + "time" "github.com/stretchr/testify/assert" ) @@ -57,3 +58,30 @@ func TestWF_API_TOKEN_LIVE(t *testing.T) { assert.NoError(t, sender.Flush()) sender.Close() } + +func TestEventDirectSend_LIVE(t *testing.T) { + skipUnlessVarsAreSet(t) + + sender, err := NewSender( + os.Getenv("LIVE_TEST_HOST"), + APIToken(os.Getenv("LIVE_TEST_WF_API_TOKEN")), + ) + assert.NoError(t, err) + assert.NoError(t, sender.SendEvent("test.an-event", time.Now().Add(-30*time.Minute).UnixMilli(), time.Now().Add(5*time.Minute).UnixMilli(), "go test", + map[string]string{"scenario": "send-event-direct"})) + assert.NoError(t, sender.Flush()) + sender.Close() +} + +func TestEventProxySend_LIVE(t *testing.T) { + skipUnlessVarsAreSet(t) + + sender, err := NewSender( + os.Getenv("LIVE_TEST_PROXY_HOST"), + ) + assert.NoError(t, err) + assert.NoError(t, sender.SendEvent("test.an-event", time.Now().Add(-30*time.Minute).UnixMilli(), time.Now().Add(5*time.Minute).UnixMilli(), "go test", + map[string]string{"scenario": "send-event-proxy"})) + assert.NoError(t, sender.Flush()) + sender.Close() +} diff --git a/senders/test_server_test.go b/senders/test_server_test.go index 3a0c8ae..7150c9b 100644 --- a/senders/test_server_test.go +++ b/senders/test_server_test.go @@ -5,65 +5,93 @@ import ( "compress/gzip" "crypto/tls" "crypto/x509" + "fmt" + "io" "net/http" "net/http/httptest" "strings" ) func startTestServer(useTLS bool) *testServer { - handler := &testServer{} + ts := &testServer{} + handler := http.NewServeMux() + handler.HandleFunc("/api/v2/event", ts.EventAPIEndpoint) + handler.HandleFunc("/", ts.ReportEndpoint) if useTLS { - handler.httpServer = httptest.NewTLSServer(handler) + ts.httpServer = httptest.NewTLSServer(handler) } else { - handler.httpServer = httptest.NewServer(handler) + ts.httpServer = httptest.NewServer(handler) } - handler.URL = handler.httpServer.URL - return handler + ts.URL = ts.httpServer.URL + return ts } type testServer struct { - MetricLines []string - AuthHeaders []string - httpServer *httptest.Server - URL string - LastRequestURL string + MetricLines []string + EventLines []string + AuthHeaders []string + httpServer *httptest.Server + URL string + RequestURLs []string } func (s *testServer) TLSConfig() *tls.Config { - certpool := x509.NewCertPool() - certpool.AddCert(s.httpServer.Certificate()) + certPool := x509.NewCertPool() + certPool.AddCert(s.httpServer.Certificate()) return &tls.Config{ - RootCAs: certpool, + RootCAs: certPool, } } -func (s *testServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) { - newLines, err := decodeMetricLines(request) +func (s *testServer) ReportEndpoint(writer http.ResponseWriter, request *http.Request) { + newLines, err := decodeLines(request) if err != nil { writer.WriteHeader(500) + return } s.MetricLines = append(s.MetricLines, newLines...) s.AuthHeaders = append(s.AuthHeaders, request.Header.Get("Authorization")) - s.LastRequestURL = request.URL.String() + s.RequestURLs = append(s.RequestURLs, request.URL.String()) writer.WriteHeader(200) } -func decodeMetricLines(request *http.Request) ([]string, error) { - var metricLines []string - reader, err := gzip.NewReader(request.Body) +func (s *testServer) EventAPIEndpoint(writer http.ResponseWriter, request *http.Request) { + fmt.Println(request.URL.Path) + newLines, err := decodeLines(request) if err != nil { - return metricLines, err + writer.WriteHeader(500) + return } - scanner := bufio.NewScanner(reader) + s.EventLines = append(s.EventLines, newLines...) + s.AuthHeaders = append(s.AuthHeaders, request.Header.Get("Authorization")) + s.RequestURLs = append(s.RequestURLs, request.URL.String()) + writer.WriteHeader(200) +} + +func decodeLines(request *http.Request) ([]string, error) { + var metricLines []string + var bodyReader io.Reader + defer request.Body.Close() + if request.Header.Get("Content-Encoding") == "gzip" { + r, err := gzip.NewReader(request.Body) + if err != nil { + return metricLines, err + } + defer r.Close() + bodyReader = r + } else { + bodyReader = request.Body + } + + scanner := bufio.NewScanner(bodyReader) for scanner.Scan() { line := scanner.Text() metricLines = append(metricLines, line) } if scanner.Err() != nil { - reader.Close() return metricLines, scanner.Err() } - return metricLines, reader.Close() + return metricLines, nil } func (s *testServer) Close() {