From 4a3b0e21362f90ff003987e46b6de5936d306989 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Philip=20Dub=C3=A9?= Date: Wed, 14 Feb 2024 04:07:46 +0000 Subject: [PATCH] Generalize recent []int32 nil handling fix to rest of array types --- flow/connectors/postgres/qvalue_convert.go | 191 +++------------------ flow/connectors/utils/array.go | 13 ++ 2 files changed, 37 insertions(+), 167 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4823aa2ecc..42115dc40e 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -12,6 +12,7 @@ import ( "github.com/jackc/pgx/v5/pgtype" "github.com/lib/pq/oid" + "github.com/PeerDB-io/peer-flow/connectors/utils" "github.com/PeerDB-io/peer-flow/model/qvalue" ) @@ -190,6 +191,20 @@ func parseJSON(value interface{}) (qvalue.QValue, error) { return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil } +func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { + switch v := value.(type) { + case pgtype.Array[T]: + if v.Valid { + return qvalue.QValue{Kind: kind, Value: v.Elements}, nil + } + case []T: + return qvalue.QValue{Kind: kind, Value: v}, nil + case []interface{}: + return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil + } + return qvalue.QValue{}, fmt.Errorf("failed to parse array %T: %v", value, value) +} + func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) { val := qvalue.QValue{} @@ -319,179 +334,21 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat} } case qvalue.QValueKindArrayFloat32: - switch v := value.(type) { - case pgtype.Array[float32]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements} - } - case []float32: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v} - case []interface{}: - float32Array := make([]float32, len(v)) - for i, val := range v { - float32Array[i] = val.(float32) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value) - } + return convertToArray[float32](qvalueKind, value) case qvalue.QValueKindArrayFloat64: - switch v := value.(type) { - case pgtype.Array[float64]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements} - } - case []float64: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v} - case []interface{}: - float64Array := make([]float64, len(v)) - for i, val := range v { - float64Array[i] = val.(float64) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value) - } + return convertToArray[float64](qvalueKind, value) case qvalue.QValueKindArrayInt16: - switch v := value.(type) { - case pgtype.Array[int16]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v.Elements} - } - case []int16: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v} - case []interface{}: - int16Array := make([]int16, len(v)) - for i, val := range v { - int16Array[i] = val.(int16) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: int16Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int16: %v", value) - } + return convertToArray[int16](qvalueKind, value) case qvalue.QValueKindArrayInt32: - switch v := value.(type) { - case pgtype.Array[int32]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements} - } - case []int32: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v} - case []interface{}: - int32Array := make([]int32, len(v)) - for i, val := range v { - if val == nil { - int32Array[i] = 0 - } else { - int32Array[i] = val.(int32) - } - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value) - } + return convertToArray[int32](qvalueKind, value) case qvalue.QValueKindArrayInt64: - switch v := value.(type) { - case pgtype.Array[int64]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements} - } - case []int64: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v} - case []interface{}: - int64Array := make([]int64, len(v)) - for i, val := range v { - int64Array[i] = val.(int64) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value) - } - case qvalue.QValueKindArrayDate: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v} - case []interface{}: - dateArray := make([]time.Time, len(v)) - for i, val := range v { - dateArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: dateArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array date: %v", value) - } - case qvalue.QValueKindArrayTimestamp: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v} - case []interface{}: - timestampArray := make([]time.Time, len(v)) - for i, val := range v { - timestampArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: timestampArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamp: %v", value) - } - case qvalue.QValueKindArrayTimestampTZ: - switch v := value.(type) { - case pgtype.Array[time.Time]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v.Elements} - } - case []time.Time: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v} - case []interface{}: - timestampTZArray := make([]time.Time, len(v)) - for i, val := range v { - timestampTZArray[i] = val.(time.Time) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: timestampTZArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamptz: %v", value) - } + return convertToArray[int64](qvalueKind, value) + case qvalue.QValueKindArrayDate, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ: + return convertToArray[time.Time](qvalueKind, value) case qvalue.QValueKindArrayBoolean: - switch v := value.(type) { - case pgtype.Array[bool]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v.Elements} - } - case []bool: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v} - case []interface{}: - boolArray := make([]bool, len(v)) - for i, val := range v { - boolArray[i] = val.(bool) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: boolArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array boolean: %v", value) - } + return convertToArray[bool](qvalueKind, value) case qvalue.QValueKindArrayString: - switch v := value.(type) { - case pgtype.Array[string]: - if v.Valid { - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements} - } - case []string: - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v} - case []interface{}: - stringArray := make([]string, len(v)) - for i, val := range v { - stringArray[i] = val.(string) - } - val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray} - default: - return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value) - } + return convertToArray[string](qvalueKind, value) case qvalue.QValueKindPoint: xCoord := value.(pgtype.Point).P.X yCoord := value.(pgtype.Point).P.Y diff --git a/flow/connectors/utils/array.go b/flow/connectors/utils/array.go index d203beacc2..2633153ae6 100644 --- a/flow/connectors/utils/array.go +++ b/flow/connectors/utils/array.go @@ -52,3 +52,16 @@ func ArraysHaveOverlap[T comparable](first, second []T) bool { return false } + +func ArrayCastElements[T any](arr []any) []T { + res := make([]T, 0, len(arr)) + for _, val := range arr { + if v, ok := val.(T); ok { + res = append(res, v) + } else { + var none T + res = append(res, none) + } + } + return res +}