
ParallelConsumer would run for a while and then exit due to InternalRuntimeException(Timeout) #833

dumontxiong opened this issue Sep 16, 2024 · 8 comments

dumontxiong commented Sep 16, 2024

Hi team,
Version: 0.5.3.1

InternalRuntimeException:
My test scenario is one where 50% of records fail and there are 1000 keys in total. ParallelConsumer runs for a while and then exits due to an InternalRuntimeException at 24/09/13 21:33:37.130:
io.confluent.parallelconsumer.internal.InternalRuntimeException: Timeout waiting for commit response PT30S to request ConsumerOffsetCommitter.CommitRequest(id=79c3ac04-b8c7-4dc2-9b09-c77d6ad6bee4, requestedAtMs=1726234432425)
[screenshot: stack trace of the InternalRuntimeException]

Looking at the code in ConsumerOffsetCommitter.commitAndWait():

    CommitResponse take = commitResponseQueue.poll(commitTimeout.toMillis(), TimeUnit.MILLISECONDS); // blocks, drain until we find our response

Because take is null here, the InternalRuntimeException is thrown.
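For context, the check immediately after that poll is what produces the exception above (paraphrased rather than quoted verbatim; the message format matches the log line):

    if (take == null) {
        throw new InternalRuntimeException(
                msg("Timeout waiting for commit response {} to request {}", commitTimeout, commitRequest));
    }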

Metrics from pc_processed_records_total show no successfully processed records during this period:

[screenshot: pc_processed_records_total metrics graph]

Adding a commit response to the queue:
The logs below from ConsumerOffsetCommitter.maybeDoCommit() show that the last time a commit response was added to the queue was 24/09/13 21:16:54.105.

[screenshot: maybeDoCommit() debug logs]

Waiting on a commit response:
The logs from ConsumerOffsetCommitter.commitAndWait() show that the last wait for a commit response from the queue started at 24/09/13 21:33:52.426.

[screenshot: commitAndWait() debug logs]

Here are my concerns:
The first time, a commit response was added to the queue at 24/09/13 21:16:54.105 and the wait for it started at 24/09/13 21:16:54.084, i.e. within the 30s timeout.
The second time, a wait for a commit response started at 24/09/13 21:33:39.194, but no commit response was ever added to the queue, so the poll timed out and the InternalRuntimeException was thrown.

Please help to check.

BRS,
Dumont

rkolesnev (Contributor) commented:

Hi @dumontxiong - it is hard to tell whether this is an issue or not. It is possible not to get a commit response if something went wrong with the Kafka cluster, but without more information I cannot really tell if that was the cause here.
In general, with CONSUMER_PERIODIC_SYNC commit mode there are two possible execution modes, depending on which thread requested the commit (sketched after this list):

  1. If the commit is requested on the thread owning the Committer (the broker poll thread), the commit happens immediately as a blocking commit request.
  2. If the commit is requested by a different thread, the request is added to the commitRequest queue and that thread waits for a matching response on the commitResponseQueue; the commit itself still happens on the broker-poll thread as part of the control loop.
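A minimal, self-contained sketch of the mode-2 handshake (names simplified; the real implementation lives in ConsumerOffsetCommitter and uses proper request/response types):

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;

    class CommitHandshakeSketch {
        final LinkedBlockingQueue<String> commitRequestQueue = new LinkedBlockingQueue<>();
        final LinkedBlockingQueue<String> commitResponseQueue = new LinkedBlockingQueue<>();

        // Mode 2, caller side: enqueue a request, then block until the
        // broker-poll thread answers or the timeout expires.
        void commitAndWait(long timeoutMs) throws InterruptedException {
            commitRequestQueue.add("commit-request");
            String response = commitResponseQueue.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (response == null) {
                throw new RuntimeException("Timeout waiting for commit response"); // the path hit in this issue
            }
        }

        // Broker-poll thread, part of the control loop: drain requests,
        // perform the actual (blocking) commit, then publish a response.
        void maybeDoCommit() {
            if (commitRequestQueue.poll() != null) {
                // consumer.commitSync(...) happens here in the real code
                commitResponseQueue.add("commit-response");
            }
        }
    }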

I cannot tell what happened in this case - why the response is not there, etc. - but it should be visible in the debug logs when the commit request is picked off the request queue (and if it is not picked up / processed, there may be something showing why it wasn't).
Can you provide more complete logs? I can see that you have debug logging enabled at least for ConsumerOffsetCommitter, so that may help to clarify things.

rkolesnev (Contributor) commented:

Or if you can provide a minimal test application or integration test that reproduces this issue - that would be even better :)

dumontxiong (Author) commented:

@rkolesnev
This is a Spring Boot service.
There are two scheduled tasks:
one publishes records to topic1 with random keys (half with a THROW_EXCEPTION_FLAG header),
the other publishes records to topic2 with keys 0-999 (half with a THROW_EXCEPTION_FLAG header):

    @Scheduled(fixedRate = 2, timeUnit = TimeUnit.MINUTES)
    public void publishEventsNoKey() {
        for (int i = 0; i < 1000; i++) {
            kafkaEvent.setKey(new Random().nextInt());          // random key per record
            kafkaEvent.setHeader(THROW_EXCEPTION_FLAG, i % 2);  // half the records marked to fail
            publisher.publish(kafkaEvent);
        }
    }

    @Scheduled(fixedRate = 2, timeUnit = TimeUnit.MINUTES)
    public void publishEventsOneThousandKey() {
        for (int i = 0; i < 1000; i++) {
            kafkaEvent.setKey(i);                               // keys 0-999
            kafkaEvent.setHeader(THROW_EXCEPTION_FLAG, i % 2);
            publisher.publish(kafkaEvent);
        }
    }

Here are the two subscribers; one is subscribed to topic1, the other to topic2, and both use concurrency = 14:

    // Subscriber for topic1 (the topic2 subscriber below is identical apart from the topic name)
    ParallelConsumerOptions.ParallelConsumerOptionsBuilder<KafkaEventKey, T> builder =
            ParallelConsumerOptions.builder()
                    .ordering(ProcessingOrder.KEY)
                    .maxConcurrency(14)
                    .consumer(this.consumer);
    ParallelConsumerOptions options = builder.build();
    this.parallelStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);
    this.parallelStreamProcessor.subscribe(Collections.singletonList("topic1")); // subscribe takes a collection of topics
    this.parallelStreamProcessor.poll((context) -> {
        try {
            Thread.sleep(100); // simulate 100ms of work per record
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        // kafkaEvent is deserialized from the context record (wrapper code omitted);
        // records flagged with THROW_EXCEPTION_FLAG == 0 fail deliberately
        if (kafkaEvent.getHeader(THROW_EXCEPTION_FLAG) != null
                && kafkaEvent.getMeta(THROW_EXCEPTION_FLAG, Integer.class) == 0) {
            throw new RuntimeException("THROW_EXCEPTION_FLAG_HAPPENED");
        }
    });

    // Subscriber for topic2: same options and handler, subscribed to "topic2"
    this.parallelStreamProcessor = ParallelStreamProcessor.createEosStreamProcessor(builder.build());
    this.parallelStreamProcessor.subscribe(Collections.singletonList("topic2"));
    this.parallelStreamProcessor.poll((context) -> {
        // ... identical handler to the topic1 subscriber above ...
    });

The issue and metrics above are from the subscriber subscribed to topic2 (1000 keys).

rkolesnev (Contributor) commented:

Thank you for the repro code - I will run it and see if I can reproduce the issue / see where it occurs.

ndqvinh2109 commented:
Hi @rkolesnev, we hit the same issue and PC was closed unintentionally. We applied a workaround: wrapping the controlLoop function in a try/catch so that the exception won't propagate to supervisorLoop and close the PC entirely. We are looking forward to an official release with a fix if possible. One potential approach we suggest is to handle this with a try/catch inside controlLoop around the offset commit.

The issue can be reproduced with the steps below (a rough automated sketch follows the list):

  1. Start the application and Kafka broker (all up)
  2. Publish an event
  3. Consume the event, sleeping in the processing function
  4. Turn off the Kafka broker
  5. Once the sleep is over, the consumer tries to commit
  6. The timeout error is thrown
  7. Finally, PC closes
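For illustration only, a condensed sketch of those steps as a standalone program, assuming Testcontainers for the broker; the topic name, image tag, and sleep durations are illustrative and the snippet is untested:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.testcontainers.containers.KafkaContainer;
    import org.testcontainers.utility.DockerImageName;
    import io.confluent.parallelconsumer.ParallelConsumerOptions;
    import io.confluent.parallelconsumer.ParallelStreamProcessor;

    public class BrokerDownRepro {
        public static void main(String[] args) throws InterruptedException {
            // Step 1: application and broker are up
            KafkaContainer kafka = new KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:7.4.0"));
            kafka.start();

            Properties props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.getBootstrapServers());
            props.put(ConsumerConfig.GROUP_ID_CONFIG, "repro-group");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                    "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // PC manages commits itself
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

            ParallelStreamProcessor<String, String> pc = ParallelStreamProcessor.createEosStreamProcessor(
                    ParallelConsumerOptions.<String, String>builder().consumer(consumer).build());
            pc.subscribe(Collections.singletonList("repro-topic"));

            // Steps 2-3: publish an event (producer omitted), then hold it in the handler
            pc.poll(context -> {
                try {
                    Thread.sleep(30_000); // sleep while the broker goes away
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });

            // Step 4: kill the broker while the handler is still sleeping
            Thread.sleep(5_000);
            kafka.stop();

            // Steps 5-7: when the sleep ends, PC attempts to commit, the commit
            // times out, InternalRuntimeException propagates, and PC closes.
        }
    }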

Regards,

rkolesnev (Contributor) commented:

Hi @ndqvinh2109, @dumontxiong,
So to clarify - the commit timeout happens when the broker is unavailable (due to network issues etc.) or down entirely?
In that case I don't really see what Parallel Consumer can do except shutting down / closing. If it cannot commit data, then polling will fail as well, and processing anything that was previously polled is just going to cause more side effects / duplicates.
I don't think that indefinite retry is the answer there either - one possible approach would be to add X retries for the async commit, potentially reducing the timeout for each individual retry and making it configurable.
For synchronous commits, retry logic is already implemented and configurable - see PR #819.
For asynchronous commits, I propose reducing the commit timeout on the Consumer and doing X (for example 3) tries on failure, while increasing the timeout for it on the Parallel Consumer side (a rough sketch below). That way rolling restarts and similar transient issues during commits will be mitigated, but a true broker or connectivity failure will still cause Parallel Consumer to shut down and allow work to be rebalanced to a potentially healthy instance.
If you have the capacity to implement this feature, that would be greatly appreciated; otherwise I will try to fit it into the next set of work for Parallel Consumer.
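A minimal sketch of that retry idea, assuming a hypothetical helper around the plain KafkaConsumer commit call (the attempt count, per-attempt timeout, class and method names are illustrative, not the library's actual API):

    import java.time.Duration;
    import java.util.Map;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.consumer.OffsetAndMetadata;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.kafka.common.errors.RetriableException;

    class RetryingCommitter {
        private static final int MAX_ATTEMPTS = 3;                          // "X tries"
        private static final Duration PER_ATTEMPT = Duration.ofSeconds(10); // reduced per-attempt timeout

        // Retries transient commit failures; the overall budget is roughly
        // MAX_ATTEMPTS * PER_ATTEMPT, so the Parallel-Consumer-side timeout
        // would be raised above that.
        static void commitWithRetry(KafkaConsumer<?, ?> consumer,
                                    Map<TopicPartition, OffsetAndMetadata> offsets) {
            RetriableException last = null;
            for (int attempt = 1; attempt <= MAX_ATTEMPTS; attempt++) {
                try {
                    consumer.commitSync(offsets, PER_ATTEMPT); // bounded blocking commit
                    return;
                } catch (RetriableException e) {               // includes TimeoutException
                    last = e;
                }
            }
            throw last; // genuine broker/connectivity failure: let PC shut down and rebalance
        }
    }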

ndqvinh2109 commented:

Thanks @rkolesnev for letting me know the retry mechanism has been implemented in the latest version. We adjusted the timeout config, and in our testing the retries happened as expected; PC did not close, which is what we wanted (config sketch below).
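For anyone else hitting this, a sketch of what such an adjustment might look like; offsetCommitTimeout is the options field as I understand it in recent releases - check your version's ParallelConsumerOptions for the exact setting name:

    import java.time.Duration;
    import io.confluent.parallelconsumer.ParallelConsumerOptions;
    import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;

    ParallelConsumerOptions<String, String> options = ParallelConsumerOptions.<String, String>builder()
            .consumer(consumer)
            .ordering(ProcessingOrder.KEY)
            .offsetCommitTimeout(Duration.ofSeconds(60)) // allow more time for commit responses before failing
            .build();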

rkolesnev (Contributor) commented:

@dumontxiong - could you please retest on the latest build of ParallelConsumer?
And maybe with the commit timeout adjusted as well?
