diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 2d278580f..2d38cbf89 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -878,31 +878,75 @@ func (c *BigQueryConnector) getRawTableName(flowJobName string) string { } func (c *BigQueryConnector) RenameTables(req *protos.RenameTablesInput) (*protos.RenameTablesOutput, error) { + // BigQuery doesn't really do transactions properly anyway so why bother? for _, renameRequest := range req.RenameTableOptions { srcDatasetTable, _ := c.convertToDatasetTable(renameRequest.CurrentName) dstDatasetTable, _ := c.convertToDatasetTable(renameRequest.NewName) - c.logger.Info(fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), + c.logger.InfoContext(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), dstDatasetTable.string())) activity.RecordHeartbeat(c.ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(), dstDatasetTable.string())) + if req.SoftDeleteColName != nil { + allCols := strings.Join(utils.TableSchemaColumnNames(renameRequest.TableSchema), ",") + pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",") + + c.logger.InfoContext(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) + + activity.RecordHeartbeat(c.ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string())) + + c.logger.InfoContext(c.ctx, fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", + srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), + allCols, *req.SoftDeleteColName, dstDatasetTable.string(), + pkeyCols, pkeyCols, srcDatasetTable.string())) + _, err := c.client.Query( + fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)", + srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName), + allCols, *req.SoftDeleteColName, dstDatasetTable.string(), + pkeyCols, pkeyCols, srcDatasetTable.string())).Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("unable to handle soft-deletes for table %s: %w", dstDatasetTable.string(), err) + } + } + + if req.SyncedAtColName != nil { + c.logger.Info(fmt.Sprintf("setting synced at column for table '%s'...", srcDatasetTable.string())) + + activity.RecordHeartbeat(c.ctx, fmt.Sprintf("setting synced at column for table '%s'...", + srcDatasetTable.string())) + + c.logger.InfoContext(c.ctx, + fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(), + *req.SyncedAtColName, *req.SyncedAtColName)) + _, err := c.client.Query( + fmt.Sprintf("UPDATE %s SET %s = CURRENT_TIMESTAMP WHERE %s IS NULL", srcDatasetTable.string(), + *req.SyncedAtColName, *req.SyncedAtColName)).Read(c.ctx) + if err != nil { + return nil, fmt.Errorf("unable to set synced at column for table %s: %w", srcDatasetTable.string(), err) + } + } + + c.logger.InfoContext(c.ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", + dstDatasetTable.string())) // drop the dst table if exists - _, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s.%s", - dstDatasetTable.dataset, dstDatasetTable.table)).Run(c.ctx) + _, err := c.client.Query(fmt.Sprintf("DROP TABLE IF EXISTS %s", + dstDatasetTable.string())).Read(c.ctx) if err != nil { return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err) } + c.logger.InfoContext(c.ctx, fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + srcDatasetTable.string(), dstDatasetTable.table)) // rename the src table to dst - _, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s.%s RENAME TO %s", - srcDatasetTable.dataset, srcDatasetTable.table, dstDatasetTable.table)).Run(c.ctx) + _, err = c.client.Query(fmt.Sprintf("ALTER TABLE %s RENAME TO %s", + srcDatasetTable.string(), dstDatasetTable.table)).Read(c.ctx) if err != nil { return nil, fmt.Errorf("unable to rename table %s to %s: %w", srcDatasetTable.string(), dstDatasetTable.string(), err) } - c.logger.Info(fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), + c.logger.InfoContext(c.ctx, fmt.Sprintf("successfully renamed table '%s' to '%s'", srcDatasetTable.string(), dstDatasetTable.string())) } @@ -923,7 +967,7 @@ func (c *BigQueryConnector) CreateTablesFromExisting(req *protos.CreateTablesFro // rename the src table to dst _, err := c.client.Query(fmt.Sprintf("CREATE TABLE IF NOT EXISTS `%s` LIKE `%s`", - newDatasetTable.string(), existingDatasetTable.string())).Run(c.ctx) + newDatasetTable.string(), existingDatasetTable.string())).Read(c.ctx) if err != nil { return nil, fmt.Errorf("unable to create table %s: %w", newTable, err) }