From 785cbfcd29009425826a98902bb0c4f01b7674fa Mon Sep 17 00:00:00 2001 From: Kevin Biju <52661649+heavycrystal@users.noreply.github.com> Date: Wed, 3 Jan 2024 00:26:39 +0530 Subject: [PATCH] fixed interactions with soft delete and CDC resync (#961) Resync works by performing initial load to a table with the suffix `_resync`, renaming it to the destination table name and then resuming CDC. This ensures maximum availability of the table for the customer as the downtime is scoped to the renaming phase. Since soft-deleted rows are not present on the source, this can lead to such rows disappearing after resync, as initial load just copies the source tables over. This is fixed by adding an additional step to `INSERT` rows from the destination table with primary keys not present in the `_resync` table, before the renaming. This ensures all soft-deleted rows are captured and retained after resync. The synced at column is also updated for these rows. --- flow/connectors/bigquery/bigquery.go | 58 ++++++++++++++++++++++++---- 1 file changed, 51 insertions(+), 7 deletions(-) diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2d278580f..2d38cbf89 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -878,31 +878,75 @@ func (c *BigQueryConnector) getRawTableName(flowJobName string) string { } func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { + // BigQuery doesn't really do transactions properly anyway so why bother? for _, renameRequest := range req.RenameTableOptions { srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName) dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName) - c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), + c.logger.InfoContext(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), dstDatasetTable.string())) activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), dstDatasetTable.string())) + if req.SoftDeleteColName != nil { + allCols := strings.Join(utils.TableSchemaColumnNames(renameRequest.TableSchema), ",") + pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",") + + c.logger.InfoContext(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) + + activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) + + c.logger.InfoContext(c.ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", + srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), + allCols, *req.SoftDeleteColName, dstDatasetTable.string(), + pkeyCols, pkeyCols, srcDatasetTable.string())) + _, err := c.client.Query( + fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", + srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), + allCols, *req.SoftDeleteColName, dstDatasetTable.string(), + pkeyCols, pkeyCols, srcDatasetTable.string())).Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dstDatasetTable.string(), err) + } + } + + if req.SyncedAtColName != nil { + c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string())) + + activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...", + srcDatasetTable.string())) + + c.logger.InfoContext(c.ctx, + fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(), + *req.SyncedAtColName, *req.SyncedAtColName)) + _, err := c.client.Query( + fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(), + *req.SyncedAtColName, *req.SyncedAtColName)).Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("unable to set synced at column for table %s: %w", srcDatasetTable.string(), err) + } + } + + c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", + dstDatasetTable.string())) // drop the dst table if exists - _, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", - dstDatasetTable.dataset, dstDatasetTable.table)).Run(c.ctx) + _, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s", + dstDatasetTable.string())).Read(c.ctx) if err != nil { return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err) } + c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + srcDatasetTable.string(), dstDatasetTable.table)) // rename the src table to dst - _, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s RENAME TO %s", - srcDatasetTable.dataset, srcDatasetTable.table, dstDatasetTable.table)).Run(c.ctx) + _, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + srcDatasetTable.string(), dstDatasetTable.table)).Read(c.ctx) if err != nil { return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(), dstDatasetTable.string(), err) } - c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), + c.logger.InfoContext(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), dstDatasetTable.string())) } @@ -923,7 +967,7 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro // rename the src table to dst _, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`", - newDatasetTable.string(), existingDatasetTable.string())).Run(c.ctx) + newDatasetTable.string(), existingDatasetTable.string())).Read(c.ctx) if err != nil { return nil, fmt.Errorf("unable to create table %s: %w", newTable, err) }