removed SQL transport for BQ/SF, removed choices #707

Merged · 2 commits · Nov 24, 2023
1 change: 0 additions & 1 deletion flow/activities/flowable.go
@@ -269,7 +269,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncMode: input.FlowConnectionConfigs.CdcSyncMode,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
PushBatchSize: input.FlowConnectionConfigs.PushBatchSize,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
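For context, after this change the sync request no longer carries a sync mode at all; a minimal sketch of the request as callers now build it, using a stand-in struct (field names follow the diff above, types and values are assumptions):

```go
package main

import "fmt"

// Illustrative stand-in for model.SyncRecordsRequest after this PR: the
// SyncMode field is gone, so callers can no longer pick a transport.
type syncRecordsRequest struct {
	FlowJobName     string
	StagingPath     string
	PushBatchSize   int32
	PushParallelism int32
}

func main() {
	req := syncRecordsRequest{
		FlowJobName:     "example_flow",            // hypothetical flow job name
		StagingPath:     "gs://example-bucket/cdc", // hypothetical CDC staging path
		PushBatchSize:   10000,
		PushParallelism: 4,
	}
	fmt.Printf("%+v\n", req)
}
```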
229 changes: 3 additions & 226 deletions flow/connectors/bigquery/bigquery.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"regexp"
"strings"
@@ -465,206 +464,12 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}
syncBatchID = syncBatchID + 1

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
res, err = c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
return nil, err
}
}
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
res, err = c.syncRecordsViaSQL(req, rawTableName, syncBatchID)
if err != nil {
return nil, err
}
}

return res, nil
}

func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
rawTableName string, syncBatchID int64) (*model.SyncResponse, error) {
stagingTableName := c.getStagingTableName(req.FlowJobName)
stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName)
err := c.truncateTable(stagingTableName)
res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to truncate staging table: %v", err)
}
// separate staging batchID which is random/unique
// to handle the case where ingestion into staging passes but raw fails
// helps avoid duplicates in the raw table
//nolint:gosec
stagingBatchID := rand.Int63()
records := make([]StagingBQRecord, 0)
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0

// loop over req.Records
for record := range req.Records.GetRecords() {
switch r := record.(type) {
case *model.InsertRecord:
// create the 4 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_timestamp_nanos - current timestamp in nanoseconds
// 4. _peerdb_data - itemsJSON of `r.Items`
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: itemsJSON,
recordType: 0,
matchData: "",
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: "",
})
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.UpdateRecord:
// create the 5 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_data - json of `r.NewItems`
// 4. _peerdb_record_type - 1
// 5. _peerdb_match_data - json of `r.OldItems`

newItemsJSON, err := r.NewItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create new items to json: %v", err)
}

oldItemsJSON, err := r.OldItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create old items to json: %v", err)
}

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: newItemsJSON,
recordType: 1,
matchData: oldItemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
})
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.DeleteRecord:
// create the 4 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_record_type - 2
// 4. _peerdb_match_data - json of `r.Items`

// json.Marshal automatically encodes byte slices as base64 strings.
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: itemsJSON,
recordType: 2,
matchData: itemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: "",
})

tableNameRowsMapping[r.DestinationTableName] += 1
default:
return nil, fmt.Errorf("record type %T not supported", r)
}

if first {
firstCP = record.GetCheckPointID()
first = false
}
}

numRecords := len(records)
// insert the records into the staging table
stagingInserter := stagingTable.Inserter()
stagingInserter.IgnoreUnknownValues = true

// insert the records into the staging table in batches of size SyncRecordsBatchSize
for i := 0; i < numRecords; i += SyncRecordsBatchSize {
end := i + SyncRecordsBatchSize

if end > numRecords {
end = numRecords
}

chunk := records[i:end]
err = stagingInserter.Put(c.ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to insert chunked rows into staging table: %v", err)
}
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab jobs update lock: %v", err)
}

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release jobs update lock: %v", err)
}
}()

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
// 2.(contd) keep track of the last batchID that is synced.
appendStmt := c.getAppendStagingToRawStmt(rawTableName, stagingTableName, stagingBatchID)
updateMetadataStmt, err := c.getUpdateMetadataStmt(req.FlowJobName, lastCP, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to get update metadata statement: %v", err)
}

// execute the statements in a transaction
stmts := []string{}
stmts = append(stmts, "BEGIN TRANSACTION;")
stmts = append(stmts, appendStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
return nil, err
}

log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
return res, nil
}
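Read together with the retained lines above, the post-change CDC path has no transport decision left to make; a minimal sketch of that control flow, using stand-in types rather than the real connector API:

```go
package main

import "fmt"

// syncer captures just enough surface to illustrate the post-change control
// flow: there is no sync-mode switch, and records always go through the Avro
// staging path. All names here are stand-ins, not the real connector API.
type syncer interface {
	syncRecordsViaAvro(rawTable string, syncBatchID int64) (int64, error)
}

type fakeSyncer struct{}

// syncRecordsViaAvro pretends to stage records as Avro and load them into the
// raw table, returning the number of records "synced".
func (fakeSyncer) syncRecordsViaAvro(rawTable string, syncBatchID int64) (int64, error) {
	fmt.Printf("avro sync into %s, batch %d\n", rawTable, syncBatchID)
	return 42, nil
}

// syncRecords mirrors the shape of the simplified SyncRecords: previously it
// branched on req.SyncMode between a multi-insert SQL path and the Avro path;
// after this PR only the Avro call remains.
func syncRecords(s syncer, rawTable string, lastBatchID int64) (int64, error) {
	return s.syncRecordsViaAvro(rawTable, lastBatchID+1)
}

func main() {
	n, err := syncRecords(fakeSyncer{}, "peerdb_raw_example_flow", 7)
	if err != nil {
		panic(err)
	}
	fmt.Println("records synced:", n)
}
```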

func (c *BigQueryConnector) syncRecordsViaAvro(
@@ -1083,17 +888,6 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
return jobStatement, nil
}

// getAppendStagingToRawStmt returns the statement to append the staging table to the raw table.
func (c *BigQueryConnector) getAppendStagingToRawStmt(
rawTableName string, stagingTableName string, stagingBatchID int64,
) string {
return fmt.Sprintf(
`INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos,
_peerdb_destination_table_name,_peerdb_data,_peerdb_record_type,_peerdb_match_data,
_peerdb_batch_id,_peerdb_unchanged_toast_columns FROM %s.%s WHERE _peerdb_staging_batch_id = %d;`,
c.datasetID, rawTableName, c.datasetID, stagingTableName, stagingBatchID)
}
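For reference, here is roughly what the removed helper's output looked like, rendered with hypothetical dataset, table, and batch-ID values (the format string is copied from getAppendStagingToRawStmt above; everything else is made up):

```go
package main

import "fmt"

func main() {
	// Hypothetical identifiers; the real values came from the connector's
	// dataset ID, raw/staging table names, and a random staging batch ID.
	datasetID := "example_dataset"
	rawTable := "peerdb_raw_example_flow"
	stagingTable := "_peerdb_staging_example_flow"
	var stagingBatchID int64 = 1234567890

	// Format string copied from the removed getAppendStagingToRawStmt helper.
	stmt := fmt.Sprintf(
		`INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos,
_peerdb_destination_table_name,_peerdb_data,_peerdb_record_type,_peerdb_match_data,
_peerdb_batch_id,_peerdb_unchanged_toast_columns FROM %s.%s WHERE _peerdb_staging_batch_id = %d;`,
		datasetID, rawTable, datasetID, stagingTable, stagingBatchID)
	fmt.Println(stmt)
}
```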

// metadataHasJob checks if the metadata table has the given job.
func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
checkStmt := fmt.Sprintf(
@@ -1213,23 +1007,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
return fmt.Sprintf("_peerdb_staging_%s", flowJobName)
}

// truncateTable truncates a table.
func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
// execute DELETE FROM table for rows with a timestamp more than 90 minutes old.
// The timestamp check ensures that streaming rows are not affected by the delete.
// column of interest is the _peerdb_timestamp column.
deleteStmt := fmt.Sprintf(
"DELETE FROM %s.%s WHERE _peerdb_timestamp < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE)",
c.datasetID, tableIdentifier)
q := c.client.Query(deleteStmt)
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete rows from table %s: %w", tableIdentifier, err)
}

return nil
}

// BigQuery doesn't allow concurrent updates to the same table.
// We grab a lock on the catalog to ensure that only one job is updating
// BigQuery tables at a time.
13 changes: 2 additions & 11 deletions flow/connectors/bigquery/qrep.go
@@ -48,17 +48,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
" partition %s of destination table %s",
partition.PartitionId, destTable)

syncMode := config.SyncMode
switch syncMode {
case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT:
stagingTableSync := &QRepStagingTableSync{connector: c}
return stagingTableSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO:
avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
default:
return 0, fmt.Errorf("unsupported sync mode: %s", syncMode)
}
avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
}
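With the switch gone, SyncQRepRecords can no longer return the old "unsupported sync mode" error for BigQuery, and the multi-insert QRepStagingTableSync path is unreachable from this connector; the Avro method backed by config.StagingPath is used unconditionally, mirroring the CDC change above.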

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,