From 75a7ec6038b41fa59286feb566ef4b7b2831672b Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 27 Dec 2023 13:00:33 -0500 Subject: [PATCH 1/4] Setup partitioning and clustering for raw table Setup clustering for pkey for normalized table --- flow/connectors/bigquery/bigquery.go | 46 ++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 6 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7905fc05bc..cfb90a2d6b 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -655,10 +655,10 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr table := c.client.Dataset(c.datasetID).Table(rawTableName) // check if the table exists - meta, err := table.Metadata(c.ctx) + tableRef, err := table.Metadata(c.ctx) if err == nil { // table exists, check if the schema matches - if !reflect.DeepEqual(meta.Schema, schema) { + if !reflect.DeepEqual(tableRef.Schema, schema) { return nil, fmt.Errorf("table %s.%s already exists with different schema", c.datasetID, rawTableName) } else { return &protos.CreateRawTableOutput{ @@ -667,10 +667,28 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr } } + partitioning := &bigquery.RangePartitioning{ + Field: "_peerdb_batch_id", + Range: &bigquery.RangePartitioningRange{ + Start: 0, + End: 10_000_000, + Interval: 100, + }, + } + + clustering := &bigquery.Clustering{ + Fields: []string{"_peerdb_destination_table_name"}, + } + + metadata := &bigquery.TableMetadata{ + Schema: schema, + RangePartitioning: partitioning, + Clustering: clustering, + Name: rawTableName, + } + // table does not exist, create it - err = table.Create(c.ctx, &bigquery.TableMetadata{ - Schema: schema, - }) + err = table.Create(c.ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, rawTableName, err) } @@ -802,7 +820,23 @@ func (c *BigQueryConnector) SetupNormalizedTables( // create the table using the columns schema := bigquery.Schema(columns) - err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema}) + + // cluster by the primary key if < 4 columns. + var clustering *bigquery.Clustering + numPkeyCols := len(tableSchema.PrimaryKeyColumns) + if numPkeyCols > 0 && numPkeyCols < 4 { + clustering = &bigquery.Clustering{ + Fields: tableSchema.PrimaryKeyColumns, + } + } + + metadata := &bigquery.TableMetadata{ + Schema: schema, + Name: datasetTable.table, + Clustering: clustering, + } + + err = table.Create(c.ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err) } From 349dbafd4be03880c66c4496e0fff8f930e6489c Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 27 Dec 2023 13:09:42 -0500 Subject: [PATCH 2/4] add timestamp also in clustering --- flow/connectors/bigquery/bigquery.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index cfb90a2d6b..2215b8b782 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -677,7 +677,10 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr } clustering := &bigquery.Clustering{ - Fields: []string{"_peerdb_destination_table_name"}, + Fields: []string{ + "_peerdb_destination_table_name", + "_peerdb_timestamp", + }, } metadata := &bigquery.TableMetadata{ From 3d99fb0184aa20bdbd615c15cf2a8669f96e8133 Mon Sep 17 00:00:00 2001 From: Kevin Biju Date: Thu, 28 Dec 2023 00:19:45 +0530 Subject: [PATCH 3/4] reducing number of partitions to 10K --- flow/connectors/bigquery/bigquery.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2215b8b782..92a3426bab 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -671,7 +671,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr Field: "_peerdb_batch_id", Range: &bigquery.RangePartitioningRange{ Start: 0, - End: 10_000_000, + End: 1_000_000, Interval: 100, }, } From 6649f6dc6f4341bdb2d0fc7fd532dc1cfa2bc495 Mon Sep 17 00:00:00 2001 From: Kaushik Iska Date: Wed, 27 Dec 2023 14:58:20 -0500 Subject: [PATCH 4/4] add batch id in clustering as well --- flow/connectors/bigquery/bigquery.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 92a3426bab..3245f84a47 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -678,6 +678,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr clustering := &bigquery.Clustering{ Fields: []string{ + "_peerdb_batch_id", "_peerdb_destination_table_name", "_peerdb_timestamp", },