From 53f169ce52dcf1f41049b8691db4a356be0ad9ec Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 18:21:21 +0530 Subject: [PATCH 1/8] refactoring and include bq truncate for numeric --- flow/connectors/bigquery/qrep_avro_sync.go | 9 ++--- flow/datatypes/bigint.go | 41 ++++++++++++++++++++++ flow/datatypes/numeric.go | 5 ++- flow/model/qvalue/avro_converter.go | 29 +++++++++------ 4 files changed, 66 insertions(+), 18 deletions(-) create mode 100644 flow/datatypes/bigint.go diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d022045b70..aad8078ac3 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -14,7 +14,6 @@ import ( "go.temporal.io/sdk/activity" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" - numeric "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string, } func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { - avroNumericPrecision := int16(bqField.Precision) - avroNumericScale := int16(bqField.Scale) - bqNumeric := numeric.BigQueryNumericCompatibility{} - if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) { - avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale() - } + avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH( + int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY) considerRepeated := func(typ string, repeated bool) interface{} { if repeated { diff --git a/flow/datatypes/bigint.go b/flow/datatypes/bigint.go new file mode 100644 index 0000000000..116546598c --- /dev/null +++ b/flow/datatypes/bigint.go @@ -0,0 +1,41 @@ +package datatypes + +import ( + "math" + "math/big" +) + +var tenInt = big.NewInt(10) + +func CountDigits(bi *big.Int) int { + tenInt := big.NewInt(10) + if bi.IsUint64() { + u64 := bi.Uint64() + if u64 < (1 << 53) { // Check if it is less than 2^53 + if u64 == 0 { + return 1 // 0 has one digit + } + return int(math.Log10(float64(u64))) + 1 + } + } else if bi.IsInt64() { + i64 := bi.Int64() + if i64 > -(1 << 53) { // Check if it is greater than -2^53 + if i64 == 0 { + return 1 // 0 has one digit + } + return int(math.Log10(float64(-i64))) + 1 + } + } + + // For larger numbers, use the bit length and logarithms + abs := new(big.Int).Abs(bi) + lg10 := int(float64(abs.BitLen()) / math.Log2(10)) + + // Verify and adjust lg10 if necessary + check := new(big.Int).Exp(tenInt, big.NewInt(int64(lg10)), nil) + if abs.Cmp(check) >= 0 { + lg10++ + } + + return lg10 +} diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 356ef06605..04a3a27fdd 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -8,6 +8,8 @@ const ( PeerDBSnowflakeScale = 20 PeerDBClickhousePrecision = 76 PeerDBClickhouseScale = 38 + BigQueryNumericUpperBound = 999999999999999999.99999999999999999999 + BigQueryNumericLowerBound = -999999999999999999.99999999999999999999 VARHDRSZ = 4 ) @@ -69,7 +71,8 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { } func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { - return precision > 0 && precision <= 38 && scale <= 20 && scale < precision + return precision > 0 && precision <= PeerDBBigQueryPrecision && + scale <= PeerDBBigQueryScale && scale < precision } type DefaultNumericCompatibility struct{} diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 45d0d572ea..8c2aa8ae0c 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -52,6 +52,21 @@ type AvroSchemaField struct { LogicalType string `json:"logicalType,omitempty"` } +func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) { + if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY { + bidigi := datatypes.CountDigits(num.BigInt()) + avroPrecision, avroScale := DetermineNumericSettingForDWH(precision, scale, targetDB) + if bidigi+int(avroScale) > int(avroPrecision) { + slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num)) + return num, errors.New("invalid numeric") + } else if num.Exponent() < -int32(avroScale) { + num = num.Truncate(int32(avroScale)) + slog.Info("Truncated NUMERIC value", slog.Any("number", num)) + } + } + return num, nil +} + // GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind. // The function takes in two parameters, a QValueKind and a boolean indicating if the // Avro schema should respect null values. It returns a QValueKindAvroSchema object @@ -458,17 +473,11 @@ func countDigits(bi *big.Int) int { } func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { - if c.TargetDWH == protos.DBType_SNOWFLAKE { - bidigi := countDigits(num.BigInt()) - avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH) - - if bidigi+int(avroScale) > int(avroPrecision) { - slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num)) - return nil - } else if num.Exponent() < -int32(avroScale) { - num = num.Round(int32(avroScale)) - } + num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH) + if err != nil { + return nil } + rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat) From f69995c230aa8ac7acc27eb042ab7aaf1f7ac8d4 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 18:25:28 +0530 Subject: [PATCH 2/8] change to warn --- flow/model/qvalue/avro_converter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 8c2aa8ae0c..2579511fdf 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -61,7 +61,7 @@ func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, tar return num, errors.New("invalid numeric") } else if num.Exponent() < -int32(avroScale) { num = num.Truncate(int32(avroScale)) - slog.Info("Truncated NUMERIC value", slog.Any("number", num)) + slog.Warn("Truncated NUMERIC value", slog.Any("number", num)) } } return num, nil From 8f6baf4ea11010e797ee92d6c6472a83354eab07 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 18:28:27 +0530 Subject: [PATCH 3/8] revert file --- flow/datatypes/numeric.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 04a3a27fdd..d2cad7233d 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -8,8 +8,6 @@ const ( PeerDBSnowflakeScale = 20 PeerDBClickhousePrecision = 76 PeerDBClickhouseScale = 38 - BigQueryNumericUpperBound = 999999999999999999.99999999999999999999 - BigQueryNumericLowerBound = -999999999999999999.99999999999999999999 VARHDRSZ = 4 ) From b4f0ecc5f094ce327d29fe42916814f0ae7d52ff Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 18:36:12 +0530 Subject: [PATCH 4/8] use string conversion for countdigits --- flow/datatypes/bigint.go | 35 +++-------------------------------- 1 file changed, 3 insertions(+), 32 deletions(-) diff --git a/flow/datatypes/bigint.go b/flow/datatypes/bigint.go index 116546598c..c33d8506a2 100644 --- a/flow/datatypes/bigint.go +++ b/flow/datatypes/bigint.go @@ -1,41 +1,12 @@ package datatypes import ( - "math" "math/big" ) -var tenInt = big.NewInt(10) - func CountDigits(bi *big.Int) int { - tenInt := big.NewInt(10) - if bi.IsUint64() { - u64 := bi.Uint64() - if u64 < (1 << 53) { // Check if it is less than 2^53 - if u64 == 0 { - return 1 // 0 has one digit - } - return int(math.Log10(float64(u64))) + 1 - } - } else if bi.IsInt64() { - i64 := bi.Int64() - if i64 > -(1 << 53) { // Check if it is greater than -2^53 - if i64 == 0 { - return 1 // 0 has one digit - } - return int(math.Log10(float64(-i64))) + 1 - } + if bi.Sign() < 0 { + return len(bi.String()) - 1 } - - // For larger numbers, use the bit length and logarithms - abs := new(big.Int).Abs(bi) - lg10 := int(float64(abs.BitLen()) / math.Log2(10)) - - // Verify and adjust lg10 if necessary - check := new(big.Int).Exp(tenInt, big.NewInt(int64(lg10)), nil) - if abs.Cmp(check) >= 0 { - lg10++ - } - - return lg10 + return len(bi.String()) } From 7f55285f7d8e534cce890c826efe4e6ae4c3b14f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 18:40:39 +0530 Subject: [PATCH 5/8] add unit test for countdigits --- flow/datatypes/bigint_test.go | 38 +++++++++++++++++++++++++++++++++++ 1 file changed, 38 insertions(+) create mode 100644 flow/datatypes/bigint_test.go diff --git a/flow/datatypes/bigint_test.go b/flow/datatypes/bigint_test.go new file mode 100644 index 0000000000..b1b87180b7 --- /dev/null +++ b/flow/datatypes/bigint_test.go @@ -0,0 +1,38 @@ +package datatypes + +import ( + "math/big" + "testing" +) + +func TestCountDigits(t *testing.T) { + bi := big.NewInt(1234567890) + expected := 10 + result := CountDigits(bi) + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + bi = big.NewInt(-1234567890) + expected = 10 + result = CountDigits(bi) + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + // 18 nines + bi = big.NewInt(999999999999999999) + result = CountDigits(bi) + expected = 18 + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + // 18 nines + bi = big.NewInt(-999999999999999999) + result = CountDigits(bi) + expected = 18 + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } +} From 4b3807cb993a8787bc3340403af0bedec584374f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 18:41:57 +0530 Subject: [PATCH 6/8] adjust test --- flow/datatypes/bigint_test.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/datatypes/bigint_test.go b/flow/datatypes/bigint_test.go index b1b87180b7..43d386ebb8 100644 --- a/flow/datatypes/bigint_test.go +++ b/flow/datatypes/bigint_test.go @@ -6,15 +6,15 @@ import ( ) func TestCountDigits(t *testing.T) { - bi := big.NewInt(1234567890) - expected := 10 + bi := big.NewInt(-0) + expected := 1 result := CountDigits(bi) if result != expected { t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) } - bi = big.NewInt(-1234567890) - expected = 10 + bi = big.NewInt(0) + expected = 1 result = CountDigits(bi) if result != expected { t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) From b5774c0b30067d38771a9af55e731145122b089f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 19:28:52 +0530 Subject: [PATCH 7/8] move countDigits to separate file --- flow/datatypes/bigint.go | 27 ++++++++++++++++++++++++--- flow/model/qvalue/avro_converter.go | 29 ----------------------------- 2 files changed, 24 insertions(+), 32 deletions(-) diff --git a/flow/datatypes/bigint.go b/flow/datatypes/bigint.go index c33d8506a2..28a3b97148 100644 --- a/flow/datatypes/bigint.go +++ b/flow/datatypes/bigint.go @@ -1,12 +1,33 @@ package datatypes import ( + "math" "math/big" ) +var tenInt = big.NewInt(10) + func CountDigits(bi *big.Int) int { - if bi.Sign() < 0 { - return len(bi.String()) - 1 + if bi.IsInt64() { + i64 := bi.Int64() + // restrict fast path to integers with exact conversion to float64 + if i64 <= (1<<53) && i64 >= -(1<<53) { + if i64 == 0 { + return 1 + } + return int(math.Log10(math.Abs(float64(i64)))) + 1 + } + } + + estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10)) + + // estimatedNumDigits (lg10) may be off by 1, need to verify + digitsBigInt := big.NewInt(int64(estimatedNumDigits)) + errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil) + + if bi.CmpAbs(errorCorrectionUnit) >= 0 { + return estimatedNumDigits + 1 } - return len(bi.String()) + + return estimatedNumDigits } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 2579511fdf..764fd86759 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" "log/slog" - "math" - "math/big" "time" "github.com/google/uuid" @@ -445,33 +443,6 @@ func (c *QValueAvroConverter) processNullableUnion( return value, nil } -var tenInt = big.NewInt(10) - -func countDigits(bi *big.Int) int { - if bi.IsInt64() { - i64 := bi.Int64() - // restrict fast path to integers with exact conversion to float64 - if i64 <= (1<<53) && i64 >= -(1<<53) { - if i64 == 0 { - return 1 - } - return int(math.Log10(math.Abs(float64(i64)))) + 1 - } - } - - estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10)) - - // estimatedNumDigits (lg10) may be off by 1, need to verify - digitsBigInt := big.NewInt(int64(estimatedNumDigits)) - errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil) - - if bi.CmpAbs(errorCorrectionUnit) >= 0 { - return estimatedNumDigits + 1 - } - - return estimatedNumDigits -} - func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH) if err != nil { From cb10a4566f89b37ab9cf5fc9e8bf8993b3a0264a Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 22 May 2024 20:14:30 +0530 Subject: [PATCH 8/8] fix typo --- flow/datatypes/numeric.go | 12 ++++++------ flow/model/qvalue/dwh.go | 2 +- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index d2cad7233d..3051ca40d8 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface { MaxPrecision() int16 MaxScale() int16 DefaultPrecisionAndScale() (int16, int16) - IsValidPrevisionAndScale(precision, scale int16) bool + IsValidPrecisionAndScale(precision, scale int16) bool } type ClickHouseNumericCompatibility struct{} @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) return PeerDBClickhousePrecision, PeerDBClickhouseScale } -func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision } @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return PeerDBSnowflakePrecision, PeerDBSnowflakeScale } -func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= 38 && scale < precision } @@ -68,7 +68,7 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return PeerDBBigQueryPrecision, PeerDBBigQueryScale } -func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= PeerDBBigQueryPrecision && scale <= PeerDBBigQueryScale && scale < precision } @@ -87,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return 38, 20 } -func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return true } @@ -113,7 +113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC } precision, scale := ParseNumericTypmod(typmod) - if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) { + if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) { return warehouseNumeric.DefaultPrecisionAndScale() } diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index 5dd7440baf..cb6eb9e868 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -22,7 +22,7 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy warehouseNumeric = numeric.DefaultNumericCompatibility{} } - if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) { + if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) { precision, scale = warehouseNumeric.DefaultPrecisionAndScale() }