From 3394cbbe8364beff36fc903d68bceef17adb6e13 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 3 Apr 2025 10:16:41 -0700 Subject: [PATCH 1/2] Raise UnsupportedVersionError from coordinator --- kafka/coordinator/base.py | 11 ++++++++++- kafka/coordinator/consumer.py | 12 ++++++++---- 2 files changed, 18 insertions(+), 5 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 97ba4fa28..410e92fc9 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -395,12 +395,16 @@ def ensure_active_group(self, timeout_ms=None): Raises: KafkaTimeoutError if timeout_ms is not None """ + if self.config['api_version'] < (0, 9): + raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker') inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to join consumer group') self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) self._start_heartbeat_thread() self.join_group(timeout_ms=inner_timeout_ms()) def join_group(self, timeout_ms=None): + if self.config['api_version'] < (0, 9): + raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker') inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to join consumer group') while self.need_rejoin(): self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms()) @@ -763,6 +767,8 @@ def request_rejoin(self): self.rejoin_needed = True def _start_heartbeat_thread(self): + if self.config['api_version'] < (0, 9): + raise Errors.UnsupportedVersionError('Heartbeat APIs require 0.9+ broker') with self._lock: if self._heartbeat_thread is None: log.info('Starting new heartbeat thread') @@ -794,10 +800,13 @@ def close(self, timeout_ms=None): """Close the coordinator, leave the current group, and reset local generation / member_id""" self._close_heartbeat_thread(timeout_ms=timeout_ms) - self.maybe_leave_group(timeout_ms=timeout_ms) + if self.config['api_version'] >= (0, 9): + self.maybe_leave_group(timeout_ms=timeout_ms) def maybe_leave_group(self, timeout_ms=None): """Leave the current group and reset local generation/memberId.""" + if self.config['api_version'] < (0, 9): + raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker') with self._client._lock, self._lock: if (not self.coordinator_unknown() and self.state is not MemberState.UNJOINED diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 873b1128c..773df38bd 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -494,7 +494,8 @@ def commit_offsets_async(self, offsets, callback=None): return future def _do_commit_offsets_async(self, offsets, callback=None): - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + if self.config['api_version'] < (0, 8, 1): + raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) @@ -516,7 +517,8 @@ def commit_offsets_sync(self, offsets, timeout_ms=None): Raises error on failure """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + if self.config['api_version'] < (0, 8, 1): + raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) @@ -573,7 +575,8 @@ def _send_offset_commit_request(self, offsets): Returns: Future: indicating whether the commit was successful or not """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + if self.config['api_version'] < (0, 8, 1): + raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), offsets)) assert all(map(lambda v: isinstance(v, OffsetAndMetadata), offsets.values())) @@ -761,7 +764,8 @@ def _send_offset_fetch_request(self, partitions): Returns: Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata} """ - assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API' + if self.config['api_version'] < (0, 8, 1): + raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker') assert all(map(lambda k: isinstance(k, TopicPartition), partitions)) if not partitions: return Future().success({}) From a21672cda9ff18332f9f0a9697df47b941ec121d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 3 Apr 2025 10:27:09 -0700 Subject: [PATCH 2/2] fixup consumer.unsubscribe --- kafka/consumer/group.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index 58284a7a9..6e6a88724 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -972,7 +972,8 @@ def unsubscribe(self): # are committed since there will be no following rebalance self._coordinator.maybe_auto_commit_offsets_now() self._subscription.unsubscribe() - self._coordinator.maybe_leave_group() + if self.config['api_version'] >= (0, 9): + self._coordinator.maybe_leave_group() self._client.cluster.need_all_topic_metadata = False self._client.set_topics([]) log.debug("Unsubscribed all topics or patterns and assigned partitions")