Skip to content

Commit

Permalink
Replace model.QRecord with []qvalue.QValue (#1142)
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex authored Jan 24, 2024
1 parent 1370b0e commit 8e08a74
Show file tree
Hide file tree
Showing 16 changed files with 94 additions and 169 deletions.
3 changes: 1 addition & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,9 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull qrep records: %w", err)
}
numRecords := int64(recordBatch.NumRecords)
slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records)))

err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, int64(len(recordBatch.Records)))
if err != nil {
return err
}
Expand Down
28 changes: 12 additions & 16 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
fieldDescriptions []pgconn.FieldDescription,
) (*model.QRecordBatch, error) {
// Initialize the record slice
records := make([]model.QRecord, 0)
records := make([][]qvalue.QValue, 0)
qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
Expand All @@ -148,12 +148,11 @@ func (qe *QRepQueryExecutor) ProcessRows(
}

batch := &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: qe.fieldDescriptionsToSchema(fieldDescriptions),
Records: records,
Schema: qe.fieldDescriptionsToSchema(fieldDescriptions),
}

qe.logger.Info(fmt.Sprintf("[postgres] pulled %d records", batch.NumRecords))
qe.logger.Info(fmt.Sprintf("[postgres] pulled %d records", len(batch.Records)))

return batch, nil
}
Expand Down Expand Up @@ -283,9 +282,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
return nil, fmt.Errorf("failed to get schema from stream: %w", schema.Err)
}
batch := &model.QRecordBatch{
NumRecords: 0,
Records: make([]model.QRecord, 0),
Schema: schema.Schema,
Records: make([][]qvalue.QValue, 0),
Schema: schema.Schema,
}
for record := range stream.Records {
if record.Err == nil {
Expand All @@ -294,7 +292,6 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
return nil, fmt.Errorf("[pg] failed to get record from stream: %w", record.Err)
}
}
batch.NumRecords = uint32(len(batch.Records))
return batch, nil
}
}
Expand Down Expand Up @@ -437,14 +434,14 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
) (model.QRecord, error) {
) ([]qvalue.QValue, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))
record := make([]qvalue.QValue, len(fds))

values, err := row.Values()
if err != nil {
slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
return nil, fmt.Errorf("failed to scan row: %w", err)
}

for i, fd := range fds {
Expand All @@ -454,9 +451,9 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
record[i] = tmp
} else {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand All @@ -468,11 +465,10 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
values[i] = wkt
}
}
customTypeVal := qvalue.QValue{
record[i] = qvalue.QValue{
Kind: customQKind,
Value: values[i],
}
record.Set(i, customTypeVal)
}
}

Expand Down
40 changes: 20 additions & 20 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func TestExecuteAndProcessQuery(t *testing.T) {
t.Fatalf("expected 1 record, got %v", len(batch.Records))
}

if batch.Records[0].Entries[1].Value != "testdata" {
t.Fatalf("expected 'testdata', got %v", batch.Records[0].Entries[0].Value)
if batch.Records[0][1].Value != "testdata" {
t.Fatalf("expected 'testdata', got %v", batch.Records[0][0].Value)
}
}

Expand Down Expand Up @@ -212,52 +212,52 @@ func TestAllDataTypes(t *testing.T) {
record := batch.Records[0]

expectedBool := true
if record.Entries[0].Value.(bool) != expectedBool {
t.Fatalf("expected %v, got %v", expectedBool, record.Entries[0].Value)
if record[0].Value.(bool) != expectedBool {
t.Fatalf("expected %v, got %v", expectedBool, record[0].Value)
}

expectedInt4 := int32(2)
if record.Entries[1].Value.(int32) != expectedInt4 {
t.Fatalf("expected %v, got %v", expectedInt4, record.Entries[1].Value)
if record[1].Value.(int32) != expectedInt4 {
t.Fatalf("expected %v, got %v", expectedInt4, record[1].Value)
}

expectedInt8 := int64(3)
if record.Entries[2].Value.(int64) != expectedInt8 {
t.Fatalf("expected %v, got %v", expectedInt8, record.Entries[2].Value)
if record[2].Value.(int64) != expectedInt8 {
t.Fatalf("expected %v, got %v", expectedInt8, record[2].Value)
}

expectedFloat4 := float32(1.1)
if record.Entries[3].Value.(float32) != expectedFloat4 {
t.Fatalf("expected %v, got %v", expectedFloat4, record.Entries[3].Value)
if record[3].Value.(float32) != expectedFloat4 {
t.Fatalf("expected %v, got %v", expectedFloat4, record[3].Value)
}

expectedFloat8 := float64(2.2)
if record.Entries[4].Value.(float64) != expectedFloat8 {
t.Fatalf("expected %v, got %v", expectedFloat8, record.Entries[4].Value)
if record[4].Value.(float64) != expectedFloat8 {
t.Fatalf("expected %v, got %v", expectedFloat8, record[4].Value)
}

expectedText := "text"
if record.Entries[5].Value.(string) != expectedText {
t.Fatalf("expected %v, got %v", expectedText, record.Entries[5].Value)
if record[5].Value.(string) != expectedText {
t.Fatalf("expected %v, got %v", expectedText, record[5].Value)
}

expectedBytea := []byte("bytea")
if !bytes.Equal(record.Entries[6].Value.([]byte), expectedBytea) {
t.Fatalf("expected %v, got %v", expectedBytea, record.Entries[6].Value)
if !bytes.Equal(record[6].Value.([]byte), expectedBytea) {
t.Fatalf("expected %v, got %v", expectedBytea, record[6].Value)
}

expectedJSON := `{"key":"value"}`
if record.Entries[7].Value.(string) != expectedJSON {
t.Fatalf("expected %v, got %v", expectedJSON, record.Entries[7].Value)
if record[7].Value.(string) != expectedJSON {
t.Fatalf("expected %v, got %v", expectedJSON, record[7].Value)
}

actualUUID := record.Entries[8].Value.([16]uint8)
actualUUID := record[8].Value.([16]uint8)
if !bytes.Equal(actualUUID[:], savedUUID[:]) {
t.Fatalf("expected %v, got %v", savedUUID, actualUUID)
}

expectedNumeric := "123.456"
actualNumeric := record.Entries[10].Value.(*big.Rat).FloatString(3)
actualNumeric := record[10].Value.(*big.Rat).FloatString(3)
if actualNumeric != expectedNumeric {
t.Fatalf("expected %v, got %v", expectedNumeric, actualNumeric)
}
Expand Down
9 changes: 3 additions & 6 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ func generateRecords(

// Create sample records
records := &model.QRecordBatch{
NumRecords: numRows,
Records: make([]model.QRecord, numRows),
Schema: schema,
Records: make([][]qvalue.QValue, numRows),
Schema: schema,
}

for i, kind := range allQValueKinds {
Expand All @@ -121,9 +120,7 @@ func generateRecords(
}
}

records.Records[row] = model.QRecord{
Entries: entries,
}
records.Records[row] = entries
}

stream, err := records.ToQRecordStream(1024)
Expand Down
15 changes: 4 additions & 11 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qfields[i] = qfield
}

var records []model.QRecord
var records [][]qvalue.QValue
totalRowsProcessed := 0
const heartBeatNumRows = 25000

Expand Down Expand Up @@ -237,13 +237,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qValues[i] = qv
}

// Create a QRecord
record := model.NewQRecord(len(qValues))
for i, qv := range qValues {
record.Set(i, qv)
}

records = append(records, record)
records = append(records, qValues)
totalRowsProcessed += 1

if totalRowsProcessed%heartBeatNumRows == 0 {
Expand All @@ -258,9 +252,8 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa

// Return a QRecordBatch
return &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: model.NewQRecordSchema(qfields),
Records: records,
Schema: model.NewQRecordSchema(qfields),
}, nil
}

Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}

return model.QRecordOrError{
Record: model.QRecord{
NumEntries: 8,
Entries: entries[:],
},
Record: entries[:],
}
}
21 changes: 7 additions & 14 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
return nil, fmt.Errorf("failed to run command: %w", err)
}

var records []model.QRecord
var records [][]qvalue.QValue
for {
var row []bigquery.Value
err := it.Next(&row)
Expand All @@ -357,13 +357,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
qValues[i] = qv
}

// Create a QRecord
record := model.NewQRecord(len(qValues))
for i, qv := range qValues {
record.Set(i, qv)
}

records = append(records, record)
records = append(records, qValues)
}

// Now you should fill the column names as well. Here we assume the schema is
Expand All @@ -378,9 +372,8 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor

// Return a QRecordBatch
return &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: schema,
Records: records,
Schema: schema,
}, nil
}

Expand Down Expand Up @@ -516,9 +509,9 @@ func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
if err != nil {
return 0, fmt.Errorf("could not execute query: %w", err)
}
if recordBatch.NumRecords != 1 {
return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
if len(recordBatch.Records) != 1 {
return 0, fmt.Errorf("expected only 1 record, got %d", len(recordBatch.Records))
}

return recordBatch.Records[0].Entries[0].Value.(int64), nil
return recordBatch.Records[0][0].Value.(int64), nil
}
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, va
return fmt.Errorf("json value check failed: %v", err)
}

jsonVal := res.Records[0].Entries[0].Value
jsonVal := res.Records[0][0].Value
if jsonVal != value {
return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel
recordCount := 0

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
for _, entry := range record {
if entry.Kind == qvalue.QValueKindBoolean {
isDeleteVal, ok := entry.Value.(bool)
if !(ok && isDeleteVal) {
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, va
return fmt.Errorf("json value check failed: %v", err)
}

jsonVal := res.Records[0].Entries[0].Value
jsonVal := res.Records[0][0].Value
if jsonVal != value {
return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
}
Expand Down
16 changes: 8 additions & 8 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,21 @@ func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) {
}

rec := rows.Records[0]
if rec.NumEntries != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, rec.NumEntries)
if len(rec) != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, len(rec))
}

switch rec.Entries[0].Kind {
switch rec[0].Kind {
case qvalue.QValueKindInt32:
return int(rec.Entries[0].Value.(int32)), nil
return int(rec[0].Value.(int32)), nil
case qvalue.QValueKindInt64:
return int(rec.Entries[0].Value.(int64)), nil
return int(rec[0].Value.(int64)), nil
case qvalue.QValueKindNumeric:
// get big.Rat and convert to int
rat := rec.Entries[0].Value.(*big.Rat)
rat := rec[0].Value.(*big.Rat)
return int(rat.Num().Int64() / rat.Denom().Int64()), nil
default:
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind)
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec[0].Kind)
}
}

Expand All @@ -185,7 +185,7 @@ func (s *SnowflakeTestHelper) checkSyncedAt(query string) error {
}

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
for _, entry := range record {
if entry.Kind != qvalue.QValueKindTimestamp {
return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp")
}
Expand Down
Loading

0 comments on commit 8e08a74

Please sign in to comment.