-
-
Notifications
You must be signed in to change notification settings - Fork 940
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix crash when using global_keyprefix with a sentinel connection #1838
Conversation
…keyprefix sentinel connection
Hello @adam-homeboost,thank you for providing this patch. I re-traced the steps and made a comparable build of both patches (this and replicated environment containing the bug). Here are my analysis: before - matching your exact debug information: [2024-01-08 09:55:01,745: CRITICAL/MainProcess] Unrecoverable error: AttributeError("'functools.partial' object has no attribute 'from_pool'")
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 951, in create_channel
return self._avail_channels.pop()
^^^^^^^^^^^^^^^^^^^^^^^^^^
IndexError: pop from empty list
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/usr/local/lib/python3.11/site-packages/celery/worker/worker.py", line 202, in start
self.blueprint.start(self)
File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 365, in start
return self.obj.start()
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 340, in start
blueprint.start(self)
File "/usr/local/lib/python3.11/site-packages/celery/bootsteps.py", line 116, in start
step.start(parent)
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/connection.py", line 21, in start
c.connection = c.connect()
^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 469, in connect
conn = self.connection_for_read(heartbeat=self.amqheartbeat)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 475, in connection_for_read
return self.ensure_connected(
^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/celery/worker/consumer/consumer.py", line 526, in ensure_connected
conn = conn.ensure_connection(
^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 406, in ensure_connection
self._ensure_connection(*args, **kwargs)
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 459, in _ensure_connection
return retry_over_time(
^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/utils/functional.py", line 318, in retry_over_time
return fun(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 934, in _connection_factory
self._connection = self._establish_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/connection.py", line 860, in _establish_connection
conn = self.transport.establish_connection()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 975, in establish_connection
self._avail_channels.append(self.create_channel(self))
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/virtual/base.py", line 953, in create_channel
channel = self.Channel(connection)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 741, in __init__
self.client.ping()
^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/utils/objects.py", line 31, in __get__
return super().__get__(instance, owner)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/functools.py", line 1001, in __get__
val = self.func(instance)
^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1254, in client
return self._create_client(asynchronous=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1210, in _create_client
return self.Client(connection_pool=self.async_pool)
^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1248, in async_pool
self._async_pool = self._get_pool(asynchronous=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1433, in _get_pool
return self._sentinel_managed_pool(asynchronous)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/kombu/transport/redis.py", line 1427, in _sentinel_managed_pool
return sentinel_inst.master_for(
^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.11/site-packages/redis/sentinel.py", line 356, in master_for
return redis_class.from_pool(
^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'functools.partial' object has no attribute 'from_pool' after: I am trying to run the tests, but hitting a few dead-end here. Precisely, I can't catch the test with: $ python t/unit/transport/test_redis.py::test_can_create_connection_with_global_prefix Other commands I have tried: $ python -k <test_name::removed_for_conciseness> -o python_files=t/unit/transport/test_redis.py Screenshots: |
t/unit/transport/test_redis.py
Outdated
connection = Connection( | ||
'sentinel://localhost:65534/', | ||
transport_options={ | ||
'global_keyprefix': 'some_prefix', | ||
'master_name': 'not_important', | ||
}, | ||
) | ||
with pytest.raises(ConnectionError): | ||
connection.channel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we consider one more approach here:
try:
connection = Connection(
'sentinel://localhost:65534/',
transport_options={
'global_keyprefix': 'some_prefix',
'master_name': 'not_important',
},
)
with pytest.raises(ConnectionError):
connection.channel()
finally:
connection.close() # Ensure connection is closed even if an exception occurs
Should we consider testing potential exceptions that could occur during connection creation??
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are inside a test class. Try:
pytest t/unit/transport/test_redis.py::test_RedisSentinel::test_can_create_connection_with_global_keyprefix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are inside a test class. Try:
pytest t/unit/transport/test_redis.py::test_RedisSentinel::test_can_create_connection_with_global_keyprefix
class test_Redis:
def setup(self):
self.connection = Connection(transport=Transport)
self.exchange = Exchange('test_Redis', type='direct')
self.queue = Queue('test_Redis', self.exchange, 'test_Redis')
def teardown(self):
self.connection.close()
If I am not mistaken it means it does not apply to your connection
obj so I agree with a safer approach like @50-Course proposed.
Also, reviewing a sample of the other tests shows most tests have consistent resource cleanup manually, so it would make sense to apply it here as well.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hi @adam-homeboost,
Thanks for the PR addressing the global_keyprefix
issue! I was able to reproduce the #1809 simulations locally (check out https://github.com/50-Course/global-prefix-error-celery for my setup).
Overall, the proposed fix makes sense and avoids future key prefix problems. The explanation is clear and the logic seems solid.
What if we added a test case to cover potential connection creation errors? Here's an example:
-
This would catch any unexpected problems while connecting and further solidify our confidence in the fix.
-
Besides that, the PR looks great! With suggested changes, I'd be happy to give it a green light.
Any thoughts on potential side effects? I am open to discussion!
cc: @auvipy
In terms of adding things to the test, I hesitate because:
But open to other opinions, just not sure where I would even begin. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All in all looks good - just needs to resolve the comments on the PR
kombu/transport/redis.py
Outdated
@@ -1429,7 +1429,7 @@ def _sentinel_managed_pool(self, asynchronous=False): | |||
|
|||
return sentinel_inst.master_for( | |||
master_name, | |||
self.Client, | |||
redis.StrictRedis, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StrictRedis
is deprecated.
Better use Redis instead.
t/unit/transport/test_redis.py
Outdated
connection = Connection( | ||
'sentinel://localhost:65534/', | ||
transport_options={ | ||
'global_keyprefix': 'some_prefix', | ||
'master_name': 'not_important', | ||
}, | ||
) | ||
with pytest.raises(ConnectionError): | ||
connection.channel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These tests are inside a test class. Try:
pytest t/unit/transport/test_redis.py::test_RedisSentinel::test_can_create_connection_with_global_keyprefix
class test_Redis:
def setup(self):
self.connection = Connection(transport=Transport)
self.exchange = Exchange('test_Redis', type='direct')
self.queue = Queue('test_Redis', self.exchange, 'test_Redis')
def teardown(self):
self.connection.close()
If I am not mistaken it means it does not apply to your connection
obj so I agree with a safer approach like @50-Course proposed.
Also, reviewing a sample of the other tests shows most tests have consistent resource cleanup manually, so it would make sense to apply it here as well.
As requested: changed connection class to non-deprecated, added resource cleanup to exception test. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
Waiting for @50-Course @auvipy to approve/comment before merge.
Foremost, thank you for the heads-up on Onto our conversation,
Thank you for clarification, I am well aware this is parameterization of the
Following this up, I have reviewed the few other tests, while in this scenario the test scope aims to catch the error from the
Point understood! I am also a firm believer of the single-responsibility principle. And this would have been a good approach, However, given the a |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM 🚀
Good work all! |
Fix for #1809.
When global_keyprefix is set, self.Client is no longer a class, it is a closure to create the class with the prefix argument. This breaks in a place that expects this to be a class and not just any callable. Fortunately, in this particular case, the place that is breaking doesn't actually need or care about the prefix and so it is safe to use the main redis client class. It only cares about the pool that comes back. This fix does not prevent the key prefix from being used later by kombu's own client extension.