diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go
index 7383bac2a6..d5a5a6c350 100644
--- a/flow/connectors/bigquery/bigquery.go
+++ b/flow/connectors/bigquery/bigquery.go
@@ -639,199 +639,13 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	syncBatchID int64,
 ) (*model.SyncResponse, error) {
 	tableNameRowsMapping := make(map[string]uint32)
-	first := true
-	var firstCP int64 = 0
-	recordStream := model.NewQRecordStream(1 << 20)
-	err := recordStream.SetSchema(&model.QRecordSchema{
-		Fields: []*model.QField{
-			{
-				Name:     "_peerdb_uid",
-				Type:     qvalue.QValueKindString,
-				Nullable: false,
-			},
-			{
-				Name:     "_peerdb_timestamp",
-				Type:     qvalue.QValueKindTimestamp,
-				Nullable: false,
-			},
-			{
-				Name:     "_peerdb_timestamp_nanos",
-				Type:     qvalue.QValueKindInt64,
-				Nullable: false,
-			},
-			{
-				Name:     "_peerdb_destination_table_name",
-				Type:     qvalue.QValueKindString,
-				Nullable: false,
-			},
-			{
-				Name:     "_peerdb_data",
-				Type:     qvalue.QValueKindString,
-				Nullable: false,
-			},
-			{
-				Name:     "_peerdb_record_type",
-				Type:     qvalue.QValueKindInt64,
-				Nullable: true,
-			},
-			{
-				Name:     "_peerdb_match_data",
-				Type:     qvalue.QValueKindString,
-				Nullable: true,
-			},
-			{
-				Name:     "_peerdb_staging_batch_id",
-				Type:     qvalue.QValueKindInt64,
-				Nullable: true,
-			},
-			{
-				Name:     "_peerdb_batch_id",
-				Type:     qvalue.QValueKindInt64,
-				Nullable: true,
-			},
-			{
-				Name:     "_peerdb_unchanged_toast_columns",
-				Type:     qvalue.QValueKindString,
-				Nullable: true,
-			},
-		},
-	})
-	if err != nil {
-		return nil, err
-	}
-
-	// loop over req.Records
-	for record := range req.Records.GetRecords() {
-		var entries [10]qvalue.QValue
-		switch r := record.(type) {
-		case *model.InsertRecord:
-
-			itemsJSON, err := r.Items.ToJSON()
-			if err != nil {
-				return nil, fmt.Errorf("failed to create items to json: %v", err)
-			}
-
-			entries[3] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: r.DestinationTableName,
-			}
-			entries[4] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: itemsJSON,
-			}
-			entries[5] = qvalue.QValue{
-				Kind:  qvalue.QValueKindInt64,
-				Value: 0,
-			}
-			entries[6] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: "",
-			}
-			entries[9] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: "",
-			}
-
-			tableNameRowsMapping[r.DestinationTableName] += 1
-		case *model.UpdateRecord:
-			newItemsJSON, err := r.NewItems.ToJSON()
-			if err != nil {
-				return nil, fmt.Errorf("failed to create new items to json: %v", err)
-			}
-
-			oldItemsJSON, err := r.OldItems.ToJSON()
-			if err != nil {
-				return nil, fmt.Errorf("failed to create old items to json: %v", err)
-			}
-
-			entries[3] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: r.DestinationTableName,
-			}
-			entries[4] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: newItemsJSON,
-			}
-			entries[5] = qvalue.QValue{
-				Kind:  qvalue.QValueKindInt64,
-				Value: 1,
-			}
-			entries[6] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: oldItemsJSON,
-			}
-			entries[9] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: utils.KeysToString(r.UnchangedToastColumns),
-			}
-
-			tableNameRowsMapping[r.DestinationTableName] += 1
-		case *model.DeleteRecord:
-			itemsJSON, err := r.Items.ToJSON()
-			if err != nil {
-				return nil, fmt.Errorf("failed to create items to json: %v", err)
-			}
-
-			entries[3] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: r.DestinationTableName,
-			}
-			entries[4] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: itemsJSON,
-			}
-			entries[5] = qvalue.QValue{
-				Kind:  qvalue.QValueKindInt64,
-				Value: 2,
-			}
-			entries[6] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: itemsJSON,
-			}
-			entries[9] = qvalue.QValue{
-				Kind:  qvalue.QValueKindString,
-				Value: "",
-			}
+	streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
 
-			tableNameRowsMapping[r.DestinationTableName] += 1
-		default:
-			return nil, fmt.Errorf("record type %T not supported", r)
-		}
-
-		if first {
-			firstCP = record.GetCheckPointID()
-			first = false
-		}
-
-		entries[0] = qvalue.QValue{
-			Kind:  qvalue.QValueKindString,
-			Value: uuid.New().String(),
-		}
-		entries[1] = qvalue.QValue{
-			Kind:  qvalue.QValueKindTimestamp,
-			Value: time.Now(),
-		}
-		entries[2] = qvalue.QValue{
-			Kind:  qvalue.QValueKindInt64,
-			Value: time.Now().UnixNano(),
-		}
-		entries[7] = qvalue.QValue{
-			Kind:  qvalue.QValueKindInt64,
-			Value: syncBatchID,
-		}
-		entries[8] = qvalue.QValue{
-			Kind:  qvalue.QValueKindInt64,
-			Value: syncBatchID,
-		}
-		recordStream.Records <- &model.QRecordOrError{
-			Record: &model.QRecord{
-				NumEntries: 10,
-				Entries:    entries[:],
-			},
-		}
+	streamRes, err := utils.RecordsToRawTableStream(streamReq)
+	if err != nil {
+		return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
 	}
-	close(recordStream.Records)
 
 	avroSync := NewQRepAvroSyncMethod(c, req.StagingPath)
 	rawTableMetadata, err := c.client.Dataset(c.datasetID).Table(rawTableName).Metadata(c.ctx)
 	if err != nil {
@@ -844,7 +658,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	}
 
 	numRecords, err := avroSync.SyncRecords(rawTableName, req.FlowJobName,
-		lastCP, rawTableMetadata, syncBatchID, recordStream)
+		lastCP, rawTableMetadata, syncBatchID, streamRes.Stream)
 	if err != nil {
 		return nil, fmt.Errorf("failed to sync records via avro : %v", err)
 	}
@@ -852,7 +666,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 	log.Printf("pushed %d records to %s.%s", numRecords, c.datasetID, rawTableName)
 
 	return &model.SyncResponse{
-		FirstSyncedCheckPointID: firstCP,
+		FirstSyncedCheckPointID: req.Records.GetFirstCheckpoint(),
 		LastSyncedCheckPointID:  lastCP,
 		NumRecordsSynced:        int64(numRecords),
 		CurrentSyncBatchID:      syncBatchID,
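
For context, here is a minimal sketch of what the extracted helper plausibly looks like, reconstructed from the code removed above. This is not the actual `flow/connectors/utils` implementation: the request's `GetRecords`/`TableMapping`/`BatchID` accessors, the `rawTableSchema` helper, and the import paths are assumptions; only `NewRecordsToStreamRequest`, `RecordsToRawTableStream`, and the response's `Stream` field are visible in the diff.

```go
package utils

import (
	"fmt"
	"time"

	"github.com/google/uuid"

	"github.com/PeerDB-io/peer-flow/model"           // import paths assumed from the repo layout
	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

// RecordsToRawTableStream drains the request's record channel and emits one
// 10-column raw-table row per record, mirroring the inline loop deleted above.
func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsToStreamResponse, error) {
	recordStream := model.NewQRecordStream(1 << 20)
	// rawTableSchema (hypothetical helper) would return the ten _peerdb_*
	// fields from the deleted SetSchema call.
	if err := recordStream.SetSchema(rawTableSchema()); err != nil {
		return nil, err
	}

	for record := range req.GetRecords() { // assumed accessor on the request
		var entries [10]qvalue.QValue
		switch r := record.(type) {
		case *model.InsertRecord:
			itemsJSON, err := r.Items.ToJSON()
			if err != nil {
				return nil, fmt.Errorf("failed to convert items to JSON: %w", err)
			}
			entries[3] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: r.DestinationTableName}
			entries[4] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: itemsJSON}
			entries[5] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: 0} // _peerdb_record_type: 0 = insert
			entries[6] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: ""}
			entries[9] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: ""}
			req.TableMapping[r.DestinationTableName] += 1 // assumed field carrying tableNameRowsMapping
		// *model.UpdateRecord (type 1) and *model.DeleteRecord (type 2) are
		// elided here; they fill the same slots, additionally setting
		// _peerdb_match_data (entries[6]) and the unchanged TOAST columns
		// (entries[9]), exactly as in the deleted loop.
		default:
			return nil, fmt.Errorf("record type %T not supported", r)
		}

		// Slots common to every record type, as in the deleted code.
		entries[0] = qvalue.QValue{Kind: qvalue.QValueKindString, Value: uuid.New().String()}
		entries[1] = qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: time.Now()}
		entries[2] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: time.Now().UnixNano()}
		entries[7] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: req.BatchID} // assumed field carrying syncBatchID
		entries[8] = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: req.BatchID}

		recordStream.Records <- &model.QRecordOrError{
			Record: &model.QRecord{NumEntries: 10, Entries: entries[:]},
		}
	}
	close(recordStream.Records)

	return &model.RecordsToStreamResponse{Stream: recordStream}, nil
}
```

Note that the `first`/`firstCP` bookkeeping disappears entirely rather than moving into the helper: the record batch now answers that question itself, via `req.Records.GetFirstCheckpoint()` in the returned `model.SyncResponse`.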