Merge pull request #64 from AbsaOSS/fix/schema-evolution
Fix confluent schema evolution functionality
cerveada authored Oct 1, 2019
2 parents 173f2fc + b92cc73 commit 814458a
Showing 11 changed files with 271 additions and 81 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -59,7 +59,7 @@ The API consists of four Spark SQL expressions:
* ```to_avro``` and ```from_avro``` used for normal Avro payload
* ```to_confluent_avro``` and ```from_confluent_avro``` used for Confluent Avro data format

Full runable examples can be found in ```za.co.absa.abris.examples.sql``` package.
Full runnable examples can be found in ```za.co.absa.abris.examples``` package. You can also take a look at unit tests in package ```za.co.absa.abris.avro.sql```.

### Deprecation Note
Old ABRiS API is deprecated, but is still included in the library. Documentation for old API is in [ABRiS 2.2.3.](https://github.com/AbsaOSS/ABRiS/tree/v2.2.3)
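
For orientation, a minimal write-side sketch pairing `to_confluent_avro` with Kafka (a hedged sketch: `dataFrame`, `schemaRegistryConf`, the broker address, and the topic are illustrative assumptions, not part of this commit; the examples package has complete programs):

```scala
import org.apache.spark.sql.functions.{col, struct}
import za.co.absa.abris.avro.functions.to_confluent_avro

// Wrap the columns to serialize in a struct, convert to Confluent Avro bytes,
// then write to Kafka. schemaRegistryConf is an assumed Map[String, String]
// of SchemaManager parameters (registry URL, topic, naming strategy).
val avroFrame = dataFrame.select(
  to_confluent_avro(struct(col("name"), col("age")), schemaRegistryConf) as 'value)

avroFrame.write
  .format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092") // assumed broker
  .option("topic", "person_topic")                     // assumed topic
  .save()
```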
src/main/scala/za/co/absa/abris/avro/format/SparkAvroConversions.scala
@@ -28,6 +28,7 @@ import org.apache.avro.generic.{GenericRecord, IndexedRecord}
import org.apache.spark.sql.Row
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.types._
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.ConfluentConstants
import za.co.absa.abris.avro.write.AvroWriterHolder

@@ -97,6 +98,13 @@ object SparkAvroConversions {
toByteArray(record, avroSchema, schemaId)
}

/**
* Translates an Avro Schema into a Spark's StructType.
*
* Relies on Databricks Spark-Avro library to do the job.
*/
def toSqlType(schema: String): StructType = toSqlType(AvroSchemaUtils.parse(schema))
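
A quick sketch of the new `String` overload (the schema literal is an assumption for illustration):

```scala
import za.co.absa.abris.avro.format.SparkAvroConversions

// Hypothetical Avro record schema; any valid schema JSON works here.
val avroSchemaJson =
  """{"type":"record","name":"Person","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":"int"}]}""".stripMargin

// Parses the JSON and delegates to the Schema-based overload below.
val sparkSchema = SparkAvroConversions.toSqlType(avroSchemaJson)
// yields a StructType with a "name" (StringType) and an "age" (IntegerType) field
```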

/**
* Translates an Avro Schema into a Spark's StructType.
*
28 changes: 17 additions & 11 deletions src/main/scala/za/co/absa/abris/avro/functions.scala
@@ -54,10 +54,11 @@ object functions {

/**
* Converts a binary column of confluent avro format into its corresponding catalyst value using schema registry.
* The schema id is removed from start of the avro payload, but it's not used. You still need to provide some schema
* id in the schemaRegistryConf.
* The schema loaded from schema registry must match the read data, otherwise the behavior is undefined:
* it may fail or return arbitrary result.
* There are two avro schemas used: the writer schema and the reader schema.
*
* The configuration you provide (naming strategy, topic, ...) is used to get the reader schema from the schema
* registry. The writer schema is also loaded from the registry, but it is found by the schema id taken from the
* beginning of the confluent avro payload.
*
* @param data the binary column.
* @param schemaRegistryConf schema registry configuration.
@@ -68,17 +69,22 @@
}
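
A read-side sketch of this registry-driven variant (hedged: `kafkaFrame` and the configuration values are assumptions; the `SchemaManager.PARAM_*` keys follow their usage elsewhere in this commit):

```scala
import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_confluent_avro
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Assumed configuration: topic + naming strategy select the reader-schema subject,
// and the schema id/version picks which registered version to read with.
val schemaRegistryConf = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> "http://localhost:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC        -> "person_topic",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name",
  SchemaManager.PARAM_VALUE_SCHEMA_ID              -> "latest"
)

// kafkaFrame: a DataFrame with a binary "value" column, e.g. read from Kafka.
val decoded = kafkaFrame
  .select(from_confluent_avro(col("value"), schemaRegistryConf) as 'data)
  .select("data.*")
```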

/**
* Converts a binary column of confluent avro format into its corresponding catalyst value using provided schema.
* The schema id is removed from start of the avro payload, but it's not used.
* The specified schema must match the read data, otherwise the behavior is undefined:
* it may fail or return arbitrary result.
* Converts a binary column of confluent avro format into its corresponding catalyst value using schema registry.
* There are two avro schemas used: the writer schema and the reader schema.
*
* The reader schema is provided as a parameter.
*
* The writer schema is loaded from the registry; it is found by the schema id taken from the
* beginning of the confluent avro payload.
*
* @param data the binary column.
* @param jsonFormatSchema the avro schema in JSON string format.
* @param readerSchema the reader avro schema in JSON string format.
* @param schemaRegistryConf schema registry configuration for getting the writer schema.
*
*/
def from_confluent_avro(data: Column, jsonFormatSchema: String): Column = {
new Column(sql.AvroDataToCatalyst(data.expr, Some(jsonFormatSchema), None, confluentCompliant = true))
def from_confluent_avro(data: Column, readerSchema: String, schemaRegistryConf: Map[String,String]): Column = {
new Column(sql.AvroDataToCatalyst(
data.expr, Some(readerSchema), Some(schemaRegistryConf), confluentCompliant = true))
}
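
And a sketch of the explicit-reader-schema case this overload enables, reusing `kafkaFrame` and `schemaRegistryConf` from the sketch above (the schema literal is an assumption): the consumer pins a newer reader schema while the writer schema is still resolved per record from the registry.

```scala
// Reader schema adds an optional field with a default (a backward-compatible evolution).
val readerSchema =
  """{"type":"record","name":"Person","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":["null","int"],"default":null}]}""".stripMargin

// Records written with the old Person schema still decode; "age" gets the default.
val decoded = kafkaFrame
  .select(from_confluent_avro(col("value"), readerSchema, schemaRegistryConf) as 'data)
```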

/**
src/main/scala/za/co/absa/abris/avro/parsing/utils/AvroSchemaUtils.scala
@@ -16,6 +16,8 @@

package za.co.absa.abris.avro.parsing.utils

import java.security.InvalidParameterException

import io.confluent.kafka.schemaregistry.client.SchemaMetadata
import org.apache.avro.Schema
import org.slf4j.LoggerFactory
@@ -107,4 +109,43 @@ object AvroSchemaUtils {
def loadPlain(path: String): String = {
SchemaLoader.loadFromFile(path)
}

/**
* Tries to manage schema registration in case credentials to access Schema Registry are provided.
*/
@throws[InvalidParameterException]
def registerSchema(schemaAsString: String, registryConfig: Map[String,String]): Option[Int] =
registerSchema(parse(schemaAsString), registryConfig)

/**
* Tries to manage schema registration in case credentials to access Schema Registry are provided.
*/
@throws[InvalidParameterException]
def registerSchema(schema: Schema, registryConfig: Map[String,String]): Option[Int] = {

val topic = registryConfig(SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC)

val valueStrategy = registryConfig.get(SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY)
val keyStrategy = registryConfig.get(SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY)

val schemaId = (valueStrategy, keyStrategy) match {
case (Some(valueStrategy), None) => AvroSchemaUtils.registerIfCompatibleValueSchema(topic, schema, registryConfig)
case (None, Some(keyStrategy)) => AvroSchemaUtils.registerIfCompatibleKeySchema(topic, schema, registryConfig)
case (Some(_), Some(_)) =>
throw new InvalidParameterException(
"Both key.schema.naming.strategy and value.schema.naming.strategy were defined. " +
"Only one of them supposed to be defined!")
case _ =>
throw new InvalidParameterException(
"At least one of key.schema.naming.strategy or value.schema.naming.strategy " +
"must be defined to use schema registry!")
}

if (schemaId.isEmpty) {
throw new InvalidParameterException(s"Schema could not be registered for topic '$topic'. " +
"Make sure that the Schema Registry is available, the parameters are correct and the schemas ar compatible")
}

schemaId
}
}
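
A sketch of calling the relocated helper (hedged: registry URL, topic, and schema are illustrative; exactly one naming strategy may be set, per the match above):

```scala
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Hypothetical schema and configuration for illustration only.
val schemaString =
  """{"type":"record","name":"Person","fields":[{"name":"name","type":"string"}]}"""

val registryConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL          -> "http://localhost:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC        -> "person_topic",
  SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name" // value subject only
)

// Some(schemaId) on success; InvalidParameterException if both or neither naming
// strategy is configured, or if the registry rejects the schema.
val schemaId: Option[Int] = AvroSchemaUtils.registerSchema(schemaString, registryConfig)
```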
37 changes: 21 additions & 16 deletions src/main/scala/za/co/absa/abris/avro/sql/AvroDataToCatalyst.scala
@@ -46,14 +46,13 @@ case class AvroDataToCatalyst(

override def nullable: Boolean = true

@transient private lazy val reader = new GenericDatumReader[Any](avroSchema)

@transient private lazy val avroSchema = (jsonFormatSchema, schemaRegistryConf) match {
case (Some(schemaString), _) => new Schema.Parser().parse(schemaString)
case (_, Some(schemaRegistryConf)) => loadSchemaFromRegistry(schemaRegistryConf)
case _ => throw new SparkException("Schema or schema registry configuration must be provided")
}

@transient private var reader: GenericDatumReader[Any] = _
@transient private var decoder: BinaryDecoder = _

override def nullSafeEval(input: Any): Any = {
@@ -80,34 +79,40 @@
s"(${CodeGenerator.boxedType(dataType)})$expr.nullSafeEval($input)")
}

private def decode(payload: Array[Byte]): Any = {

if (confluentCompliant) {
decoder = getConfluentDecoder(payload)
} else {
decoder = getVanillaDecoder(payload, 0, payload.length)
}

reader.read(null, decoder)
private def decode(payload: Array[Byte]): Any = if (confluentCompliant) {
decodeConfluentAvro(payload)
} else {
decodeVanillaAvro(payload)
}

private def getConfluentDecoder(payload: Array[Byte]): BinaryDecoder = {
private def decodeConfluentAvro(payload: Array[Byte]): Any = {

val buffer = ByteBuffer.wrap(payload)
if (buffer.get() != ConfluentConstants.MAGIC_BYTE) {
throw new SerializationException("Unknown magic byte!")
}

buffer.getInt() // schema id, currently not used
val schemaId = buffer.getInt()

val start = buffer.position() + buffer.arrayOffset()
val length = buffer.limit() - 1 - ConfluentConstants.SCHEMA_ID_SIZE_BYTES
decoder = DecoderFactory.get().binaryDecoder(buffer.array(), start, length, decoder)

val writerSchema = getWriterSchema(schemaId)
reader = new GenericDatumReader[Any](writerSchema, avroSchema)

getVanillaDecoder(buffer.array(), start, length)
reader.read(reader, decoder)
}

private def getVanillaDecoder(payload: Array[Byte], offset: Int, length: Int) =
DecoderFactory.get().binaryDecoder(payload, offset, length, decoder)
private def getWriterSchema(id: Int): Schema = SchemaManager.getById(id).get

private def decodeVanillaAvro(payload: Array[Byte]): Any = {

decoder = DecoderFactory.get().binaryDecoder(payload, 0, payload.length, decoder)
reader = new GenericDatumReader[Any](avroSchema)

reader.read(reader, decoder)
}
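
The heart of the fix is in `decodeConfluentAvro` above: the datum reader is now constructed with both schemas (`new GenericDatumReader[Any](writerSchema, avroSchema)`), so Avro schema resolution translates records written with an older schema into the reader's shape. A self-contained sketch of that mechanism in plain Avro (schemas and values are illustrative):

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

// Writer schema: the shape the producer serialized with.
val writerSchema = new Schema.Parser().parse(
  """{"type":"record","name":"Person","fields":[{"name":"name","type":"string"}]}""")

// Reader schema: adds an optional field with a default, a compatible evolution.
val readerSchema = new Schema.Parser().parse(
  """{"type":"record","name":"Person","fields":[
    |  {"name":"name","type":"string"},
    |  {"name":"age","type":["null","int"],"default":null}]}""".stripMargin)

// Serialize a record with the writer schema.
val record = new GenericData.Record(writerSchema)
record.put("name", "Jane")
val out = new ByteArrayOutputStream()
val encoder = EncoderFactory.get().binaryEncoder(out, null)
new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
encoder.flush()

// Decode with both schemas: resolution matches fields and fills in the default.
val decoder = DecoderFactory.get().binaryDecoder(out.toByteArray, null)
val evolved = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
  .read(null, decoder)
// evolved.get("name").toString == "Jane"; evolved.get("age") == null (the default)
```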

private def loadSchemaFromRegistry(registryConfig: Map[String, String]): Schema = {

src/main/scala/za/co/absa/abris/avro/sql/CatalystDataToAvro.scala
@@ -16,8 +16,6 @@

package za.co.absa.abris.avro.sql

import java.security.InvalidParameterException

import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, IndexedRecord}
import org.apache.spark.sql.avro.AvroSerializer
@@ -26,7 +24,6 @@ import org.apache.spark.sql.catalyst.expressions.{Expression, UnaryExpression}
import org.apache.spark.sql.types.{BinaryType, DataType}
import za.co.absa.abris.avro.format.SparkAvroConversions
import za.co.absa.abris.avro.parsing.utils.AvroSchemaUtils
import za.co.absa.abris.avro.read.confluent.SchemaManager

case class CatalystDataToAvro(
child: Expression,
@@ -71,34 +68,12 @@ case class CatalystDataToAvro(
/**
* Tries to manage schema registration in case credentials to access Schema Registry are provided.
*/
@throws[InvalidParameterException]
private def registerSchema(
schema: Schema,
registryConfig: Map[String,String],
prependSchemaId: Boolean): Option[Int] = {

val topic = registryConfig(SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC)

val valueStrategy = registryConfig.get(SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY)
val keyStrategy = registryConfig.get(SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY)

val schemaId = (valueStrategy, keyStrategy) match {
case (Some(valueStrategy), None) => AvroSchemaUtils.registerIfCompatibleValueSchema(topic, schema, registryConfig)
case (None, Some(keyStrategy)) => AvroSchemaUtils.registerIfCompatibleKeySchema(topic, schema, registryConfig)
case (Some(_), Some(_)) =>
throw new InvalidParameterException(
"Both key.schema.naming.strategy and value.schema.naming.strategy were defined. " +
"Only one of them supposed to be defined!")
case _ =>
throw new InvalidParameterException(
"At least one of key.schema.naming.strategy or value.schema.naming.strategy " +
"must be defined to use schema registry!")
}

if (schemaId.isEmpty) {
throw new InvalidParameterException(s"Schema could not be registered for topic '$topic'. " +
"Make sure that the Schema Registry is available, the parameters are correct and the schemas ar compatible")
}
val schemaId = AvroSchemaUtils.registerSchema(schema, registryConfig)

if (prependSchemaId) schemaId else None
}
src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroReader.scala
@@ -44,6 +44,7 @@ object ConfluentKafkaAvroReader {
val stream = spark
.readStream
.format("kafka")
.option("startingOffsets", "earliest")
.addOptions(properties) // 1. this method will add the properties starting with "option."
// 2. security options can be set in the properties file

@@ -65,13 +66,15 @@

import za.co.absa.abris.avro.functions.from_confluent_avro

val schemaRegistryConfig = properties.getSchemaRegistryConfigurations(PARAM_OPTION_SUBSCRIBE)

if (properties.getProperty(PARAM_EXAMPLE_SHOULD_USE_SCHEMA_REGISTRY).toBoolean) {
val schemaRegistryConfig = properties.getSchemaRegistryConfigurations(PARAM_OPTION_SUBSCRIBE)
dataFrame.select(from_confluent_avro(col("value"), schemaRegistryConfig) as 'data).select("data.*")
} else {
val source = scala.io.Source.fromFile(properties.getProperty(PARAM_PAYLOAD_AVRO_SCHEMA))
val schemaString = try source.mkString finally source.close()
dataFrame.select(from_confluent_avro(col("value"), schemaString) as 'data).select("data.*")
dataFrame.select(from_confluent_avro(col("value"), schemaString, schemaRegistryConfig) as 'data)
.select("data.*")
}
}
}
src/main/scala/za/co/absa/abris/examples/ConfluentKafkaAvroReaderWithKey.scala
@@ -93,8 +93,8 @@ object ConfluentKafkaAvroReaderWithKey {
val valueSchema = loadSchemaFromFile(properties(PARAM_PAYLOAD_AVRO_SCHEMA))
val keySchema = loadSchemaFromFile(properties(PARAM_KEY_AVRO_SCHEMA))
dataFrame.select(
from_confluent_avro(col("key"), keySchema) as 'key,
from_confluent_avro(col("value"), valueSchema) as 'value)
from_confluent_avro(col("key"), keySchema, keyRegistryConfig) as 'key,
from_confluent_avro(col("value"), valueSchema, valueRegistryConfig) as 'value)
}
}
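
For reference, a sketch of the two registry configurations this example now needs (hedged: names and values are assumptions; the key column uses the key.* strategy parameter and the value column the value.* one, so each resolves to its own registry subject):

```scala
import za.co.absa.abris.avro.read.confluent.SchemaManager

// Shared connection settings; URL and topic are assumed values.
val commonConfig = Map(
  SchemaManager.PARAM_SCHEMA_REGISTRY_URL   -> "http://localhost:8081",
  SchemaManager.PARAM_SCHEMA_REGISTRY_TOPIC -> "person_topic"
)

// One naming strategy each: key.* for the key column, value.* for the value column.
val keyRegistryConfig   = commonConfig + (SchemaManager.PARAM_KEY_SCHEMA_NAMING_STRATEGY   -> "topic.name")
val valueRegistryConfig = commonConfig + (SchemaManager.PARAM_VALUE_SCHEMA_NAMING_STRATEGY -> "topic.name")
```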

src/test/scala/za/co/absa/abris/avro/sql/CatalystAvroConversionSpec.scala
@@ -329,7 +329,7 @@ class CatalystAvroConversionSpec extends FlatSpec with Matchers with BeforeAndAf
avroBytes.collect() // force evaluation

val result = avroBytes
.select(from_confluent_avro('bytes, schemaString) as 'result)
.select(from_confluent_avro('bytes, schemaString, schemaRegistryConfig) as 'result)
.select("result.*")

shouldEqualByData(dataFrame, result)
@@ -361,28 +361,6 @@
shouldEqualByData(dataFrame, result)
}

/**
* assert that both dataFrames contain the same data
*/
private def shouldEqualByData(inputFrame: DataFrame, outputFrame: DataFrame): Unit = {

def columnNames(frame: DataFrame) = frame.schema.fields.map(_.name)

val inputColNames = columnNames(inputFrame)
val outputColNames = columnNames(outputFrame)

inputColNames shouldEqual outputColNames

inputColNames.foreach(col => {
val inputColumn = inputFrame.select(col).collect().map(row => row.toSeq.head)
val outputColumn = outputFrame.select(col).collect().map(row => row.toSeq.head)

for ((input, output ) <- inputColumn.zip(outputColumn)) {
input shouldEqual output
}
})
}

private def getEncoder: Encoder[Row] = {
val avroSchema = AvroSchemaUtils.parse(ComplexRecordsGenerator.usedAvroSchema)
val sparkSchema = SparkAvroConversions.toSqlType(avroSchema)