diff --git a/exporter/collector/config.go b/exporter/collector/config.go
index c248a3201..8ad2fcc9f 100644
--- a/exporter/collector/config.go
+++ b/exporter/collector/config.go
@@ -149,6 +149,8 @@ type MetricConfig struct {
 type WALConfig struct {
 	// Directory is the location to store WAL files.
 	Directory string `mapstructure:"directory"`
+	// MaxBackoff sets the maximum amount of time to spend exponentially retrying failed exports.
+	MaxBackoff time.Duration `mapstructure:"max_backoff"`
 	// Enabled turns the WAL on or off. When false, the exporter will not use
 	// the WAL and will instead attempt to send data directly to Google Cloud.
 	Enabled bool `mapstructure:"enabled"`
diff --git a/exporter/collector/integrationtest/testcases/testcases_metrics.go b/exporter/collector/integrationtest/testcases/testcases_metrics.go
index 1612720c3..1a24ff1c6 100644
--- a/exporter/collector/integrationtest/testcases/testcases_metrics.go
+++ b/exporter/collector/integrationtest/testcases/testcases_metrics.go
@@ -17,6 +17,7 @@ package testcases
 import (
 	"os"
 	"strings"
+	"time"
 
 	"go.opentelemetry.io/collector/pdata/pmetric"
 	"go.opentelemetry.io/otel/attribute"
@@ -238,6 +239,7 @@ var MetricsTestCases = []TestCase{
 		ConfigureCollector: func(cfg *collector.Config) {
 			cfg.MetricConfig.WALConfig.Enabled = true
 			cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
+			cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
 		},
 		SkipForSDK: true,
 	},
@@ -249,6 +251,7 @@ var MetricsTestCases = []TestCase{
 		ConfigureCollector: func(cfg *collector.Config) {
 			cfg.MetricConfig.WALConfig.Enabled = true
 			cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
+			cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
 		},
 		SkipForSDK: true,
 	},
@@ -260,6 +263,7 @@ var MetricsTestCases = []TestCase{
 			cfg.ProjectID = "unavailableproject"
 			cfg.MetricConfig.WALConfig.Enabled = true
 			cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
+			cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
 		},
 		SkipForSDK: true,
 		ExpectRetries: true,
@@ -272,6 +276,7 @@ var MetricsTestCases = []TestCase{
 			cfg.ProjectID = "deadline_exceededproject"
 			cfg.MetricConfig.WALConfig.Enabled = true
 			cfg.MetricConfig.WALConfig.Directory, _ = os.MkdirTemp("", "test-wal-")
+			cfg.MetricConfig.WALConfig.MaxBackoff = time.Duration(1 * time.Second)
 		},
 		SkipForSDK: true,
 		ExpectRetries: true,
diff --git a/exporter/collector/metrics.go b/exporter/collector/metrics.go
index be7f97579..a93a5cbc8 100644
--- a/exporter/collector/metrics.go
+++ b/exporter/collector/metrics.go
@@ -72,21 +72,15 @@ type selfObservability struct {
 // MetricsExporter is the GCM exporter that uses pdata directly.
 type MetricsExporter struct {
 	// write ahead log handles exporter retries in-order to handle network outages
-	wal *wal.Log
-	client *monitoring.MetricClient
-	obs selfObservability
+	wal *exporterWAL
+	obs selfObservability
 	// shutdownC is a channel for signaling a graceful shutdown
 	shutdownC chan struct{}
 	// mdCache tracks the metric descriptors that have already been sent to GCM
 	mdCache map[string]*monitoringpb.CreateMetricDescriptorRequest
 	// A channel that receives metric descriptor and sends them to GCM once
 	metricDescriptorC chan *monitoringpb.CreateMetricDescriptorRequest
-	// the "write" index for WAL
-	wWALIndex *atomic.Uint64
-	// the "read" index for WAL
-	rWALIndex *atomic.Uint64
-	// the full path of the WAL (user-configured directory + "gcp_metrics_wal")
-	walPath string
+	client *monitoring.MetricClient
 	// requestOpts applies options to the context for requests, such as additional headers.
 	requestOpts []func(*context.Context, requestInfo)
 	mapper metricMapper
@@ -94,8 +88,18 @@ type MetricsExporter struct {
 	// goroutines tracks the currently running child tasks
 	goroutines sync.WaitGroup
 	timeout time.Duration
+}
 
-	walMutex sync.Mutex
+type exporterWAL struct {
+	*wal.Log
+	// the "write" index for WAL
+	wIndex *atomic.Uint64
+	// the "read" index for WAL
+	rIndex *atomic.Uint64
+	// the full path of the WAL (user-configured directory + "gcp_metrics_wal")
+	path string
+	maxBackoff time.Duration
+	mutex sync.Mutex
 }
 
 // requestInfo is meant to abstract info from CreateMetricsDescriptorRequests and
@@ -205,8 +209,6 @@ func NewGoogleCloudMetricsExporter(
 		mdCache: make(map[string]*monitoringpb.CreateMetricDescriptorRequest),
 		shutdownC: shutdown,
 		timeout: timeout,
-		rWALIndex: &atomic.Uint64{},
-		wWALIndex: &atomic.Uint64{},
 	}
 
 	mExp.requestOpts = make([]func(*context.Context, requestInfo), 0)
@@ -217,13 +219,14 @@ func NewGoogleCloudMetricsExporter(
 	}
 
 	if cfg.MetricConfig.WALConfig.Enabled {
+		mExp.wal = &exporterWAL{}
 		err = mExp.setupWAL()
 		if err != nil {
 			return nil, err
 		}
 		// start WAL popper routine
 		mExp.goroutines.Add(1)
-		go mExp.walRunner(ctx)
+		go mExp.runWALReadAndExportLoop(ctx)
 	}
 
 	// Fire up the metric descriptor exporter.
@@ -233,6 +236,8 @@ func NewGoogleCloudMetricsExporter(
 	return mExp, nil
 }
 
+// setupWAL creates the WAL.
+// This function is also used to re-sync after writes, so it closes the existing WAL if present.
 func (me *MetricsExporter) setupWAL() error {
 	err := me.closeWAL()
 	if err != nil {
@@ -240,32 +245,41 @@
 	}
 	walPath := filepath.Join(me.cfg.MetricConfig.WALConfig.Directory, "gcp_metrics_wal")
-	me.walPath = walPath
+	me.wal.path = walPath
 	metricWal, err := wal.Open(walPath, &wal.Options{LogFormat: 1})
 	if err != nil {
 		return err
 	}
-	me.wal = metricWal
+	me.wal.Log = metricWal
+
+	me.wal.rIndex = &atomic.Uint64{}
+	me.wal.wIndex = &atomic.Uint64{}
+
+	// default to a 1 hour max exponential backoff
+	me.wal.maxBackoff = time.Duration(3600 * time.Second)
+	if me.cfg.MetricConfig.WALConfig.MaxBackoff != 0 {
+		me.wal.maxBackoff = me.cfg.MetricConfig.WALConfig.MaxBackoff
+	}
 
 	// sync existing WAL indices
 	rIndex, err := me.wal.FirstIndex()
 	if err != nil {
 		return err
 	}
-	me.rWALIndex.Store(rIndex)
+	me.wal.rIndex.Store(rIndex)
 
 	wIndex, err := me.wal.LastIndex()
 	if err != nil {
 		return err
 	}
-	me.wWALIndex.Store(wIndex)
+	me.wal.wIndex.Store(wIndex)
 
 	return nil
 }
 
 func (me *MetricsExporter) closeWAL() error {
-	if me.wal != nil {
-		err := me.wal.Close()
-		me.wal = nil
+	if me.wal != nil && me.wal.Log != nil {
+		err := me.wal.Log.Close()
+		me.wal.Log = nil
 		return err
 	}
 	return nil
@@ -273,8 +287,10 @@ func (me *MetricsExporter) closeWAL() error {
 // PushMetrics calls pushes pdata metrics to GCM, creating metric descriptors if necessary.
 func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) error {
-	me.walMutex.Lock()
-	defer me.walMutex.Unlock()
+	if me.wal != nil {
+		me.wal.mutex.Lock()
+		defer me.wal.mutex.Unlock()
+	}
 	// map from project -> []timeseries. This groups timeseries by the project
 	// they need to be sent to. Each project's timeseries are sent in a
 	// separate request later.
@@ -359,7 +375,7 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) e
 			errs = append(errs, fmt.Errorf("failed to marshal protobuf to bytes: %+v", err))
 			continue
 		}
-		err = me.wal.Write(me.wWALIndex.Add(1), bytes)
+		err = me.wal.Write(me.wal.wIndex.Add(1), bytes)
 		if err != nil {
 			errs = append(errs, fmt.Errorf("failed to write to WAL: %+v", err))
 			continue
@@ -379,6 +395,7 @@ func (me *MetricsExporter) PushMetrics(ctx context.Context, m pmetric.Metrics) e
 	return nil
 }
 
+// export sends a CreateTimeSeriesRequest to GCM and reports failed/successful points based on the response.
 func (me *MetricsExporter) export(ctx context.Context, req *monitoringpb.CreateTimeSeriesRequest) error {
 	// if this is an empty request, skip it
 	// empty requests are used by the WAL to signal the end of pending data
@@ -417,33 +434,83 @@ func (me *MetricsExporter) export(ctx context.Context, req *monitoringpb.CreateT
 	return err
 }
 
+// walLoop continuously reads entries from the WAL and exports them.
+func (me *MetricsExporter) walLoop(ctx context.Context) error {
+	for {
+		select {
+		case <-me.shutdownC:
+			return nil
+		case <-ctx.Done():
+			return nil
+		default:
+		}
+
+		err := me.readWALAndExport(ctx)
+		if err == nil {
+			continue
+		}
+		// ErrNotFound from wal.Read() means the index is either 0 or out of
+		// bounds (indicating we're probably at the end of the WAL). That error
+		// will trigger a file watch for new writes (below this). For other
+		// errors, fail.
+		if !errors.Is(err, wal.ErrNotFound) {
+			return err
+		}
+
+		// Must have been ErrNotFound, start a file watch and block waiting for updates.
+		if err := me.watchWALFile(ctx); err != nil {
+			return err
+		}
+
+		// close and re-open the WAL to sync read and write indices
+		if err := me.setupWAL(); err != nil {
+			return err
+		}
+	}
+}
+
+// readWALAndExport pops the next CreateTimeSeriesRequest from the WAL and tries exporting it.
+// If the export is successful (or fails with a non-retryable error), the read index is incremented
+// so the next entry in the WAL can be read by a subsequent call to readWALAndExport().
+// If the export fails with a (retryable) network error, it will keep trying to export the same entry
+// until success or the maximum backoff is reached.
 func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
-	me.walMutex.Lock()
-	defer me.walMutex.Unlock()
+	me.wal.mutex.Lock()
+	defer me.wal.mutex.Unlock()
 
-	bytes, err := me.wal.Read(me.rWALIndex.Load())
+	bytes, err := me.wal.Read(me.wal.rIndex.Load())
 	if err == nil {
 		req := new(monitoringpb.CreateTimeSeriesRequest)
 		if err = proto.Unmarshal(bytes, req); err != nil {
 			return err
 		}
 
-		err = me.export(ctx, req)
-		if err != nil {
-			me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
-		}
-		// retry at same read index if retryable (network) error
-		s := status.Convert(err)
-		if s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable {
+		// on network failures, retry exponentially a max of 11 times (doubling the delay from 1s,
+		// roughly 68 minutes of cumulative backoff) or until the user-configured max backoff is hit.
+		backoff := 0
+		for i := 0; i < 12; i++ {
+			err = me.export(ctx, req)
+			if err != nil {
+				me.obs.log.Warn(fmt.Sprintf("error exporting to GCM: %+v", err))
+			}
+			// retry at same read index if retryable (network) error
+			s := status.Convert(err)
+			if !(s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable) {
+				break
+			}
 			me.obs.log.Error("retryable error, retrying request")
-			return nil
+			backoff = 1 << i
+			if time.Duration(backoff*int(time.Second)) >= me.wal.maxBackoff {
+				break
+			}
+			time.Sleep(time.Duration(1<<i) * time.Second)
 		}
 
 		lastIndex, err := me.wal.LastIndex()
 		if err != nil {
 			return err
 		}
-		if me.rWALIndex.Load() == lastIndex && len(req.String()) > 0 {
+		if me.wal.rIndex.Load() == lastIndex && len(req.String()) > 0 {
 			// This indicates that we are trying to truncate the last item in the WAL.
 			// If that is the case, write an empty request so we can truncate the last real request
 			// (the WAL library requires at least 1 entry).
@@ -453,14 +520,14 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
 		if bytesErr != nil {
 			return bytesErr
 		}
-		err = me.wal.Write(me.wWALIndex.Add(1), bytes)
+		err = me.wal.Write(me.wal.wIndex.Add(1), bytes)
 		if err != nil {
 			return err
 		}
 	}
 	// move read index forward if non retryable error (or exported successfully)
-	me.rWALIndex.Add(1)
-	err = me.wal.TruncateFront(me.rWALIndex.Load())
+	me.wal.rIndex.Add(1)
+	err = me.wal.TruncateFront(me.wal.rIndex.Load())
 	if err != nil {
 		return err
 	}
@@ -469,54 +536,16 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
 	return err
 }
 
-// readWALAndExport continuously pops CreateTimeSeriesRequest protos from the WAL and
-// tries exporting them. If an export fails due to a network error, it will
-// continually retry the same request until success. Other non-retryable errors
-// drop metrics.
-func (me *MetricsExporter) walLoop(ctx context.Context) error {
-	for {
-		select {
-		case <-me.shutdownC:
-			return nil
-		case <-ctx.Done():
-			return nil
-		default:
-		}
-
-		err := me.readWALAndExport(ctx)
-		if err == nil {
-			continue
-		}
-		// ErrNotFound from wal.Read() means the index is either 0 or out of
-		// bounds (indicating we're probably at the end of the WAL). That error
-		// will trigger a file watch for new writes (below this). For other
-		// errors, fail.
-		if !errors.Is(err, wal.ErrNotFound) {
-			return err
-		}
-
-		// Must have been ErrNotFound, start a file watch and block waiting for updates.
-		if err := me.watchWAL(ctx); err != nil {
-			return err
-		}
-
-		// close and re-open the WAL to sync read and write indices
-		if err := me.setupWAL(); err != nil {
-			return err
-		}
-	}
-}
-
 // watchWAL watches the WAL directory for a write then returns to the
 // continuallyPopWAL() loop.
-func (me *MetricsExporter) watchWAL(ctx context.Context) error {
+func (me *MetricsExporter) watchWALFile(ctx context.Context) error {
 	me.goroutines.Add(1)
 	defer me.goroutines.Done()
 	walWatcher, err := fsnotify.NewWatcher()
 	if err != nil {
 		return err
 	}
-	err = walWatcher.Add(me.walPath)
+	err = walWatcher.Add(me.wal.path)
 	if err != nil {
 		return err
 	}
@@ -560,7 +589,7 @@ func (me *MetricsExporter) watchWAL(ctx context.Context) error {
 	return err
 }
 
-func (me *MetricsExporter) walRunner(ctx context.Context) {
+func (me *MetricsExporter) runWALReadAndExportLoop(ctx context.Context) {
 	defer me.goroutines.Done()
 	runCtx, cancel := context.WithCancel(ctx)
 	defer cancel()
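
For context on how the new knob is consumed, the sketch below mirrors the test-case configuration in testcases_metrics.go above and shows the max_backoff option alongside the existing WAL settings. It is a minimal illustration, not part of the change: the import path, the configureWAL/main helpers, and the example directory are assumptions, while the MetricConfig.WALConfig field paths and the 1 hour default cap come directly from the diff.

package main

import (
	"time"

	// Import path assumed from the repository layout shown in the diff.
	"github.com/GoogleCloudPlatform/opentelemetry-operations-go/exporter/collector"
)

// configureWAL is a hypothetical helper that enables the write-ahead log and
// caps the exponential retry backoff below the 1 hour default set in setupWAL.
func configureWAL(cfg *collector.Config) {
	cfg.MetricConfig.WALConfig.Enabled = true
	// Example directory; the exporter appends "gcp_metrics_wal" to it.
	cfg.MetricConfig.WALConfig.Directory = "/var/lib/otelcol/wal"
	// Stop retrying a WAL entry once the exponential backoff reaches 10 minutes;
	// readWALAndExport then advances the read index and truncates that entry.
	cfg.MetricConfig.WALConfig.MaxBackoff = 10 * time.Minute
}

func main() {
	cfg := &collector.Config{}
	configureWAL(cfg)
}

Because readWALAndExport only advances the read index after the retry budget is exhausted, a smaller MaxBackoff drains the WAL sooner after an outage at the cost of dropping data earlier; in collector YAML the corresponding key is max_backoff, per the mapstructure tag added in config.go.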