diff --git a/flow/connectors/bigquery/bigquery.go b/flow/connectors/bigquery/bigquery.go index 9b32e30b43..becc222253 100644 --- a/flow/connectors/bigquery/bigquery.go +++ b/flow/connectors/bigquery/bigquery.go @@ -687,8 +687,8 @@ func (c *BigQueryConnector) syncRecordsViaAvro( Kind: qvalue.QValueKindInt64, Value: syncBatchID, } - recordStream.Records <- &model.QRecordOrError{ - Record: &model.QRecord{ + recordStream.Records <- model.QRecordOrError{ + Record: model.QRecord{ NumEntries: 10, Entries: entries[:], }, diff --git a/flow/connectors/bigquery/qrecord_value_saver.go b/flow/connectors/bigquery/qrecord_value_saver.go index 202ac3df4d..049f891025 100644 --- a/flow/connectors/bigquery/qrecord_value_saver.go +++ b/flow/connectors/bigquery/qrecord_value_saver.go @@ -12,7 +12,7 @@ import ( type QRecordValueSaver struct { ColumnNames []string - Record *model.QRecord + Record model.QRecord PartitionID string RunID int64 } diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 08f9f93488..1e369dbd49 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -131,7 +131,7 @@ func (qe *QRepQueryExecutor) ProcessRows( fieldDescriptions []pgconn.FieldDescription, ) (*model.QRecordBatch, error) { // Initialize the record slice - records := make([]*model.QRecord, 0) + records := make([]model.QRecord, 0) qe.logger.Info("Processing rows") // Iterate over the rows for rows.Next() { @@ -171,13 +171,13 @@ func (qe *QRepQueryExecutor) processRowsStream( for rows.Next() { record, err := mapRowToQRecord(rows, fieldDescriptions, qe.customTypeMap) if err != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to map row to QRecord: %w", err), } return 0, fmt.Errorf("failed to map row to QRecord: %w", err) } - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Record: record, Err: nil, } @@ -212,7 +212,7 @@ func (qe *QRepQueryExecutor) processFetchedRows( ) (int, error) { rows, err := qe.executeQueryInTx(tx, cursorName, fetchSize) if err != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: err, } qe.logger.Error("[pg_query_executor] failed to execute query in tx", @@ -237,7 +237,7 @@ func (qe *QRepQueryExecutor) processFetchedRows( rows.Close() if rows.Err() != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: rows.Err(), } qe.logger.Error("[pg_query_executor] row iteration failed", @@ -273,7 +273,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( } batch := &model.QRecordBatch{ NumRecords: 0, - Records: make([]*model.QRecord, 0), + Records: make([]model.QRecord, 0), Schema: schema.Schema, } for record := range stream.Records { @@ -354,7 +354,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( if qe.snapshot != "" { _, err = tx.Exec(qe.ctx, fmt.Sprintf("SET TRANSACTION SNAPSHOT '%s'", qe.snapshot)) if err != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to set snapshot: %w", err), } qe.logger.Error("[pg_query_executor] failed to set snapshot", @@ -365,7 +365,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( randomUint, err := shared.RandomUInt64() if err != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to generate random uint: %w", err), } return 0, fmt.Errorf("[pg_query_executor] failed to generate random uint: %w", err) @@ -377,7 +377,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( qe.logger.Info(fmt.Sprintf("[pg_query_executor] executing cursor declaration for %v with args %v", cursorQuery, args)) _, err = tx.Exec(qe.ctx, cursorQuery, args...) if err != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to declare cursor: %w", err), } qe.logger.Info("[pg_query_executor] failed to declare cursor", @@ -409,7 +409,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( qe.logger.Info("Committing transaction") err = tx.Commit(qe.ctx) if err != nil { - stream.Records <- &model.QRecordOrError{ + stream.Records <- model.QRecordOrError{ Err: fmt.Errorf("failed to commit transaction: %w", err), } return 0, fmt.Errorf("[pg_query_executor] failed to commit transaction: %w", err) @@ -421,13 +421,13 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( } func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, - customTypeMap map[uint32]string) (*model.QRecord, error) { + customTypeMap map[uint32]string) (model.QRecord, error) { // make vals an empty array of QValue of size len(fds) record := model.NewQRecord(len(fds)) values, err := row.Values() if err != nil { - return nil, fmt.Errorf("failed to scan row: %w", err) + return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err) } for i, fd := range fds { @@ -436,7 +436,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, if !ok { tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) if err != nil { - return nil, fmt.Errorf("failed to parse field: %w", err) + return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err) } record.Set(i, tmp) } else { diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 76b70f478f..4738cc5758 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -92,7 +92,7 @@ func generateRecords( // Create sample records records := &model.QRecordBatch{ NumRecords: numRows, - Records: make([]*model.QRecord, numRows), + Records: make([]model.QRecord, numRows), Schema: schema, } @@ -115,7 +115,7 @@ func generateRecords( } } - records.Records[row] = &model.QRecord{ + records.Records[row] = model.QRecord{ Entries: entries, } } diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index 02bc14bbfa..89bcf454ac 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa qfields[i] = qfield } - var records []*model.QRecord + var records []model.QRecord totalRowsProcessed := 0 const heartBeatNumRows = 25000 diff --git a/flow/connectors/utils/avro/avro_writer.go b/flow/connectors/utils/avro/avro_writer.go index 1f44f4b7ae..c484352f72 100644 --- a/flow/connectors/utils/avro/avro_writer.go +++ b/flow/connectors/utils/avro/avro_writer.go @@ -146,9 +146,8 @@ func (p *peerDBOCFWriter) writeRecordsToOCFWriter(ocfWriter *goavro.OCFWriter) ( return 0, fmt.Errorf("[avro] failed to get record from stream: %w", qRecordOrErr.Err) } - qRecord := qRecordOrErr.Record avroConverter := model.NewQRecordAvroConverter( - qRecord, + qRecordOrErr.Record, p.targetDWH, p.avroSchema.NullableFields, colNames, diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 2ef78d33ce..2e8215f5f4 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -73,14 +73,14 @@ func RecordsToRawTableStream(req *model.RecordsToStreamRequest) (*model.RecordsT }, nil } -func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) *model.QRecordOrError { +func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, record model.Record) model.QRecordOrError { var entries [8]qvalue.QValue switch typedRecord := record.(type) { case *model.InsertRecord: // json.Marshal converts bytes in Hex automatically to BASE64 string. itemsJSON, err := typedRecord.Items.ToJSON() if err != nil { - return &model.QRecordOrError{ + return model.QRecordOrError{ Err: fmt.Errorf("failed to serialize insert record items to JSON: %w", err), } } @@ -110,13 +110,13 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor case *model.UpdateRecord: newItemsJSON, err := typedRecord.NewItems.ToJSON() if err != nil { - return &model.QRecordOrError{ + return model.QRecordOrError{ Err: fmt.Errorf("failed to serialize update record new items to JSON: %w", err), } } oldItemsJSON, err := typedRecord.OldItems.ToJSON() if err != nil { - return &model.QRecordOrError{ + return model.QRecordOrError{ Err: fmt.Errorf("failed to serialize update record old items to JSON: %w", err), } } @@ -145,7 +145,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor case *model.DeleteRecord: itemsJSON, err := typedRecord.Items.ToJSON() if err != nil { - return &model.QRecordOrError{ + return model.QRecordOrError{ Err: fmt.Errorf("failed to serialize delete record items to JSON: %w", err), } } @@ -173,7 +173,7 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor } tableMapping[typedRecord.DestinationTableName] += 1 default: - return &model.QRecordOrError{ + return model.QRecordOrError{ Err: fmt.Errorf("unknown record type: %T", typedRecord), } } @@ -191,8 +191,8 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor Value: batchID, } - return &model.QRecordOrError{ - Record: &model.QRecord{ + return model.QRecordOrError{ + Record: model.QRecord{ NumEntries: 8, Entries: entries[:], }, diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index f3508d033e..d2fab996f1 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -306,7 +306,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor return nil, fmt.Errorf("failed to run command: %w", err) } - var records []*model.QRecord + var records []model.QRecord for { var row []bigquery.Value err := it.Next(&row) diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index db58610e6b..6818299073 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -8,14 +8,14 @@ import ( ) type QRecordAvroConverter struct { - QRecord *QRecord + QRecord QRecord TargetDWH qvalue.QDWHType NullableFields map[string]struct{} ColNames []string } func NewQRecordAvroConverter( - q *QRecord, + q QRecord, targetDWH qvalue.QDWHType, nullableFields map[string]struct{}, colNames []string, diff --git a/flow/model/qrecord.go b/flow/model/qrecord.go index 5b44808be7..bc2f7f84ff 100644 --- a/flow/model/qrecord.go +++ b/flow/model/qrecord.go @@ -12,8 +12,8 @@ type QRecord struct { } // create a new QRecord with n values -func NewQRecord(n int) *QRecord { - return &QRecord{ +func NewQRecord(n int) QRecord { + return QRecord{ NumEntries: n, Entries: make([]qvalue.QValue, n), } @@ -25,7 +25,7 @@ func (q *QRecord) Set(idx int, value qvalue.QValue) { } // equals checks if two QRecords are identical. -func (q *QRecord) equals(other *QRecord) bool { +func (q QRecord) equals(other QRecord) bool { // First check simple attributes if q.NumEntries != other.NumEntries { return false diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index e2990a28ae..c0fd76b643 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -13,8 +13,8 @@ import ( // QRecordBatch holds a batch of QRecord objects. type QRecordBatch struct { - NumRecords uint32 // NumRecords represents the number of records in the batch. - Records []*QRecord // Records is a slice of pointers to QRecord objects. + NumRecords uint32 // NumRecords represents the number of records in the batch. + Records []QRecord Schema *QRecordSchema } @@ -66,7 +66,7 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) { } for _, record := range q.Records { - stream.Records <- &QRecordOrError{ + stream.Records <- QRecordOrError{ Record: record, } } @@ -79,7 +79,7 @@ func (q *QRecordBatch) ToQRecordStream(buffer int) (*QRecordStream, error) { type QRecordBatchCopyFromSource struct { numRecords int stream *QRecordStream - currentRecord *QRecordOrError + currentRecord QRecordOrError err error } @@ -89,7 +89,7 @@ func NewQRecordBatchCopyFromSource( return &QRecordBatchCopyFromSource{ numRecords: 0, stream: stream, - currentRecord: nil, + currentRecord: QRecordOrError{}, err: nil, } } diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index 721ab58c6c..af8346d633 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -3,7 +3,7 @@ package model import "fmt" type QRecordOrError struct { - Record *QRecord + Record QRecord Err error } @@ -13,8 +13,8 @@ type QRecordSchemaOrError struct { } type QRecordStream struct { - schema chan *QRecordSchemaOrError - Records chan *QRecordOrError + schema chan QRecordSchemaOrError + Records chan QRecordOrError schemaSet bool schemaCache *QRecordSchema } @@ -47,8 +47,8 @@ type RecordsToStreamResponse struct { func NewQRecordStream(buffer int) *QRecordStream { return &QRecordStream{ - schema: make(chan *QRecordSchemaOrError, 1), - Records: make(chan *QRecordOrError, buffer), + schema: make(chan QRecordSchemaOrError, 1), + Records: make(chan QRecordOrError, buffer), schemaSet: false, schemaCache: nil, } @@ -69,7 +69,7 @@ func (s *QRecordStream) SetSchema(schema *QRecordSchema) error { return fmt.Errorf("Schema already set") } - s.schema <- &QRecordSchemaOrError{ + s.schema <- QRecordSchemaOrError{ Schema: schema, } s.schemaSet = true @@ -80,6 +80,6 @@ func (s *QRecordStream) IsSchemaSet() bool { return s.schemaSet } -func (s *QRecordStream) SchemaChan() chan *QRecordSchemaOrError { +func (s *QRecordStream) SchemaChan() chan QRecordSchemaOrError { return s.schema } diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 50ca08a6a7..5d46d61541 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -15,17 +15,17 @@ func TestEquals(t *testing.T) { tests := []struct { name string - q1 *QRecord - q2 *QRecord + q1 QRecord + q2 QRecord want bool }{ { name: "Equal - Same UUID", - q1: &QRecord{ + q1: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, }, - q2: &QRecord{ + q2: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{ {Kind: qvalue.QValueKindString, Value: uuidVal1.String()}, @@ -35,11 +35,11 @@ func TestEquals(t *testing.T) { }, { name: "Not Equal - Different UUID", - q1: &QRecord{ + q1: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, }, - q2: &QRecord{ + q2: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal2}}, }, @@ -47,13 +47,13 @@ func TestEquals(t *testing.T) { }, { name: "Equal - Same numeric", - q1: &QRecord{ + q1: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{ {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, }, }, - q2: &QRecord{ + q2: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: "5"}}, }, @@ -61,13 +61,13 @@ func TestEquals(t *testing.T) { }, { name: "Not Equal - Different numeric", - q1: &QRecord{ + q1: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{ {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, }, }, - q2: &QRecord{ + q2: QRecord{ NumEntries: 1, Entries: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: "4.99"}}, },