I am trying to run Kafka Connect (via Debezium) to pull change data capture (CDC) data from MongoDB into Kafka. Here is what my config looks like (some fields redacted due to sensitive info):
I will focus on foo.bar.Key (although I am sure the problem exists for foo.bar.Value as well). My schema is defined as follows (and is registered in AWS):
However, when I run the Debezium container, I get the following error:
debezium | 2022-02-22 15:50:34,630 ERROR || WorkerSourceTask{id=test_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted [org.apache.kafka.connect.runtime.WorkerTask]
debezium | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:318)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:347)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
debezium | at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
debezium | at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium | at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium | at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium | at java.base/java.lang.Thread.run(Thread.java:829)
debezium | Caused by: org.apache.kafka.connect.errors.DataException: Converting Kafka Connect data to byte[] failed due to serialization error:
debezium | at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:98)
debezium | at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
debezium | at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:318)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
debezium | at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
debezium | ... 11 more
debezium | Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to auto-register schema. Auto registration of schema is not enabled.
debezium | at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getORRegisterSchemaVersionId(AWSSchemaRegistryClient.java:187)
debezium | at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.getOrRegisterSchemaVersion(GlueSchemaRegistrySerializationFacade.java:98)
debezium | at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.serialize(AWSKafkaAvroSerializer.java:115)
debezium | at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:96)
debezium | ... 15 more
debezium | Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to get schemaVersionId by schema definition for schema name = foo.bar.Key
debezium | at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionIdByDefinition(AWSSchemaRegistryClient.java:148)
debezium | at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getORRegisterSchemaVersionId(AWSSchemaRegistryClient.java:179)
debezium | ... 18 more
debezium | Caused by: software.amazon.awssdk.services.glue.model.EntityNotFoundException: Schema version is not found. (Service: Glue, Status Code: 400, Request ID: c53c041c-de95-4633-834b-32902a928bf4, Extended Request ID: null)
debezium | at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123)
debezium | at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79)
debezium | at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59)
debezium | at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:80)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium | at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
debezium | at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
debezium | at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
debezium | at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
debezium | at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
debezium | at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)
debezium | at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
debezium | at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
debezium | at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
debezium | at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
debezium | at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
debezium | at software.amazon.awssdk.services.glue.DefaultGlueClient.getSchemaByDefinition(DefaultGlueClient.java:7294)
debezium | at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionIdByDefinition(AWSSchemaRegistryClient.java:144)
debezium | ... 19 more
I dug in a bit and tried using schemaAutoRegistrationEnabled=true and it worked when I did that! However, here is the Avro schema it generates:
From my understanding of the error message, the connector seems to derive a schema definition from the MongoDB data and then look that definition up in the schema registry. Since the naming is different, it fails to find the matching schema and crashes (because I have schemaAutoRegistrationEnabled=false). Is there any way to force the connector to use the schema I have defined (perhaps by looking it up by name) instead of looking it up by definition?
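To make the distinction in that question concrete, here is a minimal toy model in plain Java. It is not the Glue Schema Registry client: the class `SchemaLookupSketch`, its methods, and both schema strings are hypothetical stand-ins (the real schemas are not shown in this issue), and whitespace-stripping is only a crude substitute for whatever comparison the registry actually performs. It only illustrates why a lookup by definition can miss a schema that a lookup by name would find.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class SchemaLookupSketch {
    // Hypothetical schema registered by hand under the name foo.bar.Key.
    public static final String REGISTERED =
        "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"foo.bar\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

    // Hypothetical schema derived by the connector: same field, different naming.
    public static final String GENERATED =
        "{\"type\":\"record\",\"name\":\"Key\",\"namespace\":\"generated.ns\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}";

    // Toy in-memory registry: schema name -> normalized definition.
    private final Map<String, String> byName = new LinkedHashMap<>();

    // Simplification: treat definitions as equal if they match after removing whitespace.
    public static String normalize(String definition) {
        return definition.replaceAll("\\s+", "");
    }

    public void register(String name, String definition) {
        byName.put(name, normalize(definition));
    }

    // Lookup by definition: succeeds only if the supplied definition matches the stored one.
    public Optional<String> findByDefinition(String name, String definition) {
        String stored = byName.get(name);
        if (stored != null && stored.equals(normalize(definition))) {
            return Optional.of(stored);
        }
        return Optional.empty();
    }

    // Lookup by name: returns whatever is registered, regardless of what the producer derived.
    public Optional<String> findByName(String name) {
        return Optional.ofNullable(byName.get(name));
    }

    public static void main(String[] args) {
        SchemaLookupSketch registry = new SchemaLookupSketch();
        registry.register("foo.bar.Key", REGISTERED);

        System.out.println("by definition (generated): "
            + registry.findByDefinition("foo.bar.Key", GENERATED).isPresent());
        System.out.println("by name: "
            + registry.findByName("foo.bar.Key").isPresent());
    }
}
```

In this model the by-definition lookup misses as soon as any part of the derived schema differs from the registered one, which matches the behaviour described above when auto-registration is disabled.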
I have already looked into implementing AWSSchemaNamingStrategy (as done in #126), but I have some additional concerns about that approach. For example, we have some more complex Mongo collections that have null fields by default. When the connector sees these, it generates a schema that looks like this:
When it should actually look like this:
Let me know if I can provide any additional details. Thank you!
TL;DR
Is it possible to force a Kafka connector to use the schema already defined in the schema registry, rather than looking it up by definition, without implementing a custom AWSSchemaNamingStrategy?
Versions used
aws-glue-schema-registry - 1.1.8
Debezium - 1.8
Kafka - 2.8.1
I actually think this is a Kafka Connect and/or Debezium issue and not a Schema Registry issue. I wrote a custom Single Message Transform (SMT) to force our schema, and it worked.