Merge pull request #113 from m-lab/sandbox-bytecount
Sandbox bytecount
gfr10598 authored Oct 23, 2018
2 parents fecb71a + 9cf7fe3 commit 41cacb1
Showing 5 changed files with 80 additions and 20 deletions.
2 changes: 1 addition & 1 deletion cloud/bq/sanity.go
@@ -398,7 +398,7 @@ func SanityCheckAndCopy(ctx context.Context, src, dest *AnnotatedTable) error {
log.Println("Copying...", src.TableID())
job, err := copier.Run(ctx)
if err != nil {
log.Println("Copy Error: %v", err)
log.Println("Copy Error:", err)
return err
}

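The one-line fix above corrects a common Go logging slip: log.Println does not interpret format verbs, so the original call logged the literal text "%v" followed by the error. A minimal standalone sketch of the difference (not part of this repo):

    package main

    import (
    	"errors"
    	"log"
    )

    func main() {
    	err := errors.New("copy failed")
    	log.Println("Copy Error: %v", err)  // bug: prints the literal "%v", then the error
    	log.Printf("Copy Error: %v\n", err) // Printf substitutes the verb
    	log.Println("Copy Error:", err)     // the commit's fix: Println joins operands with spaces
    }

Either Printf with the verb or Println without it is correct; the commit chose the Println form.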
12 changes: 7 additions & 5 deletions cloud/tq/tq.go
@@ -156,8 +156,9 @@ func (qh *QueueHandler) postWithRetry(bucket, filepath string) error {
}

// PostAll posts all normal file items in an ObjectIterator into the appropriate queue.
- func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, error) {
+ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int, int64, error) {
fileCount := 0
+ byteCount := int64(0)
qpErrCount := 0
gcsErrCount := 0
for o, err := it.Next(); err != iterator.Done; o, err = it.Next() {
@@ -168,7 +169,7 @@ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int,
gcsErrCount++
if gcsErrCount > 5 {
log.Printf("Failed after %d files to %s.\n", fileCount, qh.Queue)
- return 0, err
+ return fileCount, byteCount, err
}
continue
}
@@ -179,19 +180,20 @@ func (qh *QueueHandler) PostAll(bucket string, it stiface.ObjectIterator) (int,
qpErrCount++
if qpErrCount > 5 {
log.Printf("Failed after %d files to %s (on %s).\n", fileCount, qh.Queue, o.Name)
- return 0, err
+ return fileCount, byteCount, err
}
} else {
fileCount++
+ byteCount += o.Size
}
}
- return fileCount, nil
+ return fileCount, byteCount, nil
}

// PostDay fetches an iterator over the objects with ndt/YYYY/MM/DD prefix,
// and passes the iterator to postDay with appropriate queue.
// This typically takes about 10 minutes for a 20K task NDT day.
- func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, error) {
+ func (qh *QueueHandler) PostDay(ctx context.Context, bucket stiface.BucketHandle, bucketName, prefix string) (int, int64, error) {
log.Println("Adding ", prefix, " to ", qh.Queue)
qry := storage.Query{
Delimiter: "/",
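With the widened signatures, callers get a file count and a byte total from one pass over the bucket listing. A hedged sketch of a caller (a fragment assuming ctx, bucket, bucketName, and a QueueHandler qh are already in scope; the prefix is illustrative):

    fileCount, byteCount, err := qh.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/")
    if err != nil {
    	log.Println("PostDay failed:", err)
    }
    log.Printf("queued %d files totaling %d bytes", fileCount, byteCount)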
4 changes: 2 additions & 2 deletions cloud/tq/tq_integration_test.go
@@ -109,14 +109,14 @@ func TestPostDay(t *testing.T) {
if err != nil {
t.Fatal(err)
}
- n, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/")
+ n, _, err := q.PostDay(ctx, bucket, bucketName, "ndt/2017/09/24/")
if err != nil {
t.Fatal(err)
}
if n != 3 {
t.Error("Should have posted 3 items", n)
}
- n, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/")
+ n, _, err = q.PostDay(ctx, bucket, bucketName, "ndt/2018/05/01/")
if err != nil {
t.Fatal(err)
}
55 changes: 54 additions & 1 deletion metrics/metrics.go
@@ -11,6 +11,8 @@ func init() {
prometheus.MustRegister(CompletedCount)
prometheus.MustRegister(StateTimeSummary)
prometheus.MustRegister(StateDate)
+ prometheus.MustRegister(FilesPerDateHistogram)
+ prometheus.MustRegister(BytesPerDateHistogram)
}

var (
@@ -20,7 +22,7 @@ var (
// Provides metrics:
// gardener_started_total{experiment}
// Example usage:
- // metrics.StartCount.WithLabelValues("sidestream").Inc()
+ // metrics.StartedCount.WithLabelValues("sidestream").Inc()
StartedCount = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: "gardener_started_total",
@@ -110,4 +112,55 @@ var (
Objectives: map[float64]float64{0.01: 0.001, 0.1: 0.01, 0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}, []string{"state"},
)
+
+ // FilesPerDateHistogram provides a histogram of files per date submitted to the pipeline.
+ //
+ // Provides metrics:
+ // gardener_files_bucket{year="...", le="..."}
+ // ...
+ // gardener_files_sum{year="..."}
+ // gardener_files_count{year="..."}
+ // Usage example:
+ // metrics.FilesPerDateHistogram.WithLabelValues(
+ // "2011").Observe(files)
+ FilesPerDateHistogram = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "gardener_files",
+ Help: "Histogram of number of files submitted per date",
+ Buckets: []float64{1, 2, 3, 4, 5, 6, 7, 8, 9,
+ 10, 12, 14, 17, 20, 24, 28, 32, 38, 44, 50, 60, 70, 80, 90,
+ 100, 120, 140, 170, 200, 240, 280, 320, 380, 440, 500, 600, 700, 800, 900,
+ 1000, 1200, 1400, 1700, 2000, 2400, 2800, 3200, 3800, 4400, 5000, 6000, 7000, 8000, 9000,
+ 10000, 12000, 14000, 17000, 20000, 24000, 28000, 32000, 38000, 44000, 50000, 60000, 70000, 80000, 90000,
+ 100000, 120000, 140000, 170000, 200000, 240000, 280000, 320000, 380000, 440000, 500000, 600000, 700000, 800000, 900000,
+ },
+ },
+ []string{"year"},
+ )
+
+ // BytesPerDateHistogram provides a histogram of bytes per date submitted to the pipeline.
+ //
+ // Provides metrics:
+ // gardener_bytes_bucket{year="...", le="..."}
+ // ...
+ // gardener_bytes_sum{year="..."}
+ // gardener_bytes_count{year="..."}
+ // Usage example:
+ // metrics.BytesPerDateHistogram.WithLabelValues(
+ // "2011").Observe(bytes)
+ BytesPerDateHistogram = prometheus.NewHistogramVec(
+ prometheus.HistogramOpts{
+ Name: "gardener_bytes",
+ Help: "Histogram of number of bytes submitted per date",
+ Buckets: []float64{
+ 100000, 140000, 200000, 280000, 400000, 560000, 800000,
+ 1000000, 1400000, 2000000, 2800000, 4000000, 5600000, 8000000,
+ 10000000, 14000000, 20000000, 28000000, 40000000, 56000000, 80000000,
+ 100000000, 140000000, 200000000, 280000000, 400000000, 560000000, 800000000,
+ 1000000000, 1400000000, 2000000000, 2800000000, 4000000000, 5600000000, 8000000000,
+ 10000000000, 14000000000, 20000000000, 28000000000, 40000000000, 56000000000, 80000000000,
+ },
+ },
+ []string{"year"},
+ )
)
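As background on how these HistogramVec metrics surface in Prometheus, here is a small self-contained sketch (the bucket list is truncated and the observed values are illustrative, mirroring the definitions above):

    package main

    import (
    	"strconv"

    	"github.com/prometheus/client_golang/prometheus"
    )

    func main() {
    	filesHist := prometheus.NewHistogramVec(
    		prometheus.HistogramOpts{
    			Name:    "gardener_files",
    			Help:    "Histogram of number of files submitted per date",
    			Buckets: []float64{1, 10, 100, 1000, 10000}, // abbreviated for the sketch
    		},
    		[]string{"year"},
    	)
    	prometheus.MustRegister(filesHist)

    	// One observation per queued date; the date's year becomes the label value.
    	filesHist.WithLabelValues(strconv.Itoa(2018)).Observe(20000)

    	// The registry now exports one gardener_files_bucket{year="2018",le="..."}
    	// series per bucket boundary, plus gardener_files_sum{year="2018"} and
    	// gardener_files_count{year="2018"} (no le label on sum/count).
    }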
27 changes: 16 additions & 11 deletions rex/rex.go
@@ -13,6 +13,7 @@ import (
"io"
"log"
"math/rand"
"strconv"
"strings"
"time"

@@ -94,17 +95,21 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin

case state.Queuing:
// TODO - handle zero task case.
- n, err := rex.queue(ctx, t)
+ fileCount, byteCount, err := rex.queue(ctx, t)
+ // Update the metrics, even if there is an error, since the files were submitted to the queue already.
+ metrics.FilesPerDateHistogram.WithLabelValues(strconv.Itoa(t.Date.Year())).Observe(float64(fileCount))
+ metrics.BytesPerDateHistogram.WithLabelValues(strconv.Itoa(t.Date.Year())).Observe(float64(byteCount))
if err != nil {
// SetError also pushes to datastore, like Update(ctx, )
t.SetError(ctx, err, "rex.queue")
return err
}
- if n < 1 {
+ if fileCount < 1 {
// If there were no tasks posted, then there is nothing more to do.
t.Queue = ""
t.Update(ctx, state.Done)
} else {
+ // TODO - should we also add metrics when there are errors?
t.Update(ctx, state.Processing)
}

@@ -219,21 +224,21 @@ func (rex *ReprocessingExecutor) waitForParsing(ctx context.Context, t *state.Ta
return nil
}

- func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, error) {
+ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int, int64, error) {
// Submit all files from the bucket that match the prefix.
// Where do we get the bucket?
//func (qh *ChannelQueueHandler) handleLoop(next api.BasicPipe, bucketOpts ...option.ClientOption) {
qh, err := tq.NewQueueHandler(rex.Config, t.Queue)
if err != nil {
t.SetError(ctx, err, "NewQueueHandler")
- return 0, err
+ return 0, 0, err
}
parts, err := t.ParsePrefix()
if err != nil {
// If there is a parse error, log and skip request.
log.Println(err)
t.SetError(ctx, err, "BadPrefix")
- return 0, err
+ return 0, 0, err
}
bucketName := parts[0]

@@ -244,22 +249,22 @@ func (rex *ReprocessingExecutor) queue(ctx context.Context, t *state.Task) (int,
if err != nil {
if err == io.EOF && env.TestMode {
log.Println("Using fake client, ignoring EOF error")
- return 0, nil
+ return 0, 0, nil
}
log.Println(err)
t.SetError(ctx, err, "BucketError")
- return 0, err
+ return 0, 0, err
}
// NOTE: This does not check the terminate channel, so once started, it will
// complete the queuing.
- n, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/")
+ fileCount, byteCount, err := qh.PostDay(ctx, bucket, bucketName, parts[1]+"/"+parts[2]+"/")
if err != nil {
log.Println(err)
t.SetError(ctx, err, "PostDayError")
- return n, nil
+ return fileCount, byteCount, nil
}
log.Println("Added ", n, t.Name, " tasks to ", qh.Queue)
return n, nil
log.Println("Added ", byteCount, "bytes in", fileCount, t.Name, " tasks to ", qh.Queue)
return fileCount, byteCount, nil
}

func (rex *ReprocessingExecutor) dedup(ctx context.Context, t *state.Task) error {
