
Seeking to a specific offset for a partition #782

Open
ebrockman1 opened this issue May 30, 2024 · 5 comments
Labels
enhancement New feature or request

Comments

ebrockman1 commented May 30, 2024

The Kafka consumer client has the functionality to seek to a specific offset for a partition; see "Controlling The Consumer's Position" in the Kafka documentation.

The parallel consumer currently does not allow seeking to a specific offset. We're looking to see if it is possible to add this to the library so that we can re-read messages if necessary.
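For context, with the plain consumer the repositioning itself is a one-liner; the open questions in this thread are everything around it. Below is a minimal sketch in Python using confluent_kafka for brevity (the library discussed here is Java, and `replay_start_offset` / `seek_back` are illustrative helpers, not part of any API):

```python
def replay_start_offset(committed_offset, replay_count, beginning_offset=0):
    """Offset to seek to in order to re-read the last `replay_count` records,
    clamped so we never seek before the start of the partition."""
    return max(beginning_offset, committed_offset - replay_count)


def seek_back(consumer, topic, partition, target_offset):
    """Reposition a plain consumer; requires confluent_kafka and a live cluster."""
    from confluent_kafka import TopicPartition  # deferred: needs the client lib
    # Assigning with an explicit offset positions the next poll() there.
    consumer.assign([TopicPartition(topic, partition, target_offset)])
```

For example, to re-read the last 500 records on a partition committed at offset 10,000, the seek target would be `replay_start_offset(10_000, 500)`, i.e. 9,500.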

rkolesnev (Member) commented Jun 5, 2024

Hi @ebrockman1. Technically, I think this should be possible: seeking to a specific start offset and starting or restarting consumption from it. After all, the library uses a vanilla Kafka Consumer under the hood to poll data from the Kafka brokers.

Having said that, it would require working out the exact steps and process: how it would be triggered, how to handle in-flight messages, how to handle offset commits, and so on.
For example, if the offset is reset back 10,000 offsets, to records that were already committed as processed, should the consumer re-commit the older offset and discard the committed state above it?
There are a number of considerations and decisions that may be specific to the usage scenario. In some cases, completely resetting state to an older offset makes sense, e.g. a full replay from a specific point in time; in others it does not, e.g. re-reading specific messages only (say, for retrying) without resetting the whole state to a back-in-time position.

With the vanilla Kafka Consumer, all of this logic lives in the application and is coded as required by the developer. With the Parallel Consumer, commit logic is part of the library, so it is a bit harder to anticipate the different usage scenarios.

Can you describe in more detail the use case / scenario for seeking to specific offsets and re-reading messages?
Maybe a DLQ, or a secondary consumer object outside of the main flow, would make more sense?

ebrockman1 (Author) commented

We have an application that we're building on the parallel consumer. We opted for the parallel consumer because we needed to increase our consumption TPS without the option of increasing the number of partitions for the topic.
Previously, when using a single consumer for lower-TPS topics, we could reposition the consumer in case of a production incident. We don't manage the Kafka cluster that we consume from, so we can't change the cluster's offsets.retention.minutes; it is set to 7 days. We're anticipating a topic with 15k messages per second, so if we had to bring up a new consumer group, we would potentially be re-reading 9B messages. We're also trying not to rely on manipulating the stored offset position in the Kafka cluster because, as noted, we don't manage the cluster; manipulating the stored offset is our last-resort option.

To address the DLQ option: we do have a DLQ set up for messages we fail to write to our database. That is our current approach for re-processing messages, but it does not account for a regional active-passive architecture, or for our code incorrectly dropping messages rather than writing them to the DLQ.

In the case of our active-passive architecture, we need a way to start from a desired position in the stream rather than the earliest or latest offset. If we use earliest, we would be reading 7 days of messages (unfeasible), and if we use latest, we may be losing messages depending on how cross-region replication is set up for the Kafka cluster. For example, if a message was produced to us-east-1 and replicated to us-west-2 around the same time we moved our Kafka consumer from us-east-1 to us-west-2, we create a race condition on whether we actually pick up that message: do we switch regions first, or is the message replicated first? In a different scenario, we move our consumer from one region to another (e.g. us-east-1 to us-west-2) less than 7 days after it was last in the passive region, so the consumer group starts consuming from the last known offset, which may be days old. In that case we could create a new consumer group to resolve the issue, but then we're back to the race condition with auto.offset.reset = earliest.
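One common way to sidestep the replication race described here is to resume in the new region from a timestamp slightly before the failover, accepting a small window of duplicates instead of risking lost messages. A hedged sketch, assuming confluent_kafka; the helper names and the five-minute margin are illustrative, not from this thread:

```python
def failover_start_timestamp(failover_ts_ms, overlap_ms=5 * 60 * 1000):
    """Timestamp (ms) to resume from in the new region: back off by a safety
    margin so messages still in flight in cross-region replication at
    failover time are re-read rather than lost."""
    return max(0, failover_ts_ms - overlap_ms)


def start_offsets_for_timestamp(consumer, topic, partitions, ts_ms):
    """Resolve a timestamp to concrete start offsets per partition.
    Broker interaction; requires confluent_kafka and a live cluster."""
    from confluent_kafka import TopicPartition  # deferred: needs the client lib
    query = [TopicPartition(topic, p, ts_ms) for p in partitions]
    # Returns, per partition, the earliest offset whose timestamp is >= ts_ms.
    return consumer.offsets_for_times(query)
```

The consumer would then be positioned at the resolved offsets instead of relying on auto.offset.reset.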

I would think the implementation for the parallel consumer could mirror how this is done with the vanilla Kafka consumer. If the vanilla Kafka consumer re-commits the offset to discard the prior stored state in the Kafka cluster, the parallel consumer should do the same to mimic that behavior; if the vanilla Kafka consumer does not re-commit the offset, so that restarting the consumer without a specific position defaults to the original offset stored in the cluster, the parallel consumer should match that too. Across scenarios, keeping the parallel consumer's behavior as close as possible to the vanilla Kafka consumer's is probably the best route.

rkolesnev (Member) commented Jun 6, 2024

Ok, it does make sense.

One thing to clarify: would the offset / position reset be performed as part of a manual runbook / intervention rather than automated logic?
If so, you could write (or use) a simple tool, or even code it as part of the application wrapping the Parallel Consumer (not inside the Parallel Consumer library itself).
The steps would be:

1. Spin down the Parallel Consumer.
2. Start a normal Kafka Consumer with an explicit assignment to the topic / partition set that requires the reset, using the same group.id as the Parallel Consumer.
3. Perform a commit with the explicit offsets you want to reset to (the offsets-for-timestamps methods can be used first to find the offset for a given time window).
4. Shut down that consumer.
5. Start the Parallel Consumer as normal.

Basically, if this is a special-case situation, maybe overwriting the offsets for the Parallel Consumer's group.id is not a bad idea?
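The runbook above could be packaged as a small standalone reset tool. A hedged sketch, assuming confluent_kafka and that the Parallel Consumer has been fully stopped first; `build_reset_offsets` and `reset_group_offsets` are illustrative names, not part of either library:

```python
def build_reset_offsets(topic, partition_offsets):
    """Turn {partition: offset} into sorted (topic, partition, offset) entries
    to commit. Pure helper; no broker needed."""
    return [(topic, p, off) for p, off in sorted(partition_offsets.items())]


def reset_group_offsets(bootstrap, group_id, topic, partition_offsets):
    """Steps 2-4 of the runbook; requires confluent_kafka and a live cluster."""
    from confluent_kafka import Consumer, TopicPartition  # deferred import
    consumer = Consumer({
        "bootstrap.servers": bootstrap,
        "group.id": group_id,          # same group.id as the Parallel Consumer
        "enable.auto.commit": False,
    })
    tps = [TopicPartition(t, p, off)
           for t, p, off in build_reset_offsets(topic, partition_offsets)]
    consumer.assign(tps)                              # explicit assignment, no subscribe()
    consumer.commit(offsets=tps, asynchronous=False)  # overwrite stored offsets
    consumer.close()                                  # then restart the Parallel Consumer
```

Keeping the tool outside the Parallel Consumer, as suggested, avoids having to anticipate every reset policy inside the library.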

rkolesnev (Member) commented

@ebrockman1 - what do you think about the approach outlined above?

ebrockman1 (Author) commented

I'm going to work on testing this approach in the next couple of weeks. Based on your outline, I think it would fit the bill for what we're trying to do. Our retry logic is triggered by manual intervention, so restarting the consumer and committing an older offset could work.
