diff --git a/flow/model/conversion_avro.go b/flow/model/conversion_avro.go index 6818299073..02a52b26d6 100644 --- a/flow/model/conversion_avro.go +++ b/flow/model/conversion_avro.go @@ -71,25 +71,23 @@ func GetAvroSchemaDefinition( dstTableName string, qRecordSchema *QRecordSchema, ) (*QRecordAvroSchemaDefinition, error) { - avroFields := []QRecordAvroField{} + avroFields := make([]QRecordAvroField, 0, len(qRecordSchema.Fields)) nullableFields := make(map[string]struct{}) for _, qField := range qRecordSchema.Fields { - avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type, qField.Nullable) + avroType, err := qvalue.GetAvroSchemaFromQValueKind(qField.Type) if err != nil { return nil, err } - consolidatedType := avroType.AvroLogicalSchema - if qField.Nullable { - consolidatedType = []interface{}{"null", consolidatedType} + avroType = []interface{}{"null", avroType} nullableFields[qField.Name] = struct{}{} } avroFields = append(avroFields, QRecordAvroField{ Name: qField.Name, - Type: consolidatedType, + Type: avroType, }) } diff --git a/flow/model/qvalue/avro_converter.go b/flow/model/qvalue/avro_converter.go index 2cd6fe2e52..f9a5849e19 100644 --- a/flow/model/qvalue/avro_converter.go +++ b/flow/model/qvalue/avro_converter.go @@ -10,10 +10,22 @@ import ( "golang.org/x/exp/slog" ) -// QValueKindAvroSchema defines a structure for representing Avro schemas. -// AvroLogicalSchema holds the Avro logical schema for a corresponding QValueKind. -type QValueKindAvroSchema struct { - AvroLogicalSchema interface{} +// https://avro.apache.org/docs/1.11.0/spec.html +type AvroSchemaArray struct { + Type string `json:"type"` + Items string `json:"items"` +} + +type AvroSchemaMap struct { + Type string `json:"type"` + Values string `json:"values"` +} + +type AvroSchemaNumeric struct { + Type string `json:"type"` + LogicalType string `json:"logicalType"` + Precision int `json:"precision"` + Scale int `json:"scale"` } // GetAvroSchemaFromQValueKind returns the Avro schema for a given QValueKind. @@ -23,104 +35,64 @@ type QValueKindAvroSchema struct { // // For example, QValueKindInt64 would return an AvroLogicalSchema of "long". Unsupported QValueKinds // will return an error. -// -// The function currently does not support the following QValueKinds: -// - QValueKindBit -// -// Please note that for QValueKindNumeric and QValueKindETime, RespectNull is always -// set to false, regardless of the nullable value passed in. -func GetAvroSchemaFromQValueKind(kind QValueKind, nullable bool) (*QValueKindAvroSchema, error) { +func GetAvroSchemaFromQValueKind(kind QValueKind) (interface{}, error) { switch kind { case QValueKindString, QValueKindUUID: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "string", - }, nil + return "string", nil case QValueKindGeometry, QValueKindGeography, QValueKindPoint: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "string", - }, nil + return "string", nil case QValueKindInt16, QValueKindInt32, QValueKindInt64: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "long", - }, nil + return "long", nil case QValueKindFloat32: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "float", - }, nil + return "float", nil case QValueKindFloat64: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "double", - }, nil + return "double", nil case QValueKindBoolean: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "boolean", - }, nil + return "boolean", nil case QValueKindBytes, QValueKindBit: - return &QValueKindAvroSchema{ - AvroLogicalSchema: "bytes", - }, nil + return "bytes", nil case QValueKindNumeric: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "bytes", - "logicalType": "decimal", - "precision": 38, - "scale": 9, - }, + return AvroSchemaNumeric{ + Type: "bytes", + LogicalType: "decimal", + Precision: 38, + Scale: 9, }, nil case QValueKindTime, QValueKindTimeTZ, QValueKindDate, QValueKindTimestamp, QValueKindTimestampTZ: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]string{ - "type": "string", - }, - }, nil + return "string", nil case QValueKindHStore, QValueKindJSON, QValueKindStruct: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "string", - "values": "string", - }, + return AvroSchemaMap{ + Type: "map", + Values: "string", }, nil case QValueKindArrayFloat32: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "array", - "items": "float", - }, + return AvroSchemaArray{ + Type: "array", + Items: "float", }, nil case QValueKindArrayFloat64: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "array", - "items": "double", - }, + return AvroSchemaArray{ + Type: "array", + Items: "double", }, nil case QValueKindArrayInt32: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "array", - "items": "int", - }, + return AvroSchemaArray{ + Type: "array", + Items: "int", }, nil case QValueKindArrayInt64: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "array", - "items": "long", - }, + return AvroSchemaArray{ + Type: "array", + Items: "long", }, nil case QValueKindArrayString: - return &QValueKindAvroSchema{ - AvroLogicalSchema: map[string]interface{}{ - "type": "array", - "items": "string", - }, + return AvroSchemaArray{ + Type: "array", + Items: "string", }, nil case QValueKindInvalid: // lets attempt to do invalid as a string - return &QValueKindAvroSchema{ - AvroLogicalSchema: "string", - }, nil + return "string", nil default: return nil, fmt.Errorf("unsupported QValueKind type: %s", kind) }