[breaking] BQ SyncRecords now streams properly, code cleanup
heavycrystal committed Dec 27, 2023
1 parent e16a371 · commit 60361a8
Showing 4 changed files with 23 additions and 230 deletions.
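In short: the hand-written record loop in `syncRecordsViaAvro` is deleted in favor of the shared `utils.RecordsToRawTableStream` helper, so records now flow through a `model.QRecordStream` that the Avro sync drains as it uploads, rather than being buffered up front. A hedged sketch of the resulting call path (identifiers are taken from the diff below; `avroSync`, `rawTableMetadata`, and the surrounding error handling are elided):

```go
// Sketch, not the full body of syncRecordsViaAvro after this commit.
tableNameRowsMapping := make(map[string]uint32)
streamReq := model.NewRecordsToStreamRequest(
	req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
	return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}
// streamRes.Stream is consumed lazily by the Avro sync; records are no
// longer staged in a 1<<20-entry channel before the upload starts.
numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
	req.Records, rawTableMetadata, syncBatchID, streamRes.Stream)
```

The per-record switch that built the ten `qvalue.QValue` entries by hand now lives only in `flow/connectors/utils/stream.go`, where other raw-table consumers can share it.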
221 changes: 10 additions & 211 deletions flow/connectors/bigquery/bigquery.go
@@ -18,7 +18,6 @@ import (
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
"github.com/google/uuid"
"github.com/jackc/pgx/v5/pgxpool"

"go.temporal.io/sdk/activity"
@@ -69,19 +68,6 @@ type BigQueryConnector struct {
logger slog.Logger
}

- type StagingBQRecord struct {
- uid string `bigquery:"_peerdb_uid"`
- timestamp time.Time `bigquery:"_peerdb_timestamp"`
- timestampNanos int64 `bigquery:"_peerdb_timestamp_nanos"`
- destinationTableName string `bigquery:"_peerdb_destination_table_name"`
- data string `bigquery:"_peerdb_data"`
- recordType int `bigquery:"_peerdb_record_type"`
- matchData string `bigquery:"_peerdb_match_data"`
- batchID int64 `bigquery:"_peerdb_batch_id"`
- stagingBatchID int64 `bigquery:"_peerdb_staging_batch_id"`
- unchangedToastColumns string `bigquery:"_peerdb_unchanged_toast_columns"`
- }

// Create BigQueryServiceAccount from BigqueryConfig
func NewBigQueryServiceAccount(bqConfig *protos.BigqueryConfig) (*BigQueryServiceAccount, error) {
var serviceAccount BigQueryServiceAccount
@@ -493,22 +479,6 @@ func (c *BigQueryConnector) getTableNametoUnchangedCols(flowJobName string, sync
return resultMap, nil
}

- // ValueSaver interface for bqRecord
- func (r StagingBQRecord) Save() (map[string]bigquery.Value, string, error) {
- return map[string]bigquery.Value{
- "_peerdb_uid": r.uid,
- "_peerdb_timestamp": r.timestamp,
- "_peerdb_timestamp_nanos": r.timestampNanos,
- "_peerdb_destination_table_name": r.destinationTableName,
- "_peerdb_data": r.data,
- "_peerdb_record_type": r.recordType,
- "_peerdb_match_data": r.matchData,
- "_peerdb_batch_id": r.batchID,
- "_peerdb_staging_batch_id": r.stagingBatchID,
- "_peerdb_unchanged_toast_columns": r.unchangedToastColumns,
- }, bigquery.NoDedupeID, nil
- }

// SyncRecords pushes records to the destination.
// Currently only supports inserts, updates, and deletes.
// More record types will be added in the future.
@@ -539,201 +509,31 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
syncBatchID int64,
) (*model.SyncResponse, error) {
tableNameRowsMapping := make(map[string]uint32)
- recordStream := model.NewQRecordStream(1 << 20)
- err := recordStream.SetSchema(&model.QRecordSchema{
- Fields: []model.QField{
- {
- Name: "_peerdb_uid",
- Type: qvalue.QValueKindString,
- Nullable: false,
- },
- {
- Name: "_peerdb_timestamp",
- Type: qvalue.QValueKindTimestamp,
- Nullable: false,
- },
- {
- Name: "_peerdb_timestamp_nanos",
- Type: qvalue.QValueKindInt64,
- Nullable: false,
- },
- {
- Name: "_peerdb_destination_table_name",
- Type: qvalue.QValueKindString,
- Nullable: false,
- },
- {
- Name: "_peerdb_data",
- Type: qvalue.QValueKindString,
- Nullable: false,
- },
- {
- Name: "_peerdb_record_type",
- Type: qvalue.QValueKindInt64,
- Nullable: true,
- },
- {
- Name: "_peerdb_match_data",
- Type: qvalue.QValueKindString,
- Nullable: true,
- },
- {
- Name: "_peerdb_staging_batch_id",
- Type: qvalue.QValueKindInt64,
- Nullable: true,
- },
- {
- Name: "_peerdb_batch_id",
- Type: qvalue.QValueKindInt64,
- Nullable: true,
- },
- {
- Name: "_peerdb_unchanged_toast_columns",
- Type: qvalue.QValueKindString,
- Nullable: true,
- },
- },
- })
+ streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
+ streamRes, err := utils.RecordsToRawTableStream(streamReq)
if err != nil {
- return nil, err
+ return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
}

- // loop over req.Records
- for record := range req.Records.GetRecords() {
- var entries [10]qvalue.QValue
- switch r := record.(type) {
- case *model.InsertRecord:
- itemsJSON, err := r.Items.ToJSON()
- if err != nil {
- return nil, fmt.Errorf("failed to create items to json: %v", err)
- }

- entries[4] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: itemsJSON,
- }
- entries[5] = qvalue.QValue{
- Kind: qvalue.QValueKindInt64,
- Value: 0,
- }
- entries[6] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: "",
- }
- entries[9] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: "",
- }

- tableNameRowsMapping[r.DestinationTableName] += 1
- case *model.UpdateRecord:
- newItemsJSON, err := r.NewItems.ToJSON()
- if err != nil {
- return nil, fmt.Errorf("failed to create new items to json: %v", err)
- }
- oldItemsJSON, err := r.OldItems.ToJSON()
- if err != nil {
- return nil, fmt.Errorf("failed to create old items to json: %v", err)
- }

- entries[4] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: newItemsJSON,
- }
- entries[5] = qvalue.QValue{
- Kind: qvalue.QValueKindInt64,
- Value: 1,
- }
- entries[6] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: oldItemsJSON,
- }
- entries[9] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: utils.KeysToString(r.UnchangedToastColumns),
- }

- tableNameRowsMapping[r.DestinationTableName] += 1
- case *model.DeleteRecord:
- itemsJSON, err := r.Items.ToJSON()
- if err != nil {
- return nil, fmt.Errorf("failed to create items to json: %v", err)
- }

- entries[4] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: itemsJSON,
- }
- entries[5] = qvalue.QValue{
- Kind: qvalue.QValueKindInt64,
- Value: 2,
- }
- entries[6] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: itemsJSON,
- }
- entries[9] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: "",
- }

- tableNameRowsMapping[r.DestinationTableName] += 1
- default:
- return nil, fmt.Errorf("record type %T not supported", r)
- }

- entries[0] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: uuid.New().String(),
- }
- entries[1] = qvalue.QValue{
- Kind: qvalue.QValueKindTimestamp,
- Value: time.Now(),
- }
- entries[2] = qvalue.QValue{
- Kind: qvalue.QValueKindInt64,
- Value: time.Now().UnixNano(),
- }
- entries[3] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: record.GetDestinationTableName(),
- }
- entries[7] = qvalue.QValue{
- Kind: qvalue.QValueKindInt64,
- Value: syncBatchID,
- }
- entries[8] = qvalue.QValue{
- Kind: qvalue.QValueKindInt64,
- Value: syncBatchID,
- }
- recordStream.Records <- model.QRecordOrError{
- Record: model.QRecord{
- NumEntries: 10,
- Entries: entries[:],
- },
- }
- }

- close(recordStream.Records)
avroSync := NewQRepAvroSyncMethod(c, req.StagingPath, req.FlowJobName)
rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
if err != nil {
return nil, fmt.Errorf("failed to get metadata of destination table: %v", err)
}

- lastCP, err := req.Records.GetLastCheckpoint()
- if err != nil {
- return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
- }

numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
- lastCP, rawTableMetadata, syncBatchID, recordStream)
+ req.Records, rawTableMetadata, syncBatchID, streamRes.Stream)
if err != nil {
return nil, fmt.Errorf("failed to sync records via avro : %v", err)
}

c.logger.Info(fmt.Sprintf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName))

+ lastCP, err := req.Records.GetLastCheckpoint()
+ if err != nil {
+ return nil, fmt.Errorf("failed to get last checkpoint: %v", err)
+ }

return &model.SyncResponse{
LastSyncedCheckPointID: lastCP,
NumRecordsSynced: int64(numRecords),
@@ -842,8 +642,7 @@ func (c *BigQueryConnector) CreateRawTable(req *protos.CreateRawTableInput) (*pr

schema := bigquery.Schema{
{Name: "_peerdb_uid", Type: bigquery.StringFieldType},
{Name: "_peerdb_timestamp", Type: bigquery.TimestampFieldType},
{Name: "_peerdb_timestamp_nanos", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_timestamp", Type: bigquery.IntegerFieldType},
{Name: "_peerdb_destination_table_name", Type: bigquery.StringFieldType},
{Name: "_peerdb_data", Type: bigquery.StringFieldType},
{Name: "_peerdb_record_type", Type: bigquery.IntegerFieldType},
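This schema change is the visibly breaking piece: the raw table's `_peerdb_timestamp` TIMESTAMP column and the separate `_peerdb_timestamp_nanos` INTEGER column collapse into a single `_peerdb_timestamp` INTEGER carrying epoch nanoseconds. A hedged sketch of how the column is populated under the new layout (the entry index is inferred from the stream.go hunk at the end of this commit; treat it as illustrative):

```go
// time.Now().UnixNano() returns an int64, which matches the raw table's
// new bigquery.IntegerFieldType _peerdb_timestamp column.
entries[1] = qvalue.QValue{
	Kind:  qvalue.QValueKindInt64,
	Value: time.Now().UnixNano(),
}
```

The merge generator in the next file keeps deduplication consistent with this by ranking on the integer `_peerdb_timestamp` instead of the dropped nanos column.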
3 changes: 1 addition & 2 deletions flow/connectors/bigquery/merge_statement_generator.go
@@ -80,7 +80,6 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
flattenedProjs = append(
flattenedProjs,
"_peerdb_timestamp",
"_peerdb_timestamp_nanos",
"_peerdb_record_type",
"_peerdb_unchanged_toast_columns",
)
@@ -99,7 +98,7 @@ func (m *mergeStmtGenerator) generateDeDupedCTE() string {
SELECT _peerdb_ranked.*
FROM (
SELECT RANK() OVER (
- PARTITION BY %s ORDER BY _peerdb_timestamp_nanos DESC
+ PARTITION BY %s ORDER BY _peerdb_timestamp DESC
) as _peerdb_rank, * FROM _peerdb_flattened
) _peerdb_ranked
WHERE _peerdb_rank = 1
9 changes: 7 additions & 2 deletions flow/connectors/bigquery/qrep_avro_sync.go
@@ -37,7 +37,7 @@ func NewQRepAvroSyncMethod(connector *BigQueryConnector, gcsBucket string,
func (s *QRepAvroSyncMethod) SyncRecords(
rawTableName string,
flowJobName string,
- lastCP int64,
+ records *model.CDCRecordStream,
dstTableMetadata *bigquery.TableMetadata,
syncBatchID int64,
stream *model.QRecordStream,
@@ -67,6 +67,11 @@ func (s *QRepAvroSyncMethod) SyncRecords(
datasetID := s.connector.datasetID
insertStmt := fmt.Sprintf("INSERT INTO `%s.%s` SELECT * FROM `%s.%s`;",
datasetID, rawTableName, datasetID, stagingTable)

+ lastCP, err := records.GetLastCheckpoint()
+ if err != nil {
+ return -1, fmt.Errorf("failed to get last checkpoint: %v", err)
+ }
updateMetadataStmt, err := s.connector.getUpdateMetadataStmt(flowJobName, lastCP, syncBatchID)
if err != nil {
return -1, fmt.Errorf("failed to update metadata: %v", err)
@@ -421,7 +426,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
if err := status.Err(); err != nil {
return 0, fmt.Errorf("failed to load Avro file into BigQuery table: %w", err)
}
slog.Info(fmt.Sprintf("Pushed into %s", avroFile.FilePath))
slog.Info(fmt.Sprintf("Pushed from %s to BigQuery", avroFile.FilePath))

err = s.connector.waitForTableReady(stagingTable)
if err != nil {
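`SyncRecords` now receives the whole `*model.CDCRecordStream` instead of a pre-computed `lastCP`. A plausible reading (an assumption; the commit message does not spell it out): with genuine streaming, the last checkpoint is only known once the producer has pushed every record, so it can no longer be read before the Avro load the way the old buffered code did. The caller-side ordering in bigquery.go reflects this:

```go
// Sketch mirroring the new bigquery.go flow: SyncRecords drains the stream,
// and only afterwards is the checkpoint read from the record stream.
numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
	req.Records, rawTableMetadata, syncBatchID, streamRes.Stream)
if err != nil {
	return nil, fmt.Errorf("failed to sync records via avro : %v", err)
}
lastCP, err := req.Records.GetLastCheckpoint() // safe once the stream is drained
```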
20 changes: 5 additions & 15 deletions flow/connectors/utils/stream.go
@@ -10,7 +10,7 @@ import (
)

func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) {
- recordStream := model.NewQRecordStream(1 << 16)
+ recordStream := model.NewQRecordStream(1 << 17)
err := recordStream.SetSchema(&model.QRecordSchema{
Fields: []model.QField{
{
@@ -85,11 +85,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
}

- // add insert record to the raw table
- entries[2] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: typedRecord.DestinationTableName,
- }
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
@@ -121,10 +116,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
}

- entries[2] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: typedRecord.DestinationTableName,
- }
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: newItemsJSON,
@@ -150,11 +141,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
}

- // append delete record to the raw table
- entries[2] = qvalue.QValue{
- Kind: qvalue.QValueKindString,
- Value: typedRecord.DestinationTableName,
- }
entries[3] = qvalue.QValue{
Kind: qvalue.QValueKindString,
Value: itemsJSON,
@@ -186,6 +172,10 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Kind: qvalue.QValueKindInt64,
Value: time.Now().UnixNano(),
}
+ entries[2] = qvalue.QValue{
+ Kind: qvalue.QValueKindString,
+ Value: record.GetDestinationTableName(),
+ }
entries[6] = qvalue.QValue{
Kind: qvalue.QValueKindInt64,
Value: batchID,
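Two small cleanups close out the shared stream helper: the stream's channel buffer doubles from 1<<16 to 1<<17 entries, and the three per-case copies of the destination-table assignment move into the tail that runs for every record type. A hedged sketch of that common tail (only fields visible in the hunks above are shown; the remaining entries are elided):

```go
// Common tail of recordToQRecordOrError after the cleanup; the UnixNano
// timestamp assignment shown in the hunk directly precedes these fields.
entries[2] = qvalue.QValue{
	Kind:  qvalue.QValueKindString,
	Value: record.GetDestinationTableName(), // previously duplicated per case
}
entries[6] = qvalue.QValue{
	Kind:  qvalue.QValueKindInt64,
	Value: batchID,
}
```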
