Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PG to PG: TSTZRange and JSONB #1687

Closed
wants to merge 12 commits into from
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab
var castStmt string
shortCol := m.shortColumn[column.Name]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore:
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
column.Name, bqTypeString, shortCol)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
case qvalue.QValueKindString:
return bigquery.StringFieldType
// json also is stored as string for now
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore:
return bigquery.JSONFieldType
// time related
case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ:
Expand Down
55 changes: 53 additions & 2 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindString
case pgtype.ByteaOID:
return qvalue.QValueKindBytes
case pgtype.JSONOID, pgtype.JSONBOID:
case pgtype.JSONOID:
return qvalue.QValueKindJSON
case pgtype.JSONBOID:
return qvalue.QValueKindJSONB
case pgtype.UUIDOID:
return qvalue.QValueKindUUID
case pgtype.TimeOID:
Expand Down Expand Up @@ -108,6 +110,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindArrayString
case pgtype.IntervalOID:
return qvalue.QValueKindInterval
case pgtype.TstzrangeOID:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also handle tsrange at least

return qvalue.QValueKindTSTZRange
default:
typeName, ok := pgtype.NewMap().TypeForOID(recvOID)
if !ok {
Expand Down Expand Up @@ -163,6 +167,8 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "BYTEA"
case qvalue.QValueKindJSON:
return "JSON"
case qvalue.QValueKindJSONB:
return "JSONB"
case qvalue.QValueKindHStore:
return "HSTORE"
case qvalue.QValueKindUUID:
Expand Down Expand Up @@ -273,6 +279,31 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
}

return qvalue.QValueString{Val: string(intervalJSON)}, nil
case qvalue.QValueKindTSTZRange:
tstzrangeObject := value.(pgtype.Range[interface{}])
lowerBoundType := tstzrangeObject.LowerType
upperBoundType := tstzrangeObject.UpperType
lowerTime, err := convertTimeRangeBound(tstzrangeObject.Lower)
if err != nil {
return nil, fmt.Errorf("[tstzrange]error for lower time bound: %v", err)
}

upperTime, err := convertTimeRangeBound(tstzrangeObject.Upper)
if err != nil {
return nil, fmt.Errorf("[tstzrange]error for upper time bound: %v", err)
}

lowerBracket := "["
if lowerBoundType == pgtype.Exclusive {
lowerBracket = "("
}
upperBracket := "]"
if upperBoundType == pgtype.Exclusive {
upperBracket = ")"
}
tstzrangeStr := fmt.Sprintf("%s%v,%v%s",
lowerBracket, lowerTime, upperTime, upperBracket)
return qvalue.QValueTSTZRange{Val: tstzrangeStr}, nil
case qvalue.QValueKindDate:
date := value.(time.Time)
return qvalue.QValueDate{Val: date}, nil
Expand All @@ -298,7 +329,7 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindBoolean:
boolVal := value.(bool)
return qvalue.QValueBoolean{Val: boolVal}, nil
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB:
tmp, err := parseJSON(value)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON: %w", err)
Expand Down Expand Up @@ -481,3 +512,23 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
return qvalue.QValueKindString
}
}

// convertTimeRangeBound renders one bound of a tstzrange as text that can be
// placed inside a Postgres range literal.
//
// Postgres does not like timestamps of the form 2006-01-02 15:04:05 +0000 UTC
// in tstzrange, so the bound is normalised to UTC and written with an explicit
// numeric offset instead (for example 2006-01-02 15:04:05.123456+00:00).
// Keeping the offset and the fractional seconds means the text denotes the
// same instant regardless of the session time zone of the receiving database.
//
// Behaviour by input:
//   - nil (an unbounded side of the range) yields "" and no error.
//   - time.Time is formatted directly.
//   - a value whose fmt.Sprint form is "infinity" or "-infinity" is passed
//     through unchanged.
//     NOTE(review): pgx appears to hand infinite timestamptz bounds over as
//     pgtype.InfinityModifier, which prints this way — confirm.
//   - anything else must print in time.Time's default string form; otherwise
//     an error wrapping the parse failure is returned.
func convertTimeRangeBound(timeBound interface{}) (string, error) {
	const (
		// Layout produced by fmt.Sprint on a time.Time without a monotonic reading.
		goStringLayout = "2006-01-02 15:04:05 -0700 MST"
		// Microsecond precision matches what a Postgres timestamptz can store.
		postgresFormat = "2006-01-02 15:04:05.999999-07:00"
	)

	switch bound := timeBound.(type) {
	case nil:
		return "", nil
	case time.Time:
		// Format directly: no string round trip, and a monotonic clock reading
		// (which time.Parse cannot digest) is irrelevant here.
		return bound.UTC().Format(postgresFormat), nil
	}

	text := fmt.Sprint(timeBound)
	if text == "infinity" || text == "-infinity" {
		return text, nil
	}

	parsed, err := time.Parse(goStringLayout, text)
	if err != nil {
		// This helper serves both bounds, so the message must not name one of them.
		return "", fmt.Errorf("unexpected bound value %q in tstzrange: %w", text, err)
	}
	return parsed.UTC().Format(postgresFormat), nil
}
2 changes: 1 addition & 1 deletion flow/connectors/snowflake/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(dstTable string) (string, error)
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("TO_GEOMETRY(CAST(%s:\"%s\" AS STRING),true) AS %s",
toVariantColumnName, column.Name, targetColumnName))
case qvalue.QValueKindJSON, qvalue.QValueKindHStore, qvalue.QValueKindInterval:
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore, qvalue.QValueKindInterval:
flattenedCastsSQLArray = append(flattenedCastsSQLArray,
fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s",
toVariantColumnName, column.Name, targetColumnName))
Expand Down
2 changes: 2 additions & 0 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
values[i] = pgtype.Timestamp{Time: v.Val, Valid: true}
case qvalue.QValueTimestampTZ:
values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true}
case qvalue.QValueTSTZRange:
values[i] = v.Val
case qvalue.QValueUUID:
values[i] = uuid.UUID(v.Val)
case qvalue.QValueNumeric:
Expand Down
2 changes: 1 addition & 1 deletion flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci
}, nil
}
return "string", nil
case QValueKindHStore, QValueKindJSON, QValueKindStruct:
case QValueKindHStore, QValueKindJSON, QValueKindJSONB, QValueKindStruct:
return "string", nil
case QValueKindArrayFloat32:
return AvroSchemaArray{
Expand Down
3 changes: 3 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
QValueKindTime QValueKind = "time"
QValueKindTimeTZ QValueKind = "timetz"
QValueKindInterval QValueKind = "interval"
QValueKindTSTZRange QValueKind = "tstzrange"
QValueKindNumeric QValueKind = "numeric"
QValueKindBytes QValueKind = "bytes"
QValueKindUUID QValueKind = "uuid"
QValueKindJSON QValueKind = "json"
QValueKindJSONB QValueKind = "jsonb"
QValueKindBit QValueKind = "bit"
QValueKindHStore QValueKind = "hstore"
QValueKindGeography QValueKind = "geography"
Expand Down Expand Up @@ -69,6 +71,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{
QValueKindQChar: "CHAR",
QValueKindString: "STRING",
QValueKindJSON: "VARIANT",
QValueKindJSONB: "VARIANT",
QValueKindTimestamp: "TIMESTAMP_NTZ",
QValueKindTimestampTZ: "TIMESTAMP_TZ",
QValueKindInterval: "VARIANT",
Expand Down
18 changes: 17 additions & 1 deletion flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/google/uuid"
"github.com/shopspring/decimal"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"

"github.com/PeerDB-io/glua64"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -294,6 +294,22 @@ func (v QValueInterval) LValue(ls *lua.LState) lua.LValue {
return lua.LString(v.Val)
}

// QValueTSTZRange carries a Postgres tstzrange value as text in range-literal
// form (opening bracket, lower bound, comma, upper bound, closing bracket).
type QValueTSTZRange struct {
	Val string
}

// Kind reports QValueKindTSTZRange so that code dispatching on the kind treats
// the value as a range rather than as an interval.
func (QValueTSTZRange) Kind() QValueKind {
	return QValueKindTSTZRange
}

// Value returns the range text as an untyped value.
func (v QValueTSTZRange) Value() any {
	return v.Val
}

// LValue exposes the range text to Lua as a string.
func (v QValueTSTZRange) LValue(ls *lua.LState) lua.LValue {
	return lua.LString(v.Val)
}

type QValueNumeric struct {
Val decimal.Decimal
}
Expand Down
Loading