diff --git a/flow/connectors/bigquery/qrep_avro_sync.go b/flow/connectors/bigquery/qrep_avro_sync.go index d022045b70..aad8078ac3 100644 --- a/flow/connectors/bigquery/qrep_avro_sync.go +++ b/flow/connectors/bigquery/qrep_avro_sync.go @@ -14,7 +14,6 @@ import ( "go.temporal.io/sdk/activity" avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro" - numeric "github.com/PeerDB-io/peer-flow/datatypes" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" "github.com/PeerDB-io/peer-flow/model/qvalue" @@ -281,12 +280,8 @@ func DefineAvroSchema(dstTableName string, } func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) { - avroNumericPrecision := int16(bqField.Precision) - avroNumericScale := int16(bqField.Scale) - bqNumeric := numeric.BigQueryNumericCompatibility{} - if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) { - avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale() - } + avroNumericPrecision, avroNumericScale := qvalue.DetermineNumericSettingForDWH( + int16(bqField.Precision), int16(bqField.Scale), protos.DBType_BIGQUERY) considerRepeated := func(typ string, repeated bool) interface{} { if repeated { diff --git a/flow/connectors/postgres/postgres.go b/flow/connectors/postgres/postgres.go index d8dea391de..6bd8401064 100644 --- a/flow/connectors/postgres/postgres.go +++ b/flow/connectors/postgres/postgres.go @@ -740,6 +740,7 @@ func (c *PostgresConnector) GetTableSchema( } tableSchema, err := c.getTableSchemaForTable(ctx, tableName, req.System) if err != nil { + c.logger.Info("error fetching schema for table "+tableName, slog.Any("error", err)) return nil, err } res[tableName] = tableSchema diff --git a/flow/connectors/utils/aws.go b/flow/connectors/utils/aws.go index d1886f2c1f..7138144bb9 100644 --- a/flow/connectors/utils/aws.go +++ b/flow/connectors/utils/aws.go @@ -170,13 +170,13 @@ func LoadPeerDBAWSEnvConfigProvider(connectorName string) AWSCredentialsProvider func GetAWSCredentialsProvider(ctx context.Context, connectorName string, peerCredentials PeerAWSCredentials) (AWSCredentialsProvider, error) { if !(peerCredentials.Credentials.AccessKeyID == "" && peerCredentials.Credentials.SecretAccessKey == "" && - peerCredentials.Region == "" && peerCredentials.RoleArn == nil && + peerCredentials.Region == "" && (peerCredentials.RoleArn == nil || *peerCredentials.RoleArn == "") && (peerCredentials.EndpointUrl == nil || *peerCredentials.EndpointUrl == "")) { staticProvider := NewStaticAWSCredentialsProvider(AWSCredentials{ AWS: peerCredentials.Credentials, EndpointUrl: peerCredentials.EndpointUrl, }, peerCredentials.Region) - if peerCredentials.RoleArn == nil { + if peerCredentials.RoleArn == nil || *peerCredentials.RoleArn == "" { logger.LoggerFromCtx(ctx).Info("Received AWS credentials from peer for connector: " + connectorName) return staticProvider, nil } diff --git a/flow/datatypes/bigint.go b/flow/datatypes/bigint.go new file mode 100644 index 0000000000..28a3b97148 --- /dev/null +++ b/flow/datatypes/bigint.go @@ -0,0 +1,33 @@ +package datatypes + +import ( + "math" + "math/big" +) + +var tenInt = big.NewInt(10) + +func CountDigits(bi *big.Int) int { + if bi.IsInt64() { + i64 := bi.Int64() + // restrict fast path to integers with exact conversion to float64 + if i64 <= (1<<53) && i64 >= -(1<<53) { + if i64 == 0 { + return 1 + } + return int(math.Log10(math.Abs(float64(i64)))) + 1 + } + } + + estimatedNumDigits := int(float64(bi.BitLen()) / math.Log2(10)) + + // 
estimatedNumDigits (lg10) may be off by 1, need to verify + digitsBigInt := big.NewInt(int64(estimatedNumDigits)) + errorCorrectionUnit := digitsBigInt.Exp(tenInt, digitsBigInt, nil) + + if bi.CmpAbs(errorCorrectionUnit) >= 0 { + return estimatedNumDigits + 1 + } + + return estimatedNumDigits +} diff --git a/flow/datatypes/bigint_test.go b/flow/datatypes/bigint_test.go new file mode 100644 index 0000000000..43d386ebb8 --- /dev/null +++ b/flow/datatypes/bigint_test.go @@ -0,0 +1,38 @@ +package datatypes + +import ( + "math/big" + "testing" +) + +func TestCountDigits(t *testing.T) { + bi := big.NewInt(-0) + expected := 1 + result := CountDigits(bi) + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + bi = big.NewInt(0) + expected = 1 + result = CountDigits(bi) + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + // 18 nines + bi = big.NewInt(999999999999999999) + result = CountDigits(bi) + expected = 18 + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } + + // 18 nines + bi = big.NewInt(-999999999999999999) + result = CountDigits(bi) + expected = 18 + if result != expected { + t.Errorf("Unexpected result. Expected: %v, but got: %v", expected, result) + } +} diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 356ef06605..3051ca40d8 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -15,7 +15,7 @@ type WarehouseNumericCompatibility interface { MaxPrecision() int16 MaxScale() int16 DefaultPrecisionAndScale() (int16, int16) - IsValidPrevisionAndScale(precision, scale int16) bool + IsValidPrecisionAndScale(precision, scale int16) bool } type ClickHouseNumericCompatibility struct{} @@ -32,7 +32,7 @@ func (ClickHouseNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) return PeerDBClickhousePrecision, PeerDBClickhouseScale } -func (ClickHouseNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (ClickHouseNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= PeerDBClickhousePrecision && scale < precision } @@ -50,7 +50,7 @@ func (SnowflakeNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return PeerDBSnowflakePrecision, PeerDBSnowflakeScale } -func (SnowflakeNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (SnowflakeNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return precision > 0 && precision <= 38 && scale < precision } @@ -68,8 +68,9 @@ func (BigQueryNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return PeerDBBigQueryPrecision, PeerDBBigQueryScale } -func (BigQueryNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { - return precision > 0 && precision <= 38 && scale <= 20 && scale < precision +func (BigQueryNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { + return precision > 0 && precision <= PeerDBBigQueryPrecision && + scale <= PeerDBBigQueryScale && scale < precision } type DefaultNumericCompatibility struct{} @@ -86,7 +87,7 @@ func (DefaultNumericCompatibility) DefaultPrecisionAndScale() (int16, int16) { return 38, 20 } -func (DefaultNumericCompatibility) IsValidPrevisionAndScale(precision, scale int16) bool { +func (DefaultNumericCompatibility) IsValidPrecisionAndScale(precision, scale int16) bool { return true } @@ -112,7 
+113,7 @@ func GetNumericTypeForWarehouse(typmod int32, warehouseNumeric WarehouseNumericC } precision, scale := ParseNumericTypmod(typmod) - if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) { + if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) { return warehouseNumeric.DefaultPrecisionAndScale() } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 763009b2c6..764fd86759 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -4,8 +4,6 @@ import ( "errors" "fmt" "log/slog" - "math" - "math/big" "time" "github.com/google/uuid" @@ -52,6 +50,21 @@ type AvroSchemaField struct { LogicalType string `json:"logicalType,omitempty"` } +func TruncateOrLogNumeric(num decimal.Decimal, precision int16, scale int16, targetDB protos.DBType) (decimal.Decimal, error) { + if targetDB == protos.DBType_SNOWFLAKE || targetDB == protos.DBType_BIGQUERY { + bidigi := datatypes.CountDigits(num.BigInt()) + avroPrecision, avroScale := DetermineNumericSettingForDWH(precision, scale, targetDB) + if bidigi+int(avroScale) > int(avroPrecision) { + slog.Warn("Clearing NUMERIC value with too many digits", slog.Any("number", num)) + return num, errors.New("invalid numeric") + } else if num.Exponent() < -int32(avroScale) { + num = num.Truncate(int32(avroScale)) + slog.Warn("Truncated NUMERIC value", slog.Any("number", num)) + } + } + return num, nil +} + // GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind. // The function takes in two parameters, a QValueKind and a boolean indicating if the // Avro schema should respect null values. It returns a QValueKindAvroSchema object @@ -430,43 +443,12 @@ func (c *QValueAvroConverter) processNullableUnion( return value, nil } -var tenInt = big.NewInt(10) - -func countDigits(bi *big.Int) int { - if bi.IsUint64() { - u64 := bi.Uint64() - if u64 < (1 << 53) { - if u64 == 0 { - return 1 - } - return int(math.Log10(float64(u64))) + 1 - } - } else if bi.IsInt64() { - i64 := bi.Int64() - if i64 > -(1 << 53) { - return int(math.Log10(float64(-i64))) + 1 - } - } - - abs := new(big.Int).Abs(bi) - // lg10 may be off by 1, need to verify - lg10 := int(float64(abs.BitLen()) / math.Log2(10)) - check := big.NewInt(int64(lg10)) - return lg10 + abs.Cmp(check.Exp(tenInt, check, nil)) -} - func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { - if c.TargetDWH == protos.DBType_SNOWFLAKE { - bidigi := countDigits(num.BigInt()) - avroPrecision, avroScale := DetermineNumericSettingForDWH(c.Precision, c.Scale, c.TargetDWH) - - if bidigi+int(avroScale) > int(avroPrecision) { - slog.Warn("Clearing NUMERIC value with too many digits for Snowflake!", slog.Any("number", num)) - return nil - } else if num.Exponent() < -int32(avroScale) { - num = num.Round(int32(avroScale)) - } + num, err := TruncateOrLogNumeric(num, c.Precision, c.Scale, c.TargetDWH) + if err != nil { + return nil } + rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat) diff --git a/flow/model/qvalue/dwh.go b/flow/model/qvalue/dwh.go index 5dd7440baf..cb6eb9e868 100644 --- a/flow/model/qvalue/dwh.go +++ b/flow/model/qvalue/dwh.go @@ -22,7 +22,7 @@ func DetermineNumericSettingForDWH(precision int16, scale int16, dwh protos.DBTy warehouseNumeric = numeric.DefaultNumericCompatibility{} } - if !warehouseNumeric.IsValidPrevisionAndScale(precision, scale) { + if !warehouseNumeric.IsValidPrecisionAndScale(precision, scale) { precision, scale = 
warehouseNumeric.DefaultPrecisionAndScale() } diff --git a/ui/app/mirrors/create/cdc/guide.tsx b/ui/app/mirrors/create/cdc/guide.tsx index c53562cff0..4a08219a06 100644 --- a/ui/app/mirrors/create/cdc/guide.tsx +++ b/ui/app/mirrors/create/cdc/guide.tsx @@ -22,6 +22,11 @@ const GuideForDestinationSetup = ({ return 'https://docs.peerdb.io/connect/cloudsql_postgres'; case 'CRUNCHY POSTGRES': return 'https://docs.peerdb.io/connect/crunchy_bridge'; + case 'CONFLUENT': + return 'https://docs.peerdb.io/connect/confluent-cloud'; + case 'REDPANDA': + case 'KAFKA': + return 'https://docs.peerdb.io/connect/kafka'; default: return ''; } diff --git a/ui/app/peers/create/[peerType]/helpers/s3.ts b/ui/app/peers/create/[peerType]/helpers/s3.ts index cd34c443a3..8e98b91649 100644 --- a/ui/app/peers/create/[peerType]/helpers/s3.ts +++ b/ui/app/peers/create/[peerType]/helpers/s3.ts @@ -33,6 +33,13 @@ export const s3Setting: PeerSetting[] = [ setter((curr) => ({ ...curr, region: value as string })), tips: 'The region where your bucket is located. For example, us-east-1. In case of GCS, this will be set to auto, which detects where your bucket it.', }, + { + label: 'Endpoint', + stateHandler: (value, setter) => + setter((curr) => ({ ...curr, endpoint: value as string })), + tips: 'The endpoint of your S3 bucket. This is optional.', + optional: true, + }, { label: 'Role ARN', stateHandler: (value, setter) => diff --git a/ui/app/peers/create/[peerType]/page.tsx b/ui/app/peers/create/[peerType]/page.tsx index 0e46f5f4b7..22be6043a6 100644 --- a/ui/app/peers/create/[peerType]/page.tsx +++ b/ui/app/peers/create/[peerType]/page.tsx @@ -52,6 +52,9 @@ export default function CreateConfig({ if (peerType.includes('POSTGRES') || peerType.includes('TEMBO')) { return 'POSTGRES'; } + if (peerType === 'CONFLUENT' || peerType === 'REDPANDA') { + return 'KAFKA'; + } return peerType; }; diff --git a/ui/app/utils/gcsEndpoint.ts b/ui/app/utils/gcsEndpoint.ts new file mode 100644 index 0000000000..eb379dc106 --- /dev/null +++ b/ui/app/utils/gcsEndpoint.ts @@ -0,0 +1 @@ +export const GCS_ENDPOINT = 'https://storage.googleapis.com'; diff --git a/ui/components/PeerComponent.tsx b/ui/components/PeerComponent.tsx index bedbf41531..5689ea845f 100644 --- a/ui/components/PeerComponent.tsx +++ b/ui/components/PeerComponent.tsx @@ -37,6 +37,10 @@ export const DBTypeToImageMapping = (peerType: DBType | string) => { return '/svgs/ms.svg'; case DBType.KAFKA: case 'KAFKA': + return '/svgs/kafka.svg'; + case 'CONFLUENT': + return '/svgs/confluent.svg'; + case 'REDPANDA': return '/svgs/redpanda.svg'; case DBType.PUBSUB: case 'PUBSUB': diff --git a/ui/components/PeerForms/S3Form.tsx b/ui/components/PeerForms/S3Form.tsx index f4cfcc351f..db075c3366 100644 --- a/ui/components/PeerForms/S3Form.tsx +++ b/ui/components/PeerForms/S3Form.tsx @@ -1,6 +1,7 @@ 'use client'; import { PeerSetter } from '@/app/dto/PeersDTO'; import { s3Setting } from '@/app/peers/create/[peerType]/helpers/s3'; +import { GCS_ENDPOINT } from '@/app/utils/gcsEndpoint'; import { Label } from '@/lib/Label'; import { RowWithRadiobutton, RowWithTextField } from '@/lib/Layout'; import { RadioButton, RadioButtonGroup } from '@/lib/RadioButtonGroup'; @@ -16,26 +17,18 @@ const S3Form = ({ setter }: S3Props) => { const [storageType, setStorageType] = useState<'S3' | 'GCS'>('S3'); const displayCondition = (label: string) => { return !( - (label === 'Region' || label === 'Role ARN') && + (label === 'Region' || label === 'Role ARN' || label === 'Endpoint') && storageType === 'GCS' ); }; 
useEffect(() => { - const endpoint = - storageType === 'S3' ? '' : 'https://storage.googleapis.com'; - setter((prev) => { - return { - ...prev, - endpoint, - }; - }); - if (storageType === 'GCS') { setter((prev) => { return { ...prev, region: 'auto', + endpoint: GCS_ENDPOINT, }; }); } diff --git a/ui/components/SelectSource.tsx b/ui/components/SelectSource.tsx index cdb788d858..5464c733c2 100644 --- a/ui/components/SelectSource.tsx +++ b/ui/components/SelectSource.tsx @@ -42,7 +42,7 @@ const dbTypes = [ 'CRUNCHY POSTGRES', ], ['Warehouses', 'SNOWFLAKE', 'BIGQUERY', 'S3', 'CLICKHOUSE', 'ELASTICSEARCH'], - ['Queues', 'KAFKA', 'EVENTHUBS', 'PUBSUB'], + ['Queues', 'REDPANDA', 'CONFLUENT', 'KAFKA', 'EVENTHUBS', 'PUBSUB'], ]; const gridContainerStyle = { diff --git a/ui/public/svgs/confluent.svg b/ui/public/svgs/confluent.svg new file mode 100644 index 0000000000..0b821f23fa --- /dev/null +++ b/ui/public/svgs/confluent.svg @@ -0,0 +1,30 @@ + + + + + + + + + + + + + + + + + \ No newline at end of file
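
A minimal standalone sketch of how the new datatypes.CountDigits behaves around a power of ten, where the bit-length based log10 estimate in bigint.go can undercount by one and the correction against 10^estimatedNumDigits kicks in. The import path is taken from this diff; the main wrapper itself is illustrative only.

package main

import (
	"fmt"
	"math/big"

	"github.com/PeerDB-io/peer-flow/datatypes"
)

func main() {
	// 18 nines stays at 18 digits; 10^18 is where the raw estimate would
	// report 18 and the CmpAbs check bumps it to the correct 19.
	for _, s := range []string{"0", "999999999999999999", "1000000000000000000", "-1000000000000000000"} {
		bi, ok := new(big.Int).SetString(s, 10)
		if !ok {
			panic("bad test literal: " + s)
		}
		fmt.Printf("%s has %d digit(s)\n", s, datatypes.CountDigits(bi))
	}
}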
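
Similarly, a sketch of the shared numeric path that qrep_avro_sync.go and avro_converter.go now go through: qvalue.TruncateOrLogNumeric truncates values with more fractional digits than the warehouse scale allows and rejects values whose total digit count exceeds the warehouse precision. Passing a precision/scale of 0 falls back to the warehouse defaults via DetermineNumericSettingForDWH. The shopspring/decimal import path and the exact BigQuery defaults (precision 38, scale 20) are assumptions not spelled out in this diff.

package main

import (
	"fmt"

	"github.com/shopspring/decimal" // assumed: the decimal type used throughout qvalue

	"github.com/PeerDB-io/peer-flow/generated/protos"
	"github.com/PeerDB-io/peer-flow/model/qvalue"
)

func main() {
	// A precision/scale of 0 is invalid, so DetermineNumericSettingForDWH
	// substitutes the BigQuery defaults before the digit checks run.
	tooPrecise := decimal.RequireFromString("1.2345678901234567890123456789")
	truncated, err := qvalue.TruncateOrLogNumeric(tooPrecise, 0, 0, protos.DBType_BIGQUERY)
	fmt.Println(truncated, err) // fractional part truncated to the warehouse scale, no error

	tooBig := decimal.RequireFromString("1e45")
	if _, err := qvalue.TruncateOrLogNumeric(tooBig, 0, 0, protos.DBType_BIGQUERY); err != nil {
		fmt.Println("rejected:", err) // "invalid numeric": too many digits for the warehouse precision
	}
}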