Skip to content

Commit

Permalink
support composite primary keys bq resync
Browse files: browse the repository at this point in the history
  • Loading branch information
iskakaushik committed Feb 13, 2024
1 parent 53047e2 commit 3852ec1
Showing 1 changed file with 33 additions and 4 deletions.
37 changes: 33 additions & 4 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -816,6 +816,14 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
activity.RecordHeartbeat(ctx, fmt.Sprintf("renaming table '%s' to '%s'...", srcDatasetTable.string(),
dstDatasetTable.string()))

// If the source table's metadata cannot be fetched, treat the table as missing: log and skip this rename.
dataset := c.client.DatasetInProject(c.projectID, srcDatasetTable.dataset)
_, err := dataset.Table(srcDatasetTable.table).Metadata(ctx)
if err != nil {
c.logger.Info(fmt.Sprintf("table '%s' does not exist, skipping rename", srcDatasetTable.string()))
continue
}

columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, col.Name)
Expand All @@ -834,16 +842,37 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
allColsWithoutAlias := strings.Join(columnNames, ",")
allColsWithAlias := allColsBuilder.String()

pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")
pkeyCols := renameRequest.TableSchema.PrimaryKeyColumns

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

activity.RecordHeartbeat(ctx, fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

q := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s _pt WHERE (%s) NOT IN (SELECT %s FROM %s)",
pkeyOnClauseBuilder := strings.Builder{}
ljWhereClauseBuilder := strings.Builder{}
for idx, col := range pkeyCols {
pkeyOnClauseBuilder.WriteString("_pt.")
pkeyOnClauseBuilder.WriteString(col)
pkeyOnClauseBuilder.WriteString(" = _resync.")
pkeyOnClauseBuilder.WriteString(col)

ljWhereClauseBuilder.WriteString("_resync.")
ljWhereClauseBuilder.WriteString(col)
ljWhereClauseBuilder.WriteString(" IS NULL")

if idx < len(pkeyCols)-1 {
pkeyOnClauseBuilder.WriteString(" AND ")
ljWhereClauseBuilder.WriteString(" AND ")
}
}

leftJoin := fmt.Sprintf("LEFT JOIN %s _resync ON %s WHERE %s", srcDatasetTable.string(),
pkeyOnClauseBuilder.String(), ljWhereClauseBuilder.String())

q := fmt.Sprintf("INSERT INTO %s(%s) SELECT %s,true AS %s FROM %s _pt %s",
srcDatasetTable.string(), fmt.Sprintf("%s,%s", allColsWithoutAlias, *req.SoftDeleteColName),
allColsWithAlias, *req.SoftDeleteColName, dstDatasetTable.string(),
pkeyCols, pkeyCols, srcDatasetTable.string())
leftJoin)

c.logger.Info(q)
query := c.client.Query(q)
Expand Down Expand Up @@ -884,7 +913,7 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
dstDatasetTable.string()))
dropQuery.DefaultProjectID = c.projectID
dropQuery.DefaultDatasetID = c.datasetID
_, err := dropQuery.Read(ctx)
_, err = dropQuery.Read(ctx)
if err != nil {
return nil, fmt.Errorf("unable to drop table %s: %w", dstDatasetTable.string(), err)
}
Expand Down

0 comments on commit 3852ec1

Please sign in to comment.