Skip to content

Commit

Permalink
chore: refactor background flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeWinikates committed Oct 27, 2023
1 parent 2c0b8db commit ef47580
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 183 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
all: test lint

test:
go test -timeout 10m -v -race ./...
go test -timeout 1m -v -race ./...
go vet ./...

godoc:
Expand Down
55 changes: 55 additions & 0 deletions internal/flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package internal

import (
"log"
"time"
)

type BackgroundFlusher interface {
Start()
Stop()
}

type backgroundFlusher struct {
ticker *time.Ticker
interval time.Duration
handler LineHandler
stop chan struct{}
}

func NewBackgroundFlusher(interval time.Duration, handler LineHandler) BackgroundFlusher {
return &backgroundFlusher{
interval: interval,
handler: handler,
stop: make(chan struct{}),
}
}

func (f *backgroundFlusher) Start() {
format := f.handler.(*RealLineHandler).Format
if f.ticker != nil {
return
}
f.ticker = time.NewTicker(f.interval)
go func() {
for {
select {
case tick := <-f.ticker.C:
log.Printf("%s -- flushing at: %s\n", format, tick)
err := f.handler.FlushWithThrottling()
if err != nil {
log.Printf("%s -- error during background flush: %s\n", format, err.Error())
} else {
log.Printf("%s -- flush completed at %s\n", format, time.Now())
}
case <-f.stop:
return
}
}
}()
}

func (f *backgroundFlusher) Stop() {
f.ticker.Stop()
f.stop <- struct{}{}
}
4 changes: 2 additions & 2 deletions internal/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler {
}

// NewEventHandler creates a RealLineHandler for the Event type
// The Event handler always sets "SetLockOnThrottledError" to true
// The Event handler always sets "ThrottleRequestsOnBackpressure" to true
// And always uses a batch size of exactly 1.
func (f *HandlerFactory) NewEventHandler() *RealLineHandler {
return NewLineHandler(
Expand All @@ -91,6 +91,6 @@ func (f *HandlerFactory) NewEventHandler() *RealLineHandler {
f.bufferSize,
append(f.lineHandlerOptions,
SetHandlerPrefix("events"),
SetLockOnThrottledError(true))...,
ThrottleRequestsOnBackpressure())...,
)
}
1 change: 1 addition & 0 deletions internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type LineHandler interface {
Start()
Stop()
Flush() error
FlushWithThrottling() error
GetFailureCount() int64
}

Expand Down
129 changes: 0 additions & 129 deletions internal/lines_test.go

This file was deleted.

97 changes: 46 additions & 51 deletions internal/lines.go → internal/real_line_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ import (
)

const (
metricFormat = "wavefront"
histogramFormat = "histogram"
traceFormat = "trace"
spanLogsFormat = "spanLogs"
eventFormat = "event"
metricFormat = "wavefront"
histogramFormat = "histogram"
traceFormat = "trace"
spanLogsFormat = "spanLogs"
eventFormat = "event"
defaultThrottledSleepDuration = time.Second * 30
)

type RealLineHandler struct {
Expand All @@ -33,19 +34,18 @@ type RealLineHandler struct {
BatchSize int
MaxBufferSize int
Format string
flushTicker *time.Ticker

internalRegistry sdkmetrics.Registry
prefix string
internalRegistry sdkmetrics.Registry
prefix string
throttleOnBackpressure bool
throttledSleepDuration time.Duration
mtx sync.Mutex

mtx sync.Mutex
lockOnErrThrottled bool

buffer chan string
done chan struct{}
buffer chan string
flusher BackgroundFlusher
resumeAt time.Time
}

var throttledSleepDuration = time.Second * 30
var errThrottled = errors.New("error: throttled event creation")

type LineHandlerOption func(*RealLineHandler)
Expand All @@ -62,22 +62,24 @@ func SetHandlerPrefix(prefix string) LineHandlerOption {
}
}

func SetLockOnThrottledError(lock bool) LineHandlerOption {
func ThrottleRequestsOnBackpressure() LineHandlerOption {
return func(handler *RealLineHandler) {
handler.lockOnErrThrottled = lock
handler.throttleOnBackpressure = true
}
}

func NewLineHandler(reporter Reporter, format string, flushInterval time.Duration, batchSize, maxBufferSize int, setters ...LineHandlerOption) *RealLineHandler {
lh := &RealLineHandler{
Reporter: reporter,
BatchSize: batchSize,
MaxBufferSize: maxBufferSize,
flushTicker: time.NewTicker(flushInterval),
Format: format,
lockOnErrThrottled: false,
Reporter: reporter,
BatchSize: batchSize,
MaxBufferSize: maxBufferSize,
Format: format,
throttledSleepDuration: defaultThrottledSleepDuration,
}

lh.buffer = make(chan string, lh.MaxBufferSize)
lh.flusher = NewBackgroundFlusher(flushInterval, lh)

for _, setter := range setters {
setter(lh)
}
Expand All @@ -94,31 +96,7 @@ func NewLineHandler(reporter Reporter, format string, flushInterval time.Duratio
}

func (lh *RealLineHandler) Start() {
lh.buffer = make(chan string, lh.MaxBufferSize)
lh.done = make(chan struct{})

go func() {
for {
select {
case <-lh.flushTicker.C:
err := lh.Flush()
if err != nil {
log.Println(lh.lockOnErrThrottled, "---", err)
if err == errThrottled && lh.lockOnErrThrottled {
go func() {
lh.mtx.Lock()
atomic.AddInt64(&lh.throttled, 1)
log.Printf("sleeping for %v, buffer size: %d\n", throttledSleepDuration, len(lh.buffer))
time.Sleep(throttledSleepDuration)
lh.mtx.Unlock()
}()
}
}
case <-lh.done:
return
}
}
}()
lh.flusher.Start()
}

func (lh *RealLineHandler) HandleLine(line string) error {
Expand All @@ -138,7 +116,7 @@ func minInt(x, y int) int {
return y
}

func (lh *RealLineHandler) Flush() error {
func (lh *RealLineHandler) flush() error {
lh.mtx.Lock()
defer lh.mtx.Unlock()
bufLen := len(lh.buffer)
Expand All @@ -153,6 +131,25 @@ func (lh *RealLineHandler) Flush() error {
return nil
}

func (lh *RealLineHandler) FlushWithThrottling() error {
if time.Now().Before(lh.resumeAt) {
log.Println("attempting to flush, but flushing is currently throttled by the server")
log.Printf("sleeping until: %s\n", lh.resumeAt.Format(time.RFC3339))
time.Sleep(time.Until(lh.resumeAt))
}
return lh.Flush()
}

func (lh *RealLineHandler) Flush() error {
flushErr := lh.flush()
if flushErr == errThrottled && lh.throttleOnBackpressure {
atomic.AddInt64(&lh.throttled, 1)
log.Printf("pausing requests for %v, buffer size: %d\n", lh.throttledSleepDuration, len(lh.buffer))
lh.resumeAt = time.Now().Add(lh.throttledSleepDuration)
}
return flushErr
}

func (lh *RealLineHandler) FlushAll() error {
lh.mtx.Lock()
defer lh.mtx.Unlock()
Expand Down Expand Up @@ -224,11 +221,9 @@ func (lh *RealLineHandler) GetThrottledCount() int64 {
}

func (lh *RealLineHandler) Stop() {
lh.flushTicker.Stop()
lh.done <- struct{}{} // block until goroutine exits
lh.flusher.Stop()
if err := lh.FlushAll(); err != nil {
log.Println(err)
}
lh.done = nil
lh.buffer = nil
}
Loading

0 comments on commit ef47580

Please sign in to comment.