diff --git a/cloud/bq/sanity.go b/cloud/bq/sanity.go
index 44635768..e4a5644b 100644
--- a/cloud/bq/sanity.go
+++ b/cloud/bq/sanity.go
@@ -398,7 +398,7 @@ func SanityCheckAndCopy(ctx context.Context, src, dest *AnnotatedTable) error {
 	log.Println("Copying...", src.TableID())
 	job, err := copier.Run(ctx)
 	if err != nil {
-		log.Println("Copy Error: %v", err)
+		log.Println("Copy Error:", err)
 		return err
 	}
 
diff --git a/cloud/tq/tq.go b/cloud/tq/tq.go
index f9a973ea..d10fccfd 100644
--- a/cloud/tq/tq.go
+++ b/cloud/tq/tq.go
@@ -156,8 +156,9 @@ func (qh *QueueHandler) postWithRetry(bucket, filepath string) error {
 }
 
 // PostAll posts all normal file items in an ObjectIterator into the appropriate queue.
-func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, error) {
+func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, int64, error) {
 	fileCount := 0
+	byteCount := int64(0)
 	qpErrCount := 0
 	gcsErrCount := 0
 	for o, err := it.Next(); err != iterator.Done; o, err = it.Next() {
@@ -168,7 +169,7 @@ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int,
 			gcsErrCount++
 			if gcsErrCount > 5 {
 				log.Printf("Failed after %d files to %s.\n", fileCount, qh.Queue)
-				return 0, err
+				return fileCount, byteCount, err
 			}
 			continue
 		}
@@ -179,19 +180,20 @@ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int,
 			qpErrCount++
 			if qpErrCount > 5 {
 				log.Printf("Failed after %d files to %s (on %s).\n", fileCount, qh.Queue, o.Name)
-				return 0, err
+				return fileCount, byteCount, err
 			}
 		} else {
 			fileCount++
+			byteCount += o.Size
 		}
 	}
-	return fileCount, nil
+	return fileCount, byteCount, nil
 }
 
 // PostDay fetches an iterator over the objects with ndt/YYYY/MM/DD prefix,
 // and passes the iterator to postDay with appropriate queue.
 // This typically takes about 10 minutes for a 20K task NDT day.
-func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, error) {
+func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, int64, error) {
 	log.Println("Adding ", prefix, " to ", qh.Queue)
 	qry := storage.Query{
 		Delimiter: "/",
diff --git a/cloud/tq/tq_integration_test.go b/cloud/tq/tq_integration_test.go
index 8a519a84..a214597a 100644
--- a/cloud/tq/tq_integration_test.go
+++ b/cloud/tq/tq_integration_test.go
@@ -109,14 +109,14 @@ func TestPostDay(t *testing.T) {
 	if err != nil {
 		t.Fatal(err)
 	}
-	n, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/")
+	n, _, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/")
 	if err != nil {
 		t.Fatal(err)
 	}
 	if n != 3 {
 		t.Error("Should have posted 3 items", n)
 	}
-	n, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/")
+	n, _, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/")
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/metrics/metrics.go b/metrics/metrics.go
index 89102490..75b1f035 100644
--- a/metrics/metrics.go
+++ b/metrics/metrics.go
@@ -11,6 +11,8 @@ func init() {
 	prometheus.MustRegister(CompletedCount)
 	prometheus.MustRegister(StateTimeSummary)
 	prometheus.MustRegister(StateDate)
+	prometheus.MustRegister(FilesPerDateHistogram)
+	prometheus.MustRegister(BytesPerDateHistogram)
 }
 
 var (
@@ -20,7 +22,7 @@ var (
 	// Provides metrics:
 	// gardener_started_total{experiment}
 	// Example usage:
-	// metrics.StartCount.WithLabelValues("sidestream").Inc()
+	// metrics.StartedCount.WithLabelValues("sidestream").Inc()
 	StartedCount = prometheus.NewCounterVec(
 		prometheus.CounterOpts{
 			Name: "gardener_started_total",
@@ -110,4 +112,55 @@ var (
 			Objectives: map[float64]float64{0.01: 0.001, 0.1: 0.01, 0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
 		},
 		[]string{"state"},
 	)
+
+	// FilesPerDateHistogram provides a histogram of the number of files submitted to the pipeline per date.
+	//
+	// Provides metrics:
+	// gardener_files_bucket{year="...", le="..."}
+	// ...
+	// gardener_files_sum{year="..."}
+	// gardener_files_count{year="..."}
+	// Usage example:
+	// metrics.FilesPerDateHistogram.WithLabelValues(
+	// "2011").Observe(files)
+	FilesPerDateHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name: "gardener_files",
+			Help: "Histogram of number of files submitted per date",
+			Buckets: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9,
+				10, 12, 14, 17, 20, 24, 28, 32, 38, 44, 50, 60, 70, 80, 90,
+				100, 120, 140, 170, 200, 240, 280, 320, 380, 440, 500, 600, 700, 800, 900,
+				1000, 1200, 1400, 1700, 2000, 2400, 2800, 3200, 3800, 4400, 5000, 6000, 7000, 8000, 9000,
+				10000, 12000, 14000, 17000, 20000, 24000, 28000, 32000, 38000, 44000, 50000, 60000, 70000, 80000, 90000,
+				100000, 120000, 140000, 170000, 200000, 240000, 280000, 320000, 380000, 440000, 500000, 600000, 700000, 800000, 900000,
+			},
+		},
+		[]string{"year"},
+	)
+
+	// BytesPerDateHistogram provides a histogram of the number of bytes submitted to the pipeline per date.
+	//
+	// Provides metrics:
+	// gardener_bytes_bucket{year="...", le="..."}
+	// ...
+	// gardener_bytes_sum{year="..."}
+	// gardener_bytes_count{year="..."}
+	// Usage example:
+	// metrics.BytesPerDateHistogram.WithLabelValues(
+	// "2011").Observe(bytes)
+	BytesPerDateHistogram = prometheus.NewHistogramVec(
+		prometheus.HistogramOpts{
+			Name: "gardener_bytes",
+			Help: "Histogram of number of bytes submitted per date",
+			Buckets: []float64{
+				100000, 140000, 200000, 280000, 400000, 560000, 800000,
+				1000000, 1400000, 2000000, 2800000, 4000000, 5600000, 8000000,
+				10000000, 14000000, 20000000, 28000000, 40000000, 56000000, 80000000,
+				100000000, 140000000, 200000000, 280000000, 400000000, 560000000, 800000000,
+				1000000000, 1400000000, 2000000000, 2800000000, 4000000000, 5600000000, 8000000000,
+				10000000000, 14000000000, 20000000000, 28000000000, 40000000000, 56000000000, 80000000000,
+			},
+		},
+		[]string{"year"},
+	)
 )
diff --git a/rex/rex.go b/rex/rex.go
index 6cce6d5e..29c488a8 100644
--- a/rex/rex.go
+++ b/rex/rex.go
@@ -13,6 +13,7 @@ import (
 	"io"
 	"log"
 	"math/rand"
+	"strconv"
 	"strings"
 	"time"
 
@@ -94,17 +95,21 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin
 
 	case state.Queuing:
 		// TODO - handle zero task case.
-		n, err := rex.queue(ctx, t)
+		fileCount, byteCount, err := rex.queue(ctx, t)
+		// Update the metrics even if there is an error, since files may already have been submitted to the queue.
+		metrics.FilesPerDateHistogram.WithLabelValues(strconv.Itoa(t.Date.Year())).Observe(float64(fileCount))
+		metrics.BytesPerDateHistogram.WithLabelValues(strconv.Itoa(t.Date.Year())).Observe(float64(byteCount))
 		if err != nil {
 			// SetError also pushes to datastore, like Update(ctx, )
 			t.SetError(ctx, err, "rex.queue")
 			return err
 		}
-		if n < 1 {
+		if fileCount < 1 {
 			// If there were no tasks posted, then there is nothing more to do.
 			t.Queue = ""
 			t.Update(ctx, state.Done)
 		} else {
+			// TODO - should we also add metrics when there are errors?
 			t.Update(ctx, state.Processing)
 		}
 
@@ -219,21 +224,21 @@ func (rex *ReprocessingExecutor) waitForParsing(ctx context.Context, t *state.Ta
 	return nil
 }
 
-func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, error) {
+func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, int64, error) {
 	// Submit all files from the bucket that match the prefix.
 	// Where do we get the bucket?
 	//func (qh *ChannelQueueHandler) handleLoop(next api.BasicPipe, bucketOpts ...option.ClientOption) {
 	qh, err := tq.NewQueueHandler(rex.Config, t.Queue)
 	if err != nil {
 		t.SetError(ctx, err, "NewQueueHandler")
-		return 0, err
+		return 0, 0, err
 	}
 	parts, err := t.ParsePrefix()
 	if err != nil {
 		// If there is a parse error, log and skip request.
 		log.Println(err)
 		t.SetError(ctx, err, "BadPrefix")
-		return 0, err
+		return 0, 0, err
 	}
 	bucketName := parts[0]
 
@@ -244,22 +249,22 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,
 	if err != nil {
 		if err == io.EOF && env.TestMode {
 			log.Println("Using fake client, ignoring EOF error")
-			return 0, nil
+			return 0, 0, nil
 		}
 		log.Println(err)
 		t.SetError(ctx, err, "BucketError")
-		return 0, err
+		return 0, 0, err
 	}
 	// NOTE: This does not check the terminate channel, so once started, it will
 	// complete the queuing.
-	n, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/")
+	fileCount, byteCount, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/")
 	if err != nil {
 		log.Println(err)
 		t.SetError(ctx, err, "PostDayError")
-		return n, nil
+		return fileCount, byteCount, nil
 	}
-	log.Println("Added ", n, t.Name, " tasks to ", qh.Queue)
-	return n, nil
+	log.Println("Added", byteCount, "bytes in", fileCount, t.Name, "tasks to", qh.Queue)
+	return fileCount, byteCount, nil
 }
 
 func (rex *ReprocessingExecutor) dedup(ctx context.Context, t *state.Task) error {
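
Note (illustration only, not part of the patch): the sketch below shows how a year-labeled HistogramVec like the ones added in metrics/metrics.go exposes its series. It is a minimal standalone program; the registry, the shortened bucket list, and the observed value of 3 are assumptions for demonstration — in the real code the observation happens in rex.Next via metrics.FilesPerDateHistogram.

package main

import (
	"fmt"
	"strconv"
	"time"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Stand-in for metrics.FilesPerDateHistogram, with far fewer buckets.
	filesPerDate := prometheus.NewHistogramVec(
		prometheus.HistogramOpts{
			Name:    "gardener_files",
			Help:    "Histogram of number of files submitted per date",
			Buckets: []float64{1, 10, 100, 1000},
		},
		[]string{"year"},
	)

	// A private registry keeps the output free of default runtime metrics.
	reg := prometheus.NewRegistry()
	reg.MustRegister(filesPerDate)

	// Observe one date's file count, labeled by year, as rex.Next now does.
	date := time.Date(2017, 9, 24, 0, 0, 0, 0, time.UTC)
	filesPerDate.WithLabelValues(strconv.Itoa(date.Year())).Observe(3)

	// Gathering shows that only the _bucket series carry the le label,
	// while _sum and _count carry just {year}, matching the doc comments above.
	mfs, err := reg.Gather()
	if err != nil {
		panic(err)
	}
	for _, mf := range mfs {
		fmt.Println(mf.String())
	}
}

With these series, per-year roll-ups are straightforward in PromQL, e.g. something like sum by (year) (gardener_bytes_sum) for the total bytes submitted per year.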