You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
avroSourceDirectories in Compile += (sourceDirectory in Compile).value / "resources" / "avro"
sourceGenerators in Compile += (avroScalaGenerate in Compile).taskValue
assemblyJarName in assembly := "global-topic-conveyor.jar"
parallelExecution in Test := false
sourceGenerators in Compile += Def.task {
val schemasPath = (sourceDirectory in Compile).value / "resources" / "avro""Parsing schemas from $schemasPath")
val managedFolder = (sourceManaged in Compile).value
val outFile = CreateImplicitsForSchemas.writeImplicitsForSchemas(schemasPath, managedFolder)"Generated implicits for Avro4s: $outFile")
wartremoverExcluded += sourceManaged.value
assemblyMergeStrategy in assembly := {
// TODO: module-info.class is conflicting for com.fasterxml.jackson after importing the new kafka libraries
// This has been addressed by discarding the file from the fat jar (as per the MergeStrategy.discard)
case "module-info.class" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
// Since the uberjar is deployed by the CI process and tests are performed before the jar is built
// There's no need for testing again before assembling, hence the next line
test in assembly := {}
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.serializers.AvroSchemaUtils.getSchema(
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(
at org.apache.kafka.common.serialization.Serializer.serialize(
at org.apache.kafka.clients.producer.KafkaProducer.doSend(
at org.apache.kafka.clients.producer.KafkaProducer.send(
at org.apache.kafka.clients.producer.KafkaProducer.send(
at com.ovoenergy.globaltopics.pipelines.orion.Producer$.main(Producer.scala:66)
at com.ovoenergy.globaltopics.pipelines.orion.Producer.main(Producer.scala)
The text was updated successfully, but these errors were encountered:
We have like this:
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC22")
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.2")
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.3")
scalaVersion := "2.12.10"
name := "global-topic-conveyor"
organization := ""
version := "1.0"
scalacOptions ++= Seq(
// "-Xfatal-warnings",
resolvers += "confluent" at ""
val confluentVersion = "5.4.0"
val kafkaVersion = confluentVersion + "-ccs"
libraryDependencies += "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion
libraryDependencies += "io.confluent" % "kafka-streams-avro-serde" % confluentVersion
libraryDependencies += "io.confluent" % "kafka-schema-registry-client" % confluentVersion
libraryDependencies += "ch.qos.logback" % "logback-classic" % "1.2.3"
libraryDependencies += "com.typesafe" % "config" % "1.4.0"
libraryDependencies += "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4"
libraryDependencies += "io.circe" %% "circe-parser" % "0.13.0"
libraryDependencies += "org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % "test"
libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % "test"
libraryDependencies += "com.github.tomakehurst" % "wiremock" % "2.25.1" % "test"
avroSourceDirectories in Compile += (sourceDirectory in Compile).value / "resources" / "avro"
sourceGenerators in Compile += (avroScalaGenerate in Compile).taskValue
assemblyJarName in assembly := "global-topic-conveyor.jar"
parallelExecution in Test := false
sourceGenerators in Compile += Def.task {
val schemasPath = (sourceDirectory in Compile).value / "resources" / "avro""Parsing schemas from $schemasPath")
val managedFolder = (sourceManaged in Compile).value
val outFile = CreateImplicitsForSchemas.writeImplicitsForSchemas(schemasPath, managedFolder)"Generated implicits for Avro4s: $outFile")
wartremoverErrors in (Compile, compile) ++= Warts.unsafe.filterNot(_ == Wart.Any) ++ Seq(
wartremoverExcluded += sourceManaged.value
assemblyMergeStrategy in assembly := {
// TODO: module-info.class is conflicting for com.fasterxml.jackson after importing the new kafka libraries
// This has been addressed by discarding the file from the fat jar (as per the MergeStrategy.discard)
case "module-info.class" => MergeStrategy.discard
case x =>
val oldStrategy = (assemblyMergeStrategy in assembly).value
// Since the uberjar is deployed by the CI process and tests are performed before the jar is built
// There's no need for testing again before assembling, hence the next line
test in assembly := {}
package com.ovoenergy.globaltopics.pipelines.orion
import java.time.{Instant, LocalDate}
import java.util.{Properties, UUID}
import com.ovoenergy.kafka.common.event.Metadata
import com.ovoenergy.kafka.identity.event.{
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer
object Producer {
def main(args: Array[String]): Unit = {
val topic = "identity_users_v1"
val props = fromMap(
ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],
ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
val producer = new KafkaProducerString, UserEvent
val userEventId = "userEventId__alsjbfahjlsbefhjashdfbjsaf"
val userId = "userId__qowiuefg324uf3qhf3q4"
val inputMessageValue = UserEvent(
metadata = Metadata(eventId = userEventId, traceToken = "asdfkjasdf", createdAt =,
realm = "ovo-france",
event = Left[CreatedOrUpdate, Deleted](
id = userId,
title = Some("Mr"),
givenName = "John",
familyName = "Smith",
emailAddress = Some("[email protected]"),
birthdate = Some(LocalDate.of(1978, 6, 4)),
phoneNumbers = List("072189736"),
permissions = Nil,
postalAddress = Some(List("Unused address, we use the formattedAddresses instead")),
formattedAddresses = List(
addressType = "current",
country = "GB",
address = """
"line1":"195 Radburn Close",
"postcode":"CM18 7EQ"
new ProducerRecord[String, UserEvent](topic, UUID.randomUUID.toString, inputMessageValue)
def discard(evaluateForSideEffectOnly: Any): Unit = {
val _: Any = evaluateForSideEffectOnly
def fromMap(properties: Map[String, Object]): Properties = {
val result = properties.foldLeft(new Properties()) {
case (props, (k, v)) =>
discard(props.put(k, v))
println("For map=" + properties.toString() + ", properties=" + result.toString)
{"message":"[Producer clientId=producer-1] Cluster ID: sqjPEJjHQyuly4zXPuqdmA","logger":"org.apache.kafka.clients.Metadata","thread":"kafka-producer-network-thread | producer-1","severity":"INFO","timestamp":{"seconds":1587741874,"nanos":201000000}}
Exception in thread "main" java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.serializers.AvroSchemaUtils.getSchema(
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(
at org.apache.kafka.common.serialization.Serializer.serialize(
at org.apache.kafka.clients.producer.KafkaProducer.doSend(
at org.apache.kafka.clients.producer.KafkaProducer.send(
at org.apache.kafka.clients.producer.KafkaProducer.send(
at com.ovoenergy.globaltopics.pipelines.orion.Producer$.main(Producer.scala:66)
at com.ovoenergy.globaltopics.pipelines.orion.Producer.main(Producer.scala)
The text was updated successfully, but these errors were encountered: