From b7db2ad44c7183fc82f1d03cf57f987e30c2f524 Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Fri, 1 Mar 2024 01:49:13 +0530
Subject: [PATCH] BigQuery: More reliable insert-into-select (#1359)

Not sure how this issue wasn't caught until now, but not explicitly
specifying the column order in the QRep insert-into-select statement can
cause it to fail due to a column-value insert mismatch (e.g. expected
int but got bool for _peerdb_is_deleted).
---
 .../bigquery/avro_transform_test.go        |  1 +
 flow/connectors/bigquery/qrep_avro_sync.go | 24 ++++++++++++-------
 2 files changed, 16 insertions(+), 9 deletions(-)

diff --git a/flow/connectors/bigquery/avro_transform_test.go b/flow/connectors/bigquery/avro_transform_test.go
index 75dc7b65f1..89f9187cf2 100644
--- a/flow/connectors/bigquery/avro_transform_test.go
+++ b/flow/connectors/bigquery/avro_transform_test.go
@@ -31,6 +31,7 @@ func TestAvroTransform(t *testing.T) {
 		"ST_GEOGFROMTEXT(`col1`) AS `col1`",
 		"PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`",
 		"`camelCol4`",
+		"CURRENT_TIMESTAMP AS `sync_col`",
 	}
 	transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col")
 	if !reflect.DeepEqual(transformedCols, expectedTransformCols) {
diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 2e522f7543..868bf845a5 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string {
 	transformedColumns := make([]string, 0, len(*dstSchema))
 	for _, col := range *dstSchema {
-		if col.Name == syncedAtCol || col.Name == softDeleteCol {
+		if col.Name == syncedAtCol { // PeerDB column
+			transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
 			continue
 		}
+		if col.Name == softDeleteCol { // PeerDB column
+			transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`")
+			continue
+		}
+
 		switch col.Type {
 		case bigquery.GeographyFieldType:
 			transformedColumns = append(transformedColumns,
@@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	)
 	bqClient := s.connector.client
 
+	insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
+	for _, col := range dstTableMetadata.Schema {
+		insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
+	}
+
+	insertColumnSQL := strings.Join(insertColumns, ", ")
 	transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol)
 	selector := strings.Join(transformedColumns, ", ")
-	if softDeleteCol != "" { // PeerDB column
-		selector += ", FALSE"
-	}
-	if syncedAtCol != "" { // PeerDB column
-		selector += ", CURRENT_TIMESTAMP"
-	}
 	// Insert the records from the staging table into the destination table
-	insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
-		dstTableName, selector, stagingDatasetTable.string())
+	insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
+		dstTableName, insertColumnSQL, selector, stagingDatasetTable.string())
 
 	s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)
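
For context, here is a minimal, self-contained Go sketch (not the connector's actual code) of the
statement shape this change produces. The schema, the dst_table and staging_table names, and the
synced_at_col placeholder are hypothetical stand-ins; _peerdb_is_deleted is the column named in
the commit message:

    package main

    import (
    	"fmt"
    	"strings"
    )

    // field is a stand-in for the schema entries the connector reads from
    // dstTableMetadata.Schema; only the column name matters for this sketch.
    type field struct{ Name string }

    func main() {
    	// Hypothetical destination schema: two data columns plus two PeerDB metadata columns.
    	schema := []field{{"id"}, {"val"}, {"synced_at_col"}, {"_peerdb_is_deleted"}}

    	insertColumns := make([]string, 0, len(schema))
    	selectColumns := make([]string, 0, len(schema))
    	for _, col := range schema {
    		// Destination column list, quoted and in destination-schema order.
    		insertColumns = append(insertColumns, "`"+col.Name+"`")
    		// Matching SELECT expression: metadata columns are generated and
    		// aliased, everything else passes through from the staging table.
    		switch col.Name {
    		case "synced_at_col":
    			selectColumns = append(selectColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
    		case "_peerdb_is_deleted":
    			selectColumns = append(selectColumns, "FALSE AS `"+col.Name+"`")
    		default:
    			selectColumns = append(selectColumns, "`"+col.Name+"`")
    		}
    	}

    	insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
    		"dst_table", strings.Join(insertColumns, ", "),
    		strings.Join(selectColumns, ", "), "staging_table")
    	fmt.Println(insertStmt)
    	// Prints, on one line:
    	// INSERT INTO `dst_table`(`id`, `val`, `synced_at_col`, `_peerdb_is_deleted`)
    	//   SELECT `id`, `val`, CURRENT_TIMESTAMP AS `synced_at_col`, FALSE AS `_peerdb_is_deleted` FROM `staging_table`;
    }

Because both the column list and the SELECT list are built from the destination schema in the same
order, the insert no longer relies on implicit column ordering, which is what led to the reported
type mismatch on _peerdb_is_deleted.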