Skip to content

Commit

Permalink
fix hstore for cdc
Browse files (browse the repository at this point in the history)
  • Loading branch information
Amogh-Bharadwaj committed Jan 8, 2024
1 parent b78f888 commit 7c9a86d
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 4 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
var castStmt string
shortCol := m.shortColumn[colName]
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
// if the type is JSON or HStore, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
colName, bqType, shortCol)
Expand Down
17 changes: 17 additions & 0 deletions flow/model/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,23 @@ func (r *RecordItems) toMap() (map[string]interface{}, error) {
} else {
jsonStruct[col] = strVal
}
case qvalue.QValueKindHStore:
hstoreVal, ok := v.Value.(string)
if !ok {
return nil, fmt.Errorf("expected string value for hstore column %s for value %T", col, v.Value)
}

jsonVal, err := qvalue.HStoreToJSON(hstoreVal)
if err != nil {
return nil, fmt.Errorf("unable to convert hstore column %s to json for value %T", col, v.Value)
}

if len(jsonVal) > 15*1024*1024 {
jsonStruct[col] = ""
} else {
jsonStruct[col] = jsonVal
}

case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ, qvalue.QValueKindDate,
qvalue.QValueKindTime, qvalue.QValueKindTimeTZ:
jsonStruct[col], err = v.GoTimeConvert()
Expand Down
6 changes: 3 additions & 3 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ type QValueAvroConverter struct {
Nullable bool
}

func hstoreToJSON(hstore string) (string, error) {
func HStoreToJSON(hstore string) (string, error) {
re := regexp.MustCompile(`"(.*?)"=>(?:"([^"]*)"|NULL)`)
matches := re.FindAllStringSubmatch(hstore, -1)

Expand Down Expand Up @@ -307,7 +307,7 @@ func (c *QValueAvroConverter) processHStore() (interface{}, error) {
return nil, fmt.Errorf("invalid HSTORE value %v", c.Value.Value)
}

jsonString, err := hstoreToJSON(hstoreString)
jsonString, err := HStoreToJSON(hstoreString)
if err != nil {
return nil, err
}
Expand All @@ -318,7 +318,7 @@ func (c *QValueAvroConverter) processHStore() (interface{}, error) {
slog.Warn("Check this issue for details: https://github.com/PeerDB-io/peerdb/issues/309")
return goavro.Union("string", ""), nil
}
return goavro.Union("string", hstoreString), nil
return goavro.Union("string", jsonString), nil
}

if c.TargetDWH == QDWHTypeSnowflake && len(jsonString) > 15*1024*1024 {
Expand Down

0 comments on commit 7c9a86d

Please sign in to comment.