Skip to content

Commit

Permalink
Merge pull request #136 from m-lab/sandbox-log-stabilize
Browse files Browse the repository at this point in the history
Increase delay to 60 minutes
  • Loading branch information
gfr10598 authored Feb 14, 2019
2 parents 9731011 + 0404f31 commit 4c7d0fd
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 2 deletions.
2 changes: 1 addition & 1 deletion cloud/bq/dedup.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func WaitForStableTable(ctx context.Context, tt bqiface.Table) error {
bufferEmptySince := never
// NOTE: This must be larger than the errorTimeout.
// We have seen false negatives up to about 2.5 minutes, so 5 minutes might not be enough.
emptyBufferWaitTime := 10 * time.Minute
emptyBufferWaitTime := 60 * time.Minute

errorTimeout := 2 * time.Minute
if testMode {
Expand Down
3 changes: 2 additions & 1 deletion cloud/bq/sanity.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,7 +326,6 @@ func (at *AnnotatedTable) checkModifiedAfter(ctx context.Context, other *Annotat
// Note that if other doesn't actually exist, its LastModifiedTime will be the time zero value,
// so this will generally work as intended.
if thisMeta.LastModifiedTime.Before(other.LastModifiedTime(ctx)) {
log.Printf("Warning - existing modified later than replacement")
// TODO should perhaps delete the source table?
return ErrSrcOlderThanDest
}
Expand Down Expand Up @@ -389,6 +388,8 @@ func SanityCheckAndCopy(ctx context.Context, src, dest *AnnotatedTable) error {

err = src.checkModifiedAfter(ctx, dest)
if err != nil {
// TODO: Should we delete the source table here?
log.Printf("%s modified (%v) after %s (%v)\n", src.FullyQualifiedName(), src.LastModifiedTime(ctx), dest.FullyQualifiedName(), dest.LastModifiedTime(ctx))
return err
}

Expand Down
12 changes: 12 additions & 0 deletions rex/rex.go
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,12 @@ func (rex *ReprocessingExecutor) Next(ctx context.Context, t *state.Task, termin
}
}

// Remove this when Gardener is working smoothly.
// Log the stabilizing duration. Some SanityChecks are failing, and they may
// be related to premature exit from Stabiliting phase.
duration := time.Since(t.UpdateTime)
log.Println("Stabilizing", t.Name, "took", duration)

t.Update(ctx, state.Deduplicating)

case state.Deduplicating:
Expand Down Expand Up @@ -297,6 +303,7 @@ func (rex *ReprocessingExecutor) dedup(ctx context.Context, t *state.Task) error

// WaitForJob waits for job to complete. Uses fibonacci backoff until the backoff
// >= maxBackoff, at which point it continues using same backoff.
// TODO - why don't we just use job.Wait()? Just because of terminate?
// TODO - develop a BQJob interface for wrapping bigquery.Job, and allowing fakes.
// TODO - move this to go/dataset, since it is bigquery specific and general purpose.
func waitForJob(ctx context.Context, job bqiface.Job, maxBackoff time.Duration, terminate <-chan struct{}) error {
Expand Down Expand Up @@ -362,6 +369,8 @@ func (rex *ReprocessingExecutor) finish(ctx context.Context, t *state.Task, term
t.SetError(ctx, err, "waitForJob")
return err
}

// This just gets the status.
// TODO - should this context have a deadline?
status, err := job.Wait(ctx)
if err != nil {
Expand All @@ -374,6 +383,9 @@ func (rex *ReprocessingExecutor) finish(ctx context.Context, t *state.Task, term

log.Println("Completed deduplication from", src.FullyQualifiedName())

// Clear the job ID. It won't be pushed to datastore until an error or update.
t.JobID = ""

// Sanity check and copy things to final DS.
// ==========================================================================
// Skip SanityCheckAndCopy for deployments that do not specify the final dataset.
Expand Down

0 comments on commit 4c7d0fd

Please sign in to comment.