Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeWinikates committed Oct 26, 2023
1 parent a4cf53c commit db16644
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 11 deletions.
2 changes: 1 addition & 1 deletion internal/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (f *backgroundFlusher) Start() {
select {
case tick := <-f.ticker.C:
log.Printf("%s -- flushing at: %s\n", format, tick)
err := f.handler.Flush()
err := f.handler.FlushWithThrottling()
if err != nil {
log.Printf("%s -- error during background flush: %s\n", format, err.Error())
} else {
Expand Down
1 change: 1 addition & 0 deletions internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ type LineHandler interface {
Start()
Stop()
Flush() error
FlushWithThrottling() error
GetFailureCount() int64
}

Expand Down
6 changes: 5 additions & 1 deletion internal/real_line_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,12 +132,16 @@ func (lh *RealLineHandler) flush() error {
return nil
}

func (lh *RealLineHandler) Flush() error {
func (lh *RealLineHandler) FlushWithThrottling() error {
if time.Now().Before(lh.resumeAt) {
log.Println("attempting to flush, but flushing is currently throttled by the server")
log.Printf("sleeping until: %s\n", lh.resumeAt.Format(time.RFC3339))
time.Sleep(time.Until(lh.resumeAt))
}
return lh.Flush()
}

func (lh *RealLineHandler) Flush() error {
flushErr := lh.flush()
if flushErr == errThrottled && lh.throttleOnBackpressure {
atomic.AddInt64(&lh.throttled, 1)
Expand Down
89 changes: 80 additions & 9 deletions internal/real_line_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package internal
import (
"fmt"
"net/http"
"sync/atomic"
"testing"
"time"

Expand All @@ -11,20 +12,37 @@ import (
)

type fakeReporter struct {
errorCode int
error error
httpResponseStatus int64
reportCallCount int64
error error
lines []string
}

func (reporter *fakeReporter) Report(string, string) (*http.Response, error) {
func (reporter *fakeReporter) Report(_ string, lines string) (*http.Response, error) {
atomic.AddInt64(&reporter.reportCallCount, 1)
if reporter.error != nil {
return nil, reporter.error
}
if reporter.errorCode != 0 {
return &http.Response{StatusCode: reporter.errorCode}, nil
status := atomic.LoadInt64(&reporter.httpResponseStatus)
if status != 0 {
return &http.Response{StatusCode: int(status)}, nil
}
reporter.lines = append(reporter.lines, lines)
return &http.Response{StatusCode: 200}, nil
}

func (reporter *fakeReporter) ReportCallCount() int {
return int(atomic.LoadInt64(&reporter.reportCallCount))
}

func (reporter *fakeReporter) ResetReportCallCount() {
atomic.StoreInt64(&reporter.reportCallCount, int64(0))
}

func (reporter *fakeReporter) SetHTTPStatus(status int) {
atomic.StoreInt64(&reporter.httpResponseStatus, int64(status))
}

func (reporter *fakeReporter) ReportEvent(string) (*http.Response, error) {
return &http.Response{StatusCode: 200}, nil
}
Expand Down Expand Up @@ -75,7 +93,7 @@ func TestHandleLine_OnAuthError_DoNotBuffer(t *testing.T) {
checkLength(lh.buffer, 1, "", t)
}

func TestFlush_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) {
func TestFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) {
lh := &RealLineHandler{
Reporter: &fakeReporter{},
MaxBufferSize: 100,
Expand All @@ -86,18 +104,71 @@ func TestFlush_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) {
}

addLines(lh, 100, 100, t)
lh.Reporter = &fakeReporter{errorCode: 406}
lh.Reporter.(*fakeReporter).SetHTTPStatus(406)
startTime := time.Now().Add(1 * time.Second)
deadline := startTime.Add(1 * time.Second)
assert.Error(t, lh.Flush())
assert.Equal(t, 100, len(lh.buffer))
assert.WithinRange(t, lh.resumeAt, startTime, deadline)
lh.Reporter.(*fakeReporter).SetHTTPStatus(0)
assert.NoError(t, lh.FlushWithThrottling())
assert.Greater(t, time.Now(), lh.resumeAt)
assert.Equal(t, 90, len(lh.buffer))
}

func TestBackgroundFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) {
lh := &RealLineHandler{
Reporter: &fakeReporter{},
MaxBufferSize: 100,
BatchSize: 10,
buffer: make(chan string, 100),
throttleOnBackpressure: true,
throttledSleepDuration: 1 * time.Second,
}

addLines(lh, 100, 100, t)
lh.Reporter.(*fakeReporter).SetHTTPStatus(406)
startTime := time.Now().Add(1 * time.Second)
deadline := startTime.Add(1 * time.Second)
assert.Error(t, lh.Flush())
assert.Equal(t, 100, len(lh.buffer))
assert.WithinRange(t, lh.resumeAt, startTime, deadline)
lh.Reporter = &fakeReporter{}
assert.NoError(t, lh.Flush())
assert.Greater(t, deadline, time.Now())
assert.NoError(t, lh.FlushWithThrottling())
assert.Greater(t, time.Now(), lh.resumeAt)
assert.Equal(t, 90, len(lh.buffer))
}

func TestFlushTicker_WhenThrottlingEnabled_AndReceives406Error_ThrottlesRequestsUntilNextSleepDuration(t *testing.T) {
throttledSleepDuration := 250 * time.Millisecond
briskTickTime := 50 * time.Millisecond
lh := &RealLineHandler{
Reporter: &fakeReporter{},
MaxBufferSize: 100,
BatchSize: 10,
buffer: make(chan string, 100),
throttleOnBackpressure: true,
throttledSleepDuration: throttledSleepDuration,
}

lh.flusher = NewBackgroundFlusher(briskTickTime, lh)
lh.Start()
addLines(lh, 100, 100, t)

twoTicksOfTheTicker := 2 * briskTickTime
time.Sleep(twoTicksOfTheTicker + 10*time.Millisecond)

assert.Equal(t, 2, lh.Reporter.(*fakeReporter).ReportCallCount())
lh.Reporter.(*fakeReporter).ResetReportCallCount()

lh.Reporter.(*fakeReporter).SetHTTPStatus(406)
time.Sleep(twoTicksOfTheTicker)

assert.Equal(t, 1, lh.Reporter.(*fakeReporter).ReportCallCount())

lh.Stop()
}

func TestFlush(t *testing.T) {
lh := makeLineHandler(100, 10) // cap: 100, batchSize: 10

Expand Down
4 changes: 4 additions & 0 deletions senders/wavefront_sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,10 @@ func (m *mockHandler) Flush() error {
return m.Error
}

func (m *mockHandler) FlushWithThrottling() error {
return m.Flush()
}

func (m *mockHandler) GetFailureCount() int64 {
return 0
}
Expand Down

0 comments on commit db16644

Please sign in to comment.