-
Notifications
You must be signed in to change notification settings - Fork 192
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
QueueIterator raises StopAsyncIteration when iterator/channel is closed. #615
QueueIterator raises StopAsyncIteration when iterator/channel is closed. #615
Conversation
a5180c0
to
15ca5ee
Compare
9ea9416
to
f12ecfb
Compare
a2e3647
to
5652d2d
Compare
5652d2d
to
71fc125
Compare
@mosquito ping |
@Darsstar this request contains a lot of changes that could potentially breaks backward compatibility. I'm still thinking about how to test it so that I understand I need to make a separate major release, or make do with a minor one. |
I assume you are refering to:
9.4.0 already dropped Python 3.7. The changes taking advantage of 3.8, all contained in a single commit, were made in a way it should be backward compatible. It probably should be a new major version due to the protected properties. Although, I think those could be rewritten as |
709ecc8
to
8155901
Compare
I rebased, which added the 3.12 tests, and now |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thank you for not giving up on this improvement, I'm sorry that I don't have much time to devote to it right now. But I'm open to discussion.
Small update: I am currently under the impression this PR isn't at fault. I branched from I don't have a strong lead, but I started looking at |
8155901
to
6ec53f1
Compare
All good, we appreciate your diligent work in supporting this project. |
Unsurprisingly fixing this requires more code and therefor increases the runtime. @gglluukk's benchmark (thanks!) show about 10% runtime degredation:
|
It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak. |
I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.) |
Hopefully PyO3 gets better support for async/await, while maintaining interoperability with asyncio. There is a lot of active work on this right now. Once that happens, using Rust will be feasible. In the meantime, losing 10% in performance is worthwhile to make sure I dont get OOM kills. |
You should do a performance test with cProfile for example. The last time I did this, it marshall in pamqp the slowest one. |
9abe45f
to
6c744ba
Compare
I guess we should just finish to this already. Will you fix the code style? |
db55f50
to
ea449ba
Compare
Done |
I finally profiled it... Python file I used to profileimport asyncio
import cProfile
from aio_pika import Message
from aio_pika.robust_connection import connect_robust
NUMBER_OF_MESSAGES = 1_000_000
# NUMBER_OF_MESSAGES = 1_000
async def main() -> None:
profiler = cProfile.Profile(
timer=asyncio.get_event_loop().time,
)
async with await connect_robust(host='localhost', port=5672, login='guest', password='guest') as conn:
async with await conn.channel() as channel:
exchange = channel.default_exchange
queue = await channel.declare_queue('queue')
pending = set()
for i in range(NUMBER_OF_MESSAGES):
pending.add(
asyncio.create_task(
exchange.publish(
message=Message(
body=f"message {i}".encode(encoding='utf-8'),
),
routing_key=queue.name,
mandatory=True,
)
)
)
await asyncio.wait(pending)
profiler.enable()
count = 0
async with queue.iterator() as queue_iter:
async for message in queue_iter:
await message.ack()
count += 1
if count == NUMBER_OF_MESSAGES:
await queue_iter.close()
break
profiler.disable()
profiler.print_stats()
profiler.dump_stats('D:/PR.stats')
if __name__ == '__main__':
asyncio.run(main())
Regarding I also made |
ac79041
to
e39a628
Compare
e39a628
to
c2bf331
Compare
The first bug is `Channel` passing `Optional[BaseException]` to `self.close()` while `RobustChannel` passed `asyncio.Future` The second is registering a `CallbackCollection` instance as a callback for a different `CallbackCollection`. (Which was not supported before)
c2bf331
to
1acd87e
Compare
gentle ping with this PR, guys. any ETA on the merge? looking forward to see this released |
@mosquito think you will have time before the end of the year? (It's been a month since the last ping, and the end of the year is about a months away if I factor in holidays. No more significance than that.) If a face to face review has your preference that seems doable. My work is one metro stop away from the Nebius offices. I don't mind paying for a drink. |
I am definitely willing to chip into the beer fund if that means this gets merged!!! |
Thanks guys, sorry for taking so long to respond. These changes have already been published in version 9.5.0 |
See #358
Currently QueueIterator never throws a StopAsyncIterator exception. Not when the channel is closed, and not after QueueIterator's close method is called.
Which implies that starting an
async for message in queue.iterator():
loop will keep running "forever" even if no new message will ever arrive. ("forever" because the asyncio task can be canceled, etc.)This PR fixes that in a backwards compatible way. Some tests are refactored to rely on this implicitly, and new ones that explicitly test QueueIterator.anext() throws certain exceptions have been added as well.