From 774feec7a04b1f2e104c864e52804c9286e7846f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 24 Jan 2024 14:40:41 +0000 Subject: [PATCH] Replace model.QRecord with []qvalue.QValue --- .../postgres/qrep_query_executor.go | 17 +++---- .../snowflake/avro_file_writer_test.go | 6 +-- flow/connectors/sql/query_executor.go | 10 +--- flow/connectors/utils/stream.go | 5 +- flow/e2e/bigquery/bigquery_helper.go | 10 +--- flow/e2eshared/e2eshared.go | 11 ++-- flow/model/conversion_avro.go | 8 +-- flow/model/qrecord.go | 23 --------- flow/model/qrecord_batch.go | 8 +-- flow/model/qrecord_stream.go | 8 ++- flow/model/qrecord_test.go | 51 ++++--------------- 11 files changed, 45 insertions(+), 112 deletions(-) delete mode 100644 flow/model/qrecord.go diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 52648249e3..dd0d6a2438 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -130,7 +130,7 @@ func (qe *QRepQueryExecutor) ProcessRows( fieldDescriptions []pgconn.FieldDescription, ) (*model.QRecordBatch, error) { // Initialize the record slice - records := make([]model.QRecord, 0) + records := make([][]qvalue.QValue, 0) qe.logger.Info("Processing rows") // Iterate over the rows for rows.Next() { @@ -284,7 +284,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( } batch := &model.QRecordBatch{ NumRecords: 0, - Records: make([]model.QRecord, 0), + Records: make([][]qvalue.QValue, 0), Schema: schema.Schema, } for record := range stream.Records { @@ -437,14 +437,14 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, customTypeMap map[uint32]string, -) (model.QRecord, error) { +) ([]qvalue.QValue, error) { // make vals an empty array of QValue of size len(fds) - record := model.NewQRecord(len(fds)) + record := make([]qvalue.QValue, len(fds)) values, err := row.Values() if err != nil { slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) - return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err) + return nil, fmt.Errorf("failed to scan row: %w", err) } for i, fd := range fds { @@ -454,9 +454,9 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) if err != nil { slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) - return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err) + return nil, fmt.Errorf("failed to parse field: %w", err) } - record.Set(i, tmp) + record[i] = tmp } else { customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { @@ -468,11 +468,10 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, values[i] = wkt } } - customTypeVal := qvalue.QValue{ + record[i] = qvalue.QValue{ Kind: customQKind, Value: values[i], } - record.Set(i, customTypeVal) } } diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 1e531ca3ac..28680e4249 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -98,7 +98,7 @@ func generateRecords( // Create sample records records := &model.QRecordBatch{ NumRecords: numRows, - Records: make([]model.QRecord, numRows), + Records: make([][]qvalue.QValue, numRows), Schema: schema, } @@ -121,9 +121,7 @@ func generateRecords( } } - records.Records[row] = model.QRecord{ - Entries: entries, - } + records.Records[row] = entries } stream, err := records.ToQRecordStream(1024) diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index b5e699e067..632a5c5ef4 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa qfields[i] = qfield } - var records []model.QRecord + var records [][]qvalue.QValue totalRowsProcessed := 0 const heartBeatNumRows = 25000 @@ -237,13 +237,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa qValues[i] = qv } - // Create a QRecord - record := model.NewQRecord(len(qValues)) - for i, qv := range qValues { - record.Set(i, qv) - } - - records = append(records, record) + records = append(records, qValues) totalRowsProcessed += 1 if totalRowsProcessed%heartBeatNumRows == 0 { diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 59602b676a..898acd03b5 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -183,9 +183,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor } return model.QRecordOrError{ - Record: model.QRecord{ - NumEntries: 8, - Entries: entries[:], - }, + Record: entries[:], } } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 0d024f9dc7..e26d20136c 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -334,7 +334,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor return nil, fmt.Errorf("failed to run command: %w", err) } - var records []model.QRecord + var records [][]qvalue.QValue for { var row []bigquery.Value err := it.Next(&row) @@ -355,13 +355,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor qValues[i] = qv } - // Create a QRecord - record := model.NewQRecord(len(qValues)) - for i, qv := range qValues { - record.Set(i, qv) - } - - records = append(records, record) + records = append(records, qValues) } // Now you should fill the column names as well. Here we assume the schema is diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 9ac22762f8..9b5bd289a9 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) type Suite interface { @@ -68,16 +69,16 @@ func ReadFileToBytes(path string) ([]byte, error) { } // checks if two QRecords are identical -func CheckQRecordEquality(t *testing.T, q model.QRecord, other model.QRecord) bool { +func CheckQRecordEquality(t *testing.T, q []qvalue.QValue, other []qvalue.QValue) bool { t.Helper() - if q.NumEntries != other.NumEntries { - t.Logf("unequal entry count: %d != %d", q.NumEntries, other.NumEntries) + if len(q) != len(other) { + t.Logf("unequal entry count: %d != %d", len(q), len(other)) return false } - for i, entry := range q.Entries { - otherEntry := other.Entries[i] + for i, entry := range q { + otherEntry := other[i] if !entry.Equals(otherEntry) { t.Logf("entry %d: %v != %v", i, entry, otherEntry) return false diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 9b3e97fff9..1bbbcc2cb2 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -8,14 +8,14 @@ import ( ) type QRecordAvroConverter struct { - QRecord QRecord + QRecord []qvalue.QValue TargetDWH qvalue.QDWHType NullableFields map[string]struct{} ColNames []string } func NewQRecordAvroConverter( - q QRecord, + q []qvalue.QValue, targetDWH qvalue.QDWHType, nullableFields map[string]struct{}, colNames []string, @@ -31,12 +31,12 @@ func NewQRecordAvroConverter( func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) { m := map[string]interface{}{} - for idx := range qac.QRecord.Entries { + for idx, val := range qac.QRecord { key := qac.ColNames[idx] _, nullable := qac.NullableFields[key] avroConverter := qvalue.NewQValueAvroConverter( - qac.QRecord.Entries[idx], + val, qac.TargetDWH, nullable, ) diff --git a/flow/model/qrecord.go b/flow/model/qrecord.go deleted file mode 100644 index ab7fbca24e..0000000000 --- a/flow/model/qrecord.go +++ /dev/null @@ -1,23 +0,0 @@ -package model - -import ( - "github.com/PeerDB-io/peer-flow/model/qvalue" -) - -type QRecord struct { - NumEntries int - Entries []qvalue.QValue -} - -// create a new QRecord with n values -func NewQRecord(n int) QRecord { - return QRecord{ - NumEntries: n, - Entries: make([]qvalue.QValue, n), - } -} - -// Sets the value at the given index -func (q QRecord) Set(idx int, value qvalue.QValue) { - q.Entries[idx] = value -} diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 9b18dfbbc0..9ffec360cd 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -13,10 +13,10 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -// QRecordBatch holds a batch of QRecord objects. +// QRecordBatch holds a batch of []QValue slices type QRecordBatch struct { NumRecords uint32 // NumRecords represents the number of records in the batch. - Records []QRecord + Records [][]qvalue.QValue Schema *QRecordSchema } @@ -90,10 +90,10 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { } record := src.currentRecord.Record - numEntries := len(record.Entries) + numEntries := len(record) values := make([]interface{}, numEntries) - for i, qValue := range record.Entries { + for i, qValue := range record { if qValue.Value == nil { values[i] = nil continue diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index a293e66ebb..83ac85032d 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -1,9 +1,13 @@ package model -import "fmt" +import ( + "fmt" + + "github.com/PeerDB-io/peer-flow/model/qvalue" +) type QRecordOrError struct { - Record QRecord + Record []qvalue.QValue Err error } diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 775d0c81fd..6c685a8f41 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/PeerDB-io/peer-flow/e2eshared" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -18,62 +17,32 @@ func TestEquals(t *testing.T) { tests := []struct { name string - q1 model.QRecord - q2 model.QRecord + q1 []qvalue.QValue + q2 []qvalue.QValue want bool }{ { name: "Equal - Same UUID", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{ - {Kind: qvalue.QValueKindString, Value: uuidVal1.String()}, - }, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: uuidVal1.String()}}, want: true, }, { name: "Not Equal - Different UUID", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal2}}, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal2}}, want: false, }, { name: "Equal - Same numeric", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{ - {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, - }, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: "5"}}, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: "5"}}, want: true, }, { name: "Not Equal - Different numeric", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{ - {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, - }, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: "4.99"}}, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: "4.99"}}, want: false, }, }