Skip to content

Commit

Permalink
Resync for SF and BQ: Quote column names for RenameTables (#1580)
Browse files Browse the repository at this point in the history
This PR quotes column names for the insert into select command in
RenameTables activity for Snowflake and BigQuery. A column with name
'rows' would make the query fail due to it being a reserved keyword.
This PR fixes that

Functionally tested
  • Loading branch information
Amogh-Bharadwaj authored Apr 5, 2024
1 parent 93c1adf commit 488a9be
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
7 changes: 5 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -728,7 +728,7 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename

columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, col.Name)
columnNames = append(columnNames, "`"+col.Name+"`")
}

if req.SoftDeleteColName != nil {
Expand All @@ -744,7 +744,10 @@ func (c *BigQueryConnector) RenameTables(ctx context.Context, req *protos.Rename
allColsWithoutAlias := strings.Join(columnNames, ",")
allColsWithAlias := allColsBuilder.String()

pkeyCols := renameRequest.TableSchema.PrimaryKeyColumns
pkeyCols := make([]string, 0, len(renameRequest.TableSchema.PrimaryKeyColumns))
for _, pkeyCol := range renameRequest.TableSchema.PrimaryKeyColumns {
pkeyCols = append(pkeyCols, "`"+pkeyCol+"`")
}

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dstDatasetTable.string()))

Expand Down
9 changes: 7 additions & 2 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -755,11 +755,16 @@ func (c *SnowflakeConnector) RenameTables(ctx context.Context, req *protos.Renam

columnNames := make([]string, 0, len(renameRequest.TableSchema.Columns))
for _, col := range renameRequest.TableSchema.Columns {
columnNames = append(columnNames, col.Name)
columnNames = append(columnNames, SnowflakeIdentifierNormalize(col.Name))
}

pkeyColumnNames := make([]string, 0, len(renameRequest.TableSchema.PrimaryKeyColumns))
for _, col := range renameRequest.TableSchema.PrimaryKeyColumns {
pkeyColumnNames = append(pkeyColumnNames, SnowflakeIdentifierNormalize(col))
}

allCols := strings.Join(columnNames, ",")
pkeyCols := strings.Join(renameRequest.TableSchema.PrimaryKeyColumns, ",")
pkeyCols := strings.Join(pkeyColumnNames, ",")

c.logger.Info(fmt.Sprintf("handling soft-deletes for table '%s'...", dst))

Expand Down

0 comments on commit 488a9be

Please sign in to comment.