Skip to content

Commit

Permalink
optimise bigquery avro handling
Browse files (browse the repository at this point in the history)
  • Loading branch information
Amogh-Bharadwaj committed May 7, 2024
1 parent 23a3ec6 commit ffe19ed
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 71 deletions.
4 changes: 2 additions & 2 deletions flow/connectors/bigquery/bigquery.go
Original file line number Diff line number Diff line change
Expand Up @@ -647,8 +647,8 @@ func (c *BigQueryConnector) SetupNormalizedTable(
} else {
columns = append(columns, &bigquery.FieldSchema{
Name: column.Name,
Type: qValueKindToBigQueryType(genericColType),
Repeated: qvalue.QValueKind(genericColType).IsArray(),
Type: qValueKindToBigQueryType(genericColType).Type,
Repeated: qValueKindToBigQueryType(genericColType).Repeated,
})
}
}
Expand Down
57 changes: 21 additions & 36 deletions flow/connectors/bigquery/qrep_avro_sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,9 @@ func (s *QRepAvroSyncMethod) SyncRecords(
s.connector.logger.Info(
fmt.Sprintf("Obtaining Avro schema for destination table %s and sync batch ID %d",
rawTableName, syncBatchID))
sourceSchema := stream.Schema()
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(rawTableName, dstTableMetadata, "", "")
avroSchema, err := DefineAvroSchema(rawTableName, &sourceSchema, "", "")
if err != nil {
return nil, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down Expand Up @@ -160,8 +161,10 @@ func (s *QRepAvroSyncMethod) SyncQRepRecords(
slog.String(string(shared.PartitionIDKey), partition.PartitionId),
slog.String("destinationTable", dstTableName),
)

sourceSchema := stream.Schema()
// You will need to define your Avro schema as a string
avroSchema, err := DefineAvroSchema(dstTableName, dstTableMetadata, syncedAtCol, softDeleteCol)
avroSchema, err := DefineAvroSchema(dstTableName, &sourceSchema, syncedAtCol, softDeleteCol)
if err != nil {
return 0, fmt.Errorf("failed to define Avro schema: %w", err)
}
Expand Down Expand Up @@ -245,22 +248,22 @@ type AvroSchema struct {
}

func DefineAvroSchema(dstTableName string,
dstTableMetadata *bigquery.TableMetadata,
schema *qvalue.QRecordSchema,
syncedAtCol string,
softDeleteCol string,
) (*model.QRecordAvroSchemaDefinition, error) {
avroFields := make([]AvroField, 0, len(dstTableMetadata.Schema))
avroFields := make([]AvroField, 0, len(schema.Fields))
qFields := make([]qvalue.QField, 0, len(avroFields))
for _, bqField := range dstTableMetadata.Schema {
if bqField.Name == syncedAtCol || bqField.Name == softDeleteCol {
for _, sourceField := range schema.Fields {
if sourceField.Name == syncedAtCol || sourceField.Name == softDeleteCol {
continue
}
avroField, err := GetAvroField(bqField)
avroField, err := GetAvroField(&sourceField)
if err != nil {
return nil, err
}
avroFields = append(avroFields, avroField)
qFields = append(qFields, BigQueryFieldToQField(bqField))
qFields = append(qFields, sourceField)
}

avroSchema := AvroSchema{
Expand All @@ -280,9 +283,9 @@ func DefineAvroSchema(dstTableName string,
}, nil
}

func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
avroNumericPrecision := int16(bqField.Precision)
avroNumericScale := int16(bqField.Scale)
func GetAvroType(sourceField *qvalue.QField) (interface{}, error) {
avroNumericPrecision := int16(sourceField.Precision)

Check failure on line 287 in flow/connectors/bigquery/qrep_avro_sync.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary conversion (unconvert)
avroNumericScale := int16(sourceField.Scale)

Check failure on line 288 in flow/connectors/bigquery/qrep_avro_sync.go

View workflow job for this annotation

GitHub Actions / lint

unnecessary conversion (unconvert)
bqNumeric := numeric.BigQueryNumericCompatibility{}
if !bqNumeric.IsValidPrevisionAndScale(avroNumericPrecision, avroNumericScale) {
avroNumericPrecision, avroNumericScale = bqNumeric.DefaultPrecisionAndScale()
Expand All @@ -299,6 +302,7 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
}
}

bqField := qValueKindToBigQueryType(string(sourceField.Type))
switch bqField.Type {
case bigquery.StringFieldType, bigquery.GeographyFieldType, bigquery.JSONFieldType:
return considerRepeated("string", bqField.Repeated), nil
Expand Down Expand Up @@ -364,42 +368,22 @@ func GetAvroType(bqField *bigquery.FieldSchema) (interface{}, error) {
Precision: avroNumericPrecision,
Scale: avroNumericScale,
}, nil
case bigquery.RecordFieldType:
avroFields := []qvalue.AvroSchemaField{}
for _, bqSubField := range bqField.Schema {
avroType, err := GetAvroType(bqSubField)
if err != nil {
return nil, err
}
avroFields = append(avroFields, qvalue.AvroSchemaField{
Name: bqSubField.Name,
Type: avroType,
})
}
return qvalue.AvroSchemaRecord{
Type: "record",
Name: bqField.Name,
Fields: avroFields,
}, nil

default:
return nil, fmt.Errorf("unsupported BigQuery field type: %s", bqField.Type)
return nil, fmt.Errorf("[bq:defineAvro]unsupported BigQuery field type: " + string(bqField.Type))

Check failure on line 373 in flow/connectors/bigquery/qrep_avro_sync.go

View workflow job for this annotation

GitHub Actions / lint

dynamicFmtString: use errors.New("[bq:defineAvro]unsupported BigQuery field type: " + string(bqField.Type)) or fmt.Errorf("%s", "[bq:defineAvro]unsupported BigQuery field type: " + string(bqField.Type)) instead (gocritic)
}
}

func GetAvroField(bqField *bigquery.FieldSchema) (AvroField, error) {
avroType, err := GetAvroType(bqField)
func GetAvroField(sourceField *qvalue.QField) (AvroField, error) {
avroType, err := GetAvroType(sourceField)
if err != nil {
return AvroField{}, err
}

// If a field is nullable, its Avro type should be ["null", actualType]
if !bqField.Required {
avroType = []interface{}{"null", avroType}
}
avroType = []interface{}{"null", avroType}

return AvroField{
Name: bqField.Name,
Name: sourceField.Name,
Type: avroType,
}, nil
}
Expand All @@ -413,6 +397,7 @@ func (s *QRepAvroSyncMethod) writeToStage(
stream *model.QRecordStream,
flowName string,
) (int, error) {
stream.Schema()
var avroFile *avro.AvroFile
ocfWriter := avro.NewPeerDBOCFWriter(stream, avroSchema, avro.CompressNone, protos.DBType_BIGQUERY)
idLog := slog.Group("write-metadata",
Expand Down
91 changes: 58 additions & 33 deletions flow/connectors/bigquery/qvalue_convert.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,56 +6,81 @@ import (
"github.com/PeerDB-io/peer-flow/model/qvalue"
)

func qValueKindToBigQueryType(colType string) bigquery.FieldType {
func qValueKindToBigQueryType(colType string) bigquery.FieldSchema {
switch qvalue.QValueKind(colType) {
// boolean
case qvalue.QValueKindBoolean:
return bigquery.BooleanFieldType
// integer types
return bigquery.FieldSchema{
Type: bigquery.BooleanFieldType,
}
case qvalue.QValueKindInt16, qvalue.QValueKindInt32, qvalue.QValueKindInt64:
return bigquery.IntegerFieldType
// decimal types
return bigquery.FieldSchema{
Type: bigquery.IntegerFieldType,
}
case qvalue.QValueKindFloat32, qvalue.QValueKindFloat64:
return bigquery.FloatFieldType
return bigquery.FieldSchema{
Type: bigquery.FloatFieldType,
}
case qvalue.QValueKindNumeric:
return bigquery.BigNumericFieldType
// string related
return bigquery.FieldSchema{
Type: bigquery.NumericFieldType,
}
case qvalue.QValueKindString:
return bigquery.StringFieldType
// json also is stored as string for now
return bigquery.FieldSchema{
Type: bigquery.StringFieldType,
}
case qvalue.QValueKindJSON, qvalue.QValueKindHStore:
return bigquery.JSONFieldType
// time related
return bigquery.FieldSchema{
Type: bigquery.JSONFieldType,
}
case qvalue.QValueKindTimestamp, qvalue.QValueKindTimestampTZ:
return bigquery.TimestampFieldType
// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - DATE support is incomplete
return bigquery.FieldSchema{
Type: bigquery.TimestampFieldType,
}
case qvalue.QValueKindDate:
return bigquery.DateFieldType
// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - TIME/TIMETZ support is incomplete
return bigquery.FieldSchema{
Type: bigquery.DateFieldType,
}
case qvalue.QValueKindTime, qvalue.QValueKindTimeTZ:
return bigquery.TimeFieldType
// TODO: https://github.com/PeerDB-io/peerdb/issues/189 - handle INTERVAL types again,
// bytes
return bigquery.FieldSchema{
Type: bigquery.TimeFieldType,
}
case qvalue.QValueKindBit, qvalue.QValueKindBytes:
return bigquery.BytesFieldType
// For Arrays we return the types of the individual elements,
// and wherever this function is called, the 'Repeated' attribute of
// FieldSchema must be set to true.
return bigquery.FieldSchema{
Type: bigquery.BytesFieldType,
}
case qvalue.QValueKindArrayInt16, qvalue.QValueKindArrayInt32, qvalue.QValueKindArrayInt64:
return bigquery.IntegerFieldType
return bigquery.FieldSchema{
Type: bigquery.IntegerFieldType,
Repeated: true,
}
case qvalue.QValueKindArrayFloat32, qvalue.QValueKindArrayFloat64:
return bigquery.FloatFieldType
return bigquery.FieldSchema{
Type: bigquery.FloatFieldType,
Repeated: true,
}
case qvalue.QValueKindArrayBoolean:
return bigquery.BooleanFieldType
return bigquery.FieldSchema{
Type: bigquery.BooleanFieldType,
Repeated: true,
}
case qvalue.QValueKindArrayTimestamp, qvalue.QValueKindArrayTimestampTZ:
return bigquery.TimestampFieldType
return bigquery.FieldSchema{
Type: bigquery.TimestampFieldType,
Repeated: true,
}
case qvalue.QValueKindArrayDate:
return bigquery.DateFieldType
return bigquery.FieldSchema{
Type: bigquery.DateFieldType,
Repeated: true,
}
case qvalue.QValueKindGeography, qvalue.QValueKindGeometry, qvalue.QValueKindPoint:
return bigquery.GeographyFieldType
// rest will be strings
return bigquery.FieldSchema{
Type: bigquery.GeographyFieldType,
}
default:
return bigquery.StringFieldType
return bigquery.FieldSchema{
Type: bigquery.StringFieldType,
}
}
}

Expand Down Expand Up @@ -92,7 +117,7 @@ func BigQueryTypeToQValueKind(fieldType bigquery.FieldType) qvalue.QValueKind {
}

func qValueKindToBigQueryTypeString(colType string) string {
bqType := qValueKindToBigQueryType(colType)
bqType := qValueKindToBigQueryType(colType).Type
bqTypeAsString := string(bqType)
// string(bigquery.FloatFieldType) is "FLOAT" which is not a BigQuery type.
if bqType == bigquery.FloatFieldType {
Expand Down

0 comments on commit ffe19ed

Please sign in to comment.