KIP-98: Add idempotent producer support #2569

Merged (18 commits) on Apr 6, 2025
40 changes: 39 additions & 1 deletion kafka/client_async.py
@@ -27,7 +27,7 @@
from kafka.metrics.stats.rate import TimeUnit
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
from kafka.protocol.metadata import MetadataRequest
from kafka.util import Dict, WeakMethod, ensure_valid_topic_name
from kafka.util import Dict, WeakMethod, ensure_valid_topic_name, timeout_ms_fn
# Although this looks unused, it actually monkey-patches socket.socketpair()
# and should be left in as long as we're using socket.socketpair() in this file
from kafka.vendor import socketpair # noqa: F401
@@ -400,6 +400,11 @@ def maybe_connect(self, node_id, wakeup=True):
return True
return False

def connection_failed(self, node_id):
if node_id not in self._conns:
return False
return self._conns[node_id].connect_failed()

def _should_recycle_connection(self, conn):
# Never recycle unless disconnected
if not conn.disconnected():
@@ -1157,6 +1162,39 @@ def bootstrap_connected(self):
else:
return False

def await_ready(self, node_id, timeout_ms=30000):
"""
Invokes `poll` to discard pending disconnects, followed by `is_ready` checks and zero or more
`maybe_connect`/`poll` invocations until the connection to `node_id` is ready, `timeout_ms` expires,
or the connection fails.

Returns `True` if the call completes normally, or `False` if `timeout_ms` expires. If the connection
fails, a `KafkaConnectionError` is raised instead. Note that if the client has been configured with a
positive connection timeout, it is possible for this method to raise a `KafkaConnectionError` for a
previous connection which has recently disconnected.

This method is useful for implementing blocking behaviour on top of the non-blocking `KafkaClient`;
use it with care.
"""
inner_timeout_ms = timeout_ms_fn(timeout_ms, None)
self.poll(timeout_ms=0)
if self.is_ready(node_id):
return True

while not self.is_ready(node_id) and inner_timeout_ms() > 0:
if self.connection_failed(node_id):
raise Errors.KafkaConnectionError("Connection to %s failed." % (node_id,))
self.maybe_connect(node_id)
self.poll(timeout_ms=inner_timeout_ms())
return self.is_ready(node_id)

def send_and_receive(self, node_id, request):
future = self.send(node_id, request)
self.poll(future=future)
assert future.is_done
if future.failed():
raise future.exception
return future.value


# OrderedDict requires python2.7+
try:
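The two helpers added above give callers a blocking facade over the async client: `await_ready` repeatedly polls until the node's connection is usable (or raises on failure), and `send_and_receive` polls until a request's future resolves. The following is a minimal usage sketch, not part of the diff; the bootstrap address, the choice of `least_loaded_node`, and the `MetadataRequest[0]` payload are illustrative assumptions.

# Illustrative sketch only (not from the PR): issuing a request synchronously
# with the new blocking helpers.
from kafka.client_async import KafkaClient
from kafka.protocol.metadata import MetadataRequest

client = KafkaClient(bootstrap_servers='localhost:9092')
node_id = client.least_loaded_node()  # assumption: any connected/bootstrap node will do

# Block (up to timeout_ms) until the connection is ready; raises
# KafkaConnectionError if the connection attempt fails outright.
if client.await_ready(node_id, timeout_ms=30000):
    # send_and_receive polls until the future is resolved, then either
    # returns the response or re-raises the request's exception.
    response = client.send_and_receive(node_id, MetadataRequest[0]([]))
    print(response)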
57 changes: 51 additions & 6 deletions kafka/producer/kafka.py
@@ -19,6 +19,7 @@
from kafka.producer.future import FutureRecordMetadata, FutureProduceResult
from kafka.producer.record_accumulator import AtomicInteger, RecordAccumulator
from kafka.producer.sender import Sender
from kafka.producer.transaction_state import TransactionState
from kafka.record.default_records import DefaultRecordBatchBuilder
from kafka.record.legacy_records import LegacyRecordBatchBuilder
from kafka.serializer import Serializer
@@ -93,6 +94,19 @@ class KafkaProducer(object):
value_serializer (callable): used to convert user-supplied message
values to bytes. If not None, called as f(value), should return
bytes. Default: None.
enable_idempotence (bool): When set to True, the producer will ensure
that exactly one copy of each message is written in the stream.
If False, producer retries due to broker failures, etc., may write
duplicates of the retried message in the stream. Default: False.

Note that enabling idempotence requires
`max_in_flight_requests_per_connection` to be 1, `retries` to be
non-zero, and `acks` to be 'all'. If these values are left at their
defaults, the producer will override them with suitable values. If
they are set to something incompatible with the idempotent producer,
a KafkaConfigurationError will be raised.

acks (0, 1, 'all'): The number of acknowledgments the producer requires
the leader to have received before considering a request complete.
This controls the durability of records that are sent. The
@@ -303,6 +317,7 @@ class KafkaProducer(object):
'client_id': None,
'key_serializer': None,
'value_serializer': None,
'enable_idempotence': False,
'acks': 1,
'bootstrap_topics_filter': set(),
'compression_type': None,
@@ -365,6 +380,7 @@ class KafkaProducer(object):
def __init__(self, **configs):
log.debug("Starting the Kafka producer") # trace
self.config = copy.copy(self.DEFAULT_CONFIG)
user_provided_configs = set(configs.keys())
for key in self.config:
if key in configs:
self.config[key] = configs.pop(key)
@@ -428,13 +444,41 @@ def __init__(self, **configs):
assert checker(), "Libraries for {} compression codec not found".format(ct)
self.config['compression_attrs'] = compression_attrs

message_version = self._max_usable_produce_magic()
self._accumulator = RecordAccumulator(message_version=message_version, **self.config)
self._transaction_state = None
if self.config['enable_idempotence']:
self._transaction_state = TransactionState()
if 'retries' not in user_provided_configs:
log.info("Overriding the default 'retries' config to 3 since the idempotent producer is enabled.")
self.config['retries'] = 3
elif self.config['retries'] == 0:
raise Errors.KafkaConfigurationError("Must set 'retries' to non-zero when using the idempotent producer.")

if 'max_in_flight_requests_per_connection' not in user_provided_configs:
log.info("Overriding the default 'max_in_flight_requests_per_connection' to 1 since idempontence is enabled.")
self.config['max_in_flight_requests_per_connection'] = 1
elif self.config['max_in_flight_requests_per_connection'] != 1:
raise Errors.KafkaConfigurationError("Must set 'max_in_flight_requests_per_connection' to 1 in order"
" to use the idempotent producer."
" Otherwise we cannot guarantee idempotence.")

if 'acks' not in user_provided_configs:
log.info("Overriding the default 'acks' config to 'all' since idempotence is enabled")
self.config['acks'] = -1
elif self.config['acks'] != -1:
raise Errors.KafkaConfigurationError("Must set 'acks' config to 'all' in order to use the idempotent"
" producer. Otherwise we cannot guarantee idempotence")

message_version = self.max_usable_produce_magic(self.config['api_version'])
self._accumulator = RecordAccumulator(
transaction_state=self._transaction_state,
message_version=message_version,
**self.config)
self._metadata = client.cluster
guarantee_message_order = bool(self.config['max_in_flight_requests_per_connection'] == 1)
self._sender = Sender(client, self._metadata,
self._accumulator,
metrics=self._metrics,
transaction_state=self._transaction_state,
guarantee_message_order=guarantee_message_order,
**self.config)
self._sender.daemon = True
@@ -548,16 +592,17 @@ def partitions_for(self, topic):
max_wait = self.config['max_block_ms'] / 1000
return self._wait_on_metadata(topic, max_wait)

def _max_usable_produce_magic(self):
if self.config['api_version'] >= (0, 11):
@classmethod
def max_usable_produce_magic(cls, api_version):
if api_version >= (0, 11):
return 2
elif self.config['api_version'] >= (0, 10, 0):
elif api_version >= (0, 10, 0):
return 1
else:
return 0

def _estimate_size_in_bytes(self, key, value, headers=[]):
magic = self._max_usable_produce_magic()
magic = self.max_usable_produce_magic(self.config['api_version'])
if magic == 2:
return DefaultRecordBatchBuilder.estimate_size_in_bytes(
key, value, headers)
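Taken together, the new `enable_idempotence` flag, the validation in `__init__`, and the `max_usable_produce_magic` classmethod mean an idempotent producer needs message format v2 (broker 0.11+) plus the retries/in-flight/acks settings documented above. The sketch below is not part of the diff; the bootstrap address and topic name are assumptions, and it only illustrates which defaults get overridden and which explicit settings would be rejected.

# Illustrative sketch only: constructing an idempotent producer.
from kafka import KafkaProducer

# With only enable_idempotence=True, __init__ overrides the defaults to
# retries=3, max_in_flight_requests_per_connection=1, and acks='all' (-1).
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    enable_idempotence=True,
)

# Explicitly passing an incompatible value raises KafkaConfigurationError:
#   KafkaProducer(enable_idempotence=True, retries=0)
#   KafkaProducer(enable_idempotence=True, acks=1)
#   KafkaProducer(enable_idempotence=True, max_in_flight_requests_per_connection=5)

producer.send('my-topic', b'sent with a producer id and sequence number')
producer.flush()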
58 changes: 46 additions & 12 deletions kafka/producer/record_accumulator.py
@@ -35,9 +35,9 @@ def get(self):


class ProducerBatch(object):
def __init__(self, tp, records):
def __init__(self, tp, records, now=None):
self.max_record_size = 0
now = time.time()
now = time.time() if now is None else now
self.created = now
self.drained = None
self.attempts = 0
@@ -52,13 +52,18 @@ def __init__(self, tp, records):
def record_count(self):
return self.records.next_offset()

def try_append(self, timestamp_ms, key, value, headers):
@property
def producer_id(self):
return self.records.producer_id if self.records else None

def try_append(self, timestamp_ms, key, value, headers, now=None):
metadata = self.records.append(timestamp_ms, key, value, headers)
if metadata is None:
return None

now = time.time() if now is None else now
self.max_record_size = max(self.max_record_size, metadata.size)
self.last_append = time.time()
self.last_append = now
future = FutureRecordMetadata(self.produce_future, metadata.offset,
metadata.timestamp, metadata.crc,
len(key) if key is not None else -1,
@@ -81,7 +86,7 @@ def done(self, base_offset=None, timestamp_ms=None, exception=None, log_start_of
log_start_offset, exception) # trace
self.produce_future.failure(exception)

def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full):
def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full, now=None):
"""Expire batches if metadata is not available

A batch whose metadata is not available should be expired if one
@@ -93,7 +98,7 @@ def maybe_expire(self, request_timeout_ms, retry_backoff_ms, linger_ms, is_full)
* the batch is in retry AND request timeout has elapsed after the
backoff period ended.
"""
now = time.time()
now = time.time() if now is None else now
since_append = now - self.last_append
since_ready = now - (self.created + linger_ms / 1000.0)
since_backoff = now - (self.last_attempt + retry_backoff_ms / 1000.0)
@@ -121,6 +126,10 @@ def in_retry(self):
def set_retry(self):
self._retry = True

@property
def is_done(self):
return self.produce_future.is_done

def __str__(self):
return 'ProducerBatch(topic_partition=%s, record_count=%d)' % (
self.topic_partition, self.records.next_offset())
@@ -161,6 +170,7 @@ class RecordAccumulator(object):
'compression_attrs': 0,
'linger_ms': 0,
'retry_backoff_ms': 100,
'transaction_state': None,
'message_version': 0,
}

@@ -171,6 +181,7 @@ def __init__(self, **configs):
self.config[key] = configs.pop(key)

self._closed = False
self._transaction_state = self.config['transaction_state']
self._flushes_in_progress = AtomicInteger()
self._appends_in_progress = AtomicInteger()
self._batches = collections.defaultdict(collections.deque) # TopicPartition: [ProducerBatch]
@@ -233,6 +244,10 @@ def append(self, tp, timestamp_ms, key, value, headers):
batch_is_full = len(dq) > 1 or last.records.is_full()
return future, batch_is_full, False

if self._transaction_state and self.config['message_version'] < 2:
raise Errors.UnsupportedVersionError("Attempting to use idempotence with a broker which"
" does not support the required message format (v2)."
" The broker must be version 0.11 or later.")
records = MemoryRecordsBuilder(
self.config['message_version'],
self.config['compression_attrs'],
@@ -310,9 +325,9 @@ def abort_expired_batches(self, request_timeout_ms, cluster):

return expired_batches

def reenqueue(self, batch):
def reenqueue(self, batch, now=None):
"""Re-enqueue the given record batch in the accumulator to retry."""
now = time.time()
now = time.time() if now is None else now
batch.attempts += 1
batch.last_attempt = now
batch.last_append = now
@@ -323,7 +338,7 @@ def reenqueue(self, batch):
with self._tp_locks[batch.topic_partition]:
dq.appendleft(batch)

def ready(self, cluster):
def ready(self, cluster, now=None):
"""
Get a list of nodes whose partitions are ready to be sent, and the
earliest time at which any non-sendable partition will be ready;
@@ -357,7 +372,7 @@ def ready(self, cluster):
ready_nodes = set()
next_ready_check = 9999999.99
unknown_leaders_exist = False
now = time.time()
now = time.time() if now is None else now

# several threads are accessing self._batches -- to simplify
# concurrent access, we iterate over a snapshot of partitions
@@ -412,7 +427,7 @@ def has_unsent(self):
return True
return False

def drain(self, cluster, nodes, max_size):
def drain(self, cluster, nodes, max_size, now=None):
"""
Drain all the data for the given nodes and collate them into a list of
batches that will fit within the specified size on a per-node basis.
@@ -430,7 +445,7 @@ def drain(self, cluster, nodes, max_size):
if not nodes:
return {}

now = time.time()
now = time.time() if now is None else now
batches = {}
for node_id in nodes:
size = 0
@@ -463,7 +478,26 @@ def drain(self, cluster, nodes, max_size):
# single request
break
else:
producer_id_and_epoch = None
if self._transaction_state:
producer_id_and_epoch = self._transaction_state.producer_id_and_epoch
if not producer_id_and_epoch.is_valid:
# we cannot send the batch until we have refreshed the PID
log.debug("Waiting to send ready batches because transaction producer id is not valid")
break

batch = dq.popleft()
if producer_id_and_epoch and not batch.in_retry():
# If the batch is in retry, then we should not change the pid and
# sequence number, since this may introduce duplicates. In particular,
# the previous attempt may actually have been accepted, and if we change
# the pid and sequence here, this attempt will also be accepted, causing
# a duplicate.
sequence_number = self._transaction_state.sequence_number(batch.topic_partition)
log.debug("Dest: %s: %s producer_id=%s epoch=%s sequence=%s",
node_id, batch.topic_partition, producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch,
sequence_number)
batch.records.set_producer_state(producer_id_and_epoch.producer_id, producer_id_and_epoch.epoch, sequence_number)
batch.records.close()
size += batch.records.size_in_bytes()
ready.append(batch)
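The drain logic above is where idempotence is actually stamped onto batches: a fresh batch is assigned the current producer id, epoch, and per-partition sequence number, while a batch that is in retry keeps the values it was first sent with, because the broker de-duplicates on (producer id, epoch, partition, sequence). The snippet below is an illustrative model of that bookkeeping, not the PR's TransactionState class; names and numbers are assumptions.

# Illustrative sketch only: simplified per-partition sequence bookkeeping.
import collections

class SequenceBook(object):
    def __init__(self, producer_id, epoch):
        self.producer_id = producer_id
        self.epoch = epoch
        self._sequences = collections.defaultdict(int)

    def sequence_number(self, tp):
        # Sequence that the next *fresh* batch for this partition should use.
        return self._sequences[tp]

    def increment_sequence_number(self, tp, increment):
        # Advance by the batch's record count once the batch has been drained.
        self._sequences[tp] += increment

book = SequenceBook(producer_id=1000, epoch=0)
tp = ('my-topic', 0)

first_seq = book.sequence_number(tp)    # 0 -- assigned to the first batch
book.increment_sequence_number(tp, 5)   # the batch carried 5 records

# A retry of the first batch must be re-sent with sequence 0, not a new value;
# only a new batch drained afterwards starts at the advanced sequence.
next_seq = book.sequence_number(tp)     # 5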