Skip to content

Commit

Permalink
[clickhouse] handle numerics with >76 precision as string (#2209)
Browse files Browse the repository at this point in the history
  • Loading branch information
heavycrystal authored Oct 31, 2024
1 parent 4a6bd86 commit 69e355f
Show file tree
Hide file tree
Showing 6 changed files with 110 additions and 12 deletions.
16 changes: 12 additions & 4 deletions flow/connectors/clickhouse/normalize.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,16 @@ func getColName(overrides map[string]string, name string) string {
return name
}

func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string {
rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier)
if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision {
return "String"
} else {
precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
return fmt.Sprintf("Decimal(%d, %d)", precision, scale)
}
}

func generateCreateTableSQLForNormalizedTable(
ctx context.Context,
config *protos.SetupNormalizedTableBatchInput,
Expand Down Expand Up @@ -129,8 +139,7 @@ func generateCreateTableSQLForNormalizedTable(

if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale)
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
Expand Down Expand Up @@ -323,8 +332,7 @@ func (c *ClickHouseConnector) NormalizeRecords(
colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName))
if clickHouseType == "" {
if colType == qvalue.QValueKindNumeric {
precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{})
clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale)
clickHouseType = getClickhouseTypeForNumericColumn(column)
} else {
var err error
clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE)
Expand Down
6 changes: 4 additions & 2 deletions flow/datatypes/numeric.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ const (
PeerDBBigQueryScale = 20
PeerDBSnowflakeScale = 20
PeerDBClickHouseScale = 38
VARHDRSZ = 4

PeerDBClickHouseMaxPrecision = 76
VARHDRSZ = 4
)

type WarehouseNumericCompatibility interface {
Expand All @@ -17,7 +19,7 @@ type WarehouseNumericCompatibility interface {
type ClickHouseNumericCompatibility struct{}

func (ClickHouseNumericCompatibility) MaxPrecision() int16 {
return 76
return PeerDBClickHouseMaxPrecision
}

func (ClickHouseNumericCompatibility) MaxScale() int16 {
Expand Down
7 changes: 1 addition & 6 deletions flow/e2e/clickhouse/clickhouse.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package e2e_clickhouse

import (
"context"
"errors"
"fmt"
"reflect"
"strings"
Expand Down Expand Up @@ -93,13 +92,9 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch
return nil, err
}

firstCol, _, _ := strings.Cut(cols, ",")
if firstCol == "" {
return nil, errors.New("no columns specified")
}
rows, err := ch.Query(
context.Background(),
fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, cols, table, firstCol),
fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table),
)
if err != nil {
return nil, err
Expand Down
59 changes: 59 additions & 0 deletions flow/e2e/clickhouse/peer_flow_ch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,18 @@ import (
"context"
"embed"
"fmt"
"strings"
"testing"
"time"

"github.com/shopspring/decimal"
"github.com/stretchr/testify/require"

"github.com/PeerDB-io/peer-flow/e2e"
"github.com/PeerDB-io/peer-flow/e2eshared"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
peerflow "github.com/PeerDB-io/peer-flow/workflows"
)
Expand Down Expand Up @@ -499,3 +502,59 @@ func (s ClickHouseSuite) Test_Weird_Table_And_Column() {
env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}

// large NUMERICs (precision >76) are mapped to String on CH, test
func (s ClickHouseSuite) Test_Large_Numeric() {
srcFullName := s.attachSchemaSuffix("lnumeric")
dstTableName := "lnumeric"

_, err := s.Conn().Exec(context.Background(), fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS %s(
id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
c1 NUMERIC(76,0),
c2 NUMERIC(78,0)
);
`, srcFullName))
require.NoError(s.t, err)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78)))
require.NoError(s.t, err)

connectionGen := e2e.FlowConnectionGenerationConfig{
FlowJobName: s.attachSuffix("clickhouse_test_large_numerics"),
TableNameMapping: map[string]string{srcFullName: dstTableName},
Destination: s.Peer().Name,
}
flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t)
flowConnConfig.DoInitialSnapshot = true
tc := e2e.NewTemporalClient(s.t)
env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 1)

_, err = s.Conn().Exec(context.Background(), fmt.Sprintf(`
INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78)))
require.NoError(s.t, err)

e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 2)

rows, err := s.GetRows(dstTableName, "c1,c2")
require.NoError(s.t, err)
require.Len(s.t, rows.Records, 2, "expected 2 rows")
for _, row := range rows.Records {
require.Len(s.t, row, 2, "expected 2 columns")
require.Equal(s.t, qvalue.QValueKindNumeric, row[0].Kind(), "expected NUMERIC(76,0) to be Decimal")
require.Equal(s.t, qvalue.QValueKindString, row[1].Kind(), "expected NUMERIC(78,0) to be String")
c1, ok := row[0].Value().(decimal.Decimal)
require.True(s.t, ok, "expected NUMERIC(76,0) to be Decimal")
require.Equal(s.t, strings.Repeat("7", 76), c1.String(), "expected NUMERIC(76,0) to be 7s")
c2, ok := row[1].Value().(string)
require.True(s.t, ok, "expected NUMERIC(78,0) to be String")
require.Equal(s.t, strings.Repeat("9", 78), c2, "expected NUMERIC(78,0) to be 9s")
}

env.Cancel()
e2e.RequireEnvCanceled(s.t, env)
}
24 changes: 24 additions & 0 deletions flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,30 @@ func EnvWaitForEqualTablesWithNames(
})
}

func EnvWaitForCount(
env WorkflowRun,
suite RowSource,
reason string,
dstTable string,
cols string,
expectedCount int,
) {
t := suite.T()
t.Helper()

EnvWaitFor(t, env, 3*time.Minute, reason, func() bool {
t.Helper()

rows, err := suite.GetRows(dstTable, cols)
if err != nil {
t.Log(err)
return false
}

return len(rows.Records) == expectedCount
})
}

func RequireEnvCanceled(t *testing.T, env WorkflowRun) {
t.Helper()
EnvWaitForFinished(t, env, time.Minute)
Expand Down
10 changes: 10 additions & 0 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,10 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci
}
return "bytes", nil
case QValueKindNumeric:
if targetDWH == protos.DBType_CLICKHOUSE &&
precision > datatypes.PeerDBClickHouseMaxPrecision {
return "string", nil
}
avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH)
return AvroSchemaNumeric{
Type: "bytes",
Expand Down Expand Up @@ -454,6 +458,12 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} {
return nil
}

if c.TargetDWH == protos.DBType_CLICKHOUSE &&
c.Precision > datatypes.PeerDBClickHouseMaxPrecision {
// no error returned
numStr, _ := c.processNullableUnion("string", num.String())
return numStr
}
rat := num.Rat()
if c.Nullable {
return goavro.Union("bytes.decimal", rat)
Expand Down

0 comments on commit 69e355f

Please sign in to comment.