
Commit

Change QValue into an interface (#1528)
The existing QValue was troublesome: an artifact of wanting tagged unions,
it forced dispatching on the tag and then dispatching again on the type,
which cost not only perf but also certainty about what could be in Value;
a UUID could arrive as string, [16]byte, or uuid.UUID.

This problem was not unique to UUID.

This isn't an attempt at a final QValue representation;
long term I'd like to get rid of QValueKind altogether.

I'm not happy with the duplication around the time types;
ideally we'd have one time type with a kind tag,
maybe a QTime interface the types share.
serprex authored Mar 25, 2024
1 parent a735989 commit c27afd6
Showing 34 changed files with 1,720 additions and 1,695 deletions.
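
For orientation before the per-file diffs: a minimal sketch of the shape change, in the spirit of the commit message. The identifiers QValue, QValueKind, Value(), QValueNull, and QValueString all appear in the diffs below; the package name, the constant, the method bodies, and the old struct's exact layout are assumptions for illustration, not the repository's code.

package qvaluesketch

// QValueKind stays as a plain tag for now; the commit message wants it gone
// long term.
type QValueKind string

const QValueKindString QValueKind = "string"

// Before (assumed layout): one struct for every kind. Consumers dispatched on
// Kind and then type-asserted Value, so a single kind could hide several Go
// types (a UUID as string, [16]byte, or uuid.UUID).
type taggedQValue struct {
    Kind  QValueKind
    Value interface{}
}

// After: QValue is an interface, with one concrete type per kind.
type QValue interface {
    Kind() QValueKind
    Value() interface{}
}

type QValueString struct{ Val string }

func (v QValueString) Kind() QValueKind   { return QValueKindString }
func (v QValueString) Value() interface{} { return v.Val }

// Nulls keep their kind: QValueNull is itself a kind, so an expression like
// qvalue.QValueNull(qvalue.QValueKindTimestamp) in the diffs below reads as a
// type conversion that records which kind was nulled.
type QValueNull QValueKind

func (v QValueNull) Kind() QValueKind   { return QValueKind(v) }
func (v QValueNull) Value() interface{} { return nil }

Under this shape, the double dispatch the message complains about (switch on Kind, then type-assert Value) collapses into a single type switch on the concrete QValue implementation, and each kind pins down exactly one Go representation.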
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
@@ -372,7 +372,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
 ) (*model.SyncResponse, error) {
     tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
     streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
-    streamRes, err := utils.RecordsToRawTableStream(streamReq)
+    stream, err := utils.RecordsToRawTableStream(streamReq)
     if err != nil {
         return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
     }
@@ -384,7 +384,7 @@ func (c *BigQueryConnector) syncRecordsViaAvro(
     }

     res, err := avroSync.SyncRecords(ctx, req, rawTableName,
-        rawTableMetadata, syncBatchID, streamRes.Stream, streamReq.TableMapping)
+        rawTableMetadata, syncBatchID, stream, streamReq.TableMapping)
     if err != nil {
         return nil, fmt.Errorf("failed to sync records via avro: %w", err)
     }
4 changes: 2 additions & 2 deletions flow/connectors/clickhouse/cdc.go
@@ -82,7 +82,7 @@ func (c *ClickhouseConnector) syncRecordsViaAvro(
 ) (*model.SyncResponse, error) {
     tableNameRowsMapping := utils.InitialiseTableRowsMap(req.TableMappings)
     streamReq := model.NewRecordsToStreamRequest(req.Records.GetRecords(), tableNameRowsMapping, syncBatchID)
-    streamRes, err := utils.RecordsToRawTableStream(streamReq)
+    stream, err := utils.RecordsToRawTableStream(streamReq)
     if err != nil {
         return nil, fmt.Errorf("failed to convert records to raw table stream: %w", err)
     }
@@ -98,7 +98,7 @@
         return nil, err
     }

-    numRecords, err := avroSyncer.SyncRecords(ctx, destinationTableSchema, streamRes.Stream, req.FlowJobName)
+    numRecords, err := avroSyncer.SyncRecords(ctx, destinationTableSchema, stream, req.FlowJobName)
     if err != nil {
         return nil, err
     }
2 changes: 1 addition & 1 deletion flow/connectors/eventhub/eventhub.go
@@ -158,7 +158,7 @@ func (c *EventHubConnector) processBatch(
     // partition_column is the column in the table that is used to determine
     // the partition key for the eventhub.
     partitionColumn := destination.PartitionKeyColumn
-    partitionValue := record.GetItems().GetColumnValue(partitionColumn).Value
+    partitionValue := record.GetItems().GetColumnValue(partitionColumn).Value()
     var partitionKey string
     if partitionValue != nil {
         partitionKey = fmt.Sprint(partitionValue)
49 changes: 28 additions & 21 deletions flow/connectors/postgres/cdc.go
@@ -607,7 +607,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
     }
     switch col.DataType {
     case 'n': // null
-        val := qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
+        val := qvalue.QValueNull(qvalue.QValueKindInvalid)
         items.AddColumn(colName, val)
     case 't': // text
         /* bytea also appears here as a hex */
@@ -649,48 +649,55 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
                 // but not representable by time.Time
                 p.logger.Warn(fmt.Sprintf("Invalidated and hence nulled %s data: %s",
                     dt.Name, string(data)))
-                return qvalue.QValue{}, nil
+                switch dt.Name {
+                case "time":
+                    return qvalue.QValueNull(qvalue.QValueKindTime), nil
+                case "timetz":
+                    return qvalue.QValueNull(qvalue.QValueKindTimeTZ), nil
+                case "timestamp":
+                    return qvalue.QValueNull(qvalue.QValueKindTimestamp), nil
+                case "timestamptz":
+                    return qvalue.QValueNull(qvalue.QValueKindTimestampTZ), nil
+                }
             }
-            return qvalue.QValue{}, err
+            return nil, err
         }
         retVal, err := p.parseFieldFromPostgresOID(dataType, parsedData)
         if err != nil {
-            return qvalue.QValue{}, err
+            return nil, err
         }
         return retVal, nil
     } else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding.
         retVal, err := p.parseFieldFromPostgresOID(dataType, string(data))
         if err != nil {
-            return qvalue.QValue{}, err
+            return nil, err
         }
         return retVal, nil
     }

     typeName, ok := p.customTypesMapping[dataType]
     if ok {
         customQKind := customTypeToQKind(typeName)
-        if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
+        switch customQKind {
+        case qvalue.QValueKindGeography, qvalue.QValueKindGeometry:
             wkt, err := geo.GeoValidate(string(data))
             if err != nil {
-                return qvalue.QValue{
-                    Kind: customQKind,
-                    Value: nil,
-                }, nil
+                return qvalue.QValueNull(customQKind), nil
+            } else if customQKind == qvalue.QValueKindGeography {
+                return qvalue.QValueGeography{Val: wkt}, nil
             } else {
-                return qvalue.QValue{
-                    Kind: customQKind,
-                    Value: wkt,
-                }, nil
+                return qvalue.QValueGeometry{Val: wkt}, nil
             }
-        } else {
-            return qvalue.QValue{
-                Kind: customQKind,
-                Value: string(data),
-            }, nil
+        case qvalue.QValueKindHStore:
+            return qvalue.QValueHStore{Val: string(data)}, nil
+        case qvalue.QValueKindString:
+            return qvalue.QValueString{Val: string(data)}, nil
+        default:
+            return nil, fmt.Errorf("unknown custom qkind: %s", customQKind)
         }
     }

-    return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
+    return qvalue.QValueString{Val: string(data)}, nil
 }

 func (p *PostgresCDCSource) auditSchemaDelta(ctx context.Context, flowJobName string, rec *model.RelationRecord) error {
@@ -795,7 +802,7 @@ func (p *PostgresCDCSource) recToTablePKey(req *model.PullRecordsRequest,
         if err != nil {
             return nil, fmt.Errorf("error getting pkey column value: %w", err)
         }
-        pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value)))
+        pkeyColsMerged = append(pkeyColsMerged, []byte(fmt.Sprint(pkeyColVal.Value())))
     }

     return &model.TableWithPkey{
16 changes: 8 additions & 8 deletions flow/connectors/postgres/postgres.go
@@ -415,8 +415,8 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
         var row []any
         switch typedRecord := record.(type) {
         case *model.InsertRecord:
-            itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-                UnnestColumns: map[string]struct{}{},
+            itemsJSON, err := typedRecord.Items.ToJSONWithOptions(model.ToJSONOptions{
+                UnnestColumns: nil,
                 HStoreAsJSON: false,
             })
             if err != nil {
@@ -435,15 +435,15 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
             }

         case *model.UpdateRecord:
-            newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(&model.ToJSONOptions{
-                UnnestColumns: map[string]struct{}{},
+            newItemsJSON, err := typedRecord.NewItems.ToJSONWithOptions(model.ToJSONOptions{
+                UnnestColumns: nil,
                 HStoreAsJSON: false,
             })
             if err != nil {
                 return nil, fmt.Errorf("failed to serialize update record new items to JSON: %w", err)
             }
-            oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(&model.ToJSONOptions{
-                UnnestColumns: map[string]struct{}{},
+            oldItemsJSON, err := typedRecord.OldItems.ToJSONWithOptions(model.ToJSONOptions{
+                UnnestColumns: nil,
                 HStoreAsJSON: false,
             })
             if err != nil {
@@ -462,8 +462,8 @@ func (c *PostgresConnector) SyncRecords(ctx context.Context, req *model.SyncReco
             }

         case *model.DeleteRecord:
-            itemsJSON, err := typedRecord.Items.ToJSONWithOptions(&model.ToJSONOptions{
-                UnnestColumns: map[string]struct{}{},
+            itemsJSON, err := typedRecord.Items.ToJSONWithOptions(model.ToJSONOptions{
+                UnnestColumns: nil,
                 HStoreAsJSON: false,
             })
             if err != nil {
29 changes: 18 additions & 11 deletions flow/connectors/postgres/qrep_query_executor.go
@@ -458,19 +458,26 @@ func (qe *QRepQueryExecutor) mapRowToQRecord(
             record[i] = tmp
         } else {
             customQKind := customTypeToQKind(typeName)
-            if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
-                wkbString, ok := values[i].(string)
-                wkt, err := geo.GeoValidate(wkbString)
-                if err != nil || !ok {
-                    values[i] = nil
-                } else {
-                    values[i] = wkt
-                }
-            }
-            record[i] = qvalue.QValue{
-                Kind: customQKind,
-                Value: values[i],
-            }
+            if values[i] == nil {
+                record[i] = qvalue.QValueNull(customQKind)
+            } else {
+                switch customQKind {
+                case qvalue.QValueKindGeography, qvalue.QValueKindGeometry:
+                    wkbString, ok := values[i].(string)
+                    wkt, err := geo.GeoValidate(wkbString)
+                    if err != nil || !ok {
+                        record[i] = qvalue.QValueNull(qvalue.QValueKindGeography)
+                    } else if customQKind == qvalue.QValueKindGeography {
+                        record[i] = qvalue.QValueGeography{Val: wkt}
+                    } else {
+                        record[i] = qvalue.QValueGeometry{Val: wkt}
+                    }
+                case qvalue.QValueKindHStore:
+                    record[i] = qvalue.QValueHStore{Val: fmt.Sprint(values[i])}
+                case qvalue.QValueKindString:
+                    record[i] = qvalue.QValueString{Val: fmt.Sprint(values[i])}
+                }
+            }
         }
     }

40 changes: 20 additions & 20 deletions flow/connectors/postgres/qrep_query_executor_test.go
@@ -75,8 +75,8 @@ func TestExecuteAndProcessQuery(t *testing.T) {
         t.Fatalf("expected 1 record, got %v", len(batch.Records))
     }

-    if batch.Records[0][1].Value != "testdata" {
-        t.Fatalf("expected 'testdata', got %v", batch.Records[0][0].Value)
+    if batch.Records[0][1].Value() != "testdata" {
+        t.Fatalf("expected 'testdata', got %v", batch.Records[0][0].Value())
     }
 }

@@ -189,52 +189,52 @@ func TestAllDataTypes(t *testing.T) {
     record := batch.Records[0]

     expectedBool := true
-    if record[0].Value.(bool) != expectedBool {
-        t.Fatalf("expected %v, got %v", expectedBool, record[0].Value)
+    if record[0].Value().(bool) != expectedBool {
+        t.Fatalf("expected %v, got %v", expectedBool, record[0].Value())
     }

     expectedInt4 := int32(2)
-    if record[1].Value.(int32) != expectedInt4 {
-        t.Fatalf("expected %v, got %v", expectedInt4, record[1].Value)
+    if record[1].Value().(int32) != expectedInt4 {
+        t.Fatalf("expected %v, got %v", expectedInt4, record[1].Value())
     }

     expectedInt8 := int64(3)
-    if record[2].Value.(int64) != expectedInt8 {
-        t.Fatalf("expected %v, got %v", expectedInt8, record[2].Value)
+    if record[2].Value().(int64) != expectedInt8 {
+        t.Fatalf("expected %v, got %v", expectedInt8, record[2].Value())
     }

     expectedFloat4 := float32(1.1)
-    if record[3].Value.(float32) != expectedFloat4 {
-        t.Fatalf("expected %v, got %v", expectedFloat4, record[3].Value)
+    if record[3].Value().(float32) != expectedFloat4 {
+        t.Fatalf("expected %v, got %v", expectedFloat4, record[3].Value())
     }

     expectedFloat8 := float64(2.2)
-    if record[4].Value.(float64) != expectedFloat8 {
-        t.Fatalf("expected %v, got %v", expectedFloat8, record[4].Value)
+    if record[4].Value().(float64) != expectedFloat8 {
+        t.Fatalf("expected %v, got %v", expectedFloat8, record[4].Value())
     }

     expectedText := "text"
-    if record[5].Value.(string) != expectedText {
-        t.Fatalf("expected %v, got %v", expectedText, record[5].Value)
+    if record[5].Value().(string) != expectedText {
+        t.Fatalf("expected %v, got %v", expectedText, record[5].Value())
     }

     expectedBytea := []byte("bytea")
-    if !bytes.Equal(record[6].Value.([]byte), expectedBytea) {
-        t.Fatalf("expected %v, got %v", expectedBytea, record[6].Value)
+    if !bytes.Equal(record[6].Value().([]byte), expectedBytea) {
+        t.Fatalf("expected %v, got %v", expectedBytea, record[6].Value())
     }

     expectedJSON := `{"key":"value"}`
-    if record[7].Value.(string) != expectedJSON {
-        t.Fatalf("expected %v, got %v", expectedJSON, record[7].Value)
+    if record[7].Value().(string) != expectedJSON {
+        t.Fatalf("expected %v, got %v", expectedJSON, record[7].Value())
     }

-    actualUUID := record[8].Value.([16]uint8)
+    actualUUID := record[8].Value().([16]uint8)
     if !bytes.Equal(actualUUID[:], savedUUID[:]) {
         t.Fatalf("expected %v, got %v", savedUUID, actualUUID)
     }

     expectedNumeric := "123.456"
-    actualNumeric := record[10].Value.(decimal.Decimal).String()
+    actualNumeric := record[10].Value().(decimal.Decimal).String()
     if actualNumeric != expectedNumeric {
         t.Fatalf("expected %v, got %v", expectedNumeric, actualNumeric)
     }
18 changes: 9 additions & 9 deletions flow/connectors/postgres/qrep_sql_sync.go
@@ -92,7 +92,7 @@ func (s *QRepStagingTableSync) SyncQRepRecords(
             copySource,
         )
         if err != nil {
-            return -1, fmt.Errorf("failed to copy records into destination table: %v", err)
+            return -1, fmt.Errorf("failed to copy records into destination table: %w", err)
         }

         if syncedAtCol != "" {
@@ -104,7 +104,7 @@
             )
             _, err = tx.Exec(context.Background(), updateSyncedAtStmt)
             if err != nil {
-                return -1, fmt.Errorf("failed to update synced_at column: %v", err)
+                return -1, fmt.Errorf("failed to update synced_at column: %w", err)
             }
         }
     } else {
@@ -123,7 +123,7 @@
             stagingTableName, createStagingTableStmt), syncLog)
         _, err = tx.Exec(context.Background(), createStagingTableStmt)
         if err != nil {
-            return -1, fmt.Errorf("failed to create staging table: %v", err)
+            return -1, fmt.Errorf("failed to create staging table: %w", err)
         }

         // Step 2.2: Insert records into the staging table
@@ -134,7 +134,7 @@
             copySource,
         )
         if err != nil || numRowsSynced != int64(copySource.NumRecords()) {
-            return -1, fmt.Errorf("failed to copy records into staging table: %v", err)
+            return -1, fmt.Errorf("failed to copy records into staging table: %w", err)
         }

         // construct the SET clause for the upsert operation
@@ -173,7 +173,7 @@
         s.connector.logger.Info("Performing upsert operation", slog.String("upsertStmt", upsertStmt), syncLog)
         res, err := tx.Exec(context.Background(), upsertStmt)
         if err != nil {
-            return -1, fmt.Errorf("failed to perform upsert operation: %v", err)
+            return -1, fmt.Errorf("failed to perform upsert operation: %w", err)
         }

         numRowsSynced = res.RowsAffected()
@@ -186,7 +186,7 @@
         s.connector.logger.Info("Dropping staging table", slog.String("stagingTable", stagingTableName), syncLog)
         _, err = tx.Exec(context.Background(), dropStagingTableStmt)
         if err != nil {
-            return -1, fmt.Errorf("failed to drop staging table: %v", err)
+            return -1, fmt.Errorf("failed to drop staging table: %w", err)
         }
     }

@@ -195,7 +195,7 @@
     // marshal the partition to json using protojson
     pbytes, err := protojson.Marshal(partition)
     if err != nil {
-        return -1, fmt.Errorf("failed to marshal partition to json: %v", err)
+        return -1, fmt.Errorf("failed to marshal partition to json: %w", err)
     }

     metadataTableIdentifier := pgx.Identifier{s.connector.metadataSchema, qRepMetadataTableName}
@@ -214,12 +214,12 @@
         time.Now(),
     )
     if err != nil {
-        return -1, fmt.Errorf("failed to execute statements in a transaction: %v", err)
+        return -1, fmt.Errorf("failed to execute statements in a transaction: %w", err)
     }

     err = tx.Commit(context.Background())
     if err != nil {
-        return -1, fmt.Errorf("failed to commit transaction: %v", err)
+        return -1, fmt.Errorf("failed to commit transaction: %w", err)
     }

     numRowsInserted := copySource.NumRecords()
