Merge branch 'main' into qrep-syncedpart-handling
heavycrystal authored Feb 29, 2024
2 parents ef6d540 + b7db2ad commit 85958be
Showing 3 changed files with 20 additions and 15 deletions.
flow/activities/flowable.go (10 changes: 4 additions & 6 deletions)
@@ -654,17 +654,16 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 	bufferSize := shared.FetchAndChannelSize
 	if config.SourcePeer.Type == protos.DBType_POSTGRES {
 		errGroup, errCtx := errgroup.WithContext(ctx)
-		taskCtx, taskCancel := context.WithCancel(errCtx)
 		stream := model.NewQRecordStream(bufferSize)
 		errGroup.Go(func() error {
 			pgConn := srcConn.(*connpostgres.PostgresConnector)
-			tmp, err := pgConn.PullQRepRecordStream(taskCtx, config, partition, stream)
+			tmp, err := pgConn.PullQRepRecordStream(errCtx, config, partition, stream)
 			numRecords := int64(tmp)
 			if err != nil {
 				a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 				return fmt.Errorf("failed to pull records: %w", err)
 			} else {
-				err = monitoring.UpdatePullEndTimeAndRowsForPartition(taskCtx,
+				err = monitoring.UpdatePullEndTimeAndRowsForPartition(errCtx,
 					a.CatalogPool, runUUID, partition, numRecords)
 				if err != nil {
 					logger.Error(err.Error())
@@ -674,13 +673,12 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
 		})

 		errGroup.Go(func() error {
-			rowsSynced, err = dstConn.SyncQRepRecords(taskCtx, config, partition, stream)
+			rowsSynced, err = dstConn.SyncQRepRecords(errCtx, config, partition, stream)
 			if err != nil {
 				a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
 				return fmt.Errorf("failed to sync records: %w", err)
 			}
-			taskCancel()
-			return nil
+			return context.Canceled
 		})

 		err = errGroup.Wait()
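The hunks above drop the extra taskCtx/taskCancel pair: both goroutines now run on the errgroup's errCtx, and the sync goroutine cancels its sibling by returning context.Canceled once it finishes, since with errgroup.WithContext the first non-nil error returned by any goroutine cancels the shared context. The diff ends at errGroup.Wait(), so the caller's handling is not shown; presumably it treats context.Canceled as success. A minimal sketch of that pattern, not PeerDB's code (the producer and consumer bodies are stand-ins):

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"golang.org/x/sync/errgroup"
)

func main() {
	group, ctx := errgroup.WithContext(context.Background())

	// Producer: stands in for PullQRepRecordStream, which keeps streaming
	// until its context is canceled.
	group.Go(func() error {
		<-ctx.Done() // unblocks when the consumer returns below
		return ctx.Err()
	})

	// Consumer: stands in for SyncQRepRecords; returning context.Canceled
	// cancels ctx for the producer without needing a separate cancel func.
	group.Go(func() error {
		time.Sleep(10 * time.Millisecond)
		return context.Canceled
	})

	// Wait returns the first non-nil error; context.Canceled is expected here.
	if err := group.Wait(); err != nil && !errors.Is(err, context.Canceled) {
		fmt.Println("real error:", err)
		return
	}
	fmt.Println("sync finished; puller stopped via the shared context")
}
```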
flow/connectors/bigquery/avro_transform_test.go (1 change: 1 addition & 0 deletions)
@@ -31,6 +31,7 @@ func TestAvroTransform(t *testing.T) {
 		"ST_GEOGFROMTEXT(`col1`) AS `col1`",
 		"PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`",
 		"`camelCol4`",
+		"CURRENT_TIMESTAMP AS `sync_col`",
 	}
 	transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col")
 	if !reflect.DeepEqual(transformedCols, expectedTransformCols) {
flow/connectors/bigquery/qrep_avro_sync.go (24 changes: 15 additions & 9 deletions)
@@ -124,9 +124,15 @@ func (s *QRepAvroSyncMethod) SyncRecords(
 func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softDeleteCol string) []string {
 	transformedColumns := make([]string, 0, len(*dstSchema))
 	for _, col := range *dstSchema {
-		if col.Name == syncedAtCol || col.Name == softDeleteCol {
+		if col.Name == syncedAtCol { // PeerDB column
+			transformedColumns = append(transformedColumns, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
 			continue
 		}
+		if col.Name == softDeleteCol { // PeerDB column
+			transformedColumns = append(transformedColumns, "FALSE AS `"+col.Name+"`")
+			continue
+		}
+
 		switch col.Type {
 		case bigquery.GeographyFieldType:
 			transformedColumns = append(transformedColumns,
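The rewritten getTransformedColumns above no longer skips the PeerDB metadata columns: the synced-at column is selected as CURRENT_TIMESTAMP and the soft-delete column as FALSE, while ordinary columns keep their existing handling. A rough, self-contained illustration of the resulting SELECT list, with made-up column names and a simplified stand-in for the real function (the GEOGRAPHY and JSON cases from the switch are omitted):

```go
package main

import (
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
)

// transformed is a simplified stand-in for getTransformedColumns that only
// shows the synced-at / soft-delete handling plus the default quoting.
func transformed(schema bigquery.Schema, syncedAtCol, softDeleteCol string) string {
	cols := make([]string, 0, len(schema))
	for _, col := range schema {
		switch col.Name {
		case syncedAtCol:
			cols = append(cols, "CURRENT_TIMESTAMP AS `"+col.Name+"`")
		case softDeleteCol:
			cols = append(cols, "FALSE AS `"+col.Name+"`")
		default:
			cols = append(cols, "`"+col.Name+"`")
		}
	}
	return strings.Join(cols, ", ")
}

func main() {
	schema := bigquery.Schema{
		{Name: "id", Type: bigquery.IntegerFieldType},
		{Name: "val", Type: bigquery.StringFieldType},
		{Name: "synced_at", Type: bigquery.TimestampFieldType},
		{Name: "is_deleted", Type: bigquery.BooleanFieldType},
	}
	// Prints: `id`, `val`, CURRENT_TIMESTAMP AS `synced_at`, FALSE AS `is_deleted`
	fmt.Println(transformed(schema, "synced_at", "is_deleted"))
}
```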
@@ -184,18 +190,18 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
 	)
 	bqClient := s.connector.client

+	insertColumns := make([]string, 0, len(dstTableMetadata.Schema))
+	for _, col := range dstTableMetadata.Schema {
+		insertColumns = append(insertColumns, fmt.Sprintf("`%s`", col.Name))
+	}
+
+	insertColumnSQL := strings.Join(insertColumns, ", ")
 	transformedColumns := getTransformedColumns(&dstTableMetadata.Schema, syncedAtCol, softDeleteCol)
 	selector := strings.Join(transformedColumns, ", ")

-	if softDeleteCol != "" { // PeerDB column
-		selector += ", FALSE"
-	}
-	if syncedAtCol != "" { // PeerDB column
-		selector += ", CURRENT_TIMESTAMP"
-	}
 	// Insert the records from the staging table into the destination table
-	insertStmt := fmt.Sprintf("INSERT INTO `%s` SELECT %s FROM `%s`;",
-		dstTableName, selector, stagingDatasetTable.string())
+	insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
+		dstTableName, insertColumnSQL, selector, stagingDatasetTable.string())

 	s.connector.logger.Info("Performing transaction inside QRep sync function", flowLog)
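In this second hunk, the INSERT used to name no columns and the CURRENT_TIMESTAMP/FALSE values were appended to the end of the selector, so the statement relied on the destination table's column order. Now every destination column is listed explicitly and the transformed SELECT list produced by getTransformedColumns lines up with it positionally. A sketch of the statement shape this builds, using hypothetical table and column names:

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Stand-ins for the column list built from dstTableMetadata.Schema and for
	// the output of getTransformedColumns; names are hypothetical.
	insertColumns := []string{"`id`", "`val`", "`synced_at`"}
	insertColumnSQL := strings.Join(insertColumns, ", ")
	selector := strings.Join([]string{"`id`", "`val`", "CURRENT_TIMESTAMP AS `synced_at`"}, ", ")

	insertStmt := fmt.Sprintf("INSERT INTO `%s`(%s) SELECT %s FROM `%s`;",
		"dataset.dst_table", insertColumnSQL, selector, "dataset.dst_table_staging")
	fmt.Println(insertStmt)
	// Output (one line, wrapped here for readability):
	// INSERT INTO `dataset.dst_table`(`id`, `val`, `synced_at`)
	//   SELECT `id`, `val`, CURRENT_TIMESTAMP AS `synced_at` FROM `dataset.dst_table_staging`;
}
```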
