Skip to content

Commit 0024227

Browse files
authored
Raise UnsupportedVersionError from coordinator (#2579)
1 parent f7b3133 commit 0024227

File tree

3 files changed

+20
-6
lines changed

3 files changed

+20
-6
lines changed

kafka/consumer/group.py

+2-1
Original file line numberDiff line numberDiff line change
@@ -972,7 +972,8 @@ def unsubscribe(self):
972972
# are committed since there will be no following rebalance
973973
self._coordinator.maybe_auto_commit_offsets_now()
974974
self._subscription.unsubscribe()
975-
self._coordinator.maybe_leave_group()
975+
if self.config['api_version'] >= (0, 9):
976+
self._coordinator.maybe_leave_group()
976977
self._client.cluster.need_all_topic_metadata = False
977978
self._client.set_topics([])
978979
log.debug("Unsubscribed all topics or patterns and assigned partitions")

kafka/coordinator/base.py

+10-1
Original file line numberDiff line numberDiff line change
@@ -395,12 +395,16 @@ def ensure_active_group(self, timeout_ms=None):
395395
396396
Raises: KafkaTimeoutError if timeout_ms is not None
397397
"""
398+
if self.config['api_version'] < (0, 9):
399+
raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker')
398400
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to join consumer group')
399401
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
400402
self._start_heartbeat_thread()
401403
self.join_group(timeout_ms=inner_timeout_ms())
402404

403405
def join_group(self, timeout_ms=None):
406+
if self.config['api_version'] < (0, 9):
407+
raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker')
404408
inner_timeout_ms = timeout_ms_fn(timeout_ms, 'Timeout attempting to join consumer group')
405409
while self.need_rejoin():
406410
self.ensure_coordinator_ready(timeout_ms=inner_timeout_ms())
@@ -763,6 +767,8 @@ def request_rejoin(self):
763767
self.rejoin_needed = True
764768

765769
def _start_heartbeat_thread(self):
770+
if self.config['api_version'] < (0, 9):
771+
raise Errors.UnsupportedVersionError('Heartbeat APIs require 0.9+ broker')
766772
with self._lock:
767773
if self._heartbeat_thread is None:
768774
log.info('Starting new heartbeat thread')
@@ -794,10 +800,13 @@ def close(self, timeout_ms=None):
794800
"""Close the coordinator, leave the current group,
795801
and reset local generation / member_id"""
796802
self._close_heartbeat_thread(timeout_ms=timeout_ms)
797-
self.maybe_leave_group(timeout_ms=timeout_ms)
803+
if self.config['api_version'] >= (0, 9):
804+
self.maybe_leave_group(timeout_ms=timeout_ms)
798805

799806
def maybe_leave_group(self, timeout_ms=None):
800807
"""Leave the current group and reset local generation/memberId."""
808+
if self.config['api_version'] < (0, 9):
809+
raise Errors.UnsupportedVersionError('Group Coordinator APIs require 0.9+ broker')
801810
with self._client._lock, self._lock:
802811
if (not self.coordinator_unknown()
803812
and self.state is not MemberState.UNJOINED

kafka/coordinator/consumer.py

+8-4
Original file line numberDiff line numberDiff line change
@@ -494,7 +494,8 @@ def commit_offsets_async(self, offsets, callback=None):
494494
return future
495495

496496
def _do_commit_offsets_async(self, offsets, callback=None):
497-
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
497+
if self.config['api_version'] < (0, 8, 1):
498+
raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker')
498499
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
499500
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
500501
offsets.values()))
@@ -516,7 +517,8 @@ def commit_offsets_sync(self, offsets, timeout_ms=None):
516517
517518
Raises error on failure
518519
"""
519-
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
520+
if self.config['api_version'] < (0, 8, 1):
521+
raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker')
520522
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
521523
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
522524
offsets.values()))
@@ -573,7 +575,8 @@ def _send_offset_commit_request(self, offsets):
573575
Returns:
574576
Future: indicating whether the commit was successful or not
575577
"""
576-
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
578+
if self.config['api_version'] < (0, 8, 1):
579+
raise Errors.UnsupportedVersionError('OffsetCommitRequest requires 0.8.1+ broker')
577580
assert all(map(lambda k: isinstance(k, TopicPartition), offsets))
578581
assert all(map(lambda v: isinstance(v, OffsetAndMetadata),
579582
offsets.values()))
@@ -761,7 +764,8 @@ def _send_offset_fetch_request(self, partitions):
761764
Returns:
762765
Future: resolves to dict of offsets: {TopicPartition: OffsetAndMetadata}
763766
"""
764-
assert self.config['api_version'] >= (0, 8, 1), 'Unsupported Broker API'
767+
if self.config['api_version'] < (0, 8, 1):
768+
raise Errors.UnsupportedVersionError('OffsetFetchRequest requires 0.8.1+ broker')
765769
assert all(map(lambda k: isinstance(k, TopicPartition), partitions))
766770
if not partitions:
767771
return Future().success({})

0 commit comments

Comments
 (0)