diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go
index 0237f7aed0..154c19cf11 100644
--- a/flow/connectors/bigquery/qrep_avro_sync.go
+++ b/flow/connectors/bigquery/qrep_avro_sync.go
@@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string {
 	transformedColumns := make([]string, 0, len(*dstSchema))
 	for _, col := range *dstSchema {
-		if col.Name == syncedAtCol || col.Name == softDeleteCol {
+		if col.Name == syncedAtCol { // PeerDB column
+			transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
 			continue
 		}
+		if col.Name == softDeleteCol { // PeerDB column
+			transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`")
+			continue
+		}
+
 		switch col.Type {
 		case bigquery.GeographyFieldType:
 			transformedColumns = append(transformedColumns,
@@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	)
 	bqClient := s.connector.client
 
+	insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
+	for _, col := range dstTableMetadata.Schema {
+		insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
+	}
+
+	insertColumnSQL := strings.Join(insertColumns, ", ")
 	transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol)
 	selector := strings.Join(transformedColumns, ", ")
-	if softDeleteCol != "" { // PeerDB column
-		selector += ", FALSE"
-	}
-	if syncedAtCol != "" { // PeerDB column
-		selector += ", CURRENT_TIMESTAMP"
-	}
 
 	// Insert the records from the staging table into the destination table
-	insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
-		dstTableName, selector, stagingDatasetTable.string())
+	insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
+		dstTableName, insertColumnSQL, selector, stagingDatasetTable.string())
 
 	s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)
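
For a quick sanity check of the new statement shape, here is a minimal, self-contained Go sketch (not the connector code itself): the table and column names are hypothetical, and it omits the GEOGRAPHY/JSON handling that getTransformedColumns applies to regular columns.

package main

import (
	"fmt"
	"strings"
)

// column stands in for bigquery.FieldSchema; only the name matters here.
type column struct {
	Name string
}

// buildInsert mirrors the statement shape assembled in the diff: an explicit
// column list on the INSERT side and a transformed SELECT list in which the
// PeerDB bookkeeping columns are filled with constants instead of staged data.
func buildInsert(dstTable, stagingTable, syncedAtCol, softDeleteCol string, schema []column) string {
	insertColumns := make([]string, 0, len(schema))
	selectColumns := make([]string, 0, len(schema))
	for _, col := range schema {
		insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
		switch col.Name {
		case syncedAtCol: // PeerDB column: stamp the time of this sync
			selectColumns = append(selectColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
		case softDeleteCol: // PeerDB column: freshly synced rows are not deleted
			selectColumns = append(selectColumns, "FALSE AS `"+col.Name+"`")
		default: // ordinary columns pass through unchanged
			selectColumns = append(selectColumns, "`"+col.Name+"`")
		}
	}
	return fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
		dstTable, strings.Join(insertColumns, ", "),
		strings.Join(selectColumns, ", "), stagingTable)
}

func main() {
	// Hypothetical schema and table names, for illustration only.
	schema := []column{{Name: "id"}, {Name: "name"}, {Name: "_peerdb_synced_at"}, {Name: "_peerdb_is_deleted"}}
	fmt.Println(buildInsert("mydataset.dst_table", "mydataset._peerdb_staging",
		"_peerdb_synced_at", "_peerdb_is_deleted", schema))
}

With those illustrative names this prints a statement of the form INSERT INTO `mydataset.dst_table`(`id`, `name`, `_peerdb_synced_at`, `_peerdb_is_deleted`) SELECT `id`, `name`, CURRENT_TIMESTAMP AS `_peerdb_synced_at`, FALSE AS `_peerdb_is_deleted` FROM `mydataset._peerdb_staging`;, matching the intent of the change: every destination column is named explicitly in the INSERT list, and the PeerDB columns are populated inline rather than appended to the end of the SELECT.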