Skip to content

Commit

Permalink
Support dynamic numeric with defaults (#1194)
Browse files Browse the repository at this point in the history
Suppose the column on PG table is `numeric(20,17)` . Right now we are
not picking up the `(20,17)` part when getting table schema, and on
snowflake end we are creating the column as `number(38,9)`

This PR propagates precision and scale information from pull to sync,
with default of 38, 9 for the unbounded case (just `numeric`)

Fixes #1192
  • Loading branch information
Amogh-Bharadwaj authored Feb 1, 2024
1 parent d85cf33 commit bb8bc2f
Show file tree
Hide file tree
Showing 16 changed files with 117 additions and 62 deletions.
26 changes: 21 additions & 5 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -672,11 +673,26 @@ func (c *BigQueryConnector) SetupNormalizedTables(
columns := make([]*bigquery.FieldSchema, 0, len(tableSchema.Columns)+2)
for _, column := range tableSchema.Columns {
genericColType := column.Type
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: qValueKindToBigQueryType(genericColType),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
})
if genericColType == "numeric" {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 38 || scale > 37 {
precision = numeric.PeerDBNumericPrecision
scale = numeric.PeerDBNumericScale
}
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: bigquery.BigNumericFieldType,
Repeated: qvalue.QValueKind(genericColType).IsArray(),
Precision: int64(precision),
Scale: int64(scale),
})
} else {
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: qValueKindToBigQueryType(genericColType),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
})
}
}

if req.SoftDeleteColName != "" {
Expand Down
16 changes: 13 additions & 3 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
avro "github.com/PeerDB-io/peer-flow/connectors/utils/avro"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -283,6 +284,14 @@ func DefineAvroSchema(dstTableName string,
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
if avroNumericPrecision > 38 || avroNumericPrecision <= 0 ||
avroNumericScale > 38 || avroNumericScale < 0 {
avroNumericPrecision = numeric.PeerDBNumericPrecision
avroNumericScale = numeric.PeerDBNumericScale
}

considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
return qvalue.AvroSchemaArray{
Expand Down Expand Up @@ -352,12 +361,12 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
},
},
}, nil
case bigquery.NumericFieldType:
case bigquery.BigNumericFieldType:
return qvalue.AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Precision: 38,
Scale: 9,
Precision: avroNumericPrecision,
Scale: avroNumericScale,
}, nil
case bigquery.RecordFieldType:
avroFields := []qvalue.AvroSchemaField{}
Expand Down Expand Up @@ -458,6 +467,7 @@ func (s *QRepAvroSyncMethod) writeToStage(

loader := bqClient.DatasetInProject(s.connector.projectID, stagingTable.dataset).Table(stagingTable.table).LoaderFrom(avroRef)
loader.UseAvroLogicalTypes = true
loader.DecimalTargetTypes = []bigquery.DecimalTargetType{bigquery.BigNumericTargetType}
loader.WriteDisposition = bigquery.WriteTruncate
job, err := loader.Run(s.connector.ctx)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
case qvalue.QValueKindFloat32, qvalue.QValueKindFloat64:
return bigquery.FloatFieldType
case qvalue.QValueKindNumeric:
return bigquery.NumericFieldType
return bigquery.BigNumericFieldType
// string related
case qvalue.QValueKindString:
return bigquery.StringFieldType
Expand Down Expand Up @@ -82,7 +82,7 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind,
return qvalue.QValueKindTime, nil
case bigquery.RecordFieldType:
return qvalue.QValueKindStruct, nil
case bigquery.NumericFieldType:
case bigquery.NumericFieldType, bigquery.BigNumericFieldType:
return qvalue.QValueKindNumeric, nil
case bigquery.GeographyFieldType:
return qvalue.QValueKindGeography, nil
Expand Down
11 changes: 9 additions & 2 deletions flow/connectors/postgres/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (
"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
)

type PGVersion int
Expand Down Expand Up @@ -425,9 +426,15 @@ func generateCreateTableSQLForNormalizedTable(
) string {
createTableSQLArray := make([]string, 0, len(sourceTableSchema.Columns)+2)
for _, column := range sourceTableSchema.Columns {
genericColumnType := column.Type
pgColumnType := qValueKindToPostgresType(column.Type)
if column.Type == "numeric" {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier != -1 {
pgColumnType = fmt.Sprintf("numeric(%d,%d)", precision, scale)
}
}
createTableSQLArray = append(createTableSQLArray,
fmt.Sprintf("%s %s", QuoteIdentifier(column.Name), qValueKindToPostgresType(genericColumnType)))
fmt.Sprintf("%s %s", QuoteIdentifier(column.Name), pgColumnType))
}

if softDeleteColName != "" {
Expand Down
20 changes: 16 additions & 4 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/PeerDB-io/peer-flow/geo"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -118,10 +119,21 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
// there isn't a way to know if a column is nullable or not
// TODO fix this.
cnullable := true
qfields[i] = model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
if ctype == qvalue.QValueKindNumeric {
precision, scale := numeric.ParseNumericTypmod(fd.TypeModifier)
qfields[i] = model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
Precision: precision,
Scale: scale,
}
} else {
qfields[i] = model.QField{
Name: cname,
Type: ctype,
Nullable: cnullable,
}
}
}
return model.NewQRecordSchema(qfields)
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,8 +107,8 @@ func postgresOIDToQValueKind(recvOID uint32) qvalue.QValueKind {
}
}

func qValueKindToPostgresType(qvalueKind string) string {
switch qvalue.QValueKind(qvalueKind) {
func qValueKindToPostgresType(colTypeStr string) string {
switch qvalue.QValueKind(colTypeStr) {
case qvalue.QValueKindBoolean:
return "BOOLEAN"
case qvalue.QValueKindInt16:
Expand Down
9 changes: 8 additions & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (

"github.com/PeerDB-io/peer-flow/connectors/utils"
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

Expand Down Expand Up @@ -62,9 +63,15 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) {
// "AS %s", toVariantColumnName, columnName, columnName))
default:
if qvKind == qvalue.QValueKindNumeric {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 38 || scale > 37 {
precision = numeric.PeerDBNumericPrecision
scale = numeric.PeerDBNumericScale
}
numericType := fmt.Sprintf("NUMERIC(%d,%d)", precision, scale)
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("TRY_CAST((%s:\"%s\")::text AS %s) AS %s",
toVariantColumnName, column.Name, sfType, targetColumnName))
toVariantColumnName, column.Name, numericType, targetColumnName))
} else {
flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("CAST(%s:\"%s\" AS %s) AS %s",
toVariantColumnName, column.Name, sfType, targetColumnName))
Expand Down
11 changes: 11 additions & 0 deletions flow/connectors/snowflake/snowflake.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/PeerDB-io/peer-flow/generated/protos"
"github.com/PeerDB-io/peer-flow/logger"
"github.com/PeerDB-io/peer-flow/model"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -658,6 +659,16 @@ func generateCreateTableSQLForNormalizedTable(
slog.Any("error", err))
continue
}

if genericColumnType == "numeric" {
precision, scale := numeric.ParseNumericTypmod(column.TypeModifier)
if column.TypeModifier == -1 || precision > 38 || scale > 37 {
precision = numeric.PeerDBNumericPrecision
scale = numeric.PeerDBNumericScale
}
sfColType = fmt.Sprintf("NUMERIC(%d,%d)", precision, scale)
}

createTableSQLArray = append(createTableSQLArray, fmt.Sprintf(`%s %s`, normalizedColName, sfColType))
}

Expand Down
4 changes: 2 additions & 2 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
CREATE TABLE IF NOT EXISTS %s (id serial PRIMARY KEY,c1 BIGINT,c2 BIT,c3 VARBIT,c4 BOOLEAN,
c6 BYTEA,c7 CHARACTER,c8 varchar,c9 CIDR,c11 DATE,c12 FLOAT,c13 DOUBLE PRECISION,
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c23 NUMERIC(16,5),c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 GEOMETRY(POINT), c43 GEOGRAPHY(POINT),
c44 GEOGRAPHY(POLYGON), c45 GEOGRAPHY(LINESTRING), c46 GEOMETRY(LINESTRING), c47 GEOMETRY(POLYGON),
Expand Down Expand Up @@ -605,7 +605,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
CURRENT_DATE,1.23,1.234,'192.168.1.5'::inet,1,
'5 years 2 months 29 days 1 minute 2 seconds 200 milliseconds 20000 microseconds'::interval,
'{"sai":1}'::json,'{"sai":1}'::jsonb,'08:00:2b:01:02:03'::macaddr,
1.2,1.23,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
1.2,100.24553,4::oid,1.23,1,1,1,'test',now(),now(),now()::time,now()::timetz,
'fat & rat'::tsquery,'a fat cat sat on a mat and ate a fat rat'::tsvector,
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
Expand Down
2 changes: 1 addition & 1 deletion flow/e2e/test_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,7 +219,7 @@ func CreateTableForQRep(conn *pgx.Conn, suffix string, tableName string) error {
"card_eth_value DOUBLE PRECISION",
"paid_eth_price DOUBLE PRECISION",
"card_bought_notified BOOLEAN DEFAULT false NOT NULL",
"address NUMERIC",
"address NUMERIC(20,8)",
"account_id UUID",
"asset_id NUMERIC NOT NULL",
"status INTEGER",
Expand Down
2 changes: 1 addition & 1 deletion flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ func GetAvroSchemaDefinition(
nullableFields := make(map[string]struct{})

for _, qField := range qRecordSchema.Fields {
avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, targetDWH)
avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, targetDWH, qField.Precision, qField.Scale)
if err != nil {
return nil, err
}
Expand Down
3 changes: 1 addition & 2 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (

"github.com/PeerDB-io/peer-flow/generated/protos"
hstore_util "github.com/PeerDB-io/peer-flow/hstore"
"github.com/PeerDB-io/peer-flow/model/numeric"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)
Expand Down Expand Up @@ -215,7 +214,7 @@ func (r *RecordItems) toMap(hstoreAsJSON bool) (map[string]interface{}, error) {
if !ok {
return nil, errors.New("expected *big.Rat value")
}
jsonStruct[col] = bigRat.FloatString(numeric.PeerDBNumericScale)
jsonStruct[col] = bigRat.FloatString(100)
case qvalue.QValueKindFloat64:
floatVal, ok := v.Value.(float64)
if !ok {
Expand Down
18 changes: 11 additions & 7 deletions flow/model/numeric/scale.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,15 @@
package numeric

import "strings"
const (
PeerDBNumericPrecision = 38
PeerDBNumericScale = 20
)

const PeerDBNumericScale = 9

func StripTrailingZeros(value string) string {
value = strings.TrimRight(value, "0")
value = strings.TrimSuffix(value, ".")
return value
// This is to reverse what make_numeric_typmod of Postgres does:
// https://github.com/postgres/postgres/blob/21912e3c0262e2cfe64856e028799d6927862563/src/backend/utils/adt/numeric.c#L897
func ParseNumericTypmod(typmod int32) (int16, int16) {
offsetMod := typmod - 4
precision := int16((offsetMod >> 16) & 0x7FFF)
scale := int16(offsetMod & 0x7FFF)
return precision, scale
}
8 changes: 5 additions & 3 deletions flow/model/qschema.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@ import (
)

type QField struct {
Name string
Type qvalue.QValueKind
Nullable bool
Name string
Type qvalue.QValueKind
Precision int16
Scale int16
Nullable bool
}

type QRecordSchema struct {
Expand Down
20 changes: 13 additions & 7 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@ type AvroSchemaComplexArray struct {
type AvroSchemaNumeric struct {
Type string `json:"type"`
LogicalType string `json:"logicalType"`
Precision int `json:"precision"`
Scale int `json:"scale"`
Precision int16 `json:"precision"`
Scale int16 `json:"scale"`
}

type AvroSchemaRecord struct {
Expand All @@ -50,7 +50,14 @@ type AvroSchemaField struct {
//
// For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds
// will return an error.
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType) (interface{}, error) {
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) {
avroNumericPrecision := precision
avroNumericScale := scale
if precision > 38 || precision <= 0 || scale > 37 || scale < 0 {
avroNumericPrecision = numeric.PeerDBNumericPrecision
avroNumericScale = numeric.PeerDBNumericScale
}

switch kind {
case QValueKindString, QValueKindUUID:
return "string", nil
Expand All @@ -73,8 +80,8 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType) (interface
return AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Precision: 38,
Scale: 9,
Precision: avroNumericPrecision,
Scale: avroNumericScale,
}, nil
case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ:
if targetDWH == QDWHTypeClickhouse {
Expand Down Expand Up @@ -447,8 +454,7 @@ func (c *QValueAvroConverter) processNumeric() (interface{}, error) {
return nil, fmt.Errorf("invalid Numeric value: expected *big.Rat, got %T", c.Value.Value)
}

scale := numeric.PeerDBNumericScale
decimalValue := num.FloatString(scale)
decimalValue := num.FloatString(100)
num.SetString(decimalValue)
if c.Nullable {
return goavro.Union("bytes.decimal", num), nil
Expand Down
Loading

0 comments on commit bb8bc2f

Please sign in to comment.