From 6c57ce4c6e99a012dfac678b31698d65adeef64a Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 27 Dec 2023 15:34:03 -0500 Subject: [PATCH] Setup partitioning and clustering for raw table (#915) Setup clustering for pkey for normalized table, we noticed at one of our customers that the costs associated with MERGE statements are high. This is an attempt to reduce them per: https://cloud.google.com/blog/products/data-analytics/cost-optimization-best-practices-for-bigquery --------- Co-authored-by: Kevin Biju --- flow/connectors/bigquery/bigquery.go | 50 ++++++++++++++++++++++++---- 1 file changed, 44 insertions(+), 6 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7905fc05bc..3245f84a47 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -655,10 +655,10 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr table := c.client.Dataset(c.datasetID).Table(rawTableName) // check if the table exists - meta, err := table.Metadata(c.ctx) + tableRef, err := table.Metadata(c.ctx) if err == nil { // table exists, check if the schema matches - if !reflect.DeepEqual(meta.Schema, schema) { + if !reflect.DeepEqual(tableRef.Schema, schema) { return nil, fmt.Errorf("table %s.%s already exists with different schema", c.datasetID, rawTableName) } else { return &protos.CreateRawTableOutput{ @@ -667,10 +667,32 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr } } + partitioning := &bigquery.RangePartitioning{ + Field: "_peerdb_batch_id", + Range: &bigquery.RangePartitioningRange{ + Start: 0, + End: 1_000_000, + Interval: 100, + }, + } + + clustering := &bigquery.Clustering{ + Fields: []string{ + "_peerdb_batch_id", + "_peerdb_destination_table_name", + "_peerdb_timestamp", + }, + } + + metadata := &bigquery.TableMetadata{ + Schema: schema, + RangePartitioning: partitioning, + Clustering: clustering, + Name: rawTableName, + } + // table does not exist, create it - err = table.Create(c.ctx, &bigquery.TableMetadata{ - Schema: schema, - }) + err = table.Create(c.ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, rawTableName, err) } @@ -802,7 +824,23 @@ func (c *BigQueryConnector) SetupNormalizedTables( // create the table using the columns schema := bigquery.Schema(columns) - err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema}) + + // cluster by the primary key if < 4 columns. + var clustering *bigquery.Clustering + numPkeyCols := len(tableSchema.PrimaryKeyColumns) + if numPkeyCols > 0 && numPkeyCols < 4 { + clustering = &bigquery.Clustering{ + Fields: tableSchema.PrimaryKeyColumns, + } + } + + metadata := &bigquery.TableMetadata{ + Schema: schema, + Name: datasetTable.table, + Clustering: clustering, + } + + err = table.Create(c.ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err) }