diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d022045b70..aad8078ac3 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -14,7 +14,6 @@ import ( "go.temporal.io/sdk/activity" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" - numeric "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string, } func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { - avroNumericPrecision := int16(bqField.Precision) - avroNumericScale := int16(bqField.Scale) - bqNumeric := numeric.BigQueryNumericCompatibility{} - if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) { - avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale() - } + avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH( + int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY) considerRepeated := func(typ string, repeated bool) interface{} { if repeated { diff --git a/flow/datatypes/bigint.go b/flow/datatypes/bigint.go new file mode 100644 index 0000000000..28a3b97148 --- /dev/null +++ b/flow/datatypes/bigint.go @@ -0,0 +1,33 @@ +package datatypes + +import ( + "math" + "math/big" +) + +var tenInt = big.NewInt(10) + +func CountDigits(bi *big.Int) int { + if bi.IsInt64() { + i64 := bi.Int64() + // restrict fast path to integers with exact conversion to float64 + if i64 <= (1<<53) && i64 >= -(1<<53) { + if i64 == 0 { + return 1 + } + return int(math.Log10(math.Abs(float64(i64)))) + 1 + } + } + + estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10)) + + // estimatedNumDigits (lg10) may be off by 1, need to verify + digitsBigInt := big.NewInt(int64(estimatedNumDigits)) + errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil) + + if bi.CmpAbs(errorCorrectionUnit) >= 0 { + return estimatedNumDigits + 1 + } + + return estimatedNumDigits +} diff --git a/flow/datatypes/bigint_test.go b/flow/datatypes/bigint_test.go new file mode 100644 index 0000000000..43d386ebb8 --- /dev/null +++ b/flow/datatypes/bigint_test.go @@ -0,0 +1,38 @@ +package datatypes + +import ( + "math/big" + "testing" +) + +func TestCountDigits(t *testing.T) { + bi := big.NewInt(-0) + expected := 1 + result := CountDigits(bi) + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + bi = big.NewInt(0) + expected = 1 + result = CountDigits(bi) + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + // 18 nines + bi = big.NewInt(999999999999999999) + result = CountDigits(bi) + expected = 18 + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + // 18 nines + bi = big.NewInt(-999999999999999999) + result = CountDigits(bi) + expected = 18 + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } +} diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 356ef06605..3051ca40d8 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface { MaxPrecision() int16 MaxScale() int16 DefaultPrecisionAndScale() (int16, int16) - IsValidPrevisionAndScale(precision, scale int16) bool + IsValidPrecisionAndScale(precision, scale int16) bool } type ClickHouseNumericCompatibility struct{} @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) return PeerDBClickhousePrecision, PeerDBClickhouseScale } -func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision } @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return PeerDBSnowflakePrecision, PeerDBSnowflakeScale } -func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= 38 && scale < precision } @@ -68,8 +68,9 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return PeerDBBigQueryPrecision, PeerDBBigQueryScale } -func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { - return precision > 0 && precision <= 38 && scale <= 20 && scale < precision +func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { + return precision > 0 && precision <= PeerDBBigQueryPrecision && + scale <= PeerDBBigQueryScale && scale < precision } type DefaultNumericCompatibility struct{} @@ -86,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return 38, 20 } -func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return true } @@ -112,7 +113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC } precision, scale := ParseNumericTypmod(typmod) - if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) { + if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) { return warehouseNumeric.DefaultPrecisionAndScale() } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 45d0d572ea..764fd86759 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" "log/slog" - "math" - "math/big" "time" "github.com/google/uuid" @@ -52,6 +50,21 @@ type AvroSchemaField struct { LogicalType string `json:"logicalType,omitempty"` } +func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) { + if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY { + bidigi := datatypes.CountDigits(num.BigInt()) + avroPrecision, avroScale := DetermineNumericSettingForDWH(precision, scale, targetDB) + if bidigi+int(avroScale) > int(avroPrecision) { + slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num)) + return num, errors.New("invalid numeric") + } else if num.Exponent() < -int32(avroScale) { + num = num.Truncate(int32(avroScale)) + slog.Warn("Truncated NUMERIC value", slog.Any("number", num)) + } + } + return num, nil +} + // GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind. // The function takes in two parameters, a QValueKind and a boolean indicating if the // Avro schema should respect null values. It returns a QValueKindAvroSchema object @@ -430,45 +443,12 @@ func (c *QValueAvroConverter) processNullableUnion( return value, nil } -var tenInt = big.NewInt(10) - -func countDigits(bi *big.Int) int { - if bi.IsInt64() { - i64 := bi.Int64() - // restrict fast path to integers with exact conversion to float64 - if i64 <= (1<<53) && i64 >= -(1<<53) { - if i64 == 0 { - return 1 - } - return int(math.Log10(math.Abs(float64(i64)))) + 1 - } - } - - estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10)) - - // estimatedNumDigits (lg10) may be off by 1, need to verify - digitsBigInt := big.NewInt(int64(estimatedNumDigits)) - errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil) - - if bi.CmpAbs(errorCorrectionUnit) >= 0 { - return estimatedNumDigits + 1 - } - - return estimatedNumDigits -} - func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { - if c.TargetDWH == protos.DBType_SNOWFLAKE { - bidigi := countDigits(num.BigInt()) - avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH) - - if bidigi+int(avroScale) > int(avroPrecision) { - slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num)) - return nil - } else if num.Exponent() < -int32(avroScale) { - num = num.Round(int32(avroScale)) - } + num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH) + if err != nil { + return nil } + rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat) diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index 5dd7440baf..cb6eb9e868 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -22,7 +22,7 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy warehouseNumeric = numeric.DefaultNumericCompatibility{} } - if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) { + if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) { precision, scale = warehouseNumeric.DefaultPrecisionAndScale() }