Skip to content

Commit

Permalink
Merge branch 'main' into remove-testify-outside-e2e
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Dec 27, 2023
2 parents 3cbc37c + 6c57ce4 commit fa88de2
Showing 1 changed file with 44 additions and 6 deletions.
50 changes: 44 additions & 6 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -655,10 +655,10 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
table := c.client.Dataset(c.datasetID).Table(rawTableName)

// check if the table exists
meta, err := table.Metadata(c.ctx)
tableRef, err := table.Metadata(c.ctx)
if err == nil {
// table exists, check if the schema matches
if !reflect.DeepEqual(meta.Schema, schema) {
if !reflect.DeepEqual(tableRef.Schema, schema) {
return nil, fmt.Errorf("table %s.%s already exists with different schema", c.datasetID, rawTableName)
} else {
return &protos.CreateRawTableOutput{
Expand All @@ -667,10 +667,32 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr
}
}

partitioning := &bigquery.RangePartitioning{
Field: "_peerdb_batch_id",
Range: &bigquery.RangePartitioningRange{
Start: 0,
End: 1_000_000,
Interval: 100,
},
}

clustering := &bigquery.Clustering{
Fields: []string{
"_peerdb_batch_id",
"_peerdb_destination_table_name",
"_peerdb_timestamp",
},
}

metadata := &bigquery.TableMetadata{
Schema: schema,
RangePartitioning: partitioning,
Clustering: clustering,
Name: rawTableName,
}

// table does not exist, create it
err = table.Create(c.ctx, &bigquery.TableMetadata{
Schema: schema,
})
err = table.Create(c.ctx, metadata)
if err != nil {
return nil, fmt.Errorf("failed to create table %s.%s: %w", c.datasetID, rawTableName, err)
}
Expand Down Expand Up @@ -802,7 +824,23 @@ func (c *BigQueryConnector) SetupNormalizedTables(

// create the table using the columns
schema := bigquery.Schema(columns)
err = table.Create(c.ctx, &bigquery.TableMetadata{Schema: schema})

// cluster by the primary key if < 4 columns.
var clustering *bigquery.Clustering
numPkeyCols := len(tableSchema.PrimaryKeyColumns)
if numPkeyCols > 0 && numPkeyCols < 4 {
clustering = &bigquery.Clustering{
Fields: tableSchema.PrimaryKeyColumns,
}
}

metadata := &bigquery.TableMetadata{
Schema: schema,
Name: datasetTable.table,
Clustering: clustering,
}

err = table.Create(c.ctx, metadata)
if err != nil {
return nil, fmt.Errorf("failed to create table %s: %w", tableIdentifier, err)
}
Expand Down

0 comments on commit fa88de2

Please sign in to comment.