From 56878c77579d03fb1b9db60eda56bc75b8132e88 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Tue, 12 Dec 2017 17:40:03 -0500 Subject: [PATCH 01/11] add DestinationQuery and Dedup methods --- bqext/dataset.go | 106 ++++++++++++++++++++++++++++++ bqext/dataset_integration_test.go | 63 ++++++++++++++++++ 2 files changed, 169 insertions(+) diff --git a/bqext/dataset.go b/bqext/dataset.go index 092d448..ed0ad0d 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -20,8 +20,11 @@ package bqext import ( "errors" + "fmt" + "log" "reflect" "strings" + "time" "cloud.google.com/go/bigquery" "golang.org/x/net/context" @@ -108,3 +111,106 @@ func (dsExt *Dataset) QueryAndParse(q string, structPtr interface{}) error { } return nil } + +// TableInfo contains the critical stats for a specific table +// or partition. +type TableInfo struct { + Name string + IsPartitioned bool + NumBytes int64 + NumRows uint64 + CreationTime time.Time + LastModifiedTime time.Time +} + +// PartitionInfo provides basic information about a partition. +type PartitionInfo struct { + PartitionID string + CreationTime time.Time + LastModified time.Time +} + +// GetPartitionInfo provides basic information about a partition. +func (dsExt Dataset) GetPartitionInfo(table string, partition string) (PartitionInfo, error) { + // This uses legacy, because PARTITION_SUMMARY is not supported in standard. + queryString := fmt.Sprintf( + `#legacySQL + SELECT + partition_id as PartitionID, + msec_to_timestamp(creation_time) AS CreationTime, + msec_to_timestamp(last_modified_time) AS LastModified + FROM + [%s$__PARTITIONS_SUMMARY__] + where partition_id = "%s" `, table, partition) + pi := PartitionInfo{} + + err := dsExt.QueryAndParse(queryString, &pi) + if err != nil { + log.Println(err) + return PartitionInfo{}, err + } + return pi, nil +} + +// DestinationQuery constructs a query with common Config settings for +// writing results to a table. +// Generally, may need to change WriteDisposition. +func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table) *bigquery.Query { + q := dsExt.BqClient.Query(query) + if dest != nil { + q.QueryConfig.Dst = dest + } else { + q.QueryConfig.DryRun = true + } + q.QueryConfig.AllowLargeResults = true + // Default for unqualified table names in the query. + q.QueryConfig.DefaultProjectID = dsExt.Dataset.ProjectID + q.QueryConfig.DefaultDatasetID = dsExt.Dataset.DatasetID + q.QueryConfig.DisableFlattenedResults = true + return q +} + +/////////////////////////////////////////////////////////////////// +// Specific queries. +/////////////////////////////////////////////////////////////////// + +// TODO - really should take the one that was parsed last, instead +// of random +var dedupTemplate = ` + #standardSQL + # Delete all duplicate rows based on test_id + SELECT * except (row_number) + FROM ( + SELECT *, ROW_NUMBER() OVER (PARTITION BY %s) row_number + FROM ` + "`%s`" + `) + WHERE row_number = 1` + +// Dedup executes a query that dedups and writes to an appropriate +// partition. +// src is relative to the project:dataset of dsExt. +// dedupOn names the field to be used for dedupping. +// project, dataset, table specify the table to write into. +// NOTE: destination table must include the partition suffix. This +// avoids accidentally overwriting TODAY's partition. +func (dsExt *Dataset) Dedup(src string, dedupOn string, overwrite bool, project, dataset, table string) (*bigquery.JobStatus, error) { + if !strings.Contains(table, "$") { + return nil, errors.New("Destination table does not specify partition") + } + queryString := fmt.Sprintf(dedupTemplate, dedupOn, src) + dest := dsExt.BqClient.DatasetInProject(project, dataset) + q := dsExt.DestinationQuery(queryString, dest.Table(table)) + if overwrite { + q.QueryConfig.WriteDisposition = bigquery.WriteTruncate + } + log.Printf("Removing dups (of %s) and writing to %s\n", dedupOn, table) + job, err := q.Run(context.Background()) + if err != nil { + return nil, err + } + log.Println("JobID:", job.ID()) + status, err := job.Wait(context.Background()) + if err != nil { + return status, err + } + return status, nil +} diff --git a/bqext/dataset_integration_test.go b/bqext/dataset_integration_test.go index 4c061c5..b60c1f5 100644 --- a/bqext/dataset_integration_test.go +++ b/bqext/dataset_integration_test.go @@ -27,6 +27,7 @@ import ( "log" "os" "testing" + "time" "cloud.google.com/go/bigquery" "github.com/go-test/deep" @@ -131,3 +132,65 @@ func TestQueryAndParse(t *testing.T) { t.Error("Incorrect PartitionID") } } + +func ClientOpts() []option.ClientOption { + opts := []option.ClientOption{} + if os.Getenv("TRAVIS") != "" { + authOpt := option.WithCredentialsFile("../travis-testing.key") + opts = append(opts, authOpt) + } + return opts +} + +func TestDedup(t *testing.T) { + start := time.Now() // Later, we will compare partition time to this. + + tExt, err := bqext.NewDataset("mlab-testing", "etl", ClientOpts()...) + if err != nil { + t.Fatal(err) + } + + // First check that source table has expected number of rows. + // TestDedupSrc should have 6 rows, of which 4 should be unique. + type QR struct { + NumRows int64 + } + result := QR{} + err = tExt.QueryAndParse("select count(test_id) as NumRows from `TestDedupSrc_19990101`", &result) + if result.NumRows != 6 { + t.Fatal("Source table has wrong number rows: ", result.NumRows) + } + + _, err = tExt.Dedup("TestDedupSrc_19990101", "test_id", true, "mlab-testing", "etl", "TestDedupDest$19990101") + if err != nil { + t.Fatal(err) + } + + pi, err := tExt.GetPartitionInfo("TestDedupDest", "19990101") + if err != nil { + t.Fatal(err) + } + if pi.CreationTime.Before(start) { + t.Error("Partition not overwritten??? ", pi.CreationTime) + } + + err = tExt.QueryAndParse("select count(test_id) as NumRows from `TestDedupDest` where _PARTITIONTIME = timestamp("+`"1999-01-01 00:00:00"`+")", &result) + if err != nil { + t.Fatal(err) + } + if result.NumRows != 4 { + t.Error("Destination has wrong number of rows: ", result.NumRows) + } +} + +func TestPartitionInfo(t *testing.T) { + util, err := bqext.NewDataset("mlab-testing", "etl", ClientOpts()...) + if err != nil { + t.Fatal(err) + } + + info, err := util.GetPartitionInfo("TestDedupDest", "19990101") + if info.PartitionID != "19990101" { + t.Error("Incorrect PartitionID", info) + } +} From 7fd074303cf2a6862061a8ceef2f42275d5b629b Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Tue, 12 Dec 2017 17:47:22 -0500 Subject: [PATCH 02/11] remove unused TableInfo --- bqext/dataset.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/bqext/dataset.go b/bqext/dataset.go index ed0ad0d..2dceb56 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -112,17 +112,6 @@ func (dsExt *Dataset) QueryAndParse(q string, structPtr interface{}) error { return nil } -// TableInfo contains the critical stats for a specific table -// or partition. -type TableInfo struct { - Name string - IsPartitioned bool - NumBytes int64 - NumRows uint64 - CreationTime time.Time - LastModifiedTime time.Time -} - // PartitionInfo provides basic information about a partition. type PartitionInfo struct { PartitionID string From b3ff35f65e04b11427c0047c5b422e33e335e247 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Tue, 12 Dec 2017 17:54:10 -0500 Subject: [PATCH 03/11] add TODO and better comment --- bqext/dataset_integration_test.go | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/bqext/dataset_integration_test.go b/bqext/dataset_integration_test.go index b60c1f5..9025c73 100644 --- a/bqext/dataset_integration_test.go +++ b/bqext/dataset_integration_test.go @@ -21,6 +21,8 @@ package bqext_test // on the state of our bigquery tables, so they may start failing // if the tables are changed. +// TODO (issue #8) tests that use bq tables should create them from scratch. + import ( "encoding/json" "fmt" @@ -80,10 +82,12 @@ func TestGetTableStats(t *testing.T) { } // PartitionInfo provides basic information about a partition. +// Note that a similar struct is defined in dataset.go, but this +// one is used for testing the QueryAndParse method. type PartitionInfo struct { - PartitionID string `qfield:"partition_id"` - // CreationTime time.Time `qfield:"created"` - // LastModified time.Time `qfield:"last_modified"` + PartitionID string + CreationTime time.Time + LastModified time.Time } func TestQueryAndParse(t *testing.T) { From 49552b34017197eda3132a83565e7c5060932d9e Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Tue, 12 Dec 2017 18:21:17 -0500 Subject: [PATCH 04/11] Add env var echo --- .travis.yml | 2 ++ 1 file changed, 2 insertions(+) diff --git a/.travis.yml b/.travis.yml index 8b92346..f08e895 100644 --- a/.travis.yml +++ b/.travis.yml @@ -30,7 +30,9 @@ before_install: - echo Branch is ${TRAVIS_BRANCH} and Tag is $TRAVIS_TAG +- echo EVENT_TYPE is ${TRAVIS_EVENT_TYPE} - if ${TRAVIS_EVENT_TYPE} == cron; then TEST_TAGS=integration; fi; +- echo TEST_TAGS is ${TEST_TAGS} # These directories will be cached on successful "script" builds, and restored, # if available, to save time on future builds. From 4c6b873f0c3c593652b906634042479c44183b8e Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Tue, 12 Dec 2017 18:24:37 -0500 Subject: [PATCH 05/11] add brackets --- .travis.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index f08e895..00cddef 100644 --- a/.travis.yml +++ b/.travis.yml @@ -31,7 +31,7 @@ before_install: - echo Branch is ${TRAVIS_BRANCH} and Tag is $TRAVIS_TAG - echo EVENT_TYPE is ${TRAVIS_EVENT_TYPE} -- if ${TRAVIS_EVENT_TYPE} == cron; then TEST_TAGS=integration; fi; +- if [[ ${TRAVIS_EVENT_TYPE} == cron ]]; then TEST_TAGS=integration; fi; - echo TEST_TAGS is ${TEST_TAGS} # These directories will be cached on successful "script" builds, and restored, From a7bc47412d2dd45028f16773a7e6299218857b2e Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Wed, 13 Dec 2017 12:59:21 -0500 Subject: [PATCH 06/11] Add some lightweight tests --- bqext/dataset_test.go | 66 +++++++++++++++++++++++++++++++++++++++---- 1 file changed, 60 insertions(+), 6 deletions(-) diff --git a/bqext/dataset_test.go b/bqext/dataset_test.go index 78f39c0..3aabf2e 100644 --- a/bqext/dataset_test.go +++ b/bqext/dataset_test.go @@ -22,14 +22,13 @@ import ( "net/http" "testing" - "google.golang.org/api/option" - "cloud.google.com/go/bigquery" "github.com/go-test/deep" "github.com/m-lab/go/bqext" "github.com/m-lab/go/cloudtest" "golang.org/x/net/context" "golang.org/x/oauth2/google" + "google.golang.org/api/option" ) type nopCloser struct { @@ -65,7 +64,7 @@ const wantTableMetadata = `{"Name":"","Description":"","Schema":[{"Name":"test_i // Client that returns canned response from metadata request. // Pretty ugly implementation. Will need to improve this before using // the strategy more widely. Possibly should use one of the go-vcr tools. -func getTableStatsClient() *http.Client { +func getOKClient() *http.Client { c := make(chan *http.Response, 10) client := cloudtest.NewChannelClient(c) @@ -83,13 +82,13 @@ func getTableStatsClient() *http.Client { // That test runs as an integration test, and the logged response body // can be found it that test's output. func TestGetTableStatsMock(t *testing.T) { - opts := []option.ClientOption{option.WithHTTPClient(getTableStatsClient())} - tExt, err := bqext.NewDataset("mock", "mock", opts...) + opts := []option.ClientOption{option.WithHTTPClient(getOKClient())} + dsExt, err := bqext.NewDataset("mock", "mock", opts...) if err != nil { t.Fatal(err) } - table := tExt.Dataset.Table("TestGetTableStats") + table := dsExt.Dataset.Table("TestGetTableStats") ctx := context.Background() stats, err := table.Metadata(ctx) if err != nil { @@ -105,3 +104,58 @@ func TestGetTableStatsMock(t *testing.T) { t.Error(diff) } } + +// This test only check very basic stuff. Intended mostly just to +// improve coverage metrics. +func TestResultQuery(t *testing.T) { + // Create a dummy client. + opts := []option.ClientOption{option.WithHTTPClient(getOKClient())} + dsExt, err := bqext.NewDataset("mock", "mock", opts...) + if err != nil { + t.Fatal(err) + } + + q := dsExt.ResultQuery("query string", true) + qc := q.QueryConfig + if !qc.DryRun { + t.Error("DryRun should be set.") + } + + q = dsExt.ResultQuery("query string", false) + qc = q.QueryConfig + if qc.DryRun { + t.Error("DryRun should be false.") + } + +} + +// This test only check very basic stuff. Intended mostly just to +// improve coverage metrics. +func TestDestinationQuery(t *testing.T) { + // Create a dummy client. + client := cloudtest.NewChannelClient(make(chan *http.Response, 10)) + opts := []option.ClientOption{option.WithHTTPClient(client)} + dsExt, err := bqext.NewDataset("mock", "mock", opts...) + if err != nil { + t.Fatal(err) + } + + q := dsExt.DestinationQuery("query string", nil) + qc := q.QueryConfig + if qc.Dst != nil { + t.Error("Destination should be nil.") + } + if !qc.DryRun { + t.Error("DryRun should be set.") + } + + q = dsExt.DestinationQuery("query string", dsExt.Dataset.Table("foobar")) + qc = q.QueryConfig + if qc.Dst.TableID != "foobar" { + t.Error("Destination should be foobar.") + } + if qc.DryRun { + t.Error("DryRun should be false.") + } + +} From ad6366374909e40fe9fc98781961f664f8beedd6 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Wed, 13 Dec 2017 18:12:09 -0500 Subject: [PATCH 07/11] minor changes from PR review --- bqext/dataset.go | 2 +- bqext/dataset_integration_test.go | 16 +++++++++------- bqext/dataset_test.go | 5 +---- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/bqext/dataset.go b/bqext/dataset.go index 2dceb56..8db6e9b 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -135,7 +135,7 @@ func (dsExt Dataset) GetPartitionInfo(table string, partition string) (Partition err := dsExt.QueryAndParse(queryString, &pi) if err != nil { - log.Println(err) + log.Println(err, ":", queryString) return PartitionInfo{}, err } return pi, nil diff --git a/bqext/dataset_integration_test.go b/bqext/dataset_integration_test.go index 9025c73..adddcfc 100644 --- a/bqext/dataset_integration_test.go +++ b/bqext/dataset_integration_test.go @@ -81,10 +81,10 @@ func TestGetTableStats(t *testing.T) { } } -// PartitionInfo provides basic information about a partition. +// partitionInfo provides basic information about a partition. // Note that a similar struct is defined in dataset.go, but this // one is used for testing the QueryAndParse method. -type PartitionInfo struct { +type partitionInfo struct { PartitionID string CreationTime time.Time LastModified time.Time @@ -114,10 +114,10 @@ func TestQueryAndParse(t *testing.T) { FROM [%s$__PARTITIONS_SUMMARY__] where partition_id = "%s" `, "TestQueryAndParse", "20170101") - pi := PartitionInfo{} + pi := partitionInfo{} // Should be simple struct... - err = tExt.QueryAndParse(queryString, []PartitionInfo{}) + err = tExt.QueryAndParse(queryString, []partitionInfo{}) if err == nil { t.Error("Should produce error on slice input") } @@ -137,7 +137,7 @@ func TestQueryAndParse(t *testing.T) { } } -func ClientOpts() []option.ClientOption { +func clientOpts() []option.ClientOption { opts := []option.ClientOption{} if os.Getenv("TRAVIS") != "" { authOpt := option.WithCredentialsFile("../travis-testing.key") @@ -146,10 +146,12 @@ func ClientOpts() []option.ClientOption { return opts } +// TODO - should build the test tables from scratch. See https://github.com/m-lab/go/issues/8 + func TestDedup(t *testing.T) { start := time.Now() // Later, we will compare partition time to this. - tExt, err := bqext.NewDataset("mlab-testing", "etl", ClientOpts()...) + tExt, err := bqext.NewDataset("mlab-testing", "etl", clientOpts()...) if err != nil { t.Fatal(err) } @@ -188,7 +190,7 @@ func TestDedup(t *testing.T) { } func TestPartitionInfo(t *testing.T) { - util, err := bqext.NewDataset("mlab-testing", "etl", ClientOpts()...) + util, err := bqext.NewDataset("mlab-testing", "etl", clientOpts()...) if err != nil { t.Fatal(err) } diff --git a/bqext/dataset_test.go b/bqext/dataset_test.go index 3aabf2e..9328708 100644 --- a/bqext/dataset_test.go +++ b/bqext/dataset_test.go @@ -126,15 +126,13 @@ func TestResultQuery(t *testing.T) { if qc.DryRun { t.Error("DryRun should be false.") } - } // This test only check very basic stuff. Intended mostly just to // improve coverage metrics. func TestDestinationQuery(t *testing.T) { // Create a dummy client. - client := cloudtest.NewChannelClient(make(chan *http.Response, 10)) - opts := []option.ClientOption{option.WithHTTPClient(client)} + opts := []option.ClientOption{option.WithHTTPClient(getOKClient())} dsExt, err := bqext.NewDataset("mock", "mock", opts...) if err != nil { t.Fatal(err) @@ -157,5 +155,4 @@ func TestDestinationQuery(t *testing.T) { if qc.DryRun { t.Error("DryRun should be false.") } - } From bb80dce5156ec47b3d654694bd98468e9bcc8283 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Thu, 14 Dec 2017 10:15:02 -0500 Subject: [PATCH 08/11] Extract ExecDestQuery and simplify Dedup --- bqext/dataset.go | 57 +++++++++++++++++-------------- bqext/dataset_integration_test.go | 5 +-- 2 files changed, 35 insertions(+), 27 deletions(-) diff --git a/bqext/dataset.go b/bqext/dataset.go index 8db6e9b..1b0dce4 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -159,6 +159,22 @@ func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table) *bigq return q } +// ExecDestQuery constructs a destination query, executes it, and returns status or error. +func (dsExt *Dataset) ExecDestQuery(query string, disposition bigquery.TableWriteDisposition, destTable *bigquery.Table) (*bigquery.JobStatus, error) { + q := dsExt.DestinationQuery(query, destTable) + q.QueryConfig.WriteDisposition = disposition + job, err := q.Run(context.Background()) + if err != nil { + return nil, err + } + log.Println("JobID:", job.ID()) + status, err := job.Wait(context.Background()) + if err != nil { + return status, err + } + return status, nil +} + /////////////////////////////////////////////////////////////////// // Specific queries. /////////////////////////////////////////////////////////////////// @@ -174,32 +190,23 @@ var dedupTemplate = ` FROM ` + "`%s`" + `) WHERE row_number = 1` -// Dedup executes a query that dedups and writes to an appropriate -// partition. -// src is relative to the project:dataset of dsExt. -// dedupOn names the field to be used for dedupping. -// project, dataset, table specify the table to write into. -// NOTE: destination table must include the partition suffix. This -// avoids accidentally overwriting TODAY's partition. -func (dsExt *Dataset) Dedup(src string, dedupOn string, overwrite bool, project, dataset, table string) (*bigquery.JobStatus, error) { - if !strings.Contains(table, "$") { +// Dedup_Alpha executes a query that dedups and writes to destination partition. +// This function is alpha status. The interface may change without notice +// or major version number change. +// +// `src` is relative to the project:dataset of dsExt. +// `dedupOn` names the field to be used for dedupping. +// `destTable` specifies the table to write to, typically created with +// dsExt.BqClient.DatasetInProject(...).Table(...) +// +// NOTE: destination table MUST include the partition suffix. This +// avoids accidentally overwriting the entire table. +// TODO(gfr) Support non-partitioned table destination. +func (dsExt *Dataset) Dedup_Alpha(src string, dedupOn string, destTable *bigquery.Table) (*bigquery.JobStatus, error) { + if !strings.Contains(destTable.TableID, "$") { return nil, errors.New("Destination table does not specify partition") } + log.Printf("Removing dups (of %s) and writing to %s\n", dedupOn, destTable.TableID) queryString := fmt.Sprintf(dedupTemplate, dedupOn, src) - dest := dsExt.BqClient.DatasetInProject(project, dataset) - q := dsExt.DestinationQuery(queryString, dest.Table(table)) - if overwrite { - q.QueryConfig.WriteDisposition = bigquery.WriteTruncate - } - log.Printf("Removing dups (of %s) and writing to %s\n", dedupOn, table) - job, err := q.Run(context.Background()) - if err != nil { - return nil, err - } - log.Println("JobID:", job.ID()) - status, err := job.Wait(context.Background()) - if err != nil { - return status, err - } - return status, nil + return dsExt.ExecDestQuery(queryString, bigquery.WriteTruncate, destTable) } diff --git a/bqext/dataset_integration_test.go b/bqext/dataset_integration_test.go index adddcfc..4bf4e1e 100644 --- a/bqext/dataset_integration_test.go +++ b/bqext/dataset_integration_test.go @@ -148,7 +148,7 @@ func clientOpts() []option.ClientOption { // TODO - should build the test tables from scratch. See https://github.com/m-lab/go/issues/8 -func TestDedup(t *testing.T) { +func TestDedup_Alpha(t *testing.T) { start := time.Now() // Later, we will compare partition time to this. tExt, err := bqext.NewDataset("mlab-testing", "etl", clientOpts()...) @@ -167,7 +167,8 @@ func TestDedup(t *testing.T) { t.Fatal("Source table has wrong number rows: ", result.NumRows) } - _, err = tExt.Dedup("TestDedupSrc_19990101", "test_id", true, "mlab-testing", "etl", "TestDedupDest$19990101") + destTable := tExt.BqClient.DatasetInProject("mlab-testing", "etl").Table("TestDedupDest$19990101") + _, err = tExt.Dedup_Alpha("TestDedupSrc_19990101", "test_id", destTable) if err != nil { t.Fatal(err) } From dce14580d937311f90d9cd5a997d3d82313f0b1e Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Thu, 14 Dec 2017 10:20:27 -0500 Subject: [PATCH 09/11] Move TableWriteDisp to DestinationQuery param --- bqext/dataset.go | 9 +++++---- bqext/dataset_test.go | 4 ++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/bqext/dataset.go b/bqext/dataset.go index 1b0dce4..986c205 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -143,14 +143,16 @@ func (dsExt Dataset) GetPartitionInfo(table string, partition string) (Partition // DestinationQuery constructs a query with common Config settings for // writing results to a table. -// Generally, may need to change WriteDisposition. -func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table) *bigquery.Query { +// If dest is nil, then this will create a DryRun query. +// TODO - should disposition be an opts... field instead? +func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table, disposition bigquery.TableWriteDisposition) *bigquery.Query { q := dsExt.BqClient.Query(query) if dest != nil { q.QueryConfig.Dst = dest } else { q.QueryConfig.DryRun = true } + q.QueryConfig.WriteDisposition = disposition q.QueryConfig.AllowLargeResults = true // Default for unqualified table names in the query. q.QueryConfig.DefaultProjectID = dsExt.Dataset.ProjectID @@ -161,8 +163,7 @@ func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table) *bigq // ExecDestQuery constructs a destination query, executes it, and returns status or error. func (dsExt *Dataset) ExecDestQuery(query string, disposition bigquery.TableWriteDisposition, destTable *bigquery.Table) (*bigquery.JobStatus, error) { - q := dsExt.DestinationQuery(query, destTable) - q.QueryConfig.WriteDisposition = disposition + q := dsExt.DestinationQuery(query, destTable, disposition) job, err := q.Run(context.Background()) if err != nil { return nil, err diff --git a/bqext/dataset_test.go b/bqext/dataset_test.go index 9328708..ad91843 100644 --- a/bqext/dataset_test.go +++ b/bqext/dataset_test.go @@ -138,7 +138,7 @@ func TestDestinationQuery(t *testing.T) { t.Fatal(err) } - q := dsExt.DestinationQuery("query string", nil) + q := dsExt.DestinationQuery("query string", nil, bigquery.WriteEmpty) qc := q.QueryConfig if qc.Dst != nil { t.Error("Destination should be nil.") @@ -147,7 +147,7 @@ func TestDestinationQuery(t *testing.T) { t.Error("DryRun should be set.") } - q = dsExt.DestinationQuery("query string", dsExt.Dataset.Table("foobar")) + q = dsExt.DestinationQuery("query string", dsExt.Dataset.Table("foobar"), bigquery.WriteEmpty) qc = q.QueryConfig if qc.Dst.TableID != "foobar" { t.Error("Destination should be foobar.") From e9b7a3c717264bf91e28d51fabb6cc5198e52b25 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Thu, 14 Dec 2017 11:35:23 -0500 Subject: [PATCH 10/11] Embed Dataset as is-a --- bqext/dataset.go | 14 +++++++------- bqext/dataset_integration_test.go | 2 +- bqext/dataset_test.go | 4 ++-- 3 files changed, 10 insertions(+), 10 deletions(-) diff --git a/bqext/dataset.go b/bqext/dataset.go index 986c205..74f1b0e 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -36,8 +36,8 @@ import ( // objects to streamline common actions. // It encapsulates the Client and Dataset to simplify methods. type Dataset struct { - BqClient *bigquery.Client - Dataset *bigquery.Dataset + *bigquery.Dataset // Exposes Dataset API directly. + BqClient *bigquery.Client } // NewDataset creates a Dataset for a project. @@ -55,7 +55,7 @@ func NewDataset(project, dataset string, clientOpts ...option.ClientOption) (Dat return Dataset{}, err } - return Dataset{bqClient, bqClient.Dataset(dataset)}, nil + return Dataset{bqClient.Dataset(dataset), bqClient}, nil } // ResultQuery constructs a query with common QueryConfig settings for @@ -68,8 +68,8 @@ func (dsExt *Dataset) ResultQuery(query string, dryRun bool) *bigquery.Query { q.QueryConfig.UseLegacySQL = true } // Default for unqualified table names in the query. - q.QueryConfig.DefaultProjectID = dsExt.Dataset.ProjectID - q.QueryConfig.DefaultDatasetID = dsExt.Dataset.DatasetID + q.QueryConfig.DefaultProjectID = dsExt.ProjectID + q.QueryConfig.DefaultDatasetID = dsExt.DatasetID return q } @@ -155,8 +155,8 @@ func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table, dispo q.QueryConfig.WriteDisposition = disposition q.QueryConfig.AllowLargeResults = true // Default for unqualified table names in the query. - q.QueryConfig.DefaultProjectID = dsExt.Dataset.ProjectID - q.QueryConfig.DefaultDatasetID = dsExt.Dataset.DatasetID + q.QueryConfig.DefaultProjectID = dsExt.ProjectID + q.QueryConfig.DefaultDatasetID = dsExt.DatasetID q.QueryConfig.DisableFlattenedResults = true return q } diff --git a/bqext/dataset_integration_test.go b/bqext/dataset_integration_test.go index 4bf4e1e..fe504f1 100644 --- a/bqext/dataset_integration_test.go +++ b/bqext/dataset_integration_test.go @@ -57,7 +57,7 @@ func TestGetTableStats(t *testing.T) { t.Fatal(err) } - table := tExt.Dataset.Table("TestGetTableStats") + table := tExt.Table("TestGetTableStats") ctx := context.Background() stats, err := table.Metadata(ctx) if err != nil { diff --git a/bqext/dataset_test.go b/bqext/dataset_test.go index ad91843..bf07bc1 100644 --- a/bqext/dataset_test.go +++ b/bqext/dataset_test.go @@ -88,7 +88,7 @@ func TestGetTableStatsMock(t *testing.T) { t.Fatal(err) } - table := dsExt.Dataset.Table("TestGetTableStats") + table := dsExt.Table("TestGetTableStats") ctx := context.Background() stats, err := table.Metadata(ctx) if err != nil { @@ -147,7 +147,7 @@ func TestDestinationQuery(t *testing.T) { t.Error("DryRun should be set.") } - q = dsExt.DestinationQuery("query string", dsExt.Dataset.Table("foobar"), bigquery.WriteEmpty) + q = dsExt.DestinationQuery("query string", dsExt.Table("foobar"), bigquery.WriteEmpty) qc = q.QueryConfig if qc.Dst.TableID != "foobar" { t.Error("Destination should be foobar.") From a8f36049b3dd900d1b336783641bbf048fc252f5 Mon Sep 17 00:00:00 2001 From: Greg Russell Date: Thu, 14 Dec 2017 14:57:17 -0500 Subject: [PATCH 11/11] Make names consistent, use Query, and allow unpartitioned dest --- bqext/dataset.go | 24 +++++++++++++++--------- bqext/dataset_test.go | 6 +++--- 2 files changed, 18 insertions(+), 12 deletions(-) diff --git a/bqext/dataset.go b/bqext/dataset.go index 74f1b0e..1c64ee8 100644 --- a/bqext/dataset.go +++ b/bqext/dataset.go @@ -145,7 +145,7 @@ func (dsExt Dataset) GetPartitionInfo(table string, partition string) (Partition // writing results to a table. // If dest is nil, then this will create a DryRun query. // TODO - should disposition be an opts... field instead? -func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table, disposition bigquery.TableWriteDisposition) *bigquery.Query { +func (dsExt *Dataset) DestQuery(query string, dest *bigquery.Table, disposition bigquery.TableWriteDisposition) *bigquery.Query { q := dsExt.BqClient.Query(query) if dest != nil { q.QueryConfig.Dst = dest @@ -161,9 +161,11 @@ func (dsExt *Dataset) DestinationQuery(query string, dest *bigquery.Table, dispo return q } -// ExecDestQuery constructs a destination query, executes it, and returns status or error. -func (dsExt *Dataset) ExecDestQuery(query string, disposition bigquery.TableWriteDisposition, destTable *bigquery.Table) (*bigquery.JobStatus, error) { - q := dsExt.DestinationQuery(query, destTable, disposition) +// ExecDestQuery executes a destination or dryrun query, and returns status or error. +func (dsExt *Dataset) ExecDestQuery(q *bigquery.Query) (*bigquery.JobStatus, error) { + if q.QueryConfig.Dst == nil && q.QueryConfig.DryRun == false { + return nil, errors.New("query must be a destination or dry run") + } job, err := q.Run(context.Background()) if err != nil { return nil, err @@ -200,14 +202,18 @@ var dedupTemplate = ` // `destTable` specifies the table to write to, typically created with // dsExt.BqClient.DatasetInProject(...).Table(...) // -// NOTE: destination table MUST include the partition suffix. This -// avoids accidentally overwriting the entire table. -// TODO(gfr) Support non-partitioned table destination. +// NOTE: If destination table is partitioned, destTable MUST include the partition +// suffix to avoid accidentally overwriting the entire table. func (dsExt *Dataset) Dedup_Alpha(src string, dedupOn string, destTable *bigquery.Table) (*bigquery.JobStatus, error) { if !strings.Contains(destTable.TableID, "$") { - return nil, errors.New("Destination table does not specify partition") + meta, err := destTable.Metadata(context.Background()) + if err == nil && meta.TimePartitioning != nil { + log.Println(err) + return nil, errors.New("Destination table must specify partition") + } } log.Printf("Removing dups (of %s) and writing to %s\n", dedupOn, destTable.TableID) queryString := fmt.Sprintf(dedupTemplate, dedupOn, src) - return dsExt.ExecDestQuery(queryString, bigquery.WriteTruncate, destTable) + query := dsExt.DestQuery(queryString, destTable, bigquery.WriteTruncate) + return dsExt.ExecDestQuery(query) } diff --git a/bqext/dataset_test.go b/bqext/dataset_test.go index bf07bc1..b7fae5a 100644 --- a/bqext/dataset_test.go +++ b/bqext/dataset_test.go @@ -130,7 +130,7 @@ func TestResultQuery(t *testing.T) { // This test only check very basic stuff. Intended mostly just to // improve coverage metrics. -func TestDestinationQuery(t *testing.T) { +func TestDestQuery(t *testing.T) { // Create a dummy client. opts := []option.ClientOption{option.WithHTTPClient(getOKClient())} dsExt, err := bqext.NewDataset("mock", "mock", opts...) @@ -138,7 +138,7 @@ func TestDestinationQuery(t *testing.T) { t.Fatal(err) } - q := dsExt.DestinationQuery("query string", nil, bigquery.WriteEmpty) + q := dsExt.DestQuery("query string", nil, bigquery.WriteEmpty) qc := q.QueryConfig if qc.Dst != nil { t.Error("Destination should be nil.") @@ -147,7 +147,7 @@ func TestDestinationQuery(t *testing.T) { t.Error("DryRun should be set.") } - q = dsExt.DestinationQuery("query string", dsExt.Table("foobar"), bigquery.WriteEmpty) + q = dsExt.DestQuery("query string", dsExt.Table("foobar"), bigquery.WriteEmpty) qc = q.QueryConfig if qc.Dst.TableID != "foobar" { t.Error("Destination should be foobar.")