Deadlock in Produce() / TryProduce() when kgo.MaxBufferedBytes() is configured #777
Comments
Good find. Offhand, I think the fix is to convert this to a sync.Cond and have every waiter be notified and inspect the conditions, but I'll have to stare a bit at how to do this.
Closed
twmb
added a commit
that referenced
this issue
Jul 22, 2024
Copying from the issue,

"""
1) Produce() record A (100 bytes)
2) Produce() record B (50 bytes), waiting for buffer to free
3) Produce() record C (50 bytes), waiting for buffer to free
4) Record A is produced, finishRecordPromise() gets called, detects it was over the limit so publish 1 message to waitBuffer
5) Record B is unlocked, finishRecordPromise() gets called, does not detect it was over the limit (only 50 bytes), so record C is never unblocked and will wait indefinitely on waitBuffer
"""

The fix requires adding a lock while producing. This reuses the existing lock on the `producer` type. This can lead to a few more spurious wakeups in other functions that use this same mutex, but that's fine.

The prior algorithm counted anything to produce immediately into the buffered records and bytes; the fix for #777 could not really be possible unless we avoid counting them. Specifically, we need to have a goroutine looping with a sync.Cond that checks *IF* we add the record, will we still be blocked? This allows us to wake up all blocked goroutines always (unlike one at a time, the problem this issue points out), and each goroutine can check under a lock if they still do not fit.

This also fixes an unreported bug where, if a record WOULD be blocked but fails early due to no topic / not in a transaction while in a transactional client, the serial promise finishing goroutine would deadlock.

Closes #777.
If you're able to see why #787 fails in CI, that may help accelerate a fix here. So far, the error is not reproducing locally for me.
twmb
added a commit
that referenced
this issue
Jul 29, 2024
Copying from the issue,

"""
1) Produce() record A (100 bytes)
2) Produce() record B (50 bytes), waiting for buffer to free
3) Produce() record C (50 bytes), waiting for buffer to free
4) Record A is produced, finishRecordPromise() gets called, detects it was over the limit so publish 1 message to waitBuffer
5) Record B is unlocked, finishRecordPromise() gets called, does not detect it was over the limit (only 50 bytes), so record C is never unblocked and will wait indefinitely on waitBuffer
"""

The fix requires adding a lock while producing. This reuses the existing lock on the `producer` type. This can lead to a few more spurious wakeups in other functions that use this same mutex, but that's fine.

The prior algorithm counted anything to produce immediately into the buffered records and bytes fields; the fix for #777 could not really be possible unless we avoid counting the "buffered" aspect right away. Specifically, we need to have a goroutine looping with a sync.Cond that checks *IF* we add the record, will we still be blocked? This allows us to wake up all blocked goroutines always (unlike one at a time, the problem this issue points out), and each goroutine can check under a lock if they still do not fit.

This also fixes an unreported bug where, if a record WOULD be blocked but fails early due to no topic / not in a transaction while in a transactional client, the serial promise finishing goroutine would deadlock.

Closes #777.
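To make the idea in the commit message concrete, here is a minimal sketch of a sync.Cond-based byte limiter. It is not franz-go's implementation: the `byteLimiter` type and its `acquire` / `release` methods are hypothetical names made up for illustration. It only shows the two properties the commit message relies on: bytes are counted only once a record fits, and every waiter is woken so each can re-check the condition under the lock.

```go
package main

import (
	"fmt"
	"sync"
)

// byteLimiter is a hypothetical stand-in for the producer's buffer
// accounting; it is not a type from franz-go.
type byteLimiter struct {
	mu       sync.Mutex
	cond     *sync.Cond
	max      int
	buffered int
}

func newByteLimiter(max int) *byteLimiter {
	l := &byteLimiter{max: max}
	l.cond = sync.NewCond(&l.mu)
	return l
}

// acquire blocks until adding size bytes would not exceed max. The bytes
// are counted only after the check passes, never while the caller waits.
func (l *byteLimiter) acquire(size int) {
	l.mu.Lock()
	defer l.mu.Unlock()
	for l.buffered+size > l.max {
		l.cond.Wait() // releases mu while parked, reacquires before returning
	}
	l.buffered += size
}

// release frees size bytes and wakes every waiter; each waiter re-checks
// under the lock whether its own record now fits.
func (l *byteLimiter) release(size int) {
	l.mu.Lock()
	l.buffered -= size
	l.mu.Unlock()
	l.cond.Broadcast()
}

func main() {
	l := newByteLimiter(100)
	l.acquire(100) // record A fills the buffer

	var wg sync.WaitGroup
	for _, name := range []string{"B", "C"} {
		wg.Add(1)
		go func(name string) {
			defer wg.Done()
			l.acquire(50) // blocks until A is released, unless A already finished
			fmt.Println("record", name, "buffered")
			l.release(50)
		}(name)
	}

	l.release(100) // record A completes; B and C are both woken
	wg.Wait()
	fmt.Println("bytes still buffered:", l.buffered)
}
```

This sketch leaves out things a real client needs, such as context cancellation and rejecting a record larger than the limit (which would otherwise wait forever here).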
In Mimir we found a deadlock in `Produce()` and `TryProduce()` when `kgo.MaxBufferedBytes()` is configured and the limit is hit.

The gist of the root cause is that there's no guarantee that all goroutines waiting for `p.waitBuffer` will be released, because there are conditions under which the number of messages sent to the `p.waitBuffer` channel is different from the number of goroutines waiting for it.

Details

`Produce()` and `TryProduce()` wait for `p.waitBuffer` when the buffer limit is reached:

franz-go/pkg/kgo/producer.go, lines 439 to 449 in a5f2b71

A message is published to `p.waitBuffer` each time a record completes, if the client detects that the record was over the bytes limit:

franz-go/pkg/kgo/producer.go, lines 520 to 534 in a5f2b71

In the case of "max buffered records" (which does not suffer from this issue), each record has the same "weight": each record accounts for +1 in the `p.bufferedRecords` accumulator. However, in the case of the "max buffered bytes" limit, different records don't have the same "weight", because each record has a different `userSize()`. This means that the computation of `wasOverMaxBytes` is screwed up, depending on which record gets completed first.

Example
Let's assume we set `kgo.MaxBufferedBytes()` to 100 bytes and the following workflow happens in order:

1) `Produce()` record A (100 bytes)
2) `Produce()` record B (50 bytes), waiting for buffer to free
3) `Produce()` record C (50 bytes), waiting for buffer to free
4) Record A is produced, `finishRecordPromise()` gets called, detects it was over the limit so publishes 1 message to `waitBuffer`
5) Record B is unblocked, `finishRecordPromise()` gets called, does not detect it was over the limit (only 50 bytes), so record C is never unblocked and will wait indefinitely on `waitBuffer`
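The five steps above can be replayed with a small single-threaded model. This is a simplified sketch of the accounting as described in this issue, not the actual franz-go code: the `model` type and its `produce` / `finish` methods are hypothetical, and the "was over the limit" check is assumed to compare the byte count just before the finished record is removed against the limit.

```go
package main

import "fmt"

const maxBufferedBytes = 100

// model is a hypothetical, single-threaded stand-in for the producer's
// byte accounting. Blocked records are counted immediately, as the issue
// describes for the old algorithm.
type model struct {
	buffered int      // bytes counted, including records that are blocked
	waiting  []string // blocked records, in arrival order
}

// produce counts the record right away and reports whether the caller
// would block waiting for a wakeup.
func (m *model) produce(name string, size int) bool {
	m.buffered += size
	if m.buffered > maxBufferedBytes {
		m.waiting = append(m.waiting, name)
		return true
	}
	return false
}

// finish removes a completed record and wakes at most one waiter, and only
// if the buffer was over the limit just before the record was removed.
func (m *model) finish(size int) (woken string) {
	wasOver := m.buffered > maxBufferedBytes
	m.buffered -= size
	if wasOver && len(m.waiting) > 0 {
		woken = m.waiting[0]
		m.waiting = m.waiting[1:]
	}
	return woken
}

func main() {
	m := &model{}
	m.produce("A", 100) // fits exactly, does not block
	m.produce("B", 50)  // 150 bytes counted, blocks
	m.produce("C", 50)  // 200 bytes counted, blocks

	fmt.Printf("A finished, woke %q\n", m.finish(100)) // 200 > 100, wakes B
	fmt.Printf("B finished, woke %q\n", m.finish(50))  // 100 > 100 is false, wakes nobody
	fmt.Printf("still waiting: %v\n", m.waiting)
}
```

In this model, C is left in the waiting list even though only 50 bytes remain counted (C's own), which is the stuck state described in step 5.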
Reproduction test
I've written a test reproducing the deadlock:
https://github.com/twmb/franz-go/compare/pracucci:reproduce-produce-deadlock