Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PG to PG: TSTZRange and JSONB #1687

Closed
wants to merge 12 commits into from
55 changes: 53 additions & 2 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,10 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindString
case pgtype.ByteaOID:
return qvalue.QValueKindBytes
case pgtype.JSONOID, pgtype.JSONBOID:
case pgtype.JSONOID:
return qvalue.QValueKindJSON
case pgtype.JSONBOID:
return qvalue.QValueKindJSONB
case pgtype.UUIDOID:
return qvalue.QValueKindUUID
case pgtype.TimeOID:
Expand Down Expand Up @@ -108,6 +110,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu
return qvalue.QValueKindArrayString
case pgtype.IntervalOID:
return qvalue.QValueKindInterval
case pgtype.TstzrangeOID:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should also handle tsrange at least

return qvalue.QValueKindTSTZRange
default:
typeName, ok := pgtype.NewMap().TypeForOID(recvOID)
if !ok {
Expand Down Expand Up @@ -163,6 +167,8 @@ func qValueKindToPostgresType(colTypeStr string) string {
return "BYTEA"
case qvalue.QValueKindJSON:
return "JSON"
case qvalue.QValueKindJSONB:
return "JSONB"
case qvalue.QValueKindHStore:
return "HSTORE"
case qvalue.QValueKindUUID:
Expand Down Expand Up @@ -273,6 +279,31 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
}

return qvalue.QValueString{Val: string(intervalJSON)}, nil
case qvalue.QValueKindTSTZRange:
tstzrangeObject := value.(pgtype.Range[interface{}])
lowerBoundType := tstzrangeObject.LowerType
upperBoundType := tstzrangeObject.UpperType
lowerTime, err := ConvertTimeRangeBounds(tstzrangeObject.Lower)
if err != nil {
return nil, fmt.Errorf("[tstzrange]error for lower time bound: %v", err)
}

upperTime, err := ConvertTimeRangeBounds(tstzrangeObject.Upper)
if err != nil {
return nil, fmt.Errorf("[tstzrange]error for upper time bound: %v", err)
}

lowerBracket := "["
if lowerBoundType == pgtype.Exclusive {
lowerBracket = "("
}
upperBracket := "]"
if upperBoundType == pgtype.Exclusive {
upperBracket = ")"
}
tstzrangeStr := fmt.Sprintf("%s%v,%v%s",
lowerBracket, lowerTime, upperTime, upperBracket)
return qvalue.QValueTSTZRange{Val: tstzrangeStr}, nil
case qvalue.QValueKindDate:
date := value.(time.Time)
return qvalue.QValueDate{Val: date}, nil
Expand All @@ -298,7 +329,7 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
case qvalue.QValueKindBoolean:
boolVal := value.(bool)
return qvalue.QValueBoolean{Val: boolVal}, nil
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindJSONB:
tmp, err := parseJSON(value)
if err != nil {
return nil, fmt.Errorf("failed to parse JSON: %w", err)
Expand Down Expand Up @@ -481,3 +512,23 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
return qvalue.QValueKindString
}
}

// Postgres does not like timestamps of the form 2006-01-02 15:04:05 +0000 UTC
// in tstzrange.
// ConvertTimeRangeBounds removes the +0000 UTC part
func ConvertTimeRangeBounds(timeBound interface{}) (string, error) {
Amogh-Bharadwaj marked this conversation as resolved.
Show resolved Hide resolved
layout := "2006-01-02 15:04:05 -0700 MST"
postgresFormat := "2006-01-02 15:04:05"
var convertedTime string
if timeBound != nil {
lowerParsed, err := time.Parse(layout, fmt.Sprint(timeBound))
if err != nil {
return "", fmt.Errorf("unexpected lower bound value in tstzrange. Error: %v", err)
}
convertedTime = lowerParsed.Format(postgresFormat)
} else {
convertedTime = ""
}

return convertedTime, nil
}
2 changes: 2 additions & 0 deletions flow/model/qrecord_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,8 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) {
values[i] = pgtype.Timestamp{Time: v.Val, Valid: true}
case qvalue.QValueTimestampTZ:
values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true}
case qvalue.QValueTSTZRange:
values[i] = v.Val
case qvalue.QValueUUID:
values[i] = uuid.UUID(v.Val)
case qvalue.QValueNumeric:
Expand Down
2 changes: 2 additions & 0 deletions flow/model/qvalue/kind.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
QValueKindTime QValueKind = "time"
QValueKindTimeTZ QValueKind = "timetz"
QValueKindInterval QValueKind = "interval"
QValueKindTSTZRange QValueKind = "tstzrange"
QValueKindNumeric QValueKind = "numeric"
QValueKindBytes QValueKind = "bytes"
QValueKindUUID QValueKind = "uuid"
QValueKindJSON QValueKind = "json"
QValueKindJSONB QValueKind = "jsonb"
QValueKindBit QValueKind = "bit"
QValueKindHStore QValueKind = "hstore"
QValueKindGeography QValueKind = "geography"
Expand Down
18 changes: 17 additions & 1 deletion flow/model/qvalue/qvalue.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (

"github.com/google/uuid"
"github.com/shopspring/decimal"
"github.com/yuin/gopher-lua"
lua "github.com/yuin/gopher-lua"

"github.com/PeerDB-io/glua64"
"github.com/PeerDB-io/peer-flow/shared"
Expand Down Expand Up @@ -294,6 +294,22 @@ func (v QValueInterval) LValue(ls *lua.LState) lua.LValue {
return lua.LString(v.Val)
}

type QValueTSTZRange struct {
Val string
}

func (QValueTSTZRange) Kind() QValueKind {
return QValueKindInterval
}

func (v QValueTSTZRange) Value() any {
return v.Val
}

func (v QValueTSTZRange) LValue(ls *lua.LState) lua.LValue {
return lua.LString(v.Val)
}

type QValueNumeric struct {
Val decimal.Decimal
}
Expand Down
Loading