
Getting duplicate events on Kafka connector restart #1682

Open
shubham-maheshwari-tech opened this issue Oct 10, 2024 · 5 comments

Comments

@shubham-maheshwari-tech

Hi
I am using the Apache Camel S3 source connector with the Strimzi operator. Whenever the Kafka connector is restarted after a config update, duplicate events appear in Kafka: older payloads that are already present in the S3 bucket are emitted again.
Below is the full connector configuration:
CamelAwss3sourceSourceConnectorConfig values:
camel.aggregation.size = 10
camel.aggregation.timeout = 500
camel.beans.aggregate = null
camel.error.handler = default
camel.error.handler.max.redeliveries = 0
camel.error.handler.redelivery.delay = 1000
camel.idempotency.enabled = true
camel.idempotency.expression.header = null
camel.idempotency.expression.type = body
camel.idempotency.kafka.bootstrap.servers = localhost:9092
camel.idempotency.kafka.max.cache.size = 1000
camel.idempotency.kafka.poll.duration.ms = 100
camel.idempotency.kafka.topic = kafka_idempotent_repository
camel.idempotency.memory.dimension = 100
camel.idempotency.repository.type = memory
camel.kamelet.aws-s3-source.accessKey = [hidden]
camel.kamelet.aws-s3-source.autoCreateBucket = false
camel.kamelet.aws-s3-source.bucketNameOrArn = test-bucket
camel.kamelet.aws-s3-source.delay = 500
camel.kamelet.aws-s3-source.deleteAfterRead = false
camel.kamelet.aws-s3-source.forcePathStyle = false
camel.kamelet.aws-s3-source.ignoreBody = false
camel.kamelet.aws-s3-source.maxMessagesPerPoll = 10
camel.kamelet.aws-s3-source.overrideEndpoint = false
camel.kamelet.aws-s3-source.prefix = schema/feature
camel.kamelet.aws-s3-source.region = ap-southeast-1
camel.kamelet.aws-s3-source.secretKey = [hidden]
camel.kamelet.aws-s3-source.uriEndpointOverride = null
camel.kamelet.aws-s3-source.useDefaultCredentialsProvider = false
camel.map.headers = true
camel.map.properties = true
camel.remove.headers.pattern =
camel.source.camelMessageHeaderKey = CamelAwsS3Key
camel.source.component = null
camel.source.contentLogLevel = OFF
camel.source.marshal = null
camel.source.maxBatchPollSize = 1000
camel.source.maxNotCommittedRecords = 1024
camel.source.maxPollDuration = 1000
camel.source.pollingConsumerBlockTimeout = 0
camel.source.pollingConsumerBlockWhenFull = true
camel.source.pollingConsumerQueueSize = 1000
camel.source.unmarshal = null
camel.source.url = null
topics = commerce-test

`camel.idempotency.enabled = true` is already set, but duplicate events still arrive on every Kafka connector restart.
I am using Apache Camel library version 4.4.2.

@oscerd
Contributor

oscerd commented Oct 10, 2024

The idempotency repository is in memory, so this is expected: its state is lost on every restart. You need to set deleteAfterRead to true.
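Why a memory repository cannot survive a restart can be sketched in plain Java. The classes below (`KeyStore`, `MemoryKeyStore`, `FileKeyStore`, `RestartDemo`) are hypothetical and are not the Camel or camel-kafka-connector API; the file-backed store only stands in for any repository whose state outlives the JVM, and a restart is simulated by constructing a fresh instance.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical minimal interface illustrating the idea of an idempotent repository.
interface KeyStore {
    /** Returns true if the key is new (message should be forwarded), false if it is a duplicate. */
    boolean add(String key) throws IOException;
}

// In-memory store: all state lives on the JVM heap and is gone after a restart.
class MemoryKeyStore implements KeyStore {
    private final Set<String> seen = new HashSet<>();

    public boolean add(String key) {
        return seen.add(key);
    }
}

// File-backed store: keys are appended to a file and reloaded when the store is created.
class FileKeyStore implements KeyStore {
    private final Path file;
    private final Set<String> seen = new HashSet<>();

    FileKeyStore(Path file) throws IOException {
        this.file = file;
        if (Files.exists(file)) {
            seen.addAll(Files.readAllLines(file));
        }
    }

    public boolean add(String key) throws IOException {
        if (!seen.add(key)) {
            return false;
        }
        Files.writeString(file, key + System.lineSeparator(),
                StandardOpenOption.CREATE, StandardOpenOption.APPEND);
        return true;
    }
}

class RestartDemo {
    // Counts how many of the polled S3 object keys would be forwarded to Kafka.
    static int emitted(KeyStore store, List<String> keys) throws IOException {
        int count = 0;
        for (String key : keys) {
            if (store.add(key)) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) throws IOException {
        List<String> bucket = List.of("schema/feature/a.json", "schema/feature/b.json");

        // Memory store: each "restart" starts from an empty key set.
        System.out.println(emitted(new MemoryKeyStore(), bucket)); // 2
        System.out.println(emitted(new MemoryKeyStore(), bucket)); // 2 again: duplicates

        // File store: keys survive the simulated restart.
        Path file = Files.createTempFile("idempotent-keys", ".txt");
        System.out.println(emitted(new FileKeyStore(file), bucket)); // 2
        System.out.println(emitted(new FileKeyStore(file), bucket)); // 0
    }
}
```

With the memory store every object still in the bucket is emitted again after a restart; `deleteAfterRead=true` sidesteps this by removing the objects themselves rather than by remembering them.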

@shubham-maheshwari-tech
Author

Is there any other workaround? I don't want to delete the object from the S3 bucket once it is consumed, because the payload is also used by other functionality.

@oscerd
Contributor

oscerd commented Oct 10, 2024

Use a Kafka topic:

camel.idempotency.enabled=true
camel.idempotency.repository.type=kafka
camel.idempotency.expression.type=body
camel.idempotency.kafka.topic=my.idempotency.topic
camel.idempotency.kafka.bootstrap.servers=localhost:9092
camel.idempotency.kafka.max.cache.size=1500
camel.idempotency.kafka.poll.duration.ms=150

https://camel.apache.org/camel-kafka-connector/4.8.x/user-guide/idempotency.html
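A topic-backed repository keeps the seen keys in Kafka and rebuilds a local cache from the topic when the task starts, which is what lets it survive restarts. The sketch below models that idea with an in-process list standing in for the topic and an LRU map bounded by a size analogous to `camel.idempotency.kafka.max.cache.size`. `TopicLog`, `TopicBackedRepository` and `TopicRepositoryDemo` are hypothetical names, and the real repository's internals may differ.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a Kafka topic: an append-only list of keys that outlives restarts.
class TopicLog {
    final List<String> records = new ArrayList<>();
}

// Sketch of a topic-backed idempotent repository with a bounded local cache.
class TopicBackedRepository {
    private final TopicLog topic;
    private final Map<String, Boolean> cache;

    TopicBackedRepository(TopicLog topic, int maxCacheSize) {
        this.topic = topic;
        // Access-ordered LinkedHashMap that evicts its eldest entry behaves as an LRU cache.
        this.cache = new LinkedHashMap<String, Boolean>(16, 0.75f, true) {
            @Override
            protected boolean removeEldestEntry(Map.Entry<String, Boolean> eldest) {
                return size() > maxCacheSize;
            }
        };
        // On startup, replay the topic from the beginning to rebuild the cache.
        for (String key : topic.records) {
            cache.put(key, Boolean.TRUE);
        }
    }

    /** Returns true if the key is new; the key is then published to the topic. */
    boolean add(String key) {
        if (cache.containsKey(key)) {
            return false;
        }
        cache.put(key, Boolean.TRUE);
        topic.records.add(key);
        return true;
    }

    int cacheSize() {
        return cache.size();
    }
}

class TopicRepositoryDemo {
    public static void main(String[] args) {
        TopicLog topic = new TopicLog();
        TopicBackedRepository first = new TopicBackedRepository(topic, 1500);
        System.out.println(first.add("schema/feature/a.json")); // true

        // Simulated connector restart: new repository instance, same topic.
        TopicBackedRepository second = new TopicBackedRepository(topic, 1500);
        System.out.println(second.add("schema/feature/a.json")); // false
    }
}
```

In this sketch a key evicted from the bounded cache is treated as new again, so the cache size would need to cover the number of objects that can be re-listed under the configured prefix.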

@shubham-maheshwari-tech
Author

I tried the Kafka topic approach, but I get the exception below while deploying the connector:
2024-10-11 06:39:26,886 ERROR [camels3testsource-featflag-connector-global-sre|task-0] Error starting CamelContext (camel-31) due to exception thrown: Failed to start route ckcIdempotent-74 because of org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata (org.apache.camel.impl.engine.AbstractCamelContext) [task-thread-camels3testsource-featflag-connector-global-sre-0]
org.apache.camel.FailedToStartRouteException: Failed to start route ckcIdempotent-74 because of org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:122)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.doWarmUpRoutes(InternalRouteStartupManager.java:308)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.safelyStartRouteServices(InternalRouteStartupManager.java:187)
    at org.apache.camel.impl.engine.InternalRouteStartupManager.doStartOrResumeRoutes(InternalRouteStartupManager.java:144)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartCamel(AbstractCamelContext.java:2799)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStartContext(AbstractCamelContext.java:2480)
    at org.apache.camel.impl.engine.AbstractCamelContext.doStart(AbstractCamelContext.java:2435)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.impl.engine.AbstractCamelContext.start(AbstractCamelContext.java:2040)
    at org.apache.camel.impl.DefaultCamelContext.start(DefaultCamelContext.java:212)
    at org.apache.camel.main.SimpleMain.doStart(SimpleMain.java:43)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.kafkaconnector.CamelSourceTask.start(CamelSourceTask.java:179)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.initializeAndStart(AbstractWorkerSourceTask.java:274)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.AbstractWorkerSourceTask.run(AbstractWorkerSourceTask.java:75)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:181)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.camel.RuntimeCamelException: org.apache.kafka.common.errors.TimeoutException: Timeout expired while fetching topic metadata
    at org.apache.camel.RuntimeCamelException.wrapRuntimeCamelException(RuntimeCamelException.java:51)
    at org.apache.camel.support.ChildServiceSupport.start(ChildServiceSupport.java:67)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:126)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:153)
    at org.apache.camel.impl.engine.DefaultChannel.doStart(DefaultChannel.java:129)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:126)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:139)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:115)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:153)
    at org.apache.camel.processor.Pipeline.doStart(Pipeline.java:205)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:126)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:113)
    at org.apache.camel.support.processor.DelegateAsyncProcessor.doStart(DelegateAsyncProcessor.java:89)
    at org.apache.camel.support.service.BaseService.start(BaseService.java:113)
    at org.apache.camel.support.service.ServiceHelper.startService(ServiceHelper.java:126)
    at org.apache.camel.impl.engine.RouteService.startChildServices(RouteService.java:395)
    at org.apache.camel.impl.engine.RouteService.doWarmUp(RouteService.java:192)
    at org.apache.camel.impl.engine.RouteService.warmUp(RouteService.java:120)

The following configuration is used to connect to the Kafka broker:
camel.idempotency.repository.type: kafka
camel.idempotency.expression.type: header
camel.idempotency.expression.header: CamelAwsS3Key
camel.idempotency.kafka.topic: commerce_raw_v1
camel.idempotency.kafka.bootstrap.servers: <broker url> # not localhost
camel.idempotency.kafka.max.cache.size: 1500
camel.idempotency.kafka.poll.duration.ms: 150
camel.component.kafka.requestTimeoutMs: 300000
camel.component.kafka.securityProtocol: SASL_SSL
camel.component.kafka.saslMechanism: PLAIN
camel.component.kafka.saslJaasConfig: org.apache.kafka.common.security.plain.PlainLoginModule required username='${env:KAFKABROKER_USERNAME}' password='${env:KAFKABROKER_PASSWORD}';
camel.component.kafka.enableIdempotence: true
camel.component.kafka.deliveryTimeoutMs: 120000

I have already verified that the broker URL is correct, that the idempotency topic exists and is healthy, and that the username and password are correct.

In the connector startup logs, the producer configuration is printed twice: once with the correct security protocol (SASL_SSL) and the other settings that were passed, and a second time with the default configuration.

Is there any other configuration that has to be passed to make the connector work with the topic-based idempotent repository?
The Kafka connector application is also connected to the same broker, so I assume my S3 connector configuration has an issue or is missing some setting.
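Switching from `expression.type=body` (the original configuration) to `expression.type=header` with `expression.header=CamelAwsS3Key` changes what counts as a duplicate. The sketch below is a plain-Java illustration, not the connector's code: `SourceMessage` and `KeyExpressionDemo` are hypothetical, and it assumes each message carries the object key in the `CamelAwsS3Key` header, as the configuration above expects.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

// Hypothetical stand-in for a Camel message: a payload plus headers.
final class SourceMessage {
    final String body;
    final Map<String, String> headers;

    SourceMessage(String body, Map<String, String> headers) {
        this.body = body;
        this.headers = headers;
    }
}

class KeyExpressionDemo {
    // Analogous to expression.type=body: the payload itself is the idempotency key.
    static final Function<SourceMessage, String> BODY_KEY = m -> m.body;
    // Analogous to expression.type=header with expression.header=CamelAwsS3Key.
    static final Function<SourceMessage, String> S3_KEY_HEADER = m -> m.headers.get("CamelAwsS3Key");

    // Counts how many messages pass a simple seen-set filter using the given key function.
    static int forwarded(List<SourceMessage> messages, Function<SourceMessage, String> keyOf) {
        Set<String> seen = new HashSet<>();
        int count = 0;
        for (SourceMessage m : messages) {
            if (seen.add(keyOf.apply(m))) {
                count++;
            }
        }
        return count;
    }

    public static void main(String[] args) {
        // Two distinct objects with identical content, then the first object listed again.
        List<SourceMessage> polled = List.of(
                new SourceMessage("{\"flag\":true}", Map.of("CamelAwsS3Key", "schema/feature/a.json")),
                new SourceMessage("{\"flag\":true}", Map.of("CamelAwsS3Key", "schema/feature/b.json")),
                new SourceMessage("{\"flag\":true}", Map.of("CamelAwsS3Key", "schema/feature/a.json")));
        System.out.println(forwarded(polled, BODY_KEY));      // 1
        System.out.println(forwarded(polled, S3_KEY_HEADER)); // 2
    }
}
```

With body keys, distinct objects with identical content collapse into one; conversely, header keys treat an object rewritten under the same S3 key as a duplicate.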

@oscerd
Contributor

oscerd commented Oct 11, 2024

As far as I remember, the Kafka topic used for this purpose must be accessible without any authentication. That's why it's failing.
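One possible reading of the duplicated producer configuration and of the reply above is that the idempotent repository creates its own Kafka clients, configured only from the `camel.idempotency.kafka.*` options (which, in the listing at the top of the issue, include bootstrap servers but no security settings), so the `camel.component.kafka.*` SASL settings never reach them. This is an assumption not confirmed in the thread. The sketch only shows the consequence with `java.util.Properties`: a Kafka client falls back to `PLAINTEXT` when `security.protocol` is unset, which would be consistent with a metadata timeout against a SASL_SSL listener. `ClientConfigDemo` and the broker address are hypothetical.

```java
import java.util.Properties;

class ClientConfigDemo {
    // Kafka clients use PLAINTEXT when security.protocol is not configured.
    static String effectiveProtocol(Properties clientConfig) {
        return clientConfig.getProperty("security.protocol", "PLAINTEXT");
    }

    public static void main(String[] args) {
        // Roughly what camel.component.kafka.* contributes (placeholder broker address).
        Properties componentClients = new Properties();
        componentClients.setProperty("bootstrap.servers", "broker.example:9093");
        componentClients.setProperty("security.protocol", "SASL_SSL");
        componentClients.setProperty("sasl.mechanism", "PLAIN");

        // ASSUMPTION: the idempotent repository's clients are built only from
        // camel.idempotency.kafka.*, which carries bootstrap servers but no security settings.
        Properties repositoryClients = new Properties();
        repositoryClients.setProperty("bootstrap.servers", "broker.example:9093");

        System.out.println(effectiveProtocol(componentClients));  // SASL_SSL
        System.out.println(effectiveProtocol(repositoryClients)); // PLAINTEXT
    }
}
```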
