Implements a KafkaConsumerResource to solve consumer already closed i… #306

Open
wants to merge 2 commits into
base: master
Conversation

paualarco
Member

@paualarco paualarco commented Oct 3, 2021

Resolves #240 by introducing a new class, KafkaConsumerResource, which is an improved way of creating and closing a KafkaConsumer compared with doing so directly from KafkaConsumerObservable. The problem with the latter showed up on the last element of the observable: the consumer was closed right after emitting the last element and signaling onComplete, which led to an exception when trying to commit the last consumer record.
This is fixed by using the resource, since the consumer is no longer closed as soon as the last element is consumed.
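
To illustrate the lifecycle difference described above, here is a minimal sketch in plain Scala. It does not use the monix-kafka API: `FakeConsumer`, `streamOwnsConsumer`, `withConsumer` and `resourceOwnsConsumer` are hypothetical names made up for this example, and the real KafkaConsumerResource may well be structured differently.

```scala
import scala.util.Try

// Hypothetical stand-in for a Kafka consumer: commits fail once it is closed.
final class FakeConsumer {
  private var closed = false
  private var committed = Vector.empty[Long]

  def commit(offset: Long): Unit = {
    if (closed) throw new IllegalStateException("consumer already closed")
    committed = committed :+ offset
  }

  def close(): Unit = closed = true

  def committedOffsets: Vector[Long] = committed
}

object ConsumerLifecycleSketch {

  // Models the old behaviour: the stream owns the consumer and closes it
  // once the last element has been emitted, before downstream has committed.
  def streamOwnsConsumer(offsets: List[Long]): Try[Vector[Long]] = {
    val consumer = new FakeConsumer
    val emitted = offsets // every element is handed downstream
    consumer.close()      // the stream completes and closes the consumer
    Try {
      emitted.lastOption.foreach(consumer.commit) // the commit arrives too late
      consumer.committedOffsets
    }
  }

  // Models the resource approach: the consumer is closed only after the
  // whole `use` block, including the final commit, has finished.
  def withConsumer[A](use: FakeConsumer => A): A = {
    val consumer = new FakeConsumer
    try use(consumer)
    finally consumer.close()
  }

  def resourceOwnsConsumer(offsets: List[Long]): Try[Vector[Long]] =
    Try(withConsumer { consumer =>
      offsets.lastOption.foreach(consumer.commit)
      consumer.committedOffsets
    })
}
```

In this model, `streamOwnsConsumer` fails on the final commit while `resourceOwnsConsumer` succeeds, because closing is tied to the end of the enclosing scope rather than to the end of the stream.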

…ssue

Reproducible test


KafkaConsumerResourceSpec


Scaladocs




Revert unwanted changes


a
@paualarco paualarco force-pushed the bugfix/consumer-already-closed branch 2 times, most recently from 3f4b035 to fd067b2 Compare October 3, 2021 17:58
Trigger ci

1
Remove printline


Update scala version build
Refinement


Bring back serialization test
@paualarco paualarco force-pushed the bugfix/consumer-already-closed branch from 1e0cb17 to 5e238d1 Compare October 3, 2021 19:53
@paualarco paualarco marked this pull request as ready for review October 27, 2021 20:42
@jvican

jvican commented Nov 4, 2021

I can review this if you're interested :)

@paualarco
Member Author

> I can review this if you're interested :)

Thanks, that would be helpful! I've updated the description :)
To proceed, though, I think we would also need approval from a maintainer.

@Avasil
Collaborator

Avasil commented Nov 28, 2021

Hi @paualarco, sorry for ignoring this.

Looking at the bug example:

    KafkaConsumerObservable
      .manualCommit[String, String](kafkaConfig, List(topic))
      .timeoutOnSlowUpstreamTo(5.seconds, Observable.empty)
      .foldLeft(CommittableOffsetBatch.empty) { case (batch, message) => batch.updated(message.committableOffset) }
      .mapEval(completeBatch => completeBatch.commitAsync())
      .headOrElseL(List.empty)

I feel like the real issue is that completeBatch.commitAsync() returns before the commit is completed? IIRC the underlying consumer is not closed until the downstream completes, and it feels like the downstream should only complete after the last commit is done. Is the issue reproducible with the changes in #250?

@paualarco
Copy link
Member Author

@Avasil no problem,

I don't think that's the case. The poll heartbeat consumer would fix the problem of partitions being reassigned by a rebalance, which was caused by slow downstream consumers.
In this case the issue is different: when the observable is cancelled (in the example, because it hits a timeout), the consumer is closed too, which is not desirable when there are elements that have not been committed yet. When those commits eventually run, they fail with an error indicating that the consumer was already closed.

It gets fixed when the resource, rather than the Observable's onComplete, is responsible for closing the consumer.

For the poll heartbeat to have caused this issue, the time between polls would have needed to exceed max.poll.interval.ms (300 seconds by default), but the tests did not run anywhere near that long :)
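
As a rough sketch of that last point, assuming plain `java.util.Properties` for the consumer configuration: `PollIntervalSketch`, `consumerProps` and `exceedsPollInterval` are hypothetical helpers for illustration only, not part of monix-kafka or the Kafka client. The only real identifier is the `max.poll.interval.ms` key, whose default is 300000 ms.

```scala
import java.util.Properties

object PollIntervalSketch {
  // Kafka's default for max.poll.interval.ms: 300000 ms (5 minutes).
  val DefaultMaxPollIntervalMs: Long = 300000L

  // Hypothetical helper: consumer properties with an explicit poll interval.
  def consumerProps(maxPollIntervalMs: Long): Properties = {
    val props = new Properties()
    props.setProperty("max.poll.interval.ms", maxPollIntervalMs.toString)
    props
  }

  // True when the time spent between two polls is longer than the configured
  // limit, falling back to the default when the key is not set.
  def exceedsPollInterval(processingMs: Long, props: Properties): Boolean = {
    val limit = Option(props.getProperty("max.poll.interval.ms"))
      .map(_.toLong)
      .getOrElse(DefaultMaxPollIntervalMs)
    processingMs > limit
  }
}
```

With the default configuration, the 5-second timeout from the bug example stays far below the limit, so a poll-interval rebalance would not be expected in that scenario; it would only come into play if a test lowered the interval below its processing time.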

Successfully merging this pull request may close these issues.

Failing to commit offset at the end of the Observable
3 participants