Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kgo sink: fix read/write race for recBatch.canFailFromLoadErrs #786

Merged
merged 1 commit into from
Jul 29, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 14 additions & 1 deletion pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -714,6 +714,8 @@ func (s *sink) handleReqRespBatch(

// Since we have received a response and we are the first batch, we can
// at this point re-enable failing from load errors.
//
// We do not need a lock since the owner is locked.
batch.canFailFromLoadErrs = true

// By default, we assume we errored. Non-error updates this back
Expand Down Expand Up @@ -1294,6 +1296,10 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
batch0 := recBuf.batches[0]
batch0.tries++

// We need to lock the batch as well because there could be a buffered
// request about to be written. Writing requests only grabs the batch
// mu, not the recBuf mu.
batch0.mu.Lock()
var (
canFail = !recBuf.cl.idempotent() || batch0.canFailFromLoadErrs // we can only fail if we are not idempotent or if we have no outstanding requests
batch0Fail = batch0.maybeFailErr(&recBuf.cl.cfg) != nil // timeout, retries, or aborting
Expand All @@ -1303,6 +1309,8 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {

willFail = canFail && (batch0Fail || !netErr && (!retryableKerr || retryableKerr && isUnknownLimit))
)
batch0.isFailingFromLoadErr = willFail
batch0.mu.Unlock()

recBuf.cl.cfg.logger.Log(LogLevelWarn, "produce partition load error, bumping error count on first stored batch",
"broker", logID(recBuf.sink.nodeID),
Expand All @@ -1316,6 +1324,7 @@ func (recBuf *recBuf) bumpRepeatedLoadErr(err error) {
"is_unknown_limit", isUnknownLimit,
"will_fail", willFail,
)

if willFail {
recBuf.failAllRecords(err)
}
Expand Down Expand Up @@ -1406,6 +1415,10 @@ type recBatch struct {
// request with this batch, and then reset it to true whenever we
// process a response.
canFailFromLoadErrs bool
// If we are going to fail the batch in bumpRepeatedLoadErr, we need to
// set this bool to true. There could be a concurrent request about to
// be written. See more comments below where this is used.
isFailingFromLoadErr bool

wireLength int32 // tracks total size this batch would currently encode as, including length prefix
v1wireLength int32 // same as wireLength, but for message set v1
Expand Down Expand Up @@ -1958,7 +1971,7 @@ func (p *produceRequest) AppendTo(dst []byte) []byte {
for partition, batch := range partitions {
dst = kbin.AppendInt32(dst, partition)
batch.mu.Lock()
- if batch.records == nil { // concurrent failAllRecords
+ if batch.records == nil || batch.isFailingFromLoadErr { // concurrent failAllRecords OR concurrent bumpRepeatedLoadErr
if flexible {
dst = kbin.AppendCompactNullableBytes(dst, nil)
} else {
Expand Down