Skip to content

Commit

Permalink
refactoring and include bq truncate for numeric
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed May 22, 2024
1 parent 86dcba6 commit 60372cb
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 45 deletions.
9 changes: 2 additions & 7 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string,
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
bqNumeric := numeric.BigQueryNumericCompatibility{}
if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) {
avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
}
avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH(
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY)

considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
Expand Down
41 changes: 41 additions & 0 deletions flow/datatypes/bigint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package datatypes

import (
"math"
"math/big"
)

var tenInt = big.NewInt(10)

Check failure on line 8 in flow/datatypes/bigint.go

View workflow job for this annotation

GitHub Actions / lint

var `tenInt` is unused (unused)

func CountDigits(bi *big.Int) int {
tenInt := big.NewInt(10)
if bi.IsUint64() {
u64 := bi.Uint64()
if u64 < (1 << 53) { // Check if it is less than 2^53
if u64 == 0 {
return 1 // 0 has one digit
}
return int(math.Log10(float64(u64))) + 1
}
} else if bi.IsInt64() {
i64 := bi.Int64()
if i64 > -(1 << 53) { // Check if it is greater than -2^53
if i64 == 0 {
return 1 // 0 has one digit
}
return int(math.Log10(float64(-i64))) + 1
}
}

// For larger numbers, use the bit length and logarithms
abs := new(big.Int).Abs(bi)
lg10 := int(float64(abs.BitLen()) / math.Log2(10))

// Verify and adjust lg10 if necessary
check := new(big.Int).Exp(tenInt, big.NewInt(int64(lg10)), nil)
if abs.Cmp(check) >= 0 {
lg10++
}

return lg10
}
5 changes: 4 additions & 1 deletion flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ const (
PeerDBSnowflakeScale = 20
PeerDBClickhousePrecision = 76
PeerDBClickhouseScale = 38
BigQueryNumericUpperBound = 999999999999999999.99999999999999999999
BigQueryNumericLowerBound = -999999999999999999.99999999999999999999
VARHDRSZ = 4
)

Expand Down Expand Up @@ -69,7 +71,8 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
}

func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale <= 20 && scale < precision
return precision > 0 && precision <= PeerDBBigQueryPrecision &&
scale <= PeerDBBigQueryScale && scale < precision
}

type DefaultNumericCompatibility struct{}
Expand Down
56 changes: 19 additions & 37 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/big"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -52,6 +50,21 @@ type AvroSchemaField struct {
LogicalType string `json:"logicalType,omitempty"`
}

func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) {
if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY {
bidigi := datatypes.CountDigits(num.BigInt())
avroPrecision, avroScale := DetermineNumericSettingForDWH(precision, scale, targetDB)
if bidigi+int(avroScale) > int(avroPrecision) {
slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num))
return num, errors.New("invalid numeric")
} else if num.Exponent() < -int32(avroScale) {
num = num.Truncate(int32(avroScale))
slog.Info("Truncated NUMERIC value", slog.Any("number", num))
}
}
return num, nil
}

// GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
// The function takes in two parameters, a QValueKind and a boolean indicating if the
// Avro schema should respect null values. It returns a QValueKindAvroSchema object
Expand Down Expand Up @@ -430,43 +443,12 @@ func (c *QValueAvroConverter) processNullableUnion(
return value, nil
}

var tenInt = big.NewInt(10)

func countDigits(bi *big.Int) int {
if bi.IsUint64() {
u64 := bi.Uint64()
if u64 < (1 << 53) {
if u64 == 0 {
return 1
}
return int(math.Log10(float64(u64))) + 1
}
} else if bi.IsInt64() {
i64 := bi.Int64()
if i64 > -(1 << 53) {
return int(math.Log10(float64(-i64))) + 1
}
}

abs := new(big.Int).Abs(bi)
// lg10 may be off by 1, need to verify
lg10 := int(float64(abs.BitLen()) / math.Log2(10))
check := big.NewInt(int64(lg10))
return lg10 + abs.Cmp(check.Exp(tenInt, check, nil))
}

func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} {
if c.TargetDWH == protos.DBType_SNOWFLAKE {
bidigi := countDigits(num.BigInt())
avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH)

if bidigi+int(avroScale) > int(avroPrecision) {
slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num))
return nil
} else if num.Exponent() < -int32(avroScale) {
num = num.Round(int32(avroScale))
}
num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH)
if err != nil {
return nil
}

rat := num.Rat()
if c.Nullable {
return goavro.Union("bytes.decimal", rat)
Expand Down

0 comments on commit 60372cb

Please sign in to comment.