Skip to content

Commit

Permalink
refactor bigquery qrep transform and support hstore
Browse files Browse the repository at this point in the history
  • Loading branch information
Amogh-Bharadwaj committed Jan 5, 2024
1 parent 0900c31 commit 916f802
Show file tree
Hide file tree
Showing 6 changed files with 37 additions and 35 deletions.
3 changes: 3 additions & 0 deletions flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s'),wide_number_mode=>'round') AS %s) AS `%s`",
colName, bqType, colName)
case qvalue.QValueKindHStore:
castStmt = fmt.Sprintf("CAST(PARSE_JSON(CONCAT('{', REPLACE(JSON_VALUE(_peerdb_data, '$.%s'), '=>', ':'), '}')) as %s) AS `%s`",
colName, bqType, colName)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data, '$.%s')) AS `%s`",
Expand Down
38 changes: 22 additions & 16 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,24 +110,26 @@ func (s *QRepAvroSyncMethod) SyncRecords(
return numRecords, nil
}

func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol string, softDeleteCol string) []string {
transformedColumns := make([]string, 0, len(dstTableMetadata.Schema))
for _, col := range dstTableMetadata.Schema {
if col.Name == syncedAtCol || col.Name == softDeleteCol {
continue
}
switch col.Type {
case bigquery.GeographyFieldType:
func getTransformedColumns(schema *model.QRecordSchema) []string {
transformedColumns := make([]string, 0, len(schema.Fields))
for _, field := range schema.Fields {
slog.Info(fmt.Sprintf("field %s has type %s", field.Name, field.Type))
switch field.Type {
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
transformedColumns = append(transformedColumns,
fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", field.Name, field.Name))
case qvalue.QValueKindJSON:
transformedColumns = append(transformedColumns,
fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
case bigquery.JSONFieldType:
fmt.Sprintf("PARSE_JSON(`%s`,wide_number_mode=>'round') AS `%s`", field.Name, field.Name))
case qvalue.QValueKindHStore:
transformedColumns = append(transformedColumns,
fmt.Sprintf("PARSE_JSON(`%s`,wide_number_mode=>'round') AS `%s`", col.Name, col.Name))
case bigquery.DateFieldType:
fmt.Sprintf("PARSE_JSON(CONCAT('{', REPLACE(`%s`, '=>', ':'), '}')) AS `%s`",
field.Name, field.Name))
case qvalue.QValueKindDate:
transformedColumns = append(transformedColumns,
fmt.Sprintf("CAST(`%s` AS DATE) AS `%s`", col.Name, col.Name))
fmt.Sprintf("CAST(`%s` AS DATE) AS `%s`", field.Name, field.Name))
default:
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", field.Name))
}
}
return transformedColumns
Expand All @@ -153,7 +155,7 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
slog.Info("Obtained Avro schema for destination table", flowLog)

slog.Info(fmt.Sprintf("Avro schema: %v\n", avroSchema), flowLog)
// create a staging table name with partitionID replace hyphens with underscores
dstDatasetTable, _ := s.connector.convertToDatasetTable(dstTableName)
Expand All @@ -174,7 +176,11 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
)
bqClient := s.connector.client

transformedColumns := getTransformedColumns(dstTableMetadata, syncedAtCol, softDeleteCol)
schema, err := stream.Schema()
if err != nil {
return 0, fmt.Errorf("failed to get schema of source table %s: %w", dstTableName, err)
}
transformedColumns := getTransformedColumns(schema)
selector := strings.Join(transformedColumns, ", ")

if softDeleteCol != "" { // PeerDB column
Expand Down
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
case qvalue.QValueKindString:
return bigquery.StringFieldType
// json also is stored as string for now
case qvalue.QValueKindJSON:
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
return bigquery.JSONFieldType
// time related
case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ:
Expand Down
13 changes: 5 additions & 8 deletions flow/connectors/postgres/qrep_query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,14 +107,11 @@ func (qe *QRepQueryExecutor) fieldDescriptionsToSchema(fds []pgconn.FieldDescrip
cname := fd.Name
ctype := postgresOIDToQValueKind(fd.DataTypeOID)
if ctype == qvalue.QValueKindInvalid {
var err error
if err != nil {
typeName, ok := qe.customTypeMap[fd.DataTypeOID]
if ok {
ctype = customTypeToQKind(typeName)
} else {
ctype = qvalue.QValueKindString
}
typeName, ok := qe.customTypeMap[fd.DataTypeOID]
if ok {
ctype = customTypeToQKind(typeName)
} else {
ctype = qvalue.QValueKindString
}
}
// there isn't a way to know if a column is nullable or not
Expand Down
10 changes: 4 additions & 6 deletions flow/connectors/postgres/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ func qValueKindToPostgresType(qvalueKind string) string {
return "BYTEA"
case qvalue.QValueKindJSON:
return "JSONB"
case qvalue.QValueKindHStore:
return "HSTORE"
case qvalue.QValueKindUUID:
return "UUID"
case qvalue.QValueKindTime:
Expand Down Expand Up @@ -335,12 +337,6 @@ func parseFieldFromQValueKind(qvalueKind qvalue.QValueKind, value interface{}) (
default:
return qvalue.QValue{}, fmt.Errorf("failed to parse array string: %v", value)
}
case qvalue.QValueKindHStore:
hstoreVal, err := value.(pgtype.Hstore).HstoreValue()
if err != nil {
return qvalue.QValue{}, fmt.Errorf("failed to parse hstore: %w", err)
}
val = qvalue.QValue{Kind: qvalue.QValueKindHStore, Value: hstoreVal}
case qvalue.QValueKindPoint:
xCoord := value.(pgtype.Point).P.X
yCoord := value.(pgtype.Point).P.Y
Expand Down Expand Up @@ -399,6 +395,8 @@ func customTypeToQKind(typeName string) qvalue.QValueKind {
qValueKind = qvalue.QValueKindGeometry
case "geography":
qValueKind = qvalue.QValueKindGeography
case "hstore":
qValueKind = qvalue.QValueKindHStore
default:
qValueKind = qvalue.QValueKindString
}
Expand Down
6 changes: 2 additions & 4 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,10 +153,8 @@ func (c *QValueAvroConverter) ToAvroValue() (interface{}, error) {
return c.processNumeric()
case QValueKindBytes, QValueKindBit:
return c.processBytes()
case QValueKindJSON:
case QValueKindJSON, QValueKindHStore:
return c.processJSON()
case QValueKindHStore:
return nil, fmt.Errorf("QValueKindHStore not supported")
case QValueKindArrayFloat32:
return c.processArrayFloat32()
case QValueKindArrayFloat64:
Expand Down Expand Up @@ -251,7 +249,7 @@ func (c *QValueAvroConverter) processJSON() (interface{}, error) {

jsonString, ok := c.Value.Value.(string)
if !ok {
return nil, fmt.Errorf("invalid JSON value %v", c.Value.Value)
return nil, fmt.Errorf("invalid JSON or HStore value %v", c.Value.Value)
}

if c.Nullable {
Expand Down

0 comments on commit 916f802

Please sign in to comment.