Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snowflake: Support interval data type #1515

Merged
merged 17 commits into from
Mar 21, 2024
Merged
24 changes: 24 additions & 0 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lib/pq/oid"
"github.com/shopspring/decimal"

peerdb_interval "github.com/PeerDB-io/peer-flow/interval"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -80,6 +81,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindArrayTimestampTZ
case pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.BPCharArrayOID:
return qvalue.QValueKindArrayString
case pgtype.IntervalOID:
return qvalue.QValueKindInterval
default:
typeName, ok := pgtype.NewMap().TypeForOID(recvOID)
if !ok {
Expand Down Expand Up @@ -225,6 +228,27 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindTimestampTZ:
timestamp := value.(time.Time)
val = qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp}
case qvalue.QValueKindInterval:
intervalObject := value.(pgtype.Interval)
var interval peerdb_interval.PeerDBInterval
interval.Hours = int(intervalObject.Microseconds / 3600000000)
interval.Minutes = int((intervalObject.Microseconds % 3600000000) / 60000000)
interval.Seconds = float64(intervalObject.Microseconds%60000000) / 1000000.0
interval.Days = int(intervalObject.Days)
interval.Years = int(intervalObject.Months / 12)
interval.Months = int(intervalObject.Months % 12)
interval.Valid = intervalObject.Valid

intervalJSON, err := json.Marshal(interval)
if err != nil {
return qvalue.QValue{}, fmt.Errorf("failed to parse interval: %w", err)
}

if !interval.Valid {
return qvalue.QValue{}, fmt.Errorf("invalid interval: %v", value)
}

return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(intervalJSON)}, nil
case qvalue.QValueKindDate:
date := value.(time.Time)
val = qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date}
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) {
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("TO_GEOMETRY(CAST(%s:\"%s\" AS STRING),true) AS %s",
toVariantColumnName, column.Name, targetColumnName))
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
case qvalue.QValueKindJSON, qvalue.QValueKindHStore, qvalue.QValueKindInterval:
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s",
toVariantColumnName, column.Name, targetColumnName))
Expand Down
16 changes: 14 additions & 2 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -439,7 +439,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
`, srcTableName))
e2e.EnvNoError(s.t, env, err)

e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize types", func() bool {
e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize types", func() bool {
noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{
"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
Expand All @@ -448,7 +448,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() {
"c50", "c51", "c52", "c53", "c54",
})
if err != nil {
s.t.Log(err)
return false
}

// interval checks
if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil {
return false
}

if err := s.checkJSONValue(dstTableName, "c16", "months", "2"); err != nil {
return false
}

if err := s.checkJSONValue(dstTableName, "c16", "days", "29"); err != nil {
return false
}

Expand Down
11 changes: 11 additions & 0 deletions flow/interval/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package peerdb_interval

type PeerDBInterval struct {
Hours int `json:"hours,omitempty"`
Minutes int `json:"minutes,omitempty"`
Seconds float64 `json:"seconds,omitempty"`
Days int `json:"days,omitempty"`
Months int `json:"months,omitempty"`
Years int `json:"years,omitempty"`
Valid bool `json:"valid"`
}
8 changes: 6 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ type AvroSchemaField struct {
// will return an error.
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) {
switch kind {
case QValueKindString, QValueKindQChar, QValueKindCIDR, QValueKindINET:
case QValueKindString:
return "string", nil
case QValueKindQChar, QValueKindCIDR, QValueKindINET:
return "string", nil
case QValueKindInterval:
return "string", nil
case QValueKindUUID:
return AvroSchemaLogical{
Expand Down Expand Up @@ -285,7 +289,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return t, nil
case QValueKindQChar:
return c.processNullableUnion("string", string(c.Value.(uint8)))
case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr:
case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr, QValueKindInterval:
if c.TargetDWH == QDWHTypeSnowflake && c.Value != nil &&
(len(c.Value.(string)) > 15*1024*1024) {
slog.Warn("Truncating TEXT value > 15MB for Snowflake!")
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
QValueKindDate QValueKind = "date"
QValueKindTime QValueKind = "time"
QValueKindTimeTZ QValueKind = "timetz"
QValueKindInterval QValueKind = "interval"
QValueKindNumeric QValueKind = "numeric"
QValueKindBytes QValueKind = "bytes"
QValueKindUUID QValueKind = "uuid"
Expand Down Expand Up @@ -69,6 +70,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindJSON: "VARIANT",
QValueKindTimestamp: "TIMESTAMP_NTZ",
QValueKindTimestampTZ: "TIMESTAMP_TZ",
QValueKindInterval: "VARIANT",
QValueKindTime: "TIME",
QValueKindTimeTZ: "TIME",
QValueKindDate: "DATE",
Expand Down Expand Up @@ -117,6 +119,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{
QValueKindTimeTZ: "String",
QValueKindInvalid: "String",
QValueKindHStore: "String",

// array types will be mapped to VARIANT
QValueKindArrayFloat32: "Array(Float32)",
QValueKindArrayFloat64: "Array(Float64)",
Expand Down
Loading