
Fix broken error handling around Idempotent producer + Ensure strict ordering when Net.MaxOpenRequests = 1 #2943

Open
wants to merge 14 commits into base: main
210 changes: 181 additions & 29 deletions async_producer.go
@@ -197,6 +197,7 @@
sequenceNumber int32
producerEpoch int16
hasSequence bool
hasBeenBatched bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -220,7 +221,7 @@
return size
}

func (m *ProducerMessage) clear() {

Check failure on line 224 in async_producer.go (GitHub Actions / Linting with Go 1.22.x): func `(*ProducerMessage).clear` is unused (unused)
m.flags = 0
m.retries = 0
m.sequenceNumber = 0
@@ -249,6 +250,19 @@
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
if len(pe) > 0 {
Contributor review comment:

Should this ever actually be produced with zero messages?

If it's unlikely to ever happen with len(pe) == 0 then that should be the guard condition, and then the complex error message should be the unindented path.

return fmt.Sprintf(
"kafka: Failed to deliver %d messages, sample error: %v after %d retries on topic: %s and partition: %d with producerEpoch: %d and sequence: %d(%v)",
len(pe),
pe[0].Err,
pe[0].Msg.retries,
pe[0].Msg.Topic,
pe[0].Msg.Partition,
pe[0].Msg.producerEpoch,
pe[0].Msg.sequenceNumber,
pe[0].Msg.hasSequence,
)
}
return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
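A minimal sketch of the reviewer's suggestion above (same identifiers as the diff; illustrative only, not part of this change): make the empty case the guard so the detailed message sits on the unindented path.

	func (pe ProducerErrors) Error() string {
		if len(pe) == 0 {
			// Degenerate case; should rarely, if ever, happen.
			return "kafka: Failed to deliver 0 messages."
		}
		// Normal path: report the count plus a sample error with its idempotency metadata.
		return fmt.Sprintf(
			"kafka: Failed to deliver %d messages, sample error: %v after %d retries on topic: %s and partition: %d with producerEpoch: %d and sequence: %d(%v)",
			len(pe),
			pe[0].Err,
			pe[0].Msg.retries,
			pe[0].Msg.Topic,
			pe[0].Msg.Partition,
			pe[0].Msg.producerEpoch,
			pe[0].Msg.sequenceNumber,
			pe[0].Msg.hasSequence,
		)
	}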

@@ -695,6 +709,9 @@
// All messages being retried (sent or not) have already had their retry count updated
// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
if msg.hasSequence {
panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
Contributor review comment:

https://go.dev/wiki/CodeReviewComments#dont-panic

Is the condition here so bad that we need to panic? (That is, is it entirely unrecoverable?)

}
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}
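A possible non-panicking shape for the assertion above, in the spirit of the linked CodeReviewComments guidance (illustrative only, not what this PR does): log the violation loudly and skip the reassignment instead of crashing the process.

	if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
		if msg.hasSequence {
			// Invariant violation: an epoch/sequence must never be reassigned once handed out.
			// Logging keeps the producer alive while still surfacing the bug.
			Logger.Printf("BUG: message for %s/%d already has sequence %d (epoch %d); not reassigning\n",
				msg.Topic, msg.Partition, msg.sequenceNumber, msg.producerEpoch)
		} else {
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}
	}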
@@ -737,6 +754,13 @@
}

for _, msg := range pp.retryState[pp.highWatermark].buf {
if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
if msg.hasSequence {
panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
}
msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
msg.hasSequence = true
}
pp.brokerProducer.input <- msg
}
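For context on what the call sites above rely on: getAndIncrementSequenceNumber hands out a monotonically increasing sequence per topic/partition together with the current producer epoch. A simplified sketch of that contract (the type and field names below are illustrative, not sarama's actual transaction manager; requires the standard library "sync" package):

	type seqCounter struct {
		mu            sync.Mutex
		producerEpoch int16
		sequences     map[string]map[int32]int32 // topic -> partition -> next sequence
	}

	func newSeqCounter(epoch int16) *seqCounter {
		return &seqCounter{producerEpoch: epoch, sequences: make(map[string]map[int32]int32)}
	}

	func (s *seqCounter) getAndIncrementSequenceNumber(topic string, partition int32) (int32, int16) {
		s.mu.Lock()
		defer s.mu.Unlock()
		if s.sequences[topic] == nil {
			s.sequences[topic] = make(map[int32]int32)
		}
		seq := s.sequences[topic][partition]
		s.sequences[topic][partition] = seq + 1
		return seq, s.producerEpoch
	}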

@@ -800,6 +824,7 @@

// Count the in flight requests to know when we can close the pending channel safely
wg.Add(1)

// Capture the current set to forward in the callback
sendResponse := func(set *produceSet) ProduceCallback {
return func(response *ProduceResponse, err error) {
@@ -823,19 +848,66 @@
}
}

// Use AsyncProduce vs Produce to not block waiting for the response
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
// https://kafka.apache.org/protocol#protocol_network
err := broker.AsyncProduce(request, sendResponse)
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
// Callback is not called when using NoResponse
if p.conf.Producer.RequiredAcks == NoResponse {
// Provide the expected nil response
sendResponse(nil, nil)
if p.conf.Producer.Idempotent || p.conf.Net.MaxOpenRequests == 1 {
// Idempotency being enabled, or MaxOpenRequests == 1, are special cases where
// the caller has communicated: I really care about ordering. As a result, we
// do our best to guarantee that by using the synchronous (blocking) version
// of produce so that there is only one outstanding Produce request at any given
// time. A few notes:
//
// 1. We shouldn't *have* to do this since that is what p.conf.Net.MaxOpenRequests
// is for. However, as noted in the comments of the PR that introduced pipelining
// in the first place, the implementation of pipelining is such that
// p.conf.Net.MaxOpenRequests is no longer strictly honored, so we have to do it
// this way if we want to maintain strict ordering guarantees. PR that introduced
// request pipelining: https://github.com/IBM/sarama/pull/2094
//
// 2. In theory, when idempotency is enabled it *should* be possible to maintain
// strict ordering with concurrent produce requests as long as all the produce
// requests are received by the broker in the correct order (say, by sending them
// in the right order over a single connection). However, the current implementation
// of this library is such that idempotency breaks in certain error/retry scenarios
// when request pipelining is enabled. This can manifest as either the assertion in
// `produce_set.go` firing and complaining that a batch was created that contains out
// of sequence messages (https://github.com/IBM/sarama/issues/2803), or the Kafka
// broker itself detecting that the ordering guarantees have been violated and
// rejecting the batch, effectively causing data loss since there is no way for the
// client to retry or handle this error without risk of introducing duplicates:
// https://github.com/IBM/sarama/issues/2619. I wish I could say with certainty
// exactly *why* this happens, but after two days of strenuous debugging I still can't
// say for sure, other than that this file was written with a lot of assumptions that
// happened to hold only because there was no request pipelining, and the best path
// forward is to just make sure there is no request pipelining when strict ordering
// is required.
resp, err := broker.Produce(request)
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
// Callback is not called when using NoResponse
if p.conf.Producer.RequiredAcks == NoResponse {
// Provide the expected nil response
sendResponse(nil, nil)
} else {
sendResponse(resp, nil)
}
} else {
// Use AsyncProduce vs Produce to not block waiting for the response
// so that we can pipeline multiple produce requests and achieve higher throughput, see:
// https://kafka.apache.org/protocol#protocol_network
err := broker.AsyncProduce(request, sendResponse)
if err != nil {
// Request failed to be sent
sendResponse(nil, err)
continue
}
// Callback is not called when using NoResponse
if p.conf.Producer.RequiredAcks == NoResponse {
// Provide the expected nil response
sendResponse(nil, nil)
}
}
}
// Wait for all in flight requests to close the pending channel safely
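From the caller's perspective, the strict-ordering path above is selected purely by configuration. A usage sketch follows; the broker address, topic, and Kafka version are placeholders, and the idempotent settings reflect what this library's config validation expects to the best of my knowledge (imports "log" and "github.com/IBM/sarama").

	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0
	cfg.Producer.Idempotent = true
	cfg.Net.MaxOpenRequests = 1 // required when Idempotent is enabled
	cfg.Producer.RequiredAcks = sarama.WaitForAll
	cfg.Producer.Retry.Max = 5
	cfg.Producer.Return.Errors = true

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Errors must be drained when Return.Errors is enabled.
	go func() {
		for perr := range producer.Errors() {
			log.Println("produce failed:", perr)
		}
	}()

	producer.Input() <- &sarama.ProducerMessage{Topic: "orders", Value: sarama.StringEncoder("event-1")}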
@@ -974,6 +1046,11 @@
continue
}
}

if bp.parent.conf.Producer.Idempotent && !msg.hasSequence {
panic("msg made it to brokerProducer goroutine without being sequenced while idempotency is enabled")
}

if err := bp.buffer.add(msg); err != nil {
bp.parent.returnError(msg, err)
continue
@@ -1101,7 +1178,7 @@
bp.parent.returnSuccesses(pSet.msgs)
// Retriable errors
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
if bp.parent.conf.Producer.Retry.Max <= 0 {
bp.parent.abandonBrokerConnection(bp.broker)
bp.parent.returnErrors(pSet.msgs, block.Err)
@@ -1134,32 +1211,55 @@

switch block.Err {
case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend:
ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
bp.broker.ID(), topic, partition, block.Err)
if bp.currentRetries[topic] == nil {
bp.currentRetries[topic] = make(map[int32]error)
}
bp.currentRetries[topic][partition] = block.Err
if bp.parent.conf.Producer.Idempotent {
// I don't really understand why we need a dedicated goroutine for this, but it was like
// that before.
go bp.parent.retryBatch(topic, partition, pSet, block.Err)
} else {
bp.parent.retryMessages(pSet.msgs, block.Err)
// dropping the following messages has the side effect of incrementing their retry count.
//
// Note, I'm not 100% sure why this is here since we're not about to call bp.rollOver(),
// however, it's very important that this function is not called in the idempotency path
// because it would break apart the messages in an existing (sequenced) batch and the
// messages could end up split across multiple batches or in a different order, which
// violates how the idempotency protocol works and will cause an assertion to fire in the
// client (dropping the messages) or the Broker to reject the messages. Either way, it's
// data loss.
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
}
// dropping the following messages has the side effect of incrementing their retry count
bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
}
})
}
}

func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
// Due to how this library is structured / implemented, retryBatch can end up getting called in
// a very tight loop in some scenarios (like when a Broker is down and the library is trying to
// re-establish a connection). As a result, we need to make sure that retryBatch honors the
// configured Retry.Backoff just like the regular message retries do.
time.Sleep(p.conf.Producer.Retry.Backoff)

Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
produceSet := newProduceSet(p)
produceSet.msgs[topic] = make(map[int32]*partitionSet)
produceSet.msgs[topic][partition] = pSet
produceSet.bufferBytes += pSet.bufferBytes
produceSet.bufferCount += len(pSet.msgs)
// Once a producer ID and epoch have been assigned to messages, they can *never* be changed
// in order for the idempotency protocol between the client and Broker to be correct. As a
// result, we make sure that we copy over the old producer ID and epoch (older versions of
// this library did not do this which resulted in bugs where the client would issue batches
// with new producer ID / epochs, but with old sequence numbers which makes no sense).
produceSet.producerID = pSet.recordsToSend.RecordBatch.ProducerID
produceSet.producerEpoch = pSet.recordsToSend.RecordBatch.ProducerEpoch
for _, msg := range pSet.msgs {
if msg.retries >= p.conf.Producer.Retry.Max {
p.returnErrors(pSet.msgs, kerr)
@@ -1168,18 +1268,39 @@
msg.retries++
}

// it's expected that a metadata refresh has been requested prior to calling retryBatch
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader\n", topic, partition, err)
// Previous versions of this library did not retry the call to p.client.Leader(topic, partition)
// if it failed. That was problematic because it meant during periods of time where we couldn't
// figure out who the leader was (because it had just died, or there was a network problem, or
// whatever) idempotent Produce requests would fail immediately instead of retrying for a while
// as expected. This retry loop is very important since prematurely (and unnecessarily) failing
// an idempotent batch is ~equivalent to data loss.
succeeded := false
for i := 0; i < p.conf.Producer.Retry.Max; i++ {
Contributor review comment:

Suggest using a different variable name if we're counting retries/tries rather than indices.

[off-by-one smell] Are we counting retries, or tries? That is, if I've asked for 5 retries max, then that's 6 total tries.

// it's expected that a metadata refresh has been requested prior to calling retryBatch
leader, err := p.client.Leader(topic, partition)
if err != nil {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, attempt: %d of %d\n", topic, partition, err, i, p.conf.Producer.Retry.Max)
for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}

time.Sleep(p.conf.Producer.Retry.Backoff)
continue
}

bp := p.getBrokerProducer(leader)
bp.output <- produceSet
p.unrefBrokerProducer(leader, bp)
succeeded = true
break
}

if !succeeded {
Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, no more retries\n", topic, partition)
Contributor review comment:

[nitpick] Newlines at the end should be unnecessary for loggers? (I mean, this is generally the case, but I don't know if that is specifically true here.)

Three % verbs are specified but only two arguments are given.

for _, msg := range pSet.msgs {
p.returnError(msg, kerr)
}
return
}
bp := p.getBrokerProducer(leader)
bp.output <- produceSet
p.unrefBrokerProducer(leader, bp)
}
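On the reviewer's tries-versus-retries question above: if Retry.Max is meant to be a retry count, the lookup loop would allow Retry.Max+1 total attempts and fail the batch only once, at the end. A sketch of that shape with a clearer loop variable (illustrative, not the code in this PR):

	var leader *Broker
	var err error
	for attempt := 0; attempt <= p.conf.Producer.Retry.Max; attempt++ {
		leader, err = p.client.Leader(topic, partition)
		if err == nil {
			break
		}
		Logger.Printf("Failed to find leader for %s/%d (attempt %d of %d): %v\n",
			topic, partition, attempt+1, p.conf.Producer.Retry.Max+1, err)
		time.Sleep(p.conf.Producer.Retry.Backoff)
	}
	if err != nil {
		// Exhausted every attempt: fail the whole batch once and return.
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
	bp := p.getBrokerProducer(leader)
	bp.output <- produceSet
	p.unrefBrokerProducer(leader, bp)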

func (bp *brokerProducer) handleError(sent *produceSet, err error) {
@@ -1193,11 +1314,34 @@
bp.parent.abandonBrokerConnection(bp.broker)
_ = bp.broker.Close()
bp.closing = err

sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
// It is very important that we use retryBatch instead of retryMessages when idempotency
// is enabled. Otherwise batches that have already been sequenced (and potentially even
// sent to Brokers) will get split up and rebuilt in a different way, order, or with the
// messages split across different batches. This is not allowed in the idempotency
// protocol. The expectation on the Broker side is that if the client retries any batches,
// it will send them with the exact same messages and sequence numbers as the original attempt,
// otherwise the protocol doesn't work.
if bp.parent.conf.Producer.Idempotent {
// I don't know if this needs or should have a dedicated goroutine, but the only other place
// where retryBatch() was called did it this way, so I did the same for consistency.
go bp.parent.retryBatch(topic, partition, pSet, ErrKafkaStorageError)
} else {
bp.parent.retryMessages(pSet.msgs, err)
}
})

// Everything in bp.buffer needs to be retried because we're about to call bp.rollOver()
// which will clear bp.buffer. Why we need to call bp.rollOver() here I'm not sure.
bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
bp.parent.retryMessages(pSet.msgs, err)
// Same comment as above.
if bp.parent.conf.Producer.Idempotent {
// Same comment as above.
go bp.parent.retryBatch(topic, partition, pSet, ErrKafkaStorageError)
} else {
bp.parent.retryMessages(pSet.msgs, err)
}
})
bp.rollOver()
}
@@ -1292,7 +1436,6 @@
p.bumpIdempotentProducerEpoch()
}

msg.clear()
pErr := &ProducerError{Msg: msg, Err: err}
if p.conf.Producer.Return.Errors {
p.errors <- pErr
@@ -1311,14 +1454,23 @@
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
for _, msg := range batch {
if p.conf.Producer.Return.Successes {
msg.clear()
p.successes <- msg
}
p.inFlight.Done()
}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
if p.conf.Producer.Idempotent && msg.hasSequence && msg.hasBeenBatched {
// If the message has had a sequence number assigned *and* it's
// been added to a batch (produce_set.go) then it is illegal to
// use the retryMessage function on it because that means the
// calling function is breaking apart and retrying messages from
// a batch that was already potentially sent to a Kafka broker,
// which is a violation of the idempotency protocol.
panic("retrying msg with sequence that has been batched before")
}

if msg.retries >= p.conf.Producer.Retry.Max {
p.returnError(msg, err)
} else {
16 changes: 16 additions & 0 deletions produce_set.go
@@ -86,10 +86,26 @@ func (ps *produceSet) add(msg *ProducerMessage) error {

if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
Logger.Println(
"assertion failed: message out of sequence added to batch",
"producer_id",
ps.producerID,
set.recordsToSend.RecordBatch.ProducerID,
"producer_epoch",
ps.producerEpoch,
set.recordsToSend.RecordBatch.ProducerEpoch,
"sequence_number",
msg.sequenceNumber,
set.recordsToSend.RecordBatch.FirstSequence,
"buffer_count",
ps.bufferCount,
"msg_has_sequence",
msg.hasSequence)
Contributor review comment on lines +89 to +103:

I would recommend leaving the log message on the same line as the method call, so that it's easily findable via grep; otherwise the one-liner isolates well.

Additionally, if the line is long enough to break up, then the possibility of adding even more fields is high, so each entry should end with a comma and a newline, so that adding new fields to the end of the call doesn't produce unnecessary line changes where the only change is punctuation due to syntax requirements.

Then I would pair up each log field name with the log field value, all together:

			Logger.Println("assertion failed: message out of sequence added to batch",
				"producer_id", ps.producerID, set.recordsToSend.RecordBatch.ProducerID,
				"producer_epoch", ps.producerEpoch, set.recordsToSend.RecordBatch.ProducerEpoch,
				"sequence_number", msg.sequenceNumber, set.recordsToSend.RecordBatch.FirstSequence,
				"buffer_count", ps.bufferCount,
				"msg_has_sequence", msg.hasSequence,
			)

return errors.New("assertion failed: message out of sequence added to a batch")
}
}

msg.hasBeenBatched = true
// Past this point we can't return an error, because we've already added the message to the set.
set.msgs = append(set.msgs, msg)
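The invariant the assertion above protects is that an idempotent batch's record sequences start at the batch's FirstSequence and stay contiguous and in order; for example, FirstSequence = 42 with three records covers 42, 43, 44. A standalone check of that property could look like this (illustrative helper, not library code):

	// validBatchSequences reports whether the messages, in batch order, carry
	// exactly the sequences firstSequence, firstSequence+1, ...
	func validBatchSequences(firstSequence int32, msgs []*ProducerMessage) bool {
		for i, msg := range msgs {
			if !msg.hasSequence || msg.sequenceNumber != firstSequence+int32(i) {
				return false
			}
		}
		return true
	}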
