Skip to content

Commit

Permalink
Clickhouse: Dynamic Decimal Type (#1319)
Browse files Browse the repository at this point in the history
This PR adds support for decimal data type for clickhouse with dynamic
precision and scale mapping. This PR has been functionally tested.
<img width="730" alt="Screenshot 2024-02-16 at 11 08 44 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/c6f38180-8eb5-4aa0-ae07-2ccfeeefaae8">
<img width="646" alt="Screenshot 2024-02-16 at 11 11 14 PM"
src="https://github.com/PeerDB-io/peerdb/assets/65964360/cffcf809-084b-4498-9630-f90353b7aea6">

---------

Co-authored-by: Philip Dubé <[email protected]>
  • Loading branch information
Amogh-Bharadwaj and serprex authored Feb 16, 2024
1 parent 1524cb3 commit db376cb
Show file tree
Hide file tree
Showing 5 changed files with 38 additions and 15 deletions.
6 changes: 2 additions & 4 deletions flow/connectors/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import (
"crypto/tls"
"database/sql"
"fmt"
"log/slog"
"net/url"

"github.com/ClickHouse/clickhouse-go/v2"
Expand Down Expand Up @@ -45,7 +44,6 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error
return fmt.Errorf("failed to create S3 bucket and prefix: %w", err)
}

slog.Info(fmt.Sprintf("Validating S3 bucke: %s", object.Bucket))
_, listErr := s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
Bucket: &object.Bucket,
},
Expand All @@ -59,7 +57,7 @@ func ValidateS3(ctx context.Context, creds *utils.ClickhouseS3Credentials) error

// Creates and drops a dummy table to validate the peer
func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
validateDummyTableName := fmt.Sprintf("peerdb_validation_%s", shared.RandomString(4))
validateDummyTableName := "peerdb_validation_" + shared.RandomString(4)
// create a table
_, err := conn.ExecContext(ctx, fmt.Sprintf("CREATE TABLE IF NOT EXISTS %s (id UInt64) ENGINE = Memory",
validateDummyTableName))
Expand All @@ -74,7 +72,7 @@ func ValidateClickhouse(ctx context.Context, conn *sql.DB) error {
}

// drop the table
_, err = conn.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", validateDummyTableName))
_, err = conn.ExecContext(ctx, "DROP TABLE IF EXISTS "+validateDummyTableName)
if err != nil {
return fmt.Errorf("failed to drop validation table %s: %w", validateDummyTableName, err)
}
Expand Down
14 changes: 13 additions & 1 deletion flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

Expand Down Expand Up @@ -79,7 +80,18 @@ func generateCreateTableSQLForNormalizedTable(
if err != nil {
return "", fmt.Errorf("error while converting column type to clickhouse type: %w", err)
}
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))

if colType == qvalue.QValueKindNumeric {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 76 || scale > precision {
precision = numeric.PeerDBClickhousePrecision
scale = numeric.PeerDBClickhouseScale
}
stmtBuilder.WriteString(fmt.Sprintf("`%s` DECIMAL(%d, %d), ",
colName, precision, scale))
} else {
stmtBuilder.WriteString(fmt.Sprintf("`%s` %s, ", colName, clickhouseType))
}
}

// TODO support soft delete
Expand Down
6 changes: 4 additions & 2 deletions flow/model/numeric/scale.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
package numeric

const (
PeerDBNumericPrecision = 38
PeerDBNumericScale = 20
PeerDBNumericPrecision = 38
PeerDBNumericScale = 20
PeerDBClickhousePrecision = 76
PeerDBClickhouseScale = 38
)

// This is to reverse what make_numeric_typmod of Postgres does:
Expand Down
9 changes: 1 addition & 8 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ import (
"go.temporal.io/sdk/log"

hstore_util "github.com/PeerDB-io/peer-flow/hstore"
"github.com/PeerDB-io/peer-flow/model/numeric"
)

// https://avro.apache.org/docs/1.11.0/spec.html
Expand Down Expand Up @@ -58,13 +57,6 @@ type AvroSchemaField struct {
// For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds
// will return an error.
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) {
avroNumericPrecision := precision
avroNumericScale := scale
if precision > 38 || precision <= 0 || scale > 37 || scale < 0 {
avroNumericPrecision = numeric.PeerDBNumericPrecision
avroNumericScale = numeric.PeerDBNumericScale
}

switch kind {
case QValueKindString, QValueKindQChar:
return "string", nil
Expand All @@ -86,6 +78,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision
case QValueKindBytes, QValueKindBit:
return "bytes", nil
case QValueKindNumeric:
avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH)
return AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Expand Down
18 changes: 18 additions & 0 deletions flow/model/qvalue/dwh_type.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package qvalue

import "github.com/PeerDB-io/peer-flow/model/numeric"

type QDWHType int

const (
Expand All @@ -8,3 +10,19 @@ const (
QDWHTypeBigQuery QDWHType = 3
QDWHTypeClickhouse QDWHType = 4
)

func DetermineNumericSettingForDWH(precision int16, scale int16, dwh QDWHType) (int16, int16) {
if dwh == QDWHTypeClickhouse {
if precision > numeric.PeerDBClickhousePrecision || precision <= 0 ||
scale > precision || scale < 0 {
return numeric.PeerDBClickhousePrecision, numeric.PeerDBClickhouseScale
}
} else {
if precision > numeric.PeerDBNumericPrecision || precision <= 0 ||
scale > numeric.PeerDBNumericScale || scale < 0 {
return numeric.PeerDBNumericPrecision, numeric.PeerDBNumericScale
}
}

return precision, scale
}

0 comments on commit db376cb

Please sign in to comment.