Skip to content

Commit

Permalink
Make names consistent, use Query, and allow unpartitioned dest
Browse files Browse the repository at this point in the history
  • Loading branch information
gfr10598 committed Dec 14, 2017
1 parent e9b7a3c commit a8f3604
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 12 deletions.
24 changes: 15 additions & 9 deletions bqext/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func (dsExt Dataset) GetPartitionInfo(table string, partition string) (Partition
// writing results to a table.
// If dest is nil, then this will create a DryRun query.
// TODO - should disposition be an opts... field instead?
func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table, disposition bigquery.TableWriteDisposition) *bigquery.Query {
func (dsExt *Dataset) DestQuery(query string, dest *bigquery.Table, disposition bigquery.TableWriteDisposition) *bigquery.Query {
q := dsExt.BqClient.Query(query)
if dest != nil {
q.QueryConfig.Dst = dest
Expand All @@ -161,9 +161,11 @@ func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table, dispo
return q
}

// ExecDestQuery constructs a destination query, executes it, and returns status or error.
func (dsExt *Dataset) ExecDestQuery(query string, disposition bigquery.TableWriteDisposition, destTable *bigquery.Table) (*bigquery.JobStatus, error) {
q := dsExt.DestinationQuery(query, destTable, disposition)
// ExecDestQuery executes a destination or dryrun query, and returns status or error.
func (dsExt *Dataset) ExecDestQuery(q *bigquery.Query) (*bigquery.JobStatus, error) {
if q.QueryConfig.Dst == nil && q.QueryConfig.DryRun == false {
return nil, errors.New("query must be a destination or dry run")
}
job, err := q.Run(context.Background())
if err != nil {
return nil, err
Expand Down Expand Up @@ -200,14 +202,18 @@ var dedupTemplate = `
// `destTable` specifies the table to write to, typically created with
// dsExt.BqClient.DatasetInProject(...).Table(...)
//
// NOTE: destination table MUST include the partition suffix. This
// avoids accidentally overwriting the entire table.
// TODO(gfr) Support non-partitioned table destination.
// NOTE: If destination table is partitioned, destTable MUST include the partition
// suffix to avoid accidentally overwriting the entire table.
func (dsExt *Dataset) Dedup_Alpha(src string, dedupOn string, destTable *bigquery.Table) (*bigquery.JobStatus, error) {
if !strings.Contains(destTable.TableID, "$") {
return nil, errors.New("Destination table does not specify partition")
meta, err := destTable.Metadata(context.Background())
if err == nil && meta.TimePartitioning != nil {
log.Println(err)
return nil, errors.New("Destination table must specify partition")
}
}
log.Printf("Removing dups (of %s) and writing to %s\n", dedupOn, destTable.TableID)
queryString := fmt.Sprintf(dedupTemplate, dedupOn, src)
return dsExt.ExecDestQuery(queryString, bigquery.WriteTruncate, destTable)
query := dsExt.DestQuery(queryString, destTable, bigquery.WriteTruncate)
return dsExt.ExecDestQuery(query)
}
6 changes: 3 additions & 3 deletions bqext/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,15 +130,15 @@ func TestResultQuery(t *testing.T) {

// This test only check very basic stuff. Intended mostly just to
// improve coverage metrics.
func TestDestinationQuery(t *testing.T) {
func TestDestQuery(t *testing.T) {
// Create a dummy client.
opts := []option.ClientOption{option.WithHTTPClient(getOKClient())}
dsExt, err := bqext.NewDataset("mock", "mock", opts...)
if err != nil {
t.Fatal(err)
}

q := dsExt.DestinationQuery("query string", nil, bigquery.WriteEmpty)
q := dsExt.DestQuery("query string", nil, bigquery.WriteEmpty)
qc := q.QueryConfig
if qc.Dst != nil {
t.Error("Destination should be nil.")
Expand All @@ -147,7 +147,7 @@ func TestDestinationQuery(t *testing.T) {
t.Error("DryRun should be set.")
}

q = dsExt.DestinationQuery("query string", dsExt.Table("foobar"), bigquery.WriteEmpty)
q = dsExt.DestQuery("query string", dsExt.Table("foobar"), bigquery.WriteEmpty)
qc = q.QueryConfig
if qc.Dst.TableID != "foobar" {
t.Error("Destination should be foobar.")
Expand Down

0 comments on commit a8f3604

Please sign in to comment.