diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index 52648249e3..dd0d6a2438 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -130,7 +130,7 @@ func (qe *QRepQueryExecutor) ProcessRows( fieldDescriptions []pgconn.FieldDescription, ) (*model.QRecordBatch, error) { // Initialize the record slice - records := make([]model.QRecord, 0) + records := make([][]qvalue.QValue, 0) qe.logger.Info("Processing rows") // Iterate over the rows for rows.Next() { @@ -284,7 +284,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery( } batch := &model.QRecordBatch{ NumRecords: 0, - Records: make([]model.QRecord, 0), + Records: make([][]qvalue.QValue, 0), Schema: schema.Schema, } for record := range stream.Records { @@ -437,14 +437,14 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx( func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, customTypeMap map[uint32]string, -) (model.QRecord, error) { +) ([]qvalue.QValue, error) { // make vals an empty array of QValue of size len(fds) - record := model.NewQRecord(len(fds)) + record := make([]qvalue.QValue, len(fds)) values, err := row.Values() if err != nil { slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err)) - return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err) + return nil, fmt.Errorf("failed to scan row: %w", err) } for i, fd := range fds { @@ -454,9 +454,9 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i]) if err != nil { slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err)) - return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err) + return nil, fmt.Errorf("failed to parse field: %w", err) } - record.Set(i, tmp) + record[i] = tmp } else { customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { @@ -468,11 +468,10 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, values[i] = wkt } } - customTypeVal := qvalue.QValue{ + record[i] = qvalue.QValue{ Kind: customQKind, Value: values[i], } - record.Set(i, customTypeVal) } } diff --git a/flow/connectors/postgres/qrep_query_executor_test.go b/flow/connectors/postgres/qrep_query_executor_test.go index db5a04d93f..fc336e2070 100644 --- a/flow/connectors/postgres/qrep_query_executor_test.go +++ b/flow/connectors/postgres/qrep_query_executor_test.go @@ -95,8 +95,8 @@ func TestExecuteAndProcessQuery(t *testing.T) { t.Fatalf("expected 1 record, got %v", len(batch.Records)) } - if batch.Records[0].Entries[1].Value != "testdata" { - t.Fatalf("expected 'testdata', got %v", batch.Records[0].Entries[0].Value) + if batch.Records[0][1].Value != "testdata" { + t.Fatalf("expected 'testdata', got %v", batch.Records[0][0].Value) } } @@ -212,52 +212,52 @@ func TestAllDataTypes(t *testing.T) { record := batch.Records[0] expectedBool := true - if record.Entries[0].Value.(bool) != expectedBool { - t.Fatalf("expected %v, got %v", expectedBool, record.Entries[0].Value) + if record[0].Value.(bool) != expectedBool { + t.Fatalf("expected %v, got %v", expectedBool, record[0].Value) } expectedInt4 := int32(2) - if record.Entries[1].Value.(int32) != expectedInt4 { - t.Fatalf("expected %v, got %v", expectedInt4, record.Entries[1].Value) + if record[1].Value.(int32) != expectedInt4 { + t.Fatalf("expected %v, got %v", expectedInt4, record[1].Value) } expectedInt8 := int64(3) - if record.Entries[2].Value.(int64) != expectedInt8 { - t.Fatalf("expected %v, got %v", expectedInt8, record.Entries[2].Value) + if record[2].Value.(int64) != expectedInt8 { + t.Fatalf("expected %v, got %v", expectedInt8, record[2].Value) } expectedFloat4 := float32(1.1) - if record.Entries[3].Value.(float32) != expectedFloat4 { - t.Fatalf("expected %v, got %v", expectedFloat4, record.Entries[3].Value) + if record[3].Value.(float32) != expectedFloat4 { + t.Fatalf("expected %v, got %v", expectedFloat4, record[3].Value) } expectedFloat8 := float64(2.2) - if record.Entries[4].Value.(float64) != expectedFloat8 { - t.Fatalf("expected %v, got %v", expectedFloat8, record.Entries[4].Value) + if record[4].Value.(float64) != expectedFloat8 { + t.Fatalf("expected %v, got %v", expectedFloat8, record[4].Value) } expectedText := "text" - if record.Entries[5].Value.(string) != expectedText { - t.Fatalf("expected %v, got %v", expectedText, record.Entries[5].Value) + if record[5].Value.(string) != expectedText { + t.Fatalf("expected %v, got %v", expectedText, record[5].Value) } expectedBytea := []byte("bytea") - if !bytes.Equal(record.Entries[6].Value.([]byte), expectedBytea) { - t.Fatalf("expected %v, got %v", expectedBytea, record.Entries[6].Value) + if !bytes.Equal(record[6].Value.([]byte), expectedBytea) { + t.Fatalf("expected %v, got %v", expectedBytea, record[6].Value) } expectedJSON := `{"key":"value"}` - if record.Entries[7].Value.(string) != expectedJSON { - t.Fatalf("expected %v, got %v", expectedJSON, record.Entries[7].Value) + if record[7].Value.(string) != expectedJSON { + t.Fatalf("expected %v, got %v", expectedJSON, record[7].Value) } - actualUUID := record.Entries[8].Value.([16]uint8) + actualUUID := record[8].Value.([16]uint8) if !bytes.Equal(actualUUID[:], savedUUID[:]) { t.Fatalf("expected %v, got %v", savedUUID, actualUUID) } expectedNumeric := "123.456" - actualNumeric := record.Entries[10].Value.(*big.Rat).FloatString(3) + actualNumeric := record[10].Value.(*big.Rat).FloatString(3) if actualNumeric != expectedNumeric { t.Fatalf("expected %v, got %v", expectedNumeric, actualNumeric) } diff --git a/flow/connectors/snowflake/avro_file_writer_test.go b/flow/connectors/snowflake/avro_file_writer_test.go index 1e531ca3ac..28680e4249 100644 --- a/flow/connectors/snowflake/avro_file_writer_test.go +++ b/flow/connectors/snowflake/avro_file_writer_test.go @@ -98,7 +98,7 @@ func generateRecords( // Create sample records records := &model.QRecordBatch{ NumRecords: numRows, - Records: make([]model.QRecord, numRows), + Records: make([][]qvalue.QValue, numRows), Schema: schema, } @@ -121,9 +121,7 @@ func generateRecords( } } - records.Records[row] = model.QRecord{ - Entries: entries, - } + records.Records[row] = entries } stream, err := records.ToQRecordStream(1024) diff --git a/flow/connectors/sql/query_executor.go b/flow/connectors/sql/query_executor.go index b5e699e067..632a5c5ef4 100644 --- a/flow/connectors/sql/query_executor.go +++ b/flow/connectors/sql/query_executor.go @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa qfields[i] = qfield } - var records []model.QRecord + var records [][]qvalue.QValue totalRowsProcessed := 0 const heartBeatNumRows = 25000 @@ -237,13 +237,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa qValues[i] = qv } - // Create a QRecord - record := model.NewQRecord(len(qValues)) - for i, qv := range qValues { - record.Set(i, qv) - } - - records = append(records, record) + records = append(records, qValues) totalRowsProcessed += 1 if totalRowsProcessed%heartBeatNumRows == 0 { diff --git a/flow/connectors/utils/stream.go b/flow/connectors/utils/stream.go index 59602b676a..898acd03b5 100644 --- a/flow/connectors/utils/stream.go +++ b/flow/connectors/utils/stream.go @@ -183,9 +183,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor } return model.QRecordOrError{ - Record: model.QRecord{ - NumEntries: 8, - Entries: entries[:], - }, + Record: entries[:], } } diff --git a/flow/e2e/bigquery/bigquery_helper.go b/flow/e2e/bigquery/bigquery_helper.go index 0d024f9dc7..7eb7881548 100644 --- a/flow/e2e/bigquery/bigquery_helper.go +++ b/flow/e2e/bigquery/bigquery_helper.go @@ -334,7 +334,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor return nil, fmt.Errorf("failed to run command: %w", err) } - var records []model.QRecord + var records [][]qvalue.QValue for { var row []bigquery.Value err := it.Next(&row) @@ -355,13 +355,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor qValues[i] = qv } - // Create a QRecord - record := model.NewQRecord(len(qValues)) - for i, qv := range qValues { - record.Set(i, qv) - } - - records = append(records, record) + records = append(records, qValues) } // Now you should fill the column names as well. Here we assume the schema is @@ -518,5 +512,5 @@ func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) { return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords) } - return recordBatch.Records[0].Entries[0].Value.(int64), nil + return recordBatch.Records[0][0].Value.(int64), nil } diff --git a/flow/e2e/bigquery/peer_flow_bq_test.go b/flow/e2e/bigquery/peer_flow_bq_test.go index c195ab5e56..651f5551a4 100644 --- a/flow/e2e/bigquery/peer_flow_bq_test.go +++ b/flow/e2e/bigquery/peer_flow_bq_test.go @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, va return fmt.Errorf("json value check failed: %v", err) } - jsonVal := res.Records[0].Entries[0].Value + jsonVal := res.Records[0][0].Value if jsonVal != value { return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value) } @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel recordCount := 0 for _, record := range recordBatch.Records { - for _, entry := range record.Entries { + for _, entry := range record { if entry.Kind == qvalue.QValueKindBoolean { isDeleteVal, ok := entry.Value.(bool) if !(ok && isDeleteVal) { diff --git a/flow/e2e/snowflake/qrep_flow_sf_test.go b/flow/e2e/snowflake/qrep_flow_sf_test.go index 574869ac82..ac90de1770 100644 --- a/flow/e2e/snowflake/qrep_flow_sf_test.go +++ b/flow/e2e/snowflake/qrep_flow_sf_test.go @@ -26,7 +26,7 @@ func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, va return fmt.Errorf("json value check failed: %v", err) } - jsonVal := res.Records[0].Entries[0].Value + jsonVal := res.Records[0][0].Value if jsonVal != value { return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value) } diff --git a/flow/e2e/snowflake/snowflake_helper.go b/flow/e2e/snowflake/snowflake_helper.go index 88ce61e60d..1c4b9d2bb9 100644 --- a/flow/e2e/snowflake/snowflake_helper.go +++ b/flow/e2e/snowflake/snowflake_helper.go @@ -159,21 +159,21 @@ func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) { } rec := rows.Records[0] - if rec.NumEntries != 1 { - return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, rec.NumEntries) + if len(rec) != 1 { + return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, len(rec)) } - switch rec.Entries[0].Kind { + switch rec[0].Kind { case qvalue.QValueKindInt32: - return int(rec.Entries[0].Value.(int32)), nil + return int(rec[0].Value.(int32)), nil case qvalue.QValueKindInt64: - return int(rec.Entries[0].Value.(int64)), nil + return int(rec[0].Value.(int64)), nil case qvalue.QValueKindNumeric: // get big.Rat and convert to int - rat := rec.Entries[0].Value.(*big.Rat) + rat := rec[0].Value.(*big.Rat) return int(rat.Num().Int64() / rat.Denom().Int64()), nil default: - return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind) + return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec[0].Kind) } } @@ -185,7 +185,7 @@ func (s *SnowflakeTestHelper) checkSyncedAt(query string) error { } for _, record := range recordBatch.Records { - for _, entry := range record.Entries { + for _, entry := range record { if entry.Kind != qvalue.QValueKindTimestamp { return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp") } diff --git a/flow/e2eshared/e2eshared.go b/flow/e2eshared/e2eshared.go index 9ac22762f8..9b5bd289a9 100644 --- a/flow/e2eshared/e2eshared.go +++ b/flow/e2eshared/e2eshared.go @@ -11,6 +11,7 @@ import ( "github.com/jackc/pgx/v5/pgxpool" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" ) type Suite interface { @@ -68,16 +69,16 @@ func ReadFileToBytes(path string) ([]byte, error) { } // checks if two QRecords are identical -func CheckQRecordEquality(t *testing.T, q model.QRecord, other model.QRecord) bool { +func CheckQRecordEquality(t *testing.T, q []qvalue.QValue, other []qvalue.QValue) bool { t.Helper() - if q.NumEntries != other.NumEntries { - t.Logf("unequal entry count: %d != %d", q.NumEntries, other.NumEntries) + if len(q) != len(other) { + t.Logf("unequal entry count: %d != %d", len(q), len(other)) return false } - for i, entry := range q.Entries { - otherEntry := other.Entries[i] + for i, entry := range q { + otherEntry := other[i] if !entry.Equals(otherEntry) { t.Logf("entry %d: %v != %v", i, entry, otherEntry) return false diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 9b3e97fff9..1bbbcc2cb2 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -8,14 +8,14 @@ import ( ) type QRecordAvroConverter struct { - QRecord QRecord + QRecord []qvalue.QValue TargetDWH qvalue.QDWHType NullableFields map[string]struct{} ColNames []string } func NewQRecordAvroConverter( - q QRecord, + q []qvalue.QValue, targetDWH qvalue.QDWHType, nullableFields map[string]struct{}, colNames []string, @@ -31,12 +31,12 @@ func NewQRecordAvroConverter( func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) { m := map[string]interface{}{} - for idx := range qac.QRecord.Entries { + for idx, val := range qac.QRecord { key := qac.ColNames[idx] _, nullable := qac.NullableFields[key] avroConverter := qvalue.NewQValueAvroConverter( - qac.QRecord.Entries[idx], + val, qac.TargetDWH, nullable, ) diff --git a/flow/model/qrecord.go b/flow/model/qrecord.go deleted file mode 100644 index ab7fbca24e..0000000000 --- a/flow/model/qrecord.go +++ /dev/null @@ -1,23 +0,0 @@ -package model - -import ( - "github.com/PeerDB-io/peer-flow/model/qvalue" -) - -type QRecord struct { - NumEntries int - Entries []qvalue.QValue -} - -// create a new QRecord with n values -func NewQRecord(n int) QRecord { - return QRecord{ - NumEntries: n, - Entries: make([]qvalue.QValue, n), - } -} - -// Sets the value at the given index -func (q QRecord) Set(idx int, value qvalue.QValue) { - q.Entries[idx] = value -} diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 9b18dfbbc0..9ffec360cd 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -13,10 +13,10 @@ import ( "github.com/PeerDB-io/peer-flow/model/qvalue" ) -// QRecordBatch holds a batch of QRecord objects. +// QRecordBatch holds a batch of []QValue slices type QRecordBatch struct { NumRecords uint32 // NumRecords represents the number of records in the batch. - Records []QRecord + Records [][]qvalue.QValue Schema *QRecordSchema } @@ -90,10 +90,10 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { } record := src.currentRecord.Record - numEntries := len(record.Entries) + numEntries := len(record) values := make([]interface{}, numEntries) - for i, qValue := range record.Entries { + for i, qValue := range record { if qValue.Value == nil { values[i] = nil continue diff --git a/flow/model/qrecord_stream.go b/flow/model/qrecord_stream.go index a293e66ebb..83ac85032d 100644 --- a/flow/model/qrecord_stream.go +++ b/flow/model/qrecord_stream.go @@ -1,9 +1,13 @@ package model -import "fmt" +import ( + "fmt" + + "github.com/PeerDB-io/peer-flow/model/qvalue" +) type QRecordOrError struct { - Record QRecord + Record []qvalue.QValue Err error } diff --git a/flow/model/qrecord_test.go b/flow/model/qrecord_test.go index 775d0c81fd..6c685a8f41 100644 --- a/flow/model/qrecord_test.go +++ b/flow/model/qrecord_test.go @@ -8,7 +8,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/PeerDB-io/peer-flow/e2eshared" - "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -18,62 +17,32 @@ func TestEquals(t *testing.T) { tests := []struct { name string - q1 model.QRecord - q2 model.QRecord + q1 []qvalue.QValue + q2 []qvalue.QValue want bool }{ { name: "Equal - Same UUID", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{ - {Kind: qvalue.QValueKindString, Value: uuidVal1.String()}, - }, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: uuidVal1.String()}}, want: true, }, { name: "Not Equal - Different UUID", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal2}}, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal1}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindUUID, Value: uuidVal2}}, want: false, }, { name: "Equal - Same numeric", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{ - {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, - }, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: "5"}}, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindString, Value: "5"}}, want: true, }, { name: "Not Equal - Different numeric", - q1: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{ - {Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}, - }, - }, - q2: model.QRecord{ - NumEntries: 1, - Entries: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: "4.99"}}, - }, + q1: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: big.NewRat(10, 2)}}, + q2: []qvalue.QValue{{Kind: qvalue.QValueKindNumeric, Value: "4.99"}}, want: false, }, }