Skip to content

Commit

Permalink
chore: refactor background flushing
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeWinikates committed Nov 17, 2023
1 parent 2c0b8db commit d032aee
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 186 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
all: test lint

test:
go test -timeout 10m -v -race ./...
go test -timeout 1m -v -race ./...
go vet ./...

godoc:
Expand Down
55 changes: 55 additions & 0 deletions internal/background_flusher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package internal

import (
"log"
"time"
)

type BackgroundFlusher interface {
Start()
Stop()
}

type backgroundFlusher struct {
ticker *time.Ticker
interval time.Duration
handler LineHandler
stop chan struct{}
}

func NewBackgroundFlusher(interval time.Duration, handler LineHandler) BackgroundFlusher {
return &backgroundFlusher{
interval: interval,
handler: handler,
stop: make(chan struct{}),
}
}

func (f *backgroundFlusher) Start() {
format := f.handler.Format()
if f.ticker != nil {
return
}
f.ticker = time.NewTicker(f.interval)
go func() {
for {
select {
case tick := <-f.ticker.C:
log.Printf("%s -- flushing at: %s\n", format, tick)
err := f.handler.FlushWithThrottling()
if err != nil {
log.Printf("%s -- error during background flush: %s\n", format, err.Error())
} else {
log.Printf("%s -- flush completed at %s\n", format, time.Now())
}
case <-f.stop:
return
}
}
}()
}

func (f *backgroundFlusher) Stop() {
f.ticker.Stop()
f.stop <- struct{}{}
}
4 changes: 2 additions & 2 deletions internal/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler {
}

// NewEventHandler creates a RealLineHandler for the Event type
// The Event handler always sets "SetLockOnThrottledError" to true
// The Event handler always sets "ThrottleRequestsOnBackpressure" to true
// And always uses a batch size of exactly 1.
func (f *HandlerFactory) NewEventHandler() *RealLineHandler {
return NewLineHandler(
Expand All @@ -91,6 +91,6 @@ func (f *HandlerFactory) NewEventHandler() *RealLineHandler {
f.bufferSize,
append(f.lineHandlerOptions,
SetHandlerPrefix("events"),
SetLockOnThrottledError(true))...,
ThrottleRequestsOnBackpressure())...,
)
}
2 changes: 2 additions & 0 deletions internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,9 @@ type LineHandler interface {
Start()
Stop()
Flush() error
FlushWithThrottling() error
GetFailureCount() int64
Format() string
}

const (
Expand Down
129 changes: 0 additions & 129 deletions internal/lines_test.go

This file was deleted.

Loading

0 comments on commit d032aee

Please sign in to comment.