Skip to content


Merge branch 'main' into ui-basic-auth
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored Nov 24, 2023
2 parents 88f7bda + 0c5cbaa commit 82a3cff
Show file tree
Hide file tree
Showing 21 changed files with 52 additions and 710 deletions.
1 change: 0 additions & 1 deletion flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncMode: input.FlowConnectionConfigs.CdcSyncMode,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
PushBatchSize: input.FlowConnectionConfigs.PushBatchSize,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
Expand Down
229 changes: 3 additions & 226 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
Expand Down Expand Up @@ -465,206 +464,12 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
syncBatchID = syncBatchID + 1

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
res, err = c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
return nil, err
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
res, err = c.syncRecordsViaSQL(req, rawTableName, syncBatchID)
if err != nil {
return nil, err

return res, nil

func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
rawTableName string, syncBatchID int64) (*model.SyncResponse, error) {
stagingTableName := c.getStagingTableName(req.FlowJobName)
stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName)
err := c.truncateTable(stagingTableName)
res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to truncate staging table: %v", err)
// separate staging batchID which is random/unique
// to handle the case where ingestion into staging passes but raw fails
// helps avoid duplicates in the raw table
stagingBatchID := rand.Int63()
records := make([]StagingBQRecord, 0)
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0

// loop over req.Records
for record := range req.Records.GetRecords() {
switch r := record.(type) {
case *model.InsertRecord:
// create the 3 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 2. _peerdb_timestamp_nanos - current timestamp in nano seconds
// 3. _peerdb_data - itemsJSON of `r.Items`
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: itemsJSON,
recordType: 0,
matchData: "",
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: "",
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.UpdateRecord:
// create the 5 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_data - json of `r.NewItems`
// 4. _peerdb_record_type - 1
// 5. _peerdb_match_data - json of `r.OldItems`

newItemsJSON, err := r.NewItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create new items to json: %v", err)

oldItemsJSON, err := r.OldItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create old items to json: %v", err)

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: newItemsJSON,
recordType: 1,
matchData: oldItemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.DeleteRecord:
// create the 4 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_record_type - 2
// 4. _peerdb_match_data - json of `r.Items`

// json.Marshal converts bytes in Hex automatically to BASE64 string.
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: itemsJSON,
recordType: 2,
matchData: itemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: "",

tableNameRowsMapping[r.DestinationTableName] += 1
return nil, fmt.Errorf("record type %T not supported", r)

if first {
firstCP = record.GetCheckPointID()
first = false

numRecords := len(records)
// insert the records into the staging table
stagingInserter := stagingTable.Inserter()
stagingInserter.IgnoreUnknownValues = true

// insert the records into the staging table in batches of size syncRecordsBatchSize
for i := 0; i < numRecords; i += SyncRecordsBatchSize {
end := i + SyncRecordsBatchSize

if end > numRecords {
end = numRecords

chunk := records[i:end]
err = stagingInserter.Put(c.ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to insert chunked rows into staging table: %v", err)

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab jobs update lock: %v", err)

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release jobs update lock: %v", err)

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
// 2.(contd) keep track of the last batchID that is synced.
appendStmt := c.getAppendStagingToRawStmt(rawTableName, stagingTableName, stagingBatchID)
updateMetadataStmt, err := c.getUpdateMetadataStmt(req.FlowJobName, lastCP, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to get update metadata statement: %v", err)

// execute the statements in a transaction
stmts := []string{}
stmts = append(stmts, "BEGIN TRANSACTION;")
stmts = append(stmts, appendStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
return nil, err

log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
return res, nil

func (c *BigQueryConnector) syncRecordsViaAvro(
Expand Down Expand Up @@ -1083,17 +888,6 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
return jobStatement, nil

// getAppendStagingToRawStmt returns the statement to append the staging table to the raw table.
func (c *BigQueryConnector) getAppendStagingToRawStmt(
rawTableName string, stagingTableName string, stagingBatchID int64,
) string {
return fmt.Sprintf(
`INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos,
_peerdb_batch_id,_peerdb_unchanged_toast_columns FROM %s.%s WHERE _peerdb_staging_batch_id = %d;`,
c.datasetID, rawTableName, c.datasetID, stagingTableName, stagingBatchID)

// metadataHasJob checks if the metadata table has the given job.
func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
checkStmt := fmt.Sprintf(
Expand Down Expand Up @@ -1213,23 +1007,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
return fmt.Sprintf("_peerdb_staging_%s", flowJobName)

// truncateTable truncates a table.
func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
// execute DELETE FROM table where the timestamp is older than 90 mins from now.
// The timestamp is used to ensure that the streaming rows are not effected by the delete.
// column of interest is the _peerdb_timestamp column.
deleteStmt := fmt.Sprintf(
c.datasetID, tableIdentifier)
q := c.client.Query(deleteStmt)
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete rows from table %s: %w", tableIdentifier, err)

return nil

// Bigquery doesn't allow concurrent updates to the same table.
// we grab a lock on catalog to ensure that only one job is updating
// bigquery tables at a time.
Expand Down
13 changes: 2 additions & 11 deletions flow/connectors/bigquery/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,17 +48,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
" partition %s of destination table %s",
partition.PartitionId, destTable)

syncMode := config.SyncMode
switch syncMode {
stagingTableSync := &QRepStagingTableSync{connector: c}
return stagingTableSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
return 0, fmt.Errorf("unsupported sync mode: %s", syncMode)
avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,
Expand Down

0 comments on commit 82a3cff

Please sign in to comment.