Skip to content

Commit

Permalink
qvalue: pass by value (#813)
Browse files Browse the repository at this point in the history
QValue is a string/interface{} pair
string is a pointer/int pair
interface{} is a pointer/pointer pair

Copying four words is cheap, especially when for the most part qvalues
are stored in a QRecord which will itself be passed around by pointer

Include a `QValueKindEmpty` which is used in places where previously
pointer would be nil
  • Loading branch information
serprex authored Dec 13, 2023
1 parent 5124b0d commit 9cb34e5
Show file tree
Hide file tree
Showing 10 changed files with 92 additions and 91 deletions.
22 changes: 11 additions & 11 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,11 +640,11 @@ func (p *PostgresCDCSource) convertTupleToMap(
) (*model.RecordItems, map[string]struct{}, error) {
// if the tuple is nil, return an empty map
if tuple == nil {
return model.NewRecordItems(), make(map[string]struct{}), nil
return model.NewRecordItems(0), make(map[string]struct{}), nil
}

// create empty map of string to interface{}
items := model.NewRecordItems()
items := model.NewRecordItems(len(tuple.Columns))
unchangedToastColumns := make(map[string]struct{})

for idx, col := range tuple.Columns {
Expand All @@ -654,7 +654,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
}
switch col.DataType {
case 'n': // null
val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
val := qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil}
items.AddColumn(colName, val)
case 't': // text
/* bytea also appears here as a hex */
Expand All @@ -678,7 +678,7 @@ func (p *PostgresCDCSource) convertTupleToMap(
return items, unchangedToastColumns, nil
}

func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, formatCode int16) (*qvalue.QValue, error) {
func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, formatCode int16) (qvalue.QValue, error) {
var parsedData any
var err error
if dt, ok := p.typeMap.TypeForOID(dataType); ok {
Expand All @@ -689,17 +689,17 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data)
}
if err != nil {
return nil, err
return qvalue.QValue{}, err
}
retVal, err := parseFieldFromPostgresOID(dataType, parsedData)
if err != nil {
return nil, err
return qvalue.QValue{}, err
}
return retVal, nil
} else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding.
retVal, err := parseFieldFromPostgresOID(dataType, string(data))
if err != nil {
return nil, err
return qvalue.QValue{}, err
}
return retVal, nil
}
Expand All @@ -710,25 +710,25 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
wkt, err := GeoValidate(string(data))
if err != nil {
return &qvalue.QValue{
return qvalue.QValue{
Kind: customQKind,
Value: nil,
}, nil
} else {
return &qvalue.QValue{
return qvalue.QValue{
Kind: customQKind,
Value: wkt,
}, nil
}
} else {
return &qvalue.QValue{
return qvalue.QValue{
Kind: customQKind,
Value: string(data),
}, nil
}
}

return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil
}

func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.RelationMessage {
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription,
if err != nil {
return nil, fmt.Errorf("failed to parse field: %w", err)
}
record.Set(i, *tmp)
record.Set(i, tmp)
} else {
customQKind := customTypeToQKind(typeName)
if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry {
Expand Down
114 changes: 57 additions & 57 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,48 +144,48 @@ func qValueKindToPostgresType(qvalueKind string) string {
}
}

func parseJSON(value interface{}) (*qvalue.QValue, error) {
func parseJSON(value interface{}) (qvalue.QValue, error) {
jsonVal, err := json.Marshal(value)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON: %w", err)
return qvalue.QValue{}, fmt.Errorf("failed to parse JSON: %w", err)
}
return &qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil
return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil
}

func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (*qvalue.QValue, error) {
var val *qvalue.QValue = nil
func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) {
val := qvalue.QValue{}

if value == nil {
val = &qvalue.QValue{Kind: qvalueKind, Value: nil}
val = qvalue.QValue{Kind: qvalueKind, Value: nil}
return val, nil
}

switch qvalueKind {
case qvalue.QValueKindTimestamp:
timestamp := value.(time.Time)
val = &qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: timestamp}
val = qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: timestamp}
case qvalue.QValueKindTimestampTZ:
timestamp := value.(time.Time)
val = &qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp}
val = qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp}
case qvalue.QValueKindDate:
date := value.(time.Time)
val = &qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date}
val = qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date}
case qvalue.QValueKindTime:
timeVal := value.(pgtype.Time)
if timeVal.Valid {
var timeValStr any
timeValStr, err := timeVal.Value()
if err != nil {
return nil, fmt.Errorf("failed to parse time: %w", err)
return qvalue.QValue{}, fmt.Errorf("failed to parse time: %w", err)
}
// edge case, only Postgres supports this extreme value for time
timeValStr = strings.Replace(timeValStr.(string), "24:00:00.000000", "23:59:59.999999", 1)
t, err := time.Parse("15:04:05.999999", timeValStr.(string))
t = t.AddDate(1970, 0, 0)
if err != nil {
return nil, fmt.Errorf("failed to parse time: %w", err)
return qvalue.QValue{}, fmt.Errorf("failed to parse time: %w", err)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindTime, Value: t}
val = qvalue.QValue{Kind: qvalue.QValueKindTime, Value: t}
}
case qvalue.QValueKindTimeTZ:
timeVal := value.(string)
Expand All @@ -195,178 +195,178 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
timeVal = strings.Replace(timeVal, "+00", "+0000", 1)
t, err := time.Parse("15:04:05.999999-0700", timeVal)
if err != nil {
return nil, fmt.Errorf("failed to parse time: %w", err)
return qvalue.QValue{}, fmt.Errorf("failed to parse time: %w", err)
}
t = t.AddDate(1970, 0, 0)
val = &qvalue.QValue{Kind: qvalue.QValueKindTimeTZ, Value: t}
val = qvalue.QValue{Kind: qvalue.QValueKindTimeTZ, Value: t}

case qvalue.QValueKindBoolean:
boolVal := value.(bool)
val = &qvalue.QValue{Kind: qvalue.QValueKindBoolean, Value: boolVal}
val = qvalue.QValue{Kind: qvalue.QValueKindBoolean, Value: boolVal}
case qvalue.QValueKindJSON:
tmp, err := parseJSON(value)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON: %w", err)
return qvalue.QValue{}, fmt.Errorf("failed to parse JSON: %w", err)
}
val = tmp
case qvalue.QValueKindInt16:
intVal := value.(int16)
val = &qvalue.QValue{Kind: qvalue.QValueKindInt16, Value: int32(intVal)}
val = qvalue.QValue{Kind: qvalue.QValueKindInt16, Value: int32(intVal)}
case qvalue.QValueKindInt32:
intVal := value.(int32)
val = &qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: intVal}
val = qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: intVal}
case qvalue.QValueKindInt64:
intVal := value.(int64)
val = &qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: intVal}
val = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: intVal}
case qvalue.QValueKindFloat32:
floatVal := value.(float32)
val = &qvalue.QValue{Kind: qvalue.QValueKindFloat32, Value: floatVal}
val = qvalue.QValue{Kind: qvalue.QValueKindFloat32, Value: floatVal}
case qvalue.QValueKindFloat64:
floatVal := value.(float64)
val = &qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal}
val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal}
case qvalue.QValueKindString:
// handling all unsupported types with strings as well for now.
textVal := value
val = &qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(textVal)}
val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(textVal)}
case qvalue.QValueKindUUID:
switch value.(type) {
case string:
val = &qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value}
val = qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value}
case [16]byte:
val = &qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value}
val = qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value}
default:
return nil, fmt.Errorf("failed to parse UUID: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse UUID: %v", value)
}
case qvalue.QValueKindBytes:
rawBytes := value.([]byte)
val = &qvalue.QValue{Kind: qvalue.QValueKindBytes, Value: rawBytes}
val = qvalue.QValue{Kind: qvalue.QValueKindBytes, Value: rawBytes}
case qvalue.QValueKindBit:
bitsVal := value.(pgtype.Bits)
if bitsVal.Valid {
val = &qvalue.QValue{Kind: qvalue.QValueKindBit, Value: bitsVal.Bytes}
val = qvalue.QValue{Kind: qvalue.QValueKindBit, Value: bitsVal.Bytes}
}
case qvalue.QValueKindNumeric:
numVal := value.(pgtype.Numeric)
if numVal.Valid {
rat, err := numericToRat(&numVal)
if err != nil {
return nil, fmt.Errorf("failed to convert numeric [%v] to rat: %w", value, err)
return qvalue.QValue{}, fmt.Errorf("failed to convert numeric [%v] to rat: %w", value, err)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat}
val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat}
}
case qvalue.QValueKindArrayFloat32:
switch v := value.(type) {
case pgtype.Array[float32]:
if v.Valid {
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements}
}
case []float32:
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v}
case []interface{}:
float32Array := make([]float32, len(v))
for i, val := range v {
float32Array[i] = val.(float32)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array}
default:
return nil, fmt.Errorf("failed to parse array float32: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value)
}
case qvalue.QValueKindArrayFloat64:
switch v := value.(type) {
case pgtype.Array[float64]:
if v.Valid {
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements}
}
case []float64:
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v}
case []interface{}:
float64Array := make([]float64, len(v))
for i, val := range v {
float64Array[i] = val.(float64)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array}
default:
return nil, fmt.Errorf("failed to parse array float64: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value)
}
case qvalue.QValueKindArrayInt32:
switch v := value.(type) {
case pgtype.Array[int32]:
if v.Valid {
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements}
}
case []int32:
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v}
case []interface{}:
int32Array := make([]int32, len(v))
for i, val := range v {
int32Array[i] = val.(int32)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array}
default:
return nil, fmt.Errorf("failed to parse array int32: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value)
}
case qvalue.QValueKindArrayInt64:
switch v := value.(type) {
case pgtype.Array[int64]:
if v.Valid {
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements}
}
case []int64:
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v}
case []interface{}:
int64Array := make([]int64, len(v))
for i, val := range v {
int64Array[i] = val.(int64)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array}
default:
return nil, fmt.Errorf("failed to parse array int64: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value)
}
case qvalue.QValueKindArrayString:
switch v := value.(type) {
case pgtype.Array[string]:
if v.Valid {
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements}
}
case []string:
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v}
case []interface{}:
stringArray := make([]string, len(v))
for i, val := range v {
stringArray[i] = val.(string)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray}
default:
return nil, fmt.Errorf("failed to parse array string: %v", value)
return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value)
}
case qvalue.QValueKindHStore:
hstoreVal, err := value.(pgtype.Hstore).HstoreValue()
if err != nil {
return nil, fmt.Errorf("failed to parse hstore: %w", err)
return qvalue.QValue{}, fmt.Errorf("failed to parse hstore: %w", err)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
val = qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
val = &qvalue.QValue{Kind: qvalue.QValueKindPoint,
val = qvalue.QValue{Kind: qvalue.QValueKindPoint,
Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)}
default:
// log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind)
textVal, ok := value.(string)
if !ok {
return nil, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind)
return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind)
}
val = &qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal}
val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal}
}

// parsing into pgtype failed.
if val == nil {
return nil, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind)
if val == (qvalue.QValue{}) {
return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind)
}
return val, nil
}

func parseFieldFromPostgresOID(oid uint32, value interface{}) (*qvalue.QValue, error) {
func parseFieldFromPostgresOID(oid uint32, value interface{}) (qvalue.QValue, error) {
return parseFieldFromQValueKind(postgresOIDToQValueKind(oid), value)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R
CommitID: 2,
Items: &model.RecordItems{
ColToValIdx: map[string]int{"id": 0},
Values: []*qvalue.QValue{{
Values: []qvalue.QValue{{
Kind: qvalue.QValueKindInt64,
Value: 1,
}},
Expand Down
2 changes: 1 addition & 1 deletion flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) {
_, nullable := qac.NullableFields[key]

avroConverter := qvalue.NewQValueAvroConverter(
&qac.QRecord.Entries[idx],
qac.QRecord.Entries[idx],
qac.TargetDWH,
nullable,
)
Expand Down
Loading

0 comments on commit 9cb34e5

Please sign in to comment.