QueueIterator raises StopAsyncIteration when iterator/channel is closed. #615

Merged

Conversation

Darsstar
Contributor

@Darsstar Darsstar commented Jan 17, 2024

See #358

Currently QueueIterator never raises a StopAsyncIteration exception: not when the channel is closed, and not after QueueIterator's close method is called.
This means that an async for message in queue.iterator(): loop, once started, will keep running "forever" even if no new message will ever arrive. ("Forever" because the asyncio task can still be cancelled, etc.)

This PR fixes that in a backwards-compatible way. Some tests are refactored to rely on this implicitly, and new tests have been added that explicitly check that QueueIterator.__anext__() raises certain exceptions.
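
To illustrate the behaviour described above, here is a minimal sketch of an async iterator whose `async for` loop ends once it is closed. It is a toy stand-in, not aio-pika's implementation: the class `ToyQueueIterator`, its `put` method and the `_CLOSED` sentinel are hypothetical names, and the sentinel approach is just one simple way to get this behaviour.

```python
import asyncio

_CLOSED = object()  # hypothetical sentinel marking "no more messages"


class ToyQueueIterator:
    """Toy stand-in for a queue iterator; not aio-pika's implementation."""

    def __init__(self) -> None:
        self._queue: asyncio.Queue = asyncio.Queue()

    def put(self, message: str) -> None:
        self._queue.put_nowait(message)

    def close(self) -> None:
        # Wake up a consumer blocked in __anext__ by enqueueing the sentinel.
        self._queue.put_nowait(_CLOSED)

    def __aiter__(self) -> "ToyQueueIterator":
        return self

    async def __anext__(self) -> str:
        item = await self._queue.get()
        if item is _CLOSED:
            # Re-enqueue so that later calls keep raising instead of blocking.
            self._queue.put_nowait(_CLOSED)
            raise StopAsyncIteration
        return item


async def main() -> list:
    iterator = ToyQueueIterator()
    received = []

    async def producer() -> None:
        for i in range(3):
            iterator.put(f"message {i}")
            await asyncio.sleep(0)
        iterator.close()

    producer_task = asyncio.create_task(producer())
    # Without StopAsyncIteration this loop would wait forever after close().
    async for message in iterator:
        received.append(message)
    await producer_task
    return received


if __name__ == "__main__":
    print(asyncio.run(main()))
```

If `__anext__` only ever awaited `self._queue.get()` and never raised, the loop in `main` would hang after the last message, which is the situation this PR addresses.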

@coveralls

coveralls commented Jan 17, 2024

Coverage Status

coverage: 91.886% (+3.6%) from 88.327%
when pulling 1acd87e on Darsstar:QueueIterator-raises-StopAsyncIterator
into eb5990e on mosquito:master.

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 3 times, most recently from a5180c0 to 15ca5ee Compare January 17, 2024 01:00
@Darsstar Darsstar changed the title QueueIterator raised StopAsyncIteration when channel is closed. QueueIterator raises StopAsyncIteration when channel is closed. Jan 17, 2024
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 6 times, most recently from 9ea9416 to f12ecfb Compare January 17, 2024 15:25
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 2 times, most recently from a2e3647 to 5652d2d Compare January 20, 2024 09:52
@Darsstar Darsstar changed the title QueueIterator raises StopAsyncIteration when channel is closed. QueueIterator raises StopAsyncIteration when iterator/channel is closed. Jan 24, 2024
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from 5652d2d to 71fc125 Compare January 29, 2024 15:09
@Darsstar
Contributor Author

@mosquito ping

@mosquito
Owner

mosquito commented Mar 4, 2024

@Darsstar this request contains a lot of changes that could potentially break backward compatibility. I'm still thinking about how to test it so that I know whether I need to make a separate major release or can make do with a minor one.

@Darsstar
Contributor Author

Darsstar commented Mar 4, 2024

this request contains a lot of changes that could potentially break backward compatibility.

I assume you are referring to:

  • new abstract methods
  • some 'protected' properties changing type (bool -> asyncio.Event)

9.4.0 already dropped Python 3.7. The changes taking advantage of 3.8, all contained in a single commit, were made in a way that should be backward compatible.

It probably should be a new major version due to the protected properties.

Although, I think those could be rewritten as @property attributes with setters that notify an asyncio.Condition ...
Then I think it boils down to whether you support people inheriting from the abstract base classes without inheriting from the concrete classes aio-pika provides.
If you do, the major version should be bumped.
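
The property idea can be sketched roughly as follows. This is an assumption-laden illustration, not code from the PR: the class `Closable`, the attribute `_closed_event` and the method `wait_closed` are hypothetical names. It also swaps in `asyncio.Event` for the `asyncio.Condition` mentioned above, because `Condition.notify_all()` requires the condition's lock to be held, and a synchronous setter cannot await that lock.

```python
import asyncio


class Closable:
    """Hypothetical class that keeps a bool-typed `closed` attribute while
    storing the state in an asyncio.Event that coroutines can wait on."""

    def __init__(self) -> None:
        self._closed_event = asyncio.Event()

    @property
    def closed(self) -> bool:
        # Readers still see a plain bool, as before the type change.
        return self._closed_event.is_set()

    @closed.setter
    def closed(self, value: bool) -> None:
        # Writers still assign a bool; the event wakes up any waiters.
        if value:
            self._closed_event.set()
        else:
            self._closed_event.clear()

    async def wait_closed(self) -> None:
        await self._closed_event.wait()


async def main() -> list:
    obj = Closable()  # created inside the running loop
    log = [obj.closed]

    waiter = asyncio.create_task(obj.wait_closed())
    await asyncio.sleep(0)  # let the waiter start and block on the event
    log.append(waiter.done())

    obj.closed = True  # plain bool assignment wakes the waiter
    await waiter
    log.append(obj.closed)
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Subclasses that read or assign the attribute as a bool would keep working under this sketch; subclasses that override the attribute itself would still be affected.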

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 4 times, most recently from 709ecc8 to 8155901 Compare March 4, 2024 20:50
@Darsstar
Contributor Author

Darsstar commented Mar 5, 2024

I rebased, which added the 3.12 tests, and now tests/test_amqp_robust_proxy.py::test_channel_reconnect_stairway[[0] 0.1-128] (consistently) fails, only on 3.12. Time for me to debug the issue...

Owner

@mosquito mosquito left a comment

Thank you for not giving up on this improvement, I'm sorry that I don't have much time to devote to it right now. But I'm open to discussion.

@Darsstar
Contributor Author

Small update: I am currently under the impression this PR isn't at fault.

I branched from upstream/master and altered the reconnect_stairway test to start at 60 and go all the way up to 4096, instead of every power of 2 in the range [64, 4096]. The tests fail on all Python versions, so it seems to be some sort of race condition based on how tasks end up being scheduled...?
The larger the stair value, the less likely it is to fail.

I don't have a strong lead, but I started looking at aiomisc and aiormq as well. Hopefully I'll stumble on the cause in the next two weeks or so...

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from 8155901 to 6ec53f1 Compare March 19, 2024 15:56
@LockedThread

@LockedThread @Darsstar sorry, lots of changes, should planning to retest all these myself.

All good, we appreciate your diligent work in supporting this project.

@Darsstar
Contributor Author

Unsurprisingly fixing this requires more code and therefore increases the runtime.

@gglluukk's benchmark (thanks!) shows about 10% runtime degradation:

before your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 4.905
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 4.503
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 4.809
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 4.870
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 4.741
Average: 4.766

after your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 5.481
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 5.123
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 5.397
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 5.437
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 5.128
Average: 5.313
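
As a quick check of the figures above, the averages and the relative slowdown can be recomputed from the ten runs. This is only arithmetic on the numbers quoted in this comment; the variable names are illustrative.

```python
from statistics import mean

# Timings copied from the benchmark output above.
before = [4.905, 4.503, 4.809, 4.870, 4.741]
after = [5.481, 5.123, 5.397, 5.437, 5.128]

avg_before = mean(before)
avg_after = mean(after)

# Relative slowdown of the patched version, in percent.
slowdown_pct = (avg_after / avg_before - 1) * 100

print(f"before:   {avg_before:.3f}")
print(f"after:    {avg_after:.3f}")
print(f"slowdown: {slowdown_pct:.1f}%")
```

The recomputed averages match the reported 4.766 and 5.313, and the slowdown comes out between 11% and 12%, consistent with the "about 10%" estimate.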

@LockedThread

Unsurprisingly fixing this requires more code and therefore increases the runtime.

@gglluukk's benchmark (thanks!) shows about 10% runtime degradation:

before your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 4.905
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 4.503
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 4.809
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 4.870
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 4.741
Average: 4.766

after your patch:
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #1: 5.481
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #2: 5.123
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #3: 5.397
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #4: 5.437
rmq2psql.py --loop-type queue_iteration_with_timeout --max-bulks 100: #5: 5.128
Average: 5.313

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

@Darsstar
Contributor Author

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.)

@LockedThread

LockedThread commented May 4, 2024

It would be great to get the time down but I am personally willing to accept a 10% degradation in order to not have a memory leak.

I don't see optimisation potential without porting the library to C/C++/Rust. (While keeping the implementation correct.)

Hopefully PyO3 gets better support for async/await, while maintaining interoperability with asyncio. There is a lot of active work on this right now. Once that happens, using Rust will be feasible.

In the meantime, losing 10% in performance is worthwhile to make sure I don't get OOM kills.

@mosquito
Owner

mosquito commented May 5, 2024

You should do a performance test, with cProfile for example. The last time I did this, marshalling in pamqp was the slowest part.
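
For readers unfamiliar with the suggestion, a minimal cProfile run might look like the sketch below. The workload `encode_frames` is a hypothetical stand-in and has nothing to do with pamqp's actual marshalling code; only `cProfile`, `pstats` and `io` from the standard library are used.

```python
import cProfile
import io
import pstats


def encode_frames(count: int) -> int:
    """Hypothetical stand-in workload; not pamqp code."""
    total = 0
    for i in range(count):
        total += len(f"frame {i}".encode("utf-8"))
    return total


def profile_workload() -> str:
    profiler = cProfile.Profile()
    profiler.enable()
    encode_frames(10_000)
    profiler.disable()

    # Sort by cumulative time so the most expensive call trees come first,
    # and limit the report to the top five entries.
    buffer = io.StringIO()
    stats = pstats.Stats(profiler, stream=buffer)
    stats.sort_stats(pstats.SortKey.CUMULATIVE).print_stats(5)
    return buffer.getvalue()


if __name__ == "__main__":
    print(profile_workload())
```

Sorting by cumulative time is one reasonable default for finding the slowest call path; sorting by `pstats.SortKey.TIME` instead highlights functions that are expensive in their own frame.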

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from 9abe45f to 6c744ba Compare August 28, 2024 17:58
@mosquito
Owner

I guess we should just finish this already. Will you fix the code style?

@mosquito mosquito requested a review from decaz August 29, 2024 08:08
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from db55f50 to ea449ba Compare August 29, 2024 08:22
@Darsstar
Contributor Author

I guess we should just finish this already. Will you fix the code style?

Done

@Darsstar
Contributor Author

Darsstar commented Sep 13, 2024

I finally profiled it...

Python file I used to profile:

import asyncio
import cProfile

from aio_pika import Message
from aio_pika.robust_connection import connect_robust

NUMBER_OF_MESSAGES = 1_000_000
# NUMBER_OF_MESSAGES = 1_000


async def main() -> None:
    # Use the event loop's clock as the profiler's timer.
    profiler = cProfile.Profile(
        timer=asyncio.get_running_loop().time,
    )

    async with await connect_robust(host='localhost', port=5672, login='guest', password='guest') as conn:
        async with await conn.channel() as channel:
            exchange = channel.default_exchange
            queue = await channel.declare_queue('queue')

            # Publish all messages up front, before profiling starts.
            pending = set()

            for i in range(NUMBER_OF_MESSAGES):
                pending.add(
                    asyncio.create_task(
                        exchange.publish(
                            message=Message(
                                body=f"message {i}".encode(encoding='utf-8'),
                            ),
                            routing_key=queue.name,
                            mandatory=True,
                        )
                    )
                )

            await asyncio.wait(pending)

            # Profile only the consuming side.
            profiler.enable()

            count = 0

            async with queue.iterator() as queue_iter:
                async for message in queue_iter:
                    await message.ack()
                    count += 1
                    # Stop once every published message has been consumed.
                    if count == NUMBER_OF_MESSAGES:
                        await queue_iter.close()
                        break

            profiler.disable()
            profiler.print_stats()
            profiler.dump_stats('D:/PR.stats')

if __name__ == '__main__':
    asyncio.run(main())

Profile output for __anext__ in queue.py:

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
# before
  1000001    2.035    0.000   15.227    0.000 D:\Projects\aio-pika\aio_pika\queue.py:505(__anext__)
# after
  1000001    3.890    0.000    9.679    0.000 D:\Projects\aio-pika\aio_pika\queue.py:552(__anext__)

Regarding tottime: there is more code in QueueIterator.__anext__ after the change, so it is logical that more cycles are spent in that frame.
Regarding cumtime: yay! A good 5.5 seconds faster :)
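
The two deltas can be checked directly from the profile rows quoted in this comment. The sketch below only redoes the arithmetic; the dictionary names are illustrative.

```python
# Values copied from the __anext__ rows of the profile output above.
before = {"tottime": 2.035, "cumtime": 15.227}
after = {"tottime": 3.890, "cumtime": 9.679}

# tottime: time spent in the __anext__ frame itself.
tottime_delta = after["tottime"] - before["tottime"]
# cumtime: time spent in __anext__ plus everything it calls or awaits.
cumtime_delta = after["cumtime"] - before["cumtime"]

print(f"tottime delta: {tottime_delta:+.3f}")
print(f"cumtime delta: {cumtime_delta:+.3f}")
```

So the frame itself gets slower by a bit under 2 seconds over the run, while the cumulative time drops by about 5.5 seconds.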

I also made CallbackCollection inherit from Generic to track down a bug I introduced, and it led me to an existing bug.

@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch 2 times, most recently from ac79041 to e39a628 Compare September 13, 2024 21:54
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from e39a628 to c2bf331 Compare September 13, 2024 22:24
The first bug is `Channel` passing `Optional[BaseException]` to `self.close()` while `RobustChannel` passed `asyncio.Future`

The second is registering a `CallbackCollection` instance as a callback for a different `CallbackCollection`. (Which was not supported before)
@Darsstar Darsstar force-pushed the QueueIterator-raises-StopAsyncIterator branch from c2bf331 to 1acd87e Compare September 16, 2024 18:27
@walkingpendulum

gentle ping with this PR, guys. any ETA on the merge? looking forward to see this released

@Darsstar
Contributor Author

@mosquito think you will have time before the end of the year? (It's been a month since the last ping, and the end of the year is about a month away if I factor in holidays. No more significance than that.)

If a face to face review has your preference that seems doable. My work is one metro stop away from the Nebius offices. I don't mind paying for a drink.

@LockedThread

@mosquito think you will have time before the end of the year? (It's been a month since the last ping, and the end of the year is about a month away if I factor in holidays. No more significance than that.)

If a face to face review has your preference that seems doable. My work is one metro stop away from the Nebius offices. I don't mind paying for a drink.

I am definitely willing to chip into the beer fund if that means this gets merged!!!

@mosquito mosquito merged commit 001dcce into mosquito:master Nov 21, 2024
9 checks passed
@mosquito
Owner

Thanks guys, sorry for taking so long to respond. These changes have already been published in version 9.5.0

@Darsstar Darsstar deleted the QueueIterator-raises-StopAsyncIterator branch November 21, 2024 08:25
6 participants