Skip to content

Commit

Permalink
Pass model.QRecord around by value
Browse files Browse the repository at this point in the history
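A QRecord is only 3 integers & a pointer (NumEntries plus the Entries slice header),
and is mostly held in slices for QRecordBatch, so it is passed and stored by value instead of by pointer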
  • Loading branch information
serprex committed Dec 17, 2023
1 parent c196e6c commit b3598ff
Show file tree
Hide file tree
Showing 13 changed files with 56 additions and 57 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,8 +687,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
Kind: qvalue.QValueKindInt64,
Value: syncBatchID,
}
recordStream.Records <- &model.QRecordOrError{
Record: &model.QRecord{
recordStream.Records <- model.QRecordOrError{
Record: model.QRecord{
NumEntries: 10,
Entries: entries[:],
},
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qrecord_value_saver.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import (

type QRecordValueSaver struct {
ColumnNames []string
Record *model.QRecord
Record model.QRecord
PartitionID string
RunID int64
}
Expand Down
26 changes: 13 additions & 13 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
fieldDescriptions []pgconn.FieldDescription,
) (*model.QRecordBatch, error) {
// Initialize the record slice
records := make([]*model.QRecord, 0)
records := make([]model.QRecord, 0)
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
Expand Down Expand Up @@ -171,13 +171,13 @@ func (qe *QRepQueryExecutor) processRowsStream(
for rows.Next() {
record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to map row to QRecord: %w", err),
}
return 0, fmt.Errorf("failed to map row to QRecord: %w", err)
}

stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Record: record,
Err: nil,
}
Expand Down Expand Up @@ -212,7 +212,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
) (int, error) {
rows, err := qe.executeQueryInTx(tx, cursorName, fetchSize)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: err,
}
qe.logger.Error("[pg_query_executor] failed to execute query in tx",
Expand All @@ -237,7 +237,7 @@ func (qe *QRepQueryExecutor) processFetchedRows(
rows.Close()

if rows.Err() != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: rows.Err(),
}
qe.logger.Error("[pg_query_executor] row iteration failed",
Expand Down Expand Up @@ -273,7 +273,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
}
batch := &model.QRecordBatch{
NumRecords: 0,
Records: make([]*model.QRecord, 0),
Records: make([]model.QRecord, 0),
Schema: schema.Schema,
}
for record := range stream.Records {
Expand Down Expand Up @@ -354,7 +354,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
if qe.snapshot != "" {
_, err = tx.Exec(qe.ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", qe.snapshot))
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to set snapshot: %w", err),
}
qe.logger.Error("[pg_query_executor] failed to set snapshot",
Expand All @@ -365,7 +365,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

randomUint, err := shared.RandomUInt64()
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to generate random uint: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err)
Expand All @@ -377,7 +377,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args))
_, err = tx.Exec(qe.ctx, cursorQuery, args...)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to declare cursor: %w", err),
}
qe.logger.Info("[pg_query_executor] failed to declare cursor",
Expand Down Expand Up @@ -409,7 +409,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
qe.logger.Info("Committing transaction")
err = tx.Commit(qe.ctx)
if err != nil {
stream.Records <- &model.QRecordOrError{
stream.Records <- model.QRecordOrError{
Err: fmt.Errorf("failed to commit transaction: %w", err),
}
return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err)
Expand All @@ -421,13 +421,13 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(
}

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string) (*model.QRecord, error) {
customTypeMap map[uint32]string) (model.QRecord, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))

values, err := row.Values()
if err != nil {
return nil, fmt.Errorf("failed to scan row: %w", err)
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
}

for i, fd := range fds {
Expand All @@ -436,7 +436,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if !ok {
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
} else {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func generateRecords(
// Create sample records
records := &model.QRecordBatch{
NumRecords: numRows,
Records: make([]*model.QRecord, numRows),
Records: make([]model.QRecord, numRows),
Schema: schema,
}

Expand All @@ -115,7 +115,7 @@ func generateRecords(
}
}

records.Records[row] = &model.QRecord{
records.Records[row] = model.QRecord{
Entries: entries,
}
}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qfields[i] = qfield
}

var records []*model.QRecord
var records []model.QRecord
totalRowsProcessed := 0
const heartBeatNumRows = 25000

Expand Down
3 changes: 1 addition & 2 deletions flow/connectors/utils/avro/avro_writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,9 +146,8 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) (
return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err)
}

qRecord := qRecordOrErr.Record
avroConverter := model.NewQRecordAvroConverter(
qRecord,
qRecordOrErr.Record,
p.targetDWH,
p.avroSchema.NullableFields,
colNames,
Expand Down
16 changes: 8 additions & 8 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,14 +73,14 @@ func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsT
}, nil
}

func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) *model.QRecordOrError {
func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) model.QRecordOrError {
var entries [8]qvalue.QValue
switch typedRecord := record.(type) {
case *model.InsertRecord:
// json.Marshal converts bytes in Hex automatically to BASE64 string.
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize insert record items to JSON: %w", err),
}
}
Expand Down Expand Up @@ -110,13 +110,13 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
case *model.UpdateRecord:
newItemsJSON, err := typedRecord.NewItems.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record new items to JSON: %w", err),
}
}
oldItemsJSON, err := typedRecord.OldItems.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize update record old items to JSON: %w", err),
}
}
Expand Down Expand Up @@ -145,7 +145,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
case *model.DeleteRecord:
itemsJSON, err := typedRecord.Items.ToJSON()
if err != nil {
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("failed to serialize delete record items to JSON: %w", err),
}
}
Expand Down Expand Up @@ -173,7 +173,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}
tableMapping[typedRecord.DestinationTableName] += 1
default:
return &model.QRecordOrError{
return model.QRecordOrError{
Err: fmt.Errorf("unknown record type: %T", typedRecord),
}
}
Expand All @@ -191,8 +191,8 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
Value: batchID,
}

return &model.QRecordOrError{
Record: &model.QRecord{
return model.QRecordOrError{
Record: model.QRecord{
NumEntries: 8,
Entries: entries[:],
},
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
return nil, fmt.Errorf("failed to run command: %w", err)
}

var records []*model.QRecord
var records []model.QRecord
for {
var row []bigquery.Value
err := it.Next(&row)
Expand Down
4 changes: 2 additions & 2 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

type QRecordAvroConverter struct {
QRecord *QRecord
QRecord QRecord
TargetDWH qvalue.QDWHType
NullableFields map[string]struct{}
ColNames []string
}

func NewQRecordAvroConverter(
q *QRecord,
q QRecord,
targetDWH qvalue.QDWHType,
nullableFields map[string]struct{},
colNames []string,
Expand Down
6 changes: 3 additions & 3 deletions flow/model/qrecord.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ type QRecord struct {
}

// create a new QRecord with n values
func NewQRecord(n int) *QRecord {
return &QRecord{
func NewQRecord(n int) QRecord {
return QRecord{
NumEntries: n,
Entries: make([]qvalue.QValue, n),
}
Expand All @@ -25,7 +25,7 @@ func (q *QRecord) Set(idx int, value qvalue.QValue) {
}

// equals checks if two QRecords are identical.
func (q *QRecord) equals(other *QRecord) bool {
func (q QRecord) equals(other QRecord) bool {
// First check simple attributes
if q.NumEntries != other.NumEntries {
return false
Expand Down
10 changes: 5 additions & 5 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import (

// QRecordBatch holds a batch of QRecord objects.
type QRecordBatch struct {
NumRecords uint32 // NumRecords represents the number of records in the batch.
Records []*QRecord // Records is a slice of pointers to QRecord objects.
NumRecords uint32 // NumRecords represents the number of records in the batch.
Records []QRecord
Schema *QRecordSchema
}

Expand Down Expand Up @@ -66,7 +66,7 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) {
}

for _, record := range q.Records {
stream.Records <- &QRecordOrError{
stream.Records <- QRecordOrError{
Record: record,
}
}
Expand All @@ -79,7 +79,7 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) {
type QRecordBatchCopyFromSource struct {
numRecords int
stream *QRecordStream
currentRecord *QRecordOrError
currentRecord QRecordOrError
err error
}

Expand All @@ -89,7 +89,7 @@ func NewQRecordBatchCopyFromSource(
return &QRecordBatchCopyFromSource{
numRecords: 0,
stream: stream,
currentRecord: nil,
currentRecord: QRecordOrError{},
err: nil,
}
}
Expand Down
14 changes: 7 additions & 7 deletions flow/model/qrecord_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package model
import "fmt"

type QRecordOrError struct {
Record *QRecord
Record QRecord
Err error
}

Expand All @@ -13,8 +13,8 @@ type QRecordSchemaOrError struct {
}

type QRecordStream struct {
schema chan *QRecordSchemaOrError
Records chan *QRecordOrError
schema chan QRecordSchemaOrError
Records chan QRecordOrError
schemaSet bool
schemaCache *QRecordSchema
}
Expand Down Expand Up @@ -47,8 +47,8 @@ type RecordsToStreamResponse struct {

func NewQRecordStream(buffer int) *QRecordStream {
return &QRecordStream{
schema: make(chan *QRecordSchemaOrError, 1),
Records: make(chan *QRecordOrError, buffer),
schema: make(chan QRecordSchemaOrError, 1),
Records: make(chan QRecordOrError, buffer),
schemaSet: false,
schemaCache: nil,
}
Expand All @@ -69,7 +69,7 @@ func (s *QRecordStream) SetSchema(schema *QRecordSchema) error {
return fmt.Errorf("Schema already set")
}

s.schema <- &QRecordSchemaOrError{
s.schema <- QRecordSchemaOrError{
Schema: schema,
}
s.schemaSet = true
Expand All @@ -80,6 +80,6 @@ func (s *QRecordStream) IsSchemaSet() bool {
return s.schemaSet
}

func (s *QRecordStream) SchemaChan() chan *QRecordSchemaOrError {
func (s *QRecordStream) SchemaChan() chan QRecordSchemaOrError {
return s.schema
}
Loading

0 comments on commit b3598ff

Please sign in to comment.