Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Pass QRecord around by value #837

Merged
merged 1 commit into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -690,8 +690,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
Kind: qvalue.QValueKindInt64,
Value: syncBatchID,
}
recordStream.Records <- &model.QRecordOrError{
Record: &model.QRecord{
recordStream.Records <- model.QRecordOrError{
Record: model.QRecord{
NumEntries: 10,
Entries: entries[:],
},
Expand Down
26 changes: 13 additions & 13 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
fieldDescriptions []pgconn.FieldDescription,
) (*model.QRecordBatch, error) {
// Initialize the record slice
records := make([]*model.QRecord, 0)
records := make([]model.QRecord, 0)
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
Expand Down Expand Up @@ -173,13 +173,13 @@ func (qe *QRepQueryExecutor) processRowsStream(
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
}
return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
}

stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Record: record,
Err: nil,
}
Expand Down Expand Up @@ -214,7 +214,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
) (int, error) {
rows, err := qe.executeQueryInTx(tx, cursorName, fetchSize)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: err,
}
qe.logger.Error("[pg_query_executor] failed to execute query in tx",
Expand All @@ -239,7 +239,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
rows.Close()

if rows.Err() != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: rows.Err(),
}
qe.logger.Error("[pg_query_executor] row iteration failed",
Expand Down Expand Up @@ -275,7 +275,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
}
batch := &model.QRecordBatch{
NumRecords: 0,
Records: make([]*model.QRecord, 0),
Records: make([]model.QRecord, 0),
Schema: schema.Schema,
}
for record := range stream.Records {
Expand Down Expand Up @@ -356,7 +356,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
if qe.snapshot != "" {
_, err = tx.Exec(qe.ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", qe.snapshot))
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to set snapshot: %w", err),
}
qe.logger.Error("[pg_query_executor] failed to set snapshot",
Expand All @@ -367,7 +367,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

randomUint, err := shared.RandomUInt64()
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to generate random uint: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
Expand All @@ -379,7 +379,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args))
_, err = tx.Exec(qe.ctx, cursorQuery, args...)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to declare cursor: %w", err),
}
qe.logger.Info("[pg_query_executor] failed to declare cursor",
Expand Down Expand Up @@ -411,7 +411,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info("Committing transaction")
err = tx.Commit(qe.ctx)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to commit transaction: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
Expand All @@ -424,13 +424,13 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
) (*model.QRecord, error) {
) (model.QRecord, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))

values, err := row.Values()
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
}

for i, fd := range fds {
Expand All @@ -439,7 +439,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
} else {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func generateRecords(
// Create sample records
records := &model.QRecordBatch{
NumRecords: numRows,
Records: make([]*model.QRecord, numRows),
Records: make([]model.QRecord, numRows),
Schema: schema,
}

Expand All @@ -115,7 +115,7 @@ func generateRecords(
}
}

records.Records[row] = &model.QRecord{
records.Records[row] = model.QRecord{
Entries: entries,
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qfields[i] = qfield
}

var records []*model.QRecord
var records []model.QRecord
totalRowsProcessed := 0
const heartBeatNumRows = 25000

Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,9 +148,8 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err)
}

qRecord := qRecordOrErr.Record
avroConverter := model.NewQRecordAvroConverter(
qRecord,
qRecordOrErr.Record,
p.targetDWH,
p.avroSchema.NullableFields,
colNames,
Expand Down
16 changes: 8 additions & 8 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsT
}, nil
}

func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) *model.QRecordOrError {
func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) model.QRecordOrError {
var entries [8]qvalue.QValue
switch typedRecord := record.(type) {
case *model.InsertRecord:
// json.Marshal converts bytes in Hex automatically to BASE64 string.
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize insert record items to JSON: %w", err),
}
}
Expand Down Expand Up @@ -110,13 +110,13 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record new items to JSON: %w", err),
}
}
oldItemsJSON, err := typedRecord.OldItems.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record old items to JSON: %w", err),
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize delete record items to JSON: %w", err),
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
tableMapping[typedRecord.DestinationTableName] += 1
default:
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("unknown record type: %T", typedRecord),
}
}
Expand All @@ -191,8 +191,8 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Value: batchID,
}

return &model.QRecordOrError{
Record: &model.QRecord{
return model.QRecordOrError{
Record: model.QRecord{
NumEntries: 8,
Entries: entries[:],
},
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
return nil, fmt.Errorf("failed to run command: %w", err)
}

var records []*model.QRecord
var records []model.QRecord
for {
var row []bigquery.Value
err := it.Next(&row)
Expand Down
4 changes: 2 additions & 2 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

type QRecordAvroConverter struct {
QRecord *QRecord
QRecord QRecord
TargetDWH qvalue.QDWHType
NullableFields map[string]struct{}
ColNames []string
}

func NewQRecordAvroConverter(
q *QRecord,
q QRecord,
targetDWH qvalue.QDWHType,
nullableFields map[string]struct{},
colNames []string,
Expand Down
6 changes: 3 additions & 3 deletions flow/model/qrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type QRecord struct {
}

// create a new QRecord with n values
func NewQRecord(n int) *QRecord {
return &QRecord{
func NewQRecord(n int) QRecord {
return QRecord{
NumEntries: n,
Entries: make([]qvalue.QValue, n),
}
Expand All @@ -25,7 +25,7 @@ func (q *QRecord) Set(idx int, value qvalue.QValue) {
}

// equals checks if two QRecords are identical.
func (q *QRecord) equals(other *QRecord) bool {
func (q QRecord) equals(other QRecord) bool {
// First check simple attributes
if q.NumEntries != other.NumEntries {
return false
Expand Down
10 changes: 5 additions & 5 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

// QRecordBatch holds a batch of QRecord objects.
type QRecordBatch struct {
NumRecords uint32 // NumRecords represents the number of records in the batch.
Records []*QRecord // Records is a slice of pointers to QRecord objects.
NumRecords uint32 // NumRecords represents the number of records in the batch.
Records []QRecord
Schema *QRecordSchema
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) {
}

for _, record := range q.Records {
stream.Records <- &QRecordOrError{
stream.Records <- QRecordOrError{
Record: record,
}
}
Expand All @@ -79,7 +79,7 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) {
type QRecordBatchCopyFromSource struct {
numRecords int
stream *QRecordStream
currentRecord *QRecordOrError
currentRecord QRecordOrError
err error
}

Expand All @@ -89,7 +89,7 @@ func NewQRecordBatchCopyFromSource(
return &QRecordBatchCopyFromSource{
numRecords: 0,
stream: stream,
currentRecord: nil,
currentRecord: QRecordOrError{},
err: nil,
}
}
Expand Down
14 changes: 7 additions & 7 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package model
import "fmt"

type QRecordOrError struct {
Record *QRecord
Record QRecord
Err error
}

Expand All @@ -13,8 +13,8 @@ type QRecordSchemaOrError struct {
}

type QRecordStream struct {
schema chan *QRecordSchemaOrError
Records chan *QRecordOrError
schema chan QRecordSchemaOrError
Records chan QRecordOrError
schemaSet bool
schemaCache *QRecordSchema
}
Expand Down Expand Up @@ -47,8 +47,8 @@ type RecordsToStreamResponse struct {

func NewQRecordStream(buffer int) *QRecordStream {
return &QRecordStream{
schema: make(chan *QRecordSchemaOrError, 1),
Records: make(chan *QRecordOrError, buffer),
schema: make(chan QRecordSchemaOrError, 1),
Records: make(chan QRecordOrError, buffer),
schemaSet: false,
schemaCache: nil,
}
Expand All @@ -69,7 +69,7 @@ func (s *QRecordStream) SetSchema(schema *QRecordSchema) error {
return fmt.Errorf("Schema already set")
}

s.schema <- &QRecordSchemaOrError{
s.schema <- QRecordSchemaOrError{
Schema: schema,
}
s.schemaSet = true
Expand All @@ -80,6 +80,6 @@ func (s *QRecordStream) IsSchemaSet() bool {
return s.schemaSet
}

func (s *QRecordStream) SchemaChan() chan *QRecordSchemaOrError {
func (s *QRecordStream) SchemaChan() chan QRecordSchemaOrError {
return s.schema
}
Loading
Loading