Skip to content

Commit

Permalink
[chore] refactore carbonexporter to handle errors, use generic interf…
Browse files Browse the repository at this point in the history
…aces (open-telemetry#29885)

Signed-off-by: Bogdan Drutu <[email protected]>
  • Loading branch information
bogdandrutu authored Dec 15, 2023
1 parent a57919a commit 3dd7ec4
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 24 deletions.
50 changes: 27 additions & 23 deletions exporter/carbonexporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,23 @@ package carbonexporter // import "github.com/open-telemetry/opentelemetry-collec

import (
"context"
"io"
"net"
"sync"
"time"

"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pdata/pmetric"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/resourcetotelemetry"
)

// newCarbonExporter returns a new Carbon exporter.
func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metrics, error) {
sender := carbonSender{
connPool: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
writer: newTCPConnPool(cfg.Endpoint, cfg.Timeout),
}

exp, err := exporterhelper.NewMetricsExporter(
Expand All @@ -42,13 +44,13 @@ func newCarbonExporter(cfg *Config, set exporter.CreateSettings) (exporter.Metri
// connections into an implementations of exporterhelper.PushMetricsData so
// the exporter can leverage the helper and get consistent observability.
type carbonSender struct {
connPool *connPool
writer io.WriteCloser
}

func (cs *carbonSender) pushMetricsData(_ context.Context, md pmetric.Metrics) error {
lines := metricDataToPlaintext(md)

if _, err := cs.connPool.Write([]byte(lines)); err != nil {
if _, err := cs.writer.Write([]byte(lines)); err != nil {
// Use the sum of converted and dropped since the write failed for all.
return err
}
Expand All @@ -57,8 +59,7 @@ func (cs *carbonSender) pushMetricsData(_ context.Context, md pmetric.Metrics) e
}

func (cs *carbonSender) Shutdown(context.Context) error {
cs.connPool.Close()
return nil
return cs.writer.Close()
}

// connPool is a very simple implementation of a pool of net.TCPConn instances.
Expand All @@ -72,36 +73,25 @@ func (cs *carbonSender) Shutdown(context.Context) error {
// unused connections as that was the case on the prior art mentioned above.
type connPool struct {
mtx sync.Mutex
conns []*net.TCPConn
conns []net.Conn
endpoint string
timeout time.Duration
}

func newTCPConnPool(
endpoint string,
timeout time.Duration,
) *connPool {
) io.WriteCloser {
return &connPool{
endpoint: endpoint,
timeout: timeout,
}
}

func (cp *connPool) Write(bytes []byte) (int, error) {
var conn *net.TCPConn
var conn net.Conn
var err error

// The deferred function below is what puts back connections on the pool.
defer func() {
if err == nil {
cp.mtx.Lock()
cp.conns = append(cp.conns, conn)
cp.mtx.Unlock()
} else if conn != nil {
conn.Close()
}
}()

start := time.Now()
cp.mtx.Lock()
lastIdx := len(cp.conns) - 1
Expand All @@ -116,6 +106,18 @@ func (cp *connPool) Write(bytes []byte) (int, error) {
}
}

// The deferred function below is what puts back connections on the pool if no error.
defer func() {
if err != nil {
// err already not nil, so will not influence retry logic because of the connection close.
err = multierr.Append(err, conn.Close())
return
}
cp.mtx.Lock()
cp.conns = append(cp.conns, conn)
cp.mtx.Unlock()
}()

// There is no way to do a call equivalent to recvfrom with an empty buffer
// to check if the connection was terminated (if the size of the buffer is
// 0 the Read call doesn't call lower level). So due to buffer sizes it is
Expand Down Expand Up @@ -144,20 +146,22 @@ func (cp *connPool) Write(bytes []byte) (int, error) {
return n, err
}

func (cp *connPool) Close() {
func (cp *connPool) Close() error {
cp.mtx.Lock()
defer cp.mtx.Unlock()

var errs error
for _, conn := range cp.conns {
conn.Close()
errs = multierr.Append(errs, conn.Close())
}
cp.conns = nil
return errs
}

func (cp *connPool) createTCPConn() (*net.TCPConn, error) {
func (cp *connPool) createTCPConn() (net.Conn, error) {
c, err := net.DialTimeout("tcp", cp.endpoint, cp.timeout)
if err != nil {
return nil, err
}
return c.(*net.TCPConn), err
return c, err
}
2 changes: 1 addition & 1 deletion exporter/carbonexporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
go.opentelemetry.io/collector/exporter v0.91.0
go.opentelemetry.io/collector/pdata v1.0.0
go.opentelemetry.io/collector/semconv v0.91.0
go.uber.org/multierr v1.11.0
)

require (
Expand Down Expand Up @@ -40,7 +41,6 @@ require (
go.opentelemetry.io/otel v1.21.0 // indirect
go.opentelemetry.io/otel/metric v1.21.0 // indirect
go.opentelemetry.io/otel/trace v1.21.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
go.uber.org/zap v1.26.0 // indirect
golang.org/x/net v0.18.0 // indirect
golang.org/x/sys v0.15.0 // indirect
Expand Down

0 comments on commit 3dd7ec4

Please sign in to comment.