Skip to content

Commit

Permalink
HStore Support for BigQuery (#995)
Browse files Browse the repository at this point in the history
Maps HStore to JSON in BigQuery. 
Achieves this by transforming `'"a"=>"b"'` to `{"a":"b"}` via string
functions, in both qrep/initial load and cdc
Test added

Relies on the fact hstore key and value must be quoted strings (although
the key can be empty and this case is supported)
And follows the syntax in the example above

```sql
postgres=# select 'a'::hstore;
ERROR:  Unexpected end of string
LINE 1: select 'a'::hstore;
               ^
postgres=# select 'a=>b'::hstore;
-[ RECORD 1 ]----
hstore | "a"=>"b"

postgres=# select 'a=>'::hstore;
ERROR:  Unexpected end of string
LINE 1: select 'a=>'::hstore;

postgres=# select 'a=>3434'::hstore;
-[ RECORD 1 ]-------
hstore | "a"=>"3434"
```
  • Loading branch information
Amogh-Bharadwaj authored Jan 10, 2024
1 parent 8075aa6 commit 595749d
Show file tree
Hide file tree
Showing 9 changed files with 412 additions and 21 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
var castStmt string
shortCol := m.shortColumn[colName]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
colName, bqType, shortCol)
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
case qvalue.QValueKindString:
return bigquery.StringFieldType
// json also is stored as string for now
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
return bigquery.JSONFieldType
// time related
case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ:
Expand Down
13 changes: 5 additions & 8 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,14 +104,11 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
if ctype == qvalue.QValueKindInvalid {
var err error
if err != nil {
typeName, ok := qe.customTypeMap[fd.DataTypeOID]
if ok {
ctype = customTypeToQKind(typeName)
} else {
ctype = qvalue.QValueKindString
}
typeName, ok := qe.customTypeMap[fd.DataTypeOID]
if ok {
ctype = customTypeToQKind(typeName)
} else {
ctype = qvalue.QValueKindString
}
}
// there isn't a way to know if a column is nullable or not
Expand Down
10 changes: 4 additions & 6 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func qValueKindToPostgresType(qvalueKind string) string {
return "BYTEA"
case qvalue.QValueKindJSON:
return "JSONB"
case qvalue.QValueKindHStore:
return "HSTORE"
case qvalue.QValueKindUUID:
return "UUID"
case qvalue.QValueKindTime:
Expand Down Expand Up @@ -335,12 +337,6 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value)
}
case qvalue.QValueKindHStore:
hstoreVal, err := value.(pgtype.Hstore).HstoreValue()
if err != nil {
return qvalue.QValue{}, fmt.Errorf("failed to parse hstore: %w", err)
}
val = qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
Expand Down Expand Up @@ -399,6 +395,8 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
qValueKind = qvalue.QValueKindGeometry
case "geography":
qValueKind = qvalue.QValueKindGeography
case "hstore":
qValueKind = qvalue.QValueKindHStore
default:
qValueKind = qvalue.QValueKindString
}
Expand Down
17 changes: 13 additions & 4 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -702,7 +702,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
c14 INET,c15 INTEGER,c16 INTERVAL,c17 JSON,c18 JSONB,c21 MACADDR,c22 MONEY,
c23 NUMERIC,c24 OID,c28 REAL,c29 SMALLINT,c30 SMALLSERIAL,c31 SERIAL,c32 TEXT,
c33 TIMESTAMP,c34 TIMESTAMPTZ,c35 TIME, c36 TIMETZ,c37 TSQUERY,c38 TSVECTOR,
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood);
c39 TXID_SNAPSHOT,c40 UUID,c41 XML, c42 INT[], c43 FLOAT[], c44 TEXT[], c45 mood, c46 HSTORE);
`, srcTableName))
require.NoError(s.t, err)

Expand Down Expand Up @@ -737,8 +737,9 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
txid_current_snapshot(),
'66073c38-b8df-4bdb-bbca-1c97596b8940'::uuid,xmlcomment('hello'),
ARRAY[10299301,2579827],
ARRAY[0.0003, 8902.0092, 'NaN'],
ARRAY['hello','bye'],'happy';
ARRAY[0.0003, 8902.0092],
ARRAY['hello','bye'],'happy',
'key1=>value1, key2=>NULL'::hstore
`, srcTableName))
e2e.EnvNoError(s.t, env, err)
}()
Expand All @@ -756,7 +757,7 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
"c41", "c1", "c2", "c3", "c4",
"c6", "c39", "c40", "id", "c9", "c11", "c12", "c13", "c14", "c15", "c16", "c17", "c18",
"c21", "c22", "c23", "c24", "c28", "c29", "c30", "c31", "c33", "c34", "c35", "c36",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45",
"c37", "c38", "c7", "c8", "c32", "c42", "c43", "c44", "c45", "c46",
})
if err != nil {
s.t.Log(err)
Expand All @@ -767,6 +768,14 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
// check if JSON on bigquery side is a good JSON
err = s.checkJSONValue(dstTableName, "c17", "sai", "-8.021390374331551")
require.NoError(s.t, err)

// check if HSTORE on bigquery side is a good JSON
err = s.checkJSONValue(dstTableName, "c46", "key1", "\"value1\"")
require.NoError(s.t, err)
err = s.checkJSONValue(dstTableName, "c46", "key2", "null")
require.NoError(s.t, err)

env.AssertExpectations(s.t)
}

func (s PeerFlowE2ETestSuiteBQ) Test_NaN_Doubles_BQ() {
Expand Down
233 changes: 233 additions & 0 deletions flow/hstore/hstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
This is in reference to PostgreSQL's hstore:
https://github.com/postgres/postgres/blob/bea18b1c949145ba2ca79d4765dba3cc9494a480/contrib/hstore/hstore_io.c
This package is an implementation based on the above code.
It's simplified to only parse the subset which `hstore_out` outputs.
*/
package hstore_util

import (
"encoding/json"
"errors"
"fmt"
"strings"
)

type text struct {
String string
Valid bool
}

type hstore map[string]*string

type hstoreParser struct {
str string
pos int
nextBackslash int
}

func newHSP(in string) *hstoreParser {
return &hstoreParser{
pos: 0,
str: in,
nextBackslash: strings.IndexByte(in, '\\'),
}
}

func (p *hstoreParser) atEnd() bool {
return p.pos >= len(p.str)
}

// consume returns the next byte of the string, or end if the string is done.
func (p *hstoreParser) consume() (b byte, end bool) {
if p.pos >= len(p.str) {
return 0, true
}
b = p.str[p.pos]
p.pos++
return b, false
}

func unexpectedByteErr(actualB byte, expectedB byte) error {
return fmt.Errorf("expected '%c' ('%#v'); found '%c' ('%#v')", expectedB, expectedB, actualB, actualB)
}

// consumeExpectedByte consumes expectedB from the string, or returns an error.
func (p *hstoreParser) consumeExpectedByte(expectedB byte) error {
nextB, end := p.consume()
if end {
return fmt.Errorf("expected '%c' ('%#v'); found end", expectedB, expectedB)
}
if nextB != expectedB {
return unexpectedByteErr(nextB, expectedB)
}
return nil
}

func (p *hstoreParser) consumeExpected2(one byte, two byte) error {
if p.pos+2 > len(p.str) {
return errors.New("unexpected end of string")
}
if p.str[p.pos] != one {
return unexpectedByteErr(p.str[p.pos], one)
}
if p.str[p.pos+1] != two {
return unexpectedByteErr(p.str[p.pos+1], two)
}
p.pos += 2
return nil
}

var errEOSInQuoted = errors.New(`found end before closing double-quote ('"')`)

// consumeDoubleQuoted consumes a double-quoted string from p. The double quote must have been
// parsed already.
func (p *hstoreParser) consumeDoubleQuoted() (string, error) {
// fast path: assume most keys/values do not contain escapes
nextDoubleQuote := strings.IndexByte(p.str[p.pos:], '"')
if nextDoubleQuote == -1 {
return "", errEOSInQuoted
}
nextDoubleQuote += p.pos
if p.nextBackslash == -1 || p.nextBackslash > nextDoubleQuote {
s := p.str[p.pos:nextDoubleQuote]
p.pos = nextDoubleQuote + 1
return s, nil
}

s, err := p.consumeDoubleQuotedWithEscapes(p.nextBackslash)
p.nextBackslash = strings.IndexByte(p.str[p.pos:], '\\')
if p.nextBackslash != -1 {
p.nextBackslash += p.pos
}
return s, err
}

// consumeDoubleQuotedWithEscapes consumes a double-quoted string containing escapes, starting
// at p.pos, and with the first backslash at firstBackslash. This copies the string so it can be
// garbage collected separately.
func (p *hstoreParser) consumeDoubleQuotedWithEscapes(firstBackslash int) (string, error) {
// copy the prefix that does not contain backslashes
var builder strings.Builder
builder.WriteString(p.str[p.pos:firstBackslash])

// skip to the backslash
p.pos = firstBackslash

// copy bytes until the end, unescaping backslashes
for {
nextB, end := p.consume()
if end {
return "", errEOSInQuoted
} else if nextB == '"' {
break
} else if nextB == '\\' {
// escape: skip the backslash and copy the char
nextB, end = p.consume()
if end {
return "", errEOSInQuoted
}
if !(nextB == '\\' || nextB == '"') {
return "", fmt.Errorf("unexpected escape in quoted string: found '%#v'", nextB)
}
builder.WriteByte(nextB)
} else {
// normal byte: copy it
builder.WriteByte(nextB)
}
}
return builder.String(), nil
}

// consumePairSeparator consumes the Hstore pair separator ", " or returns an error.
func (p *hstoreParser) consumePairSeparator() error {
return p.consumeExpected2(',', ' ')
}

// consumeKVSeparator consumes the Hstore key/value separator "=>" or returns an error.
func (p *hstoreParser) consumeKVSeparator() error {
return p.consumeExpected2('=', '>')
}

// consumeDoubleQuotedOrNull consumes the Hstore key/value separator "=>" or returns an error.
func (p *hstoreParser) consumeDoubleQuotedOrNull() (text, error) {
// peek at the next byte
if p.atEnd() {
return text{}, errors.New("found end instead of value")
}
next := p.str[p.pos]
if next == 'N' {
// must be the exact string NULL: use consumeExpected2 twice
err := p.consumeExpected2('N', 'U')
if err != nil {
return text{}, err
}
err = p.consumeExpected2('L', 'L')
if err != nil {
return text{}, err
}
return text{String: "", Valid: false}, nil
} else if next != '"' {
return text{}, unexpectedByteErr(next, '"')
}

// skip the double quote
p.pos += 1
s, err := p.consumeDoubleQuoted()
if err != nil {
return text{}, err
}
return text{String: s, Valid: true}, nil
}

func ParseHstore(s string) (string, error) {
p := newHSP(s)

// This is an over-estimate of the number of key/value pairs.
numPairsEstimate := strings.Count(s, ">")
result := make(hstore, numPairsEstimate)
first := true
for !p.atEnd() {
if !first {
err := p.consumePairSeparator()
if err != nil {
return "", err
}
} else {
first = false
}

err := p.consumeExpectedByte('"')
if err != nil {
return "", err
}

key, err := p.consumeDoubleQuoted()
if err != nil {
return "", err
}

err = p.consumeKVSeparator()
if err != nil {
return "", err
}

value, err := p.consumeDoubleQuotedOrNull()
if err != nil {
return "", err
}
if value.Valid {
result[key] = &value.String
} else {
result[key] = nil
}
}

jsonBytes, err := json.Marshal(result)
if err != nil {
return "", err
}

return string(jsonBytes), nil
}
Loading

0 comments on commit 595749d

Please sign in to comment.