Skip to content

Commit

Permalink
Merge branch 'main' into kafka-heartbeats
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored May 22, 2024
2 parents 9a274a1 + 1008519 commit ed99520
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 54 deletions.
9 changes: 2 additions & 7 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string,
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
bqNumeric := numeric.BigQueryNumericCompatibility{}
if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) {
avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
}
avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH(
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY)

considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
Expand Down
33 changes: 33 additions & 0 deletions flow/datatypes/bigint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package datatypes

import (
"math"
"math/big"
)

var tenInt = big.NewInt(10)

func CountDigits(bi *big.Int) int {
if bi.IsInt64() {
i64 := bi.Int64()
// restrict fast path to integers with exact conversion to float64
if i64 <= (1<<53) && i64 >= -(1<<53) {
if i64 == 0 {
return 1
}
return int(math.Log10(math.Abs(float64(i64)))) + 1
}
}

estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10))

// estimatedNumDigits (lg10) may be off by 1, need to verify
digitsBigInt := big.NewInt(int64(estimatedNumDigits))
errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil)

if bi.CmpAbs(errorCorrectionUnit) >= 0 {
return estimatedNumDigits + 1
}

return estimatedNumDigits
}
38 changes: 38 additions & 0 deletions flow/datatypes/bigint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datatypes

import (
"math/big"
"testing"
)

func TestCountDigits(t *testing.T) {
bi := big.NewInt(-0)
expected := 1
result := CountDigits(bi)
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}

bi = big.NewInt(0)
expected = 1
result = CountDigits(bi)
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}

// 18 nines
bi = big.NewInt(999999999999999999)
result = CountDigits(bi)
expected = 18
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}

// 18 nines
bi = big.NewInt(-999999999999999999)
result = CountDigits(bi)
expected = 18
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}
15 changes: 8 additions & 7 deletions flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface {
MaxPrecision() int16
MaxScale() int16
DefaultPrecisionAndScale() (int16, int16)
IsValidPrevisionAndScale(precision, scale int16) bool
IsValidPrecisionAndScale(precision, scale int16) bool
}

type ClickHouseNumericCompatibility struct{}
Expand All @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16)
return PeerDBClickhousePrecision, PeerDBClickhouseScale
}

func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision
}

Expand All @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBSnowflakePrecision, PeerDBSnowflakeScale
}

func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale < precision
}

Expand All @@ -68,8 +68,9 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBBigQueryPrecision, PeerDBBigQueryScale
}

func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale <= 20 && scale < precision
func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBBigQueryPrecision &&
scale <= PeerDBBigQueryScale && scale < precision
}

type DefaultNumericCompatibility struct{}
Expand All @@ -86,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return 38, 20
}

func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return true
}

Expand All @@ -112,7 +113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC
}

precision, scale := ParseNumericTypmod(typmod)
if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
return warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down
58 changes: 19 additions & 39 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/big"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -52,6 +50,21 @@ type AvroSchemaField struct {
LogicalType string `json:"logicalType,omitempty"`
}

func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) {
if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY {
bidigi := datatypes.CountDigits(num.BigInt())
avroPrecision, avroScale := DetermineNumericSettingForDWH(precision, scale, targetDB)
if bidigi+int(avroScale) > int(avroPrecision) {
slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num))
return num, errors.New("invalid numeric")
} else if num.Exponent() < -int32(avroScale) {
num = num.Truncate(int32(avroScale))
slog.Warn("Truncated NUMERIC value", slog.Any("number", num))
}
}
return num, nil
}

// GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
// The function takes in two parameters, a QValueKind and a boolean indicating if the
// Avro schema should respect null values. It returns a QValueKindAvroSchema object
Expand Down Expand Up @@ -430,45 +443,12 @@ func (c *QValueAvroConverter) processNullableUnion(
return value, nil
}

var tenInt = big.NewInt(10)

func countDigits(bi *big.Int) int {
if bi.IsInt64() {
i64 := bi.Int64()
// restrict fast path to integers with exact conversion to float64
if i64 <= (1<<53) && i64 >= -(1<<53) {
if i64 == 0 {
return 1
}
return int(math.Log10(math.Abs(float64(i64)))) + 1
}
}

estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10))

// estimatedNumDigits (lg10) may be off by 1, need to verify
digitsBigInt := big.NewInt(int64(estimatedNumDigits))
errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil)

if bi.CmpAbs(errorCorrectionUnit) >= 0 {
return estimatedNumDigits + 1
}

return estimatedNumDigits
}

func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} {
if c.TargetDWH == protos.DBType_SNOWFLAKE {
bidigi := countDigits(num.BigInt())
avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH)

if bidigi+int(avroScale) > int(avroPrecision) {
slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num))
return nil
} else if num.Exponent() < -int32(avroScale) {
num = num.Round(int32(avroScale))
}
num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH)
if err != nil {
return nil
}

rat := num.Rat()
if c.Nullable {
return goavro.Union("bytes.decimal", rat)
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy
warehouseNumeric = numeric.DefaultNumericCompatibility{}
}

if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
precision, scale = warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down

0 comments on commit ed99520

Please sign in to comment.