Skip to content

Commit

Permalink
support interval for snowflake
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Mar 20, 2024
1 parent 966e4a5 commit c88ce1c
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 2 deletions.
24 changes: 24 additions & 0 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/lib/pq/oid"
"github.com/shopspring/decimal"

peerdb_interval "github.com/PeerDB-io/peer-flow/interval"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/shared"
)
Expand Down Expand Up @@ -80,6 +81,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindArrayTimestampTZ
case pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.BPCharArrayOID:
return qvalue.QValueKindArrayString
case pgtype.IntervalOID:
return qvalue.QValueKindInterval
default:
typeName, ok := pgtype.NewMap().TypeForOID(recvOID)
if !ok {
Expand Down Expand Up @@ -225,6 +228,27 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindTimestampTZ:
timestamp := value.(time.Time)
val = qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp}
case qvalue.QValueKindInterval:
intervalObject := value.(pgtype.Interval)
var interval peerdb_interval.PeerDBInterval
interval.Hours = int(intervalObject.Microseconds / 3600000000)
interval.Minutes = int((intervalObject.Microseconds % 3600000000) / 60000000)
interval.Seconds = float64((intervalObject.Microseconds % 60000000) / 1000000)
interval.Days = int(intervalObject.Days)
interval.Years = int(intervalObject.Months / 12)
interval.Months = int(intervalObject.Months % 12)
interval.Valid = intervalObject.Valid

intervalJSON, err := json.Marshal(interval)
if err != nil {
return qvalue.QValue{}, fmt.Errorf("failed to parse interval: %w", err)
}

if !interval.Valid {
return qvalue.QValue{}, fmt.Errorf("invalid interval: %v", value)
}

return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(intervalJSON)}, nil
case qvalue.QValueKindDate:
date := value.(time.Time)
val = qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date}
Expand Down
53 changes: 53 additions & 0 deletions flow/e2e/snowflake/peer_flow_sf_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1119,3 +1119,56 @@ func (s PeerFlowE2ETestSuiteSF) Test_Supported_Mixed_Case_Table_SF() {

e2e.RequireEnvCanceled(s.t, env)
}

// Test_Interval_SF creates a Postgres source table with an INTERVAL column,
// starts a CDC flow whose destination is the suite's Snowflake peer, inserts
// rows holding randomly generated intervals, and then waits on the e2e
// table-equality helper for the "id,dur" columns before cancelling the flow.
func (s PeerFlowE2ETestSuiteSF) Test_Interval_SF() {
	const (
		tableName = "testintervalsf"
		rowCount  = 20
	)
	ctx := context.Background()
	temporalClient := e2e.NewTemporalClient(s.t)

	source := s.attachSchemaSuffix(tableName)
	destination := fmt.Sprintf("%s.%s", s.sfHelper.testSchemaName, tableName)

	createSQL := fmt.Sprintf(`
CREATE TABLE IF NOT EXISTS e2e_test_%s."%s" (
id SERIAL PRIMARY KEY,
dur INTERVAL
);
`, s.pgSuffix, tableName)
	_, createErr := s.Conn().Exec(ctx, createSQL)
	require.NoError(s.t, createErr)

	connectionGen := e2e.FlowConnectionGenerationConfig{
		FlowJobName:      s.attachSuffix("test_interval_sf"),
		TableNameMapping: map[string]string{source: destination},
		Destination:      s.sfHelper.Peer,
	}

	flowConnConfig := connectionGen.GenerateFlowConnectionConfigs()
	flowConnConfig.MaxBatchSize = 100

	// Start the workflow and let the status-query setup run before any rows
	// are written to the source table.
	env := e2e.ExecutePeerflow(temporalClient, peerflow.CDCFlowWorkflow, flowConnConfig, nil)
	e2e.SetupCDCFlowStatusQuery(s.t, env, connectionGen)

	// Each statement inserts a single row whose interval is assembled from
	// random day/hour/minute/second/month parts.
	insertSQL := fmt.Sprintf(`
INSERT INTO e2e_test_%s."%s"(dur)
SELECT
floor(random() * 100)::int || ' days ' ||
floor(random() * 24)::int || ' hours ' ||
floor(random() * 60)::int || ' minutes ' ||
floor(random() * 60)::int || ' seconds ' ||
floor(random() * 30)::int || ' months' AS random_interval;
`, s.pgSuffix, tableName)
	for range rowCount {
		_, insertErr := s.Conn().Exec(ctx, insertSQL)
		e2e.EnvNoError(s.t, env, insertErr)
	}
	s.t.Log("Inserted 20 rows into the source table")

	quotedDestination := `"` + tableName + `"`
	e2e.EnvWaitForEqualTablesWithNames(
		env,
		s,
		"normalize interval sf test",
		tableName,
		quotedDestination,
		"id,dur",
	)

	env.Cancel()
	e2e.RequireEnvCanceled(s.t, env)
}
11 changes: 11 additions & 0 deletions flow/interval/interval.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package peerdb_interval

// PeerDBInterval is a JSON-serializable breakdown of an interval value into
// separate clock and calendar components.
//
// Every numeric component is tagged `omitempty`, so a component equal to zero
// is left out of the encoded JSON; only "valid" is always written. Fields are
// encoded in declaration order, so reordering them changes the emitted JSON.
type PeerDBInterval struct {
// Hours is the hours component of the interval.
Hours int `json:"hours,omitempty"`
// Minutes is the minutes component of the interval.
Minutes int `json:"minutes,omitempty"`
// Seconds is the seconds component; it is the only floating-point field.
Seconds float64 `json:"seconds,omitempty"`
// Days is the days component of the interval.
Days int `json:"days,omitempty"`
// Months is the months component of the interval.
Months int `json:"months,omitempty"`
// Years is the years component of the interval.
Years int `json:"years,omitempty"`
// Valid flags whether the interval holds a usable value.
// NOTE(review): presumably mirrors the source driver's validity/NULL flag — confirm against the producer.
Valid bool `json:"valid"`
}
8 changes: 6 additions & 2 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,11 @@ type AvroSchemaField struct {
// will return an error.
func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) {
switch kind {
case QValueKindString, QValueKindQChar, QValueKindCIDR, QValueKindINET:
case QValueKindString:
return "string", nil
case QValueKindQChar, QValueKindCIDR, QValueKindINET:
return "string", nil
case QValueKindInterval:
return "string", nil
case QValueKindUUID:
return AvroSchemaLogical{
Expand Down Expand Up @@ -285,7 +289,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return t, nil
case QValueKindQChar:
return c.processNullableUnion("string", string(c.Value.(uint8)))
case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr:
case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr, QValueKindInterval:
if c.TargetDWH == QDWHTypeSnowflake && c.Value != nil &&
(len(c.Value.(string)) > 15*1024*1024) {
slog.Warn("Truncating TEXT value > 15MB for Snowflake!")
Expand Down
1 change: 1 addition & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ const (
QValueKindDate QValueKind = "date"
QValueKindTime QValueKind = "time"
QValueKindTimeTZ QValueKind = "timetz"
QValueKindInterval QValueKind = "interval"
QValueKindNumeric QValueKind = "numeric"
QValueKindBytes QValueKind = "bytes"
QValueKindUUID QValueKind = "uuid"
Expand Down
2 changes: 2 additions & 0 deletions flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ func (q QValue) Equals(other QValue) bool {
case QValueKindDate,
QValueKindTimestamp, QValueKindTimestampTZ:
return compareGoTime(q.Value, other.Value)
case QValueKindInterval:
return compareString(q.Value, other.Value)
case QValueKindTime, QValueKindTimeTZ:
return compareGoCivilTime(q.Value, other.Value)
case QValueKindNumeric:
Expand Down

0 comments on commit c88ce1c

Please sign in to comment.