From 06297c9c48fc63b980eb17ec8d7214d2387c54f4 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:06:55 +0530 Subject: [PATCH 01/10] support tstzrange --- flow/connectors/postgres/qvalue_convert.go | 44 ++++++++++++++++++++++ flow/model/qrecord_batch.go | 2 + flow/model/qvalue/kind.go | 1 + flow/model/qvalue/qvalue.go | 18 ++++++++- 4 files changed, 64 insertions(+), 1 deletion(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index ef12fd61c4..05285297c1 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -108,6 +108,8 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu return qvalue.QValueKindArrayString case pgtype.IntervalOID: return qvalue.QValueKindInterval + case pgtype.TstzrangeOID: + return qvalue.QValueKindTSTZRange default: typeName, ok := pgtype.NewMap().TypeForOID(recvOID) if !ok { @@ -273,6 +275,31 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( } return qvalue.QValueString{Val: string(intervalJSON)}, nil + case qvalue.QValueKindTSTZRange: + tstzrangeObject := value.(pgtype.Range[interface{}]) + lowerBoundType := tstzrangeObject.LowerType + upperBoundType := tstzrangeObject.UpperType + lowerTime, err := ConvertTimeRangeBounds(tstzrangeObject.Lower) + if err != nil { + return nil, fmt.Errorf("[tstzrange]error for lower time bound: %v", err) + } + + upperTime, err := ConvertTimeRangeBounds(tstzrangeObject.Upper) + if err != nil { + return nil, fmt.Errorf("[tstzrange]error for upper time bound: %v", err) + } + + lowerBracket := "[" + if lowerBoundType == pgtype.Exclusive { + lowerBracket = "(" + } + upperBracket := "]" + if upperBoundType == pgtype.Exclusive { + upperBracket = ")" + } + tstzrangeStr := fmt.Sprintf("%s%v,%v%s", + lowerBracket, lowerTime, upperTime, upperBracket) + return qvalue.QValueTSTZRange{Val: tstzrangeStr}, nil case qvalue.QValueKindDate: date := value.(time.Time) return qvalue.QValueDate{Val: date}, nil @@ -481,3 +508,20 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { return qvalue.QValueKindString } } + +func ConvertTimeRangeBounds(timeBound interface{}) (string, error) { + layout := "2006-01-02 15:04:05 -0700 MST" + postgresFormat := "2006-01-02 15:04:05" + var convertedTime string + if timeBound != nil { + lowerParsed, err := time.Parse(layout, fmt.Sprint(timeBound)) + if err != nil { + return "", fmt.Errorf("Unexpected lower bound value in tstzrange. Error: %v", err) + } + convertedTime = lowerParsed.Format(postgresFormat) + } else { + convertedTime = "" + } + + return convertedTime, nil +} diff --git a/flow/model/qrecord_batch.go b/flow/model/qrecord_batch.go index 1f787b5c1c..3b76c363d0 100644 --- a/flow/model/qrecord_batch.go +++ b/flow/model/qrecord_batch.go @@ -118,6 +118,8 @@ func (src *QRecordBatchCopyFromSource) Values() ([]interface{}, error) { values[i] = pgtype.Timestamp{Time: v.Val, Valid: true} case qvalue.QValueTimestampTZ: values[i] = pgtype.Timestamptz{Time: v.Val, Valid: true} + case qvalue.QValueTSTZRange: + values[i] = v.Val case qvalue.QValueUUID: values[i] = uuid.UUID(v.Val) case qvalue.QValueNumeric: diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index fa0a3c2235..14233338f5 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -26,6 +26,7 @@ const ( QValueKindTime QValueKind = "time" QValueKindTimeTZ QValueKind = "timetz" QValueKindInterval QValueKind = "interval" + QValueKindTSTZRange QValueKind = "tstzrange" QValueKindNumeric QValueKind = "numeric" QValueKindBytes QValueKind = "bytes" QValueKindUUID QValueKind = "uuid" diff --git a/flow/model/qvalue/qvalue.go b/flow/model/qvalue/qvalue.go index 91b9e3fe31..961028ceb9 100644 --- a/flow/model/qvalue/qvalue.go +++ b/flow/model/qvalue/qvalue.go @@ -6,7 +6,7 @@ import ( "github.com/google/uuid" "github.com/shopspring/decimal" - "github.com/yuin/gopher-lua" + lua "github.com/yuin/gopher-lua" "github.com/PeerDB-io/glua64" "github.com/PeerDB-io/peer-flow/shared" @@ -294,6 +294,22 @@ func (v QValueInterval) LValue(ls *lua.LState) lua.LValue { return lua.LString(v.Val) } +type QValueTSTZRange struct { + Val string +} + +func (QValueTSTZRange) Kind() QValueKind { + return QValueKindInterval +} + +func (v QValueTSTZRange) Value() any { + return v.Val +} + +func (v QValueTSTZRange) LValue(ls *lua.LState) lua.LValue { + return lua.LString(v.Val) +} + type QValueNumeric struct { Val decimal.Decimal } From 9a464d6d9089084ff4481c23f193d463f912ade5 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:13:57 +0530 Subject: [PATCH 02/10] support jsonb --- flow/connectors/postgres/qvalue_convert.go | 8 ++++++-- flow/model/qvalue/kind.go | 1 + 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 05285297c1..0c68935368 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -62,8 +62,10 @@ func (c *PostgresConnector) postgresOIDToQValueKind(recvOID uint32) qvalue.QValu return qvalue.QValueKindString case pgtype.ByteaOID: return qvalue.QValueKindBytes - case pgtype.JSONOID, pgtype.JSONBOID: + case pgtype.JSONOID: return qvalue.QValueKindJSON + case pgtype.JSONBOID: + return qvalue.QValueKindJSONB case pgtype.UUIDOID: return qvalue.QValueKindUUID case pgtype.TimeOID: @@ -165,6 +167,8 @@ func qValueKindToPostgresType(colTypeStr string) string { return "BYTEA" case qvalue.QValueKindJSON: return "JSON" + case qvalue.QValueKindJSONB: + return "JSONB" case qvalue.QValueKindHStore: return "HSTORE" case qvalue.QValueKindUUID: @@ -325,7 +329,7 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( case qvalue.QValueKindBoolean: boolVal := value.(bool) return qvalue.QValueBoolean{Val: boolVal}, nil - case qvalue.QValueKindJSON: + case qvalue.QValueKindJSON, qvalue.QValueKindJSONB: tmp, err := parseJSON(value) if err != nil { return nil, fmt.Errorf("failed to parse JSON: %w", err) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 14233338f5..6bae7e04e3 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -31,6 +31,7 @@ const ( QValueKindBytes QValueKind = "bytes" QValueKindUUID QValueKind = "uuid" QValueKindJSON QValueKind = "json" + QValueKindJSONB QValueKind = "jsonb" QValueKindBit QValueKind = "bit" QValueKindHStore QValueKind = "hstore" QValueKindGeography QValueKind = "geography" From 2337bcc41f824a9cbb36c06031629d25e56b627d Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:18:29 +0530 Subject: [PATCH 03/10] add comment --- flow/connectors/postgres/qvalue_convert.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 0c68935368..7704914256 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -513,6 +513,9 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { } } +// Postgres does not like timestamps of the form 2006-01-02 15:04:05 +0000 UTC +// in tstzrange. +// ConvertTimeRangeBounds removes the +0000 UTC part func ConvertTimeRangeBounds(timeBound interface{}) (string, error) { layout := "2006-01-02 15:04:05 -0700 MST" postgresFormat := "2006-01-02 15:04:05" From 537cf65459c0e2e2882630b57395bbce193763aa Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:25:53 +0530 Subject: [PATCH 04/10] lint --- flow/connectors/postgres/qvalue_convert.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 7704914256..bec977448f 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -523,7 +523,7 @@ func ConvertTimeRangeBounds(timeBound interface{}) (string, error) { if timeBound != nil { lowerParsed, err := time.Parse(layout, fmt.Sprint(timeBound)) if err != nil { - return "", fmt.Errorf("Unexpected lower bound value in tstzrange. Error: %v", err) + return "", fmt.Errorf("unexpected lower bound value in tstzrange. Error: %v", err) } convertedTime = lowerParsed.Format(postgresFormat) } else { From e5a906902606ca84b51f2aa06976f3176dd31fcc Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:28:54 +0530 Subject: [PATCH 05/10] minor change --- flow/connectors/postgres/qvalue_convert.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index bec977448f..4562b97899 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -283,12 +283,12 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( tstzrangeObject := value.(pgtype.Range[interface{}]) lowerBoundType := tstzrangeObject.LowerType upperBoundType := tstzrangeObject.UpperType - lowerTime, err := ConvertTimeRangeBounds(tstzrangeObject.Lower) + lowerTime, err := convertTimeRangeBounds(tstzrangeObject.Lower) if err != nil { return nil, fmt.Errorf("[tstzrange]error for lower time bound: %v", err) } - upperTime, err := ConvertTimeRangeBounds(tstzrangeObject.Upper) + upperTime, err := convertTimeRangeBounds(tstzrangeObject.Upper) if err != nil { return nil, fmt.Errorf("[tstzrange]error for upper time bound: %v", err) } @@ -515,8 +515,8 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { // Postgres does not like timestamps of the form 2006-01-02 15:04:05 +0000 UTC // in tstzrange. -// ConvertTimeRangeBounds removes the +0000 UTC part -func ConvertTimeRangeBounds(timeBound interface{}) (string, error) { +// convertTimeRangeBounds removes the +0000 UTC part +func convertTimeRangeBounds(timeBound interface{}) (string, error) { layout := "2006-01-02 15:04:05 -0700 MST" postgresFormat := "2006-01-02 15:04:05" var convertedTime string From dfcafefaefeeab1663fb519471e8865831a7f5f0 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:32:00 +0530 Subject: [PATCH 06/10] rename --- flow/connectors/postgres/qvalue_convert.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flow/connectors/postgres/qvalue_convert.go b/flow/connectors/postgres/qvalue_convert.go index 4562b97899..da3cb8f10c 100644 --- a/flow/connectors/postgres/qvalue_convert.go +++ b/flow/connectors/postgres/qvalue_convert.go @@ -283,12 +283,12 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) ( tstzrangeObject := value.(pgtype.Range[interface{}]) lowerBoundType := tstzrangeObject.LowerType upperBoundType := tstzrangeObject.UpperType - lowerTime, err := convertTimeRangeBounds(tstzrangeObject.Lower) + lowerTime, err := convertTimeRangeBound(tstzrangeObject.Lower) if err != nil { return nil, fmt.Errorf("[tstzrange]error for lower time bound: %v", err) } - upperTime, err := convertTimeRangeBounds(tstzrangeObject.Upper) + upperTime, err := convertTimeRangeBound(tstzrangeObject.Upper) if err != nil { return nil, fmt.Errorf("[tstzrange]error for upper time bound: %v", err) } @@ -515,8 +515,8 @@ func customTypeToQKind(typeName string) qvalue.QValueKind { // Postgres does not like timestamps of the form 2006-01-02 15:04:05 +0000 UTC // in tstzrange. -// convertTimeRangeBounds removes the +0000 UTC part -func convertTimeRangeBounds(timeBound interface{}) (string, error) { +// convertTimeRangeBound removes the +0000 UTC part +func convertTimeRangeBound(timeBound interface{}) (string, error) { layout := "2006-01-02 15:04:05 -0700 MST" postgresFormat := "2006-01-02 15:04:05" var convertedTime string From ab06ed3f18e296e1918b9a3a8926fb192f2cf5d5 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:44:51 +0530 Subject: [PATCH 07/10] add in avro define --- flow/model/qvalue/avro_converter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 763009b2c6..3e77d0a467 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -114,7 +114,7 @@ func GetAvroSchemaFromQValueKind(kind QValueKind, targetDWH protos.DBType, preci }, nil } return "string", nil - case QValueKindHStore, QValueKindJSON, QValueKindStruct: + case QValueKindHStore, QValueKindJSON, QValueKindJSONB, QValueKindStruct: return "string", nil case QValueKindArrayFloat32: return AvroSchemaArray{ From 034e71e2addc3c0d23ea397a1cb6a9c2f957832f Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 01:46:06 +0530 Subject: [PATCH 08/10] add jsonb for bigquery qvalue_convert --- flow/connectors/bigquery/qvalue_convert.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flow/connectors/bigquery/qvalue_convert.go b/flow/connectors/bigquery/qvalue_convert.go index db96a2d590..5203003544 100644 --- a/flow/connectors/bigquery/qvalue_convert.go +++ b/flow/connectors/bigquery/qvalue_convert.go @@ -23,7 +23,7 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType { case qvalue.QValueKindString: return bigquery.StringFieldType // json also is stored as string for now - case qvalue.QValueKindJSON, qvalue.QValueKindHStore: + case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore: return bigquery.JSONFieldType // time related case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ: From cd93009ef4da40d2d42faf6752770ce1f52d359b Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 02:04:55 +0530 Subject: [PATCH 09/10] add jsonb to snowflake qvalue_convert --- flow/model/qvalue/kind.go | 1 + 1 file changed, 1 insertion(+) diff --git a/flow/model/qvalue/kind.go b/flow/model/qvalue/kind.go index 6bae7e04e3..d427917880 100644 --- a/flow/model/qvalue/kind.go +++ b/flow/model/qvalue/kind.go @@ -71,6 +71,7 @@ var QValueKindToSnowflakeTypeMap = map[QValueKind]string{ QValueKindQChar: "CHAR", QValueKindString: "STRING", QValueKindJSON: "VARIANT", + QValueKindJSONB: "VARIANT", QValueKindTimestamp: "TIMESTAMP_NTZ", QValueKindTimestampTZ: "TIMESTAMP_TZ", QValueKindInterval: "VARIANT", From b311ea23bec4bfd0ca386c121bba9e4d9fd9b251 Mon Sep 17 00:00:00 2001 From: Amogh-Bharadwaj Date: Wed, 8 May 2024 02:36:25 +0530 Subject: [PATCH 10/10] jsonb in merge statements --- flow/connectors/bigquery/merge_stmt_generator.go | 2 +- flow/connectors/snowflake/merge_stmt_generator.go | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/flow/connectors/bigquery/merge_stmt_generator.go b/flow/connectors/bigquery/merge_stmt_generator.go index 5243576d88..e9f3ec7ad7 100644 --- a/flow/connectors/bigquery/merge_stmt_generator.go +++ b/flow/connectors/bigquery/merge_stmt_generator.go @@ -34,7 +34,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE(dstTable string, normalizedTab var castStmt string shortCol := m.shortColumn[column.Name] switch qvalue.QValueKind(colType) { - case qvalue.QValueKindJSON, qvalue.QValueKindHStore: + case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore: // if the type is JSON, then just extract JSON castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`", column.Name, bqTypeString, shortCol) diff --git a/flow/connectors/snowflake/merge_stmt_generator.go b/flow/connectors/snowflake/merge_stmt_generator.go index aa637dadf6..f5e940264a 100644 --- a/flow/connectors/snowflake/merge_stmt_generator.go +++ b/flow/connectors/snowflake/merge_stmt_generator.go @@ -52,7 +52,7 @@ func (m *mergeStmtGenerator) generateMergeStmt(dstTable string) (string, error) flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("TO_GEOMETRY(CAST(%s:\"%s\" AS STRING),true) AS %s", toVariantColumnName, column.Name, targetColumnName)) - case qvalue.QValueKindJSON, qvalue.QValueKindHStore, qvalue.QValueKindInterval: + case qvalue.QValueKindJSON, qvalue.QValueKindJSONB, qvalue.QValueKindHStore, qvalue.QValueKindInterval: flattenedCastsSQLArray = append(flattenedCastsSQLArray, fmt.Sprintf("PARSE_JSON(CAST(%s:\"%s\" AS STRING)) AS %s", toVariantColumnName, column.Name, targetColumnName))