
Unable to find schema by definition when using Kafka Connect and MongoDB #143

Closed
asmithjc opened this issue Feb 22, 2022 · 1 comment

asmithjc commented Feb 22, 2022

Hi folks 👋

I am trying to run Kafka Connect (via Debezium) to pull change data capture (CDC) data from MongoDB into Kafka. Here is my config (some fields redacted because they contain sensitive information):

{
  "config": {
    "collection.include.list": "<redacted>",
    "connector.class": "io.debezium.connector.mongodb.MongoDbConnector",
    "database.include.list": "<redacted>",
    "field.exclude.list": "<redacted>",
    "key.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "key.converter.avroRecordType": "GENERIC_RECORD",
    "key.converter.compatibility": "FULL",
    "key.converter.dataFormat": "AVRO",
    "key.converter.region": "us-west-2",
    "key.converter.registry.name": "avro-schemas",
    "key.converter.schemaAutoRegistrationEnabled": false,
    "key.converter.schemaName": "foo.bar.Key",
    "key.converter.schemas.enable": false,
    "key.converter.userAgentApp": "Debezium",
    "mongodb.hosts": "mongors/localhost:27017",
    "mongodb.members.auto.discover": false,
    "mongodb.name": "<redacted>",
    "mongodb.password": "debezium_pass",
    "mongodb.ssl.enabled": true,
    "mongodb.ssl.invalid.hostname.allowed": true,
    "mongodb.user": "debezium_user",
    "sanitize.field.names": true,
    "snapshot.mode": "initial",
    "tasks.max": "1",
    "tombstones.on.delete": false,
    "transforms": "unwrap",
    "transforms.unwrap.array.encoding": "array",
    "transforms.unwrap.delete.handling.mode": "none",
    "transforms.unwrap.flatten.struct": false,
    "transforms.unwrap.sanitize.field.names": true,
    "transforms.unwrap.type": "io.debezium.connector.mongodb.transforms.ExtractNewDocumentState",
    "value.converter": "com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter",
    "value.converter.avroRecordType": "GENERIC_RECORD",
    "value.converter.compatibility": "FULL",
    "value.converter.region": "us-west-2",
    "value.converter.registry.name": "avro-schemas",
    "value.converter.schemaAutoRegistrationEnabled": false,
    "value.converter.schemaName": "foo.bar.Value",
    "value.converter.schemas.enable": false,
    "value.converter.userAgentApp": "Debezium"
  },
  "name": "test_connector"
}

I will focus on foo.bar.Key (although I am sure the problem exists for foo.bar.Value as well). My schema is defined as follows (and is registered in AWS):

{
  "type": "record",
  "name": "Key",
  "namespace": "foo.bar",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

However, when I run the Debezium container, I get the following error:

debezium  | 2022-02-22 15:50:34,630 ERROR  ||  WorkerSourceTask{id=test_connector-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted   [org.apache.kafka.connect.runtime.WorkerTask]
debezium  | org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
debezium  |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:206)
debezium  |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
debezium  |     at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:318)
debezium  |     at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:347)
debezium  |     at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)
debezium  |     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
debezium  |     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
debezium  |     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
debezium  |     at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
debezium  |     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
debezium  |     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
debezium  |     at java.base/java.lang.Thread.run(Thread.java:829)
debezium  | Caused by: org.apache.kafka.connect.errors.DataException: Converting Kafka Connect data to byte[] failed due to serialization error: 
debezium  |     at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:98)
debezium  |     at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
debezium  |     at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$2(WorkerSourceTask.java:318)
debezium  |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
debezium  |     at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
debezium  |     ... 11 more
debezium  | Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to auto-register schema. Auto registration of schema is not enabled.
debezium  |     at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getORRegisterSchemaVersionId(AWSSchemaRegistryClient.java:187)
debezium  |     at com.amazonaws.services.schemaregistry.serializers.GlueSchemaRegistrySerializationFacade.getOrRegisterSchemaVersion(GlueSchemaRegistrySerializationFacade.java:98)
debezium  |     at com.amazonaws.services.schemaregistry.serializers.avro.AWSKafkaAvroSerializer.serialize(AWSKafkaAvroSerializer.java:115)
debezium  |     at com.amazonaws.services.schemaregistry.kafkaconnect.AWSKafkaAvroConverter.fromConnectData(AWSKafkaAvroConverter.java:96)
debezium  |     ... 15 more
debezium  | Caused by: com.amazonaws.services.schemaregistry.exception.AWSSchemaRegistryException: Failed to get schemaVersionId by schema definition for schema name = foo.bar.Key 
debezium  |     at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionIdByDefinition(AWSSchemaRegistryClient.java:148)
debezium  |     at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getORRegisterSchemaVersionId(AWSSchemaRegistryClient.java:179)
debezium  |     ... 18 more
debezium  | Caused by: software.amazon.awssdk.services.glue.model.EntityNotFoundException: Schema version is not found. (Service: Glue, Status Code: 400, Request ID: c53c041c-de95-4633-834b-32902a928bf4, Extended Request ID: null)
debezium  |     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:123)
debezium  |     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:79)
debezium  |     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:59)
debezium  |     at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:40)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:80)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium  |     at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
debezium  |     at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
debezium  |     at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
debezium  |     at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
debezium  |     at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
debezium  |     at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:167)
debezium  |     at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
debezium  |     at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:175)
debezium  |     at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
debezium  |     at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
debezium  |     at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
debezium  |     at software.amazon.awssdk.services.glue.DefaultGlueClient.getSchemaByDefinition(DefaultGlueClient.java:7294)
debezium  |     at com.amazonaws.services.schemaregistry.common.AWSSchemaRegistryClient.getSchemaVersionIdByDefinition(AWSSchemaRegistryClient.java:144)
debezium  |     ... 19 more

I dug in a bit and tried setting schemaAutoRegistrationEnabled=true, and that worked! However, here is the Avro schema it generates:

{
  "type": "record",
  "name": "ConnectDefault",
  "namespace": "com.amazonaws.services.schemaregistry.kafkaconnect.avrodata",
  "fields": [
    {
      "name": "id",
      "type": [
        "null",
        "string"
      ],
      "default": null
    }
  ]
}

From my understanding of the error message, the converter derives a schema definition from the MongoDB data and then looks that definition up in the schema registry. Since the naming differs, it fails to find the registered schema and crashes (because I have schemaAutoRegistrationEnabled=false). Is there any way to force the connector to use the schema I have defined (perhaps by looking it up by name) instead of looking it up by definition?
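To make the mismatch concrete, here is a minimal sketch (plain Python, just comparing the two JSON documents shown above) of why a lookup keyed on the *whole* schema definition cannot match, even though the field layouts are identical:

```python
import json

# The schema I registered in AWS Glue Schema Registry.
registered = json.loads("""
{ "type": "record", "name": "Key", "namespace": "foo.bar",
  "fields": [{"name": "id", "type": ["null", "string"], "default": null}] }
""")

# The schema the converter generates from the Connect record.
generated = json.loads("""
{ "type": "record", "name": "ConnectDefault",
  "namespace": "com.amazonaws.services.schemaregistry.kafkaconnect.avrodata",
  "fields": [{"name": "id", "type": ["null", "string"], "default": null}] }
""")

# The field layouts are identical...
assert registered["fields"] == generated["fields"]

# ...but the fully qualified record names differ, so any lookup that
# matches on the complete definition (name and namespace included)
# cannot find the registered schema.
assert (registered["namespace"], registered["name"]) != \
       (generated["namespace"], generated["name"])
```

So even a byte-for-byte identical field list is not enough: the generated `ConnectDefault` record name makes the definition a different document from the registered `foo.bar.Key` one.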

I have already looked into implementing AWSSchemaNamingStrategy (as done in #126), but I have some additional concerns around that. For example, we have some more complex Mongo collections that have null fields by default. When the connector sees these, it generates a schema that looks like this:

{
  "name": "deletedAt",
  "type": [
    "null",
    "string"
  ],
  "default": null
}

When it should actually look like this:

{
  "name": "deletedAt",
  "type": [
    "null",
    "long"
  ],
  "default": null
}
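My guess (an assumption, not verified against Debezium's source) is that this happens because the type is inferred per document, and a null value carries no type information, so the connector falls back to a default. A toy illustration of that failure mode:

```python
# Toy model of per-document Avro type inference. This is illustrative
# only, NOT Debezium's actual code: the point is that a null value gives
# the inferrer nothing to work with, so a fallback type is chosen.
def infer_union(value):
    if value is None:
        return ["null", "string"]   # fallback: nothing to infer from
    if isinstance(value, bool):
        return ["null", "boolean"]
    if isinstance(value, int):
        return ["null", "long"]
    return ["null", "string"]

# A document where deletedAt is null yields the wrong union:
assert infer_union(None) == ["null", "string"]
# Only a populated document recovers the intended type:
assert infer_union(1645543834630) == ["null", "long"]
```

If that is the mechanism, no naming strategy alone can fix the union type, since the problem is in the inferred definition itself.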

Let me know if I can provide any additional details. Thank you!

Tl;dr

Is it possible to force a Kafka connector to use the schema already defined in the schema registry instead of looking up by definition or implementing a custom AWSSchemaNamingStrategy?

Versions used

  • aws-glue-schema-registry - 1.1.8
  • Debezium - 1.8
  • Kafka - 2.8.1
asmithjc (Author)
I actually think this is a Kafka Connect and/or Debezium issue rather than a Schema Registry issue. I wrote a custom Single Message Transform (SMT) to force our schema, and it worked.
