Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Replace QValueKindAvroSchema with interface{} #846

Merged
merged 5 commits into from
Dec 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 4 additions & 6 deletions flow/model/conversion_avro.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,25 +71,23 @@ func GetAvroSchemaDefinition(
dstTableName string,
qRecordSchema *QRecordSchema,
) (*QRecordAvroSchemaDefinition, error) {
avroFields := []QRecordAvroField{}
avroFields := make([]QRecordAvroField, 0, len(qRecordSchema.Fields))
nullableFields := make(map[string]struct{})

for _, qField := range qRecordSchema.Fields {
avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, qField.Nullable)
avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type)
if err != nil {
return nil, err
}

consolidatedType := avroType.AvroLogicalSchema

if qField.Nullable {
consolidatedType = []interface{}{"null", consolidatedType}
avroType = []interface{}{"null", avroType}
nullableFields[qField.Name] = struct{}{}
}

avroFields = append(avroFields, QRecordAvroField{
Name: qField.Name,
Type: consolidatedType,
Type: avroType,
})
}

Expand Down
120 changes: 42 additions & 78 deletions flow/model/qvalue/avro_converter.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,17 @@ import (
"golang.org/x/exp/slog"
)

// QValueKindAvroSchema defines a structure for representing Avro schemas.
// AvroLogicalSchema holds the Avro logical schema for a corresponding QValueKind.
type QValueKindAvroSchema struct {
AvroLogicalSchema interface{}
// https://avro.apache.org/docs/1.11.0/spec.html
type AvroSchemaArray struct {
Type string `json:"type"`
Items string `json:"items"`
}

type AvroSchemaNumeric struct {
Type string `json:"type"`
LogicalType string `json:"logicalType"`
Precision int `json:"precision"`
Scale int `json:"scale"`
}

// GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind.
Expand All @@ -23,104 +30,61 @@ type QValueKindAvroSchema struct {
//
// For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds
// will return an error.
//
// The function currently does not support the following QValueKinds:
// - QValueKindBit
//
// Please note that for QValueKindNumeric and QValueKindETime, RespectNull is always
// set to false, regardless of the nullable value passed in.
func GetAvroSchemaFromQValueKind(kind QValueKind, nullable bool) (*QValueKindAvroSchema, error) {
func GetAvroSchemaFromQValueKind(kind QValueKind) (interface{}, error) {
switch kind {
case QValueKindString, QValueKindUUID:
return &QValueKindAvroSchema{
AvroLogicalSchema: "string",
}, nil
return "string", nil
case QValueKindGeometry, QValueKindGeography, QValueKindPoint:
return &QValueKindAvroSchema{
AvroLogicalSchema: "string",
}, nil
return "string", nil
case QValueKindInt16, QValueKindInt32, QValueKindInt64:
return &QValueKindAvroSchema{
AvroLogicalSchema: "long",
}, nil
return "long", nil
case QValueKindFloat32:
return &QValueKindAvroSchema{
AvroLogicalSchema: "float",
}, nil
return "float", nil
case QValueKindFloat64:
return &QValueKindAvroSchema{
AvroLogicalSchema: "double",
}, nil
return "double", nil
case QValueKindBoolean:
return &QValueKindAvroSchema{
AvroLogicalSchema: "boolean",
}, nil
return "boolean", nil
case QValueKindBytes, QValueKindBit:
return &QValueKindAvroSchema{
AvroLogicalSchema: "bytes",
}, nil
return "bytes", nil
case QValueKindNumeric:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "bytes",
"logicalType": "decimal",
"precision": 38,
"scale": 9,
},
return AvroSchemaNumeric{
Type: "bytes",
LogicalType: "decimal",
Precision: 38,
Scale: 9,
}, nil
case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]string{
"type": "string",
},
}, nil
return "string", nil
case QValueKindHStore, QValueKindJSON, QValueKindStruct:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "string",
"values": "string",
},
}, nil
return "string", nil
case QValueKindArrayFloat32:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "array",
"items": "float",
},
return AvroSchemaArray{
Type: "array",
Items: "float",
}, nil
case QValueKindArrayFloat64:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "array",
"items": "double",
},
return AvroSchemaArray{
Type: "array",
Items: "double",
}, nil
case QValueKindArrayInt32:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "array",
"items": "int",
},
return AvroSchemaArray{
Type: "array",
Items: "int",
}, nil
case QValueKindArrayInt64:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "array",
"items": "long",
},
return AvroSchemaArray{
Type: "array",
Items: "long",
}, nil
case QValueKindArrayString:
return &QValueKindAvroSchema{
AvroLogicalSchema: map[string]interface{}{
"type": "array",
"items": "string",
},
return AvroSchemaArray{
Type: "array",
Items: "string",
}, nil
case QValueKindInvalid:
// lets attempt to do invalid as a string
return &QValueKindAvroSchema{
AvroLogicalSchema: "string",
}, nil
return "string", nil
default:
return nil, fmt.Errorf("unsupported QValueKind type: %s", kind)
}
Expand Down
Loading