Skip to content

Commit

Permalink
Replace model.QRecord with []qvalue.QValue
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Jan 24, 2024
1 parent 932820f commit c78ff84
Show file tree
Hide file tree
Showing 15 changed files with 77 additions and 144 deletions.
17 changes: 8 additions & 9 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
fieldDescriptions []pgconn.FieldDescription,
) (*model.QRecordBatch, error) {
// Initialize the record slice
records := make([]model.QRecord, 0)
records := make([][]qvalue.QValue, 0)
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
Expand Down Expand Up @@ -284,7 +284,7 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
}
batch := &model.QRecordBatch{
NumRecords: 0,
Records: make([]model.QRecord, 0),
Records: make([][]qvalue.QValue, 0),
Schema: schema.Schema,
}
for record := range stream.Records {
Expand Down Expand Up @@ -437,14 +437,14 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
) (model.QRecord, error) {
) ([]qvalue.QValue, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))
record := make([]qvalue.QValue, len(fds))

values, err := row.Values()
if err != nil {
slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
return nil, fmt.Errorf("failed to scan row: %w", err)
}

for i, fd := range fds {
Expand All @@ -454,9 +454,9 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
record[i] = tmp
} else {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand All @@ -468,11 +468,10 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
values[i] = wkt
}
}
customTypeVal := qvalue.QValue{
record[i] = qvalue.QValue{
Kind: customQKind,
Value: values[i],
}
record.Set(i, customTypeVal)
}
}

Expand Down
40 changes: 20 additions & 20 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func TestExecuteAndProcessQuery(t *testing.T) {
t.Fatalf("expected 1 record, got %v", len(batch.Records))
}

if batch.Records[0].Entries[1].Value != "testdata" {
t.Fatalf("expected 'testdata', got %v", batch.Records[0].Entries[0].Value)
if batch.Records[0][1].Value != "testdata" {
t.Fatalf("expected 'testdata', got %v", batch.Records[0][0].Value)
}
}

Expand Down Expand Up @@ -212,52 +212,52 @@ func TestAllDataTypes(t *testing.T) {
record := batch.Records[0]

expectedBool := true
if record.Entries[0].Value.(bool) != expectedBool {
t.Fatalf("expected %v, got %v", expectedBool, record.Entries[0].Value)
if record[0].Value.(bool) != expectedBool {
t.Fatalf("expected %v, got %v", expectedBool, record[0].Value)
}

expectedInt4 := int32(2)
if record.Entries[1].Value.(int32) != expectedInt4 {
t.Fatalf("expected %v, got %v", expectedInt4, record.Entries[1].Value)
if record[1].Value.(int32) != expectedInt4 {
t.Fatalf("expected %v, got %v", expectedInt4, record[1].Value)
}

expectedInt8 := int64(3)
if record.Entries[2].Value.(int64) != expectedInt8 {
t.Fatalf("expected %v, got %v", expectedInt8, record.Entries[2].Value)
if record[2].Value.(int64) != expectedInt8 {
t.Fatalf("expected %v, got %v", expectedInt8, record[2].Value)
}

expectedFloat4 := float32(1.1)
if record.Entries[3].Value.(float32) != expectedFloat4 {
t.Fatalf("expected %v, got %v", expectedFloat4, record.Entries[3].Value)
if record[3].Value.(float32) != expectedFloat4 {
t.Fatalf("expected %v, got %v", expectedFloat4, record[3].Value)
}

expectedFloat8 := float64(2.2)
if record.Entries[4].Value.(float64) != expectedFloat8 {
t.Fatalf("expected %v, got %v", expectedFloat8, record.Entries[4].Value)
if record[4].Value.(float64) != expectedFloat8 {
t.Fatalf("expected %v, got %v", expectedFloat8, record[4].Value)
}

expectedText := "text"
if record.Entries[5].Value.(string) != expectedText {
t.Fatalf("expected %v, got %v", expectedText, record.Entries[5].Value)
if record[5].Value.(string) != expectedText {
t.Fatalf("expected %v, got %v", expectedText, record[5].Value)
}

expectedBytea := []byte("bytea")
if !bytes.Equal(record.Entries[6].Value.([]byte), expectedBytea) {
t.Fatalf("expected %v, got %v", expectedBytea, record.Entries[6].Value)
if !bytes.Equal(record[6].Value.([]byte), expectedBytea) {
t.Fatalf("expected %v, got %v", expectedBytea, record[6].Value)
}

expectedJSON := `{"key":"value"}`
if record.Entries[7].Value.(string) != expectedJSON {
t.Fatalf("expected %v, got %v", expectedJSON, record.Entries[7].Value)
if record[7].Value.(string) != expectedJSON {
t.Fatalf("expected %v, got %v", expectedJSON, record[7].Value)
}

actualUUID := record.Entries[8].Value.([16]uint8)
actualUUID := record[8].Value.([16]uint8)
if !bytes.Equal(actualUUID[:], savedUUID[:]) {
t.Fatalf("expected %v, got %v", savedUUID, actualUUID)
}

expectedNumeric := "123.456"
actualNumeric := record.Entries[10].Value.(*big.Rat).FloatString(3)
actualNumeric := record[10].Value.(*big.Rat).FloatString(3)
if actualNumeric != expectedNumeric {
t.Fatalf("expected %v, got %v", expectedNumeric, actualNumeric)
}
Expand Down
6 changes: 2 additions & 4 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func generateRecords(
// Create sample records
records := &model.QRecordBatch{
NumRecords: numRows,
Records: make([]model.QRecord, numRows),
Records: make([][]qvalue.QValue, numRows),
Schema: schema,
}

Expand All @@ -121,9 +121,7 @@ func generateRecords(
}
}

records.Records[row] = model.QRecord{
Entries: entries,
}
records.Records[row] = entries
}

stream, err := records.ToQRecordStream(1024)
Expand Down
10 changes: 2 additions & 8 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qfields[i] = qfield
}

var records []model.QRecord
var records [][]qvalue.QValue
totalRowsProcessed := 0
const heartBeatNumRows = 25000

Expand Down Expand Up @@ -237,13 +237,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qValues[i] = qv
}

// Create a QRecord
record := model.NewQRecord(len(qValues))
for i, qv := range qValues {
record.Set(i, qv)
}

records = append(records, record)
records = append(records, qValues)
totalRowsProcessed += 1

if totalRowsProcessed%heartBeatNumRows == 0 {
Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}

return model.QRecordOrError{
Record: model.QRecord{
NumEntries: 8,
Entries: entries[:],
},
Record: entries[:],
}
}
12 changes: 3 additions & 9 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
return nil, fmt.Errorf("failed to run command: %w", err)
}

var records []model.QRecord
var records [][]qvalue.QValue
for {
var row []bigquery.Value
err := it.Next(&row)
Expand All @@ -355,13 +355,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
qValues[i] = qv
}

// Create a QRecord
record := model.NewQRecord(len(qValues))
for i, qv := range qValues {
record.Set(i, qv)
}

records = append(records, record)
records = append(records, qValues)
}

// Now you should fill the column names as well. Here we assume the schema is
Expand Down Expand Up @@ -518,5 +512,5 @@ func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
}

return recordBatch.Records[0].Entries[0].Value.(int64), nil
return recordBatch.Records[0][0].Value.(int64), nil
}
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, va
return fmt.Errorf("json value check failed: %v", err)
}

jsonVal := res.Records[0].Entries[0].Value
jsonVal := res.Records[0][0].Value
if jsonVal != value {
return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel
recordCount := 0

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
for _, entry := range record {
if entry.Kind == qvalue.QValueKindBoolean {
isDeleteVal, ok := entry.Value.(bool)
if !(ok && isDeleteVal) {
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, va
return fmt.Errorf("json value check failed: %v", err)
}

jsonVal := res.Records[0].Entries[0].Value
jsonVal := res.Records[0][0].Value
if jsonVal != value {
return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
}
Expand Down
16 changes: 8 additions & 8 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,21 @@ func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) {
}

rec := rows.Records[0]
if rec.NumEntries != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, rec.NumEntries)
if len(rec) != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, len(rec))
}

switch rec.Entries[0].Kind {
switch rec[0].Kind {
case qvalue.QValueKindInt32:
return int(rec.Entries[0].Value.(int32)), nil
return int(rec[0].Value.(int32)), nil
case qvalue.QValueKindInt64:
return int(rec.Entries[0].Value.(int64)), nil
return int(rec[0].Value.(int64)), nil
case qvalue.QValueKindNumeric:
// get big.Rat and convert to int
rat := rec.Entries[0].Value.(*big.Rat)
rat := rec[0].Value.(*big.Rat)
return int(rat.Num().Int64() / rat.Denom().Int64()), nil
default:
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind)
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec[0].Kind)
}
}

Expand All @@ -185,7 +185,7 @@ func (s *SnowflakeTestHelper) checkSyncedAt(query string) error {
}

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
for _, entry := range record {
if entry.Kind != qvalue.QValueKindTimestamp {
return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp")
}
Expand Down
11 changes: 6 additions & 5 deletions flow/e2eshared/e2eshared.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/jackc/pgx/v5/pgxpool"

"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

type Suite interface {
Expand Down Expand Up @@ -68,16 +69,16 @@ func ReadFileToBytes(path string) ([]byte, error) {
}

// checks if two QRecords are identical
func CheckQRecordEquality(t *testing.T, q model.QRecord, other model.QRecord) bool {
func CheckQRecordEquality(t *testing.T, q []qvalue.QValue, other []qvalue.QValue) bool {
t.Helper()

if q.NumEntries != other.NumEntries {
t.Logf("unequal entry count: %d != %d", q.NumEntries, other.NumEntries)
if len(q) != len(other) {
t.Logf("unequal entry count: %d != %d", len(q), len(other))
return false
}

for i, entry := range q.Entries {
otherEntry := other.Entries[i]
for i, entry := range q {
otherEntry := other[i]
if !entry.Equals(otherEntry) {
t.Logf("entry %d: %v != %v", i, entry, otherEntry)
return false
Expand Down
8 changes: 4 additions & 4 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,14 +8,14 @@ import (
)

type QRecordAvroConverter struct {
QRecord QRecord
QRecord []qvalue.QValue
TargetDWH qvalue.QDWHType
NullableFields map[string]struct{}
ColNames []string
}

func NewQRecordAvroConverter(
q QRecord,
q []qvalue.QValue,
targetDWH qvalue.QDWHType,
nullableFields map[string]struct{},
colNames []string,
Expand All @@ -31,12 +31,12 @@ func NewQRecordAvroConverter(
func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) {
m := map[string]interface{}{}

for idx := range qac.QRecord.Entries {
for idx, val := range qac.QRecord {
key := qac.ColNames[idx]
_, nullable := qac.NullableFields[key]

avroConverter := qvalue.NewQValueAvroConverter(
qac.QRecord.Entries[idx],
val,
qac.TargetDWH,
nullable,
)
Expand Down
23 changes: 0 additions & 23 deletions flow/model/qrecord.go

This file was deleted.

Loading

0 comments on commit c78ff84

Please sign in to comment.