Skip to content

Commit

Permalink
Generalize recent []int32 nil handling fix to rest of array types
Browse files Browse the repository at this point in the history
  • Loading branch information
serprex committed Feb 14, 2024
1 parent c47dbb2 commit 4a3b0e2
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 167 deletions.
191 changes: 24 additions & 167 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/jackc/pgx/v5/pgtype"
"github.com/lib/pq/oid"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

Expand Down Expand Up @@ -190,6 +191,20 @@ func parseJSON(value interface{}) (qvalue.QValue, error) {
return qvalue.QValue{Kind: qvalue.QValueKindJSON, Value: string(jsonVal)}, nil
}

func convertToArray[T any](kind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) {
switch v := value.(type) {
case pgtype.Array[T]:
if v.Valid {
return qvalue.QValue{Kind: kind, Value: v.Elements}, nil
}
case []T:
return qvalue.QValue{Kind: kind, Value: v}, nil
case []interface{}:
return qvalue.QValue{Kind: kind, Value: utils.ArrayCastElements[T](v)}, nil
}
return qvalue.QValue{}, fmt.Errorf("failed to parse array %T: %v", value, value)
}

func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (qvalue.QValue, error) {
val := qvalue.QValue{}

Expand Down Expand Up @@ -319,179 +334,21 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
val = qvalue.QValue{Kind: qvalue.QValueKindNumeric, Value: rat}
}
case qvalue.QValueKindArrayFloat32:
switch v := value.(type) {
case pgtype.Array[float32]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v.Elements}
}
case []float32:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: v}
case []interface{}:
float32Array := make([]float32, len(v))
for i, val := range v {
float32Array[i] = val.(float32)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat32, Value: float32Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array float32: %v", value)
}
return convertToArray[float32](qvalueKind, value)
case qvalue.QValueKindArrayFloat64:
switch v := value.(type) {
case pgtype.Array[float64]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v.Elements}
}
case []float64:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: v}
case []interface{}:
float64Array := make([]float64, len(v))
for i, val := range v {
float64Array[i] = val.(float64)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayFloat64, Value: float64Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array float64: %v", value)
}
return convertToArray[float64](qvalueKind, value)
case qvalue.QValueKindArrayInt16:
switch v := value.(type) {
case pgtype.Array[int16]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v.Elements}
}
case []int16:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: v}
case []interface{}:
int16Array := make([]int16, len(v))
for i, val := range v {
int16Array[i] = val.(int16)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt16, Value: int16Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array int16: %v", value)
}
return convertToArray[int16](qvalueKind, value)
case qvalue.QValueKindArrayInt32:
switch v := value.(type) {
case pgtype.Array[int32]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v.Elements}
}
case []int32:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: v}
case []interface{}:
int32Array := make([]int32, len(v))
for i, val := range v {
if val == nil {
int32Array[i] = 0
} else {
int32Array[i] = val.(int32)
}
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt32, Value: int32Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array int32: %v", value)
}
return convertToArray[int32](qvalueKind, value)
case qvalue.QValueKindArrayInt64:
switch v := value.(type) {
case pgtype.Array[int64]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v.Elements}
}
case []int64:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: v}
case []interface{}:
int64Array := make([]int64, len(v))
for i, val := range v {
int64Array[i] = val.(int64)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayInt64, Value: int64Array}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array int64: %v", value)
}
case qvalue.QValueKindArrayDate:
switch v := value.(type) {
case pgtype.Array[time.Time]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v.Elements}
}
case []time.Time:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: v}
case []interface{}:
dateArray := make([]time.Time, len(v))
for i, val := range v {
dateArray[i] = val.(time.Time)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayDate, Value: dateArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array date: %v", value)
}
case qvalue.QValueKindArrayTimestamp:
switch v := value.(type) {
case pgtype.Array[time.Time]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v.Elements}
}
case []time.Time:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: v}
case []interface{}:
timestampArray := make([]time.Time, len(v))
for i, val := range v {
timestampArray[i] = val.(time.Time)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestamp, Value: timestampArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamp: %v", value)
}
case qvalue.QValueKindArrayTimestampTZ:
switch v := value.(type) {
case pgtype.Array[time.Time]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v.Elements}
}
case []time.Time:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: v}
case []interface{}:
timestampTZArray := make([]time.Time, len(v))
for i, val := range v {
timestampTZArray[i] = val.(time.Time)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayTimestampTZ, Value: timestampTZArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array timestamptz: %v", value)
}
return convertToArray[int64](qvalueKind, value)
case qvalue.QValueKindArrayDate, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ:
return convertToArray[time.Time](qvalueKind, value)
case qvalue.QValueKindArrayBoolean:
switch v := value.(type) {
case pgtype.Array[bool]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v.Elements}
}
case []bool:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: v}
case []interface{}:
boolArray := make([]bool, len(v))
for i, val := range v {
boolArray[i] = val.(bool)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayBoolean, Value: boolArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array boolean: %v", value)
}
return convertToArray[bool](qvalueKind, value)
case qvalue.QValueKindArrayString:
switch v := value.(type) {
case pgtype.Array[string]:
if v.Valid {
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v.Elements}
}
case []string:
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: v}
case []interface{}:
stringArray := make([]string, len(v))
for i, val := range v {
stringArray[i] = val.(string)
}
val = qvalue.QValue{Kind: qvalue.QValueKindArrayString, Value: stringArray}
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value)
}
return convertToArray[string](qvalueKind, value)
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
Expand Down
13 changes: 13 additions & 0 deletions flow/connectors/utils/array.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,3 +52,16 @@ func ArraysHaveOverlap[T comparable](first, second []T) bool {

return false
}

func ArrayCastElements[T any](arr []any) []T {
res := make([]T, 0, len(arr))
for _, val := range arr {
if v, ok := val.(T); ok {
res = append(res, v)
} else {
var none T
res = append(res, none)
}
}
return res
}

0 comments on commit 4a3b0e2

Please sign in to comment.