Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support more data types #1089

Merged
merged 9 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 0 additions & 5 deletions flow/connectors/bigquery/avro_transform_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,6 @@ func TestAvroTransform(t *testing.T) {
Name: "col2",
Type: bigquery.JSONFieldType,
},
&bigquery.FieldSchema{
Name: "col3",
Type: bigquery.DateFieldType,
},
&bigquery.FieldSchema{
Name: "camelCol4",
Type: bigquery.StringFieldType,
Expand All @@ -34,7 +30,6 @@ func TestAvroTransform(t *testing.T) {
expectedTransformCols := []string{
"ST_GEOGFROMTEXT(`col1`) AS `col1`",
"PARSE_JSON(`col2`,wide_number_mode=>'round') AS `col2`",
"CAST(`col3` AS DATE) AS `col3`",
"`camelCol4`",
}
transformedCols := getTransformedColumns(dstSchema, "sync_col", "del_col")
Expand Down
6 changes: 4 additions & 2 deletions flow/connectors/bigquery/merge_stmt_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,10 @@ func (m *mergeStmtGenerator) generateFlattenedCTE() string {
case qvalue.QValueKindBytes, qvalue.QValueKindBit:
castStmt = fmt.Sprintf("FROM_BASE64(JSON_VALUE(_peerdb_data,'$.%s')) AS `%s`",
colName, shortCol)
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString:
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64, qvalue.QValueKindArrayInt16,
qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64, qvalue.QValueKindArrayString,
qvalue.QValueKindArrayBoolean, qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ,
qvalue.QValueKindArrayDate:
castStmt = fmt.Sprintf("ARRAY(SELECT CAST(element AS %s) FROM "+
"UNNEST(CAST(JSON_VALUE_ARRAY(_peerdb_data, '$.%s') AS ARRAY<STRING>)) AS element WHERE element IS NOT null) AS `%s`",
bqType, colName, shortCol)
Expand Down
96 changes: 54 additions & 42 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,6 @@ func getTransformedColumns(dstSchema *bigquery.Schema, syncedAtCol string, softD
case bigquery.JSONFieldType:
transformedColumns = append(transformedColumns,
fmt.Sprintf("PARSE_JSON(`%s`,wide_number_mode=>'round') AS `%s`", col.Name, col.Name))
case bigquery.DateFieldType:
transformedColumns = append(transformedColumns,
fmt.Sprintf("CAST(`%s` AS DATE) AS `%s`", col.Name, col.Name))
default:
transformedColumns = append(transformedColumns, fmt.Sprintf("`%s`", col.Name))
}
Expand Down Expand Up @@ -290,9 +287,9 @@ func DefineAvroSchema(dstTableName string,
func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
considerRepeated := func(typ string, repeated bool) interface{} {
if repeated {
return map[string]interface{}{
"type": "array",
"items": typ,
return qvalue.AvroSchemaArray{
Type: "array",
Items: typ,
}
} else {
return typ
Expand All @@ -309,64 +306,79 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
case bigquery.FloatFieldType:
return considerRepeated("double", bqField.Repeated), nil
case bigquery.BooleanFieldType:
return "boolean", nil
return considerRepeated("boolean", bqField.Repeated), nil
case bigquery.TimestampFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
}, nil
timestampSchema := qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
}
if bqField.Repeated {
return qvalue.AvroSchemaComplexArray{
Type: "array",
Items: timestampSchema,
}, nil
}
return timestampSchema, nil
case bigquery.DateFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
}, nil
dateSchema := qvalue.AvroSchemaField{
Type: "int",
LogicalType: "date",
}
if bqField.Repeated {
return qvalue.AvroSchemaComplexArray{
Type: "array",
Items: dateSchema,
}, nil
}
return dateSchema, nil

case bigquery.TimeFieldType:
return map[string]string{
"type": "long",
"logicalType": "timestamp-micros",
return qvalue.AvroSchemaField{
Type: "long",
LogicalType: "timestamp-micros",
}, nil
case bigquery.DateTimeFieldType:
return map[string]interface{}{
"type": "record",
"name": "datetime",
"fields": []map[string]string{
return qvalue.AvroSchemaRecord{
Type: "record",
Name: "datetime",
Fields: []qvalue.AvroSchemaField{
{
"name": "date",
"type": "int",
"logicalType": "date",
Name: "date",
Type: "int",
LogicalType: "date",
},
{
"name": "time",
"type": "long",
"logicalType": "time-micros",
Name: "time",
Type: "long",
LogicalType: "time-micros",
},
},
}, nil
case bigquery.NumericFieldType:
return map[string]interface{}{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 9,
return qvalue.AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Precision: 38,
Scale: 9,
}, nil
case bigquery.RecordFieldType:
avroFields := []map[string]interface{}{}
avroFields := []qvalue.AvroSchemaField{}
for _, bqSubField := range bqField.Schema {
avroType, err := GetAvroType(bqSubField)
if err != nil {
return nil, err
}
avroFields = append(avroFields, map[string]interface{}{
"name": bqSubField.Name,
"type": avroType,
avroFields = append(avroFields, qvalue.AvroSchemaField{
Name: bqSubField.Name,
Type: avroType,
})
}
return map[string]interface{}{
"type": "record",
"name": bqField.Name,
"fields": avroFields,
return qvalue.AvroSchemaRecord{
Type: "record",
Name: bqField.Name,
Fields: avroFields,
}, nil
// TODO(kaushik/sai): Add other field types as needed

default:
return nil, fmt.Errorf("unsupported BigQuery field type: %s", bqField.Type)
}
Expand Down
8 changes: 7 additions & 1 deletion flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,16 @@ func qValueKindToBigQueryType(colType string) bigquery.FieldType {
// For Arrays we return the types of the individual elements,
// and wherever this function is called, the 'Repeated' attribute of
// FieldSchema must be set to true.
case qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64:
case qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64:
return bigquery.IntegerFieldType
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64:
return bigquery.FloatFieldType
case qvalue.QValueKindArrayBoolean:
return bigquery.BooleanFieldType
case qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ:
return bigquery.TimestampFieldType
case qvalue.QValueKindArrayDate:
return bigquery.DateFieldType
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
return bigquery.GeographyFieldType
// rest will be strings
Expand Down
4 changes: 2 additions & 2 deletions flow/connectors/postgres/cdc.go
Original file line number Diff line number Diff line change
Expand Up @@ -741,8 +741,8 @@ func (p *PostgresCDCSource) decodeColumnData(data []byte, dataType uint32, forma
var parsedData any
var err error
if dt, ok := p.typeMap.TypeForOID(dataType); ok {
if dt.Name == "uuid" {
// below is required to decode uuid to string
if dt.Name == "uuid" || dt.Name == "cidr" || dt.Name == "inet" || dt.Name == "macaddr" {
// below is required to decode above types to string
parsedData, err = dt.Codec.DecodeDatabaseSQLValue(p.typeMap, dataType, pgtype.TextFormatCode, data)
} else {
parsedData, err = dt.Codec.DecodeValue(p.typeMap, dataType, formatCode, data)
Expand Down
Loading
Loading