Skip to content

Commit

Permalink
fix: freeze set during ticks iter in async hub (#1830)
Browse files Browse the repository at this point in the history
* test: ticks set size changed during iteration in async hub

* fix: freeze set during ticks iter in async hub
  • Loading branch information
Alisson Claudino authored Dec 19, 2023
1 parent b4235c7 commit 8aff4e7
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 3 deletions.
4 changes: 2 additions & 2 deletions kombu/asynchronous/hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import errno
import threading
from contextlib import contextmanager
from copy import copy
from queue import Empty
from time import sleep
from types import GeneratorType as generator
Expand Down Expand Up @@ -295,7 +296,6 @@ def create_loop(self,
scheduled = self.timer._queue
consolidate = self.consolidate
consolidate_callback = self.consolidate_callback
on_tick = self.on_tick
propagate = self.propagate_errors

while 1:
Expand All @@ -307,7 +307,7 @@ def create_loop(self,

poll_timeout = fire_timers(propagate=propagate) if scheduled else 1

for tick_callback in on_tick:
for tick_callback in copy(self.on_tick):
tick_callback()

# print('[[[HUB]]]: %s' % (self.repr_active(),))
Expand Down
3 changes: 2 additions & 1 deletion kombu/transport/redis.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ def get_redis_error_classes():
IOError,
OSError,
exceptions.ConnectionError,
exceptions.BusyLoadingError,
exceptions.AuthenticationError,
exceptions.TimeoutError)),
(virtual.Transport.channel_errors + (
Expand Down Expand Up @@ -1229,7 +1230,7 @@ def _get_client(self):
global_keyprefix=self.global_keyprefix,
)

return redis.StrictRedis
return redis.Redis

@contextmanager
def conn_or_acquire(self, client=None):
Expand Down
17 changes: 17 additions & 0 deletions t/unit/asynchronous/test_hub.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,6 +525,23 @@ def test_loop__tick_callbacks(self):
ticks[0].assert_called_once_with()
ticks[1].assert_called_once_with()

def test_loop__tick_callbacks_on_ticks_change(self):
def callback_1():
ticks.remove(ticks_list[0])
return Mock(name='cb1')

ticks_list = [Mock(wraps=callback_1), Mock(name='cb2')]
ticks = set(ticks_list)

self.hub.on_tick = ticks
self.hub.poller.unregister = Mock()

next(self.hub.loop)
next(self.hub.loop)

ticks_list[0].assert_has_calls([call()])
ticks_list[1].assert_has_calls([call(), call()])

def test_loop__todo(self):
deferred = Mock(name='cb_deferred')

Expand Down

0 comments on commit 8aff4e7

Please sign in to comment.