Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

question re IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord #80

Open
nicolaemarasoiu opened this issue Apr 24, 2020 · 0 comments

Comments

@nicolaemarasoiu
Copy link

Hello,

Our sbt plugins, build definition, and producer code look like this:

// sbt plugins used by the build.
// Avro -> Scala code generation (presumably the source of the avroScalaGenerate / avroSourceDirectories keys — confirm).
addSbtPlugin("com.julianpeeters" % "sbt-avrohugger" % "2.0.0-RC22")
// Fat-jar packaging (presumably the source of the assembly keys and MergeStrategy — confirm).
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.5")
// Source formatting with scalafmt.
addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.0.2")
// Compile-time lint checks (presumably the source of the wartremoverErrors / wartremoverExcluded keys — confirm).
addSbtPlugin("org.wartremover" % "sbt-wartremover" % "2.4.3")

// ---------------------------------------------------------------------------
// Project identity
// ---------------------------------------------------------------------------
name := "global-topic-conveyor"
organization := "com.ovoenergy.data-availability"
version := "1.0"
scalaVersion := "2.12.10"

// Compiler flags; fatal warnings are intentionally left switched off.
scalacOptions ++= Seq(
  "-deprecation",
  "-feature",
  "-unchecked",
  "-language:higherKinds",
  // "-Xfatal-warnings",
  "-Xlint:infer-any",
  "-Xlint:unused"
)

// ---------------------------------------------------------------------------
// Dependencies
// ---------------------------------------------------------------------------
resolvers += "confluent" at "https://packages.confluent.io/maven/"

val confluentVersion = "5.4.0"
// The Kafka artifacts are versioned as the Confluent version plus a "-ccs" suffix.
val kafkaVersion = s"${confluentVersion}-ccs"

libraryDependencies ++= Seq(
  // Runtime
  "org.apache.kafka" %% "kafka-streams-scala" % kafkaVersion,
  "io.confluent" % "kafka-streams-avro-serde" % confluentVersion,
  "io.confluent" % "kafka-schema-registry-client" % confluentVersion,
  "ch.qos.logback" % "logback-classic" % "1.2.3",
  "com.typesafe" % "config" % "1.4.0",
  "com.sksamuel.avro4s" %% "avro4s-core" % "3.0.4",
  "io.circe" %% "circe-parser" % "0.13.0",
  // Test only
  "org.apache.kafka" % "kafka-streams-test-utils" % kafkaVersion % Test,
  "org.scalatest" %% "scalatest" % "3.0.8" % Test,
  "com.github.tomakehurst" % "wiremock" % "2.25.1" % Test
)

// ---------------------------------------------------------------------------
// Avro code generation
// ---------------------------------------------------------------------------
// Generate Scala sources from the schemas under src/main/resources/avro.
avroSourceDirectories in Compile += (sourceDirectory in Compile).value / "resources" / "avro"
sourceGenerators in Compile += (avroScalaGenerate in Compile).taskValue

// Second generator: writes a file of Avro4s implicits for the same schemas
// into the managed-sources directory and registers it as a generated source.
sourceGenerators in Compile += Def.task {
  val log = streams.value.log
  val avroSchemas = (sourceDirectory in Compile).value / "resources" / "avro"
  val generatedDir = (sourceManaged in Compile).value
  log.info(s"Parsing schemas from $avroSchemas")
  val implicitsFile = CreateImplicitsForSchemas.writeImplicitsForSchemas(avroSchemas, generatedDir)
  log.info(s"Generated implicits for Avro4s: $implicitsFile")
  Seq(implicitsFile)
}.taskValue

// ---------------------------------------------------------------------------
// Linting
// ---------------------------------------------------------------------------
// All "unsafe" warts except Wart.Any, plus Wart.Nothing, are compile errors.
wartremoverErrors in (Compile, compile) ++= Warts.unsafe.filterNot(_ == Wart.Any) :+ Wart.Nothing

// Generated sources are not linted.
wartremoverExcluded += sourceManaged.value

// ---------------------------------------------------------------------------
// Tests and packaging
// ---------------------------------------------------------------------------
parallelExecution in Test := false

assemblyJarName in assembly := "global-topic-conveyor.jar"

assemblyMergeStrategy in assembly := {
  // TODO: module-info.class is conflicting for com.fasterxml.jackson after importing the new kafka libraries
  // This has been addressed by discarding the file from the fat jar (as per the MergeStrategy.discard)
  case "module-info.class" => MergeStrategy.discard
  case otherPath =>
    // Everything else falls through to the previously configured strategy.
    val inheritedStrategy = (assemblyMergeStrategy in assembly).value
    inheritedStrategy(otherPath)
}

// Since the uberjar is deployed by the CI process and tests are performed before the jar is built
// There's no need for testing again before assembling, hence the next line
test in assembly := {}

package com.ovoenergy.globaltopics.pipelines.orion

import java.time.{Instant, LocalDate}
import java.util.{Properties, UUID}

import com.ovoenergy.kafka.common.event.Metadata
import com.ovoenergy.kafka.identity.event.{
CreatedOrUpdate,
Deleted,
FormattedAddress,
User,
UserEvent
}
import io.confluent.kafka.serializers.{AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

/**
  * Manual producer that publishes a single hard-coded `UserEvent` to the
  * `identity_users_v1` topic, using a broker at `localhost:9092` and a schema
  * registry at `http://localhost:8081`.
  *
  * NOTE(review): `KafkaAvroSerializer` throws
  * `IllegalArgumentException: Unsupported Avro type` for any value that is not
  * null, a primitive, `String`, `byte[]` or an Avro `IndexedRecord`. `UserEvent`
  * is presumably a plain generated case class rather than an `IndexedRecord` —
  * confirm. If so, `send` will fail until the value is converted to a record
  * (or the class is generated as a specific record) before being handed to the
  * producer.
  */
object Producer {

  /**
    * Builds the producer, sends one sample `UserEvent` keyed by a random UUID,
    * flushes, and closes the producer.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    val topic = "identity_users_v1"
    val props = fromMap(
      Map(
        ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
        ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],
        ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"
      )
    )
    val producer = new KafkaProducer[String, UserEvent](props)
    // Close the producer on every exit path so its resources are released
    // even when serialization or sending throws.
    try {
      val userEventId = "userEventId__alsjbfahjlsbefhjashdfbjsaf"
      val userId = "userId__qowiuefg324uf3qhf3q4"
      val inputMessageValue = UserEvent(
        metadata = Metadata(eventId = userEventId, traceToken = "asdfkjasdf", createdAt = Instant.now),
        realm = "ovo-france",
        event = Left[CreatedOrUpdate, Deleted](
          CreatedOrUpdate(
            User(
              id = userId,
              title = Some("Mr"),
              givenName = "John",
              familyName = "Smith",
              emailAddress = Some("[email protected]"),
              birthdate = Some(LocalDate.of(1978, 6, 4)),
              phoneNumbers = List("072189736"),
              permissions = Nil,
              postalAddress = Some(List("Unused address, we use the formattedAddresses instead")),
              formattedAddresses = List(
                FormattedAddress(
                  addressType = "current",
                  country = "GB",
                  // Raw JSON payload; the literal's lines are kept left-aligned so
                  // that its contents are not altered by source indentation.
                  address = """
{
"line1":"195 Radburn Close",
"town":"Eastbourne",
"postcode":"CM18 7EQ"
}
"""
                )
              )
            )
          )
        )
      )
      // The returned Future is deliberately ignored; flush() below blocks until
      // the buffered record has been sent.
      discard(
        producer.send(
          new ProducerRecord[String, UserEvent](topic, UUID.randomUUID.toString, inputMessageValue)
        )
      )
      producer.flush()
    } finally {
      producer.close()
    }
  }

  /**
    * Evaluates its argument and throws the result away, making an ignored
    * return value explicit.
    *
    * @param evaluateForSideEffectOnly expression evaluated only for its side effect
    */
  def discard(evaluateForSideEffectOnly: Any): Unit = {
    val _: Any = evaluateForSideEffectOnly
    ()
  }

  /**
    * Copies every entry of a Scala map into a new `java.util.Properties` and
    * prints both the input map and the resulting properties to stdout.
    *
    * @param properties configuration entries to copy
    * @return a new `Properties` containing all entries of `properties`
    */
  def fromMap(properties: Map[String, Object]): Properties = {
    val result = new Properties()
    properties.foreach {
      case (k, v) =>
        // put returns the previous value for the key, which is not needed here.
        discard(result.put(k, v))
    }
    println(s"For map=$properties, properties=$result")
    result
  }

}

log:

{"message":"[Producer clientId=producer-1] Cluster ID: sqjPEJjHQyuly4zXPuqdmA","logger":"org.apache.kafka.clients.Metadata","thread":"kafka-producer-network-thread | producer-1","severity":"INFO","timestamp":{"seconds":1587741874,"nanos":201000000}}

Exception in thread "main" java.lang.IllegalArgumentException: Unsupported Avro type. Supported types are null, Boolean, Integer, Long, Float, Double, String, byte[] and IndexedRecord
at io.confluent.kafka.serializers.AvroSchemaUtils.getSchema(AvroSchemaUtils.java:93)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:55)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at com.ovoenergy.globaltopics.pipelines.orion.Producer$.main(Producer.scala:66)
at com.ovoenergy.globaltopics.pipelines.orion.Producer.main(Producer.scala)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

1 participant