diff --git a/flow/connectors/clickhouse/normalize.go b/flow/connectors/clickhouse/normalize.go index c971c3f692..d65d61e9d7 100644 --- a/flow/connectors/clickhouse/normalize.go +++ b/flow/connectors/clickhouse/normalize.go @@ -77,6 +77,16 @@ func getColName(overrides map[string]string, name string) string { return name } +func getClickhouseTypeForNumericColumn(column *protos.FieldDescription) string { + rawPrecision, _ := datatypes.ParseNumericTypmod(column.TypeModifier) + if rawPrecision > datatypes.PeerDBClickHouseMaxPrecision { + return "String" + } else { + precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) + return fmt.Sprintf("Decimal(%d, %d)", precision, scale) + } +} + func generateCreateTableSQLForNormalizedTable( ctx context.Context, config *protos.SetupNormalizedTableBatchInput, @@ -129,8 +139,7 @@ func generateCreateTableSQLForNormalizedTable( if clickHouseType == "" { if colType == qvalue.QValueKindNumeric { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale) + clickHouseType = getClickhouseTypeForNumericColumn(column) } else { var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) @@ -323,8 +332,7 @@ func (c *ClickHouseConnector) NormalizeRecords( colSelector.WriteString(fmt.Sprintf("`%s`,", dstColName)) if clickHouseType == "" { if colType == qvalue.QValueKindNumeric { - precision, scale := datatypes.GetNumericTypeForWarehouse(column.TypeModifier, datatypes.ClickHouseNumericCompatibility{}) - clickHouseType = fmt.Sprintf("Decimal(%d, %d)", precision, scale) + clickHouseType = getClickhouseTypeForNumericColumn(column) } else { var err error clickHouseType, err = colType.ToDWHColumnType(protos.DBType_CLICKHOUSE) diff --git a/flow/datatypes/numeric.go b/flow/datatypes/numeric.go index 6c57071484..56c1b17839 100644 --- a/flow/datatypes/numeric.go +++ b/flow/datatypes/numeric.go @@ -5,7 +5,9 @@ const ( PeerDBBigQueryScale = 20 PeerDBSnowflakeScale = 20 PeerDBClickHouseScale = 38 - VARHDRSZ = 4 + + PeerDBClickHouseMaxPrecision = 76 + VARHDRSZ = 4 ) type WarehouseNumericCompatibility interface { @@ -17,7 +19,7 @@ type WarehouseNumericCompatibility interface { type ClickHouseNumericCompatibility struct{} func (ClickHouseNumericCompatibility) MaxPrecision() int16 { - return 76 + return PeerDBClickHouseMaxPrecision } func (ClickHouseNumericCompatibility) MaxScale() int16 { diff --git a/flow/e2e/clickhouse/clickhouse.go b/flow/e2e/clickhouse/clickhouse.go index 891fe55365..e1eafd6b4b 100644 --- a/flow/e2e/clickhouse/clickhouse.go +++ b/flow/e2e/clickhouse/clickhouse.go @@ -2,7 +2,6 @@ package e2e_clickhouse import ( "context" - "errors" "fmt" "reflect" "strings" @@ -93,13 +92,9 @@ func (s ClickHouseSuite) GetRows(table string, cols string) (*model.QRecordBatch return nil, err } - firstCol, _, _ := strings.Cut(cols, ",") - if firstCol == "" { - return nil, errors.New("no columns specified") - } rows, err := ch.Query( context.Background(), - fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY %s SETTINGS use_query_cache = false`, cols, table, firstCol), + fmt.Sprintf(`SELECT %s FROM %s FINAL WHERE _peerdb_is_deleted = 0 ORDER BY 1 SETTINGS use_query_cache = false`, cols, table), ) if err != nil { return nil, err diff --git a/flow/e2e/clickhouse/peer_flow_ch_test.go b/flow/e2e/clickhouse/peer_flow_ch_test.go index 4dcee7feb5..3cf1f97597 100644 --- a/flow/e2e/clickhouse/peer_flow_ch_test.go +++ b/flow/e2e/clickhouse/peer_flow_ch_test.go @@ -4,15 +4,18 @@ import ( "context" "embed" "fmt" + "strings" "testing" "time" + "github.com/shopspring/decimal" "github.com/stretchr/testify/require" "github.com/PeerDB-io/peer-flow/e2e" "github.com/PeerDB-io/peer-flow/e2eshared" "github.com/PeerDB-io/peer-flow/generated/protos" "github.com/PeerDB-io/peer-flow/model" + "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" peerflow "github.com/PeerDB-io/peer-flow/workflows" ) @@ -499,3 +502,59 @@ func (s ClickHouseSuite) Test_Weird_Table_And_Column() { env.Cancel() e2e.RequireEnvCanceled(s.t, env) } + +// large NUMERICs (precision >76) are mapped to String on CH, test +func (s ClickHouseSuite) Test_Large_Numeric() { + srcFullName := s.attachSchemaSuffix("lnumeric") + dstTableName := "lnumeric" + + _, err := s.Conn().Exec(context.Background(), fmt.Sprintf(` + CREATE TABLE IF NOT EXISTS %s( + id INT PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + c1 NUMERIC(76,0), + c2 NUMERIC(78,0) + ); + `, srcFullName)) + require.NoError(s.t, err) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + require.NoError(s.t, err) + + connectionGen := e2e.FlowConnectionGenerationConfig{ + FlowJobName: s.attachSuffix("clickhouse_test_large_numerics"), + TableNameMapping: map[string]string{srcFullName: dstTableName}, + Destination: s.Peer().Name, + } + flowConnConfig := connectionGen.GenerateFlowConnectionConfigs(s.t) + flowConnConfig.DoInitialSnapshot = true + tc := e2e.NewTemporalClient(s.t) + env := e2e.ExecutePeerflow(tc, peerflow.CDCFlowWorkflow, flowConnConfig, nil) + e2e.SetupCDCFlowStatusQuery(s.t, env, flowConnConfig) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 1) + + _, err = s.Conn().Exec(context.Background(), fmt.Sprintf(` + INSERT INTO %s(c1,c2) VALUES(%s,%s);`, srcFullName, strings.Repeat("7", 76), strings.Repeat("9", 78))) + require.NoError(s.t, err) + + e2e.EnvWaitForCount(env, s, "waiting for CDC count", dstTableName, "id,c1,c2", 2) + + rows, err := s.GetRows(dstTableName, "c1,c2") + require.NoError(s.t, err) + require.Len(s.t, rows.Records, 2, "expected 2 rows") + for _, row := range rows.Records { + require.Len(s.t, row, 2, "expected 2 columns") + require.Equal(s.t, qvalue.QValueKindNumeric, row[0].Kind(), "expected NUMERIC(76,0) to be Decimal") + require.Equal(s.t, qvalue.QValueKindString, row[1].Kind(), "expected NUMERIC(78,0) to be String") + c1, ok := row[0].Value().(decimal.Decimal) + require.True(s.t, ok, "expected NUMERIC(76,0) to be Decimal") + require.Equal(s.t, strings.Repeat("7", 76), c1.String(), "expected NUMERIC(76,0) to be 7s") + c2, ok := row[1].Value().(string) + require.True(s.t, ok, "expected NUMERIC(78,0) to be String") + require.Equal(s.t, strings.Repeat("9", 78), c2, "expected NUMERIC(78,0) to be 9s") + } + + env.Cancel() + e2e.RequireEnvCanceled(s.t, env) +} diff --git a/flow/e2e/test_utils.go b/flow/e2e/test_utils.go index 2eca0520c9..9dadc49852 100644 --- a/flow/e2e/test_utils.go +++ b/flow/e2e/test_utils.go @@ -168,6 +168,30 @@ func EnvWaitForEqualTablesWithNames( }) } +func EnvWaitForCount( + env WorkflowRun, + suite RowSource, + reason string, + dstTable string, + cols string, + expectedCount int, +) { + t := suite.T() + t.Helper() + + EnvWaitFor(t, env, 3*time.Minute, reason, func() bool { + t.Helper() + + rows, err := suite.GetRows(dstTable, cols) + if err != nil { + t.Log(err) + return false + } + + return len(rows.Records) == expectedCount + }) +} + func RequireEnvCanceled(t *testing.T, env WorkflowRun) { t.Helper() EnvWaitForFinished(t, env, time.Minute) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index c2382e5b0c..9738f46e8f 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -103,6 +103,10 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci } return "bytes", nil case QValueKindNumeric: + if targetDWH == protos.DBType_CLICKHOUSE && + precision > datatypes.PeerDBClickHouseMaxPrecision { + return "string", nil + } avroNumericPrecision, avroNumericScale := DetermineNumericSettingForDWH(precision, scale, targetDWH) return AvroSchemaNumeric{ Type: "bytes", @@ -454,6 +458,12 @@ func (c *QValueAvroConverter) processNumeric(num decimal.Decimal) interface{} { return nil } + if c.TargetDWH == protos.DBType_CLICKHOUSE && + c.Precision > datatypes.PeerDBClickHouseMaxPrecision { + // no error returned + numStr, _ := c.processNullableUnion("string", num.String()) + return numStr + } rat := num.Rat() if c.Nullable { return goavro.Union("bytes.decimal", rat)