From d032aee660cf33b582708abd710d3d385c46f18a Mon Sep 17 00:00:00 2001 From: Luke Winikates Date: Tue, 24 Oct 2023 09:33:52 -0700 Subject: [PATCH] chore: refactor background flushing --- Makefile | 2 +- internal/background_flusher.go | 55 +++++ internal/handler_factory.go | 4 +- internal/interfaces.go | 2 + internal/lines_test.go | 129 ----------- internal/{lines.go => real_line_handler.go} | 107 +++++----- internal/real_line_handler_test.go | 224 ++++++++++++++++++++ senders/wavefront_sender_test.go | 8 + 8 files changed, 345 insertions(+), 186 deletions(-) create mode 100644 internal/background_flusher.go delete mode 100644 internal/lines_test.go rename internal/{lines.go => real_line_handler.go} (68%) create mode 100644 internal/real_line_handler_test.go diff --git a/Makefile b/Makefile index 9df7563..2023e5e 100644 --- a/Makefile +++ b/Makefile @@ -3,7 +3,7 @@ all: test lint test: - go test -timeout 10m -v -race ./... + go test -timeout 1m -v -race ./... go vet ./... godoc: diff --git a/internal/background_flusher.go b/internal/background_flusher.go new file mode 100644 index 0000000..c14fea9 --- /dev/null +++ b/internal/background_flusher.go @@ -0,0 +1,55 @@ +package internal + +import ( + "log" + "time" +) + +type BackgroundFlusher interface { + Start() + Stop() +} + +type backgroundFlusher struct { + ticker *time.Ticker + interval time.Duration + handler LineHandler + stop chan struct{} +} + +func NewBackgroundFlusher(interval time.Duration, handler LineHandler) BackgroundFlusher { + return &backgroundFlusher{ + interval: interval, + handler: handler, + stop: make(chan struct{}), + } +} + +func (f *backgroundFlusher) Start() { + format := f.handler.Format() + if f.ticker != nil { + return + } + f.ticker = time.NewTicker(f.interval) + go func() { + for { + select { + case tick := <-f.ticker.C: + log.Printf("%s -- flushing at: %s\n", format, tick) + err := f.handler.FlushWithThrottling() + if err != nil { + log.Printf("%s -- error during background flush: %s\n", format, err.Error()) + } else { + log.Printf("%s -- flush completed at %s\n", format, time.Now()) + } + case <-f.stop: + return + } + } + }() +} + +func (f *backgroundFlusher) Stop() { + f.ticker.Stop() + f.stop <- struct{}{} +} diff --git a/internal/handler_factory.go b/internal/handler_factory.go index bce4736..e2482d1 100644 --- a/internal/handler_factory.go +++ b/internal/handler_factory.go @@ -80,7 +80,7 @@ func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler { } // NewEventHandler creates a RealLineHandler for the Event type -// The Event handler always sets "SetLockOnThrottledError" to true +// The Event handler always sets "ThrottleRequestsOnBackpressure" to true // And always uses a batch size of exactly 1. func (f *HandlerFactory) NewEventHandler() *RealLineHandler { return NewLineHandler( @@ -91,6 +91,6 @@ func (f *HandlerFactory) NewEventHandler() *RealLineHandler { f.bufferSize, append(f.lineHandlerOptions, SetHandlerPrefix("events"), - SetLockOnThrottledError(true))..., + ThrottleRequestsOnBackpressure())..., ) } diff --git a/internal/interfaces.go b/internal/interfaces.go index 9f001bd..1b99c60 100644 --- a/internal/interfaces.go +++ b/internal/interfaces.go @@ -29,7 +29,9 @@ type LineHandler interface { Start() Stop() Flush() error + FlushWithThrottling() error GetFailureCount() int64 + Format() string } const ( diff --git a/internal/lines_test.go b/internal/lines_test.go deleted file mode 100644 index 8519dda..0000000 --- a/internal/lines_test.go +++ /dev/null @@ -1,129 +0,0 @@ -package internal - -import ( - "fmt" - "net/http" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/wavefronthq/wavefront-sdk-go/internal/auth" -) - -type fakeReporter struct { - errorCode int - error error -} - -func (reporter *fakeReporter) Report(string, string) (*http.Response, error) { - if reporter.error != nil { - return nil, reporter.error - } - if reporter.errorCode != 0 { - return &http.Response{StatusCode: reporter.errorCode}, nil - } - return &http.Response{StatusCode: 200}, nil -} - -func (reporter *fakeReporter) ReportEvent(string) (*http.Response, error) { - return &http.Response{StatusCode: 200}, nil -} - -func TestCapacity(t *testing.T) { - lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 - checkLength(lh.buffer, 0, "non-empty lines length", t) - - addLines(lh, 100, 100, t) - err := lh.HandleLine("dummyLine") - if err == nil { - t.Errorf("buffer capacity exceeded but no error") - } -} - -func TestBufferLines(t *testing.T) { - lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 - checkLength(lh.buffer, 0, "non-empty lines length", t) - - addLines(lh, 90, 90, t) - buf := makeBuffer(50) - lh.bufferLines(buf) - checkLength(lh.buffer, 100, "error buffering lines", t) - - // clear lines - lh.buffer = make(chan string, 100) - checkLength(lh.buffer, 0, "error clearing lines", t) - - addLines(lh, 90, 90, t) - buf = makeBuffer(5) - lh.bufferLines(buf) - checkLength(lh.buffer, 95, "error buffering lines", t) -} - -func TestHandleLine_OnAuthError_DoNotBuffer(t *testing.T) { - lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 - lh.Reporter = &fakeReporter{ - error: auth.NewAuthError(fmt.Errorf("fake auth error that shouldn't be buffered")), - } - assert.NoError(t, lh.HandleLine("this is a metric, but CSP is down, or my credentials are wrong")) - assert.Error(t, lh.Flush()) - checkLength(lh.buffer, 0, "", t) - lh.Reporter = &fakeReporter{ - error: fmt.Errorf("error that should be buffered"), - } - assert.NoError(t, lh.HandleLine("this is a metric, but it was a network timeout or something like that")) - assert.Error(t, lh.Flush()) - checkLength(lh.buffer, 1, "", t) -} - -func TestFlush(t *testing.T) { - lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 - - addLines(lh, 100, 100, t) - assert.NoError(t, lh.Flush()) - assert.Equal(t, 90, len(lh.buffer), "error flushing lines") - - e := fmt.Errorf("error reporting points") - lh.Reporter = &fakeReporter{error: e} - assert.Error(t, lh.Flush()) - assert.Equal(t, 90, len(lh.buffer), "error flushing lines") - - lh.Reporter = &fakeReporter{} - lh.buffer = make(chan string, 100) - addLines(lh, 5, 5, t) - assert.NoError(t, lh.Flush()) - assert.Equal(t, 0, len(lh.buffer), "error flushing lines") -} - -func checkLength(buffer chan string, length int, msg string, t *testing.T) { - if len(buffer) != length { - t.Errorf("%s. expected: %d actual: %d", msg, length, len(buffer)) - } -} - -func addLines(lh *RealLineHandler, linesToAdd int, expectedLen int, t *testing.T) { - for i := 0; i < linesToAdd; i++ { - err := lh.HandleLine("dummyLine") - if err != nil { - t.Error(err) - } - } - if len(lh.buffer) != expectedLen { - t.Errorf("error adding lines. expected: %d actual: %d", expectedLen, len(lh.buffer)) - } -} - -func makeBuffer(num int) []string { - buf := make([]string, num) - for i := 0; i < num; i++ { - buf[i] = "dummyLine" - } - return buf -} - -func makeLineHandler(bufSize, batchSize int) *RealLineHandler { - return &RealLineHandler{ - Reporter: &fakeReporter{}, - MaxBufferSize: bufSize, - BatchSize: batchSize, - buffer: make(chan string, bufSize), - } -} diff --git a/internal/lines.go b/internal/real_line_handler.go similarity index 68% rename from internal/lines.go rename to internal/real_line_handler.go index b8c6efa..401ab06 100644 --- a/internal/lines.go +++ b/internal/real_line_handler.go @@ -14,11 +14,12 @@ import ( ) const ( - metricFormat = "wavefront" - histogramFormat = "histogram" - traceFormat = "trace" - spanLogsFormat = "spanLogs" - eventFormat = "event" + metricFormat = "wavefront" + histogramFormat = "histogram" + traceFormat = "trace" + spanLogsFormat = "spanLogs" + eventFormat = "event" + defaultThrottledSleepDuration = time.Second * 30 ) type RealLineHandler struct { @@ -32,20 +33,23 @@ type RealLineHandler struct { Reporter Reporter BatchSize int MaxBufferSize int - Format string - flushTicker *time.Ticker + format string - internalRegistry sdkmetrics.Registry - prefix string + internalRegistry sdkmetrics.Registry + prefix string + throttleOnBackpressure bool + throttledSleepDuration time.Duration + mtx sync.Mutex - mtx sync.Mutex - lockOnErrThrottled bool + buffer chan string + flusher BackgroundFlusher + resumeAt time.Time +} - buffer chan string - done chan struct{} +func (lh *RealLineHandler) Format() string { + return lh.format } -var throttledSleepDuration = time.Second * 30 var errThrottled = errors.New("error: throttled event creation") type LineHandlerOption func(*RealLineHandler) @@ -62,22 +66,24 @@ func SetHandlerPrefix(prefix string) LineHandlerOption { } } -func SetLockOnThrottledError(lock bool) LineHandlerOption { +func ThrottleRequestsOnBackpressure() LineHandlerOption { return func(handler *RealLineHandler) { - handler.lockOnErrThrottled = lock + handler.throttleOnBackpressure = true } } func NewLineHandler(reporter Reporter, format string, flushInterval time.Duration, batchSize, maxBufferSize int, setters ...LineHandlerOption) *RealLineHandler { lh := &RealLineHandler{ - Reporter: reporter, - BatchSize: batchSize, - MaxBufferSize: maxBufferSize, - flushTicker: time.NewTicker(flushInterval), - Format: format, - lockOnErrThrottled: false, + Reporter: reporter, + BatchSize: batchSize, + MaxBufferSize: maxBufferSize, + format: format, + throttledSleepDuration: defaultThrottledSleepDuration, } + lh.buffer = make(chan string, lh.MaxBufferSize) + lh.flusher = NewBackgroundFlusher(flushInterval, lh) + for _, setter := range setters { setter(lh) } @@ -94,31 +100,7 @@ func NewLineHandler(reporter Reporter, format string, flushInterval time.Duratio } func (lh *RealLineHandler) Start() { - lh.buffer = make(chan string, lh.MaxBufferSize) - lh.done = make(chan struct{}) - - go func() { - for { - select { - case <-lh.flushTicker.C: - err := lh.Flush() - if err != nil { - log.Println(lh.lockOnErrThrottled, "---", err) - if err == errThrottled && lh.lockOnErrThrottled { - go func() { - lh.mtx.Lock() - atomic.AddInt64(&lh.throttled, 1) - log.Printf("sleeping for %v, buffer size: %d\n", throttledSleepDuration, len(lh.buffer)) - time.Sleep(throttledSleepDuration) - lh.mtx.Unlock() - }() - } - } - case <-lh.done: - return - } - } - }() + lh.flusher.Start() } func (lh *RealLineHandler) HandleLine(line string) error { @@ -138,7 +120,7 @@ func minInt(x, y int) int { return y } -func (lh *RealLineHandler) Flush() error { +func (lh *RealLineHandler) flush() error { lh.mtx.Lock() defer lh.mtx.Unlock() bufLen := len(lh.buffer) @@ -153,6 +135,25 @@ func (lh *RealLineHandler) Flush() error { return nil } +func (lh *RealLineHandler) FlushWithThrottling() error { + if time.Now().Before(lh.resumeAt) { + log.Println("attempting to flush, but flushing is currently throttled by the server") + log.Printf("sleeping until: %s\n", lh.resumeAt.Format(time.RFC3339)) + time.Sleep(time.Until(lh.resumeAt)) + } + return lh.Flush() +} + +func (lh *RealLineHandler) Flush() error { + flushErr := lh.flush() + if flushErr == errThrottled && lh.throttleOnBackpressure { + atomic.AddInt64(&lh.throttled, 1) + log.Printf("pausing requests for %v, buffer size: %d\n", lh.throttledSleepDuration, len(lh.buffer)) + lh.resumeAt = time.Now().Add(lh.throttledSleepDuration) + } + return flushErr +} + func (lh *RealLineHandler) FlushAll() error { lh.mtx.Lock() defer lh.mtx.Unlock() @@ -179,13 +180,13 @@ func (lh *RealLineHandler) FlushAll() error { func (lh *RealLineHandler) report(lines []string) error { strLines := strings.Join(lines, "") - resp, err := lh.Reporter.Report(lh.Format, strLines) + resp, err := lh.Reporter.Report(lh.format, strLines) if err != nil { if shouldRetry(err) { lh.bufferLines(lines) } - return fmt.Errorf("error reporting %s format data to Wavefront: %q", lh.Format, err) + return fmt.Errorf("error reporting %s format data to Wavefront: %q", lh.format, err) } if 400 <= resp.StatusCode && resp.StatusCode <= 599 { @@ -194,7 +195,7 @@ func (lh *RealLineHandler) report(lines []string) error { if resp.StatusCode == 406 { return errThrottled } - return fmt.Errorf("error reporting %s format data to Wavefront. status=%d", lh.Format, resp.StatusCode) + return fmt.Errorf("error reporting %s format data to Wavefront. status=%d", lh.format, resp.StatusCode) } return nil } @@ -224,11 +225,9 @@ func (lh *RealLineHandler) GetThrottledCount() int64 { } func (lh *RealLineHandler) Stop() { - lh.flushTicker.Stop() - lh.done <- struct{}{} // block until goroutine exits + lh.flusher.Stop() if err := lh.FlushAll(); err != nil { log.Println(err) } - lh.done = nil lh.buffer = nil } diff --git a/internal/real_line_handler_test.go b/internal/real_line_handler_test.go new file mode 100644 index 0000000..3e5818c --- /dev/null +++ b/internal/real_line_handler_test.go @@ -0,0 +1,224 @@ +package internal + +import ( + "fmt" + "net/http" + "sync/atomic" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/wavefronthq/wavefront-sdk-go/internal/auth" +) + +type fakeReporter struct { + httpResponseStatus int64 + reportCallCount int64 + error error + lines []string +} + +func (reporter *fakeReporter) Report(_ string, lines string) (*http.Response, error) { + atomic.AddInt64(&reporter.reportCallCount, 1) + if reporter.error != nil { + return nil, reporter.error + } + status := atomic.LoadInt64(&reporter.httpResponseStatus) + if status != 0 { + return &http.Response{StatusCode: int(status)}, nil + } + reporter.lines = append(reporter.lines, lines) + return &http.Response{StatusCode: 200}, nil +} + +func (reporter *fakeReporter) ReportCallCount() int { + return int(atomic.LoadInt64(&reporter.reportCallCount)) +} + +func (reporter *fakeReporter) ResetReportCallCount() { + atomic.StoreInt64(&reporter.reportCallCount, int64(0)) +} + +func (reporter *fakeReporter) SetHTTPStatus(status int) { + atomic.StoreInt64(&reporter.httpResponseStatus, int64(status)) +} + +func (reporter *fakeReporter) ReportEvent(string) (*http.Response, error) { + return &http.Response{StatusCode: 200}, nil +} + +func TestCapacity(t *testing.T) { + lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 + checkLength(lh.buffer, 0, "non-empty lines length", t) + + addLines(lh, 100, 100, t) + err := lh.HandleLine("dummyLine") + if err == nil { + t.Errorf("buffer capacity exceeded but no error") + } +} + +func TestBufferLines(t *testing.T) { + lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 + checkLength(lh.buffer, 0, "non-empty lines length", t) + + addLines(lh, 90, 90, t) + buf := makeBuffer(50) + lh.bufferLines(buf) + checkLength(lh.buffer, 100, "error buffering lines", t) + + // clear lines + lh.buffer = make(chan string, 100) + checkLength(lh.buffer, 0, "error clearing lines", t) + + addLines(lh, 90, 90, t) + buf = makeBuffer(5) + lh.bufferLines(buf) + checkLength(lh.buffer, 95, "error buffering lines", t) +} + +func TestHandleLine_OnAuthError_DoNotBuffer(t *testing.T) { + lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 + lh.Reporter = &fakeReporter{ + error: auth.NewAuthError(fmt.Errorf("fake auth error that shouldn't be buffered")), + } + assert.NoError(t, lh.HandleLine("this is a metric, but CSP is down, or my credentials are wrong")) + assert.Error(t, lh.Flush()) + checkLength(lh.buffer, 0, "", t) + lh.Reporter = &fakeReporter{ + error: fmt.Errorf("error that should be buffered"), + } + assert.NoError(t, lh.HandleLine("this is a metric, but it was a network timeout or something like that")) + assert.Error(t, lh.Flush()) + checkLength(lh.buffer, 1, "", t) +} + +func TestFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) { + lh := &RealLineHandler{ + Reporter: &fakeReporter{}, + MaxBufferSize: 100, + BatchSize: 10, + buffer: make(chan string, 100), + throttleOnBackpressure: true, + throttledSleepDuration: 1 * time.Second, + } + + addLines(lh, 100, 100, t) + lh.Reporter.(*fakeReporter).SetHTTPStatus(406) + startTime := time.Now().Add(1 * time.Second) + deadline := startTime.Add(1 * time.Second) + assert.Error(t, lh.Flush()) + assert.Equal(t, 100, len(lh.buffer)) + assert.WithinRange(t, lh.resumeAt, startTime, deadline) + lh.Reporter.(*fakeReporter).SetHTTPStatus(0) + assert.NoError(t, lh.FlushWithThrottling()) + assert.Greater(t, time.Now(), lh.resumeAt) + assert.Equal(t, 90, len(lh.buffer)) +} + +func TestBackgroundFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) { + lh := &RealLineHandler{ + Reporter: &fakeReporter{}, + MaxBufferSize: 100, + BatchSize: 10, + buffer: make(chan string, 100), + throttleOnBackpressure: true, + throttledSleepDuration: 1 * time.Second, + } + + addLines(lh, 100, 100, t) + lh.Reporter.(*fakeReporter).SetHTTPStatus(406) + startTime := time.Now().Add(1 * time.Second) + deadline := startTime.Add(1 * time.Second) + assert.Error(t, lh.Flush()) + assert.Equal(t, 100, len(lh.buffer)) + assert.WithinRange(t, lh.resumeAt, startTime, deadline) + lh.Reporter = &fakeReporter{} + assert.NoError(t, lh.FlushWithThrottling()) + assert.Greater(t, time.Now(), lh.resumeAt) + assert.Equal(t, 90, len(lh.buffer)) +} + +func TestFlushTicker_WhenThrottlingEnabled_AndReceives406Error_ThrottlesRequestsUntilNextSleepDuration(t *testing.T) { + throttledSleepDuration := 250 * time.Millisecond + briskTickTime := 50 * time.Millisecond + lh := &RealLineHandler{ + Reporter: &fakeReporter{}, + MaxBufferSize: 100, + BatchSize: 10, + buffer: make(chan string, 100), + throttleOnBackpressure: true, + throttledSleepDuration: throttledSleepDuration, + } + + lh.flusher = NewBackgroundFlusher(briskTickTime, lh) + lh.Start() + addLines(lh, 100, 100, t) + + twoTicksOfTheTicker := 2 * briskTickTime + time.Sleep(twoTicksOfTheTicker + 10*time.Millisecond) + + assert.Equal(t, 2, lh.Reporter.(*fakeReporter).ReportCallCount()) + lh.Reporter.(*fakeReporter).ResetReportCallCount() + + lh.Reporter.(*fakeReporter).SetHTTPStatus(406) + time.Sleep(twoTicksOfTheTicker) + + assert.Equal(t, 1, lh.Reporter.(*fakeReporter).ReportCallCount()) + + lh.Stop() +} + +func TestFlush(t *testing.T) { + lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10 + + addLines(lh, 100, 100, t) + assert.NoError(t, lh.Flush()) + assert.Equal(t, 90, len(lh.buffer), "error flushing lines") + + e := fmt.Errorf("error reporting points") + lh.Reporter = &fakeReporter{error: e} + assert.Error(t, lh.Flush()) + assert.Equal(t, 90, len(lh.buffer), "error flushing lines") + + lh.Reporter = &fakeReporter{} + lh.buffer = make(chan string, 100) + addLines(lh, 5, 5, t) + assert.NoError(t, lh.Flush()) + assert.Equal(t, 0, len(lh.buffer), "error flushing lines") +} + +func checkLength(buffer chan string, length int, msg string, t *testing.T) { + if len(buffer) != length { + t.Errorf("%s. expected: %d actual: %d", msg, length, len(buffer)) + } +} + +func addLines(lh *RealLineHandler, linesToAdd int, expectedLen int, t *testing.T) { + for i := 0; i < linesToAdd; i++ { + err := lh.HandleLine("dummyLine") + if err != nil { + t.Error(err) + } + } + if len(lh.buffer) != expectedLen { + t.Errorf("error adding lines. expected: %d actual: %d", expectedLen, len(lh.buffer)) + } +} + +func makeBuffer(num int) []string { + buf := make([]string, num) + for i := 0; i < num; i++ { + buf[i] = "dummyLine" + } + return buf +} + +func makeLineHandler(bufSize, batchSize int) *RealLineHandler { + return &RealLineHandler{ + Reporter: &fakeReporter{}, + MaxBufferSize: bufSize, + BatchSize: batchSize, + buffer: make(chan string, bufSize), + } +} diff --git a/senders/wavefront_sender_test.go b/senders/wavefront_sender_test.go index 15a2fff..56e2051 100644 --- a/senders/wavefront_sender_test.go +++ b/senders/wavefront_sender_test.go @@ -322,6 +322,10 @@ type mockHandler struct { Lines []string } +func (m *mockHandler) Format() string { + return "mock-handler" +} + func (m *mockHandler) HandleLine(line string) error { m.Lines = append(m.Lines, line) return m.Error @@ -337,6 +341,10 @@ func (m *mockHandler) Flush() error { return m.Error } +func (m *mockHandler) FlushWithThrottling() error { + return m.Flush() +} + func (m *mockHandler) GetFailureCount() int64 { return 0 }