Skip to content

Commit

Permalink
Merge branch 'main' into codespace-symmetrical-dollop-qvqqrg5pjwr2xj47
Browse files Browse the repository at this point in the history
  • Loading branch information
iskakaushik authored May 24, 2024
2 parents e0fcc65 + b063b61 commit 1bc19eb
Show file tree
Hide file tree
Showing 16 changed files with 158 additions and 65 deletions.
9 changes: 2 additions & 7 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"go.temporal.io/sdk/activity"

avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
numeric "github.com/PeerDB-io/peer-flow/datatypes"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
Expand Down Expand Up @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string,
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
bqNumeric := numeric.BigQueryNumericCompatibility{}
if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) {
avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
}
avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH(
int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY)

considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
Expand Down
1 change: 1 addition & 0 deletions flow/connectors/postgres/postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -740,6 +740,7 @@ func (c *PostgresConnector) GetTableSchema(
}
tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System)
if err != nil {
c.logger.Info("error fetching schema for table "+tableName, slog.Any("error", err))
return nil, err
}
res[tableName] = tableSchema
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/utils/aws.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,13 +170,13 @@ func LoadPeerDBAWSEnvConfigProvider(connectorName string) AWSCredentialsProvider

func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCredentials PeerAWSCredentials) (AWSCredentialsProvider, error) {
if !(peerCredentials.Credentials.AccessKeyID == "" && peerCredentials.Credentials.SecretAccessKey == "" &&
peerCredentials.Region == "" && peerCredentials.RoleArn == nil &&
peerCredentials.Region == "" && (peerCredentials.RoleArn == nil || *peerCredentials.RoleArn == "") &&
(peerCredentials.EndpointUrl == nil || *peerCredentials.EndpointUrl == "")) {
staticProvider := NewStaticAWSCredentialsProvider(AWSCredentials{
AWS: peerCredentials.Credentials,
EndpointUrl: peerCredentials.EndpointUrl,
}, peerCredentials.Region)
if peerCredentials.RoleArn == nil {
if peerCredentials.RoleArn == nil || *peerCredentials.RoleArn == "" {
logger.LoggerFromCtx(ctx).Info("Received AWS credentials from peer for connector: " + connectorName)
return staticProvider, nil
}
Expand Down
33 changes: 33 additions & 0 deletions flow/datatypes/bigint.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package datatypes

import (
"math"
"math/big"
)

// tenInt is the base used when building powers of ten. CountDigits only reads
// it and never uses it as the receiver of a mutating call.
var tenInt = big.NewInt(10)

// CountDigits returns the number of decimal digits in the absolute value of
// bi. The sign is ignored and zero is reported as having one digit.
//
// Values that fit in an int64 are counted with integer division, which is
// exact for every input. A floating-point log10 is deliberately avoided here:
// its result can round across an integer boundary for values adjacent to a
// power of ten (for example fifteen nines), which yields an off-by-one count.
//
// Larger values are handled by estimating the digit count from the bit length
// and then correcting the estimate with a single big-integer comparison.
func CountDigits(bi *big.Int) int {
	if bi.IsInt64() {
		v := bi.Int64()
		if v == 0 {
			return 1
		}
		// Go's integer division truncates toward zero, so the loop works for
		// negative values (including math.MinInt64) without negating them.
		digits := 0
		for ; v != 0; v /= 10 {
			digits++
		}
		return digits
	}

	// With b = BitLen, |bi| lies in [2^(b-1), 2^b), so floor(b / log2(10)) is
	// either the digit count or one less than it.
	estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10))

	// |bi| has estimatedNumDigits+1 digits exactly when it reaches
	// 10^estimatedNumDigits; otherwise the estimate is already correct.
	exponent := big.NewInt(int64(estimatedNumDigits))
	powerOfTen := new(big.Int).Exp(tenInt, exponent, nil)

	if bi.CmpAbs(powerOfTen) >= 0 {
		return estimatedNumDigits + 1
	}

	return estimatedNumDigits
}
38 changes: 38 additions & 0 deletions flow/datatypes/bigint_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package datatypes

import (
"math/big"
"testing"
)

// TestCountDigits checks CountDigits against zero (written both as -0 and 0)
// and against the eighteen-nines magnitude in both signs.
func TestCountDigits(t *testing.T) {
	cases := []struct {
		input    int64
		expected int
	}{
		// The literal -0 is the same int64 value as 0.
		{input: -0, expected: 1},
		{input: 0, expected: 1},
		// 18 nines
		{input: 999999999999999999, expected: 18},
		// 18 nines, negated
		{input: -999999999999999999, expected: 18},
	}

	for _, tc := range cases {
		result := CountDigits(big.NewInt(tc.input))
		if result != tc.expected {
			t.Errorf("Unexpected result. Expected: %v, but got: %v", tc.expected, result)
		}
	}
}
15 changes: 8 additions & 7 deletions flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface {
MaxPrecision() int16
MaxScale() int16
DefaultPrecisionAndScale() (int16, int16)
IsValidPrevisionAndScale(precision, scale int16) bool
IsValidPrecisionAndScale(precision, scale int16) bool
}

type ClickHouseNumericCompatibility struct{}
Expand All @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16)
return PeerDBClickhousePrecision, PeerDBClickhouseScale
}

func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision
}

Expand All @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBSnowflakePrecision, PeerDBSnowflakeScale
}

func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale < precision
}

Expand All @@ -68,8 +68,9 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return PeerDBBigQueryPrecision, PeerDBBigQueryScale
}

func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= 38 && scale <= 20 && scale < precision
func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return precision > 0 && precision <= PeerDBBigQueryPrecision &&
scale <= PeerDBBigQueryScale && scale < precision
}

type DefaultNumericCompatibility struct{}
Expand All @@ -86,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) {
return 38, 20
}

func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool {
func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool {
return true
}

Expand All @@ -112,7 +113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC
}

precision, scale := ParseNumericTypmod(typmod)
if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
return warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down
56 changes: 19 additions & 37 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"errors"
"fmt"
"log/slog"
"math"
"math/big"
"time"

"github.com/google/uuid"
Expand Down Expand Up @@ -52,6 +50,21 @@ type AvroSchemaField struct {
LogicalType string `json:"logicalType,omitempty"`
}

// TruncateOrLogNumeric fits num to the numeric settings of targetDB.
//
// Only Snowflake and BigQuery targets are checked; for any other target num is
// returned unchanged with a nil error. For those two targets the effective
// precision and scale come from DetermineNumericSettingForDWH, and then:
//   - if the digit count of num.BigInt() plus the scale exceeds the precision,
//     a warning is logged and num is returned unmodified together with an
//     "invalid numeric" error;
//   - otherwise, if num's exponent is below -scale, num is replaced by
//     num.Truncate(scale) and a warning is logged.
func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) {
	switch targetDB {
	case protos.DBType_SNOWFLAKE, protos.DBType_BIGQUERY:
		// These targets get the digit check below.
	default:
		return num, nil
	}

	// NOTE(review): presumably BigInt() is the integer part of num — confirm
	// against the decimal package.
	wholeDigits := datatypes.CountDigits(num.BigInt())
	dwhPrecision, dwhScale := DetermineNumericSettingForDWH(precision, scale, targetDB)

	if wholeDigits+int(dwhScale) > int(dwhPrecision) {
		slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num))
		return num, errors.New("invalid numeric")
	}

	if num.Exponent() < -int32(dwhScale) {
		num = num.Truncate(int32(dwhScale))
		// The warning is emitted after truncation, so it logs the new value.
		slog.Warn("Truncated NUMERIC value", slog.Any("number", num))
	}

	return num, nil
}

// GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
// The function takes in two parameters, a QValueKind and a boolean indicating if the
// Avro schema should respect null values. It returns a QValueKindAvroSchema object
Expand Down Expand Up @@ -430,43 +443,12 @@ func (c *QValueAvroConverter) processNullableUnion(
return value, nil
}

var tenInt = big.NewInt(10)

func countDigits(bi *big.Int) int {
if bi.IsUint64() {
u64 := bi.Uint64()
if u64 < (1 << 53) {
if u64 == 0 {
return 1
}
return int(math.Log10(float64(u64))) + 1
}
} else if bi.IsInt64() {
i64 := bi.Int64()
if i64 > -(1 << 53) {
return int(math.Log10(float64(-i64))) + 1
}
}

abs := new(big.Int).Abs(bi)
// lg10 may be off by 1, need to verify
lg10 := int(float64(abs.BitLen()) / math.Log2(10))
check := big.NewInt(int64(lg10))
return lg10 + abs.Cmp(check.Exp(tenInt, check, nil))
}

func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} {
if c.TargetDWH == protos.DBType_SNOWFLAKE {
bidigi := countDigits(num.BigInt())
avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH)

if bidigi+int(avroScale) > int(avroPrecision) {
slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num))
return nil
} else if num.Exponent() < -int32(avroScale) {
num = num.Round(int32(avroScale))
}
num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH)
if err != nil {
return nil
}

rat := num.Rat()
if c.Nullable {
return goavro.Union("bytes.decimal", rat)
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/dwh.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy
warehouseNumeric = numeric.DefaultNumericCompatibility{}
}

if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) {
if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) {
precision, scale = warehouseNumeric.DefaultPrecisionAndScale()
}

Expand Down
5 changes: 5 additions & 0 deletions ui/app/mirrors/create/cdc/guide.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,11 @@ const GuideForDestinationSetup = ({
return 'https://docs.peerdb.io/connect/cloudsql_postgres';
case 'CRUNCHY POSTGRES':
return 'https://docs.peerdb.io/connect/crunchy_bridge';
case 'CONFLUENT':
return 'https://docs.peerdb.io/connect/confluent-cloud';
case 'REDPANDA':
case 'KAFKA':
return 'https://docs.peerdb.io/connect/kafka';
default:
return '';
}
Expand Down
7 changes: 7 additions & 0 deletions ui/app/peers/create/[peerType]/helpers/s3.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,13 @@ export const s3Setting: PeerSetting[] = [
setter((curr) => ({ ...curr, region: value as string })),
tips: 'The region where your bucket is located. For example, us-east-1. In case of GCS, this will be set to auto, which detects where your bucket it.',
},
{
label: 'Endpoint',
stateHandler: (value, setter) =>
setter((curr) => ({ ...curr, endpoint: value as string })),
tips: 'The endpoint of your S3 bucket. This is optional.',
optional: true,
},
{
label: 'Role ARN',
stateHandler: (value, setter) =>
Expand Down
3 changes: 3 additions & 0 deletions ui/app/peers/create/[peerType]/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ export default function CreateConfig({
if (peerType.includes('POSTGRES') || peerType.includes('TEMBO')) {
return 'POSTGRES';
}
if (peerType === 'CONFLUENT' || peerType === 'REDPANDA') {
return 'KAFKA';
}
return peerType;
};

Expand Down
1 change: 1 addition & 0 deletions ui/app/utils/gcsEndpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const GCS_ENDPOINT = 'https://storage.googleapis.com';
4 changes: 4 additions & 0 deletions ui/components/PeerComponent.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => {
return '/svgs/ms.svg';
case DBType.KAFKA:
case 'KAFKA':
return '/svgs/kafka.svg';
case 'CONFLUENT':
return '/svgs/confluent.svg';
case 'REDPANDA':
return '/svgs/redpanda.svg';
case DBType.PUBSUB:
case 'PUBSUB':
Expand Down
13 changes: 3 additions & 10 deletions ui/components/PeerForms/S3Form.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use client';
import { PeerSetter } from '@/app/dto/PeersDTO';
import { s3Setting } from '@/app/peers/create/[peerType]/helpers/s3';
import { GCS_ENDPOINT } from '@/app/utils/gcsEndpoint';
import { Label } from '@/lib/Label';
import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout';
import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup';
Expand All @@ -16,26 +17,18 @@ const S3Form = ({ setter }: S3Props) => {
const [storageType, setStorageType] = useState<'S3' | 'GCS'>('S3');
const displayCondition = (label: string) => {
return !(
(label === 'Region' || label === 'Role ARN') &&
(label === 'Region' || label === 'Role ARN' || label === 'Endpoint') &&
storageType === 'GCS'
);
};

useEffect(() => {
const endpoint =
storageType === 'S3' ? '' : 'https://storage.googleapis.com';
setter((prev) => {
return {
...prev,
endpoint,
};
});

if (storageType === 'GCS') {
setter((prev) => {
return {
...prev,
region: 'auto',
endpoint: GCS_ENDPOINT,
};
});
}
Expand Down
2 changes: 1 addition & 1 deletion ui/components/SelectSource.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const dbTypes = [
'CRUNCHY POSTGRES',
],
['Warehouses', 'SNOWFLAKE', 'BIGQUERY', 'S3', 'CLICKHOUSE', 'ELASTICSEARCH'],
['Queues', 'KAFKA', 'EVENTHUBS', 'PUBSUB'],
['Queues', 'REDPANDA', 'CONFLUENT', 'KAFKA', 'EVENTHUBS', 'PUBSUB'],
];

const gridContainerStyle = {
Expand Down
30 changes: 30 additions & 0 deletions ui/public/svgs/confluent.svg
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.

0 comments on commit 1bc19eb

Please sign in to comment.