From 224db84299638246cbc398665abb2d4b63251171 Mon Sep 17 00:00:00 2001 From: Luke Winikates Date: Thu, 9 Nov 2023 10:29:20 -0800 Subject: [PATCH] refactor: handlers --- internal/background_flusher.go | 4 +- internal/delta/formatter.go | 18 +++ internal/handler_factory.go | 46 +++++-- internal/interfaces.go | 2 +- ..._line_handler.go => real_batch_builder.go} | 42 +++--- ...ler_test.go => real_batch_builder_test.go} | 12 +- internal/reporter.go | 18 +-- internal/reporter_test.go | 2 +- internal/typed_sender.go | 53 ++++++++ senders/new_sender.go | 12 +- senders/real_sender.go | 122 +++++------------- senders/wavefront_sender_test.go | 93 ++++--------- 12 files changed, 209 insertions(+), 215 deletions(-) create mode 100644 internal/delta/formatter.go rename internal/{real_line_handler.go => real_batch_builder.go} (80%) rename internal/{real_line_handler_test.go => real_batch_builder_test.go} (96%) create mode 100644 internal/typed_sender.go diff --git a/internal/background_flusher.go b/internal/background_flusher.go index c14fea9..2a5ad9f 100644 --- a/internal/background_flusher.go +++ b/internal/background_flusher.go @@ -13,11 +13,11 @@ type BackgroundFlusher interface { type backgroundFlusher struct { ticker *time.Ticker interval time.Duration - handler LineHandler + handler BatchBuilder stop chan struct{} } -func NewBackgroundFlusher(interval time.Duration, handler LineHandler) BackgroundFlusher { +func NewBackgroundFlusher(interval time.Duration, handler BatchBuilder) BackgroundFlusher { return &backgroundFlusher{ interval: interval, handler: handler, diff --git a/internal/delta/formatter.go b/internal/delta/formatter.go new file mode 100644 index 0000000..388448f --- /dev/null +++ b/internal/delta/formatter.go @@ -0,0 +1,18 @@ +package delta + +import ( + "fmt" + + "github.com/wavefronthq/wavefront-sdk-go/internal" + "github.com/wavefronthq/wavefront-sdk-go/internal/metric" +) + +func Line(name string, value float64, source string, tags map[string]string, defaultSource string) (string, error) { + if name == "" { + return "", fmt.Errorf("empty metric name") + } + if !internal.HasDeltaPrefix(name) { + name = internal.DeltaCounterName(name) + } + return metric.Line(name, value, 0, source, tags, defaultSource) +} diff --git a/internal/handler_factory.go b/internal/handler_factory.go index e2482d1..6876023 100644 --- a/internal/handler_factory.go +++ b/internal/handler_factory.go @@ -6,32 +6,54 @@ import ( "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" ) -type HandlerFactory struct { +type SenderFactory struct { metricsReporter Reporter tracesReporter Reporter flushInterval time.Duration bufferSize int - lineHandlerOptions []LineHandlerOption + lineHandlerOptions []BatchAccumulatorOption + registry sdkmetrics.Registry } -func NewHandlerFactory( +func NewSenderFactory( metricsReporter, tracesReporter Reporter, flushInterval time.Duration, bufferSize int, - registry sdkmetrics.Registry) *HandlerFactory { - return &HandlerFactory{ + registry sdkmetrics.Registry) *SenderFactory { + return &SenderFactory{ + registry: registry, metricsReporter: metricsReporter, tracesReporter: tracesReporter, flushInterval: flushInterval, bufferSize: bufferSize, - lineHandlerOptions: []LineHandlerOption{ + lineHandlerOptions: []BatchAccumulatorOption{ SetRegistry(registry), }, } } -func (f *HandlerFactory) NewPointHandler(batchSize int) *RealLineHandler { +func (f *SenderFactory) NewPointSender(batchSize int) TypedSender { + return NewTypedSender(f.registry.PointsTracker(), f.newPointHandler(batchSize)) +} + +func (f *SenderFactory) NewHistogramSender(batchSize int) TypedSender { + return NewTypedSender(f.registry.HistogramsTracker(), f.NewHistogramHandler(batchSize)) +} + +func (f *SenderFactory) NewSpanSender(batchSize int) TypedSender { + return NewTypedSender(f.registry.SpansTracker(), f.newSpanHandler(batchSize)) +} + +func (f *SenderFactory) NewEventsSender() TypedSender { + return NewTypedSender(f.registry.EventsTracker(), f.newEventHandler()) +} + +func (f *SenderFactory) NewSpanLogSender(batchSize int) TypedSender { + return NewTypedSender(f.registry.SpanLogsTracker(), f.newSpanLogHandler(batchSize)) +} + +func (f *SenderFactory) newPointHandler(batchSize int) *RealBatchBuilder { return NewLineHandler( f.metricsReporter, metricFormat, @@ -43,7 +65,7 @@ func (f *HandlerFactory) NewPointHandler(batchSize int) *RealLineHandler { ) } -func (f *HandlerFactory) NewHistogramHandler(batchSize int) *RealLineHandler { +func (f *SenderFactory) NewHistogramHandler(batchSize int) *RealBatchBuilder { return NewLineHandler( f.metricsReporter, histogramFormat, @@ -55,7 +77,7 @@ func (f *HandlerFactory) NewHistogramHandler(batchSize int) *RealLineHandler { ) } -func (f *HandlerFactory) NewSpanHandler(batchSize int) *RealLineHandler { +func (f *SenderFactory) newSpanHandler(batchSize int) *RealBatchBuilder { return NewLineHandler( f.tracesReporter, traceFormat, @@ -67,7 +89,7 @@ func (f *HandlerFactory) NewSpanHandler(batchSize int) *RealLineHandler { ) } -func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler { +func (f *SenderFactory) newSpanLogHandler(batchSize int) *RealBatchBuilder { return NewLineHandler( f.tracesReporter, spanLogsFormat, @@ -79,10 +101,10 @@ func (f *HandlerFactory) NewSpanLogHandler(batchSize int) *RealLineHandler { ) } -// NewEventHandler creates a RealLineHandler for the Event type +// NewEventHandler creates a RealBatchBuilder for the Event type // The Event handler always sets "ThrottleRequestsOnBackpressure" to true // And always uses a batch size of exactly 1. -func (f *HandlerFactory) NewEventHandler() *RealLineHandler { +func (f *SenderFactory) newEventHandler() *RealBatchBuilder { return NewLineHandler( f.metricsReporter, eventFormat, diff --git a/internal/interfaces.go b/internal/interfaces.go index 1b99c60..d62eaf2 100644 --- a/internal/interfaces.go +++ b/internal/interfaces.go @@ -24,7 +24,7 @@ type ConnectionHandler interface { Flusher } -type LineHandler interface { +type BatchBuilder interface { HandleLine(line string) error Start() Stop() diff --git a/internal/real_line_handler.go b/internal/real_batch_builder.go similarity index 80% rename from internal/real_line_handler.go rename to internal/real_batch_builder.go index 401ab06..7517e28 100644 --- a/internal/real_line_handler.go +++ b/internal/real_batch_builder.go @@ -22,7 +22,7 @@ const ( defaultThrottledSleepDuration = time.Second * 30 ) -type RealLineHandler struct { +type RealBatchBuilder struct { // keep these two fields as first element of struct // to guarantee 64-bit alignment on 32-bit machines. // atomic.* functions crash if operands are not 64-bit aligned. @@ -52,28 +52,28 @@ func (lh *RealLineHandler) Format() string { var errThrottled = errors.New("error: throttled event creation") -type LineHandlerOption func(*RealLineHandler) +type BatchAccumulatorOption func(*RealBatchBuilder) -func SetRegistry(registry sdkmetrics.Registry) LineHandlerOption { - return func(handler *RealLineHandler) { +func SetRegistry(registry sdkmetrics.Registry) BatchAccumulatorOption { + return func(handler *RealBatchBuilder) { handler.internalRegistry = registry } } -func SetHandlerPrefix(prefix string) LineHandlerOption { - return func(handler *RealLineHandler) { +func SetHandlerPrefix(prefix string) BatchAccumulatorOption { + return func(handler *RealBatchBuilder) { handler.prefix = prefix } } -func ThrottleRequestsOnBackpressure() LineHandlerOption { - return func(handler *RealLineHandler) { +func ThrottleRequestsOnBackpressure() BatchAccumulatorOption { + return func(handler *RealBatchBuilder) { handler.throttleOnBackpressure = true } } -func NewLineHandler(reporter Reporter, format string, flushInterval time.Duration, batchSize, maxBufferSize int, setters ...LineHandlerOption) *RealLineHandler { - lh := &RealLineHandler{ +func NewLineHandler(reporter Reporter, format string, flushInterval time.Duration, batchSize, maxBufferSize int, setters ...BatchAccumulatorOption) *RealBatchBuilder { + lh := &RealBatchBuilder{ Reporter: reporter, BatchSize: batchSize, MaxBufferSize: maxBufferSize, @@ -99,11 +99,11 @@ func NewLineHandler(reporter Reporter, format string, flushInterval time.Duratio return lh } -func (lh *RealLineHandler) Start() { +func (lh *RealBatchBuilder) Start() { lh.flusher.Start() } -func (lh *RealLineHandler) HandleLine(line string) error { +func (lh *RealBatchBuilder) HandleLine(line string) error { select { case lh.buffer <- line: return nil @@ -120,7 +120,7 @@ func minInt(x, y int) int { return y } -func (lh *RealLineHandler) flush() error { +func (lh *RealBatchBuilder) flush() error { lh.mtx.Lock() defer lh.mtx.Unlock() bufLen := len(lh.buffer) @@ -135,7 +135,7 @@ func (lh *RealLineHandler) flush() error { return nil } -func (lh *RealLineHandler) FlushWithThrottling() error { +func (lh *RealBatchBuilder) FlushWithThrottling() error { if time.Now().Before(lh.resumeAt) { log.Println("attempting to flush, but flushing is currently throttled by the server") log.Printf("sleeping until: %s\n", lh.resumeAt.Format(time.RFC3339)) @@ -144,7 +144,7 @@ func (lh *RealLineHandler) FlushWithThrottling() error { return lh.Flush() } -func (lh *RealLineHandler) Flush() error { +func (lh *RealBatchBuilder) Flush() error { flushErr := lh.flush() if flushErr == errThrottled && lh.throttleOnBackpressure { atomic.AddInt64(&lh.throttled, 1) @@ -154,7 +154,7 @@ func (lh *RealLineHandler) Flush() error { return flushErr } -func (lh *RealLineHandler) FlushAll() error { +func (lh *RealBatchBuilder) FlushAll() error { lh.mtx.Lock() defer lh.mtx.Unlock() bufLen := len(lh.buffer) @@ -178,7 +178,7 @@ func (lh *RealLineHandler) FlushAll() error { return nil } -func (lh *RealLineHandler) report(lines []string) error { +func (lh *RealBatchBuilder) report(lines []string) error { strLines := strings.Join(lines, "") resp, err := lh.Reporter.Report(lh.format, strLines) @@ -208,23 +208,23 @@ func shouldRetry(err error) bool { return true } -func (lh *RealLineHandler) bufferLines(batch []string) { +func (lh *RealBatchBuilder) bufferLines(batch []string) { log.Println("error reporting to Wavefront. buffering lines.") for _, line := range batch { _ = lh.HandleLine(line) } } -func (lh *RealLineHandler) GetFailureCount() int64 { +func (lh *RealBatchBuilder) GetFailureCount() int64 { return atomic.LoadInt64(&lh.failures) } // GetThrottledCount returns the number of Throttled errors received. -func (lh *RealLineHandler) GetThrottledCount() int64 { +func (lh *RealBatchBuilder) GetThrottledCount() int64 { return atomic.LoadInt64(&lh.throttled) } -func (lh *RealLineHandler) Stop() { +func (lh *RealBatchBuilder) Stop() { lh.flusher.Stop() if err := lh.FlushAll(); err != nil { log.Println(err) diff --git a/internal/real_line_handler_test.go b/internal/real_batch_builder_test.go similarity index 96% rename from internal/real_line_handler_test.go rename to internal/real_batch_builder_test.go index 3e5818c..7718275 100644 --- a/internal/real_line_handler_test.go +++ b/internal/real_batch_builder_test.go @@ -94,7 +94,7 @@ func TestHandleLine_OnAuthError_DoNotBuffer(t *testing.T) { } func TestFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) { - lh := &RealLineHandler{ + lh := &RealBatchBuilder{ Reporter: &fakeReporter{}, MaxBufferSize: 100, BatchSize: 10, @@ -117,7 +117,7 @@ func TestFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testin } func TestBackgroundFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval(t *testing.T) { - lh := &RealLineHandler{ + lh := &RealBatchBuilder{ Reporter: &fakeReporter{}, MaxBufferSize: 100, BatchSize: 10, @@ -142,7 +142,7 @@ func TestBackgroundFlushWithThrottling_WhenThrottling_DelayUntilThrottleInterval func TestFlushTicker_WhenThrottlingEnabled_AndReceives406Error_ThrottlesRequestsUntilNextSleepDuration(t *testing.T) { throttledSleepDuration := 250 * time.Millisecond briskTickTime := 50 * time.Millisecond - lh := &RealLineHandler{ + lh := &RealBatchBuilder{ Reporter: &fakeReporter{}, MaxBufferSize: 100, BatchSize: 10, @@ -194,7 +194,7 @@ func checkLength(buffer chan string, length int, msg string, t *testing.T) { } } -func addLines(lh *RealLineHandler, linesToAdd int, expectedLen int, t *testing.T) { +func addLines(lh *RealBatchBuilder, linesToAdd int, expectedLen int, t *testing.T) { for i := 0; i < linesToAdd; i++ { err := lh.HandleLine("dummyLine") if err != nil { @@ -214,8 +214,8 @@ func makeBuffer(num int) []string { return buf } -func makeLineHandler(bufSize, batchSize int) *RealLineHandler { - return &RealLineHandler{ +func makeLineHandler(bufSize, batchSize int) *RealBatchBuilder { + return &RealBatchBuilder{ Reporter: &fakeReporter{}, MaxBufferSize: bufSize, BatchSize: batchSize, diff --git a/internal/reporter.go b/internal/reporter.go index e297099..bfc4938 100644 --- a/internal/reporter.go +++ b/internal/reporter.go @@ -10,8 +10,8 @@ import ( "github.com/wavefronthq/wavefront-sdk-go/internal/auth" ) -// The implementation of a Reporter that reports points directly to a Wavefront server. -type reporter struct { +// batchingHTTPReporter is a Reporter that reports points in batches via HTTP(S) +type batchingHTTPReporter struct { serverURL string tokenService auth.Service client *http.Client @@ -19,7 +19,7 @@ type reporter struct { // NewReporter creates a metrics Reporter func NewReporter(server string, tokenService auth.Service, client *http.Client) Reporter { - return &reporter{ + return &batchingHTTPReporter{ serverURL: server, tokenService: tokenService, client: client, @@ -27,7 +27,7 @@ func NewReporter(server string, tokenService auth.Service, client *http.Client) } // Report creates and sends a POST to the reportEndpoint with the given pointLines -func (reporter reporter) Report(format string, pointLines string) (*http.Response, error) { +func (reporter batchingHTTPReporter) Report(format string, pointLines string) (*http.Response, error) { if format == "" || pointLines == "" { return nil, formatError } @@ -63,7 +63,7 @@ func linesToGzippedBytes(pointLines string) ([]byte, error) { return buf.Bytes(), err } -func (reporter reporter) buildRequest(format string, body []byte) (*http.Request, error) { +func (reporter batchingHTTPReporter) buildRequest(format string, body []byte) (*http.Request, error) { apiURL := reporter.serverURL + reportEndpoint req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(body)) if err != nil { @@ -84,7 +84,7 @@ func (reporter reporter) buildRequest(format string, body []byte) (*http.Request return req, nil } -func (reporter reporter) reportEvent(event string) (*http.Response, error) { +func (reporter batchingHTTPReporter) reportEvent(event string) (*http.Response, error) { if event == "" { return nil, formatError } @@ -106,7 +106,7 @@ func (reporter reporter) reportEvent(event string) (*http.Response, error) { return reporter.execute(req) } -func (reporter reporter) execute(req *http.Request) (*http.Response, error) { +func (reporter batchingHTTPReporter) execute(req *http.Request) (*http.Response, error) { resp, err := reporter.client.Do(req) if err != nil { return nil, err @@ -116,10 +116,10 @@ func (reporter reporter) execute(req *http.Request) (*http.Response, error) { return resp, nil } -func (reporter reporter) Close() { +func (reporter batchingHTTPReporter) Close() { reporter.tokenService.Close() } -func (reporter reporter) IsDirect() bool { +func (reporter batchingHTTPReporter) IsDirect() bool { return reporter.tokenService.IsDirect() } diff --git a/internal/reporter_test.go b/internal/reporter_test.go index 5e73a1b..12eb3c9 100644 --- a/internal/reporter_test.go +++ b/internal/reporter_test.go @@ -10,7 +10,7 @@ import ( ) func TestReporter_BuildRequest(t *testing.T) { - r := NewReporter("http://localhost:8010/wavefront", auth.NewNoopTokenService(), &http.Client{}).(*reporter) + r := NewReporter("http://localhost:8010/wavefront", auth.NewNoopTokenService(), &http.Client{}).(*batchingHTTPReporter) request, err := r.buildRequest("wavefront", nil) require.NoError(t, err) assert.Equal(t, "http://localhost:8010/wavefront/report?f=wavefront", request.URL.String()) diff --git a/internal/typed_sender.go b/internal/typed_sender.go new file mode 100644 index 0000000..16fcd9d --- /dev/null +++ b/internal/typed_sender.go @@ -0,0 +1,53 @@ +package internal + +import "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" + +type TypedSender interface { + TrySend(string, error) error + Start() + Stop() + Flush() error + GetFailureCount() int64 +} + +type typedSender struct { + tracker sdkmetrics.SuccessTracker + lineHandler BatchBuilder +} + +func (ts *typedSender) Start() { + ts.lineHandler.Start() +} + +func (ts *typedSender) Stop() { + ts.lineHandler.Stop() +} + +func (ts *typedSender) Flush() error { + return ts.lineHandler.Flush() +} + +func (ts *typedSender) GetFailureCount() int64 { + return ts.lineHandler.GetFailureCount() +} + +func (ts *typedSender) TrySend(line string, err error) error { + if err != nil { + ts.tracker.IncInvalid() + return err + } + + ts.tracker.IncValid() + err = ts.lineHandler.HandleLine(line) + if err != nil { + ts.tracker.IncDropped() + } + return err +} + +func NewTypedSender(tracker sdkmetrics.SuccessTracker, handler BatchBuilder) TypedSender { + return &typedSender{ + tracker: tracker, + lineHandler: handler, + } +} diff --git a/senders/new_sender.go b/senders/new_sender.go index 406f83a..d681d78 100644 --- a/senders/new_sender.go +++ b/senders/new_sender.go @@ -29,7 +29,7 @@ func NewSender(wfURL string, setters ...Option) (Sender, error) { sender.internalRegistry = sdkmetrics.NewNoOpRegistry() } - hf := internal.NewHandlerFactory( + hf := internal.NewSenderFactory( metricsReporter, tracesReporter, cfg.FlushInterval, @@ -37,11 +37,11 @@ func NewSender(wfURL string, setters ...Option) (Sender, error) { sender.internalRegistry, ) - sender.pointHandler = hf.NewPointHandler(cfg.BatchSize) - sender.histoHandler = hf.NewHistogramHandler(cfg.BatchSize) - sender.spanHandler = hf.NewSpanHandler(cfg.BatchSize) - sender.spanLogHandler = hf.NewSpanLogHandler(cfg.BatchSize) - sender.eventHandler = hf.NewEventHandler() + sender.pointSender = hf.NewPointSender(cfg.BatchSize) + sender.histoSender = hf.NewHistogramSender(cfg.BatchSize) + sender.spanSender = hf.NewSpanSender(cfg.BatchSize) + sender.spanLogSender = hf.NewSpanLogSender(cfg.BatchSize) + sender.eventSender = hf.NewEventsSender() sender.Start() return sender, nil } diff --git a/senders/real_sender.go b/senders/real_sender.go index 45ee48c..c7e714b 100644 --- a/senders/real_sender.go +++ b/senders/real_sender.go @@ -8,6 +8,7 @@ import ( "github.com/wavefronthq/wavefront-sdk-go/event" "github.com/wavefronthq/wavefront-sdk-go/histogram" "github.com/wavefronthq/wavefront-sdk-go/internal" + "github.com/wavefronthq/wavefront-sdk-go/internal/delta" eventInternal "github.com/wavefronthq/wavefront-sdk-go/internal/event" histogramInternal "github.com/wavefronthq/wavefront-sdk-go/internal/histogram" "github.com/wavefronthq/wavefront-sdk-go/internal/metric" @@ -29,47 +30,34 @@ type Sender interface { type realSender struct { defaultSource string - pointHandler internal.LineHandler - histoHandler internal.LineHandler - spanHandler internal.LineHandler - spanLogHandler internal.LineHandler - eventHandler internal.LineHandler + pointSender internal.TypedSender + histoSender internal.TypedSender + spanSender internal.TypedSender + spanLogSender internal.TypedSender + eventSender internal.TypedSender internalRegistry sdkmetrics.Registry proxy bool } func (sender *realSender) Start() { - sender.pointHandler.Start() - sender.histoHandler.Start() - sender.spanHandler.Start() - sender.spanLogHandler.Start() sender.internalRegistry.Start() - sender.eventHandler.Start() + sender.pointSender.Start() + sender.histoSender.Start() + sender.spanSender.Start() + sender.spanLogSender.Start() + sender.eventSender.Start() } func (sender *realSender) private() { } func (sender *realSender) SendMetric(name string, value float64, ts int64, source string, tags map[string]string) error { - line, err := metric.Line(name, value, ts, source, tags, sender.defaultSource) - return trySendWith( - line, - err, - sender.pointHandler, - sender.internalRegistry.PointsTracker(), - ) + return sender.pointSender.TrySend(metric.Line(name, value, ts, source, tags, sender.defaultSource)) } func (sender *realSender) SendDeltaCounter(name string, value float64, source string, tags map[string]string) error { - if name == "" { - sender.internalRegistry.PointsTracker().IncInvalid() - return fmt.Errorf("empty metric name") - } - if !internal.HasDeltaPrefix(name) { - name = internal.DeltaCounterName(name) - } if value > 0 { - return sender.SendMetric(name, value, 0, source, tags) + return sender.pointSender.TrySend(delta.Line(name, value, source, tags, sender.defaultSource)) } return nil } @@ -82,27 +70,7 @@ func (sender *realSender) SendDistribution( source string, tags map[string]string, ) error { - line, err := histogramInternal.Line(name, centroids, hgs, ts, source, tags, sender.defaultSource) - return trySendWith( - line, - err, - sender.histoHandler, - sender.internalRegistry.HistogramsTracker(), - ) -} - -func trySendWith(line string, err error, handler internal.LineHandler, tracker sdkmetrics.SuccessTracker) error { - if err != nil { - tracker.IncInvalid() - return err - } - - tracker.IncValid() - err = handler.HandleLine(line) - if err != nil { - tracker.IncDropped() - } - return err + return sender.histoSender.TrySend(histogramInternal.Line(name, centroids, hgs, ts, source, tags, sender.defaultSource)) } func (sender *realSender) SendSpan( @@ -113,8 +81,6 @@ func (sender *realSender) SendSpan( tags []SpanTag, spanLogs []SpanLog, ) error { - - logs := makeSpanLogs(spanLogs) line, err := span.Line( name, startMillis, @@ -125,25 +91,17 @@ func (sender *realSender) SendSpan( parents, followsFrom, makeSpanTags(tags), - logs, + makeSpanLogs(spanLogs), sender.defaultSource, ) - err = trySendWith( - line, - err, - sender.spanHandler, - sender.internalRegistry.SpansTracker()) + + err = sender.spanSender.TrySend(line, err) if err != nil { return err } if len(spanLogs) > 0 { - logJSON, logJSONErr := span.LogJSON(traceID, spanID, logs, line) - return trySendWith( - logJSON, - logJSONErr, - sender.spanLogHandler, - sender.internalRegistry.SpanLogsTracker()) + return sender.spanLogSender.TrySend(span.LogJSON(traceID, spanID, makeSpanLogs(spanLogs), line)) } return nil } @@ -171,50 +129,40 @@ func (sender *realSender) SendEvent( tags map[string]string, setters ...event.Option, ) error { - var line string - var err error if sender.proxy { - line, err = eventInternal.Line(name, startMillis, endMillis, source, tags, setters...) - } else { - line, err = eventInternal.LineJSON(name, startMillis, endMillis, source, tags, setters...) + return sender.eventSender.TrySend(eventInternal.Line(name, startMillis, endMillis, source, tags, setters...)) } - - return trySendWith( - line, - err, - sender.eventHandler, - sender.internalRegistry.EventsTracker(), - ) + return sender.eventSender.TrySend(eventInternal.LineJSON(name, startMillis, endMillis, source, tags, setters...)) } func (sender *realSender) Close() { - sender.pointHandler.Stop() - sender.histoHandler.Stop() - sender.spanHandler.Stop() - sender.spanLogHandler.Stop() + sender.pointSender.Stop() + sender.histoSender.Stop() + sender.spanSender.Stop() + sender.spanLogSender.Stop() sender.internalRegistry.Stop() - sender.eventHandler.Stop() + sender.eventSender.Stop() } func (sender *realSender) Flush() error { errStr := "" - err := sender.pointHandler.Flush() + err := sender.pointSender.Flush() if err != nil { errStr = errStr + err.Error() + "\n" } - err = sender.histoHandler.Flush() + err = sender.histoSender.Flush() if err != nil { errStr = errStr + err.Error() + "\n" } - err = sender.spanHandler.Flush() + err = sender.spanSender.Flush() if err != nil { errStr = errStr + err.Error() } - err = sender.spanLogHandler.Flush() + err = sender.spanLogSender.Flush() if err != nil { errStr = errStr + err.Error() } - err = sender.eventHandler.Flush() + err = sender.eventSender.Flush() if err != nil { errStr = errStr + err.Error() } @@ -225,11 +173,11 @@ func (sender *realSender) Flush() error { } func (sender *realSender) GetFailureCount() int64 { - return sender.pointHandler.GetFailureCount() + - sender.histoHandler.GetFailureCount() + - sender.spanHandler.GetFailureCount() + - sender.spanLogHandler.GetFailureCount() + - sender.eventHandler.GetFailureCount() + return sender.pointSender.GetFailureCount() + + sender.histoSender.GetFailureCount() + + sender.spanSender.GetFailureCount() + + sender.spanLogSender.GetFailureCount() + + sender.eventSender.GetFailureCount() } func (sender *realSender) realInternalRegistry(cfg *configuration) sdkmetrics.Registry { diff --git a/senders/wavefront_sender_test.go b/senders/wavefront_sender_test.go index 56e2051..f4f34d0 100644 --- a/senders/wavefront_sender_test.go +++ b/senders/wavefront_sender_test.go @@ -6,6 +6,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/wavefronthq/wavefront-sdk-go/histogram" + "github.com/wavefronthq/wavefront-sdk-go/internal" "github.com/wavefronthq/wavefront-sdk-go/internal/sdkmetrics" ) @@ -16,16 +17,7 @@ func TestWavefrontSender_SendMetric(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: false, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) assert.NoError(t, sender.SendMetric("foo", 20, 0, "test", nil)) assert.Equal(t, 1, registry.PointsTracker().(*simpleTracker).valid) @@ -43,6 +35,20 @@ func TestWavefrontSender_SendMetric(t *testing.T) { assert.Equal(t, "\"foo\" 21 source=\"test\"\n", pointHandler.Lines[0]) } +func newSenderWithMocks(registry *mockRegistry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler *mockHandler) *realSender { + sender := realSender{ + defaultSource: "test", + pointSender: internal.NewTypedSender(registry.PointsTracker(), pointHandler), + histoSender: internal.NewTypedSender(registry.HistogramsTracker(), histoHandler), + spanSender: internal.NewTypedSender(registry.SpansTracker(), spanHandler), + spanLogSender: internal.NewTypedSender(registry.SpanLogsTracker(), spanLogHandler), + eventSender: internal.NewTypedSender(registry.EventsTracker(), eventHandler), + internalRegistry: registry, + proxy: false, + } + return &sender +} + func TestWavefrontSender_SendDeltaCounter(t *testing.T) { registry := &mockRegistry{} pointHandler := &mockHandler{} @@ -50,16 +56,7 @@ func TestWavefrontSender_SendDeltaCounter(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: false, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) assert.NoError(t, sender.SendDeltaCounter("foo", 20.0, "test", nil)) assert.Equal(t, 1, registry.PointsTracker().(*simpleTracker).valid) @@ -89,16 +86,7 @@ func TestWavefrontSender_SendDistribution(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: false, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) centroids := []histogram.Centroid{{Value: 0, Count: 0}, {Value: 200, Count: 300}} granularities := map[histogram.Granularity]bool{ @@ -133,16 +121,7 @@ func TestWavefrontSender_SendSpan(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: false, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) traceID := "28e09666-9610-4690-a908-5298d95551ad" spanID := "28b0ad93-58f5-4efe-a68b-7b7a84c8ace8" @@ -199,16 +178,7 @@ func TestWavefrontSender_SendSpan_SpanLogs(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: false, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) traceID := "28e09666-9610-4690-a908-5298d95551ad" spanID := "28b0ad93-58f5-4efe-a68b-7b7a84c8ace8" @@ -264,16 +234,7 @@ func TestWavefrontSender_SendEventWithProxyFalse(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: false, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) assert.NoError(t, sender.SendEvent( "foo", 200, 400, "test", nil)) @@ -294,16 +255,8 @@ func TestWavefrontSender_SendEventWithProxyTrue(t *testing.T) { spanHandler := &mockHandler{} spanLogHandler := &mockHandler{} eventHandler := &mockHandler{} - sender := realSender{ - defaultSource: "test", - pointHandler: pointHandler, - histoHandler: histoHandler, - spanHandler: spanHandler, - spanLogHandler: spanLogHandler, - eventHandler: eventHandler, - internalRegistry: registry, - proxy: true, - } + sender := newSenderWithMocks(registry, pointHandler, histoHandler, spanHandler, spanLogHandler, eventHandler) + sender.proxy = true assert.NoError(t, sender.SendEvent( "foo", 200, 400, "test", nil))