Skip to content

Commit

Permalink
Merge branch 'main' into fix/recreate-destination-connector
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj authored May 23, 2024
2 parents b30da81 + 1008519 commit e19fd33
Show file tree
Hide file tree
Showing 15 changed files with 1,357 additions and 633 deletions.
9 changes: 2 additions & 7 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string,
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
bqNumeric := numeric.BigQueryNumericCompatibility{}
if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) {
avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
}
avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH(
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY)

considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
Expand Down
6 changes: 6 additions & 0 deletions flow/connectors/kafka/qrep.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/twmb/franz-go/pkg/kgo"
lua "github.com/yuin/gopher-lua"

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/pua"
Expand All @@ -28,6 +29,11 @@ func (c *KafkaConnector) SyncQRepRecords(
numRecords := atomic.Int64{}
schema := stream.Schema()

shutdown := utils.HeartbeatRoutine(ctx, func() string {
return fmt.Sprintf("sent %d records to %s", numRecords.Load(), config.DestinationTableIdentifier)
})
defer shutdown()

queueCtx, queueErr := context.WithCancelCause(ctx)
pool, err := c.createPool(queueCtx, config.Script, config.FlowJobName, nil, queueErr)
if err != nil {
Expand Down
33 changes: 33 additions & 0 deletions flow/datatypes/bigint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package datatypes

import (
"math"
"math/big"
)

var tenInt = big.NewInt(10)

func CountDigits(bi *big.Int) int {
if bi.IsInt64() {
i64 := bi.Int64()
// restrict fast path to integers with exact conversion to float64
if i64 <= (1<<53) && i64 >= -(1<<53) {
if i64 == 0 {
return 1
}
return int(math.Log10(math.Abs(float64(i64)))) + 1
}
}

estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10))

// estimatedNumDigits (lg10) may be off by 1, need to verify
digitsBigInt := big.NewInt(int64(estimatedNumDigits))
errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil)

if bi.CmpAbs(errorCorrectionUnit) >= 0 {
return estimatedNumDigits + 1
}

return estimatedNumDigits
}
38 changes: 38 additions & 0 deletions flow/datatypes/bigint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datatypes

import (
"math/big"
"testing"
)

func TestCountDigits(t *testing.T) {
bi := big.NewInt(-0)
expected := 1
result := CountDigits(bi)
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}

bi = big.NewInt(0)
expected = 1
result = CountDigits(bi)
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}

// 18 nines
bi = big.NewInt(999999999999999999)
result = CountDigits(bi)
expected = 18
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}

// 18 nines
bi = big.NewInt(-999999999999999999)
result = CountDigits(bi)
expected = 18
if result != expected {
t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result)
}
}
15 changes: 8 additions & 7 deletions flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface {
MaxPrecision() int16
MaxScale() int16
DefaultPrecisionAndScale() (int16, int16)
IsValidPrevisionAndScale(precision, scale int16) bool
IsValidPrecisionAndScale(precision, scale int16) bool
}

type ClickHouseNumericCompatibility struct{}
Expand All @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16)
return PeerDBClickhousePrecision, PeerDBClickhouseScale
}

func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision
}

Expand All @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBSnowflakePrecision, PeerDBSnowflakeScale
}

func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale < precision
}

Expand All @@ -68,8 +68,9 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBBigQueryPrecision, PeerDBBigQueryScale
}

func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale <= 20 && scale < precision
func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBBigQueryPrecision &&
scale <= PeerDBBigQueryScale && scale < precision
}

type DefaultNumericCompatibility struct{}
Expand All @@ -86,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return 38, 20
}

func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return true
}

Expand All @@ -112,7 +113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC
}

precision, scale := ParseNumericTypmod(typmod)
if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
return warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down
79 changes: 39 additions & 40 deletions flow/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,33 +3,33 @@ module github.com/PeerDB-io/peer-flow
go 1.22.3

require (
cloud.google.com/go v0.112.2
cloud.google.com/go v0.113.0
cloud.google.com/go/bigquery v1.61.0
cloud.google.com/go/pubsub v1.37.0
cloud.google.com/go/storage v1.40.0
cloud.google.com/go/pubsub v1.38.0
cloud.google.com/go/storage v1.41.0
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.2
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.1.0
github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs v1.2.0
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.2.0
github.com/ClickHouse/clickhouse-go/v2 v2.23.2
github.com/ClickHouse/clickhouse-go/v2 v2.24.0
github.com/PeerDB-io/glua64 v1.0.1
github.com/PeerDB-io/gluabit32 v1.0.2
github.com/PeerDB-io/gluaflatbuffers v1.0.1
github.com/PeerDB-io/gluajson v1.0.2
github.com/PeerDB-io/gluamsgpack v1.0.4
github.com/PeerDB-io/gluautf8 v1.0.0
github.com/aws/aws-sdk-go-v2 v1.26.1
github.com/aws/aws-sdk-go-v2/config v1.27.11
github.com/aws/aws-sdk-go-v2/credentials v1.17.11
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.15
github.com/aws/aws-sdk-go-v2/service/s3 v1.53.1
github.com/aws/aws-sdk-go-v2/service/ses v1.22.5
github.com/aws/aws-sdk-go-v2/service/sns v1.29.4
github.com/aws/aws-sdk-go-v2 v1.27.0
github.com/aws/aws-sdk-go-v2/config v1.27.15
github.com/aws/aws-sdk-go-v2/credentials v1.17.15
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.16.20
github.com/aws/aws-sdk-go-v2/service/s3 v1.54.2
github.com/aws/aws-sdk-go-v2/service/ses v1.22.8
github.com/aws/aws-sdk-go-v2/service/sns v1.29.7
github.com/aws/smithy-go v1.20.2
github.com/cockroachdb/pebble v1.1.0
github.com/elastic/go-elasticsearch/v8 v8.13.1
github.com/google/uuid v1.6.0
github.com/grafana/pyroscope-go v1.1.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.1
github.com/grpc-ecosystem/grpc-gateway/v2 v2.20.0
github.com/jackc/pgerrcode v0.0.0-20240316143900-6e2875d9b438
github.com/jackc/pglogrepl v0.0.0-20240307033717-828fbfe908e9
github.com/jackc/pgx/v5 v5.5.5
Expand All @@ -41,8 +41,8 @@ require (
github.com/microsoft/go-mssqldb v1.7.1
github.com/orcaman/concurrent-map/v2 v2.0.1
github.com/shopspring/decimal v1.4.0
github.com/slack-go/slack v0.12.5
github.com/snowflakedb/gosnowflake v1.9.0
github.com/slack-go/slack v0.13.0
github.com/snowflakedb/gosnowflake v1.10.0
github.com/stretchr/testify v1.9.0
github.com/twmb/franz-go v1.16.1
github.com/twmb/franz-go/plugin/kslog v1.0.0
Expand All @@ -51,36 +51,37 @@ require (
github.com/youmark/pkcs8 v0.0.0-20240424034433-3c2c7870ae76
github.com/yuin/gopher-lua v1.1.1
go.opentelemetry.io/otel v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.26.0
go.opentelemetry.io/otel/metric v1.26.0
go.opentelemetry.io/otel/sdk v1.26.0
go.opentelemetry.io/otel/sdk/metric v1.26.0
go.temporal.io/api v1.32.0
go.temporal.io/api v1.33.0
go.temporal.io/sdk v1.26.1
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.23.0
golang.org/x/mod v0.17.0
golang.org/x/sync v0.7.0
google.golang.org/api v0.177.0
google.golang.org/genproto/googleapis/api v0.0.0-20240429193739-8cf5692501f6
google.golang.org/grpc v1.63.2
google.golang.org/api v0.181.0
google.golang.org/genproto/googleapis/api v0.0.0-20240515191416-fc5f0ca64291
google.golang.org/grpc v1.64.0
google.golang.org/protobuf v1.34.1
)

require (
cloud.google.com/go/auth v0.3.0 // indirect
cloud.google.com/go/auth v0.4.2 // indirect
cloud.google.com/go/auth/oauth2adapt v0.2.2 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
github.com/99designs/keyring v1.2.2 // indirect
github.com/ClickHouse/ch-go v0.61.5 // indirect
github.com/DataDog/zstd v1.5.5 // indirect
github.com/JohnCGriffin/overflow v0.0.0-20211019200055-46fa312c352c // indirect
github.com/apache/arrow/go/v15 v15.0.2 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.1 // indirect
github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.16.3 // indirect
github.com/aws/aws-sdk-go-v2/internal/ini v1.8.0 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.5 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.23.4 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.6 // indirect
github.com/aws/aws-sdk-go-v2/service/sso v1.20.8 // indirect
github.com/aws/aws-sdk-go-v2/service/ssooidc v1.24.2 // indirect
github.com/aws/aws-sdk-go-v2/service/sts v1.28.9 // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
Expand All @@ -106,17 +107,16 @@ require (
github.com/mtibben/percent v0.2.1 // indirect
github.com/paulmach/orb v0.11.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/prometheus/client_golang v1.19.0 // indirect
github.com/prometheus/client_golang v1.19.1 // indirect
github.com/prometheus/client_model v0.6.1 // indirect
github.com/prometheus/common v0.53.0 // indirect
github.com/prometheus/procfs v0.14.0 // indirect
github.com/prometheus/procfs v0.15.0 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.7.0 // indirect
github.com/twmb/franz-go/pkg/kmsg v1.8.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.51.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.51.0 // indirect
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.26.0 // indirect
go.opentelemetry.io/otel/trace v1.26.0 // indirect
go.opentelemetry.io/proto/otlp v1.2.0 // indirect
golang.org/x/term v0.20.0 // indirect
Expand All @@ -126,24 +126,23 @@ require (
cloud.google.com/go/compute/metadata v0.3.0 // indirect
cloud.google.com/go/iam v1.1.8 // indirect
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.11.1 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.7.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/internal v1.8.0 // indirect
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.2 // indirect
github.com/Azure/go-amqp v1.0.5 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 // indirect
github.com/andybalholm/brotli v1.1.0 // indirect
github.com/aws/aws-sdk-go-v2/aws/protocol/eventstream v1.6.2 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.5 // indirect
github.com/aws/aws-sdk-go-v2/internal/configsources v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.6.7 // indirect
github.com/aws/aws-sdk-go-v2/internal/v4a v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.11.2 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.7 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.5 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/checksum v1.3.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.11.9 // indirect
github.com/aws/aws-sdk-go-v2/service/internal/s3shared v1.17.7 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/djherbis/buffer v1.2.0
github.com/djherbis/nio/v3 v3.0.1
github.com/facebookgo/clock v0.0.0-20150410010913-600d898af40a // indirect
github.com/form3tech-oss/jwt-go v3.2.5+incompatible // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang-jwt/jwt/v5 v5.2.1 // indirect
Expand Down Expand Up @@ -175,15 +174,15 @@ require (
github.com/xrash/smetrics v0.0.0-20240312152122-5f08fbb34913 // indirect
github.com/zeebo/xxh3 v1.0.2 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/exp v0.0.0-20240416160154-fe59bbe5cc7f
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842
golang.org/x/net v0.25.0 // indirect
golang.org/x/oauth2 v0.20.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
golang.org/x/time v0.5.0 // indirect
golang.org/x/tools v0.20.0 // indirect
golang.org/x/tools v0.21.0 // indirect
golang.org/x/xerrors v0.0.0-20231012003039-104605ab7028 // indirect
google.golang.org/genproto v0.0.0-20240429193739-8cf5692501f6 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240429193739-8cf5692501f6 // indirect
google.golang.org/genproto v0.0.0-20240515191416-fc5f0ca64291 // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20240515191416-fc5f0ca64291 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading

0 comments on commit e19fd33

Please sign in to comment.