Skip to content

Commit

Permalink
Support more data types (#1089)
Browse files Browse the repository at this point in the history
This PR adds support for the following types across PG, SF and BQ:
- Array of Boolean
- Array of Date
- Array of Timestamp(/TZ)
- Array of Int16 (smallint)
- CIDR
- MacAddr
- INET

The last 3 are more relevant for Postgres.
This PR achieves the correct AVRO type mapping for Date in BigQuery -
eliminating the hack which was being done in merge transforms.
This PR also writes date and time values as human readable text for BQ
QRep

Tests added as well

Fixes #666 
Fixes #20
  • Loading branch information
Amogh-Bharadwaj authored Jan 17, 2024
1 parent 9c447cf commit 84aaef2
Show file tree
Hide file tree
Showing 17 changed files with 730 additions and 121 deletions.
5 changes: 0 additions & 5 deletions flow/connectors/bigquery/avro_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ func TestAvroTransform(t *testing.T) {
Name: "col2",
Type: bigquery.JSONFieldType,
},
&bigquery.FieldSchema{
Name: "col3",
Type: bigquery.DateFieldType,
},
&bigquery.FieldSchema{
Name: "camelCol4",
Type: bigquery.StringFieldType,
Expand All @@ -34,7 +30,6 @@ func TestAvroTransform(t *testing.T) {
expectedTransformCols := []string{
"ST_GEOGFROMTEXT(`col1`) AS `col1`",
"PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`",
"CAST(`col3` AS DATE) AS `col3`",
"`camelCol4`",
}
transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col")
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
colName, shortCol)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
qvalue.QValueKindArrayDate:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
bqType, colName, shortCol)
Expand Down
96 changes: 54 additions & 42 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD
case bigquery.JSONFieldType:
transformedColumns = append(transformedColumns,
fmt.Sprintf("PARSE_JSON(`%s`,wide_number_mode=>'round') AS `%s`", col.Name, col.Name))
case bigquery.DateFieldType:
transformedColumns = append(transformedColumns,
fmt.Sprintf("CAST(`%s` AS DATE) AS `%s`", col.Name, col.Name))
default:
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
}
Expand Down Expand Up @@ -290,9 +287,9 @@ func DefineAvroSchema(dstTableName string,
func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
return map[string]interface{}{
"type": "array",
"items": typ,
return qvalue.AvroSchemaArray{
Type: "array",
Items: typ,
}
} else {
return typ
Expand All @@ -309,64 +306,79 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
case bigquery.FloatFieldType:
return considerRepeated("double", bqField.Repeated), nil
case bigquery.BooleanFieldType:
return "boolean", nil
return considerRepeated("boolean", bqField.Repeated), nil
case bigquery.TimestampFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
}, nil
timestampSchema := qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
}
if bqField.Repeated {
return qvalue.AvroSchemaComplexArray{
Type: "array",
Items: timestampSchema,
}, nil
}
return timestampSchema, nil
case bigquery.DateFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
}, nil
dateSchema := qvalue.AvroSchemaField{
Type: "int",
LogicalType: "date",
}
if bqField.Repeated {
return qvalue.AvroSchemaComplexArray{
Type: "array",
Items: dateSchema,
}, nil
}
return dateSchema, nil

case bigquery.TimeFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
return qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
}, nil
case bigquery.DateTimeFieldType:
return map[string]interface{}{
"type": "record",
"name": "datetime",
"fields": []map[string]string{
return qvalue.AvroSchemaRecord{
Type: "record",
Name: "datetime",
Fields: []qvalue.AvroSchemaField{
{
"name": "date",
"type": "int",
"logicalType": "date",
Name: "date",
Type: "int",
LogicalType: "date",
},
{
"name": "time",
"type": "long",
"logicalType": "time-micros",
Name: "time",
Type: "long",
LogicalType: "time-micros",
},
},
}, nil
case bigquery.NumericFieldType:
return map[string]interface{}{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 9,
return qvalue.AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Precision: 38,
Scale: 9,
}, nil
case bigquery.RecordFieldType:
avroFields := []map[string]interface{}{}
avroFields := []qvalue.AvroSchemaField{}
for _, bqSubField := range bqField.Schema {
avroType, err := GetAvroType(bqSubField)
if err != nil {
return nil, err
}
avroFields = append(avroFields, map[string]interface{}{
"name": bqSubField.Name,
"type": avroType,
avroFields = append(avroFields, qvalue.AvroSchemaField{
Name: bqSubField.Name,
Type: avroType,
})
}
return map[string]interface{}{
"type": "record",
"name": bqField.Name,
"fields": avroFields,
return qvalue.AvroSchemaRecord{
Type: "record",
Name: bqField.Name,
Fields: avroFields,
}, nil
// TODO(kaushik/sai): Add other field types as needed

default:
return nil, fmt.Errorf("unsupported BigQuery field type: %s", bqField.Type)
}
Expand Down
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
// For Arrays we return the types of the individual elements,
// and wherever this function is called, the 'Repeated' attribute of
// FieldSchema must be set to true.
case qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64:
case qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64:
return bigquery.IntegerFieldType
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64:
return bigquery.FloatFieldType
case qvalue.QValueKindArrayBoolean:
return bigquery.BooleanFieldType
case qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ:
return bigquery.TimestampFieldType
case qvalue.QValueKindArrayDate:
return bigquery.DateFieldType
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
return bigquery.GeographyFieldType
// rest will be strings
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,8 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
var parsedData any
var err error
if dt, ok := p.typeMap.TypeForOID(dataType); ok {
if dt.Name == "uuid" {
// below is required to decode uuid to string
if dt.Name == "uuid" || dt.Name == "cidr" || dt.Name == "inet" || dt.Name == "macaddr" {
// below is required to decode above types to string
parsedData, err = dt.Codec.DecodeDatabaseSQLValue(p.typeMap, dataType, pgtype.TextFormatCode, data)
} else {
parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data)
Expand Down
Loading

0 comments on commit 84aaef2

Please sign in to comment.