@@ -134,7 +134,7 @@ def run_once(self):
134
134
for expired_batch in expired_batches :
135
135
self ._sensors .record_errors (expired_batch .topic_partition .topic , expired_batch .record_count )
136
136
137
- # Reset the PID if an expired batch has previously been sent to the broker.
137
+ # Reset the producer_id if an expired batch has previously been sent to the broker.
138
138
# See the documentation of `TransactionState.reset_producer_id` to understand why
139
139
# we need to reset the producer id here.
140
140
if self ._transaction_state and any ([batch .in_retry () for batch in expired_batches ]):
@@ -195,9 +195,7 @@ def add_topic(self, topic):
195
195
self .wakeup ()
196
196
197
197
def _maybe_wait_for_producer_id (self ):
198
- log .debug ("_maybe_wait_for_producer_id" )
199
198
if not self ._transaction_state :
200
- log .debug ("_maybe_wait_for_producer_id: no transaction_state..." )
201
199
return
202
200
203
201
while not self ._transaction_state .has_pid ():
@@ -230,7 +228,6 @@ def _maybe_wait_for_producer_id(self):
230
228
except Errors .RequestTimedOutError :
231
229
log .debug ("InitProducerId request to node %s timed out" , node_id )
232
230
time .sleep (self .config ['retry_backoff_ms' ] / 1000 )
233
- log .debug ("_maybe_wait_for_producer_id: ok: %s" , self ._transaction_state .producer_id_and_epoch )
234
231
235
232
def _failed_produce (self , batches , node_id , error ):
236
233
log .error ("Error sending produce request to node %d: %s" , node_id , error ) # trace
0 commit comments