From 159f5a39deda66ec6110f5010c4b2ba56ac4b004 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 10 Mar 2018 13:50:57 -0500 Subject: [PATCH] KAFKA-4160: Ensure rebalance listener not called with coordinator lock --- kafka/coordinator/base.py | 196 ++++++++++++++++++++++---------------- 1 file changed, 112 insertions(+), 84 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 57da97196..48270669f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -323,90 +323,108 @@ def time_to_next_heartbeat(self): return sys.maxsize return self.heartbeat.time_to_next_heartbeat() + def _reset_join_group_future(self): + with self._lock: + self.join_future = None + + def _initiate_join_group(self): + with self._lock: + # we store the join future in case we are woken up by the user + # after beginning the rebalance in the call to poll below. + # This ensures that we do not mistakenly attempt to rejoin + # before the pending rebalance has completed. + if self.join_future is None: + self.state = MemberState.REBALANCING + self.join_future = self._send_join_group_request() + + # handle join completion in the callback so that the + # callback will be invoked even if the consumer is woken up + # before finishing the rebalance + self.join_future.add_callback(self._handle_join_success) + + # we handle failures below after the request finishes. + # If the join completes after having been woken up, the + # exception is ignored and we will rejoin + self.join_future.add_errback(self._handle_join_failure) + + return self.join_future + def _handle_join_success(self, member_assignment_bytes): + # handle join completion in the callback so that the callback + # will be invoked even if the consumer is woken up before + # finishing the rebalance with self._lock: log.info("Successfully joined group %s with generation %s", self.group_id, self._generation.generation_id) - self.join_future = None self.state = MemberState.STABLE - self.rejoining = False - self._heartbeat_thread.enable() - self._on_join_complete(self._generation.generation_id, - self._generation.member_id, - self._generation.protocol, - member_assignment_bytes) + if self._heartbeat_thread is not None: + self._heartbeat_thread.enable() def _handle_join_failure(self, _): + # we handle failures below after the request finishes. + # if the join completes after having been woken up, + # the exception is ignored and we will rejoin with self._lock: - self.join_future = None self.state = MemberState.UNJOINED def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" - with self._lock: - if self._heartbeat_thread is None: - self._start_heartbeat_thread() - - while self.need_rejoin(): - self.ensure_coordinator_ready() - - # call on_join_prepare if needed. We set a flag - # to make sure that we do not call it a second - # time if the client is woken up before a pending - # rebalance completes. This must be called on each - # iteration of the loop because an event requiring - # a rebalance (such as a metadata refresh which - # changes the matched subscription set) can occur - # while another rebalance is still in progress. - if not self.rejoining: - self._on_join_prepare(self._generation.generation_id, - self._generation.member_id) - self.rejoining = True - - # ensure that there are no pending requests to the coordinator. - # This is important in particular to avoid resending a pending - # JoinGroup request. - while not self.coordinator_unknown(): - if not self._client.in_flight_request_count(self.coordinator_id): - break - self._client.poll() - else: + self.ensure_coordinator_ready() + self._start_heartbeat_thread() + self.join_group() + + def join_group(self): + while self.need_rejoin(): + self.ensure_coordinator_ready() + + # call on_join_prepare if needed. We set a flag + # to make sure that we do not call it a second + # time if the client is woken up before a pending + # rebalance completes. This must be called on each + # iteration of the loop because an event requiring + # a rebalance (such as a metadata refresh which + # changes the matched subscription set) can occur + # while another rebalance is still in progress. + if not self.rejoining: + self._on_join_prepare(self._generation.generation_id, + self._generation.member_id) + self.rejoining = True + + # fence off the heartbeat thread explicitly so that it cannot + # interfere with the join group. # Note that this must come after + # the call to onJoinPrepare since we must be able to continue + # sending heartbeats if that callback takes some time. + self._disable_heartbeat_thread() + + # ensure that there are no pending requests to the coordinator. + # This is important in particular to avoid resending a pending + # JoinGroup request. + while not self.coordinator_unknown(): + if not self._client.in_flight_request_count(self.coordinator_id): + break + self._client.poll() + else: + continue + + future = self._initiate_join_group() + self._client.poll(future=future) + self._reset_join_group_future() + + if future.succeeded(): + self.rejoining = False + self._on_join_complete(self._generation.generation_id, + self._generation.member_id, + self._generation.protocol, + future.value) + else: + exception = future.exception + if isinstance(exception, (Errors.UnknownMemberIdError, + Errors.RebalanceInProgressError, + Errors.IllegalGenerationError)): continue - - # we store the join future in case we are woken up by the user - # after beginning the rebalance in the call to poll below. - # This ensures that we do not mistakenly attempt to rejoin - # before the pending rebalance has completed. - if self.join_future is None: - self.state = MemberState.REBALANCING - future = self._send_join_group_request() - - self.join_future = future # this should happen before adding callbacks - - # handle join completion in the callback so that the - # callback will be invoked even if the consumer is woken up - # before finishing the rebalance - future.add_callback(self._handle_join_success) - - # we handle failures below after the request finishes. - # If the join completes after having been woken up, the - # exception is ignored and we will rejoin - future.add_errback(self._handle_join_failure) - - else: - future = self.join_future - - self._client.poll(future=future) - - if future.failed(): - exception = future.exception - if isinstance(exception, (Errors.UnknownMemberIdError, - Errors.RebalanceInProgressError, - Errors.IllegalGenerationError)): - continue - elif not future.retriable(): - raise exception # pylint: disable-msg=raising-bad-type - time.sleep(self.config['retry_backoff_ms'] / 1000) + elif not future.retriable(): + raise exception # pylint: disable-msg=raising-bad-type + time.sleep(self.config['retry_backoff_ms'] / 1000) def _send_join_group_request(self): """Join the group and return the assignment for the next generation. @@ -716,20 +734,27 @@ def request_rejoin(self): self.rejoin_needed = True def _start_heartbeat_thread(self): - if self._heartbeat_thread is None: - log.info('Starting new heartbeat thread') - self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) - self._heartbeat_thread.daemon = True - self._heartbeat_thread.start() + with self._lock: + if self._heartbeat_thread is None: + log.info('Starting new heartbeat thread') + self._heartbeat_thread = HeartbeatThread(weakref.proxy(self)) + self._heartbeat_thread.daemon = True + self._heartbeat_thread.start() + + def _disable_heartbeat_thread(self): + with self._lock: + if self._heartbeat_thread is not None: + self._heartbeat_thread.disable() def _close_heartbeat_thread(self): - if self._heartbeat_thread is not None: - log.info('Stopping heartbeat thread') - try: - self._heartbeat_thread.close() - except ReferenceError: - pass - self._heartbeat_thread = None + with self._lock: + if self._heartbeat_thread is not None: + log.info('Stopping heartbeat thread') + try: + self._heartbeat_thread.close() + except ReferenceError: + pass + self._heartbeat_thread = None def __del__(self): self._close_heartbeat_thread() @@ -892,12 +917,15 @@ def __init__(self, coordinator): def enable(self): with self.coordinator._lock: + log.debug('Enabling heartbeat thread') self.enabled = True self.coordinator.heartbeat.reset_timeouts() self.coordinator._lock.notify() def disable(self): - self.enabled = False + with self.coordinator._lock: + log.debug('Disabling heartbeat thread') + self.enabled = False def close(self): self.closed = True