-
Notifications
You must be signed in to change notification settings - Fork 138
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Use ThreadSelectorEventLoop on Windows with ProactorEventLoop #820
base: master
Are you sure you want to change the base?
Conversation
c5ff14d
to
6719d69
Compare
To me this seems like overkill. Why would we have to shoehorn these into the async event loop? |
What I'm saying is that a simpler solution would be better, where we just spawn a thread on demand that runs |
But that would mean one thread per socket, right? While with this solution there is only one thread per event loop. |
No, I never said that. There would be just one thread which would |
OK, well there might be a better way than this PR, which is trying to bring Tornado's implementation almost untouched. |
I think Tornado had different needs, hence their more elaborate solution. |
I don't really understand what you mean by shoehorning into the loop. The selector is not being added to the loop itself, it is a single Thread running However you want to do it, I think it is appropriate to implement it optimistically as if
It is indeed very simple if you have a single thread per socket, but if you want to use a shared thread instead of thread-per-socket, you need to handle the fact that
which is where most of the logic in the tornado implementation resides. You could reimplement it, but it seems to me like it's going to need to be quite similar to tornado's implementation if it's going to work. But maybe you have an idea I'm missing about how to handle waking the select thread in a simpler way? |
6719d69
to
058e463
Compare
I don't understand that |
3cccf1c
to
5589633
Compare
5589633
to
c1dd759
Compare
And now I don't understand that the test won't be skipped for Windows/Trio :) |
Have you tried using |
It seems that |
tests/test_sockets.py
Outdated
sock.connect(("127.0.0.1", port)) | ||
sock.sendall(b"Hello, world") | ||
|
||
with move_on_after(0.1): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why a timeout this strict? It's bound to cause flakiness on slow/busy systems.
Also, wouldn't fail_after
eliminate the need to set a flag?
with move_on_after(0.1): | |
with fail_after(5): |
On another note, I think this should be on the same line as with conn:
, given that it's the only place where the timeout can work.
tests/test_sockets.py
Outdated
sock.bind(("127.0.0.1", 0)) | ||
port = sock.getsockname()[1] | ||
sock.listen() | ||
thread = Thread(target=client, args=(port,), daemon=True) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think this needs to be daemon, given how it will either succeed or fail instantly. Given the continuous effort on CPython's side to get rid of daemon threads, I'm also looking for alternatives.
Also, the thread needs to be joined uncoditionally, even if the test fails. With this code, it won't be joined if the test fails.
Do you have a Windows machine to run this locally on? Setting a breakpoint on the first line would probably shed some light on this. |
No, but with |
I have a Windows machine here, maybe I could take a look. |
Something weird is going on here. If I only run the test on Trio, it's skipped as expected. I would say that the exception is some sort of fallout from the asyncio version of the test. |
Specifically, it has to do with the thread that's spawned when |
I checked that the |
Thanks @agronholm, I had forgotten to close the loop when removing |
src/anyio/_backends/_asyncio.py
Outdated
@@ -2683,13 +2684,24 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None: | |||
raise BusyResourceError("reading from") from None | |||
|
|||
loop = get_running_loop() | |||
if ( | |||
sys.platform == "win32" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it would be better to try to use add_reader, and catch the NotImplementedError - raising if we're not on sys.platform == "win32"?
Or maybe we don't care if sys.platform == "win32" or not at this point? if add_reader is not implemented on the loop we can still work around it with the selector in a thread
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I think if you catch the error you can register the selector independent of the platform in case someone else decides to make another incomplete EventLoop implementation like Proactor. The selector thread should work everywhere.
# can be shut down in this way (non-daemon threads would require the | ||
# introduction of a new hook: https://bugs.python.org/issue41962) | ||
self._thread = threading.Thread( | ||
name="Tornado selector", |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
probably will need a different name
src/anyio/_backends/_asyncio.py
Outdated
event = read_events[sock] = asyncio.Event() | ||
loop.add_reader(sock, event.set) | ||
add_reader(sock, event.set) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
perhaps something like:
add_reader(sock, event.set) | |
try: | |
add_reader(sock, event.set) | |
except NotImplementedError: | |
# Proactor on Windows does not yet implement add/remove reader | |
selector = _get_selector_windows(loop) | |
selector.add_reader(sock, event.set) | |
remove_reader = selector.remove_reader | |
else: | |
remove_reader = loop.remove_reader |
_selector_loops.clear() | ||
|
||
|
||
atexit.register(_atexit_callback) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This causes an issue for dask/distributed where they can't use tornado on the proactor because they send a shutdown message on a socket then close the event loop in atexit
the socket add_reader/add_writer then stops working while it's trying to send the shutdown message
anyio already has a thread pool implementation that shuts down when run_forever completes I think that should be used here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
To be precise, it ties itself into the life cycle of the root task.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also the thread here is closed explicitly using the async generator hook and patching EventLoop.close so I think it's ok to leak the demon thread atexit if nobody explicitly closes the loop
# this generator one step. | ||
await self._thread_manager_handle.__anext__() | ||
|
||
# When the loop starts, start the thread. Not too soon because we can't |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think we need this, because we know the loop has started when we create the Selector thread. We should probably also stash the task somewhere
451d3a1
to
66a9c72
Compare
66a9c72
to
8b18582
Compare
"""Ensure asyncio selector methods (add_reader, etc.) are available. | ||
Running select in a thread and defining these methods on the running event loop. | ||
Originally in tornado.platform.asyncio. | ||
Redistributed under license Apache-2.0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we'll need to include the copyright statement and notice
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, if we end up going with their implementation.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't know how this plays with AnyIO's MIT license? Would it be an issue to vendor this code?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, the Apache 2 license supports re-distribution under a different license
I think what @agronholm is looking for is something that uses the existing thread shutdown approach https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2417 Or maybe both should use the shutdown_asyncgens/patch loop.close hack that tornado uses |
It's already here: anyio/src/anyio/_backends/_selector_thread.py Line 269 in 6980062
But maybe you'd like to get rid of the loop patch? anyio/src/anyio/_backends/_selector_thread.py Lines 272 to 283 in 6980062
|
If we could hook it into the life cycle of the root task, that would the ideal solution from the AnyIO PoV I think. |
Do we also want to close the selector thread after 5 seconds of not waiting for any sockets? Might not be worth bothering in this PR but maybe in a subsequent one? |
I think I'd like to take a stab at using the |
Changes
This PR allows
anyio.wait_socket_readable(sock)
to work on Windows when using the asyncio backend with a proactor event loop, by using Tornado's SelectorThread, as discussed with @minrk in zeromq/pyzmq#1827.Checklist
If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):
tests/
) added which would fail without your patchdocs/
, in case of behavior changes or newfeatures)
docs/versionhistory.rst
).