Skip to content

Commit

Permalink
better hstore parse, merge handling
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 8, 2024
1 parent 30f1e54 commit 6e31b8b
Show file tree
Hide file tree
Showing 4 changed files with 247 additions and 30 deletions.
9 changes: 8 additions & 1 deletion flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,17 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
var castStmt string
shortCol := m.shortColumn[colName]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
case qvalue.QValueKindJSON:
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
colName, bqType, shortCol)
case qvalue.QValueKindHStore:
// PARSE_JSON doesn't work for HSTORE with \" in the value, so use SAFE.PARSE_JSON and coalesce with TO_JSON
castStmt = fmt.Sprintf("CAST(COALESCE("+
"SAFE.PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'))"+
",TO_JSON(JSON_VALUE(_peerdb_data, '$.%s'))"+
") AS %s) AS `%s`",
colName, colName, bqType, shortCol)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
Expand Down
234 changes: 234 additions & 0 deletions flow/hstore/hstore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,234 @@
package hstore_util

import (
"errors"
"fmt"
"strings"
)

type text struct {
String string
Valid bool
}

type hstore map[string]*string

type hstoreParser struct {
str string
pos int
nextBackslash int
}

func newHSP(in string) *hstoreParser {
return &hstoreParser{
pos: 0,
str: in,
nextBackslash: strings.IndexByte(in, '\\'),
}
}

func (p *hstoreParser) atEnd() bool {
return p.pos >= len(p.str)
}

// consume returns the next byte of the string, or end if the string is done.
func (p *hstoreParser) consume() (b byte, end bool) {
if p.pos >= len(p.str) {
return 0, true
}
b = p.str[p.pos]
p.pos++
return b, false
}

func unexpectedByteErr(actualB byte, expectedB byte) error {
return fmt.Errorf("expected '%c' ('%#v'); found '%c' ('%#v')", expectedB, expectedB, actualB, actualB)
}

// consumeExpectedByte consumes expectedB from the string, or returns an error.
func (p *hstoreParser) consumeExpectedByte(expectedB byte) error {
nextB, end := p.consume()
if end {
return fmt.Errorf("expected '%c' ('%#v'); found end", expectedB, expectedB)
}
if nextB != expectedB {
return unexpectedByteErr(nextB, expectedB)
}
return nil
}

func (p *hstoreParser) consumeExpected2(one byte, two byte) error {
if p.pos+2 > len(p.str) {
return errors.New("unexpected end of string")
}
if p.str[p.pos] != one {
return unexpectedByteErr(p.str[p.pos], one)
}
if p.str[p.pos+1] != two {
return unexpectedByteErr(p.str[p.pos+1], two)
}
p.pos += 2
return nil
}

var errEOSInQuoted = errors.New(`found end before closing double-quote ('"')`)

// consumeDoubleQuoted consumes a double-quoted string from p. The double quote must have been
// parsed already. This copies the string from the backing string so it can be garbage collected.
func (p *hstoreParser) consumeDoubleQuoted() (string, error) {
// fast path: assume most keys/values do not contain escapes
nextDoubleQuote := strings.IndexByte(p.str[p.pos:], '"')
if nextDoubleQuote == -1 {
return "", errEOSInQuoted
}
nextDoubleQuote += p.pos
if p.nextBackslash == -1 || p.nextBackslash > nextDoubleQuote {
s := strings.Clone(p.str[p.pos:nextDoubleQuote])
p.pos = nextDoubleQuote + 1
return s, nil
}

s, err := p.consumeDoubleQuotedWithEscapes(p.nextBackslash)
p.nextBackslash = strings.IndexByte(p.str[p.pos:], '\\')
if p.nextBackslash != -1 {
p.nextBackslash += p.pos
}
return s, err
}

// consumeDoubleQuotedWithEscapes consumes a double-quoted string containing escapes, starting
// at p.pos, and with the first backslash at firstBackslash. This copies the string so it can be
// garbage collected separately.
func (p *hstoreParser) consumeDoubleQuotedWithEscapes(firstBackslash int) (string, error) {
// copy the prefix that does not contain backslashes
var builder strings.Builder
builder.WriteString(p.str[p.pos:firstBackslash])

// skip to the backslash
p.pos = firstBackslash

// copy bytes until the end, unescaping backslashes
for {
nextB, end := p.consume()
if end {
return "", errEOSInQuoted
} else if nextB == '"' {
break
} else if nextB == '\\' {
// escape: skip the backslash and copy the char
nextB, end = p.consume()
if end {
return "", errEOSInQuoted
}
if !(nextB == '\\' || nextB == '"') {
return "", fmt.Errorf("unexpected escape in quoted string: found '%#v'", nextB)
}
builder.WriteByte(nextB)
} else {
// normal byte: copy it
builder.WriteByte(nextB)
}
}
return builder.String(), nil
}

// consumePairSeparator consumes the Hstore pair separator ", " or returns an error.
func (p *hstoreParser) consumePairSeparator() error {
return p.consumeExpected2(',', ' ')
}

// consumeKVSeparator consumes the Hstore key/value separator "=>" or returns an error.
func (p *hstoreParser) consumeKVSeparator() error {
return p.consumeExpected2('=', '>')
}

// consumeDoubleQuotedOrNull consumes the Hstore key/value separator "=>" or returns an error.
func (p *hstoreParser) consumeDoubleQuotedOrNull() (text, error) {
// peek at the next byte
if p.atEnd() {
return text{}, errors.New("found end instead of value")
}
next := p.str[p.pos]
if next == 'N' {
// must be the exact string NULL: use consumeExpected2 twice
err := p.consumeExpected2('N', 'U')
if err != nil {
return text{}, err
}
err = p.consumeExpected2('L', 'L')
if err != nil {
return text{}, err
}
return text{String: "", Valid: false}, nil
} else if next != '"' {
return text{}, unexpectedByteErr(next, '"')
}

// skip the double quote
p.pos += 1
s, err := p.consumeDoubleQuoted()
if err != nil {
return text{}, err
}
return text{String: s, Valid: true}, nil
}

func ParseHstore(s string) (string, error) {
p := newHSP(s)

// This is an over-estimate of the number of key/value pairs.
numPairsEstimate := strings.Count(s, ">")
// makes one allocation of strings for the entire Hstore, rather than one allocation per value.
valueStrings := make([]string, 0, numPairsEstimate)
result := make(hstore, numPairsEstimate)
first := true
for !p.atEnd() {
if !first {
err := p.consumePairSeparator()
if err != nil {
return "", err
}
} else {
first = false
}

err := p.consumeExpectedByte('"')
if err != nil {
return "", err
}

key, err := p.consumeDoubleQuoted()
if err != nil {
return "", err
}

err = p.consumeKVSeparator()
if err != nil {
return "", err
}

value, err := p.consumeDoubleQuotedOrNull()
if err != nil {
return "", err
}
if value.Valid {
valueStrings = append(valueStrings, value.String)
result[key] = &valueStrings[len(valueStrings)-1]
} else {
result[key] = nil
}
}

// Convert to JSON string
jsonString := "{"
for k, v := range result {
if v != nil {
jsonString += fmt.Sprintf("\"%s\":\"%s\",", k, *v)
} else {
jsonString += fmt.Sprintf("\"%s\":\"\",", k)
}
}
jsonString = strings.TrimSuffix(jsonString, ",")
jsonString += "}"
return jsonString, nil
}
3 changes: 2 additions & 1 deletion flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"github.com/PeerDB-io/peer-flow/generated/protos"
hstore_util "github.com/PeerDB-io/peer-flow/hstore"
"github.com/PeerDB-io/peer-flow/model/qvalue"
"github.com/PeerDB-io/peer-flow/peerdbenv"
)
Expand Down Expand Up @@ -161,7 +162,7 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {
return nil, fmt.Errorf("expected string value for hstore column %s for value %T", col, v.Value)
}

jsonVal, err := qvalue.HStoreToJSON(hstoreVal)
jsonVal, err := hstore_util.ParseHstore(hstoreVal)
if err != nil {
return nil, fmt.Errorf("unable to convert hstore column %s to json for value %T", col, v.Value)
}
Expand Down
31 changes: 3 additions & 28 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
package qvalue

import (
"encoding/json"
"fmt"
"log/slog"
"math/big"
"regexp"
"time"

hstore_util "github.com/PeerDB-io/peer-flow/hstore"
"github.com/google/uuid"
"github.com/linkedin/goavro/v2"
)
Expand Down Expand Up @@ -98,30 +97,6 @@ type QValueAvroConverter struct {
Nullable bool
}

func HStoreToJSON(hstore string) (string, error) {
re := regexp.MustCompile(`"(.*?)"=>(?:"([^"]*)"|NULL)`)
matches := re.FindAllStringSubmatch(hstore, -1)

result := make(map[string]interface{})
for _, match := range matches {
if len(match) == 3 {
key := match[1]
if match[2] != "NULL" {
result[key] = match[2]
} else {
result[key] = nil
}
}
}

jsonData, err := json.Marshal(result)
if err != nil {
return "", err
}

return string(jsonData), nil
}

func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool) *QValueAvroConverter {
return &QValueAvroConverter{
Value: value,
Expand Down Expand Up @@ -307,9 +282,9 @@ func (c *QValueAvroConverter) processHStore() (interface{}, error) {
return nil, fmt.Errorf("invalid HSTORE value %v", c.Value.Value)
}

jsonString, err := HStoreToJSON(hstoreString)
jsonString, err := hstore_util.ParseHstore(hstoreString)
if err != nil {
return nil, err
return "", err
}

if c.Nullable {
Expand Down

0 comments on commit 6e31b8b

Please sign in to comment.