
Commit

Break wal into its own struct for easier fieldalignment and add backoff retry

damemi committed Jun 12, 2023
1 parent 8a04d55 commit 370ec79
Showing 3 changed files with 115 additions and 79 deletions.
2 changes: 2 additions & 0 deletions exporter/collector/config.go
@@ -149,6 +149,8 @@ type MetricConfig struct {
type WALConfig struct {
// Directory is the location to store WAL files.
Directory string `mapstructure:"directory"`
// MaxBackoff sets the length of time to exponentially retry failed exports.
MaxBackoff time.Duration `mapstructure:"max_backoff"`
// Enabled turns the WAL on or off. When false, the exporter will not use
// the WAL and will instead attempt to send data directly to Google Cloud.
Enabled bool `mapstructure:"enabled"`
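To show how the new field is meant to be used, here is a minimal sketch of wiring the WAL options into the exporter configuration, written in the same style as the ConfigureCollector closures in the test changes below. Only the WALConfig field names (Enabled, Directory, MaxBackoff) come from this diff; the configureWAL helper name, the import path, and the directory/duration values are illustrative assumptions.

```go
package example

import (
	"time"

	// Assumed import path for the exporter package touched by this commit.
	"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector"
)

// configureWAL enables the write-ahead log and caps the exponential retry backoff.
func configureWAL(cfg *collector.Config) {
	cfg.MetricConfig.WALConfig.Enabled = true
	cfg.MetricConfig.WALConfig.Directory = "/var/lib/otelcol/wal" // any writable directory
	// Give up on a failed export once the retry backoff reaches 10 minutes;
	// when left unset, the exporter defaults to a 1-hour cap (see metrics.go below).
	cfg.MetricConfig.WALConfig.MaxBackoff = 10 * time.Minute
}
```

The integration test cases below set the same fields, using a 1-second MaxBackoff so that retries resolve quickly under test.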
@@ -17,6 +17,7 @@ package testcases
import (
"os"
"strings"
"time"

"go.opentelemetry.io/collector/pdata/pmetric"
"go.opentelemetry.io/otel/attribute"
@@ -238,6 +239,7 @@ var MetricsTestCases = []TestCase{
ConfigureCollector: func(cfg *collector.Config) {
cfg.MetricConfig.WALConfig.Enabled = true
cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
},
SkipForSDK: true,
},
@@ -249,6 +251,7 @@ var MetricsTestCases = []TestCase{
ConfigureCollector: func(cfg *collector.Config) {
cfg.MetricConfig.WALConfig.Enabled = true
cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
},
SkipForSDK: true,
},
@@ -260,6 +263,7 @@
cfg.ProjectID = "unavailableproject"
cfg.MetricConfig.WALConfig.Enabled = true
cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
},
SkipForSDK: true,
ExpectRetries: true,
@@ -272,6 +276,7 @@
cfg.ProjectID = "deadline_exceededproject"
cfg.MetricConfig.WALConfig.Enabled = true
cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
},
SkipForSDK: true,
ExpectRetries: true,
187 changes: 108 additions & 79 deletions exporter/collector/metrics.go
@@ -72,30 +72,34 @@ type selfObservability struct {
// MetricsExporter is the GCM exporter that uses pdata directly.
type MetricsExporter struct {
// write ahead log handles exporter retries in-order to handle network outages
wal *wal.Log
client *monitoring.MetricClient
obs selfObservability
wal *exporterWAL
obs selfObservability
// shutdownC is a channel for signaling a graceful shutdown
shutdownC chan struct{}
// mdCache tracks the metric descriptors that have already been sent to GCM
mdCache map[string]*monitoringpb.CreateMetricDescriptorRequest
// A channel that receives metric descriptor and sends them to GCM once
metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
// the "write" index for WAL
wWALIndex *atomic.Uint64
// the "read" index for WAL
rWALIndex *atomic.Uint64
// the full path of the WAL (user-configured directory + "gcp_metrics_wal")
walPath string
client *monitoring.MetricClient
// requestOpts applies options to the context for requests, such as additional headers.
requestOpts []func(*context.Context, requestInfo)
mapper metricMapper
cfg Config
// goroutines tracks the currently running child tasks
goroutines sync.WaitGroup
timeout time.Duration
}

walMutex sync.Mutex
type exporterWAL struct {
*wal.Log
// the "write" index for WAL
wIndex *atomic.Uint64
// the "read" index for WAL
rIndex *atomic.Uint64
// the full path of the WAL (user-configured directory + "gcp_metrics_wal")
path string
maxBackoff time.Duration
mutex sync.Mutex
}

// requestInfo is meant to abstract info from CreateMetricsDescriptorRequests and
@@ -205,8 +209,6 @@ func NewGoogleCloudMetricsExporter(
mdCache: make(map[string]*monitoringpb.CreateMetricDescriptorRequest),
shutdownC: shutdown,
timeout: timeout,
rWALIndex: &atomic.Uint64{},
wWALIndex: &atomic.Uint64{},
}

mExp.requestOpts = make([]func(*context.Context, requestInfo), 0)
@@ -217,13 +219,14 @@ }
}

if cfg.MetricConfig.WALConfig.Enabled {
mExp.wal = &exporterWAL{}
err = mExp.setupWAL()
if err != nil {
return nil, err
}
// start WAL popper routine
mExp.goroutines.Add(1)
go mExp.walRunner(ctx)
go mExp.runWALReadAndExportLoop(ctx)
}

// Fire up the metric descriptor exporter.
@@ -233,48 +236,61 @@ return mExp, nil
return mExp, nil
}

// setupWAL creates the WAL.
// This function is also used to re-sync after writes, so it closes the existing WAL if present.
func (me *MetricsExporter) setupWAL() error {
err := me.closeWAL()
if err != nil {
return err
}

walPath := filepath.Join(me.cfg.MetricConfig.WALConfig.Directory, "gcp_metrics_wal")
me.walPath = walPath
me.wal.path = walPath
metricWal, err := wal.Open(walPath, &wal.Options{LogFormat: 1})
if err != nil {
return err
}
me.wal = metricWal
me.wal.Log = metricWal

me.wal.rIndex = &atomic.Uint64{}
me.wal.wIndex = &atomic.Uint64{}

// default to 1 hour exponential backoff
me.wal.maxBackoff = time.Duration(3600 * time.Second)
if me.cfg.MetricConfig.WALConfig.MaxBackoff != 0 {
me.wal.maxBackoff = me.cfg.MetricConfig.WALConfig.MaxBackoff
}

// sync existing WAL indices
rIndex, err := me.wal.FirstIndex()
if err != nil {
return err
}
me.rWALIndex.Store(rIndex)
me.wal.rIndex.Store(rIndex)

wIndex, err := me.wal.LastIndex()
if err != nil {
return err
}
me.wWALIndex.Store(wIndex)
me.wal.wIndex.Store(wIndex)
return nil
}

func (me *MetricsExporter) closeWAL() error {
if me.wal != nil {
err := me.wal.Close()
me.wal = nil
if me.wal != nil && me.wal.Log != nil {
err := me.wal.Log.Close()
me.wal.Log = nil
return err
}
return nil
}

// PushMetrics pushes pdata metrics to GCM, creating metric descriptors if necessary.
func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) error {
me.walMutex.Lock()
defer me.walMutex.Unlock()
if me.wal != nil {
me.wal.mutex.Lock()
defer me.wal.mutex.Unlock()
}
// map from project -> []timeseries. This groups timeseries by the project
// they need to be sent to. Each project's timeseries are sent in a
// separate request later.
@@ -359,7 +375,7 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) e
errs = append(errs, fmt.Errorf("failed to marshal protobuf to bytes: %+v", err))
continue
}
err = me.wal.Write(me.wWALIndex.Add(1), bytes)
err = me.wal.Write(me.wal.wIndex.Add(1), bytes)
if err != nil {
errs = append(errs, fmt.Errorf("failed to write to WAL: %+v", err))
continue
@@ -379,6 +395,7 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) e
return nil
}

// export sends a CreateTimeSeriesRequest to GCM and reports failed/successful points based on the response.
func (me *MetricsExporter) export(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
// if this is an empty request, skip it
// empty requests are used by the WAL to signal the end of pending data
@@ -417,33 +434,83 @@ func (me *MetricsExporter) export(ctx context.Context, req *monitoringpb.CreateT
return err
}

// walLoop continuously reads from the WAL and exports from it.
func (me *MetricsExporter) walLoop(ctx context.Context) error {
for {
select {
case <-me.shutdownC:
return nil
case <-ctx.Done():
return nil
default:
}

err := me.readWALAndExport(ctx)
if err == nil {
continue
}
// ErrNotFound from wal.Read() means the index is either 0 or out of
// bounds (indicating we're probably at the end of the WAL). That error
// will trigger a file watch for new writes (below this). For other
// errors, fail.
if !errors.Is(err, wal.ErrNotFound) {
return err
}

// Must have been ErrNotFound, start a file watch and block waiting for updates.
if err := me.watchWALFile(ctx); err != nil {
return err
}

// close and re-open the WAL to sync read and write indices
if err := me.setupWAL(); err != nil {
return err
}
}
}

// readWALAndExport pops the next CreateTimeSeriesRequest from the WAL and tries exporting it.
// If the export is successful (or fails for a non-retryable error), the read index is incremented
// so the next entry in the WAL can be read by a subsequent call to readWALAndExport().
// If the export fails for a (retryable) network error, it will keep trying to export the same entry
// until success or the backoff max is reached.
func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
me.walMutex.Lock()
defer me.walMutex.Unlock()
me.wal.mutex.Lock()
defer me.wal.mutex.Unlock()

bytes, err := me.wal.Read(me.rWALIndex.Load())
bytes, err := me.wal.Read(me.wal.rIndex.Load())
if err == nil {
req := new(monitoringpb.CreateTimeSeriesRequest)
if err = proto.Unmarshal(bytes, req); err != nil {
return err
}

err = me.export(ctx, req)
if err != nil {
me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
}
// retry at same read index if retryable (network) error
s := status.Convert(err)
if s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable {
// on network failures, retry exponentially a max of 11 times (roughly 68 minutes of cumulative backoff)
// or until the user-configured max backoff is hit.
backoff := 0
for i := 0; i < 12; i++ {
err = me.export(ctx, req)
if err != nil {
me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
}
// retry at same read index if retryable (network) error
s := status.Convert(err)
if !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable) {
break
}
me.obs.log.Error("retryable error, retrying request")
return nil
backoff = 1 << i
if time.Duration(backoff*int(time.Second)) >= me.wal.maxBackoff {
break
}
time.Sleep(time.Duration(1<<i) * time.Second)
}

lastIndex, indexErr := me.wal.LastIndex()
if indexErr != nil {
return indexErr
}
if me.rWALIndex.Load() == lastIndex && len(req.String()) > 0 {
if me.wal.rIndex.Load() == lastIndex && len(req.String()) > 0 {
// This indicates that we are trying to truncate the last item in the WAL.
// If that is the case, write an empty request so we can truncate the last real request
// (the WAL library requires at least 1 entry).
@@ -453,14 +520,14 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
if bytesErr != nil {
return bytesErr
}
err = me.wal.Write(me.wWALIndex.Add(1), bytes)
err = me.wal.Write(me.wal.wIndex.Add(1), bytes)
if err != nil {
return err
}
}
// move read index forward if non retryable error (or exported successfully)
me.rWALIndex.Add(1)
err = me.wal.TruncateFront(me.rWALIndex.Load())
me.wal.rIndex.Add(1)
err = me.wal.TruncateFront(me.wal.rIndex.Load())
if err != nil {
return err
}
@@ -469,54 +536,16 @@ return err
return err
}

// readWALAndExport continuously pops CreateTimeSeriesRequest protos from the WAL and
// tries exporting them. If an export fails due to a network error, it will
// continually retry the same request until success. Other non-retryable errors
// drop metrics.
func (me *MetricsExporter) walLoop(ctx context.Context) error {
for {
select {
case <-me.shutdownC:
return nil
case <-ctx.Done():
return nil
default:
}

err := me.readWALAndExport(ctx)
if err == nil {
continue
}
// ErrNotFound from wal.Read() means the index is either 0 or out of
// bounds (indicating we're probably at the end of the WAL). That error
// will trigger a file watch for new writes (below this). For other
// errors, fail.
if !errors.Is(err, wal.ErrNotFound) {
return err
}

// Must have been ErrNotFound, start a file watch and block waiting for updates.
if err := me.watchWAL(ctx); err != nil {
return err
}

// close and re-open the WAL to sync read and write indices
if err := me.setupWAL(); err != nil {
return err
}
}
}

// watchWALFile watches the WAL directory for a write, then returns to the
// walLoop() read loop.
func (me *MetricsExporter) watchWAL(ctx context.Context) error {
func (me *MetricsExporter) watchWALFile(ctx context.Context) error {
me.goroutines.Add(1)
defer me.goroutines.Done()
walWatcher, err := fsnotify.NewWatcher()
if err != nil {
return err
}
err = walWatcher.Add(me.walPath)
err = walWatcher.Add(me.wal.path)
if err != nil {
return err
}
@@ -560,7 +589,7 @@ return err
return err
}

func (me *MetricsExporter) walRunner(ctx context.Context) {
func (me *MetricsExporter) runWALReadAndExportLoop(ctx context.Context) {
defer me.goroutines.Done()
runCtx, cancel := context.WithCancel(ctx)
defer cancel()
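Because the hunks above interleave removed and added lines, the new retry behavior in readWALAndExport can be hard to follow. The sketch below restates the pattern in isolation: retry only on the retryable gRPC codes (DeadlineExceeded, Unavailable), double the sleep on each attempt, and stop once the backoff reaches the configured cap. The retryWithBackoff name, the stand-in export closure, and the use of the standard log package are assumptions for illustration, not the exporter's actual API.

```go
package main

import (
	"context"
	"log"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryWithBackoff sketches the loop added to readWALAndExport: re-run the
// export on retryable gRPC errors, sleeping 1s, 2s, 4s, ... between attempts,
// and give up once the backoff reaches maxBackoff or 12 attempts have been made.
func retryWithBackoff(ctx context.Context, export func(context.Context) error, maxBackoff time.Duration) error {
	var err error
	for i := 0; i < 12; i++ {
		err = export(ctx)
		s := status.Convert(err) // status.Convert(nil) yields codes.OK
		if s.Code() != codes.DeadlineExceeded && s.Code() != codes.Unavailable {
			return err // success or a non-retryable error: stop retrying
		}
		backoff := time.Duration(1<<i) * time.Second
		if backoff >= maxBackoff {
			return err // hit the configured cap; the caller advances the WAL read index
		}
		log.Printf("retryable error, retrying in %s: %v", backoff, err)
		time.Sleep(backoff)
	}
	return err
}

func main() {
	attempts := 0
	export := func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return status.Error(codes.Unavailable, "simulated network outage")
		}
		return nil
	}
	if err := retryWithBackoff(context.Background(), export, 30*time.Second); err != nil {
		log.Fatalf("export permanently failed: %v", err)
	}
	log.Printf("export succeeded after %d attempts", attempts)
}
```

Per the diff, once this loop gives up (success, a non-retryable error, or the backoff cap), readWALAndExport advances the WAL read index and truncates the entry so later data is not blocked; with the default 1-hour cap that happens after roughly 68 minutes of cumulative sleep.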
