unify bq codepath for pushing raw changes #597

Closed
wants to merge 1 commit into from
198 changes: 6 additions & 192 deletions flow/connectors/bigquery/bigquery.go
@@ -639,199 +639,13 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
     syncBatchID int64,
 ) (*model.SyncResponse, error) {
     tableNameRowsMapping := make(map[string]uint32)
-    first := true
-    var firstCP int64 = 0
-    recordStream := model.NewQRecordStream(1 << 20)
-    err := recordStream.SetSchema(&model.QRecordSchema{
-        Fields: []*model.QField{
-            {
-                Name:     "_peerdb_uid",
-                Type:     qvalue.QValueKindString,
-                Nullable: false,
-            },
-            {
-                Name:     "_peerdb_timestamp",
-                Type:     qvalue.QValueKindTimestamp,
-                Nullable: false,
-            },
-            {
-                Name:     "_peerdb_timestamp_nanos",
-                Type:     qvalue.QValueKindInt64,
-                Nullable: false,
-            },
-            {
-                Name:     "_peerdb_destination_table_name",
-                Type:     qvalue.QValueKindString,
-                Nullable: false,
-            },
-            {
-                Name:     "_peerdb_data",
-                Type:     qvalue.QValueKindString,
-                Nullable: false,
-            },
-            {
-                Name:     "_peerdb_record_type",
-                Type:     qvalue.QValueKindInt64,
-                Nullable: true,
-            },
-            {
-                Name:     "_peerdb_match_data",
-                Type:     qvalue.QValueKindString,
-                Nullable: true,
-            },
-            {
-                Name:     "_peerdb_staging_batch_id",
-                Type:     qvalue.QValueKindInt64,
-                Nullable: true,
-            },
-            {
-                Name:     "_peerdb_batch_id",
-                Type:     qvalue.QValueKindInt64,
-                Nullable: true,
-            },
-            {
-                Name:     "_peerdb_unchanged_toast_columns",
-                Type:     qvalue.QValueKindString,
-                Nullable: true,
-            },
-        },
-    })
-    if err != nil {
-        return nil, err
-    }
-
-    // loop over req.Records
-    for record := range req.Records.GetRecords() {
-        var entries [10]qvalue.QValue
-        switch r := record.(type) {
-        case *model.InsertRecord:
-
-            itemsJSON, err := r.Items.ToJSON()
-            if err != nil {
-                return nil, fmt.Errorf("failed to create items to json: %v", err)
-            }
-
-            entries[3] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: r.DestinationTableName,
-            }
-            entries[4] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: itemsJSON,
-            }
-            entries[5] = qvalue.QValue{
-                Kind:  qvalue.QValueKindInt64,
-                Value: 0,
-            }
-            entries[6] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: "",
-            }
-            entries[9] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: "",
-            }
-
-            tableNameRowsMapping[r.DestinationTableName] += 1
-        case *model.UpdateRecord:
-            newItemsJSON, err := r.NewItems.ToJSON()
-            if err != nil {
-                return nil, fmt.Errorf("failed to create new items to json: %v", err)
-            }
-
-            oldItemsJSON, err := r.OldItems.ToJSON()
-            if err != nil {
-                return nil, fmt.Errorf("failed to create old items to json: %v", err)
-            }
-
-            entries[3] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: r.DestinationTableName,
-            }
-            entries[4] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: newItemsJSON,
-            }
-            entries[5] = qvalue.QValue{
-                Kind:  qvalue.QValueKindInt64,
-                Value: 1,
-            }
-            entries[6] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: oldItemsJSON,
-            }
-            entries[9] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: utils.KeysToString(r.UnchangedToastColumns),
-            }
-
-            tableNameRowsMapping[r.DestinationTableName] += 1
-        case *model.DeleteRecord:
-            itemsJSON, err := r.Items.ToJSON()
-            if err != nil {
-                return nil, fmt.Errorf("failed to create items to json: %v", err)
-            }
-
-            entries[3] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: r.DestinationTableName,
-            }
-            entries[4] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: itemsJSON,
-            }
-            entries[5] = qvalue.QValue{
-                Kind:  qvalue.QValueKindInt64,
-                Value: 2,
-            }
-            entries[6] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: itemsJSON,
-            }
-            entries[9] = qvalue.QValue{
-                Kind:  qvalue.QValueKindString,
-                Value: "",
-            }
+    streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
 
-            tableNameRowsMapping[r.DestinationTableName] += 1
-        default:
-            return nil, fmt.Errorf("record type %T not supported", r)
-        }
-
-        if first {
-            firstCP = record.GetCheckPointID()
-            first = false
-        }
-
-        entries[0] = qvalue.QValue{
-            Kind:  qvalue.QValueKindString,
-            Value: uuid.New().String(),
-        }
-        entries[1] = qvalue.QValue{
-            Kind:  qvalue.QValueKindTimestamp,
-            Value: time.Now(),
-        }
-        entries[2] = qvalue.QValue{
-            Kind:  qvalue.QValueKindInt64,
-            Value: time.Now().UnixNano(),
-        }
-        entries[7] = qvalue.QValue{
-            Kind:  qvalue.QValueKindInt64,
-            Value: syncBatchID,
-        }
-        entries[8] = qvalue.QValue{
-            Kind:  qvalue.QValueKindInt64,
-            Value: syncBatchID,
-        }
-        recordStream.Records <- &model.QRecordOrError{
-            Record: &model.QRecord{
-                NumEntries: 10,
-                Entries:    entries[:],
-            },
-        }
+    streamRes, err := utils.RecordsToRawTableStream(streamReq)
+    if err != nil {
+        return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
     }
 
-    close(recordStream.Records)
     avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
     rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
     if err != nil {
@@ -844,15 +658,15 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
     }
 
     numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
-        lastCP, rawTableMetadata, syncBatchID, recordStream)
+        lastCP, rawTableMetadata, syncBatchID, streamRes.Stream)
     if err != nil {
         return nil, fmt.Errorf("failed to sync records via avro : %v", err)
     }
 
     log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)
 
     return &model.SyncResponse{
-        FirstSyncedCheckPointID: firstCP,
+        FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(),
         LastSyncedCheckPointID:  lastCP,
         NumRecordsSynced:        int64(numRecords),
         CurrentSyncBatchID:      syncBatchID,
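
For context, the six added lines delegate the raw-table conversion that the deleted block performed inline to a shared helper in connectors/utils, so the BigQuery connector no longer builds the stream itself. Below is a hedged sketch of what that unified conversion plausibly looks like, reconstructed from the deleted code above; the function name, signature, and channel element type are assumptions for illustration (only model, qvalue, utils.KeysToString, and the _peerdb_* schema are visible in this diff), not the actual utils.RecordsToRawTableStream implementation.

// Assumed imports: fmt, time, github.com/google/uuid, plus the repo's
// model, qvalue, and utils packages — the same ones the deleted code used.
// Hypothetical sketch; the real helper may differ in shape and detail.
func recordsToRawTableStream(
    records <-chan model.Record, // assumed element type of req.Records.GetRecords()
    tableNameRowsMapping map[string]uint32,
    syncBatchID int64,
) (*model.QRecordStream, error) {
    stream := model.NewQRecordStream(1 << 20)
    // Same 10-column raw-table schema the deleted code declared inline.
    err := stream.SetSchema(&model.QRecordSchema{Fields: []*model.QField{
        {Name: "_peerdb_uid", Type: qvalue.QValueKindString, Nullable: false},
        {Name: "_peerdb_timestamp", Type: qvalue.QValueKindTimestamp, Nullable: false},
        {Name: "_peerdb_timestamp_nanos", Type: qvalue.QValueKindInt64, Nullable: false},
        {Name: "_peerdb_destination_table_name", Type: qvalue.QValueKindString, Nullable: false},
        {Name: "_peerdb_data", Type: qvalue.QValueKindString, Nullable: false},
        {Name: "_peerdb_record_type", Type: qvalue.QValueKindInt64, Nullable: true},
        {Name: "_peerdb_match_data", Type: qvalue.QValueKindString, Nullable: true},
        {Name: "_peerdb_staging_batch_id", Type: qvalue.QValueKindInt64, Nullable: true},
        {Name: "_peerdb_batch_id", Type: qvalue.QValueKindInt64, Nullable: true},
        {Name: "_peerdb_unchanged_toast_columns", Type: qvalue.QValueKindString, Nullable: true},
    }})
    if err != nil {
        return nil, err
    }

    for record := range records {
        var entries [10]qvalue.QValue
        switch r := record.(type) {
        case *model.InsertRecord:
            itemsJSON, err := r.Items.ToJSON()
            if err != nil {
                return nil, fmt.Errorf("failed to convert items to json: %v", err)
            }
            entries[3] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: r.DestinationTableName}
            entries[4] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: itemsJSON}
            entries[5] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: 0} // record type 0 = insert
            entries[6] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: ""}
            entries[9] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: ""}
            tableNameRowsMapping[r.DestinationTableName] += 1
        case *model.UpdateRecord:
            newItemsJSON, err := r.NewItems.ToJSON()
            if err != nil {
                return nil, fmt.Errorf("failed to convert new items to json: %v", err)
            }
            oldItemsJSON, err := r.OldItems.ToJSON()
            if err != nil {
                return nil, fmt.Errorf("failed to convert old items to json: %v", err)
            }
            entries[3] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: r.DestinationTableName}
            entries[4] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: newItemsJSON}
            entries[5] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: 1} // record type 1 = update
            entries[6] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: oldItemsJSON} // match data = old row
            entries[9] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: utils.KeysToString(r.UnchangedToastColumns)}
            tableNameRowsMapping[r.DestinationTableName] += 1
        case *model.DeleteRecord:
            itemsJSON, err := r.Items.ToJSON()
            if err != nil {
                return nil, fmt.Errorf("failed to convert items to json: %v", err)
            }
            entries[3] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: r.DestinationTableName}
            entries[4] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: itemsJSON}
            entries[5] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: 2} // record type 2 = delete
            entries[6] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: itemsJSON} // match data = deleted row
            entries[9] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: ""}
            tableNameRowsMapping[r.DestinationTableName] += 1
        default:
            return nil, fmt.Errorf("record type %T not supported", r)
        }

        // Columns shared by all record types.
        entries[0] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: uuid.New().String()}
        entries[1] = qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: time.Now()}
        entries[2] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: time.Now().UnixNano()}
        entries[7] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: syncBatchID} // staging batch id
        entries[8] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: syncBatchID} // batch id

        stream.Records <- &model.QRecordOrError{
            Record: &model.QRecord{NumEntries: 10, Entries: entries[:]},
        }
    }

    close(stream.Records)
    return stream, nil
}

Under the same assumption, the old first/firstCP checkpoint bookkeeping moves out of the connector entirely: as the second hunk shows, the SyncResponse now takes its first checkpoint from req.Records.GetFirstCheckpoint() rather than tracking it inside the conversion loop.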