From 2a215a926ae88e0b2efbef6424e241f40bb88988 Mon Sep 17 00:00:00 2001 From: Amogh Bharadwaj Date: Tue, 9 Jan 2024 13:14:59 +0530 Subject: [PATCH] Handle NaNs for Floats and Float Arrays (#1030) Same as #1027 --- flow/model/model.go | 70 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 70 insertions(+) diff --git a/flow/model/model.go b/flow/model/model.go index 10c04d599c..980820d7be 100644 --- a/flow/model/model.go +++ b/flow/model/model.go @@ -4,14 +4,18 @@ import ( "encoding/json" "errors" "fmt" + "log/slog" + "math" "math/big" "slices" + "strconv" "sync/atomic" "time" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/peerdbenv" + "golang.org/x/exp/constraints" ) type NameAndExclude struct { @@ -129,6 +133,14 @@ func (r *RecordItems) Len() int { return len(r.Values) } +func convertFloatArrayToStringArray[T constraints.Float](floatArr []T) []string { + strArr := make([]string, len(floatArr)) + for i, v := range floatArr { + strArr[i] = strconv.FormatFloat(float64(v), 'f', -1, 64) + } + return strArr +} + func (r *RecordItems) toMap() (map[string]interface{}, error) { if r.ColToValIdx == nil { return nil, errors.New("colToValIdx is nil") @@ -167,6 +179,64 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) { return nil, errors.New("expected *big.Rat value") } jsonStruct[col] = bigRat.FloatString(9) + case qvalue.QValueKindFloat64: + floatVal, ok := v.Value.(float64) + if !ok { + return nil, errors.New("expected float64 value") + } + if math.IsNaN(floatVal) { + jsonStruct[col] = nil + } else { + jsonStruct[col] = floatVal + } + case qvalue.QValueKindFloat32: + floatVal, ok := v.Value.(float32) + if !ok { + return nil, errors.New("expected float64 value") + } + if math.IsNaN(float64(floatVal)) { + jsonStruct[col] = nil + } else { + jsonStruct[col] = floatVal + } + case qvalue.QValueKindArrayFloat64: + floatArr, ok := v.Value.([]float64) + if !ok { + return nil, errors.New("expected []float64 value") + } + + // json.Marshal cannot support NaN or INF values + // BigQuery cannot support NULL values in arrays + // Solution: Skip NaN values in float arrays we get + cleanedArr := make([]float64, 0, len(floatArr)) + for _, val := range floatArr { + if !math.IsNaN(val) && !math.IsInf(val, 0) { + cleanedArr = append(cleanedArr, val) + } else { + slog.Warn("NaN or INF value found in float array - skipping", + slog.Any("array", convertFloatArrayToStringArray(floatArr))) + } + } + jsonStruct[col] = cleanedArr + case qvalue.QValueKindArrayFloat32: + floatArr, ok := v.Value.([]float32) + if !ok { + return nil, errors.New("expected []float32 value") + } + + // json.Marshal cannot support NaN or INF values + // BigQuery cannot support NULL values in arrays + // Solution: Skip NaN values in float arrays we get + cleanedArr := make([]float32, 0, len(floatArr)) + for _, val := range floatArr { + if !math.IsNaN(float64(val)) && !math.IsInf(float64(val), 0) { + cleanedArr = append(cleanedArr, val) + } else { + slog.Warn("NaN or INF value found in float array - skipping", + slog.Any("array", convertFloatArrayToStringArray(floatArr))) + } + } + jsonStruct[col] = cleanedArr default: jsonStruct[col] = v.Value }