Skip to content

Commit

Permalink
BigQuery: More reliable insert-into-select (#1359)
Browse files Browse the repository at this point in the history
Not sure how this issue wasn't caught until now, but not explicitly
specifying the column order in the QRep insert into select statement can
cause it to fail due to column-value insert mismatch (ex: expected int
but got bool for _peerdb_is_deleted)
  • Loading branch information
Amogh-Bharadwaj authored Feb 29, 2024
1 parent ff1589e commit b7db2ad
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 9 deletions.
1 change: 1 addition & 0 deletions flow/connectors/bigquery/avro_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ func TestAvroTransform(t *testing.T) {
"ST_GEOGFROMTEXT(`col1`) AS `col1`",
"PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`",
"`camelCol4`",
"CURRENT_TIMESTAMP AS `sync_col`",
}
transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col")
if !reflect.DeepEqual(transformedCols, expectedTransformCols) {
Expand Down
24 changes: 15 additions & 9 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords(
func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string {
transformedColumns := make([]string, 0, len(*dstSchema))
for _, col := range *dstSchema {
if col.Name == syncedAtCol || col.Name == softDeleteCol {
if col.Name == syncedAtCol { // PeerDB column
transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
continue
}
if col.Name == softDeleteCol { // PeerDB column
transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`")
continue
}

switch col.Type {
case bigquery.GeographyFieldType:
transformedColumns = append(transformedColumns,
Expand Down Expand Up @@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client

insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
for _, col := range dstTableMetadata.Schema {
insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
}

insertColumnSQL := strings.Join(insertColumns, ", ")
transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol)
selector := strings.Join(transformedColumns, ", ")

if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
if syncedAtCol != "" { // PeerDB column
selector += ", CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
dstTableName, selector, stagingDatasetTable.string())
insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
dstTableName, insertColumnSQL, selector, stagingDatasetTable.string())

s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)

Expand Down

0 comments on commit b7db2ad

Please sign in to comment.