Skip to content

Commit

Permalink
Better JSON datatype handling in BQ (#965)
Browse files Browse the repository at this point in the history
Previously, the destination table was created (in SetupNormalize) with the
JSON column typed as STRING. Now the column is a true JSON column at the BigQuery destination.
  • Loading branch information
Amogh-Bharadwaj authored Jan 2, 2024
1 parent 7ade9aa commit 8e4d68c
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 5 deletions.
2 changes: 1 addition & 1 deletion flow/connectors/bigquery/merge_statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
switch qvalue.QValueKind(colType) {
case qvalue.QValueKindJSON:
// if the type is JSON, then just extract JSON
castStmt = fmt.Sprintf("CAST(JSON_VALUE(_peerdb_data, '$.%s') AS %s) AS `%s`",
castStmt = fmt.Sprintf("CAST(PARSE_JSON(JSON_VALUE(_peerdb_data, '$.%s')) AS %s) AS `%s`",
colName, bqType, colName)
// expecting data in BASE64 format
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
Expand Down
10 changes: 7 additions & 3 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,14 @@ func getTransformedColumns(dstTableMetadata *bigquery.TableMetadata, syncedAtCol
if col.Name == syncedAtCol || col.Name == softDeleteCol {
continue
}
if col.Type == bigquery.GeographyFieldType {
switch col.Type {
case bigquery.GeographyFieldType:
transformedColumns = append(transformedColumns,
fmt.Sprintf("ST_GEOGFROMTEXT(`%s`) AS `%s`", col.Name, col.Name))
} else {
case bigquery.JSONFieldType:
transformedColumns = append(transformedColumns,
fmt.Sprintf("PARSE_JSON(`%s`) AS `%s`", col.Name, col.Name))
default:
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
}
}
Expand Down Expand Up @@ -280,7 +284,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
}

switch bqField.Type {
case bigquery.StringFieldType, bigquery.GeographyFieldType:
case bigquery.StringFieldType, bigquery.GeographyFieldType, bigquery.JSONFieldType:
return considerRepeated("string", bqField.Repeated), nil
case bigquery.BytesFieldType:
return "bytes", nil
Expand Down
4 changes: 3 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
return bigquery.StringFieldType
// json is stored as a native BigQuery JSON column
case qvalue.QValueKindJSON:
return bigquery.StringFieldType
return bigquery.JSONFieldType
// time related
case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ:
return bigquery.TimestampFieldType
Expand Down Expand Up @@ -79,6 +79,8 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) (qvalue.QValueKind,
return qvalue.QValueKindNumeric, nil
case bigquery.GeographyFieldType:
return qvalue.QValueKindGeography, nil
case bigquery.JSONFieldType:
return qvalue.QValueKindJSON, nil
default:
return "", fmt.Errorf("unsupported bigquery field type: %v", fieldType)
}
Expand Down
19 changes: 19 additions & 0 deletions flow/e2e/bigquery/peer_flow_bq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,21 @@ func TestPeerFlowE2ETestSuiteBQ(t *testing.T) {
})
}

// checkJSONValue verifies that a field of a JSON column in the destination
// table holds the expected value.
//
// It selects colName.fieldName from tableName in the helper's dataset and
// compares the first entry of the first returned record against value.
// An error is returned when the query fails, when the result contains no
// record or entry to inspect, or when the retrieved value differs from value.
func (s PeerFlowE2ETestSuiteBQ) checkJSONValue(tableName, colName, fieldName, value string) error {
	res, err := s.bqHelper.ExecuteAndProcessQuery(fmt.Sprintf(
		"SELECT `%s`.%s FROM `%s.%s`;",
		colName, fieldName, s.bqHelper.datasetName, tableName))
	if err != nil {
		// Wrap with %w so callers can still inspect the underlying error.
		return fmt.Errorf("json value check failed: %w", err)
	}

	// Report an empty result as an error instead of panicking on the index
	// expressions below.
	if len(res.Records) == 0 || len(res.Records[0].Entries) == 0 {
		return fmt.Errorf("json value check failed: no value returned for field %s of column %s", fieldName, colName)
	}

	jsonVal := res.Records[0].Entries[0].Value
	if jsonVal != value {
		return fmt.Errorf("bad json value in field %s of column %s: %v. expected: %v", fieldName, colName, jsonVal, value)
	}
	return nil
}

// attachSchemaSuffix returns tableName qualified with the suite's schema
// name, which is "e2e_test_" followed by the suite's bqSuffix.
func (s PeerFlowE2ETestSuiteBQ) attachSchemaSuffix(tableName string) string {
	qualified := fmt.Sprintf("e2e_test_%s.%s", s.bqSuffix, tableName)
	return qualified
}
Expand Down Expand Up @@ -741,6 +756,10 @@ func (s PeerFlowE2ETestSuiteBQ) Test_Types_BQ() {
// Make sure that there are no nulls
require.True(s.t, noNulls)

// check if JSON on bigquery side is a good JSON
err = s.checkJSONValue(dstTableName, "c17", "sai", "1")
require.NoError(s.t, err)

env.AssertExpectations(s.t)
}

Expand Down

0 comments on commit 8e4d68c

Please sign in to comment.