Sporadic timeouts from ConsumerOffsetCommitter.CommitRequest #809
Comments
Also, the timeouts are not limited to the OffsetCommitter; here are ones from sending messages:
java.lang.InterruptedException: null |
Hi @tedcaozoom, can you please provide more information on the architecture, configuration and environment? Are there specific steps that reproduce the issue in a test / synthetic environment? Specifically:
- Configuration
- Consumer configuration
- Setup
- Details about the issue observed
Please provide logs spanning the period when the issue is observed, plus any metrics collected for both the Kafka cluster / topic and the Parallel Consumer application (including KafkaConsumer and KafkaProducer metrics) for that period. From the stack traces above:
|
@rkolesnev I will try to get more info as soon as I can, but I have a more urgent need: I really need our pods in GKE to be able to restart themselves when they encounter this problem. It basically kills our pods and we have to restart them manually; as soon as they are restarted they work fine for a while. So I would like to know how to build a better health check. Currently we do the following and it is NOT working, so apparently ParallelConsumerGroup's containers are NOT closed or failed. How else can we do this? ParallelConsumerGroup.isRunning()? Or something else to indicate a problem? We need GKE to restart the pods itself to buy us time to diagnose this further. |
|
Just to report: it appears that ParallelConsumerGroup.isRunning() works correctly, so basically !ParallelConsumerGroup.isRunning() == needing to restart. One more piece of information: when we encounter these timeouts, ParallelConsumerGroup stops running and ParallelConsumerGroup.isRunning() becomes FALSE. |
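For readers with the same question: one way to wire such a flag into GKE is a Spring Boot Actuator health indicator, so a failing liveness probe restarts the pod. This is a minimal sketch, not code from this project; the wiring of the "still running" check is an assumption (for example ParallelConsumerGroup::isRunning, the custom wrapper described later in this thread, or a negated isClosedOrFailed() call on the processor itself):

// Minimal sketch: a Spring Boot Actuator HealthIndicator backed by a
// "still running" check supplied by the application (assumed wiring).
import java.util.function.BooleanSupplier;

import org.springframework.boot.actuate.health.Health;
import org.springframework.boot.actuate.health.HealthIndicator;
import org.springframework.stereotype.Component;

@Component
public class ParallelConsumerHealthIndicator implements HealthIndicator {

    private final BooleanSupplier running; // e.g. consumerGroup::isRunning (assumption)

    public ParallelConsumerHealthIndicator(BooleanSupplier running) {
        this.running = running;
    }

    @Override
    public Health health() {
        // Reporting DOWN fails the liveness probe so GKE restarts the pod.
        return running.getAsBoolean()
                ? Health.up().build()
                : Health.down().withDetail("reason", "parallel consumer stopped").build();
    }
}

Pointing the container's liveness probe at the Actuator health endpoint (or including this indicator in a liveness health group) then gives GKE the restart signal asked for above.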
Hmm, that is interesting - what is the ParallelConsumerGroup object? Note that the internal ParallelStreamProcessor state can be RUNNING, PAUSED, CLOSING, CLOSED, etc., and RUNNING can be switched to PAUSED when back-pressure kicks in - if it is polling more data than it can process... I guess it is some custom code on your side - containers / parallel consumer group - wrapping multiple parallel consumer instances... |
Just to reiterate - I cannot really help with the issue if I do not know what those things are: your setup, usage scenarios, configs, logs, versions, etc. |
Hello, I wish to provide some detail on this scenario. First, ParallelConsumerGroup is a Spring Lifecycle container for PC processors. It is intended to support clean shutdown of PC processors when one of them dies. The problem is that they are not dying successfully in 0.5.3.0; the last working version for clean shutdown was 0.5.2.5. Second, the producer timeout is not via the PC producer; it is just happening within a PC processing thread, which suggests general Kafka connectivity instability. So we are getting periodic connectivity/timeouts on commits to the Confluent/Kafka broker; in that case the PC processor cannot recover and (ideally) dies. The remediation is a health check to restart the k8s pod (new PC processors start up, attempt to rejoin the Kafka consumer group, etc.). Setup (a rough configuration sketch appears after this comment) -
concurrency 64, single topic consumer, etc.; partitions - 128. The issue occurs clustered (sometimes one PC consumer, sometimes multiple pods or topics) and appears related to general Kafka connectivity/stability. The Kafka cluster is a large shared cluster. Logs -
and in some cases
in other cases it is the |
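As referenced above, here is a minimal configuration sketch roughly matching the reported setup (single-topic consumer, concurrency 64); the bootstrap servers, topic name, group id, serializers and ordering mode are assumptions, not taken from the actual configuration:

// Sketch of a Parallel Consumer setup with concurrency 64 on one topic.
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.Collections;
import java.util.Properties;

public class ConsumerSetup {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");            // assumption
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");                   // assumption
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");                 // PC manages offsets
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

        ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
                .consumer(consumer)
                .ordering(ProcessingOrder.KEY)   // assumed ordering mode
                .maxConcurrency(64)              // matches the concurrency reported above
                .build();

        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(Collections.singletonList("example-topic"));              // assumed topic name
        processor.poll(context -> {
            // user processing function runs here on the worker pool
        });
    }
}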
It seems acceptable for the PC processor to close during a Kafka connectivity failure. The bigger problem in this case is the failure to close successfully. We hit this error state due to a connectivity failure - Line 808 in ab277c9
Now it attempts doClose(timeout) - Line 631 in ab277c9
It attempts to drain. I can see from JDWP on the hung process - this state appears permanently hung and cannot be discovered with a health check on AbstractParallelEoSStreamProcessor.isClosedOrFailed(). |
I can also note that this invocation of |
Running with additional logging - when stuck, it looks like this happens during the shutdown attempt -
Sometimes that last unknown error in brokerPollSystem is a commit timeout instead. So there are probably some timeout adjustments or handling of user-function thread interrupts that we can make to improve our chances of a clean shutdown, but I think it should never be able to reach a RUNNING/DRAINING state when the critical control loop threads and futures are terminated. Maybe it can do executorService.shutdownNow() after the grace period, give up on unresponsive threads, and go to the CLOSED state, or something like that? |
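This is not the library's actual close path - just a hedged sketch of the generic JDK pattern that suggestion refers to: a bounded drain, then shutdownNow() as a last resort, so close() can always reach a terminal state instead of waiting forever on unresponsive worker threads.

// Generic shutdown-with-grace-period pattern (illustration only).
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;

public final class ForcefulShutdown {

    public static void shutdownWithGrace(ExecutorService workerPool, long graceSeconds) {
        workerPool.shutdown(); // stop accepting work, let in-flight user functions finish
        try {
            if (!workerPool.awaitTermination(graceSeconds, TimeUnit.SECONDS)) {
                // Grace period elapsed: interrupt unresponsive user-function threads.
                workerPool.shutdownNow();
            }
        } catch (InterruptedException e) {
            // The closing thread itself was interrupted; force shutdown and propagate.
            workerPool.shutdownNow();
            Thread.currentThread().interrupt();
        }
        // The caller would then mark the processor CLOSED (or failed) regardless,
        // instead of remaining in RUNNING/DRAINING with dead control threads.
    }
}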
and probably if |
to sum up -
what should happen -
|
Here's a naive fix that is working for us in production - https://github.com/confluentinc/parallel-consumer/compare/master...gtassone:parallel-consumer:gt-fixclose?expand=1 |
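The linked branch is the actual change; purely as a hypothetical, self-contained illustration of the behaviour argued for above - a failing final commit should not prevent the processor from reaching a terminal, health-checkable state - with invented method and state names:

// Hypothetical illustration only; not the library's real internals.
import java.util.concurrent.atomic.AtomicReference;

public final class CloseDespiteCommitFailure {

    enum State { RUNNING, DRAINING, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.RUNNING);

    public void close() {
        state.set(State.DRAINING);
        try {
            commitFinalOffsets(); // may time out when the broker is unreachable
        } catch (RuntimeException e) {
            // Log and continue: failing here previously left the processor
            // stuck in DRAINING with its control threads already dead.
            System.err.println("Final commit failed during close: " + e);
        } finally {
            state.set(State.CLOSED); // always reach a terminal, health-checkable state
        }
    }

    private void commitFinalOffsets() {
        // placeholder for the real commit call
    }
}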
Here are some logs from the fix - note the multiple sequences of Kafka client exceptions that would have caused
|
@rkolesnev I opened that as a PR - feel free to take it over or suggest any alternative: #818 |
Thank you @gtassone - I have started to map out a deeper fix, but I am really struggling with time. I think the suggested PR is a good stop-gap solution to at least let it close completely and avoid getting stuck. |
Fixes confluentinc#809 Co-authored-by: Roman Kolesnev <[email protected]>
@rkolesnev - updated the PR - addressed comments, squash-rebased, and made my GitHub user the author. Let me know of any additional changes needed. |
Thanks @rkolesnev - how does your release process work? Can we expect a patch release for this fix? |
Seems it got closed again automatically/incorrectly due to PR hooks - reopening. |
@gtassone - yep, I think it is worthwhile to cut a release with this fix. @wushilin is working on another angle on the same issue for sync commits - adding an option to retry them. As those are sync commits and processing is halted, I think it would be nice to have that option. The release process is pretty informal - we try to bundle enhancements / fixes / dependency updates into a release, so we are not doing daily / weekly builds, but for important fixes or enhancements we would do the release sooner rather than later - the process is fairly automated. |
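This is not the change being discussed there, just a hedged sketch of the general "retry sync commits" idea using the plain KafkaConsumer API: bounded retries around commitSync() so a transient broker hiccup does not immediately fail the commit path.

// Sketch: bounded retries around a synchronous offset commit.
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.RetriableException;

import java.time.Duration;
import java.util.Map;

public final class RetryingCommitter {

    public static void commitWithRetry(KafkaConsumer<?, ?> consumer,
                                       Map<TopicPartition, OffsetAndMetadata> offsets,
                                       int maxAttempts) {
        for (int attempt = 1; ; attempt++) {
            try {
                consumer.commitSync(offsets, Duration.ofSeconds(30));
                return; // committed successfully
            } catch (RetriableException e) {
                // Covers TimeoutException and other transient broker errors.
                if (attempt >= maxAttempts) {
                    throw e; // give up after the configured number of attempts
                }
                // fall through and retry the commit
            }
        }
    }
}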
@gtassone - this is released now in 0.5.3.1 |
Our applications in GKE began to have sporadic timeouts with the following exceptions starting 6/7/2024. We have ruled out code problems and infrastructure problems, and we also tested the newest version of Parallel Consumer without success. We do not know what's going on.
java.util.concurrent.ExecutionException: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=5d9ed827-0261-4a06-907c-4010bfe919c8, requestedAtMs=1718620540268)
at java.base/java.util.concurrent.FutureTask.report(Unknown Source)
at java.base/java.util.concurrent.FutureTask.get(Unknown Source)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.waitForClose(AbstractParallelEoSStreamProcessor.java:559)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:539)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.close(AbstractParallelEoSStreamProcessor.java:518)
at io.confluent.parallelconsumer.internal.DrainingCloseable.closeDontDrainFirst(DrainingCloseable.java:61)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerContainer.close(ParallelConsumerContainer.java:47)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerGroup.lambda$stop$1(ParallelConsumerGroup.java:99)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at com.zoominfo.personprocessing.boot.conf.listeners.ParallelConsumerGroup.stop(ParallelConsumerGroup.java:97)
at org.springframework.context.support.DefaultLifecycleProcessor.doStop(DefaultLifecycleProcessor.java:248)
at org.springframework.context.support.DefaultLifecycleProcessor.access$300(DefaultLifecycleProcessor.java:54)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.stop(DefaultLifecycleProcessor.java:374)
at org.springframework.context.support.DefaultLifecycleProcessor.stopBeans(DefaultLifecycleProcessor.java:207)
at org.springframework.context.support.DefaultLifecycleProcessor.onClose(DefaultLifecycleProcessor.java:130)
at org.springframework.context.support.AbstractApplicationContext.doClose(AbstractApplicationContext.java:1070)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.doClose(ServletWebServerApplicationContext.java:174)
at org.springframework.context.support.AbstractApplicationContext.close(AbstractApplicationContext.java:1024)
at org.springframework.boot.SpringApplicationShutdownHook.closeAndWait(SpringApplicationShutdownHook.java:145)
at java.base/java.lang.Iterable.forEach(Unknown Source)
at org.springframework.boot.SpringApplicationShutdownHook.run(SpringApplicationShutdownHook.java:114)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=5d9ed827-0261-4a06-907c-4010bfe919c8, requestedAtMs=1718620540268)
at io.confluent.parallelconsumer.internal.InternalRuntimeException.msg(InternalRuntimeException.java:23)
at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commitAndWait(ConsumerOffsetCommitter.java:154)
at io.confluent.parallelconsumer.internal.ConsumerOffsetCommitter.commit(ConsumerOffsetCommitter.java:74)
at io.confluent.parallelconsumer.internal.BrokerPollSystem.retrieveOffsetsAndCommit(BrokerPollSystem.java:346)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.commitOffsetsThatAreReady(AbstractParallelEoSStreamProcessor.java:1231)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.doClose(AbstractParallelEoSStreamProcessor.java:622)
at io.confluent.parallelconsumer.internal.AbstractParallelEoSStreamProcessor.lambda$supervisorLoop$10(AbstractParallelEoSStreamProcessor.java:751)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
... 1 common frames omitted