From 4030aaabc57ecdd31e09492dccd4eb5a28e2dba2 Mon Sep 17 00:00:00 2001 From: Mike Dame Date: Thu, 25 May 2023 15:36:55 +0000 Subject: [PATCH] make fixtures --- Makefile | 6 +- .../basic_counter_metrics_wal_expect.json | 333 ++++++++++++++++++ exporter/collector/metrics.go | 41 ++- 3 files changed, 364 insertions(+), 16 deletions(-) diff --git a/Makefile b/Makefile index 9c2569004..71fa45554 100644 --- a/Makefile +++ b/Makefile @@ -223,6 +223,10 @@ release: prepare-release check-clean-work-tree go run tools/release.go tag .PHONY: fixtures -fixtures: +fixtures: clean cd ./exporter/collector/integrationtest && \ go run cmd/recordfixtures/main.go + +.PHONY: clean +clean: + rm -rf ./exporter/collector/integrationtest/gcp_metrics_wal diff --git a/exporter/collector/integrationtest/testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json b/exporter/collector/integrationtest/testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json index ffc0bbca7..b16561243 100644 --- a/exporter/collector/integrationtest/testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json +++ b/exporter/collector/integrationtest/testdata/fixtures/metrics/basic_counter_metrics_wal_expect.json @@ -1,4 +1,41 @@ { + "createTimeSeriesRequests": [ + { + "name": "projects/fakeprojectid", + "timeSeries": [ + { + "metric": { + "type": "workload.googleapis.com/testcounter", + "labels": { + "foo": "bar" + } + }, + "resource": { + "type": "generic_node", + "labels": { + "location": "global", + "namespace": "", + "node_id": "" + } + }, + "metricKind": "CUMULATIVE", + "valueType": "INT64", + "points": [ + { + "interval": { + "endTime": "1970-01-01T00:00:00Z", + "startTime": "1970-01-01T00:00:00Z" + }, + "value": { + "int64Value": "253" + } + } + ], + "unit": "1" + } + ] + } + ], "createMetricDescriptorRequests": [ { "name": "projects/fakeprojectid", @@ -23,6 +60,28 @@ { "name": "projects/myproject", "timeSeries": [ + { + "metric": { + "type": "custom.googleapis.com/opencensus/googlecloudmonitoring/point_count", + "labels": { + "status": "OK" + } + }, + "resource": { + "type": "global" + }, + "points": [ + { + "interval": { + "endTime": "1970-01-01T00:00:00Z", + "startTime": "1970-01-01T00:00:00Z" + }, + "value": { + "int64Value": "1" + } + } + ] + }, { "metric": { "type": "custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs", @@ -46,6 +105,29 @@ } ] }, + { + "metric": { + "type": "custom.googleapis.com/opencensus/grpc.io/client/completed_rpcs", + "labels": { + "grpc_client_method": "google.monitoring.v3.MetricService/CreateTimeSeries", + "grpc_client_status": "OK" + } + }, + "resource": { + "type": "global" + }, + "points": [ + { + "interval": { + "endTime": "1970-01-01T00:00:00Z", + "startTime": "1970-01-01T00:00:00Z" + }, + "value": { + "int64Value": "1" + } + } + ] + }, { "metric": { "type": "custom.googleapis.com/opencensus/grpc.io/client/received_bytes_per_rpc", @@ -106,6 +188,66 @@ } ] }, + { + "metric": { + "type": "custom.googleapis.com/opencensus/grpc.io/client/received_bytes_per_rpc", + "labels": { + "grpc_client_method": "google.monitoring.v3.MetricService/CreateTimeSeries" + } + }, + "resource": { + "type": "global" + }, + "points": [ + { + "interval": { + "endTime": "1970-01-01T00:00:00Z", + "startTime": "1970-01-01T00:00:00Z" + }, + "value": { + "distributionValue": { + "bucketOptions": { + "explicitBuckets": { + "bounds": [ + 0, + 1024, + 2048, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, + 268435456, + 1073741824, + 4294967296 + ] + } + }, + 
"bucketCounts": [ + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0" + ] + } + } + } + ] + }, { "metric": { "type": "custom.googleapis.com/opencensus/grpc.io/client/roundtrip_latency", @@ -220,6 +362,120 @@ } ] }, + { + "metric": { + "type": "custom.googleapis.com/opencensus/grpc.io/client/roundtrip_latency", + "labels": { + "grpc_client_method": "google.monitoring.v3.MetricService/CreateTimeSeries" + } + }, + "resource": { + "type": "global" + }, + "points": [ + { + "interval": { + "endTime": "1970-01-01T00:00:00Z", + "startTime": "1970-01-01T00:00:00Z" + }, + "value": { + "distributionValue": { + "bucketOptions": { + "explicitBuckets": { + "bounds": [ + 0, + 0.01, + 0.05, + 0.1, + 0.3, + 0.6, + 0.8, + 1, + 2, + 3, + 4, + 5, + 6, + 8, + 10, + 13, + 16, + 20, + 25, + 30, + 40, + 50, + 65, + 80, + 100, + 130, + 160, + 200, + 250, + 300, + 400, + 500, + 650, + 800, + 1000, + 2000, + 5000, + 10000, + 20000, + 50000, + 100000 + ] + } + }, + "bucketCounts": [ + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0" + ] + } + } + } + ] + }, { "metric": { "type": "custom.googleapis.com/opencensus/grpc.io/client/sent_bytes_per_rpc", @@ -279,11 +535,88 @@ } } ] + }, + { + "metric": { + "type": "custom.googleapis.com/opencensus/grpc.io/client/sent_bytes_per_rpc", + "labels": { + "grpc_client_method": "google.monitoring.v3.MetricService/CreateTimeSeries" + } + }, + "resource": { + "type": "global" + }, + "points": [ + { + "interval": { + "endTime": "1970-01-01T00:00:00Z", + "startTime": "1970-01-01T00:00:00Z" + }, + "value": { + "distributionValue": { + "bucketOptions": { + "explicitBuckets": { + "bounds": [ + 0, + 1024, + 2048, + 4096, + 16384, + 65536, + 262144, + 1048576, + 4194304, + 16777216, + 67108864, + 268435456, + 1073741824, + 4294967296 + ] + } + }, + "bucketCounts": [ + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0", + "0" + ] + } + } + } + ] } ] } ], "createMetricDescriptorRequests": [ + { + "name": "projects/myproject", + "metricDescriptor": { + "name": "projects/myproject/metricDescriptors/custom.googleapis.com/opencensus/googlecloudmonitoring/point_count", + "type": "custom.googleapis.com/opencensus/googlecloudmonitoring/point_count", + "labels": [ + { + "key": "status" + } + ], + "metricKind": "CUMULATIVE", + "valueType": "INT64", + "unit": "1", + "description": "Count of metric points written to Cloud Monitoring.", + "displayName": "OpenCensus/googlecloudmonitoring/point_count" + } + }, { "name": "projects/myproject", "metricDescriptor": { diff --git a/exporter/collector/metrics.go b/exporter/collector/metrics.go index 7e2ea4f9c..e54b3056d 100644 --- a/exporter/collector/metrics.go +++ b/exporter/collector/metrics.go @@ -406,7 +406,11 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error { return nil default: } - bytes, err := me.wal.Read(me.rWALIndex.Load()) + index := me.rWALIndex.Load() + if index <= 0 { + index = 1 + } + bytes, err := me.wal.Read(index) if err == nil { req := new(monitoringpb.CreateTimeSeriesRequest) if err = proto.Unmarshal(bytes, req); err != nil { @@ -420,16 +424,16 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error { // retry at same read index if retryable (network) error s := 
status.Convert(err)
 			if s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable {
-				me.obs.log.Error("non-retryable error, skipping request")
+				me.obs.log.Error("retryable error, retrying request")
 				continue
 			}
 
-			err = me.wal.TruncateFront(me.rWALIndex.Load())
-			if err != nil {
-				return err
-			}
 			// move read index forward if non retryable error (or exported successfully)
 			me.rWALIndex.Add(1)
+			err = me.wal.TruncateFront(index)
+			if err != nil && !errors.Is(err, wal.ErrOutOfRange) {
+				return err
+			}
 			continue
 		}
@@ -451,6 +455,8 @@ func (me *MetricsExporter) readWALAndExport(ctx context.Context) error {
 // watchWAL watches the WAL directory for a write then returns to the
 // continuallyPopWAL() loop.
 func (me *MetricsExporter) watchWAL(ctx context.Context) error {
+	me.goroutines.Add(1)
+	defer me.goroutines.Done()
 	walWatcher, err := fsnotify.NewWatcher()
 	if err != nil {
 		return err
 	}
@@ -501,22 +507,27 @@ func (me *MetricsExporter) watchWAL(ctx context.Context) error {
 
 func (me *MetricsExporter) walRunner(ctx context.Context) {
 	defer me.goroutines.Done()
+	runCtx, cancel := context.WithCancel(ctx)
+	defer cancel()
 	for {
 		select {
 		case <-me.shutdownC:
 			// do one last final sync/read/export then return
-			err := me.wal.Sync()
-			if err != nil {
-				me.obs.log.Error(fmt.Sprintf("error syncing WAL: %+v", err))
-			}
-			err = me.readWALAndExport(ctx)
-			if err != nil {
-				me.obs.log.Error(fmt.Sprintf("error reading WAL and exporting: %+v", err))
-			}
+			/*
+				err := me.wal.Sync()
+				if err != nil {
+					me.obs.log.Error(fmt.Sprintf("error syncing WAL: %+v", err))
+				}
+				err = me.readWALAndExport(runCtx)
+				if err != nil {
+					me.obs.log.Error(fmt.Sprintf("error reading WAL and exporting: %+v", err))
+				}
+			*/
 			return
 		default:
-			err := me.readWALAndExport(ctx)
+			err := me.readWALAndExport(runCtx)
 			if err != nil {
+				fmt.Printf("ERROR: %+v\n\n", err)
 				me.obs.log.Error(fmt.Sprintf("error reading WAL and exporting: %+v", err))
 			}
 		}
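
Note on the WAL drain pattern this patch adjusts: the exporter clamps the 1-based read index, retries the same entry on transient gRPC errors (DeadlineExceeded, Unavailable), advances past everything else, and tolerates wal.ErrOutOfRange when trimming the log. The following is a minimal standalone sketch of that pattern, not the exporter's code; it assumes github.com/tidwall/wal (the WAL library this exporter uses) and the standard gRPC status/codes packages, and exportFn is a hypothetical stand-in for the real CreateTimeSeries export call.

// Minimal sketch of the WAL drain loop pattern adjusted by this patch.
package main

import (
	"errors"
	"fmt"

	"github.com/tidwall/wal"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// drainWAL reads entries starting at readIndex, passes each one to exportFn,
// and trims the log as it goes. It stops when no entry exists at the index.
func drainWAL(log *wal.Log, readIndex uint64, exportFn func([]byte) error) error {
	index := readIndex
	if index == 0 {
		index = 1 // tidwall/wal indices are 1-based, so clamp like the patch does
	}
	for {
		data, err := log.Read(index)
		if errors.Is(err, wal.ErrNotFound) {
			return nil // caught up: nothing left to export
		}
		if err != nil {
			return err
		}

		err = exportFn(data)
		s := status.Convert(err) // status.Convert(nil) yields codes.OK
		if s.Code() == codes.DeadlineExceeded || s.Code() == codes.Unavailable {
			// Retryable (network) error: stay at the same index and try again.
			// A real caller would add backoff here instead of spinning.
			continue
		}

		// Exported successfully or hit a non-retryable error: advance the read
		// index and trim older entries. TruncateFront can report ErrOutOfRange
		// when there is nothing in that range to trim, which is harmless here.
		if err := log.TruncateFront(index); err != nil && !errors.Is(err, wal.ErrOutOfRange) {
			return err
		}
		index++
	}
}

func main() {
	// Open (or create) a WAL in ./example-wal with default options.
	log, err := wal.Open("example-wal", nil)
	if err != nil {
		panic(err)
	}
	defer log.Close()

	if err := drainWAL(log, 1, func(b []byte) error {
		fmt.Printf("exporting %d bytes\n", len(b))
		return nil
	}); err != nil {
		fmt.Println("drain failed:", err)
	}
}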