Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Numeric: refactoring and truncate for BigQuery #1746

Merged
merged 8 commits into from
May 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 2 additions & 7 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string,
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
bqNumeric := numeric.BigQueryNumericCompatibility{}
if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) {
avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
}
avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH(
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY)

considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
Expand Down
33 changes: 33 additions & 0 deletions flow/datatypes/bigint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package datatypes

import (
"math"
"math/big"
)

var tenInt = big.NewInt(10)

// CountDigits returns the number of decimal digits in the magnitude of bi.
// The sign is ignored and zero counts as a single digit.
func CountDigits(bi *big.Int) int {
	if bi.IsInt64() {
		// Count with integer division instead of flooring math.Log10 of a
		// float64: the logarithm of a value just below a power of ten (for
		// example 999999999999999) rounds up to the whole number, which made
		// the float-based count one digit too large.
		i64 := bi.Int64()

		// Negate in uint64 space so the most negative int64 does not overflow.
		magnitude := uint64(i64)
		if i64 < 0 {
			magnitude = -magnitude
		}

		digits := 1
		for magnitude >= 10 {
			magnitude /= 10
			digits++
		}
		return digits
	}

	// bi does not fit in an int64 here, so estimate the digit count from the
	// bit length and correct the estimate with one exact comparison.
	estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10))

	// estimatedNumDigits (lg10) may be off by 1, need to verify against
	// 10^estimatedNumDigits.
	digitsBigInt := big.NewInt(int64(estimatedNumDigits))
	errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil)

	if bi.CmpAbs(errorCorrectionUnit) >= 0 {
		return estimatedNumDigits + 1
	}

	return estimatedNumDigits
}
38 changes: 38 additions & 0 deletions flow/datatypes/bigint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datatypes

import (
"math/big"
"testing"
)

// TestCountDigits checks CountDigits against zero (written both as -0 and 0)
// and against a positive and a negative value made of 18 nines.
func TestCountDigits(t *testing.T) {
	cases := []struct {
		input    *big.Int
		expected int
	}{
		{input: big.NewInt(-0), expected: 1},
		{input: big.NewInt(0), expected: 1},
		// 18 nines
		{input: big.NewInt(999999999999999999), expected: 18},
		// 18 nines, negated
		{input: big.NewInt(-999999999999999999), expected: 18},
	}

	for _, tc := range cases {
		if result := CountDigits(tc.input); result != tc.expected {
			t.Errorf("Unexpected result. Expected: %v, but got: %v", tc.expected, result)
		}
	}
}
15 changes: 8 additions & 7 deletions flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface {
MaxPrecision() int16
MaxScale() int16
DefaultPrecisionAndScale() (int16, int16)
IsValidPrevisionAndScale(precision, scale int16) bool
IsValidPrecisionAndScale(precision, scale int16) bool
}

type ClickHouseNumericCompatibility struct{}
Expand All @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16)
return PeerDBClickhousePrecision, PeerDBClickhouseScale
}

func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision
}

Expand All @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBSnowflakePrecision, PeerDBSnowflakeScale
}

func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale < precision
}

Expand All @@ -68,8 +68,9 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBBigQueryPrecision, PeerDBBigQueryScale
}

func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale <= 20 && scale < precision
func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBBigQueryPrecision &&
scale <= PeerDBBigQueryScale && scale < precision
}

type DefaultNumericCompatibility struct{}
Expand All @@ -86,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return 38, 20
}

func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return true
}

Expand All @@ -112,7 +113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC
}

precision, scale := ParseNumericTypmod(typmod)
if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
return warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down
58 changes: 19 additions & 39 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/big"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -52,6 +50,21 @@ type AvroSchemaField struct {
LogicalType string `json:"logicalType,omitempty"`
}

// TruncateOrLogNumeric checks num against the NUMERIC precision and scale that
// DetermineNumericSettingForDWH yields for targetDB.
//
// Only Snowflake and BigQuery targets are checked; for any other target num is
// returned unchanged with a nil error. For those two targets:
//   - if the digit count of num.BigInt() plus the warehouse scale exceeds the
//     warehouse precision, a warning is logged and num is returned together
//     with an "invalid numeric" error;
//   - otherwise, if num.Exponent() is below the negated warehouse scale, num is
//     truncated to that scale and a warning is logged with the truncated value.
func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) {
	switch targetDB {
	case protos.DBType_SNOWFLAKE, protos.DBType_BIGQUERY:
		// Checked below.
	default:
		return num, nil
	}

	integerDigits := datatypes.CountDigits(num.BigInt())
	dwhPrecision, dwhScale := DetermineNumericSettingForDWH(precision, scale, targetDB)

	if integerDigits+int(dwhScale) > int(dwhPrecision) {
		slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num))
		return num, errors.New("invalid numeric")
	}

	if num.Exponent() < -int32(dwhScale) {
		num = num.Truncate(int32(dwhScale))
		slog.Warn("Truncated NUMERIC value", slog.Any("number", num))
	}

	return num, nil
}

// GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
// The function takes in two parameters, a QValueKind and a boolean indicating if the
// Avro schema should respect null values. It returns a QValueKindAvroSchema object
Expand Down Expand Up @@ -430,45 +443,12 @@ func (c *QValueAvroConverter) processNullableUnion(
return value, nil
}

var tenInt = big.NewInt(10)

// countDigits returns the number of decimal digits in the magnitude of bi.
// The sign is ignored and zero counts as a single digit.
func countDigits(bi *big.Int) int {
	if bi.IsInt64() {
		// Count with integer division instead of flooring math.Log10 of a
		// float64: the logarithm of a value just below a power of ten (for
		// example 999999999999999) rounds up to the whole number, which made
		// the float-based count one digit too large.
		i64 := bi.Int64()

		// Negate in uint64 space so the most negative int64 does not overflow.
		magnitude := uint64(i64)
		if i64 < 0 {
			magnitude = -magnitude
		}

		digits := 1
		for magnitude >= 10 {
			magnitude /= 10
			digits++
		}
		return digits
	}

	// bi does not fit in an int64 here, so estimate the digit count from the
	// bit length and correct the estimate with one exact comparison.
	estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10))

	// estimatedNumDigits (lg10) may be off by 1, need to verify against
	// 10^estimatedNumDigits.
	digitsBigInt := big.NewInt(int64(estimatedNumDigits))
	errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil)

	if bi.CmpAbs(errorCorrectionUnit) >= 0 {
		return estimatedNumDigits + 1
	}

	return estimatedNumDigits
}

func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} {
if c.TargetDWH == protos.DBType_SNOWFLAKE {
bidigi := countDigits(num.BigInt())
avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH)

if bidigi+int(avroScale) > int(avroPrecision) {
slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num))
return nil
} else if num.Exponent() < -int32(avroScale) {
num = num.Round(int32(avroScale))
}
num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH)
if err != nil {
return nil
}

rat := num.Rat()
if c.Nullable {
return goavro.Union("bytes.decimal", rat)
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy
warehouseNumeric = numeric.DefaultNumericCompatibility{}
}

if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
precision, scale = warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down
Loading