Skip to content

Commit

Permalink
BigQuery Add Column: Account for update limit (#1662)
Browse files Browse the repository at this point in the history
BigQuery has a
[quota](https://cloud.google.com/bigquery/quotas#standard_tables) of 5
metadata update operations per table per 10 seconds. So currently if
many columns are added in a PG to BQ CDC mirror, syncing records fails with
this error from BigQuery - even though we do `ADD COLUMN IF NOT EXISTS`
(it still counts as an update operation)

This PR accounts for this by manually checking the table metadata for
the column before adding it. This will at least ensure that on sync flow
retry, the remaining columns get added

Functionally tested
  • Loading branch information
Amogh-Bharadwaj authored May 2, 2024
1 parent 5b6ca38 commit c1852cf
Showing 1 changed file with 15 additions and 0 deletions.
15 changes: 15 additions & 0 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,21 @@ func (c *BigQueryConnector) ReplayTableSchemaDeltas(

// Replay each added column from this schema delta onto the destination
// BigQuery table.
for _, addedColumn := range schemaDelta.AddedColumns {
// NOTE(review): the error returned by convertToDatasetTable is discarded here;
// confirm the destination table name is always pre-validated by the caller.
dstDatasetTable, _ := c.convertToDatasetTable(schemaDelta.DstTableName)
table := c.client.DatasetInProject(c.projectID, dstDatasetTable.dataset).Table(dstDatasetTable.table)
// Fetch the destination table's current metadata so we can detect columns
// that already exist and avoid issuing an ALTER TABLE for them — BigQuery
// counts even `ADD COLUMN IF NOT EXISTS` as a metadata update, which is
// limited to 5 operations per table per 10 seconds.
dstMetadata, metadataErr := table.Metadata(ctx)
if metadataErr != nil {
return fmt.Errorf("failed to get metadata for table %s: %w", schemaDelta.DstTableName, metadataErr)
}

// check if the column already exists
for _, field := range dstMetadata.Schema {
if field.Name == addedColumn.Name {
c.logger.Info(fmt.Sprintf("[schema delta replay] column %s already exists in table %s",
addedColumn.Name, schemaDelta.DstTableName))
// NOTE(review): BUG — this `continue` only advances the inner
// `range dstMetadata.Schema` loop, NOT the outer AddedColumns loop,
// so control still falls through to the ALTER TABLE below and the
// quota-consuming metadata update happens anyway. Use a label on the
// outer loop (`continue addedColumns`) or set a boolean flag here and
// skip the ALTER when it is set.
continue
}
}

addedColumnBigQueryType := qValueKindToBigQueryTypeString(addedColumn.Type)
query := c.client.Query(fmt.Sprintf(
"ALTER TABLE %s ADD COLUMN IF NOT EXISTS `%s` %s",
Expand Down

0 comments on commit c1852cf

Please sign in to comment.