From db376cb6f88666824056e8b04a2279e70842f51d Mon Sep 17 00:00:00 2001
From: Amogh Bharadwaj
Date: Sat, 17 Feb 2024 00:20:04 +0530
Subject: [PATCH] Clickhouse: Dynamic Decimal Type (#1319)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

This PR adds support for the decimal data type for Clickhouse,
with dynamic precision and scale mapping.

This PR has been functionally tested.

Screenshot 2024-02-16 at 11 08 44 PM
Screenshot 2024-02-16 at 11 11 14 PM

---------

Co-authored-by: Philip Dubé
---
 flow/connectors/clickhouse/clickhouse.go |  6 ++----
 flow/connectors/clickhouse/normalize.go  | 14 +++++++++++++-
 flow/model/numeric/scale.go              |  6 ++++--
 flow/model/qvalue/avro_converter.go      |  9 +--------
 flow/model/qvalue/dwh_type.go            | 18 ++++++++++++++++++
 5 files changed, 38 insertions(+), 15 deletions(-)

diff --git a/flow/connectors/clickhouse/clickhouse.go b/flow/connectors/clickhouse/clickhouse.go
index d7f0b39ec3..4465ba62b2 100644
--- a/flow/connectors/clickhouse/clickhouse.go
+++ b/flow/connectors/clickhouse/clickhouse.go
@@ -5,7 +5,6 @@ import (
 	"crypto/tls"
 	"database/sql"
 	"fmt"
-	"log/slog"
 	"net/url"
 
 	"github.com/ClickHouse/clickhouse-go/v2"
@@ -45,7 +44,6 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error
 		return fmt.Errorf("failed to create S3 bucket and prefix: %w", err)
 	}
 
-	slog.Info(fmt.Sprintf("Validating S3 bucke: %s", object.Bucket))
 	_, listErr := s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
 		Bucket: &object.Bucket,
 	},
@@ -59,7 +57,7 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error
 
 // Creates and drops a dummy table to validate the peer
 func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
-	validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4))
+	validateDummyTableName := "peerdb_validation_" + shared.RandomString(4)
 	// create a table
 	_, err := conn.ExecContext(ctx,
 		fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory", validateDummyTableName))
@@ -74,7 +72,7 @@ func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
 	}
 
 	// drop the table
-	_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName))
+	_, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName)
 	if err != nil {
 		return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
 	}
diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go
index a499540aef..a1e6e43c46 100644
--- a/flow/connectors/clickhouse/normalize.go
+++ b/flow/connectors/clickhouse/normalize.go
@@ -9,6 +9,7 @@ import (
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
 	"github.com/PeerDB-io/peer-flow/model"
+	"github.com/PeerDB-io/peer-flow/model/numeric"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 )
 
@@ -79,7 +80,18 @@ func generateCreateTableSQLForNormalizedTable(
 		if err != nil {
 			return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
 		}
-		stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))
+
+		if colType == qvalue.QValueKindNumeric {
+			precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
+			if column.TypeModifier == -1 || precision > 76 || scale > precision {
+				precision = numeric.PeerDBClickhousePrecision
+				scale = numeric.PeerDBClickhouseScale
+			}
+			stmtBuilder.WriteString(fmt.Sprintf("`%s` DECIMAL(%d, %d), ",
+				colName, precision, scale))
+		} else {
+			stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))
+		}
 	}
 
 	// TODO support soft delete
diff --git a/flow/model/numeric/scale.go b/flow/model/numeric/scale.go
index 1f14c1520c..3e0d1381e9 100644
--- a/flow/model/numeric/scale.go
+++ b/flow/model/numeric/scale.go
@@ -1,8 +1,10 @@
 package numeric
 
 const (
-	PeerDBNumericPrecision = 38
-	PeerDBNumericScale     = 20
+	PeerDBNumericPrecision    = 38
+	PeerDBNumericScale        = 20
+	PeerDBClickhousePrecision = 76
+	PeerDBClickhouseScale     = 38
 )
 
 // This is to reverse what make_numeric_typmod of Postgres does:
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index 24aeb73e20..28a4da4e30 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -12,7 +12,6 @@ import (
 	"go.temporal.io/sdk/log"
 
 	hstore_util "github.com/PeerDB-io/peer-flow/hstore"
-	"github.com/PeerDB-io/peer-flow/model/numeric"
 )
 
 // https://avro.apache.org/docs/1.11.0/spec.html
@@ -58,13 +57,6 @@ type AvroSchemaField struct {
 // For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds
 // will return an error.
 func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) {
-	avroNumericPrecision := precision
-	avroNumericScale := scale
-	if precision > 38 || precision <= 0 || scale > 37 || scale < 0 {
-		avroNumericPrecision = numeric.PeerDBNumericPrecision
-		avroNumericScale = numeric.PeerDBNumericScale
-	}
-
 	switch kind {
 	case QValueKindString, QValueKindQChar:
 		return "string", nil
@@ -86,6 +78,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
 	case QValueKindBytes, QValueKindBit:
 		return "bytes", nil
 	case QValueKindNumeric:
+		avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH)
 		return AvroSchemaNumeric{
 			Type:        "bytes",
 			LogicalType: "decimal",
diff --git a/flow/model/qvalue/dwh_type.go b/flow/model/qvalue/dwh_type.go
index 293e396847..b6e682be1b 100644
--- a/flow/model/qvalue/dwh_type.go
+++ b/flow/model/qvalue/dwh_type.go
@@ -1,5 +1,7 @@
 package qvalue
 
+import "github.com/PeerDB-io/peer-flow/model/numeric"
+
 type QDWHType int
 
 const (
@@ -8,3 +10,19 @@ const (
 	QDWHTypeBigQuery   QDWHType = 3
 	QDWHTypeClickhouse QDWHType = 4
 )
+
+func DetermineNumericSettingForDWH(precision int16, scale int16, dwh QDWHType) (int16, int16) {
+	if dwh == QDWHTypeClickhouse {
+		if precision > numeric.PeerDBClickhousePrecision || precision <= 0 ||
+			scale > precision || scale < 0 {
+			return numeric.PeerDBClickhousePrecision, numeric.PeerDBClickhouseScale
+		}
+	} else {
+		if precision > numeric.PeerDBNumericPrecision || precision <= 0 ||
+			scale > numeric.PeerDBNumericScale || scale < 0 {
+			return numeric.PeerDBNumericPrecision, numeric.PeerDBNumericScale
+		}
+	}
+
+	return precision, scale
+}