Fix broken error handling around Idempotent producer + Ensure strict ordering when Net.MaxOpenRequests = 1 #2943
base: main
Changes from all commits: 0a07760, 78b02d0, 46d19e5, e715c7d, d22f96f, 337aab2, b7cc6fc, 82e8795, 401c990, 2d7cdb6, 1cf850b, 1125d60, b73a5f2, 0222838
async_producer.go
@@ -197,6 +197,7 @@
	sequenceNumber int32
	producerEpoch  int16
	hasSequence    bool
	hasBeenBatched bool
}

const producerMessageOverhead = 26 // the metadata overhead of CRC, flags, etc.
@@ -220,7 +221,7 @@
	return size
}

func (m *ProducerMessage) clear() {
	m.flags = 0
	m.retries = 0
	m.sequenceNumber = 0
@@ -249,6 +250,19 @@
type ProducerErrors []*ProducerError

func (pe ProducerErrors) Error() string {
	if len(pe) > 0 {
		return fmt.Sprintf(
			"kafka: Failed to deliver %d messages, sample error: %v after %d retries on topic: %s and partition: %d with producerEpoch: %d and sequence: %d(%v)",
			len(pe),
			pe[0].Err,
			pe[0].Msg.retries,
			pe[0].Msg.Topic,
			pe[0].Msg.Partition,
			pe[0].Msg.producerEpoch,
			pe[0].Msg.sequenceNumber,
			pe[0].Msg.hasSequence,
		)
	}
	return fmt.Sprintf("kafka: Failed to deliver %d messages.", len(pe))
}
@@ -695,6 +709,9 @@
	// All messages being retried (sent or not) have already had their retry count updated
	// Also, ignore "special" syn/fin messages used to sync the brokerProducer and the topicProducer.
	if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
		if msg.hasSequence {
			panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
Review comment: https://go.dev/wiki/CodeReviewComments#dont-panic Is the condition here so bad that we need to panic? (That is, is it entirely unrecoverable?)
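If the invariant violation is considered recoverable, a non-panicking variant might fail just the offending message instead; a sketch only, not what the PR does (ErrSequenceReassigned is a hypothetical sentinel, not part of sarama):

	// Hypothetical: surface the broken invariant as a per-message error
	// rather than crashing the whole process.
	var ErrSequenceReassigned = errors.New("kafka: producer epoch/sequence already assigned to message")

	if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
		if msg.hasSequence {
			pp.parent.returnError(msg, ErrSequenceReassigned) // assumed recoverable
			continue
		}
		msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		msg.hasSequence = true
	}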
		}
		msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
		msg.hasSequence = true
	}
@@ -737,6 +754,13 @@
	}

	for _, msg := range pp.retryState[pp.highWatermark].buf {
		if pp.parent.conf.Producer.Idempotent && msg.retries == 0 && msg.flags == 0 {
			if msg.hasSequence {
				panic("assertion failure: reassigning producer epoch and sequence number to message that already has them")
			}
			msg.sequenceNumber, msg.producerEpoch = pp.parent.txnmgr.getAndIncrementSequenceNumber(msg.Topic, msg.Partition)
			msg.hasSequence = true
		}
		pp.brokerProducer.input <- msg
	}
@@ -800,6 +824,7 @@
	// Count the in flight requests to know when we can close the pending channel safely
	wg.Add(1)

	// Capture the current set to forward in the callback
	sendResponse := func(set *produceSet) ProduceCallback {
		return func(response *ProduceResponse, err error) {
@@ -823,19 +848,66 @@
			}
		}

		if p.conf.Producer.Idempotent || p.conf.Net.MaxOpenRequests == 1 {
			// Idempotency being enabled or MaxOpenRequests == 1 are special cases where
			// the caller has communicated: I really care about ordering. As a result, we
			// do our best to guarantee that by using the synchronous (blocking) version
			// of produce so that there is only one outstanding Produce request at any given
			// time. A few notes:
			//
			// 1. We shouldn't *have* to do this, since that is what p.conf.Net.MaxOpenRequests
			//    is for. However, as noted in the comments of the PR that introduced pipelining
			//    in the first place, the implementation of pipelining is such that
			//    p.conf.Net.MaxOpenRequests is no longer strictly honored, so we have to do it
			//    this way if we want to maintain strict ordering guarantees. PR that introduced
			//    request pipelining: https://github.com/IBM/sarama/pull/2094
			//
			// 2. In theory, when idempotency is enabled it *should* be possible to maintain
			//    strict ordering with concurrent produce requests, as long as all the produce
			//    requests are received by the broker in the correct order (say, by sending them
			//    in the right order over a single connection). However, the current implementation
			//    of this library is such that idempotency breaks in certain error/retry scenarios
			//    when request pipelining is enabled. This can manifest as either the assertion in
			//    produce_set.go firing and complaining that a batch was created that contains out
			//    of sequence messages (https://github.com/IBM/sarama/issues/2803), or the Kafka
			//    broker itself detecting that the ordering guarantees have been violated and
			//    rejecting the batch, effectively causing data loss since there is no way for the
			//    client to retry or handle this error without risk of introducing duplicates:
			//    https://github.com/IBM/sarama/issues/2619. I wish I could say with certainty
			//    exactly *why* this happens, but after two days of strenuous debugging I still
			//    can't say for sure, other than that this file was written with a lot of
			//    assumptions and things that happened to work because there was no request
			//    pipelining, and the best path forward is to just make sure there is no request
			//    pipelining when strict ordering is required.
			resp, err := broker.Produce(request)
			if err != nil {
				// Request failed to be sent
				sendResponse(nil, err)
				continue
			}
			// Callback is not called when using NoResponse
			if p.conf.Producer.RequiredAcks == NoResponse {
				// Provide the expected nil response
				sendResponse(nil, nil)
			} else {
				sendResponse(resp, nil)
			}
		} else {
			// Use AsyncProduce vs Produce to not block waiting for the response
			// so that we can pipeline multiple produce requests and achieve higher throughput, see:
			// https://kafka.apache.org/protocol#protocol_network
			err := broker.AsyncProduce(request, sendResponse)
			if err != nil {
				// Request failed to be sent
				sendResponse(nil, err)
				continue
			}
			// Callback is not called when using NoResponse
			if p.conf.Producer.RequiredAcks == NoResponse {
				// Provide the expected nil response
				sendResponse(nil, nil)
			}
		}
	}
	// Wait for all in flight requests to close the pending channel safely
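For reference, this is the kind of configuration a caller uses to opt into strict ordering. A minimal sketch using sarama's public Config (the Kafka version chosen here is only illustrative):

	package main

	import "github.com/IBM/sarama"

	func strictOrderingConfig() *sarama.Config {
		config := sarama.NewConfig()
		config.Version = sarama.V2_1_0_0                 // idempotence requires >= V0_11_0_0
		config.Producer.Idempotent = true                // selects the synchronous Produce path above
		config.Producer.RequiredAcks = sarama.WaitForAll // required when Idempotent is enabled
		config.Net.MaxOpenRequests = 1                   // required for idempotence; on its own it also forces the synchronous path
		config.Producer.Retry.Max = 5
		return config
	}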
@@ -974,6 +1046,11 @@
			continue
		}
	}

	if bp.parent.conf.Producer.Idempotent && !msg.hasSequence {
		panic("msg made it to brokerProducer goroutine without being sequenced while idempotency is enabled")
	}

	if err := bp.buffer.add(msg); err != nil {
		bp.parent.returnError(msg, err)
		continue
@@ -1101,7 +1178,7 @@
		bp.parent.returnSuccesses(pSet.msgs)
	// Retriable errors
	case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
		ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
		if bp.parent.conf.Producer.Retry.Max <= 0 {
			bp.parent.abandonBrokerConnection(bp.broker)
			bp.parent.returnErrors(pSet.msgs, block.Err)
@@ -1134,32 +1211,55 @@
		switch block.Err {
		case ErrInvalidMessage, ErrUnknownTopicOrPartition, ErrLeaderNotAvailable, ErrNotLeaderForPartition,
			ErrRequestTimedOut, ErrNotEnoughReplicas, ErrNotEnoughReplicasAfterAppend, ErrKafkaStorageError:
			Logger.Printf("producer/broker/%d state change to [retrying] on %s/%d because %v\n",
				bp.broker.ID(), topic, partition, block.Err)
			if bp.currentRetries[topic] == nil {
				bp.currentRetries[topic] = make(map[int32]error)
			}
			bp.currentRetries[topic][partition] = block.Err
			if bp.parent.conf.Producer.Idempotent {
				// I don't really understand why we need a dedicated goroutine for this, but it was like
				// that before.
				go bp.parent.retryBatch(topic, partition, pSet, block.Err)
			} else {
				bp.parent.retryMessages(pSet.msgs, block.Err)
				// Dropping the following messages has the side effect of incrementing their retry count.
				//
				// Note: I'm not 100% sure why this is here, since we're not about to call bp.rollOver().
				// However, it's very important that this function is not called in the idempotency path,
				// because it would break apart the messages in an existing (sequenced) batch, and the
				// messages could end up split across multiple batches or in a different order, which
				// violates how the idempotency protocol works and will cause an assertion to fire in the
				// client (dropping the messages) or the broker to reject the messages. Either way, it's
				// data loss.
				bp.parent.retryMessages(bp.buffer.dropPartition(topic, partition), block.Err)
			}
		}
	})
}
func (p *asyncProducer) retryBatch(topic string, partition int32, pSet *partitionSet, kerr KError) {
	// Due to how this library is structured / implemented, retryBatch can end up getting called in
	// a very tight loop in some scenarios (like when a broker is down and the library is trying to
	// re-establish a connection). As a result, we need to make sure that retryBatch honors the
	// configured Retry.Backoff just like the regular message retries do.
	time.Sleep(p.conf.Producer.Retry.Backoff)

	Logger.Printf("Retrying batch for %v-%d because of %s\n", topic, partition, kerr)
	produceSet := newProduceSet(p)
	produceSet.msgs[topic] = make(map[int32]*partitionSet)
	produceSet.msgs[topic][partition] = pSet
	produceSet.bufferBytes += pSet.bufferBytes
	produceSet.bufferCount += len(pSet.msgs)
	// Once a producer ID and epoch have been assigned to messages, they can *never* be changed,
	// in order for the idempotency protocol between the client and broker to be correct. As a
	// result, we make sure that we copy over the old producer ID and epoch (older versions of
	// this library did not do this, which resulted in bugs where the client would issue batches
	// with new producer IDs / epochs, but with old sequence numbers, which makes no sense).
	produceSet.producerID = pSet.recordsToSend.RecordBatch.ProducerID
	produceSet.producerEpoch = pSet.recordsToSend.RecordBatch.ProducerEpoch
	for _, msg := range pSet.msgs {
		if msg.retries >= p.conf.Producer.Retry.Max {
			p.returnErrors(pSet.msgs, kerr)
@@ -1168,18 +1268,39 @@
		msg.retries++
	}

	// Previous versions of this library did not retry the call to p.client.Leader(topic, partition)
	// if it failed. That was problematic because it meant that during periods where we couldn't
	// figure out who the leader was (because it had just died, or there was a network problem, or
	// whatever), idempotent Produce requests would fail immediately instead of retrying for a while
	// as expected. This retry loop is very important, since prematurely (and unnecessarily) failing
	// an idempotent batch is ~equivalent to data loss.
	succeeded := false
	for i := 0; i < p.conf.Producer.Retry.Max; i++ {
Review comment: Suggest using a different variable name if we're counting retries/tries rather than indices. [off-by-one smell] Are we counting retries, or tries? That is, if I've asked for 5 retries max, then that's 6 total tries.
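A sketch of what counting total tries (rather than loop indices) could look like, assuming Retry.Max is meant as "retries after the first attempt":

	// Hypothetical rework: Retry.Max = 5 yields 6 total tries (1 initial + 5 retries).
	maxTries := p.conf.Producer.Retry.Max + 1
	for try := 1; try <= maxTries; try++ {
		leader, err := p.client.Leader(topic, partition)
		if err != nil {
			Logger.Printf("leader lookup for %s-%d failed (try %d of %d): %v\n", topic, partition, try, maxTries, err)
			time.Sleep(p.conf.Producer.Retry.Backoff)
			continue
		}
		// ...dispatch the batch to the broker producer and stop retrying, as below...
		break
	}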
		// it's expected that a metadata refresh has been requested prior to calling retryBatch
		leader, err := p.client.Leader(topic, partition)
		if err != nil {
			Logger.Printf("Failed retrying batch for %v-%d because of %v while looking up for new leader, attempt: %d of %d\n", topic, partition, err, i, p.conf.Producer.Retry.Max)
			time.Sleep(p.conf.Producer.Retry.Backoff)
			continue
		}

		bp := p.getBrokerProducer(leader)
		bp.output <- produceSet
		p.unrefBrokerProducer(leader, bp)
		succeeded = true
		break
	}

	if !succeeded {
		Logger.Printf("Failed retrying batch for %v-%d while looking up for new leader, no more retries\n", topic, partition)
Review comment: [nitpick] Newlines at the end should be unnecessary for loggers? (This is generally the case, but I don't know if that is specifically true here.)
		for _, msg := range pSet.msgs {
			p.returnError(msg, kerr)
		}
		return
	}
}
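Related: sarama's config also exposes Producer.Retry.BackoffFunc. If retryBatch were to honor it as well (an assumption; this PR only uses the fixed Retry.Backoff), a small helper could centralize the choice:

	// Hypothetical helper: prefer the user-supplied BackoffFunc when set,
	// otherwise fall back to the fixed Retry.Backoff duration.
	func (p *asyncProducer) retryBackoff(retries int) time.Duration {
		if p.conf.Producer.Retry.BackoffFunc != nil {
			return p.conf.Producer.Retry.BackoffFunc(retries, p.conf.Producer.Retry.Max)
		}
		return p.conf.Producer.Retry.Backoff
	}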
@@ -1193,11 +1314,34 @@ func (bp *brokerProducer) handleError(sent *produceSet, err error) {
	bp.parent.abandonBrokerConnection(bp.broker)
	_ = bp.broker.Close()
	bp.closing = err

	sent.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		// It is very important that we use retryBatch instead of retryMessages when idempotency
		// is enabled. Otherwise, batches that have already been sequenced (and potentially even
		// sent to brokers) will get split up and rebuilt in a different way, in a different order,
		// or with the messages split across different batches. This is not allowed in the
		// idempotency protocol. The expectation on the broker side is that if the client retries
		// any batches, it will send them with the exact same messages and sequence numbers as the
		// original attempt; otherwise the protocol doesn't work.
		if bp.parent.conf.Producer.Idempotent {
			// IDK if this needs/should have an external goroutine, but the only other place where
			// retryBatch() was called did it this way, so I did the same for posterity.
			go bp.parent.retryBatch(topic, partition, pSet, ErrKafkaStorageError)
		} else {
			bp.parent.retryMessages(pSet.msgs, err)
		}
	})

	// Everything in bp.buffer needs to be retried because we're about to call bp.rollOver(),
	// which will clear bp.buffer. Why we need to call bp.rollOver() here I'm not sure.
	bp.buffer.eachPartition(func(topic string, partition int32, pSet *partitionSet) {
		// Same comment as above.
		if bp.parent.conf.Producer.Idempotent {
			go bp.parent.retryBatch(topic, partition, pSet, ErrKafkaStorageError)
		} else {
			bp.parent.retryMessages(pSet.msgs, err)
		}
	})
	bp.rollOver()
}
@@ -1292,7 +1436,6 @@
		p.bumpIdempotentProducerEpoch()
	}

	msg.clear()
	pErr := &ProducerError{Msg: msg, Err: err}
	if p.conf.Producer.Return.Errors {
		p.errors <- pErr
@@ -1311,14 +1454,23 @@
func (p *asyncProducer) returnSuccesses(batch []*ProducerMessage) {
	for _, msg := range batch {
		if p.conf.Producer.Return.Successes {
			msg.clear()
			p.successes <- msg
		}
		p.inFlight.Done()
	}
}

func (p *asyncProducer) retryMessage(msg *ProducerMessage, err error) {
	if p.conf.Producer.Idempotent && msg.hasSequence && msg.hasBeenBatched {
		// If the message has had a sequence number assigned *and* it's
		// been added to a batch (produce_set.go), then it is illegal to
		// use the retryMessage function on it, because that means the
		// calling function is breaking apart and retrying messages from
		// a batch that was already potentially sent to a Kafka broker,
		// which is a violation of the idempotency protocol.
		panic("retrying msg with sequence that has been batched before")
	}

	if msg.retries >= p.conf.Producer.Retry.Max {
		p.returnError(msg, err)
	} else {
produce_set.go
@@ -86,10 +86,26 @@ func (ps *produceSet) add(msg *ProducerMessage) error {
	if ps.parent.conf.Version.IsAtLeast(V0_11_0_0) {
		if ps.parent.conf.Producer.Idempotent && msg.sequenceNumber < set.recordsToSend.RecordBatch.FirstSequence {
			Logger.Println(
				"assertion failed: message out of sequence added to batch",
				"producer_id",
				ps.producerID,
				set.recordsToSend.RecordBatch.ProducerID,
				"producer_epoch",
				ps.producerEpoch,
				set.recordsToSend.RecordBatch.ProducerEpoch,
				"sequence_number",
				msg.sequenceNumber,
				set.recordsToSend.RecordBatch.FirstSequence,
				"buffer_count",
				ps.bufferCount,
				"msg_has_sequence",
				msg.hasSequence)
Review comment (on lines +89 to +103): I would recommend leaving the log message on the same line as the method call, so that it's easily findable via grep. Additionally, if the line is long enough to break up, then the possibility of adding even more fields is high, so each entry should end with a comma and a newline; that way, adding new fields to the end of the call doesn't produce unnecessary line changes where the only difference is punctuation required by syntax. Then I would pair up each log field name with the log field value, all together:
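The reviewer's inline example didn't survive extraction; presumably the suggested shape keeps the message greppable on one line and groups each field name with its values, something like:

	Logger.Println("assertion failed: message out of sequence added to batch",
		"producer_id", ps.producerID, set.recordsToSend.RecordBatch.ProducerID,
		"producer_epoch", ps.producerEpoch, set.recordsToSend.RecordBatch.ProducerEpoch,
		"sequence_number", msg.sequenceNumber, set.recordsToSend.RecordBatch.FirstSequence,
		"buffer_count", ps.bufferCount,
		"msg_has_sequence", msg.hasSequence,
	)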
			return errors.New("assertion failed: message out of sequence added to a batch")
		}
	}

	msg.hasBeenBatched = true
	// Past this point we can't return an error, because we've already added the message to the set.
	set.msgs = append(set.msgs, msg)
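For intuition, the invariant behind this assertion is that sequence numbers within a batch must be contiguous, starting at the batch's FirstSequence. A standalone illustration (hypothetical helper, not part of the PR):

	// validateBatchSequences illustrates the idempotency invariant checked above:
	// the i-th record in a batch must carry sequence firstSequence + i.
	func validateBatchSequences(firstSequence int32, seqs []int32) error {
		for i, seq := range seqs {
			if want := firstSequence + int32(i); seq != want {
				return fmt.Errorf("out of sequence at offset %d: got %d, want %d", i, seq, want)
			}
		}
		return nil
	}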
Review comment (on ProducerErrors.Error in async_producer.go): Should this ever actually be produced with zero messages? If it's unlikely to ever happen with len(pe) == 0, then that should be the guard condition, and then the complex error message should be the unindented path.