diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 7905fc05bc..3245f84a47 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -655,10 +655,10 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr table := c.client.Dataset(c.datasetID).Table(rawTableName) // check if the table exists - meta, err := table.Metadata(c.ctx) + tableRef, err := table.Metadata(c.ctx) if err == nil { // table exists, check if the schema matches - if !reflect.DeepEqual(meta.Schema, schema) { + if !reflect.DeepEqual(tableRef.Schema, schema) { return nil, fmt.Errorf("table %s.%s already exists with different schema", c.datasetID, rawTableName) } else { return &protos.CreateRawTableOutput{ @@ -667,10 +667,32 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr } } + partitioning := &bigquery.RangePartitioning{ + Field: "_peerdb_batch_id", + Range: &bigquery.RangePartitioningRange{ + Start: 0, + End: 1_000_000, + Interval: 100, + }, + } + + clustering := &bigquery.Clustering{ + Fields: []string{ + "_peerdb_batch_id", + "_peerdb_destination_table_name", + "_peerdb_timestamp", + }, + } + + metadata := &bigquery.TableMetadata{ + Schema: schema, + RangePartitioning: partitioning, + Clustering: clustering, + Name: rawTableName, + } + // table does not exist, create it - err = table.Create(c.ctx, &bigquery.TableMetadata{ - Schema: schema, - }) + err = table.Create(c.ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, rawTableName, err) } @@ -802,7 +824,23 @@ func (c *BigQueryConnector) SetupNormalizedTables( // create the table using the columns schema := bigquery.Schema(columns) - err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema}) + + // cluster by the primary key if < 4 columns. + var clustering *bigquery.Clustering + numPkeyCols := len(tableSchema.PrimaryKeyColumns) + if numPkeyCols > 0 && numPkeyCols < 4 { + clustering = &bigquery.Clustering{ + Fields: tableSchema.PrimaryKeyColumns, + } + } + + metadata := &bigquery.TableMetadata{ + Schema: schema, + Name: datasetTable.table, + Clustering: clustering, + } + + err = table.Create(c.ctx, metadata) if err != nil { return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err) }