Merge branch 'main' into full-table-partition-patches
iskakaushik authored Nov 24, 2023
2 parents ecffa96 + 0c5cbaa commit bcbf919
Showing 24 changed files with 61 additions and 711 deletions.
1 change: 1 addition & 0 deletions docker-compose-dev.yml
@@ -173,6 +173,7 @@ services:

peerdb:
container_name: peerdb-server
stop_signal: SIGINT
build:
context: .
dockerfile: stacks/peerdb-server.Dockerfile
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -145,6 +145,7 @@ services:

peerdb:
container_name: peerdb-server
stop_signal: SIGINT
image: ghcr.io/peerdb-io/peerdb-server:latest-dev
environment:
<<: *catalog-config
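Both compose files now stop the peerdb-server container with SIGINT rather than the default SIGTERM, so `docker stop` triggers the server's normal interrupt handling. As a rough illustration only (a generic sketch, not PeerDB's actual shutdown code; the timeout and cleanup step are assumptions), a Go service typically traps the stop signal like this:

package main

import (
	"context"
	"log"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	// ctx is cancelled when SIGINT or SIGTERM is delivered to the process.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	log.Println("server running; send SIGINT to stop")
	<-ctx.Done()
	stop() // restore default handling so a second SIGINT exits immediately

	// Bounded window for cleanup: flush buffers, close connections, etc.
	cleanupCtx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	defer cancel()
	doCleanup(cleanupCtx)
	log.Println("shut down cleanly")
}

// doCleanup stands in for real teardown work.
func doCleanup(ctx context.Context) {
	select {
	case <-time.After(100 * time.Millisecond):
	case <-ctx.Done():
	}
}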
1 change: 0 additions & 1 deletion flow/activities/flowable.go
@@ -269,7 +269,6 @@ func (a *FlowableActivity) StartFlow(ctx context.Context,
res, err := dstConn.SyncRecords(&model.SyncRecordsRequest{
Records: recordBatch,
FlowJobName: input.FlowConnectionConfigs.FlowJobName,
SyncMode: input.FlowConnectionConfigs.CdcSyncMode,
StagingPath: input.FlowConnectionConfigs.CdcStagingPath,
PushBatchSize: input.FlowConnectionConfigs.PushBatchSize,
PushParallelism: input.FlowConnectionConfigs.PushParallelism,
229 changes: 3 additions & 226 deletions flow/connectors/bigquery/bigquery.go
@@ -4,7 +4,6 @@ import (
"context"
"encoding/json"
"fmt"
"math/rand"
"reflect"
"regexp"
"strings"
@@ -465,206 +464,12 @@ func (c *BigQueryConnector) SyncRecords(req *model.SyncRecordsRequest) (*model.S
}
syncBatchID = syncBatchID + 1

var res *model.SyncResponse
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO {
res, err = c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
return nil, err
}
}
if req.SyncMode == protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT {
res, err = c.syncRecordsViaSQL(req, rawTableName, syncBatchID)
if err != nil {
return nil, err
}
}

return res, nil
}

func (c *BigQueryConnector) syncRecordsViaSQL(req *model.SyncRecordsRequest,
rawTableName string, syncBatchID int64) (*model.SyncResponse, error) {
stagingTableName := c.getStagingTableName(req.FlowJobName)
stagingTable := c.client.Dataset(c.datasetID).Table(stagingTableName)
err := c.truncateTable(stagingTableName)
res, err := c.syncRecordsViaAvro(req, rawTableName, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to truncate staging table: %v", err)
}
// separate staging batchID which is random/unique
// to handle the case where ingestion into staging passes but raw fails
// helps avoid duplicates in the raw table
//nolint:gosec
stagingBatchID := rand.Int63()
records := make([]StagingBQRecord, 0)
tableNameRowsMapping := make(map[string]uint32)
first := true
var firstCP int64 = 0

// loop over req.Records
for record := range req.Records.GetRecords() {
switch r := record.(type) {
case *model.InsertRecord:
// create the 4 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_timestamp_nanos - current timestamp in nanoseconds
// 4. _peerdb_data - itemsJSON of `r.Items`
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: itemsJSON,
recordType: 0,
matchData: "",
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: "",
})
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.UpdateRecord:
// create the 5 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_data - json of `r.NewItems`
// 4. _peerdb_record_type - 1
// 5. _peerdb_match_data - json of `r.OldItems`

newItemsJSON, err := r.NewItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create new items to json: %v", err)
}

oldItemsJSON, err := r.OldItems.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create old items to json: %v", err)
}

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: newItemsJSON,
recordType: 1,
matchData: oldItemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: utils.KeysToString(r.UnchangedToastColumns),
})
tableNameRowsMapping[r.DestinationTableName] += 1
case *model.DeleteRecord:
// create the 4 required fields
// 1. _peerdb_uid - uuid
// 2. _peerdb_timestamp - current timestamp
// 3. _peerdb_record_type - 2
// 4. _peerdb_match_data - json of `r.Items`

// json.Marshal automatically converts byte values to base64 strings.
itemsJSON, err := r.Items.ToJSON()
if err != nil {
return nil, fmt.Errorf("failed to create items to json: %v", err)
}

// append the row to the records
records = append(records, StagingBQRecord{
uid: uuid.New().String(),
timestamp: time.Now(),
timestampNanos: time.Now().UnixNano(),
destinationTableName: r.DestinationTableName,
data: itemsJSON,
recordType: 2,
matchData: itemsJSON,
batchID: syncBatchID,
stagingBatchID: stagingBatchID,
unchangedToastColumns: "",
})

tableNameRowsMapping[r.DestinationTableName] += 1
default:
return nil, fmt.Errorf("record type %T not supported", r)
}

if first {
firstCP = record.GetCheckPointID()
first = false
}
}

numRecords := len(records)
// insert the records into the staging table
stagingInserter := stagingTable.Inserter()
stagingInserter.IgnoreUnknownValues = true

// insert the records into the staging table in batches of size syncRecordsBatchSize
for i := 0; i < numRecords; i += SyncRecordsBatchSize {
end := i + SyncRecordsBatchSize

if end > numRecords {
end = numRecords
}

chunk := records[i:end]
err = stagingInserter.Put(c.ctx, chunk)
if err != nil {
return nil, fmt.Errorf("failed to insert chunked rows into staging table: %v", err)
}
}

lastCP, err := req.Records.GetLastCheckpoint()
if err != nil {
return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
}

release, err := c.grabJobsUpdateLock()
if err != nil {
return nil, fmt.Errorf("failed to grab jobs update lock: %v", err)
}

defer func() {
err := release()
if err != nil {
log.Errorf("failed to release jobs update lock: %v", err)
}
}()

// we have to do the following things in a transaction
// 1. append the records in the staging table to the raw table.
// 2. execute the update metadata query to store the last committed watermark.
// 2.(contd) keep track of the last batchID that is synced.
appendStmt := c.getAppendStagingToRawStmt(rawTableName, stagingTableName, stagingBatchID)
updateMetadataStmt, err := c.getUpdateMetadataStmt(req.FlowJobName, lastCP, syncBatchID)
if err != nil {
return nil, fmt.Errorf("failed to get update metadata statement: %v", err)
}

// execute the statements in a transaction
stmts := []string{}
stmts = append(stmts, "BEGIN TRANSACTION;")
stmts = append(stmts, appendStmt)
stmts = append(stmts, updateMetadataStmt)
stmts = append(stmts, "COMMIT TRANSACTION;")
_, err = c.client.Query(strings.Join(stmts, "\n")).Read(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to execute statements in a transaction: %v", err)
return nil, err
}

log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)

return &model.SyncResponse{
FirstSyncedCheckPointID: firstCP,
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
CurrentSyncBatchID: syncBatchID,
TableNameRowsMapping: tableNameRowsMapping,
}, nil
return res, nil
}

func (c *BigQueryConnector) syncRecordsViaAvro(
@@ -1083,17 +888,6 @@ func (c *BigQueryConnector) getUpdateMetadataStmt(jobName string, lastSyncedChec
return jobStatement, nil
}

// getAppendStagingToRawStmt returns the statement to append the staging table to the raw table.
func (c *BigQueryConnector) getAppendStagingToRawStmt(
rawTableName string, stagingTableName string, stagingBatchID int64,
) string {
return fmt.Sprintf(
`INSERT INTO %s.%s SELECT _peerdb_uid,_peerdb_timestamp,_peerdb_timestamp_nanos,
_peerdb_destination_table_name,_peerdb_data,_peerdb_record_type,_peerdb_match_data,
_peerdb_batch_id,_peerdb_unchanged_toast_columns FROM %s.%s WHERE _peerdb_staging_batch_id = %d;`,
c.datasetID, rawTableName, c.datasetID, stagingTableName, stagingBatchID)
}

// metadataHasJob checks if the metadata table has the given job.
func (c *BigQueryConnector) metadataHasJob(jobName string) (bool, error) {
checkStmt := fmt.Sprintf(
@@ -1213,23 +1007,6 @@ func (c *BigQueryConnector) getStagingTableName(flowJobName string) string {
return fmt.Sprintf("_peerdb_staging_%s", flowJobName)
}

// truncateTable truncates a table.
func (c *BigQueryConnector) truncateTable(tableIdentifier string) error {
// execute DELETE FROM table where the timestamp is older than 90 mins from now.
// The timestamp is used to ensure that the streaming rows are not affected by the delete.
// column of interest is the _peerdb_timestamp column.
deleteStmt := fmt.Sprintf(
"DELETE FROM %s.%s WHERE _peerdb_timestamp < TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 90 MINUTE)",
c.datasetID, tableIdentifier)
q := c.client.Query(deleteStmt)
_, err := q.Read(c.ctx)
if err != nil {
return fmt.Errorf("failed to delete rows from table %s: %w", tableIdentifier, err)
}

return nil
}

// Bigquery doesn't allow concurrent updates to the same table.
// we grab a lock on catalog to ensure that only one job is updating
// bigquery tables at a time.
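The removed syncRecordsViaSQL path (along with its helpers getAppendStagingToRawStmt and truncateTable) boils down to: write rows into a per-flow staging table in fixed-size chunks, then append the staged rows to the raw table and advance the job metadata inside a single multi-statement transaction, keyed by a random staging batch ID so a failed append can be retried without duplicating rows. The sketch below distills that pattern using the BigQuery Go client; the table names, row struct, and metadata statement are placeholders, not PeerDB's actual schema:

package bqsync

import (
	"context"
	"fmt"
	"strings"

	"cloud.google.com/go/bigquery"
)

// stagingRow is a placeholder for a staged CDC record; the real staging
// schema carries more columns (_peerdb_timestamp, _peerdb_record_type, ...).
type stagingRow struct {
	UID            string `bigquery:"_peerdb_uid"`
	Data           string `bigquery:"_peerdb_data"`
	BatchID        int64  `bigquery:"_peerdb_batch_id"`
	StagingBatchID int64  `bigquery:"_peerdb_staging_batch_id"`
}

const chunkSize = 1000

// syncViaStaging streams rows into a staging table in chunks, then moves
// them into the raw table and records progress in one transaction.
func syncViaStaging(ctx context.Context, client *bigquery.Client, datasetID, jobName string,
	rows []stagingRow, syncBatchID, stagingBatchID int64) error {
	inserter := client.Dataset(datasetID).Table("_staging_" + jobName).Inserter()
	inserter.IgnoreUnknownValues = true

	// Chunked streaming inserts keep each Put call bounded in size.
	for i := 0; i < len(rows); i += chunkSize {
		end := i + chunkSize
		if end > len(rows) {
			end = len(rows)
		}
		if err := inserter.Put(ctx, rows[i:end]); err != nil {
			return fmt.Errorf("failed to insert chunk into staging table: %w", err)
		}
	}

	// Append only this staging batch to the raw table and update the job's
	// metadata atomically, so a retry after a failure cannot double-append.
	stmts := []string{
		"BEGIN TRANSACTION;",
		fmt.Sprintf(
			"INSERT INTO %s.raw_table SELECT * EXCEPT(_peerdb_staging_batch_id) FROM %s._staging_%s WHERE _peerdb_staging_batch_id = %d;",
			datasetID, datasetID, jobName, stagingBatchID),
		fmt.Sprintf(
			"UPDATE %s.metadata SET sync_batch_id = %d WHERE mirror_job_name = '%s';",
			datasetID, syncBatchID, jobName),
		"COMMIT TRANSACTION;",
	}
	if _, err := client.Query(strings.Join(stmts, "\n")).Read(ctx); err != nil {
		return fmt.Errorf("transactional append failed: %w", err)
	}
	return nil
}

After this merge, SyncRecords no longer takes that branch at all: per the three lines added in the hunk above, every CDC sync into BigQuery now goes through syncRecordsViaAvro.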
13 changes: 2 additions & 11 deletions flow/connectors/bigquery/qrep.go
@@ -48,17 +48,8 @@ func (c *BigQueryConnector) SyncQRepRecords(
" partition %s of destination table %s",
partition.PartitionId, destTable)

syncMode := config.SyncMode
switch syncMode {
case protos.QRepSyncMode_QREP_SYNC_MODE_MULTI_INSERT:
stagingTableSync := &QRepStagingTableSync{connector: c}
return stagingTableSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
case protos.QRepSyncMode_QREP_SYNC_MODE_STORAGE_AVRO:
avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
default:
return 0, fmt.Errorf("unsupported sync mode: %s", syncMode)
}
avroSync := &QRepAvroSyncMethod{connector: c, gcsBucket: config.StagingPath}
return avroSync.SyncQRepRecords(config.FlowJobName, destTable, partition, tblMetadata, stream)
}

func (c *BigQueryConnector) replayTableSchemaDeltasQRep(config *protos.QRepConfig, partition *protos.QRepPartition,