Skip to content

Commit

Permalink
fixed interactions with soft delete and CDC resync (#961)
Browse files Browse the repository at this point in the history
Resync works by performing initial load to a table with the suffix
`_resync`, renaming it to the destination table name and then resuming
CDC. This ensures maximum availability of the table for the customer as
the downtime is scoped to the renaming phase.

Since soft-deleted rows are not present on the source, this can lead to
such rows disappearing after resync, as initial load just copies the
source tables over.

This is fixed by adding an additional step to `INSERT` rows from the
destination table with primary keys not present in the `_resync` table,
before the renaming. This ensures all soft-deleted rows are captured and
retained after resync. The synced at column is also updated for these
rows.
  • Loading branch information
heavycrystal authored Jan 2, 2024
1 parent d3c23f8 commit 785cbfc
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,31 +878,75 @@ func (c *BigQueryConnector) getRawTableName(flowJobName string) string {
}

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
// BigQuery doesn't really do transactions properly anyway so why bother?
for _, renameRequest := range req.RenameTableOptions {
srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName)
dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName)
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
c.logger.InfoContext(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

if req.SoftDeleteColName != nil {
allCols := strings.Join(utils.TableSchemaColumnNames(renameRequest.TableSchema), ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")

c.logger.InfoContext(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

c.logger.InfoContext(c.ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string()))
_, err := c.client.Query(
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string())).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dstDatasetTable.string(), err)
}
}

if req.SyncedAtColName != nil {
c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...",
srcDatasetTable.string()))

c.logger.InfoContext(c.ctx,
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
_, err := c.client.Query(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName)).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w", srcDatasetTable.string(), err)
}
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
// drop the dst table if exists
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
dstDatasetTable.dataset, dstDatasetTable.table)).Run(c.ctx)
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string())).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table))
// rename the src table to dst
_, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s RENAME TO %s",
srcDatasetTable.dataset, srcDatasetTable.table, dstDatasetTable.table)).Run(c.ctx)
_, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table)).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(),
dstDatasetTable.string(), err)
}

c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
c.logger.InfoContext(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
dstDatasetTable.string()))
}

Expand All @@ -923,7 +967,7 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro

// rename the src table to dst
_, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string())).Run(c.ctx)
newDatasetTable.string(), existingDatasetTable.string())).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
}
Expand Down

0 comments on commit 785cbfc

Please sign in to comment.