
Unsupported type passed for serialization: com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.NonRecordContainer #316

Open

chriline opened this issue Dec 20, 2023 · 1 comment
Labels: question (Further information is requested)

@chriline
Hi,

I'm trying to use this library together with the lenses-mqtt-source-connector (https://docs.lenses.io/5.0/integrations/connectors/stream-reactor/sources/mqttsourceconnector/), which consumes JSON from an MQTT topic.

I want the converter to pick up the JSON (for which I have no schema), derive the corresponding Avro schema, register that schema in Glue, and publish the message to Kafka in Avro format. Is this something that can be achieved with this library?
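
For context, my understanding is that the Avro converter works from typed Connect data, so the source connector would need to hand it a Struct with a schema rather than a bare JSON string with no schema. A minimal sketch of the kind of input it can map to an Avro record (class and field names are made up for illustration):

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;

public class ConnectStructExample {
    public static void main(String[] args) {
        // A Connect schema describing the record shape (illustrative fields).
        Schema schema = SchemaBuilder.struct().name("com.example.MyRecord")
                .field("deviceId", Schema.STRING_SCHEMA)
                .field("reading", Schema.FLOAT64_SCHEMA)
                .build();

        // A typed value an Avro converter can translate into an Avro record.
        Struct value = new Struct(schema)
                .put("deviceId", "sensor-1")
                .put("reading", 21.5);

        // converter.fromConnectData("TestTopic", schema, value) would then have
        // enough type information to derive and register an Avro schema.
        System.out.println(value);
    }
}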

This is my config:

name=lenses-5-mqtt-source
tasks.max=1

connector.class=com.datamountaineer.streamreactor.connect.mqtt.source.MqttSourceConnector
connect.mqtt.clean=true
connect.mqtt.hosts=wss://my-mqtt-broker:1910
connect.mqtt.username=MyUserName
connect.mqtt.service.quality=2
connect.mqtt.kcql=INSERT INTO TestTopic SELECT * FROM MyTopic WITHCONVERTER=`com.datamountaineer.streamreactor.connect.converters.source.JsonPassThroughConverter`
connect.mqtt.password=MyPassword
connect.mqtt.client.id=MyClientId
# VALUE-CONVERTER
value.converter=com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter
value.converter.dataFormat=AVRO
value.converter.schemaAutoRegistrationEnabled=true
value.converter.registry.name=my-registry
value.converter.endpoint=https://glue.eu-central-1.amazonaws.com
value.converter.region=eu-central-1
value.converter.avroRecordType=SPECIFIC_RECORD
value.converter.schemaName=my-schema-name
value.converter.schemas.enable=true
value.converter.compatibility=NONE
# KEY-CONVERTER
key.converter=org.apache.kafka.connect.storage.StringConverter
key.converter.schemas.enable=false

I can see the message hit AWSKafkaAvroConverter::fromConnectData(String topic, Schema schema, Object value); at that point schema is null and value is the JSON string (of type String).
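
For what it's worth, the failure seems reproducible outside Connect with the same converter settings. A minimal sketch (config keys mirror my worker config above with the value.converter. prefix stripped; I have not trimmed it down further):

import java.util.HashMap;
import java.util.Map;
import com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter;

public class ReproduceNonRecordContainer {
    public static void main(String[] args) {
        // Same settings as the worker config above, minus the prefix.
        Map<String, Object> config = new HashMap<>();
        config.put("region", "eu-central-1");
        config.put("registry.name", "my-registry");
        config.put("dataFormat", "AVRO");
        config.put("schemaAutoRegistrationEnabled", "true");

        AWSKafkaAvroConverter converter = new AWSKafkaAvroConverter();
        converter.configure(config, false); // false = value converter

        // schema == null and value is a plain JSON string: this throws a
        // DataException caused by AWSSchemaRegistryException
        // ("Unsupported type passed for serialization: ... NonRecordContainer").
        converter.fromConnectData("TestTopic", null, "{\"deviceId\":\"sensor-1\"}");
    }
}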

2023-12-20 13:27:25,921 ERROR [task-thread-lenses-5-mqtt-source-0] [org.apache.kafka.connect.runtime.WorkerTask] [doRun:212] WorkerSourceTask{id=lenses-5-mqtt-source-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.convertTransformedRecord(AbstractWorkerSourceTask.java:494)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.sendRecords(AbstractWorkerSourceTask.java:402)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.execute(AbstractWorkerSourceTask.java:367)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:77)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:236)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
	at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: org.apache.kafka.connect.errors.DataException: Converting Kafka Connect data to byte[] failed due to serialization error: 
	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:109)
	at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:64)
	at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.lambda$convertTransformedRecord$6(AbstractWorkerSourceTask.java:494)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
	at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
	... 13 common frames omitted
Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Unsupported type passed for serialization: com.amazonaws.services.schemaregistry.kafkaconnect.avrodata.NonRecordContainer@95171f16
	at com.amazonaws.services.schemaregistry.serializers.avro.AvroSerializer.createDatumWriter(AvroSerializer.java:94)
	at com.amazonaws.services.schemaregistry.serializers.avro.AvroSerializer.serialize(AvroSerializer.java:64)
	at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.serialize(GlueSchemaRegistrySerializationFacade.java:108)
	at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.serialize(AWSKafkaAvroSerializer.java:123)
	at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:107)
	... 17 common frames omitted
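
Reading the trace, AvroSerializer.createDatumWriter appears to dispatch on the runtime type of the datum and reject anything that is not an Avro record, while the Connect-to-Avro layer wraps schemaless primitives (like my JSON string) in NonRecordContainer, which then falls through to the error branch. An illustrative sketch of that dispatch (not the library's actual source):

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.DatumWriter;
import org.apache.avro.specific.SpecificDatumWriter;
import org.apache.avro.specific.SpecificRecord;

public class DatumWriterDispatch {
    // Illustrative only: a writer factory that handles Avro records and
    // nothing else, mirroring the "Unsupported type" failure mode.
    static DatumWriter<Object> createDatumWriter(Object datum, Schema schema) {
        if (datum instanceof SpecificRecord) {
            return new SpecificDatumWriter<>(schema);
        }
        if (datum instanceof GenericRecord) {
            return new GenericDatumWriter<>(schema);
        }
        // A NonRecordContainer wrapping a primitive value lands here.
        throw new IllegalArgumentException(
                "Unsupported type passed for serialization: " + datum);
    }
}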
@blacktooth added the question label on Jan 3, 2024
@JKCai commented Apr 15, 2024

Hi team, is there any update on this issue? I want to integrate the Lenses S3 connector with the AWS Glue registry and am facing a similar issue.
