From 9cb34e5b5fc67ef987bc392f661ba630d87fecbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 13 Dec 2023 16:35:10 +0000 Subject: [PATCH] qvalue: pass by value (#813) QValue is a string/interface{} pair string is a pointer/int pair interface{} is a pointer/pointer pair Copying four words is cheap, especially when for the most part qvalues are stored in a QRecord which will itself be passed around by pointer Include a `QValueKindEmpty` which is used in places where previously pointer would be nil --- flow/connectors/postgres/cdc.go | 22 ++-- .../postgres/qrep_query_executor.go | 2 +- flow/connectors/postgres/qvalue_convert.go | 114 +++++++++--------- .../cdc_records/cdc_records_storage_test.go | 2 +- flow/model/conversion_avro.go | 2 +- flow/model/model.go | 24 ++-- flow/model/qrecord.go | 2 +- flow/model/qvalue/avro_converter.go | 4 +- flow/model/qvalue/kind.go | 1 + flow/model/qvalue/qvalue.go | 10 +- 10 files changed, 92 insertions(+), 91 deletions(-) diff --git a/flow/connectors/postgres/cdc.go b/flow/connectors/postgres/cdc.go index 6141dcaffd..cf75fb2e2c 100644 --- a/flow/connectors/postgres/cdc.go +++ b/flow/connectors/postgres/cdc.go @@ -640,11 +640,11 @@ func (p *PostgresCDCSource) convertTupleToMap( ) (*model.RecordItems, map[string]struct{}, error) { // if the tuple is nil, return an empty map if tuple == nil { - return model.NewRecordItems(), make(map[string]struct{}), nil + return model.NewRecordItems(0), make(map[string]struct{}), nil } // create empty map of string to interface{} - items := model.NewRecordItems() + items := model.NewRecordItems(len(tuple.Columns)) unchangedToastColumns := make(map[string]struct{}) for idx, col := range tuple.Columns { @@ -654,7 +654,7 @@ func (p *PostgresCDCSource) convertTupleToMap( } switch col.DataType { case 'n': // null - val := &qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil} + val := qvalue.QValue{Kind: qvalue.QValueKindInvalid, Value: nil} items.AddColumn(colName, val) case 't': // text /* bytea also appears here as a hex */ @@ -678,7 +678,7 @@ func (p *PostgresCDCSource) convertTupleToMap( return items, unchangedToastColumns, nil } -func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, formatCode int16) (*qvalue.QValue, error) { +func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, formatCode int16) (qvalue.QValue, error) { var parsedData any var err error if dt, ok := p.typeMap.TypeForOID(dataType); ok { @@ -689,17 +689,17 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data) } if err != nil { - return nil, err + return qvalue.QValue{}, err } retVal, err := parseFieldFromPostgresOID(dataType, parsedData) if err != nil { - return nil, err + return qvalue.QValue{}, err } return retVal, nil } else if dataType == uint32(oid.T_timetz) { // ugly TIMETZ workaround for CDC decoding. retVal, err := parseFieldFromPostgresOID(dataType, string(data)) if err != nil { - return nil, err + return qvalue.QValue{}, err } return retVal, nil } @@ -710,25 +710,25 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { wkt, err := GeoValidate(string(data)) if err != nil { - return &qvalue.QValue{ + return qvalue.QValue{ Kind: customQKind, Value: nil, }, nil } else { - return &qvalue.QValue{ + return qvalue.QValue{ Kind: customQKind, Value: wkt, }, nil } } else { - return &qvalue.QValue{ + return qvalue.QValue{ Kind: customQKind, Value: string(data), }, nil } } - return &qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil + return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(data)}, nil } func convertRelationMessageToProto(msg *pglogrepl.RelationMessage) *protos.RelationMessage { diff --git a/flow/connectors/postgres/qrep_query_executor.go b/flow/connectors/postgres/qrep_query_executor.go index da8e9a4a1c..ef2b6fec56 100644 --- a/flow/connectors/postgres/qrep_query_executor.go +++ b/flow/connectors/postgres/qrep_query_executor.go @@ -439,7 +439,7 @@ func mapRowToQRecord(row pgx.Rows, fds []pgconn.FieldDescription, if err != nil { return nil, fmt.Errorf("failed to parse field: %w", err) } - record.Set(i, *tmp) + record.Set(i, tmp) } else { customQKind := customTypeToQKind(typeName) if customQKind == qvalue.QValueKindGeography || customQKind == qvalue.QValueKindGeometry { diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index d0c7b1cd44..c4f2931e06 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -144,48 +144,48 @@ func qValueKindToPostgresType(qvalueKind string) string { } } -func parseJSON(value interface{}) (*qvalue.QValue, error) { +func parseJSON(value interface{}) (qvalue.QValue, error) { jsonVal, err := json.Marshal(value) if err != nil { - return nil, fmt.Errorf("failed to parse JSON: %w", err) + return qvalue.QValue{}, fmt.Errorf("failed to parse JSON: %w", err) } - return &qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil + return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil } -func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (*qvalue.QValue, error) { - var val *qvalue.QValue = nil +func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { + val := qvalue.QValue{} if value == nil { - val = &qvalue.QValue{Kind: qvalueKind, Value: nil} + val = qvalue.QValue{Kind: qvalueKind, Value: nil} return val, nil } switch qvalueKind { case qvalue.QValueKindTimestamp: timestamp := value.(time.Time) - val = &qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: timestamp} + val = qvalue.QValue{Kind: qvalue.QValueKindTimestamp, Value: timestamp} case qvalue.QValueKindTimestampTZ: timestamp := value.(time.Time) - val = &qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp} + val = qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp} case qvalue.QValueKindDate: date := value.(time.Time) - val = &qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date} + val = qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date} case qvalue.QValueKindTime: timeVal := value.(pgtype.Time) if timeVal.Valid { var timeValStr any timeValStr, err := timeVal.Value() if err != nil { - return nil, fmt.Errorf("failed to parse time: %w", err) + return qvalue.QValue{}, fmt.Errorf("failed to parse time: %w", err) } // edge case, only Postgres supports this extreme value for time timeValStr = strings.Replace(timeValStr.(string), "24:00:00.000000", "23:59:59.999999", 1) t, err := time.Parse("15:04:05.999999", timeValStr.(string)) t = t.AddDate(1970, 0, 0) if err != nil { - return nil, fmt.Errorf("failed to parse time: %w", err) + return qvalue.QValue{}, fmt.Errorf("failed to parse time: %w", err) } - val = &qvalue.QValue{Kind: qvalue.QValueKindTime, Value: t} + val = qvalue.QValue{Kind: qvalue.QValueKindTime, Value: t} } case qvalue.QValueKindTimeTZ: timeVal := value.(string) @@ -195,178 +195,178 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( timeVal = strings.Replace(timeVal, "+00", "+0000", 1) t, err := time.Parse("15:04:05.999999-0700", timeVal) if err != nil { - return nil, fmt.Errorf("failed to parse time: %w", err) + return qvalue.QValue{}, fmt.Errorf("failed to parse time: %w", err) } t = t.AddDate(1970, 0, 0) - val = &qvalue.QValue{Kind: qvalue.QValueKindTimeTZ, Value: t} + val = qvalue.QValue{Kind: qvalue.QValueKindTimeTZ, Value: t} case qvalue.QValueKindBoolean: boolVal := value.(bool) - val = &qvalue.QValue{Kind: qvalue.QValueKindBoolean, Value: boolVal} + val = qvalue.QValue{Kind: qvalue.QValueKindBoolean, Value: boolVal} case qvalue.QValueKindJSON: tmp, err := parseJSON(value) if err != nil { - return nil, fmt.Errorf("failed to parse JSON: %w", err) + return qvalue.QValue{}, fmt.Errorf("failed to parse JSON: %w", err) } val = tmp case qvalue.QValueKindInt16: intVal := value.(int16) - val = &qvalue.QValue{Kind: qvalue.QValueKindInt16, Value: int32(intVal)} + val = qvalue.QValue{Kind: qvalue.QValueKindInt16, Value: int32(intVal)} case qvalue.QValueKindInt32: intVal := value.(int32) - val = &qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: intVal} + val = qvalue.QValue{Kind: qvalue.QValueKindInt32, Value: intVal} case qvalue.QValueKindInt64: intVal := value.(int64) - val = &qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: intVal} + val = qvalue.QValue{Kind: qvalue.QValueKindInt64, Value: intVal} case qvalue.QValueKindFloat32: floatVal := value.(float32) - val = &qvalue.QValue{Kind: qvalue.QValueKindFloat32, Value: floatVal} + val = qvalue.QValue{Kind: qvalue.QValueKindFloat32, Value: floatVal} case qvalue.QValueKindFloat64: floatVal := value.(float64) - val = &qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} + val = qvalue.QValue{Kind: qvalue.QValueKindFloat64, Value: floatVal} case qvalue.QValueKindString: // handling all unsupported types with strings as well for now. textVal := value - val = &qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(textVal)} + val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: fmt.Sprint(textVal)} case qvalue.QValueKindUUID: switch value.(type) { case string: - val = &qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value} + val = qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value} case [16]byte: - val = &qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value} + val = qvalue.QValue{Kind: qvalue.QValueKindUUID, Value: value} default: - return nil, fmt.Errorf("failed to parse UUID: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse UUID: %v", value) } case qvalue.QValueKindBytes: rawBytes := value.([]byte) - val = &qvalue.QValue{Kind: qvalue.QValueKindBytes, Value: rawBytes} + val = qvalue.QValue{Kind: qvalue.QValueKindBytes, Value: rawBytes} case qvalue.QValueKindBit: bitsVal := value.(pgtype.Bits) if bitsVal.Valid { - val = &qvalue.QValue{Kind: qvalue.QValueKindBit, Value: bitsVal.Bytes} + val = qvalue.QValue{Kind: qvalue.QValueKindBit, Value: bitsVal.Bytes} } case qvalue.QValueKindNumeric: numVal := value.(pgtype.Numeric) if numVal.Valid { rat, err := numericToRat(&numVal) if err != nil { - return nil, fmt.Errorf("failed to convert numeric [%v] to rat: %w", value, err) + return qvalue.QValue{}, fmt.Errorf("failed to convert numeric [%v] to rat: %w", value, err) } - val = &qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat} + val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat} } case qvalue.QValueKindArrayFloat32: switch v := value.(type) { case pgtype.Array[float32]: if v.Valid { - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements} } case []float32: - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v} case []interface{}: float32Array := make([]float32, len(v)) for i, val := range v { float32Array[i] = val.(float32) } - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array} default: - return nil, fmt.Errorf("failed to parse array float32: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value) } case qvalue.QValueKindArrayFloat64: switch v := value.(type) { case pgtype.Array[float64]: if v.Valid { - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements} } case []float64: - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v} case []interface{}: float64Array := make([]float64, len(v)) for i, val := range v { float64Array[i] = val.(float64) } - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array} default: - return nil, fmt.Errorf("failed to parse array float64: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value) } case qvalue.QValueKindArrayInt32: switch v := value.(type) { case pgtype.Array[int32]: if v.Valid { - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements} } case []int32: - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v} case []interface{}: int32Array := make([]int32, len(v)) for i, val := range v { int32Array[i] = val.(int32) } - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array} default: - return nil, fmt.Errorf("failed to parse array int32: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value) } case qvalue.QValueKindArrayInt64: switch v := value.(type) { case pgtype.Array[int64]: if v.Valid { - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements} } case []int64: - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v} case []interface{}: int64Array := make([]int64, len(v)) for i, val := range v { int64Array[i] = val.(int64) } - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array} default: - return nil, fmt.Errorf("failed to parse array int64: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value) } case qvalue.QValueKindArrayString: switch v := value.(type) { case pgtype.Array[string]: if v.Valid { - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements} } case []string: - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v} case []interface{}: stringArray := make([]string, len(v)) for i, val := range v { stringArray[i] = val.(string) } - val = &qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray} + val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray} default: - return nil, fmt.Errorf("failed to parse array string: %v", value) + return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value) } case qvalue.QValueKindHStore: hstoreVal, err := value.(pgtype.Hstore).HstoreValue() if err != nil { - return nil, fmt.Errorf("failed to parse hstore: %w", err) + return qvalue.QValue{}, fmt.Errorf("failed to parse hstore: %w", err) } - val = &qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal} + val = qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal} case qvalue.QValueKindPoint: xCoord := value.(pgtype.Point).P.X yCoord := value.(pgtype.Point).P.Y - val = &qvalue.QValue{Kind: qvalue.QValueKindPoint, + val = qvalue.QValue{Kind: qvalue.QValueKindPoint, Value: fmt.Sprintf("POINT(%f %f)", xCoord, yCoord)} default: // log.Warnf("unhandled QValueKind => %v, parsing as string", qvalueKind) textVal, ok := value.(string) if !ok { - return nil, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) + return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) } - val = &qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} + val = qvalue.QValue{Kind: qvalue.QValueKindString, Value: textVal} } // parsing into pgtype failed. - if val == nil { - return nil, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) + if val == (qvalue.QValue{}) { + return qvalue.QValue{}, fmt.Errorf("failed to parse value %v into QValueKind %v", value, qvalueKind) } return val, nil } -func parseFieldFromPostgresOID(oid uint32, value interface{}) (*qvalue.QValue, error) { +func parseFieldFromPostgresOID(oid uint32, value interface{}) (qvalue.QValue, error) { return parseFieldFromQValueKind(postgresOIDToQValueKind(oid), value) } diff --git a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go index f3495666d1..b2dd84ea44 100644 --- a/flow/connectors/utils/cdc_records/cdc_records_storage_test.go +++ b/flow/connectors/utils/cdc_records/cdc_records_storage_test.go @@ -31,7 +31,7 @@ func (s *CDCRecordStorageTestSuite) genKeyAndRec() (model.TableWithPkey, model.R CommitID: 2, Items: &model.RecordItems{ ColToValIdx: map[string]int{"id": 0}, - Values: []*qvalue.QValue{{ + Values: []qvalue.QValue{{ Kind: qvalue.QValueKindInt64, Value: 1, }}, diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 258e9c00e8..db58610e6b 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -36,7 +36,7 @@ func (qac *QRecordAvroConverter) Convert() (map[string]interface{}, error) { _, nullable := qac.NullableFields[key] avroConverter := qvalue.NewQValueAvroConverter( - &qac.QRecord.Entries[idx], + qac.QRecord.Entries[idx], qac.TargetDWH, nullable, ) diff --git a/flow/model/model.go b/flow/model/model.go index ecc6c0fc26..511430f7ce 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -63,20 +63,18 @@ type Record interface { // encoding/gob cannot encode unexported fields type RecordItems struct { ColToValIdx map[string]int - Values []*qvalue.QValue + Values []qvalue.QValue } -func NewRecordItems() *RecordItems { +func NewRecordItems(capacity int) *RecordItems { return &RecordItems{ - ColToValIdx: make(map[string]int), - // create a slice of 32 qvalues so that we don't have to allocate memory - // for each record to reduce GC pressure - Values: make([]*qvalue.QValue, 0, 32), + ColToValIdx: make(map[string]int, capacity), + Values: make([]qvalue.QValue, 0, capacity), } } -func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems { - recordItem := NewRecordItems() +func NewRecordItemWithData(cols []string, val []qvalue.QValue) *RecordItems { + recordItem := NewRecordItems(len(cols)) for i, col := range cols { recordItem.ColToValIdx[col] = len(recordItem.Values) recordItem.Values = append(recordItem.Values, val[i]) @@ -84,7 +82,7 @@ func NewRecordItemWithData(cols []string, val []*qvalue.QValue) *RecordItems { return recordItem } -func (r *RecordItems) AddColumn(col string, val *qvalue.QValue) { +func (r *RecordItems) AddColumn(col string, val qvalue.QValue) { if idx, ok := r.ColToValIdx[col]; ok { r.Values[idx] = val } else { @@ -93,11 +91,11 @@ func (r *RecordItems) AddColumn(col string, val *qvalue.QValue) { } } -func (r *RecordItems) GetColumnValue(col string) *qvalue.QValue { +func (r *RecordItems) GetColumnValue(col string) qvalue.QValue { if idx, ok := r.ColToValIdx[col]; ok { return r.Values[idx] } - return nil + return qvalue.QValue{} } // UpdateIfNotExists takes in a RecordItems as input and updates the values of the @@ -116,10 +114,10 @@ func (r *RecordItems) UpdateIfNotExists(input *RecordItems) []string { return updatedCols } -func (r *RecordItems) GetValueByColName(colName string) (*qvalue.QValue, error) { +func (r *RecordItems) GetValueByColName(colName string) (qvalue.QValue, error) { idx, ok := r.ColToValIdx[colName] if !ok { - return nil, fmt.Errorf("column name %s not found", colName) + return qvalue.QValue{}, fmt.Errorf("column name %s not found", colName) } return r.Values[idx], nil } diff --git a/flow/model/qrecord.go b/flow/model/qrecord.go index f39d1f9f47..5b44808be7 100644 --- a/flow/model/qrecord.go +++ b/flow/model/qrecord.go @@ -33,7 +33,7 @@ func (q *QRecord) equals(other *QRecord) bool { for i, entry := range q.Entries { otherEntry := other.Entries[i] - if !entry.Equals(&otherEntry) { + if !entry.Equals(otherEntry) { fmt.Printf("entry %d: %v != %v\n", i, entry, otherEntry) return false } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 02b8d83fc6..2cd6fe2e52 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -127,12 +127,12 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, nullable bool) (*QValueKindAvr } type QValueAvroConverter struct { - Value *QValue + Value QValue TargetDWH QDWHType Nullable bool } -func NewQValueAvroConverter(value *QValue, targetDWH QDWHType, nullable bool) *QValueAvroConverter { +func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool) *QValueAvroConverter { return &QValueAvroConverter{ Value: value, TargetDWH: targetDWH, diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index b2eb1b1e48..4b728f3dad 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -5,6 +5,7 @@ import "fmt" type QValueKind string const ( + QValueKindEmpty QValueKind = "" QValueKindInvalid QValueKind = "invalid" QValueKindFloat32 QValueKind = "float32" QValueKindFloat64 QValueKind = "float64" diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 050945df37..4d80b0fb79 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -18,8 +18,10 @@ type QValue struct { Value interface{} } -func (q *QValue) Equals(other *QValue) bool { +func (q QValue) Equals(other QValue) bool { switch q.Kind { + case QValueKindEmpty: + return other.Kind == QValueKindEmpty case QValueKindInvalid: return true case QValueKindFloat32: @@ -69,7 +71,7 @@ func (q *QValue) Equals(other *QValue) bool { return false } -func (q *QValue) GoTimeConvert() (string, error) { +func (q QValue) GoTimeConvert() (string, error) { if q.Kind == QValueKindTime || q.Kind == QValueKindTimeTZ { return q.Value.(time.Time).Format("15:04:05.999999"), nil // no connector supports time with timezone yet @@ -216,8 +218,8 @@ func compareStruct(value1, value2 interface{}) bool { if !ok { return false } - q1, ok1 := v1.(*QValue) - q2, ok2 := v2.(*QValue) + q1, ok1 := v1.(QValue) + q2, ok2 := v2.(QValue) if !ok1 || !ok2 || !q1.Equals(q2) { return false }