Skip to content

Commit

Permalink
Resync case where column names match table names (#1270)
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Feb 13, 2024
1 parent 516e384 commit 1da535b
Showing 1 changed file with 16 additions and 8 deletions.
24 changes: 16 additions & 8 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -822,22 +822,30 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
}

if req.SoftDeleteColName != nil {
allCols := strings.Join(columnNames, ",")
allColsBuilder := strings.Builder{}
for idx, col := range columnNames {
allColsBuilder.WriteString("_pt.")
allColsBuilder.WriteString(col)
if idx < len(columnNames)-1 {
allColsBuilder.WriteString(",")
}
}

allCols := allColsBuilder.String()

pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

c.logger.Info(fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
q := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s _pt WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string()))
query := c.client.Query(
fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s WHERE (%s) NOT IN (SELECT %s FROM %s)",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allCols, *req.SoftDeleteColName),
allCols, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string()))
pkeyCols, pkeyCols, srcDatasetTable.string())

c.logger.Info(q)
query := c.client.Query(q)

query.DefaultProjectID = c.projectID
query.DefaultDatasetID = c.datasetID
Expand Down

0 comments on commit 1da535b

Please sign in to comment.