Record being picked up by multiple threads simultaneously #843

Open

singhsaurabh2409 opened this issue Dec 4, 2024 · 5 comments
Labels: wait for info (Waiting for additional info from user)

Comments

@singhsaurabh2409

I am using the library v5.3.0 and I noticed that the same record is being picked up by different threads of the same pod at almost the same time. This causes problems during processing because I maintain a counter of processed records, and that count ends up higher than the total number of records since a few records are counted twice.

@rkolesnev
Contributor

Hi @singhsaurabh2409,

That doesn't sound right at all - are the records failing and being retried?
Or are they being reprocessed due to rebalancing or something like that?

Otherwise, if the same record is actually picked up more than once for processing in a normal execution scenario, that would be a significant bug.

Do you have more information, ideally a test or sample code that reproduces the behaviour?

@singhsaurabh2409
Author

Hi @rkolesnev,

  1. No, all records were processed successfully on the first attempt. There were no retries.
  2. There was no rebalancing during the window in which the record was picked up and processed.
  3. There was no sudden peak in load when this record was processed.
  4. We are using ordering = KEY, commit mode = SYNC, concurrency = 16 and the default commit interval (roughly as in the sketch below).
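
For reference, the setup is roughly equivalent to the sketch below (topic name and handler are placeholders, not our exact code):

```java
import java.util.List;

import io.confluent.parallelconsumer.ParallelConsumerOptions;
import io.confluent.parallelconsumer.ParallelConsumerOptions.CommitMode;
import io.confluent.parallelconsumer.ParallelConsumerOptions.ProcessingOrder;
import io.confluent.parallelconsumer.ParallelStreamProcessor;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class TaskConsumerSetup {

    // "tasks-topic" and processTask(..) are placeholders for our actual topic / handler.
    public static ParallelStreamProcessor<String, String> start(Consumer<String, String> kafkaConsumer) {
        var options = ParallelConsumerOptions.<String, String>builder()
                .ordering(ProcessingOrder.KEY)                  // ordering = KEY
                .commitMode(CommitMode.PERIODIC_CONSUMER_SYNC)  // commit mode = SYNC
                .maxConcurrency(16)                             // concurrency = 16
                .consumer(kafkaConsumer)                        // commit interval left at its default
                .build();

        ParallelStreamProcessor<String, String> processor =
                ParallelStreamProcessor.createEosStreamProcessor(options);
        processor.subscribe(List.of("tasks-topic"));
        processor.poll(ctx -> processTask(ctx.getSingleConsumerRecord()));
        return processor;
    }

    private static void processTask(ConsumerRecord<String, String> record) {
        // placeholder for the real task-processing logic
    }
}
```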

The situation is like this -
We are trying to process a bunch of tasks, so we publish them to a Kafka topic with their groupId and taskId.
Any Kafka record can be uniquely identified by the combination groupId + taskId. In the scenario where this happened, there were 10 tasks in the group.
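
The publishing side looks roughly like the following sketch (topic name, key choice and payload shape are illustrative placeholders, not our exact producer code):

```java
import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class TaskPublisher {

    // groupId + taskId uniquely identifies the task inside the record value;
    // the record key drives partitioning and KEY ordering on the consumer side.
    public static void publishTask(KafkaProducer<String, String> producer,
                                   String recordKey, long groupId, long taskId) {
        String value = String.format("{\"groupId\":%d,\"taskId\":%d}", groupId, taskId);
        producer.send(new ProducerRecord<>("tasks-topic", recordKey, value));
    }

    public static KafkaProducer<String, String> newProducer(String bootstrapServers) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());
        return new KafkaProducer<>(props);
    }
}
```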

Logs look something like this. Both threads belong to the same pod.
Nov 27, 2024 @ 18:20:00.617 | task with groupId[10]-task[4] was started successfully | pc-pool-1-thread-2
Nov 27, 2024 @ 18:20:00.687 | task with groupId[10]-task[4] was started successfully | pc-pool-1-thread-12
Nov 27, 2024 @ 18:20:01.454 | task with groupId[10]-task[4] was processed successfully | pc-pool-1-thread-12
Nov 27, 2024 @ 18:20:01.544 | task with groupId[10]-task[4] was processed successfully | pc-pool-1-thread-2

We have not been able to reproduce it locally yet.

@rkolesnev
Contributor

rkolesnev commented Dec 10, 2024

Hi @singhsaurabh2409,

A couple of things that may help shed some light on this:

  • Is the parallel consumer consuming from a single topic?
  • How are the messages keyed and partitioned on the source topic?
  • Is there any way you could check those messages on the Kafka topic - are offsets, partitions, or anything similar logged anywhere that would let you find them?
  • Could the duplication be on the incoming topic, i.e. the same group / task message actually produced upstream twice?

@rkolesnev
Contributor

rkolesnev commented Dec 10, 2024

I've gone through the code and I do not see a possibility of duplicate processing of a message - work retrieval is single-threaded (through the control loop thread) and uses an iterator plus an inflight flag to guard against picking up the same message twice:
before a message is queued to the thread pool for execution, wc.inflight is set to true, and it is only set to false on completion (either success or failure), after the container has been removed from the shard.

For the above scenario to happen, the same work container would need to be submitted more than once into that queue by the single thread that pulls work from the shards - there is no concurrent execution of the work selection itself, so there are no race conditions on that code path.
Maybe I am missing something, but right now I do not see how the same message / work container could be picked for work more than once at the same time (i.e. excluding retries on errors etc., which is not the case here).
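
In rough pseudo-Java, the guard has this shape (a simplified illustration of the mechanism described above, not the actual WorkManager / shard code - real shards are keyed maps and completion is handled back on the control thread):

```java
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;

// Illustrative sketch only - not parallel-consumer source code.
class WorkContainer {
    volatile boolean inFlight = false;
    final Runnable userFunction;

    WorkContainer(Runnable userFunction) {
        this.userFunction = userFunction;
    }
}

class ControlLoopSketch {
    private final CopyOnWriteArrayList<WorkContainer> shard = new CopyOnWriteArrayList<>();
    private final ExecutorService workerPool;

    ControlLoopSketch(ExecutorService workerPool) {
        this.workerPool = workerPool;
    }

    // Called only from the single control-loop thread, so work selection is
    // never concurrent and the same container cannot be handed out twice.
    void retrieveAndSubmitWork() {
        for (WorkContainer wc : shard) {
            if (wc.inFlight) {
                continue;            // already queued or running - skip it
            }
            wc.inFlight = true;      // flagged before it is handed to the pool
            workerPool.submit(() -> {
                try {
                    wc.userFunction.run();
                } finally {
                    shard.remove(wc);    // removed from the shard on completion
                    wc.inFlight = false; // released only after removal (success or failure)
                }
            });
        }
    }
}
```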

The only explanation I can think of is that somehow that task got produced to the topic more than once upstream - and it would have to be with a different key or on a different partition, as otherwise it would be blocked from being picked for work by the KEY ordering mode.

rkolesnev added the "wait for info" label Dec 10, 2024
@singhsaurabh2409
Author

Hi @rkolesnev,

To answer your questions:

  • Yes, the consumer was consuming from a single topic.
  • The topic in which the issue occurred has 20 partitions. The key is the item id for which the tasks are created, and there is a sufficient number of unique keys.
  • We were not logging the partition and offset before processing the message, but we did log a few fields from the Kafka event body.
  • Yes, we considered this case, but using the logged fields we were able to find only a single message in Kafka.

We have added detailed logging so that if this is reproduced again, we will have more data to pinpoint the issue.
Will keep you posted.
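
The extra logging is along these lines - wrapping the handler with before/after statements that record topic, partition, offset, key and thread (an illustrative sketch, not the exact statements we added):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class TaskHandler {

    private static final Logger log = LoggerFactory.getLogger(TaskHandler.class);

    // Wraps the real task processing with before/after logging of the record coordinates.
    public static void handle(ConsumerRecord<String, String> record) {
        log.info("start  topic={} partition={} offset={} key={} thread={}",
                record.topic(), record.partition(), record.offset(),
                record.key(), Thread.currentThread().getName());
        try {
            processTask(record);   // placeholder for the real processing
        } finally {
            log.info("finish topic={} partition={} offset={} key={} thread={}",
                    record.topic(), record.partition(), record.offset(),
                    record.key(), Thread.currentThread().getName());
        }
    }

    private static void processTask(ConsumerRecord<String, String> record) {
        // real task-processing logic lives here
    }
}
```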
