Skip to content

Commit

Permalink
refactor: handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
LukeWinikates committed Nov 9, 2023
1 parent ef47580 commit 43a5027
Show file tree
Hide file tree
Showing 12 changed files with 210 additions and 216 deletions.
18 changes: 18 additions & 0 deletions internal/delta/formatter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package delta

import (
"fmt"

"github.com/wavefronthq/wavefront-sdk-go/internal"
"github.com/wavefronthq/wavefront-sdk-go/internal/metric"
)

func Line(name string, value float64, source string, tags map[string]string, defaultSource string) (string, error) {
if name == "" {
return "", fmt.Errorf("empty metric name")
}
if !internal.HasDeltaPrefix(name) {
name = internal.DeltaCounterName(name)
}
return metric.Line(name, value, 0, source, tags, defaultSource)
}
6 changes: 3 additions & 3 deletions internal/flusher.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ type BackgroundFlusher interface {
type backgroundFlusher struct {
ticker *time.Ticker
interval time.Duration
handler LineHandler
handler BatchBuilder
stop chan struct{}
}

func NewBackgroundFlusher(interval time.Duration, handler LineHandler) BackgroundFlusher {
func NewBackgroundFlusher(interval time.Duration, handler BatchBuilder) BackgroundFlusher {
return &backgroundFlusher{
interval: interval,
handler: handler,
Expand All @@ -26,7 +26,7 @@ func NewBackgroundFlusher(interval time.Duration, handler LineHandler) Backgroun
}

func (f *backgroundFlusher) Start() {
format := f.handler.(*RealLineHandler).Format
format := f.handler.(*RealBatchBuilder).Format
if f.ticker != nil {
return
}
Expand Down
46 changes: 34 additions & 12 deletions internal/handler_factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,32 +6,54 @@ import (
"github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics"
)

type HandlerFactory struct {
type SenderFactory struct {
metricsReporter Reporter
tracesReporter Reporter
flushInterval time.Duration
bufferSize int
lineHandlerOptions []LineHandlerOption
lineHandlerOptions []BatchAccumulatorOption
registry sdkmetrics.Registry
}

func NewHandlerFactory(
func NewSenderFactory(
metricsReporter,
tracesReporter Reporter,
flushInterval time.Duration,
bufferSize int,
registry sdkmetrics.Registry) *HandlerFactory {
return &HandlerFactory{
registry sdkmetrics.Registry) *SenderFactory {
return &SenderFactory{
registry: registry,
metricsReporter: metricsReporter,
tracesReporter: tracesReporter,
flushInterval: flushInterval,
bufferSize: bufferSize,
lineHandlerOptions: []LineHandlerOption{
lineHandlerOptions: []BatchAccumulatorOption{
SetRegistry(registry),
},
}
}

func (f *HandlerFactory) NewPointHandler(batchSize int) *RealLineHandler {
func (f *SenderFactory) NewPointSender(batchSize int) TypedSender {
return NewTypedSender(f.registry.PointsTracker(), f.newPointHandler(batchSize))
}

func (f *SenderFactory) NewHistogramSender(batchSize int) TypedSender {
return NewTypedSender(f.registry.HistogramsTracker(), f.NewHistogramHandler(batchSize))
}

func (f *SenderFactory) NewSpanSender(batchSize int) TypedSender {
return NewTypedSender(f.registry.SpansTracker(), f.newSpanHandler(batchSize))
}

func (f *SenderFactory) NewEventsSender() TypedSender {
return NewTypedSender(f.registry.EventsTracker(), f.newEventHandler())
}

func (f *SenderFactory) NewSpanLogSender(batchSize int) TypedSender {
return NewTypedSender(f.registry.SpanLogsTracker(), f.newSpanLogHandler(batchSize))
}

func (f *SenderFactory) newPointHandler(batchSize int) *RealBatchBuilder {
return NewLineHandler(
f.metricsReporter,
metricFormat,
Expand All @@ -43,7 +65,7 @@ func (f *HandlerFactory) NewPointHandler(batchSize int) *RealLineHandler {
)
}

func (f *HandlerFactory) NewHistogramHandler(batchSize int) *RealLineHandler {
func (f *SenderFactory) NewHistogramHandler(batchSize int) *RealBatchBuilder {
return NewLineHandler(
f.metricsReporter,
histogramFormat,
Expand All @@ -55,7 +77,7 @@ func (f *HandlerFactory) NewHistogramHandler(batchSize int) *RealLineHandler {
)
}

func (f *HandlerFactory) NewSpanHandler(batchSize int) *RealLineHandler {
func (f *SenderFactory) newSpanHandler(batchSize int) *RealBatchBuilder {
return NewLineHandler(
f.tracesReporter,
traceFormat,
Expand All @@ -67,7 +89,7 @@ func (f *HandlerFactory) NewSpanHandler(batchSize int) *RealLineHandler {
)
}

func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler {
func (f *SenderFactory) newSpanLogHandler(batchSize int) *RealBatchBuilder {
return NewLineHandler(
f.tracesReporter,
spanLogsFormat,
Expand All @@ -79,10 +101,10 @@ func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler {
)
}

// NewEventHandler creates a RealLineHandler for the Event type
// NewEventHandler creates a RealBatchBuilder for the Event type
// The Event handler always sets "ThrottleRequestsOnBackpressure" to true
// And always uses a batch size of exactly 1.
func (f *HandlerFactory) NewEventHandler() *RealLineHandler {
func (f *SenderFactory) newEventHandler() *RealBatchBuilder {
return NewLineHandler(
f.metricsReporter,
eventFormat,
Expand Down
2 changes: 1 addition & 1 deletion internal/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ type ConnectionHandler interface {
Flusher
}

type LineHandler interface {
type BatchBuilder interface {
HandleLine(line string) error
Start()
Stop()
Expand Down
42 changes: 21 additions & 21 deletions internal/real_line_handler.go → internal/real_batch_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ const (
defaultThrottledSleepDuration = time.Second * 30
)

type RealLineHandler struct {
type RealBatchBuilder struct {
// keep these two fields as first element of struct
// to guarantee 64-bit alignment on 32-bit machines.
// atomic.* functions crash if operands are not 64-bit aligned.
Expand All @@ -48,28 +48,28 @@ type RealLineHandler struct {

var errThrottled = errors.New("error: throttled event creation")

type LineHandlerOption func(*RealLineHandler)
type BatchAccumulatorOption func(*RealBatchBuilder)

func SetRegistry(registry sdkmetrics.Registry) LineHandlerOption {
return func(handler *RealLineHandler) {
func SetRegistry(registry sdkmetrics.Registry) BatchAccumulatorOption {
return func(handler *RealBatchBuilder) {
handler.internalRegistry = registry
}
}

func SetHandlerPrefix(prefix string) LineHandlerOption {
return func(handler *RealLineHandler) {
func SetHandlerPrefix(prefix string) BatchAccumulatorOption {
return func(handler *RealBatchBuilder) {
handler.prefix = prefix
}
}

func ThrottleRequestsOnBackpressure() LineHandlerOption {
return func(handler *RealLineHandler) {
func ThrottleRequestsOnBackpressure() BatchAccumulatorOption {
return func(handler *RealBatchBuilder) {
handler.throttleOnBackpressure = true
}
}

func NewLineHandler(reporter Reporter, format string, flushInterval time.Duration, batchSize, maxBufferSize int, setters ...LineHandlerOption) *RealLineHandler {
lh := &RealLineHandler{
func NewLineHandler(reporter Reporter, format string, flushInterval time.Duration, batchSize, maxBufferSize int, setters ...BatchAccumulatorOption) *RealBatchBuilder {
lh := &RealBatchBuilder{
Reporter: reporter,
BatchSize: batchSize,
MaxBufferSize: maxBufferSize,
Expand All @@ -95,11 +95,11 @@ func NewLineHandler(reporter Reporter, format string, flushInterval time.Duratio
return lh
}

func (lh *RealLineHandler) Start() {
func (lh *RealBatchBuilder) Start() {
lh.flusher.Start()
}

func (lh *RealLineHandler) HandleLine(line string) error {
func (lh *RealBatchBuilder) HandleLine(line string) error {
select {
case lh.buffer <- line:
return nil
Expand All @@ -116,7 +116,7 @@ func minInt(x, y int) int {
return y
}

func (lh *RealLineHandler) flush() error {
func (lh *RealBatchBuilder) flush() error {
lh.mtx.Lock()
defer lh.mtx.Unlock()
bufLen := len(lh.buffer)
Expand All @@ -131,7 +131,7 @@ func (lh *RealLineHandler) flush() error {
return nil
}

func (lh *RealLineHandler) FlushWithThrottling() error {
func (lh *RealBatchBuilder) FlushWithThrottling() error {
if time.Now().Before(lh.resumeAt) {
log.Println("attempting to flush, but flushing is currently throttled by the server")
log.Printf("sleeping until: %s\n", lh.resumeAt.Format(time.RFC3339))
Expand All @@ -140,7 +140,7 @@ func (lh *RealLineHandler) FlushWithThrottling() error {
return lh.Flush()
}

func (lh *RealLineHandler) Flush() error {
func (lh *RealBatchBuilder) Flush() error {
flushErr := lh.flush()
if flushErr == errThrottled && lh.throttleOnBackpressure {
atomic.AddInt64(&lh.throttled, 1)
Expand All @@ -150,7 +150,7 @@ func (lh *RealLineHandler) Flush() error {
return flushErr
}

func (lh *RealLineHandler) FlushAll() error {
func (lh *RealBatchBuilder) FlushAll() error {
lh.mtx.Lock()
defer lh.mtx.Unlock()
bufLen := len(lh.buffer)
Expand All @@ -174,7 +174,7 @@ func (lh *RealLineHandler) FlushAll() error {
return nil
}

func (lh *RealLineHandler) report(lines []string) error {
func (lh *RealBatchBuilder) report(lines []string) error {
strLines := strings.Join(lines, "")
resp, err := lh.Reporter.Report(lh.Format, strLines)

Expand Down Expand Up @@ -204,23 +204,23 @@ func shouldRetry(err error) bool {
return true
}

func (lh *RealLineHandler) bufferLines(batch []string) {
func (lh *RealBatchBuilder) bufferLines(batch []string) {
log.Println("error reporting to Wavefront. buffering lines.")
for _, line := range batch {
_ = lh.HandleLine(line)
}
}

func (lh *RealLineHandler) GetFailureCount() int64 {
func (lh *RealBatchBuilder) GetFailureCount() int64 {
return atomic.LoadInt64(&lh.failures)
}

// GetThrottledCount returns the number of Throttled errors received.
func (lh *RealLineHandler) GetThrottledCount() int64 {
func (lh *RealBatchBuilder) GetThrottledCount() int64 {
return atomic.LoadInt64(&lh.throttled)
}

func (lh *RealLineHandler) Stop() {
func (lh *RealBatchBuilder) Stop() {
lh.flusher.Stop()
if err := lh.FlushAll(); err != nil {
log.Println(err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ func TestHandleLine_OnAuthError_DoNotBuffer(t *testing.T) {
}

func TestFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) {
lh := &RealLineHandler{
lh := &RealBatchBuilder{
Reporter: &fakeReporter{},
MaxBufferSize: 100,
BatchSize: 10,
Expand All @@ -117,7 +117,7 @@ func TestFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testin
}

func TestBackgroundFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) {
lh := &RealLineHandler{
lh := &RealBatchBuilder{
Reporter: &fakeReporter{},
MaxBufferSize: 100,
BatchSize: 10,
Expand All @@ -142,7 +142,7 @@ func TestBackgroundFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval
func TestFlushTicker_WhenThrottlingEnabled_AndReceives406Error_ThrottlesRequestsUntilNextSleepDuration(t *testing.T) {
throttledSleepDuration := 250 * time.Millisecond
briskTickTime := 50 * time.Millisecond
lh := &RealLineHandler{
lh := &RealBatchBuilder{
Reporter: &fakeReporter{},
MaxBufferSize: 100,
BatchSize: 10,
Expand Down Expand Up @@ -194,7 +194,7 @@ func checkLength(buffer chan string, length int, msg string, t *testing.T) {
}
}

func addLines(lh *RealLineHandler, linesToAdd int, expectedLen int, t *testing.T) {
func addLines(lh *RealBatchBuilder, linesToAdd int, expectedLen int, t *testing.T) {
for i := 0; i < linesToAdd; i++ {
err := lh.HandleLine("dummyLine")
if err != nil {
Expand All @@ -214,8 +214,8 @@ func makeBuffer(num int) []string {
return buf
}

func makeLineHandler(bufSize, batchSize int) *RealLineHandler {
return &RealLineHandler{
func makeLineHandler(bufSize, batchSize int) *RealBatchBuilder {
return &RealBatchBuilder{
Reporter: &fakeReporter{},
MaxBufferSize: bufSize,
BatchSize: batchSize,
Expand Down
Loading

0 comments on commit 43a5027

Please sign in to comment.