Skip to content

Commit

Permalink
better bq qrep push
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Feb 29, 2024
1 parent db376cb commit c5e5f08
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords(
func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string {
transformedColumns := make([]string, 0, len(*dstSchema))
for _, col := range *dstSchema {
if col.Name == syncedAtCol || col.Name == softDeleteCol {
if col.Name == syncedAtCol { // PeerDB column
transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
continue
}
if col.Name == softDeleteCol { // PeerDB column
transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`")
continue
}

switch col.Type {
case bigquery.GeographyFieldType:
transformedColumns = append(transformedColumns,
Expand Down Expand Up @@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client

insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
for _, col := range dstTableMetadata.Schema {
insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
}

insertColumnSQL := strings.Join(insertColumns, ", ")
transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol)
selector := strings.Join(transformedColumns, ", ")

if softDeleteCol != "" { // PeerDB column
selector += ", FALSE"
}
if syncedAtCol != "" { // PeerDB column
selector += ", CURRENT_TIMESTAMP"
}
// Insert the records from the staging table into the destination table
insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
dstTableName, selector, stagingDatasetTable.string())
insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
dstTableName, insertColumnSQL, selector, stagingDatasetTable.string())

s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)

Expand Down

0 comments on commit c5e5f08

Please sign in to comment.