
Handling Partition Revocation in Parallel-Consumer Leading to Duplicate Event Processing #777

Open
ajax-levashov-m opened this issue May 30, 2024 · 2 comments
Labels: enhancement (New feature or request)

Comments

@ajax-levashov-m

During the investigation of the parallel-consumer with reactor, we observed the following behaviour:

  1. The consumer starts processing an event from a partition.
  2. While the event is being processed, the partition gets revoked, and the current progress (highest sequential offset and metadata with out-of-order offsets) gets committed.
  3. The processing of the event continues and eventually completes. However, when attempting to update the state for the partition, it is discovered that the partition is now in a RemovedPartitionState due to the revocation. As a result, the state update is ignored.
  4. Subsequently, the same partition is reassigned to the same consumer. The consumer reads the latest state from the committed offset, but since the previous state update was ignored (step 3), the state is stale and does not reflect the processing that was completed.
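
For context, here is a minimal sketch of the kind of setup involved. The broker address, group id, topic, and the long-running Mono are placeholders, and the exact `react` signature varies between parallel-consumer versions, so treat it as illustrative rather than our exact code:

```java
import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.reactor.ReactorProcessor;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
import reactor.core.publisher.Mono;

import java.time.Duration;
import java.util.List;
import java.util.Properties;

public class ReactorSetupSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder group
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false"); // parallel-consumer manages commits

        var options = ParallelConsumerOptions.<String, String>builder()
                .consumer(new KafkaConsumer<String, String>(props))
                .ordering(ParallelConsumerOptions.ProcessingOrder.KEY)
                .build();

        var processor = new ReactorProcessor<>(options);
        processor.subscribe(List.of("example-topic")); // placeholder topic

        // Simulated long-running work: if the partition is revoked while this Mono
        // is still running, its eventual completion is recorded against a
        // RemovedPartitionState and effectively discarded (step 3 above).
        processor.react(context ->
                Mono.delay(Duration.ofSeconds(30))
                        .doOnNext(tick -> System.out.println("finished " + context)));
    }
}
```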

Related Logs: https://gist.github.com/ajax-levashov-m/29efe07beff585d659ba4d562622721f

Due to this behaviour, event processing is performed twice. The first processing attempt is effectively discarded because the partition state update is ignored after the revocation. When the partition is reassigned, the consumer starts processing from the previously committed offset, resulting in the event being processed again.

Question:
Are there any ways to mitigate this behaviour and use parallel-consumer for non-idempotent workloads? It would be beneficial to have a mechanism for the consumer to complete the processing of in-flight messages within a defined timeout before revoking the partition, or at least to cancel the current processing when it involves the Reactor processor.

@rkolesnev
Member

rkolesnev commented May 30, 2024

Hi @ajax-levashov-m,
You are correct: events that are in flight (i.e. that started processing but did not finish by the moment of partition revocation) will be reprocessed. That is really the default / expected behaviour of Kafka consumer applications in general.

It would potentially be possible to add some delay to allow in-flight message processing to finish during revocation, but that has only a minor benefit at the cost of increased complexity, as it would only reduce the potential for duplicate processing on a clean revocation / shutdown. Slower processing that exceeds the timeout, or a crash, would still cause duplicate processing, so the system overall needs to be able to cope with it.
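
For illustration, at the level of a plain KafkaConsumer such a grace period would look roughly like the hypothetical listener below (the names and the 10-second timeout are made up; this is not ParallelConsumer's API). Note that onPartitionsRevoked runs on the consumer thread before the rebalance proceeds, and a crash skips it entirely, which is exactly why duplicates cannot be fully ruled out:

```java
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

import java.util.Collection;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical drain-on-revoke listener for a plain KafkaConsumer. Revocation
 * waits a bounded time for in-flight work; a crash, or work exceeding the
 * timeout, still ends in reprocessing after reassignment.
 */
class GracePeriodRebalanceListener implements ConsumerRebalanceListener {
    private final Set<CompletableFuture<Void>> inFlight = ConcurrentHashMap.newKeySet();

    /** Register work started for a polled record; it removes itself on completion. */
    void track(CompletableFuture<Void> work) {
        inFlight.add(work);
        work.whenComplete((result, error) -> inFlight.remove(work));
    }

    @Override
    public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
        try {
            // Grace period: blocks the consumer thread before the rebalance proceeds.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0]))
                    .get(10, TimeUnit.SECONDS); // hypothetical timeout
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException | TimeoutException e) {
            // Unfinished or failed work will simply be reprocessed after reassignment.
        }
        // Commit the offsets of completed work here, before the partitions move away.
    }

    @Override
    public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
        // no-op
    }
}
```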

It would be a complex enough change to handle the grace period for completing in-flight messages: on partition revocation, ParallelConsumer would need to stop submitting work for processing only from the revoked partitions, while still allowing new work from non-revoked partitions to be submitted, and then scan the in-flight messages to find those that belong specifically to the revoked partitions in order to monitor their completion.
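
Something like this hypothetical tracker (not existing ParallelConsumer code; all names are made up) shows the bookkeeping that would be needed:

```java
import org.apache.kafka.common.TopicPartition;

import java.time.Duration;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/**
 * Hypothetical bookkeeping for a partition-scoped grace period: submission is
 * refused only for partitions being revoked, and draining waits solely on the
 * in-flight futures belonging to those partitions.
 */
class PartitionScopedInFlightTracker {
    private final Map<TopicPartition, Set<CompletableFuture<Void>>> inFlight = new ConcurrentHashMap<>();
    private final Set<TopicPartition> revoking = ConcurrentHashMap.newKeySet();

    /** Returns false if the partition is being revoked; the caller must not start the work. */
    boolean trySubmit(TopicPartition tp, CompletableFuture<Void> work) {
        if (revoking.contains(tp)) {
            return false;
        }
        Set<CompletableFuture<Void>> perPartition =
                inFlight.computeIfAbsent(tp, ignored -> ConcurrentHashMap.newKeySet());
        perPartition.add(work);
        work.whenComplete((result, error) -> perPartition.remove(work));
        return true;
    }

    /** Block new work for the revoked partitions, then wait (bounded) only on their in-flight futures. */
    void drain(Collection<TopicPartition> revoked, Duration timeout) throws TimeoutException {
        revoking.addAll(revoked);
        CompletableFuture<?>[] pending = revoked.stream()
                .flatMap(tp -> inFlight.getOrDefault(tp, Set.of()).stream())
                .toArray(CompletableFuture[]::new);
        try {
            CompletableFuture.allOf(pending).get(timeout.toMillis(), TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } catch (ExecutionException e) {
            // Individual failures are handled by the processing pipeline; draining
            // only cares that nothing is still running on the revoked partitions.
        }
    }
}
```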

If you are willing to draft the required changes, I am happy to review and see if it's possible to implement this cleanly enough.

@rkolesnev rkolesnev added the enhancement New feature or request label May 30, 2024
@ajax-levashov-m
Author

Thank you for the detailed explanation! Unfortunately, I don't have the time to draft the required changes at the moment. The thing is that it was a bit confusing to switch from reactor-kafka to parallel-consumer and see lots of duplicates appear each time we redeploy our service. So I was curious if there was an existing configuration option to handle partition revocation gracefully that I might have missed. Since there isn't one, and given the complexity involved, making our message processing idempotent seems to be the way to go.
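
For anyone else landing here, the shape of that idempotency guard will be roughly the sketch below (all names are hypothetical; the in-memory set merely stands in for a durable store, e.g. a unique-keyed database table written atomically with the side effect):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;

import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Hypothetical idempotency guard: duplicate deliveries after a revocation /
 * reassignment become no-ops. In a real deployment the "processed" set must
 * be durable and updated atomically with the side effect itself.
 */
class IdempotentHandler {
    private final Set<String> processed = ConcurrentHashMap.newKeySet(); // stand-in for a durable store

    void handle(ConsumerRecord<String, String> record) {
        // Dedup on record coordinates; a business-level id is sturdier if producers can retry.
        String dedupKey = record.topic() + "-" + record.partition() + "-" + record.offset();
        if (!processed.add(dedupKey)) {
            return; // already processed before the partition was revoked
        }
        applySideEffect(record);
    }

    private void applySideEffect(ConsumerRecord<String, String> record) {
        System.out.println("processing " + record.value()); // placeholder for the real work
    }
}
```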
