Skip to content

Commit

Permalink
Set batch priority on dedup and join (#412)
Browse files Browse the repository at this point in the history
* Set batch priority on dedup and join
* Check that job was run with batch priority
* Restore dryrun
* Fix comments
  • Loading branch information
stephen-soltesz authored Oct 17, 2022
1 parent 9043585 commit e32dae1
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 3 deletions.
13 changes: 10 additions & 3 deletions cloud/bq/ops.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,15 @@ func (to TableOps) Dedup(ctx context.Context, dryRun bool) (bqiface.Job, error)
if q == nil {
return nil, dataset.ErrNilQuery
}
if dryRun {
qc := bqiface.QueryConfig{QueryConfig: bigquery.QueryConfig{DryRun: dryRun, Q: qs}}
q.SetQueryConfig(qc)
qc := bqiface.QueryConfig{
QueryConfig: bigquery.QueryConfig{
Q: qs,
DryRun: dryRun,
// Schedule as batch job to avoid quota limits for interactive jobs.
Priority: bigquery.BatchPriority,
},
}
q.SetQueryConfig(qc)
return q.Run(ctx)
}

Expand Down Expand Up @@ -281,6 +286,8 @@ func (to TableOps) Join(ctx context.Context, dryRun bool) (bqiface.Job, error) {
Field: "date",
RequirePartitionFilter: true,
},
// Schedule as batch job to avoid quota limits for interactive jobs.
Priority: bigquery.BatchPriority,
},
Dst: dest,
}
Expand Down
15 changes: 15 additions & 0 deletions cloud/bq/ops_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"testing"
"time"

"cloud.google.com/go/bigquery"
"github.com/m-lab/etl-gardener/cloud/bq"
"github.com/m-lab/etl-gardener/ops"
"github.com/m-lab/etl-gardener/tracker"
Expand Down Expand Up @@ -90,6 +91,13 @@ func TestValidateQueries(t *testing.T) {
if status.Err() != nil {
t.Fatal(t.Name(), err, bq.DedupQuery(*qp))
}
cfg, err := j.Config()
if err != nil {
t.Fatalf("TestValidateQueries() Dedup job Config failed: %v", err)
}
if cfg.(*bigquery.QueryConfig).Priority != bigquery.BatchPriority {
t.Errorf("TestValidateQueries() Dedup job priority is not Batch: %v", bigquery.BatchPriority)
}

if !ops.JoinableDatatypes[qp.Job.Datatype] {
return
Expand All @@ -103,6 +111,13 @@ func TestValidateQueries(t *testing.T) {
if status.Err() != nil {
t.Fatal(t.Name(), err, bq.JoinQuery(*qp))
}
cfg, err = j.Config()
if err != nil {
t.Fatalf("TestValidateQueries() Join job config failed: %v", err)
}
if cfg.(*bigquery.QueryConfig).Priority != bigquery.BatchPriority {
t.Errorf("TestValidateQueries() Join job priority is not Batch: %v", bigquery.BatchPriority)
}
})
}
}

0 comments on commit e32dae1

Please sign in to comment.