From ab572676282b0693c7bd9261b849c0a983dc0c7b Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Mon, 22 Oct 2018 10:18:37 -0400 Subject: [PATCH 1/3] add file and byte count metrics --- cloud/bq/sanity.go | 2 +- cloud/tq/tq.go | 12 ++++--- metrics/metrics.go | 83 +++++++++++++++++++++++++++++++++++++++++++++- rex/rex.go | 27 +++++++++------ 4 files changed, 106 insertions(+), 18 deletions(-) diff --git a/cloud/bq/sanity.go b/cloud/bq/sanity.go index 44635768..e4a5644b 100644 --- a/cloud/bq/sanity.go +++ b/cloud/bq/sanity.go @@ -398,7 +398,7 @@ func SanityCheckAndCopy(ctx context.Context, src, dest *AnnotatedTable) error { log.Println("Copying...", src.TableID()) job, err := copier.Run(ctx) if err != nil { - log.Println("Copy Error: %v", err) + log.Println("Copy Error:", err) return err } diff --git a/cloud/tq/tq.go b/cloud/tq/tq.go index f9a973ea..d10fccfd 100644 --- a/cloud/tq/tq.go +++ b/cloud/tq/tq.go @@ -156,8 +156,9 @@ func (qh *QueueHandler) postWithRetry(bucket, filepath string) error { } // PostAll posts all normal file items in an ObjectIterator into the appropriate queue. -func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, error) { +func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, int64, error) { fileCount := 0 + byteCount := int64(0) qpErrCount := 0 gcsErrCount := 0 for o, err := it.Next(); err != iterator.Done; o, err = it.Next() { @@ -168,7 +169,7 @@ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, gcsErrCount++ if gcsErrCount > 5 { log.Printf("Failed after %d files to %s.\n", fileCount, qh.Queue) - return 0, err + return fileCount, byteCount, err } continue } @@ -179,19 +180,20 @@ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, qpErrCount++ if qpErrCount > 5 { log.Printf("Failed after %d files to %s (on %s).\n", fileCount, qh.Queue, o.Name) - return 0, err + return fileCount, byteCount, err } } else { fileCount++ + byteCount += o.Size } } - return fileCount, nil + return fileCount, byteCount, nil } // PostDay fetches an iterator over the objects with ndt/YYYY/MM/DD prefix, // and passes the iterator to postDay with appropriate queue. // This typically takes about 10 minutes for a 20K task NDT day. -func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, error) { +func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, int64, error) { log.Println("Adding ", prefix, " to ", qh.Queue) qry := storage.Query{ Delimiter: "/", diff --git a/metrics/metrics.go b/metrics/metrics.go index 89102490..7dbab231 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -11,6 +11,10 @@ func init() { prometheus.MustRegister(CompletedCount) prometheus.MustRegister(StateTimeSummary) prometheus.MustRegister(StateDate) + prometheus.MustRegister(FilesPerDateHistogram) + prometheus.MustRegister(BytesPerDateHistogram) + prometheus.MustRegister(FileCount) + prometheus.MustRegister(ByteCount) } var ( @@ -20,7 +24,7 @@ var ( // Provides metrics: // gardener_started_total{experiment} // Example usage: - // metrics.StartCount.WithLabelValues("sidestream").Inc() + // metrics.StartedCount.WithLabelValues("sidestream").Inc() StartedCount = prometheus.NewCounterVec( prometheus.CounterOpts{ Name: "gardener_started_total", @@ -110,4 +114,81 @@ var ( Objectives: map[float64]float64{0.01: 0.001, 0.1: 0.01, 0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }, []string{"state"}, ) + + // FilesPerDateHistogram provides a histogram of files per date submitted to pipeline. + // + // Provides metrics: + // gardener_files_bucket{year="...", le="..."} + // ... + // gardener_files_sum{year="...", le="..."} + // gardener_files_count{year="...", le="..."} + // Usage example: + // metrics.FilesPerDateHistogram.WithLabelValues( + // "2011").Observe(files) + FilesPerDateHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gardener_files", + Help: "Histogram of number of files submitted per date", + Buckets: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9, + 10, 12, 14, 17, 20, 24, 28, 32, 38, 44, 50, 60, 70, 80, 90, + 100, 120, 140, 170, 200, 240, 280, 320, 380, 440, 500, 600, 700, 800, 900, + 1000, 1200, 1400, 1700, 2000, 2400, 2800, 3200, 3800, 4400, 5000, 6000, 7000, 8000, 9000, + 10000, 12000, 14000, 17000, 20000, 24000, 28000, 32000, 38000, 44000, 50000, 60000, 70000, 80000, 90000, + 100000, 120000, 140000, 170000, 200000, 240000, 280000, 320000, 380000, 440000, 500000, 600000, 700000, 800000, 900000, + }, + }, + []string{"year"}, + ) + + // BytesPerDateHistogram provides a histogram of bytes per date submitted to pipeline + // + // Provides metrics: + // gardener_bytes_bucket{year="...", le="..."} + // ... + // gardener_bytes_sum{year="...", le="..."} + // gardener_bytes_count{year="...", le="..."} + // Usage example: + // metrics.BytesPerDateHistogram.WithLabelValues( + // "2011").Observe(bytes) + BytesPerDateHistogram = prometheus.NewHistogramVec( + prometheus.HistogramOpts{ + Name: "gardener_bytes", + Help: "Histogram of number of bytes submitted per date", + Buckets: []float64{ + 100000, 140000, 200000, 280000, 400000, 560000, 800000, 1000000, + 1000000, 1400000, 2000000, 2800000, 4000000, 5600000, 8000000, 10000000, + 10000000, 14000000, 20000000, 28000000, 40000000, 56000000, 80000000, 100000000, + 100000000, 140000000, 200000000, 280000000, 400000000, 560000000, 800000000, 1000000000, + 1000000000, 1400000000, 2000000000, 2800000000, 4000000000, 5600000000, 8000000000, 10000000000, + 10000000000, 14000000000, 20000000000, 28000000000, 40000000000, 56000000000, 80000000000, 100000000000, + }, + }, + []string{"year"}, + ) + + // FileCount counts the total number of files submitted to the pipeline. + // + // Provides metrics: + // gardener_files_total + // Example usage: + // metrics.FilesCount.Inc() + FileCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gardener_files_total", + Help: "Total number of files submitted to pipeline.", + }, + ) + + // ByteCount counts the total number of bytes submitted to the pipeline. + // + // Provides metrics: + // gardener_bytes_total + // Example usage: + // metrics.BytesCount.Inc() + ByteCount = prometheus.NewCounter( + prometheus.CounterOpts{ + Name: "gardener_bytes_total", + Help: "Total number of bytes submitted to pipeline.", + }, + ) ) diff --git a/rex/rex.go b/rex/rex.go index 6cce6d5e..6bd981a8 100644 --- a/rex/rex.go +++ b/rex/rex.go @@ -94,17 +94,22 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin case state.Queuing: // TODO - handle zero task case. - n, err := rex.queue(ctx, t) + fileCount, byteCount, err := rex.queue(ctx, t) if err != nil { // SetError also pushes to datastore, like Update(ctx, ) t.SetError(ctx, err, "rex.queue") return err } - if n < 1 { + if fileCount < 1 { // If there were no tasks posted, then there is nothing more to do. t.Queue = "" t.Update(ctx, state.Done) } else { + // TODO - should we also add metrics when there are errors? + metrics.FilesPerDateHistogram.WithLabelValues("").Observe(float64(fileCount)) + metrics.FileCount.Add(float64(fileCount)) + metrics.BytesPerDateHistogram.WithLabelValues("").Observe(float64(byteCount)) + metrics.ByteCount.Add(float64(byteCount)) t.Update(ctx, state.Processing) } @@ -219,21 +224,21 @@ func (rex *ReprocessingExecutor) waitForParsing(ctx context.Context, t *state.Ta return nil } -func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, error) { +func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, int64, error) { // Submit all files from the bucket that match the prefix. // Where do we get the bucket? //func (qh *ChannelQueueHandler) handleLoop(next api.BasicPipe, bucketOpts ...option.ClientOption) { qh, err := tq.NewQueueHandler(rex.Config, t.Queue) if err != nil { t.SetError(ctx, err, "NewQueueHandler") - return 0, err + return 0, 0, err } parts, err := t.ParsePrefix() if err != nil { // If there is a parse error, log and skip request. log.Println(err) t.SetError(ctx, err, "BadPrefix") - return 0, err + return 0, 0, err } bucketName := parts[0] @@ -244,22 +249,22 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, if err != nil { if err == io.EOF && env.TestMode { log.Println("Using fake client, ignoring EOF error") - return 0, nil + return 0, 0, nil } log.Println(err) t.SetError(ctx, err, "BucketError") - return 0, err + return 0, 0, err } // NOTE: This does not check the terminate channel, so once started, it will // complete the queuing. - n, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/") + fileCount, byteCount, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/") if err != nil { log.Println(err) t.SetError(ctx, err, "PostDayError") - return n, nil + return fileCount, byteCount, nil } - log.Println("Added ", n, t.Name, " tasks to ", qh.Queue) - return n, nil + log.Println("Added ", byteCount, "bytes in", fileCount, t.Name, " tasks to ", qh.Queue) + return fileCount, byteCount, nil } func (rex *ReprocessingExecutor) dedup(ctx context.Context, t *state.Task) error { From bd5d2dd2db65c94c2fbaf7c731e0303c2d1e8751 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Mon, 22 Oct 2018 16:19:46 -0400 Subject: [PATCH 2/3] fix bugs --- cloud/tq/tq_integration_test.go | 4 ++-- metrics/metrics.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/cloud/tq/tq_integration_test.go b/cloud/tq/tq_integration_test.go index 8a519a84..a214597a 100644 --- a/cloud/tq/tq_integration_test.go +++ b/cloud/tq/tq_integration_test.go @@ -109,14 +109,14 @@ func TestPostDay(t *testing.T) { if err != nil { t.Fatal(err) } - n, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/") + n, _, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/") if err != nil { t.Fatal(err) } if n != 3 { t.Error("Should have posted 3 items", n) } - n, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/") + n, _, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/") if err != nil { t.Fatal(err) } diff --git a/metrics/metrics.go b/metrics/metrics.go index 7dbab231..de36af47 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -155,12 +155,12 @@ var ( Name: "gardener_bytes", Help: "Histogram of number of bytes submitted per date", Buckets: []float64{ - 100000, 140000, 200000, 280000, 400000, 560000, 800000, 1000000, - 1000000, 1400000, 2000000, 2800000, 4000000, 5600000, 8000000, 10000000, - 10000000, 14000000, 20000000, 28000000, 40000000, 56000000, 80000000, 100000000, - 100000000, 140000000, 200000000, 280000000, 400000000, 560000000, 800000000, 1000000000, - 1000000000, 1400000000, 2000000000, 2800000000, 4000000000, 5600000000, 8000000000, 10000000000, - 10000000000, 14000000000, 20000000000, 28000000000, 40000000000, 56000000000, 80000000000, 100000000000, + 100000, 140000, 200000, 280000, 400000, 560000, 800000, + 1000000, 1400000, 2000000, 2800000, 4000000, 5600000, 8000000, + 10000000, 14000000, 20000000, 28000000, 40000000, 56000000, 80000000, + 100000000, 140000000, 200000000, 280000000, 400000000, 560000000, 800000000, + 1000000000, 1400000000, 2000000000, 2800000000, 4000000000, 5600000000, 8000000000, + 10000000000, 14000000000, 20000000000, 28000000000, 40000000000, 56000000000, 80000000000, }, }, []string{"year"}, From 9cf7fe33f74d87a6ede99ab67e8e817496e963cb Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Tue, 23 Oct 2018 14:15:33 -0400 Subject: [PATCH 3/3] Add year, remove unneeded metrics --- metrics/metrics.go | 28 ---------------------------- rex/rex.go | 8 ++++---- 2 files changed, 4 insertions(+), 32 deletions(-) diff --git a/metrics/metrics.go b/metrics/metrics.go index de36af47..75b1f035 100644 --- a/metrics/metrics.go +++ b/metrics/metrics.go @@ -13,8 +13,6 @@ func init() { prometheus.MustRegister(StateDate) prometheus.MustRegister(FilesPerDateHistogram) prometheus.MustRegister(BytesPerDateHistogram) - prometheus.MustRegister(FileCount) - prometheus.MustRegister(ByteCount) } var ( @@ -165,30 +163,4 @@ var ( }, []string{"year"}, ) - - // FileCount counts the total number of files submitted to the pipeline. - // - // Provides metrics: - // gardener_files_total - // Example usage: - // metrics.FilesCount.Inc() - FileCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "gardener_files_total", - Help: "Total number of files submitted to pipeline.", - }, - ) - - // ByteCount counts the total number of bytes submitted to the pipeline. - // - // Provides metrics: - // gardener_bytes_total - // Example usage: - // metrics.BytesCount.Inc() - ByteCount = prometheus.NewCounter( - prometheus.CounterOpts{ - Name: "gardener_bytes_total", - Help: "Total number of bytes submitted to pipeline.", - }, - ) ) diff --git a/rex/rex.go b/rex/rex.go index 6bd981a8..29c488a8 100644 --- a/rex/rex.go +++ b/rex/rex.go @@ -13,6 +13,7 @@ import ( "io" "log" "math/rand" + "strconv" "strings" "time" @@ -95,6 +96,9 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin case state.Queuing: // TODO - handle zero task case. fileCount, byteCount, err := rex.queue(ctx, t) + // Update the metrics, even if there is an error, since the files were submitted to the queue already. + metrics.FilesPerDateHistogram.WithLabelValues(strconv.Itoa(t.Date.Year())).Observe(float64(fileCount)) + metrics.BytesPerDateHistogram.WithLabelValues(strconv.Itoa(t.Date.Year())).Observe(float64(byteCount)) if err != nil { // SetError also pushes to datastore, like Update(ctx, ) t.SetError(ctx, err, "rex.queue") @@ -106,10 +110,6 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin t.Update(ctx, state.Done) } else { // TODO - should we also add metrics when there are errors? - metrics.FilesPerDateHistogram.WithLabelValues("").Observe(float64(fileCount)) - metrics.FileCount.Add(float64(fileCount)) - metrics.BytesPerDateHistogram.WithLabelValues("").Observe(float64(byteCount)) - metrics.ByteCount.Add(float64(byteCount)) t.Update(ctx, state.Processing) }