Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace model.QRecord with []qvalue.QValue #1142

Merged
merged 1 commit into from
Jan 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 1 addition & 2 deletions flow/activities/flowable.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,10 +590,9 @@ func (a *FlowableActivity) replicateQRepPartition(ctx context.Context,
a.Alerter.LogFlowError(ctx, config.FlowJobName, err)
return fmt.Errorf("failed to pull qrep records: %w", err)
}
numRecords := int64(recordBatch.NumRecords)
slog.InfoContext(ctx, fmt.Sprintf("pulled %d records\n", len(recordBatch.Records)))

err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, numRecords)
err = monitoring.UpdatePullEndTimeAndRowsForPartition(ctx, a.CatalogPool, runUUID, partition, int64(len(recordBatch.Records)))
if err != nil {
return err
}
Expand Down
28 changes: 12 additions & 16 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (qe *QRepQueryExecutor) ProcessRows(
fieldDescriptions []pgconn.FieldDescription,
) (*model.QRecordBatch, error) {
// Initialize the record slice
records := make([]model.QRecord, 0)
records := make([][]qvalue.QValue, 0)
Copy link
Contributor

@heavycrystal heavycrystal Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

will help readability if we do something like this

type QValueArr []qvalue.QValue

Copy link
Contributor Author

@serprex serprex Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I disagree, moving [] to the end as Arr adds little & requires wrapping the type while passing it around, when we just want []qvalue.QValue anyways

I did consider putting type QRecord []qvalue.QValue in model, but seems like it obfuscates the truth. I'm generally annoyed when I'm using a library & it turns out they've wrapped []T into some other type. I'd go as far as to say I'd prefer a type QRecord struct { Values []qvalue.QValue } over a type alias when it comes to wrapping slice types

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me it's that double slices read weird, and having an alias for that would help. Fair point on requiring type wrapping though, would a type alias work better? https://stackoverflow.com/questions/61247864/what-is-the-difference-between-type-alias-and-type-definition-in-go

Approving anyway as this is primarily a stylistic difference.

Copy link
Contributor Author

@serprex serprex Jan 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Type alias is type wrapping, it's specifically what I want to avoid since it makes a type that you can use slice operators on without being an explicit slice type, so you'll end up casting it to slice when you want to do other things like call functions from slices on it

Double slice is indeed awkward, but it lets someone know they should get used to dealing with nested slices — signature or not, that's what they're dealing with

qe.logger.Info("Processing rows")
// Iterate over the rows
for rows.Next() {
Expand All @@ -148,12 +148,11 @@ func (qe *QRepQueryExecutor) ProcessRows(
}

batch := &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: qe.fieldDescriptionsToSchema(fieldDescriptions),
Records: records,
Schema: qe.fieldDescriptionsToSchema(fieldDescriptions),
}

qe.logger.Info(fmt.Sprintf("[postgres] pulled %d records", batch.NumRecords))
qe.logger.Info(fmt.Sprintf("[postgres] pulled %d records", len(batch.Records)))

return batch, nil
}
Expand Down Expand Up @@ -283,9 +282,8 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
return nil, fmt.Errorf("failed to get schema from stream: %w", schema.Err)
}
batch := &model.QRecordBatch{
NumRecords: 0,
Records: make([]model.QRecord, 0),
Schema: schema.Schema,
Records: make([][]qvalue.QValue, 0),
Schema: schema.Schema,
}
for record := range stream.Records {
if record.Err == nil {
Expand All @@ -294,7 +292,6 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQuery(
return nil, fmt.Errorf("[pg] failed to get record from stream: %w", record.Err)
}
}
batch.NumRecords = uint32(len(batch.Records))
return batch, nil
}
}
Expand Down Expand Up @@ -437,14 +434,14 @@ func (qe *QRepQueryExecutor) ExecuteAndProcessQueryStreamWithTx(

func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
customTypeMap map[uint32]string,
) (model.QRecord, error) {
) ([]qvalue.QValue, error) {
// make vals an empty array of QValue of size len(fds)
record := model.NewQRecord(len(fds))
record := make([]qvalue.QValue, len(fds))

values, err := row.Values()
if err != nil {
slog.Error("[pg_query_executor] failed to get values from row", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to scan row: %w", err)
return nil, fmt.Errorf("failed to scan row: %w", err)
}

for i, fd := range fds {
Expand All @@ -454,9 +451,9 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
tmp, err := parseFieldFromPostgresOID(fd.DataTypeOID, values[i])
if err != nil {
slog.Error("[pg_query_executor] failed to parse field", slog.Any("error", err))
return model.QRecord{}, fmt.Errorf("failed to parse field: %w", err)
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, tmp)
record[i] = tmp
} else {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand All @@ -468,11 +465,10 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
values[i] = wkt
}
}
customTypeVal := qvalue.QValue{
record[i] = qvalue.QValue{
Kind: customQKind,
Value: values[i],
}
record.Set(i, customTypeVal)
}
}

Expand Down
40 changes: 20 additions & 20 deletions flow/connectors/postgres/qrep_query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,8 @@ func TestExecuteAndProcessQuery(t *testing.T) {
t.Fatalf("expected 1 record, got %v", len(batch.Records))
}

if batch.Records[0].Entries[1].Value != "testdata" {
t.Fatalf("expected 'testdata', got %v", batch.Records[0].Entries[0].Value)
if batch.Records[0][1].Value != "testdata" {
t.Fatalf("expected 'testdata', got %v", batch.Records[0][0].Value)
}
}

Expand Down Expand Up @@ -212,52 +212,52 @@ func TestAllDataTypes(t *testing.T) {
record := batch.Records[0]

expectedBool := true
if record.Entries[0].Value.(bool) != expectedBool {
t.Fatalf("expected %v, got %v", expectedBool, record.Entries[0].Value)
if record[0].Value.(bool) != expectedBool {
t.Fatalf("expected %v, got %v", expectedBool, record[0].Value)
}

expectedInt4 := int32(2)
if record.Entries[1].Value.(int32) != expectedInt4 {
t.Fatalf("expected %v, got %v", expectedInt4, record.Entries[1].Value)
if record[1].Value.(int32) != expectedInt4 {
t.Fatalf("expected %v, got %v", expectedInt4, record[1].Value)
}

expectedInt8 := int64(3)
if record.Entries[2].Value.(int64) != expectedInt8 {
t.Fatalf("expected %v, got %v", expectedInt8, record.Entries[2].Value)
if record[2].Value.(int64) != expectedInt8 {
t.Fatalf("expected %v, got %v", expectedInt8, record[2].Value)
}

expectedFloat4 := float32(1.1)
if record.Entries[3].Value.(float32) != expectedFloat4 {
t.Fatalf("expected %v, got %v", expectedFloat4, record.Entries[3].Value)
if record[3].Value.(float32) != expectedFloat4 {
t.Fatalf("expected %v, got %v", expectedFloat4, record[3].Value)
}

expectedFloat8 := float64(2.2)
if record.Entries[4].Value.(float64) != expectedFloat8 {
t.Fatalf("expected %v, got %v", expectedFloat8, record.Entries[4].Value)
if record[4].Value.(float64) != expectedFloat8 {
t.Fatalf("expected %v, got %v", expectedFloat8, record[4].Value)
}

expectedText := "text"
if record.Entries[5].Value.(string) != expectedText {
t.Fatalf("expected %v, got %v", expectedText, record.Entries[5].Value)
if record[5].Value.(string) != expectedText {
t.Fatalf("expected %v, got %v", expectedText, record[5].Value)
}

expectedBytea := []byte("bytea")
if !bytes.Equal(record.Entries[6].Value.([]byte), expectedBytea) {
t.Fatalf("expected %v, got %v", expectedBytea, record.Entries[6].Value)
if !bytes.Equal(record[6].Value.([]byte), expectedBytea) {
t.Fatalf("expected %v, got %v", expectedBytea, record[6].Value)
}

expectedJSON := `{"key":"value"}`
if record.Entries[7].Value.(string) != expectedJSON {
t.Fatalf("expected %v, got %v", expectedJSON, record.Entries[7].Value)
if record[7].Value.(string) != expectedJSON {
t.Fatalf("expected %v, got %v", expectedJSON, record[7].Value)
}

actualUUID := record.Entries[8].Value.([16]uint8)
actualUUID := record[8].Value.([16]uint8)
if !bytes.Equal(actualUUID[:], savedUUID[:]) {
t.Fatalf("expected %v, got %v", savedUUID, actualUUID)
}

expectedNumeric := "123.456"
actualNumeric := record.Entries[10].Value.(*big.Rat).FloatString(3)
actualNumeric := record[10].Value.(*big.Rat).FloatString(3)
if actualNumeric != expectedNumeric {
t.Fatalf("expected %v, got %v", expectedNumeric, actualNumeric)
}
Expand Down
9 changes: 3 additions & 6 deletions flow/connectors/snowflake/avro_file_writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,8 @@ func generateRecords(

// Create sample records
records := &model.QRecordBatch{
NumRecords: numRows,
Records: make([]model.QRecord, numRows),
Schema: schema,
Records: make([][]qvalue.QValue, numRows),
Schema: schema,
}

for i, kind := range allQValueKinds {
Expand All @@ -121,9 +120,7 @@ func generateRecords(
}
}

records.Records[row] = model.QRecord{
Entries: entries,
}
records.Records[row] = entries
}

stream, err := records.ToQRecordStream(1024)
Expand Down
15 changes: 4 additions & 11 deletions flow/connectors/sql/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qfields[i] = qfield
}

var records []model.QRecord
var records [][]qvalue.QValue
totalRowsProcessed := 0
const heartBeatNumRows = 25000

Expand Down Expand Up @@ -237,13 +237,7 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa
qValues[i] = qv
}

// Create a QRecord
record := model.NewQRecord(len(qValues))
for i, qv := range qValues {
record.Set(i, qv)
}

records = append(records, record)
records = append(records, qValues)
totalRowsProcessed += 1

if totalRowsProcessed%heartBeatNumRows == 0 {
Expand All @@ -258,9 +252,8 @@ func (g *GenericSQLQueryExecutor) processRows(rows *sqlx.Rows) (*model.QRecordBa

// Return a QRecordBatch
return &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: model.NewQRecordSchema(qfields),
Records: records,
Schema: model.NewQRecordSchema(qfields),
}, nil
}

Expand Down
5 changes: 1 addition & 4 deletions flow/connectors/utils/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,9 +183,6 @@ func recordToQRecordOrError(tableMapping map[string]uint32, batchID int64, recor
}

return model.QRecordOrError{
Record: model.QRecord{
NumEntries: 8,
Entries: entries[:],
},
Record: entries[:],
}
}
21 changes: 7 additions & 14 deletions flow/e2e/bigquery/bigquery_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
return nil, fmt.Errorf("failed to run command: %w", err)
}

var records []model.QRecord
var records [][]qvalue.QValue
for {
var row []bigquery.Value
err := it.Next(&row)
Expand All @@ -355,13 +355,7 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor
qValues[i] = qv
}

// Create a QRecord
record := model.NewQRecord(len(qValues))
for i, qv := range qValues {
record.Set(i, qv)
}

records = append(records, record)
records = append(records, qValues)
}

// Now you should fill the column names as well. Here we assume the schema is
Expand All @@ -376,9 +370,8 @@ func (b *BigQueryTestHelper) ExecuteAndProcessQuery(query string) (*model.QRecor

// Return a QRecordBatch
return &model.QRecordBatch{
NumRecords: uint32(len(records)),
Records: records,
Schema: schema,
Records: records,
Schema: schema,
}, nil
}

Expand Down Expand Up @@ -514,9 +507,9 @@ func (b *BigQueryTestHelper) RunInt64Query(query string) (int64, error) {
if err != nil {
return 0, fmt.Errorf("could not execute query: %w", err)
}
if recordBatch.NumRecords != 1 {
return 0, fmt.Errorf("expected only 1 record, got %d", recordBatch.NumRecords)
if len(recordBatch.Records) != 1 {
return 0, fmt.Errorf("expected only 1 record, got %d", len(recordBatch.Records))
}

return recordBatch.Records[0].Entries[0].Value.(int64), nil
return recordBatch.Records[0][0].Value.(int64), nil
}
4 changes: 2 additions & 2 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, va
return fmt.Errorf("json value check failed: %v", err)
}

jsonVal := res.Records[0].Entries[0].Value
jsonVal := res.Records[0][0].Value
if jsonVal != value {
return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
}
Expand Down Expand Up @@ -114,7 +114,7 @@ func (s *PeerFlowE2ETestSuiteBQ) checkPeerdbColumns(dstQualified string, softDel
recordCount := 0

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
for _, entry := range record {
if entry.Kind == qvalue.QValueKindBoolean {
isDeleteVal, ok := entry.Value.(bool)
if !(ok && isDeleteVal) {
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/snowflake/qrep_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (s PeerFlowE2ETestSuiteSF) checkJSONValue(tableName, colName, fieldName, va
return fmt.Errorf("json value check failed: %v", err)
}

jsonVal := res.Records[0].Entries[0].Value
jsonVal := res.Records[0][0].Value
if jsonVal != value {
return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
}
Expand Down
16 changes: 8 additions & 8 deletions flow/e2e/snowflake/snowflake_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -159,21 +159,21 @@ func (s *SnowflakeTestHelper) RunIntQuery(query string) (int, error) {
}

rec := rows.Records[0]
if rec.NumEntries != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, rec.NumEntries)
if len(rec) != 1 {
return 0, fmt.Errorf("failed to execute query: %s, returned %d != 1 columns", query, len(rec))
}

switch rec.Entries[0].Kind {
switch rec[0].Kind {
case qvalue.QValueKindInt32:
return int(rec.Entries[0].Value.(int32)), nil
return int(rec[0].Value.(int32)), nil
case qvalue.QValueKindInt64:
return int(rec.Entries[0].Value.(int64)), nil
return int(rec[0].Value.(int64)), nil
case qvalue.QValueKindNumeric:
// get big.Rat and convert to int
rat := rec.Entries[0].Value.(*big.Rat)
rat := rec[0].Value.(*big.Rat)
return int(rat.Num().Int64() / rat.Denom().Int64()), nil
default:
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec.Entries[0].Kind)
return 0, fmt.Errorf("failed to execute query: %s, returned value of type %s", query, rec[0].Kind)
}
}

Expand All @@ -185,7 +185,7 @@ func (s *SnowflakeTestHelper) checkSyncedAt(query string) error {
}

for _, record := range recordBatch.Records {
for _, entry := range record.Entries {
for _, entry := range record {
if entry.Kind != qvalue.QValueKindTimestamp {
return fmt.Errorf("synced_at column check failed: _PEERDB_SYNCED_AT is not timestamp")
}
Expand Down
Loading
Loading