diff --git a/flow/connectors/bigquery/avro_transform_test.go b/flow/connectors/bigquery/avro_transform_test.go index 75dc7b65f1..89f9187cf2 100644 --- a/flow/connectors/bigquery/avro_transform_test.go +++ b/flow/connectors/bigquery/avro_transform_test.go @@ -31,6 +31,7 @@ func TestAvroTransform(t *testing.T) { "ST_GEOGFROMTEXT(`col1`) AS `col1`", "PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`", "`camelCol4`", + "CURRENT_TIMESTAMP AS `sync_col`", } transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col") if !reflect.DeepEqual(transformedCols, expectedTransformCols) { diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index 2e522f7543..868bf845a5 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords( func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string { transformedColumns := make([]string, 0, len(*dstSchema)) for _, col := range *dstSchema { - if col.Name == syncedAtCol || col.Name == softDeleteCol { + if col.Name == syncedAtCol { // PeerDB column + transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`") continue } + if col.Name == softDeleteCol { // PeerDB column + transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`") + continue + } + switch col.Type { case bigquery.GeographyFieldType: transformedColumns = append(transformedColumns, @@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords( ) bqClient := s.connector.client + insertColumns := make([]string, 0, len(dstTableMetadata.Schema)) + for _, col := range dstTableMetadata.Schema { + insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name)) + } + + insertColumnSQL := strings.Join(insertColumns, ", ") transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol) selector := strings.Join(transformedColumns, ", ") - if softDeleteCol != "" { // PeerDB column - selector += ", FALSE" - } - if syncedAtCol != "" { // PeerDB column - selector += ", CURRENT_TIMESTAMP" - } // Insert the records from the staging table into the destination table - insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;", - dstTableName, selector, stagingDatasetTable.string()) + insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;", + dstTableName, insertColumnSQL, selector, stagingDatasetTable.string()) s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)