Skip to content

Commit

Permalink
Merge pull request #10 from m-lab/dedup
Browse files Browse the repository at this point in the history
More bigquery Dataset extensions for dedupping
  • Loading branch information
gfr10598 authored Dec 14, 2017
2 parents 7ac696d + a8f3604 commit fce6e7b
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 20 deletions.
4 changes: 3 additions & 1 deletion .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@ before_install:

- echo Branch is ${TRAVIS_BRANCH} and Tag is $TRAVIS_TAG

- if ${TRAVIS_EVENT_TYPE} == cron; then TEST_TAGS=integration; fi;
- echo EVENT_TYPE is ${TRAVIS_EVENT_TYPE}
- if [[ ${TRAVIS_EVENT_TYPE} == cron ]]; then TEST_TAGS=integration; fi;
- echo TEST_TAGS is ${TEST_TAGS}

# These directories will be cached on successful "script" builds, and restored,
# if available, to save time on future builds.
Expand Down
119 changes: 114 additions & 5 deletions bqext/dataset.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@ package bqext

import (
"errors"
"fmt"
"log"
"reflect"
"strings"
"time"

"cloud.google.com/go/bigquery"
"golang.org/x/net/context"
Expand All @@ -33,8 +36,8 @@ import (
// objects to streamline common actions.
// It encapsulates the Client and Dataset to simplify methods.
type Dataset struct {
BqClient *bigquery.Client
Dataset *bigquery.Dataset
*bigquery.Dataset // Exposes Dataset API directly.
BqClient *bigquery.Client
}

// NewDataset creates a Dataset for a project.
Expand All @@ -52,7 +55,7 @@ func NewDataset(project, dataset string, clientOpts ...option.ClientOption) (Dat
return Dataset{}, err
}

return Dataset{bqClient, bqClient.Dataset(dataset)}, nil
return Dataset{bqClient.Dataset(dataset), bqClient}, nil
}

// ResultQuery constructs a query with common QueryConfig settings for
Expand All @@ -65,8 +68,8 @@ func (dsExt *Dataset) ResultQuery(query string, dryRun bool) *bigquery.Query {
q.QueryConfig.UseLegacySQL = true
}
// Default for unqualified table names in the query.
q.QueryConfig.DefaultProjectID = dsExt.Dataset.ProjectID
q.QueryConfig.DefaultDatasetID = dsExt.Dataset.DatasetID
q.QueryConfig.DefaultProjectID = dsExt.ProjectID
q.QueryConfig.DefaultDatasetID = dsExt.DatasetID
return q
}

Expand Down Expand Up @@ -108,3 +111,109 @@ func (dsExt *Dataset) QueryAndParse(q string, structPtr interface{}) error {
}
return nil
}

// PartitionInfo provides basic information about a partition.
type PartitionInfo struct {
PartitionID string
CreationTime time.Time
LastModified time.Time
}

// GetPartitionInfo provides basic information about a partition.
func (dsExt Dataset) GetPartitionInfo(table string, partition string) (PartitionInfo, error) {
// This uses legacy, because PARTITION_SUMMARY is not supported in standard.
queryString := fmt.Sprintf(
`#legacySQL
SELECT
partition_id as PartitionID,
msec_to_timestamp(creation_time) AS CreationTime,
msec_to_timestamp(last_modified_time) AS LastModified
FROM
[%s$__PARTITIONS_SUMMARY__]
where partition_id = "%s" `, table, partition)
pi := PartitionInfo{}

err := dsExt.QueryAndParse(queryString, &pi)
if err != nil {
log.Println(err, ":", queryString)
return PartitionInfo{}, err
}
return pi, nil
}

// DestinationQuery constructs a query with common Config settings for
// writing results to a table.
// If dest is nil, then this will create a DryRun query.
// TODO - should disposition be an opts... field instead?
func (dsExt *Dataset) DestQuery(query string, dest *bigquery.Table, disposition bigquery.TableWriteDisposition) *bigquery.Query {
q := dsExt.BqClient.Query(query)
if dest != nil {
q.QueryConfig.Dst = dest
} else {
q.QueryConfig.DryRun = true
}
q.QueryConfig.WriteDisposition = disposition
q.QueryConfig.AllowLargeResults = true
// Default for unqualified table names in the query.
q.QueryConfig.DefaultProjectID = dsExt.ProjectID
q.QueryConfig.DefaultDatasetID = dsExt.DatasetID
q.QueryConfig.DisableFlattenedResults = true
return q
}

// ExecDestQuery executes a destination or dryrun query, and returns status or error.
func (dsExt *Dataset) ExecDestQuery(q *bigquery.Query) (*bigquery.JobStatus, error) {
if q.QueryConfig.Dst == nil && q.QueryConfig.DryRun == false {
return nil, errors.New("query must be a destination or dry run")
}
job, err := q.Run(context.Background())
if err != nil {
return nil, err
}
log.Println("JobID:", job.ID())
status, err := job.Wait(context.Background())
if err != nil {
return status, err
}
return status, nil
}

///////////////////////////////////////////////////////////////////
// Specific queries.
///////////////////////////////////////////////////////////////////

// TODO - really should take the one that was parsed last, instead
// of random
var dedupTemplate = `
#standardSQL
# Delete all duplicate rows based on test_id
SELECT * except (row_number)
FROM (
SELECT *, ROW_NUMBER() OVER (PARTITION BY %s) row_number
FROM ` + "`%s`" + `)
WHERE row_number = 1`

// Dedup_Alpha executes a query that dedups and writes to destination partition.
// This function is alpha status. The interface may change without notice
// or major version number change.
//
// `src` is relative to the project:dataset of dsExt.
// `dedupOn` names the field to be used for dedupping.
// `destTable` specifies the table to write to, typically created with
// dsExt.BqClient.DatasetInProject(...).Table(...)
//
// NOTE: If destination table is partitioned, destTable MUST include the partition
// suffix to avoid accidentally overwriting the entire table.
func (dsExt *Dataset) Dedup_Alpha(src string, dedupOn string, destTable *bigquery.Table) (*bigquery.JobStatus, error) {
if !strings.Contains(destTable.TableID, "$") {
meta, err := destTable.Metadata(context.Background())
if err == nil && meta.TimePartitioning != nil {
log.Println(err)
return nil, errors.New("Destination table must specify partition")
}
}
log.Printf("Removing dups (of %s) and writing to %s\n", dedupOn, destTable.TableID)
queryString := fmt.Sprintf(dedupTemplate, dedupOn, src)
query := dsExt.DestQuery(queryString, destTable, bigquery.WriteTruncate)
return dsExt.ExecDestQuery(query)
}
86 changes: 78 additions & 8 deletions bqext/dataset_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ package bqext_test
// on the state of our bigquery tables, so they may start failing
// if the tables are changed.

// TODO (issue #8) tests that use bq tables should create them from scratch.

import (
"encoding/json"
"fmt"
"log"
"os"
"testing"
"time"

"cloud.google.com/go/bigquery"
"github.com/go-test/deep"
Expand Down Expand Up @@ -54,7 +57,7 @@ func TestGetTableStats(t *testing.T) {
t.Fatal(err)
}

table := tExt.Dataset.Table("TestGetTableStats")
table := tExt.Table("TestGetTableStats")
ctx := context.Background()
stats, err := table.Metadata(ctx)
if err != nil {
Expand All @@ -78,11 +81,13 @@ func TestGetTableStats(t *testing.T) {
}
}

// PartitionInfo provides basic information about a partition.
type PartitionInfo struct {
PartitionID string `qfield:"partition_id"`
// CreationTime time.Time `qfield:"created"`
// LastModified time.Time `qfield:"last_modified"`
// partitionInfo provides basic information about a partition.
// Note that a similar struct is defined in dataset.go, but this
// one is used for testing the QueryAndParse method.
type partitionInfo struct {
PartitionID string
CreationTime time.Time
LastModified time.Time
}

func TestQueryAndParse(t *testing.T) {
Expand All @@ -109,10 +114,10 @@ func TestQueryAndParse(t *testing.T) {
FROM
[%s$__PARTITIONS_SUMMARY__]
where partition_id = "%s" `, "TestQueryAndParse", "20170101")
pi := PartitionInfo{}
pi := partitionInfo{}

// Should be simple struct...
err = tExt.QueryAndParse(queryString, []PartitionInfo{})
err = tExt.QueryAndParse(queryString, []partitionInfo{})
if err == nil {
t.Error("Should produce error on slice input")
}
Expand All @@ -131,3 +136,68 @@ func TestQueryAndParse(t *testing.T) {
t.Error("Incorrect PartitionID")
}
}

func clientOpts() []option.ClientOption {
opts := []option.ClientOption{}
if os.Getenv("TRAVIS") != "" {
authOpt := option.WithCredentialsFile("../travis-testing.key")
opts = append(opts, authOpt)
}
return opts
}

// TODO - should build the test tables from scratch. See https://github.com/m-lab/go/issues/8

func TestDedup_Alpha(t *testing.T) {
start := time.Now() // Later, we will compare partition time to this.

tExt, err := bqext.NewDataset("mlab-testing", "etl", clientOpts()...)
if err != nil {
t.Fatal(err)
}

// First check that source table has expected number of rows.
// TestDedupSrc should have 6 rows, of which 4 should be unique.
type QR struct {
NumRows int64
}
result := QR{}
err = tExt.QueryAndParse("select count(test_id) as NumRows from `TestDedupSrc_19990101`", &result)
if result.NumRows != 6 {
t.Fatal("Source table has wrong number rows: ", result.NumRows)
}

destTable := tExt.BqClient.DatasetInProject("mlab-testing", "etl").Table("TestDedupDest$19990101")
_, err = tExt.Dedup_Alpha("TestDedupSrc_19990101", "test_id", destTable)
if err != nil {
t.Fatal(err)
}

pi, err := tExt.GetPartitionInfo("TestDedupDest", "19990101")
if err != nil {
t.Fatal(err)
}
if pi.CreationTime.Before(start) {
t.Error("Partition not overwritten??? ", pi.CreationTime)
}

err = tExt.QueryAndParse("select count(test_id) as NumRows from `TestDedupDest` where _PARTITIONTIME = timestamp("+`"1999-01-01 00:00:00"`+")", &result)
if err != nil {
t.Fatal(err)
}
if result.NumRows != 4 {
t.Error("Destination has wrong number of rows: ", result.NumRows)
}
}

func TestPartitionInfo(t *testing.T) {
util, err := bqext.NewDataset("mlab-testing", "etl", clientOpts()...)
if err != nil {
t.Fatal(err)
}

info, err := util.GetPartitionInfo("TestDedupDest", "19990101")
if info.PartitionID != "19990101" {
t.Error("Incorrect PartitionID", info)
}
}
63 changes: 57 additions & 6 deletions bqext/dataset_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,13 @@ import (
"net/http"
"testing"

"google.golang.org/api/option"

"cloud.google.com/go/bigquery"
"github.com/go-test/deep"
"github.com/m-lab/go/bqext"
"github.com/m-lab/go/cloudtest"
"golang.org/x/net/context"
"golang.org/x/oauth2/google"
"google.golang.org/api/option"
)

type nopCloser struct {
Expand Down Expand Up @@ -65,7 +64,7 @@ const wantTableMetadata = `{"Name":"","Description":"","Schema":[{"Name":"test_i
// Client that returns canned response from metadata request.
// Pretty ugly implementation. Will need to improve this before using
// the strategy more widely. Possibly should use one of the go-vcr tools.
func getTableStatsClient() *http.Client {
func getOKClient() *http.Client {
c := make(chan *http.Response, 10)
client := cloudtest.NewChannelClient(c)

Expand All @@ -83,13 +82,13 @@ func getTableStatsClient() *http.Client {
// That test runs as an integration test, and the logged response body
// can be found it that test's output.
func TestGetTableStatsMock(t *testing.T) {
opts := []option.ClientOption{option.WithHTTPClient(getTableStatsClient())}
tExt, err := bqext.NewDataset("mock", "mock", opts...)
opts := []option.ClientOption{option.WithHTTPClient(getOKClient())}
dsExt, err := bqext.NewDataset("mock", "mock", opts...)
if err != nil {
t.Fatal(err)
}

table := tExt.Dataset.Table("TestGetTableStats")
table := dsExt.Table("TestGetTableStats")
ctx := context.Background()
stats, err := table.Metadata(ctx)
if err != nil {
Expand All @@ -105,3 +104,55 @@ func TestGetTableStatsMock(t *testing.T) {
t.Error(diff)
}
}

// This test only check very basic stuff. Intended mostly just to
// improve coverage metrics.
func TestResultQuery(t *testing.T) {
// Create a dummy client.
opts := []option.ClientOption{option.WithHTTPClient(getOKClient())}
dsExt, err := bqext.NewDataset("mock", "mock", opts...)
if err != nil {
t.Fatal(err)
}

q := dsExt.ResultQuery("query string", true)
qc := q.QueryConfig
if !qc.DryRun {
t.Error("DryRun should be set.")
}

q = dsExt.ResultQuery("query string", false)
qc = q.QueryConfig
if qc.DryRun {
t.Error("DryRun should be false.")
}
}

// This test only check very basic stuff. Intended mostly just to
// improve coverage metrics.
func TestDestQuery(t *testing.T) {
// Create a dummy client.
opts := []option.ClientOption{option.WithHTTPClient(getOKClient())}
dsExt, err := bqext.NewDataset("mock", "mock", opts...)
if err != nil {
t.Fatal(err)
}

q := dsExt.DestQuery("query string", nil, bigquery.WriteEmpty)
qc := q.QueryConfig
if qc.Dst != nil {
t.Error("Destination should be nil.")
}
if !qc.DryRun {
t.Error("DryRun should be set.")
}

q = dsExt.DestQuery("query string", dsExt.Table("foobar"), bigquery.WriteEmpty)
qc = q.QueryConfig
if qc.Dst.TableID != "foobar" {
t.Error("Destination should be foobar.")
}
if qc.DryRun {
t.Error("DryRun should be false.")
}
}

0 comments on commit fce6e7b

Please sign in to comment.