Merge pull request #932 from fd4s/auto-registration-fix
Fix auto-registration of Avro union (etc) types
bplommer authored Apr 1, 2022
2 parents d93aa4c + 9b63123 commit 6673fda
Showing 7 changed files with 109 additions and 25 deletions.
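For context, the behaviour being fixed is easiest to see from the value side of the API. The sketch below is illustrative rather than part of the diff: it assumes a reachable Schema Registry at the given URL and uses the public fs2-kafka-vulcan API the way the tests in this commit do.

  import cats.effect.IO
  import fs2.kafka.Headers
  import fs2.kafka.vulcan.{avroSerializer, AvroSettings, SchemaRegistryClientSettings}

  val settings: AvroSettings[IO] =
    AvroSettings(SchemaRegistryClientSettings[IO]("http://localhost:8081"))

  // With this fix, auto-registration uses the vulcan codec's full writer schema,
  // so producing either branch of the union registers ["int","boolean"] under
  // the subject "topic-value" instead of only the schema of the branch written.
  val bytes: IO[Array[Byte]] =
    avroSerializer[Either[Int, Boolean]]
      .using(settings)
      .forValue
      .flatMap(_.serialize("topic", Headers.empty, Right(true)))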
7 changes: 6 additions & 1 deletion build.sbt
@@ -285,7 +285,12 @@ lazy val mimaSettings = Seq(

      // sealed
      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ConsumerSettings.withDeserializers"),
-      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers")
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.ProducerSettings.withSerializers"),
+      ProblemFilters.exclude[ReversedMissingMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
+      ProblemFilters.exclude[FinalMethodProblem]("fs2.kafka.vulcan.AvroSettings.*"),
+
+      // private
+      ProblemFilters.exclude[Problem]("fs2.kafka.vulcan.AvroSettings#AvroSettingsImpl.*")
    )
    // format: on
  }
@@ -0,0 +1,10 @@
/*
 * Copyright 2018-2022 OVO Energy Limited
 *
 * SPDX-License-Identifier: Apache-2.0
 */

package kafka.utils

// Workaround for https://github.com/lampepfl/dotty/issues/13523 and https://github.com/confluentinc/schema-registry/issues/553
private class VerifiableProperties
@@ -17,18 +17,23 @@ final class AvroSerializer[A] private[vulcan] (
  def using[F[_]](
    settings: AvroSettings[F]
  )(implicit F: Sync[F]): RecordSerializer[F, A] = {
-    val createSerializer: Boolean => F[Serializer[F, A]] =
-      settings.createAvroSerializer(_).map {
-        case (serializer, _) =>
-          Serializer.instance { (topic, _, a) =>
-            F.defer {
-              codec.encode(a) match {
-                case Right(value) => F.pure(serializer.serialize(topic, value))
-                case Left(error) => F.raiseError(error.throwable)
+    val createSerializer: Boolean => F[Serializer[F, A]] = isKey => {
+      codec.schema match {
+        case Left(e) => F.pure(Serializer.fail(e.throwable))
+        case Right(writerSchema) =>
+          settings.createAvroSerializer(isKey, Some(writerSchema)).map {
+            case (serializer, _) =>
+              Serializer.instance { (topic, _, a) =>
+                F.defer {
+                  codec.encode(a) match {
+                    case Right(value) => F.pure(serializer.serialize(topic, value))
+                    case Left(error) => F.raiseError(error.throwable)
+                  }
+                }
              }
-            }
          }
      }
+    }

    RecordSerializer.instance(
      forKey = createSerializer(true),
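The writer schema passed above comes straight from the vulcan codec. A minimal sketch of that lookup, assuming vulcan's implicit summoner and its built-in Either codec (both implied by the tests later in this commit):

  import vulcan.Codec

  val codec = Codec[Either[Int, Boolean]]
  codec.schema match {
    case Right(schema) => println(schema)          // the union schema ["int","boolean"], passed as Some(writerSchema)
    case Left(error)   => println(error.throwable) // the serializer instead becomes Serializer.fail(error.throwable)
  }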
60 changes: 52 additions & 8 deletions modules/vulcan/src/main/scala/fs2/kafka/vulcan/AvroSettings.scala
@@ -11,6 +11,7 @@ import cats.syntax.all._
import fs2.kafka.internal.converters.collection._
import fs2.kafka.internal.syntax._
import io.confluent.kafka.schemaregistry.avro.AvroSchema
+import org.apache.avro.Schema
import vulcan.Codec

/**
@@ -103,7 +104,16 @@ sealed abstract class AvroSettings[F[_]] {
   * specified `isKey` flag, denoting whether a record key or
   * value is being serialized.
   */
-  def createAvroSerializer(isKey: Boolean): F[(KafkaAvroSerializer, SchemaRegistryClient)]
+  def createAvroSerializer(
+    isKey: Boolean,
+    writerSchema: Option[Schema]
+  ): F[(KafkaAvroSerializer, SchemaRegistryClient)]

+  @deprecated("use the overload that takes an optional writer schema", "2.5.0-M3")
+  final def createAvroSerializer(
+    isKey: Boolean
+  ): F[(KafkaAvroSerializer, SchemaRegistryClient)] =
+    createAvroSerializer(isKey, writerSchema = None)
+
  /**
   * Creates a new [[AvroSettings]] instance with the specified
@@ -125,10 +135,20 @@
   */
  def withCreateAvroSerializer(
    // format: off
-    createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
+    createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
    // format: on
  ): AvroSettings[F]

+  @deprecated("use the overload that has an `Option[Schema]` argument", "2.5.0-M3")
+  final def withCreateAvroSerializer(
+    // format: off
+    createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
+    // format: on
+  ): AvroSettings[F] =
+    withCreateAvroSerializer(
+      (client, isKey, _, properties) => createAvroSerializerWith(client, isKey, properties)
+    )
+
  /**
   * Creates a new [[AvroSettings]] instance with the specified
   * function for registering schemas from settings.
@@ -145,7 +165,7 @@ object AvroSettings {
    override val properties: Map[String, String],
    // format: off
    val createAvroDeserializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroDeserializer, SchemaRegistryClient)],
-    val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
+    val createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)],
    val registerSchemaWith: (F[SchemaRegistryClient], String, Codec[_]) => F[Int]
    // format: on
  ) extends AvroSettings[F] {
@@ -177,9 +197,10 @@ object AvroSettings {
      createAvroDeserializerWith(schemaRegistryClient, isKey, properties)

    override def createAvroSerializer(
-      isKey: Boolean
+      isKey: Boolean,
+      writerSchema: Option[Schema]
    ): F[(KafkaAvroSerializer, SchemaRegistryClient)] =
-      createAvroSerializerWith(schemaRegistryClient, isKey, properties)
+      createAvroSerializerWith(schemaRegistryClient, isKey, writerSchema, properties)

    override def registerSchema[A](subject: String)(implicit codec: Codec[A]): F[Int] =
      registerSchemaWith(schemaRegistryClient, subject, codec)
@@ -193,7 +214,7 @@

    override def withCreateAvroSerializer(
      // format: off
-      createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
+      createAvroSerializerWith: (F[SchemaRegistryClient], Boolean, Option[Schema], Map[String, String]) => F[(KafkaAvroSerializer, SchemaRegistryClient)]
      // format: on
    ): AvroSettings[F] =
      copy(createAvroSerializerWith = createAvroSerializerWith)
@@ -224,10 +245,33 @@ object AvroSettings {
            (deserializer, schemaRegistryClient)
          }
        },
-      createAvroSerializerWith = (schemaRegistryClient, isKey, properties) =>
+      createAvroSerializerWith = (schemaRegistryClient, isKey, schema, properties) =>
        schemaRegistryClient.flatMap { schemaRegistryClient =>
          F.delay {
-            val serializer = new KafkaAvroSerializer(schemaRegistryClient)
+            val serializer = schema match {
+              case None => new KafkaAvroSerializer(schemaRegistryClient)
+              case Some(schema) =>
+                new KafkaAvroSerializer(schemaRegistryClient) {
+                  // Overrides the default auto-registration behaviour, which attempts to guess the
+                  // writer schema based on the encoded representation used by the Java Avro SDK.
+                  // This works for types such as Records, which contain a reference to the exact schema
+                  // that was used to write them, but doesn't work so well for unions (where
+                  // the default behaviour is to register just the schema for the alternative
+                  // being produced) or logical types such as timestamp-millis (where the logical
+                  // type is lost).
+                  val parsedSchema = new AvroSchema(schema.toString)
+                  override def serialize(topic: String, record: AnyRef): Array[Byte] = {
+                    if (record == null) {
+                      return null
+                    }
+                    serializeImpl(
+                      getSubjectName(topic, isKey, record, parsedSchema),
+                      record,
+                      parsedSchema
+                    )
+                  }
+                }
+            }
            serializer.configure(withDefaults(properties), isKey)
            (serializer, schemaRegistryClient)
          }
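To illustrate the default behaviour that the comment in the override above describes, here is a hedged sketch against the Confluent mock registry. The topic, the configuration values, and the plain Boolean payload are illustrative; the classes and methods are the same ones used elsewhere in this commit, but the exact required config keys may vary by client version.

  import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
  import io.confluent.kafka.serializers.KafkaAvroSerializer
  import scala.jdk.CollectionConverters._

  val client = new MockSchemaRegistryClient()
  val default = new KafkaAvroSerializer(client)
  default.configure(Map("schema.registry.url" -> "mock://test").asJava, false)

  // The stock serializer guesses the writer schema from the runtime value, so a
  // Boolean registers just "boolean" for the subject; the override above registers
  // the codec's full union schema (e.g. ["int","boolean"]) instead.
  default.serialize("topic", java.lang.Boolean.TRUE)
  println(client.getLatestSchemaMetadata("topic-value").getSchema)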
@@ -2,6 +2,7 @@ package fs2.kafka.vulcan

import cats.effect.IO
import cats.effect.unsafe.implicits.global
+import fs2.kafka.Headers
import io.confluent.kafka.schemaregistry.client.MockSchemaRegistryClient
import org.scalatest.funspec.AnyFunSpec
import vulcan.{AvroError, Codec}
@@ -16,6 +17,25 @@ final class AvroSerializerSpec extends AnyFunSpec {
    assert(serializer.forValue.attempt.unsafeRunSync().isRight)
  }

+  it("auto-registers union schemas") {
+    (avroSerializer[Either[Int, Boolean]]
+      .using(avroSettings)
+      .forValue
+      .flatMap(
+        _.serialize(
+          "test-union-topic",
+          Headers.empty,
+          Right(true)
+        )
+      ))
+      .unsafeRunSync()
+    assert(
+      schemaRegistryClient
+        .getLatestSchemaMetadata("test-union-topic-value")
+        .getSchema === """["int","boolean"]"""
+    )
+  }
+
  it("raises schema errors") {
    val codec: Codec[Int] =
      Codec.instance(
@@ -89,10 +89,10 @@ final class AvroSettingsSpec extends AnyFunSpec with ScalaCheckPropertyChecks {
  it("should provide withCreateAvroSerializer") {
    assert {
      settings
-        .withCreateAvroSerializer {
-          case _ => IO.raiseError(new RuntimeException)
+        .withCreateAvroSerializer { (_, _, _, _) =>
+          IO.raiseError(new RuntimeException)
        }
-        .createAvroSerializer(isKey = false)
+        .createAvroSerializer(isKey = false, null)
        .attempt
        .unsafeRunSync()
        .isLeft
@@ -27,12 +27,12 @@ final class PackageSpec extends AnyFunSpec {
  describe("avroSerializer/avroDeserializer") {
    it("should be able to do roundtrip serialization") {
      (for {
-        serializer <- avroSerializer[Test].using(avroSettings).forValue
+        serializer <- avroSerializer[Either[Test, Int]].using(avroSettings).forValue
        test = Test("test")
-        serialized <- serializer.serialize("topic", Headers.empty, test)
-        deserializer <- avroDeserializer[Test].using(avroSettings).forValue
+        serialized <- serializer.serialize("topic", Headers.empty, Left(test))
+        deserializer <- avroDeserializer[Either[Test, Int]].using(avroSettings).forValue
        deserialized <- deserializer.deserialize("topic", Headers.empty, serialized)
-      } yield assert(deserialized == test)).unsafeRunSync()
+      } yield assert(deserialized == Left(test))).unsafeRunSync()
    }

    it("should be able to do roundtrip serialization using compatible schemas") {
