From 6e31b8b8d0894f84d979acbc8e376bf39bfef5c6 Mon Sep 17 00:00:00 2001
From: Amogh-Bharadwaj
Date: Tue, 9 Jan 2024 01:42:38 +0530
Subject: [PATCH] better hstore parse, merge handling

---
 .../bigquery/merge_statement_generator.go     |   9 +-
 flow/hstore/hstore.go                         | 250 ++++++++++++++++++
 flow/model/model.go                           |   3 +-
 flow/model/qvalue/avro_converter.go           |  31 +--
 4 files changed, 263 insertions(+), 30 deletions(-)
 create mode 100644 flow/hstore/hstore.go

diff --git a/flow/connectors/bigquery/merge_statement_generator.go b/flow/connectors/bigquery/merge_statement_generator.go
index 7b2a54fa2b..bc320f8a4c 100644
--- a/flow/connectors/bigquery/merge_statement_generator.go
+++ b/flow/connectors/bigquery/merge_statement_generator.go
@@ -47,10 +47,17 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
 		var castStmt string
 		shortCol := m.shortColumn[colName]
 		switch qvalue.QValueKind(colType) {
-		case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
+		case qvalue.QValueKindJSON:
 			// if the type is JSON, then just extract JSON
 			castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
 				colName, bqType, shortCol)
+		case qvalue.QValueKindHStore:
+			// PARSE_JSON doesn't work for HSTORE with \" in the value, so use SAFE.PARSE_JSON and coalesce with TO_JSON
+			castStmt = fmt.Sprintf("CAST(COALESCE("+
+				"SAFE.PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'))"+
+				",TO_JSON(JSON_VALUE(_peerdb_data, '$.%s'))"+
+				") AS %s) AS `%s`",
+				colName, colName, bqType, shortCol)
 		// expecting data in BASE64 format
 		case qvalue.QValueKindBytes, qvalue.QValueKindBit:
 			castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
diff --git a/flow/hstore/hstore.go b/flow/hstore/hstore.go
new file mode 100644
index 0000000000..915c05269e
--- /dev/null
+++ b/flow/hstore/hstore.go
@@ -0,0 +1,250 @@
+// Package hstore_util parses the text representation of Postgres hstore values
+// and converts it to JSON.
+package hstore_util
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"strings"
+)
+
+// text is a string that may be NULL: Valid is false for NULL, in which case
+// String is empty.
+type text struct {
+	String string
+	Valid  bool
+}
+
+// hstoreParser is a cursor over the text form of an hstore value.
+type hstoreParser struct {
+	str           string // input being parsed
+	pos           int    // index in str of the next unread byte
+	nextBackslash int    // index in str of the next backslash still ahead of pos, or -1 if none
+}
+
+// newHSP returns a parser positioned at the start of in.
+func newHSP(in string) *hstoreParser {
+	return &hstoreParser{
+		pos:           0,
+		str:           in,
+		nextBackslash: strings.IndexByte(in, '\\'),
+	}
+}
+
+// atEnd reports whether the whole input has been consumed.
+func (p *hstoreParser) atEnd() bool {
+	return p.pos >= len(p.str)
+}
+
+// consume returns the next byte of the string, or end if the string is done.
+func (p *hstoreParser) consume() (b byte, end bool) {
+	if p.pos >= len(p.str) {
+		return 0, true
+	}
+	b = p.str[p.pos]
+	p.pos++
+	return b, false
+}
+
+// unexpectedByteErr builds the error reported when actualB was found where
+// expectedB was required.
+func unexpectedByteErr(actualB byte, expectedB byte) error {
+	return fmt.Errorf("expected '%c' ('%#v'); found '%c' ('%#v')", expectedB, expectedB, actualB, actualB)
+}
+
+// consumeExpectedByte consumes expectedB from the string, or returns an error.
+func (p *hstoreParser) consumeExpectedByte(expectedB byte) error {
+	nextB, end := p.consume()
+	if end {
+		return fmt.Errorf("expected '%c' ('%#v'); found end", expectedB, expectedB)
+	}
+	if nextB != expectedB {
+		return unexpectedByteErr(nextB, expectedB)
+	}
+	return nil
+}
+
+// consumeExpected2 consumes the two bytes one and two, in that order, or
+// returns an error and leaves the position unchanged.
+func (p *hstoreParser) consumeExpected2(one byte, two byte) error {
+	if p.pos+2 > len(p.str) {
+		return errors.New("unexpected end of string")
+	}
+	if p.str[p.pos] != one {
+		return unexpectedByteErr(p.str[p.pos], one)
+	}
+	if p.str[p.pos+1] != two {
+		return unexpectedByteErr(p.str[p.pos+1], two)
+	}
+	p.pos += 2
+	return nil
+}
+
+// errEOSInQuoted is returned when the input ends inside a double-quoted string.
+var errEOSInQuoted = errors.New(`found end before closing double-quote ('"')`)
+
+// consumeDoubleQuoted consumes a double-quoted string from p. The double quote must have been
+// parsed already. This copies the string from the backing string so it can be garbage collected.
+func (p *hstoreParser) consumeDoubleQuoted() (string, error) {
+	// fast path: assume most keys/values do not contain escapes
+	nextDoubleQuote := strings.IndexByte(p.str[p.pos:], '"')
+	if nextDoubleQuote == -1 {
+		return "", errEOSInQuoted
+	}
+	nextDoubleQuote += p.pos
+	if p.nextBackslash == -1 || p.nextBackslash > nextDoubleQuote {
+		s := strings.Clone(p.str[p.pos:nextDoubleQuote])
+		p.pos = nextDoubleQuote + 1
+		return s, nil
+	}
+
+	// slow path: unescape, then look for the next backslash after the new position
+	s, err := p.consumeDoubleQuotedWithEscapes(p.nextBackslash)
+	p.nextBackslash = strings.IndexByte(p.str[p.pos:], '\\')
+	if p.nextBackslash != -1 {
+		p.nextBackslash += p.pos
+	}
+	return s, err
+}
+
+// consumeDoubleQuotedWithEscapes consumes a double-quoted string containing escapes, starting
+// at p.pos, and with the first backslash at firstBackslash. This copies the string so it can be
+// garbage collected separately.
+func (p *hstoreParser) consumeDoubleQuotedWithEscapes(firstBackslash int) (string, error) {
+	// copy the prefix that does not contain backslashes
+	var builder strings.Builder
+	builder.WriteString(p.str[p.pos:firstBackslash])
+
+	// skip to the backslash
+	p.pos = firstBackslash
+
+	// copy bytes until the end, unescaping backslashes
+	for {
+		nextB, end := p.consume()
+		if end {
+			return "", errEOSInQuoted
+		} else if nextB == '"' {
+			break
+		} else if nextB == '\\' {
+			// escape: skip the backslash and copy the char
+			nextB, end = p.consume()
+			if end {
+				return "", errEOSInQuoted
+			}
+			if !(nextB == '\\' || nextB == '"') {
+				return "", fmt.Errorf("unexpected escape in quoted string: found '%#v'", nextB)
+			}
+			builder.WriteByte(nextB)
+		} else {
+			// normal byte: copy it
+			builder.WriteByte(nextB)
+		}
+	}
+	return builder.String(), nil
+}
+
+// consumePairSeparator consumes the Hstore pair separator ", " or returns an error.
+func (p *hstoreParser) consumePairSeparator() error {
+	return p.consumeExpected2(',', ' ')
+}
+
+// consumeKVSeparator consumes the Hstore key/value separator "=>" or returns an error.
+func (p *hstoreParser) consumeKVSeparator() error {
+	return p.consumeExpected2('=', '>')
+}
+
+// consumeDoubleQuotedOrNull consumes an Hstore value: either a double-quoted
+// string or the literal NULL. The returned text is not Valid for NULL.
+func (p *hstoreParser) consumeDoubleQuotedOrNull() (text, error) {
+	// peek at the next byte
+	if p.atEnd() {
+		return text{}, errors.New("found end instead of value")
+	}
+	next := p.str[p.pos]
+	if next == 'N' {
+		// must be the exact string NULL: use consumeExpected2 twice
+		err := p.consumeExpected2('N', 'U')
+		if err != nil {
+			return text{}, err
+		}
+		err = p.consumeExpected2('L', 'L')
+		if err != nil {
+			return text{}, err
+		}
+		return text{String: "", Valid: false}, nil
+	} else if next != '"' {
+		return text{}, unexpectedByteErr(next, '"')
+	}
+
+	// skip the double quote
+	p.pos += 1
+	s, err := p.consumeDoubleQuoted()
+	if err != nil {
+		return text{}, err
+	}
+	return text{String: s, Valid: true}, nil
+}
+
+// ParseHstore parses s, the text form of a Postgres hstore value such as
+// `"a"=>"1", "b"=>NULL`, and returns its contents as a JSON object string.
+//
+// Keys and values are JSON-escaped, so the result is valid JSON even when they
+// contain double quotes, backslashes or control characters. Keys are emitted in
+// sorted order. A NULL value is emitted as the empty string "", and when a key
+// is repeated the last value wins. An error is returned if s is malformed.
+func ParseHstore(s string) (string, error) {
+	p := newHSP(s)
+
+	// This is an over-estimate of the number of key/value pairs.
+	numPairsEstimate := strings.Count(s, ">")
+	result := make(map[string]string, numPairsEstimate)
+	first := true
+	for !p.atEnd() {
+		if !first {
+			err := p.consumePairSeparator()
+			if err != nil {
+				return "", err
+			}
+		} else {
+			first = false
+		}
+
+		err := p.consumeExpectedByte('"')
+		if err != nil {
+			return "", err
+		}
+
+		key, err := p.consumeDoubleQuoted()
+		if err != nil {
+			return "", err
+		}
+
+		err = p.consumeKVSeparator()
+		if err != nil {
+			return "", err
+		}
+
+		value, err := p.consumeDoubleQuotedOrNull()
+		if err != nil {
+			return "", err
+		}
+		if value.Valid {
+			result[key] = value.String
+		} else {
+			// hstore NULL: emitted as an empty JSON string.
+			result[key] = ""
+		}
+	}
+
+	// Encode with encoding/json so that keys and values are escaped correctly.
+	// HTML escaping is turned off to leave '<', '>' and '&' as they are.
+	var buf strings.Builder
+	enc := json.NewEncoder(&buf)
+	enc.SetEscapeHTML(false)
+	if err := enc.Encode(result); err != nil {
+		return "", fmt.Errorf("encoding hstore as json: %w", err)
+	}
+	// Encode terminates its output with a newline, which is not part of the value.
+	return strings.TrimSuffix(buf.String(), "\n"), nil
+}
diff --git a/flow/model/model.go b/flow/model/model.go
index 1b439c44ca..eec1439888 100644
--- a/flow/model/model.go
+++ b/flow/model/model.go
@@ -10,6 +10,7 @@ import (
 	"time"
 
 	"github.com/PeerDB-io/peer-flow/generated/protos"
+	hstore_util "github.com/PeerDB-io/peer-flow/hstore"
 	"github.com/PeerDB-io/peer-flow/model/qvalue"
 	"github.com/PeerDB-io/peer-flow/peerdbenv"
 )
@@ -161,7 +162,7 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {
 				return nil, fmt.Errorf("expected string value for hstore column %s for value %T", col, v.Value)
 			}
 
-			jsonVal, err := qvalue.HStoreToJSON(hstoreVal)
+			jsonVal, err := hstore_util.ParseHstore(hstoreVal)
 			if err != nil {
 				return nil, fmt.Errorf("unable to convert hstore column %s to json for value %T", col, v.Value)
 			}
diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go
index e96dae6e88..9fafb8a18c 100644
--- a/flow/model/qvalue/avro_converter.go
+++ b/flow/model/qvalue/avro_converter.go
@@ -1,13 +1,12 @@
 package qvalue
 
 import (
-	"encoding/json"
 	"fmt"
 	"log/slog"
 	"math/big"
-	"regexp"
 	"time"
 
+	hstore_util "github.com/PeerDB-io/peer-flow/hstore"
 	"github.com/google/uuid"
 	"github.com/linkedin/goavro/v2"
 )
@@ -98,30 +97,6 @@ type QValueAvroConverter struct {
 	Nullable  bool
 }
 
-func HStoreToJSON(hstore string) (string, error) {
-	re := regexp.MustCompile(`"(.*?)"=>(?:"([^"]*)"|NULL)`)
-	matches := re.FindAllStringSubmatch(hstore, -1)
-
-	result := make(map[string]interface{})
-	for _, match := range matches {
-		if len(match) == 3 {
-			key := match[1]
-			if match[2] != "NULL" {
-				result[key] = match[2]
-			} else {
-				result[key] = nil
-			}
-		}
-	}
-
-	jsonData, err := json.Marshal(result)
-	if err != nil {
-		return "", err
-	}
-
-	return string(jsonData), nil
-}
-
 func NewQValueAvroConverter(value QValue, targetDWH QDWHType, nullable bool) *QValueAvroConverter {
 	return &QValueAvroConverter{
 		Value:     value,
@@ -307,9 +282,9 @@ func (c *QValueAvroConverter) processHStore() (interface{}, error) {
 		return nil, fmt.Errorf("invalid HSTORE value %v", c.Value.Value)
 	}
 
-	jsonString, err := HStoreToJSON(hstoreString)
+	jsonString, err := hstore_util.ParseHstore(hstoreString)
 	if err != nil {
-		return nil, err
+		return "", err
 	}
 
 	if c.Nullable {