diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 79c817d28c..d0e3e8cc0b 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -12,6 +12,7 @@ import ( "github.com/lib/pq/oid" "github.com/shopspring/decimal" + peerdb_interval "github.com/PeerDB-io/peer-flow/interval" "github.com/PeerDB-io/peer-flow/model/qvalue" "github.com/PeerDB-io/peer-flow/shared" ) @@ -80,6 +81,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu return qvalue.QValueKindArrayTimestampTZ case pgtype.TextArrayOID, pgtype.VarcharArrayOID, pgtype.BPCharArrayOID: return qvalue.QValueKindArrayString + case pgtype.IntervalOID: + return qvalue.QValueKindInterval default: typeName, ok := pgtype.NewMap().TypeForOID(recvOID) if !ok { @@ -225,6 +228,27 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindTimestampTZ: timestamp := value.(time.Time) val = qvalue.QValue{Kind: qvalue.QValueKindTimestampTZ, Value: timestamp} + case qvalue.QValueKindInterval: + intervalObject := value.(pgtype.Interval) + var interval peerdb_interval.PeerDBInterval + interval.Hours = int(intervalObject.Microseconds / 3600000000) + interval.Minutes = int((intervalObject.Microseconds % 3600000000) / 60000000) + interval.Seconds = float64(intervalObject.Microseconds%60000000) / 1000000.0 + interval.Days = int(intervalObject.Days) + interval.Years = int(intervalObject.Months / 12) + interval.Months = int(intervalObject.Months % 12) + interval.Valid = intervalObject.Valid + + intervalJSON, err := json.Marshal(interval) + if err != nil { + return qvalue.QValue{}, fmt.Errorf("failed to parse interval: %w", err) + } + + if !interval.Valid { + return qvalue.QValue{}, fmt.Errorf("invalid interval: %v", value) + } + + return qvalue.QValue{Kind: qvalue.QValueKindString, Value: string(intervalJSON)}, nil case qvalue.QValueKindDate: date := value.(time.Time) val = qvalue.QValue{Kind: qvalue.QValueKindDate, Value: date} diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index 19be0cfd94..b25d465a74 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -51,7 +51,7 @@ func (m *mergeStmtGenerator) generateMergeStmt() (string, error) { flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TO_GEOMETRY(CAST(%s:\"%s\" AS STRING),true) AS %s", toVariantColumnName, column.Name, targetColumnName)) - case qvalue.QValueKindJSON, qvalue.QValueKindHStore: + case qvalue.QValueKindJSON, qvalue.QValueKindHStore, qvalue.QValueKindInterval: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s", toVariantColumnName, column.Name, targetColumnName)) diff --git a/flow/e2e/snowflake/peer_flow_sf_test.go b/flow/e2e/snowflake/peer_flow_sf_test.go index 635a7f3e0d..90b16be522 100644 --- a/flow/e2e/snowflake/peer_flow_sf_test.go +++ b/flow/e2e/snowflake/peer_flow_sf_test.go @@ -439,7 +439,7 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { `, srcTableName)) e2e.EnvNoError(s.t, env, err) - e2e.EnvWaitFor(s.t, env, 2*time.Minute, "normalize types", func() bool { + e2e.EnvWaitFor(s.t, env, 3*time.Minute, "normalize types", func() bool { noNulls, err := s.sfHelper.CheckNull("test_types_sf", []string{ "c41", "c1", "c2", "c3", "c4", "c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18", @@ -448,7 +448,19 @@ func (s PeerFlowE2ETestSuiteSF) Test_Types_SF() { "c50", "c51", "c52", "c53", "c54", }) if err != nil { - s.t.Log(err) + return false + } + + // interval checks + if err := s.checkJSONValue(dstTableName, "c16", "years", "5"); err != nil { + return false + } + + if err := s.checkJSONValue(dstTableName, "c16", "months", "2"); err != nil { + return false + } + + if err := s.checkJSONValue(dstTableName, "c16", "days", "29"); err != nil { return false } diff --git a/flow/interval/interval.go b/flow/interval/interval.go new file mode 100644 index 0000000000..79fbdc3ecf --- /dev/null +++ b/flow/interval/interval.go @@ -0,0 +1,11 @@ +package peerdb_interval + +type PeerDBInterval struct { + Hours int `json:"hours,omitempty"` + Minutes int `json:"minutes,omitempty"` + Seconds float64 `json:"seconds,omitempty"` + Days int `json:"days,omitempty"` + Months int `json:"months,omitempty"` + Years int `json:"years,omitempty"` + Valid bool `json:"valid"` +} diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 0a299cf82f..3df8738209 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -58,7 +58,11 @@ type AvroSchemaField struct { // will return an error. func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH QDWHType, precision int16, scale int16) (interface{}, error) { switch kind { - case QValueKindString, QValueKindQChar, QValueKindCIDR, QValueKindINET: + case QValueKindString: + return "string", nil + case QValueKindQChar, QValueKindCIDR, QValueKindINET: + return "string", nil + case QValueKindInterval: return "string", nil case QValueKindUUID: return AvroSchemaLogical{ @@ -285,7 +289,7 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) { return t, nil case QValueKindQChar: return c.processNullableUnion("string", string(c.Value.(uint8))) - case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr: + case QValueKindString, QValueKindCIDR, QValueKindINET, QValueKindMacaddr, QValueKindInterval: if c.TargetDWH == QDWHTypeSnowflake && c.Value != nil && (len(c.Value.(string)) > 15*1024*1024) { slog.Warn("Truncating TEXT value > 15MB for Snowflake!") diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 9def7821f4..9ed9ac0beb 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -24,6 +24,7 @@ const ( QValueKindDate QValueKind = "date" QValueKindTime QValueKind = "time" QValueKindTimeTZ QValueKind = "timetz" + QValueKindInterval QValueKind = "interval" QValueKindNumeric QValueKind = "numeric" QValueKindBytes QValueKind = "bytes" QValueKindUUID QValueKind = "uuid" @@ -69,6 +70,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindJSON: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", QValueKindTimestampTZ: "TIMESTAMP_TZ", + QValueKindInterval: "VARIANT", QValueKindTime: "TIME", QValueKindTimeTZ: "TIME", QValueKindDate: "DATE", @@ -117,6 +119,7 @@ var QValueKindToClickhouseTypeMap = map[QValueKind]string{ QValueKindTimeTZ: "String", QValueKindInvalid: "String", QValueKindHStore: "String", + // array types will be mapped to VARIANT QValueKindArrayFloat32: "Array(Float32)", QValueKindArrayFloat64: "Array(Float64)",