Skip to content

Commit

Permalink
refactor: simplify Reporter interface
Browse files Browse the repository at this point in the history
 - add integration test for sending events
 - add live tests
  • Loading branch information
LukeWinikates committed Oct 26, 2023
1 parent 2c849a8 commit 6d45760
Show file tree
Hide file tree
Showing 6 changed files with 113 additions and 46 deletions.
1 change: 0 additions & 1 deletion internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import "net/http"
// Reporter is an interface for reporting data to a Wavefront service.
type Reporter interface {
Report(format string, pointLines string) (*http.Response, error)
ReportEvent(event string) (*http.Response, error)
}

type Flusher interface {
Expand Down
10 changes: 1 addition & 9 deletions internal/lines.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"errors"
"fmt"
"log"
"net/http"
"strings"
"sync"
"sync/atomic"
Expand Down Expand Up @@ -180,14 +179,7 @@ func (lh *RealLineHandler) FlushAll() error {

func (lh *RealLineHandler) report(lines []string) error {
strLines := strings.Join(lines, "")
var resp *http.Response
var err error

if lh.Format == eventFormat {
resp, err = lh.Reporter.ReportEvent(strLines)
} else {
resp, err = lh.Reporter.Report(lh.Format, strLines)
}
resp, err := lh.Reporter.Report(lh.Format, strLines)

if err != nil {
if shouldRetry(err) {
Expand Down
19 changes: 10 additions & 9 deletions internal/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,18 @@ func (reporter reporter) Report(format string, pointLines string) (*http.Respons
return nil, formatError
}

if format == eventFormat {
return reporter.reportEvent(pointLines)
}

requestBody, err := linesToGzippedBytes(pointLines)
if err != nil {
return &http.Response{}, err
return nil, err
}

req, err := reporter.buildRequest(format, requestBody)
if err != nil {
return &http.Response{}, err
return nil, err
}

return reporter.execute(req)
Expand Down Expand Up @@ -80,23 +84,20 @@ func (reporter reporter) buildRequest(format string, body []byte) (*http.Request
return req, nil
}

func (reporter reporter) ReportEvent(event string) (*http.Response, error) {
func (reporter reporter) reportEvent(event string) (*http.Response, error) {
if event == "" {
return nil, formatError
}

apiURL := reporter.serverURL + eventEndpoint
req, err := http.NewRequest("POST", apiURL, strings.NewReader(event))
if err != nil {
return &http.Response{}, err
return nil, err
}

req.Header.Set(contentType, applicationJSON)

if reporter.IsDirect() {
req.Header.Set(contentEncoding, gzipFormat)
req.Header.Set(contentType, applicationJSON)
}

err = reporter.tokenService.Authorize(req)
if err != nil {
return nil, err
Expand All @@ -108,7 +109,7 @@ func (reporter reporter) ReportEvent(event string) (*http.Response, error) {
func (reporter reporter) execute(req *http.Request) (*http.Response, error) {
resp, err := reporter.client.Do(req)
if err != nil {
return resp, err
return nil, err
}
_, _ = io.Copy(io.Discard, resp.Body)
defer resp.Body.Close()
Expand Down
27 changes: 23 additions & 4 deletions senders/integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,14 @@ func TestEndToEnd(t *testing.T) {
sender, err := NewSender(testServer.URL)
require.NoError(t, err)
require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil))
require.NoError(t, sender.SendEvent("dramatic event", 20, 0, "localhost", nil))
require.NoError(t, sender.Flush())

assert.Equal(t, 1, len(testServer.MetricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0])
assert.Equal(t, "/report?f=wavefront", testServer.LastRequestURL)
assert.Equal(t, "@Event 20000 20001 \"dramatic event\" host=\"localhost\"", testServer.EventLines[0])
assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0])
assert.Equal(t, "/api/v2/event", testServer.RequestURLs[1])
}

func TestEndToEndWithPath(t *testing.T) {
Expand All @@ -31,7 +34,7 @@ func TestEndToEndWithPath(t *testing.T) {

assert.Equal(t, 1, len(testServer.MetricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0])
assert.Equal(t, "/test-path/report?f=wavefront", testServer.LastRequestURL)
assert.Equal(t, "/test-path/report?f=wavefront", testServer.RequestURLs[0])
}

func TestTLSEndToEnd(t *testing.T) {
Expand Down Expand Up @@ -63,7 +66,7 @@ func TestEndToEndWithInternalMetrics(t *testing.T) {
assert.Equal(t, true, testServer.hasReceivedLine("points.valid"))
assert.Equal(t, 12, len(metricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", metricLines[0])
assert.Equal(t, "/report?f=wavefront", testServer.LastRequestURL)
assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0])
}

func TestEndToEndWithoutInternalMetrics(t *testing.T) {
Expand All @@ -80,5 +83,21 @@ func TestEndToEndWithoutInternalMetrics(t *testing.T) {
assert.Equal(t, false, testServer.hasReceivedLine("points.valid"))
assert.Equal(t, 1, len(metricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", metricLines[0])
assert.Equal(t, "/report?f=wavefront", testServer.LastRequestURL)
assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0])
}

func TestEndToEndDirect(t *testing.T) {
testServer := startTestServer(false)
defer testServer.Close()
sender, err := NewSender(testServer.URL, APIToken("just make it look direct"))
require.NoError(t, err)
require.NoError(t, sender.SendMetric("my metric", 20, 0, "localhost", nil))
require.NoError(t, sender.SendEvent("dramatic event", 20, 0, "localhost", nil))
require.NoError(t, sender.Flush())

assert.Equal(t, 1, len(testServer.MetricLines))
assert.Equal(t, "\"my-metric\" 20 source=\"localhost\"", testServer.MetricLines[0])
assert.Equal(t, `{"annotations":{},"endTime":20001,"hosts":["localhost"],"name":"dramatic event","startTime":20000}`, testServer.EventLines[0])
assert.Equal(t, "/report?f=wavefront", testServer.RequestURLs[0])
assert.Equal(t, "/api/v2/event", testServer.RequestURLs[1])
}
28 changes: 28 additions & 0 deletions senders/live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package senders
import (
"os"
"testing"
"time"

"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -57,3 +58,30 @@ func TestWF_API_TOKEN_LIVE(t *testing.T) {
assert.NoError(t, sender.Flush())
sender.Close()
}

func TestEventDirectSend_LIVE(t *testing.T) {
skipUnlessVarsAreSet(t)

sender, err := NewSender(
os.Getenv("LIVE_TEST_HOST"),
APIToken(os.Getenv("LIVE_TEST_WF_API_TOKEN")),
)
assert.NoError(t, err)
assert.NoError(t, sender.SendEvent("test.an-event", time.Now().Add(-30*time.Minute).UnixMilli(), time.Now().Add(5*time.Minute).UnixMilli(), "go test",
map[string]string{"scenario": "send-event-direct"}))
assert.NoError(t, sender.Flush())
sender.Close()
}

func TestEventProxySend_LIVE(t *testing.T) {
skipUnlessVarsAreSet(t)

sender, err := NewSender(
os.Getenv("LIVE_TEST_PROXY_HOST"),
)
assert.NoError(t, err)
assert.NoError(t, sender.SendEvent("test.an-event", time.Now().Add(-30*time.Minute).UnixMilli(), time.Now().Add(5*time.Minute).UnixMilli(), "go test",
map[string]string{"scenario": "send-event-proxy"}))
assert.NoError(t, sender.Flush())
sender.Close()
}
74 changes: 51 additions & 23 deletions senders/test_server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,65 +5,93 @@ import (
"compress/gzip"
"crypto/tls"
"crypto/x509"
"fmt"
"io"
"net/http"
"net/http/httptest"
"strings"
)

func startTestServer(useTLS bool) *testServer {
handler := &testServer{}
ts := &testServer{}
handler := http.NewServeMux()
handler.HandleFunc("/api/v2/event", ts.EventAPIEndpoint)
handler.HandleFunc("/", ts.ReportEndpoint)
if useTLS {
handler.httpServer = httptest.NewTLSServer(handler)
ts.httpServer = httptest.NewTLSServer(handler)
} else {
handler.httpServer = httptest.NewServer(handler)
ts.httpServer = httptest.NewServer(handler)
}
handler.URL = handler.httpServer.URL
return handler
ts.URL = ts.httpServer.URL
return ts
}

type testServer struct {
MetricLines []string
AuthHeaders []string
httpServer *httptest.Server
URL string
LastRequestURL string
MetricLines []string
EventLines []string
AuthHeaders []string
httpServer *httptest.Server
URL string
RequestURLs []string
}

func (s *testServer) TLSConfig() *tls.Config {
certpool := x509.NewCertPool()
certpool.AddCert(s.httpServer.Certificate())
certPool := x509.NewCertPool()
certPool.AddCert(s.httpServer.Certificate())
return &tls.Config{
RootCAs: certpool,
RootCAs: certPool,
}
}

func (s *testServer) ServeHTTP(writer http.ResponseWriter, request *http.Request) {
newLines, err := decodeMetricLines(request)
func (s *testServer) ReportEndpoint(writer http.ResponseWriter, request *http.Request) {
newLines, err := decodeLines(request)
if err != nil {
writer.WriteHeader(500)
return
}
s.MetricLines = append(s.MetricLines, newLines...)
s.AuthHeaders = append(s.AuthHeaders, request.Header.Get("Authorization"))
s.LastRequestURL = request.URL.String()
s.RequestURLs = append(s.RequestURLs, request.URL.String())
writer.WriteHeader(200)
}

func decodeMetricLines(request *http.Request) ([]string, error) {
var metricLines []string
reader, err := gzip.NewReader(request.Body)
func (s *testServer) EventAPIEndpoint(writer http.ResponseWriter, request *http.Request) {
fmt.Println(request.URL.Path)
newLines, err := decodeLines(request)
if err != nil {
return metricLines, err
writer.WriteHeader(500)
return
}
scanner := bufio.NewScanner(reader)
s.EventLines = append(s.EventLines, newLines...)
s.AuthHeaders = append(s.AuthHeaders, request.Header.Get("Authorization"))
s.RequestURLs = append(s.RequestURLs, request.URL.String())
writer.WriteHeader(200)
}

func decodeLines(request *http.Request) ([]string, error) {
var metricLines []string
var bodyReader io.Reader
defer request.Body.Close()
if request.Header.Get("Content-Encoding") == "gzip" {
r, err := gzip.NewReader(request.Body)
if err != nil {
return metricLines, err
}
defer r.Close()
bodyReader = r
} else {
bodyReader = request.Body
}

scanner := bufio.NewScanner(bodyReader)
for scanner.Scan() {
line := scanner.Text()
metricLines = append(metricLines, line)
}
if scanner.Err() != nil {
reader.Close()
return metricLines, scanner.Err()
}
return metricLines, reader.Close()
return metricLines, nil
}

func (s *testServer) Close() {
Expand Down

0 comments on commit 6d45760

Please sign in to comment.