Skip to content

Commit

Permalink
fixed interactions with soft delete and resync
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal committed Jan 2, 2024
1 parent fcfa74c commit 2d84256
Showing 1 changed file with 51 additions and 7 deletions.
58 changes: 51 additions & 7 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -878,31 +878,75 @@ func (c *BigQueryConnector) getRawTableName(flowJobName string) string {
}

func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) {
// BigQuery doesn't really do transactions properly anyway so why bother?
for _, renameRequest := range req.RenameTableOptions {
srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName)
dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName)
c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
c.logger.InfoContext(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

if req.SoftDeleteColName != nil {
allCols := strings.Join(utils.TableSchemaColumnNames(renameRequest.TableSchema), ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")

c.logger.InfoContext(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

c.logger.InfoContext(c.ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string()))
_, err := c.client.Query(
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string())).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dstDatasetTable.string(), err)
}
}

if req.SyncedAtColName != nil {
c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string()))

activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...",
srcDatasetTable.string()))

c.logger.InfoContext(c.ctx,
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName))
_, err := c.client.Query(
fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(),
*req.SyncedAtColName, *req.SyncedAtColName)).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to set synced at column for table %s: %w", srcDatasetTable.string(), err)
}
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string()))
// drop the dst table if exists
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s",
dstDatasetTable.dataset, dstDatasetTable.table)).Run(c.ctx)
_, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s",
dstDatasetTable.string())).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
}

c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table))
// rename the src table to dst
_, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s RENAME TO %s",
srcDatasetTable.dataset, srcDatasetTable.table, dstDatasetTable.table)).Run(c.ctx)
_, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s",
srcDatasetTable.string(), dstDatasetTable.table)).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(),
dstDatasetTable.string(), err)
}

c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
c.logger.InfoContext(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(),
dstDatasetTable.string()))
}

Expand All @@ -923,7 +967,7 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro

// rename the src table to dst
_, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`",
newDatasetTable.string(), existingDatasetTable.string())).Run(c.ctx)
newDatasetTable.string(), existingDatasetTable.string())).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("unable to create table %s: %w", newTable, err)
}
Expand Down

0 comments on commit 2d84256

Please sign in to comment.