Use ThreadSelectorEventLoop on Windows with ProactorEventLoop #820

Open
wants to merge 15 commits into
base: master

Conversation

davidbrochart
Contributor

Changes

This PR allows anyio.wait_socket_readable(sock) to work on Windows when using the asyncio backend with a proactor event loop, by using Tornado's SelectorThread, as discussed with @minrk in zeromq/pyzmq#1827.

Checklist

If this is a user-facing code change, like a bugfix or a new feature, please ensure that
you've fulfilled the following conditions (where applicable):

  • You've added tests (in tests/) which would fail without your patch
  • You've updated the documentation (in docs/, in case of behavior changes or new
    features)
  • You've added a new changelog entry (in docs/versionhistory.rst).

@davidbrochart davidbrochart force-pushed the selector-thread branch 3 times, most recently from c5ff14d to 6719d69 Compare November 9, 2024 15:18
@agronholm
Owner

To me this seems like overkill. Why would we have to shoehorn these into the async event loop?

@agronholm
Owner

What I'm saying is that a simpler solution would be better, where we just spawn a thread on demand that runs select(...) on all the sockets that need to wait for read/write. Or am I missing something?

@davidbrochart
Contributor Author

But that would mean one thread per socket, right? While with this solution there is only one thread per event loop.

@agronholm
Owner

But that would mean one thread per socket, right? While with this solution there is only one thread per event loop.

No, I never said that. There would be just one thread which would select() on multiple sockets.

@davidbrochart
Contributor Author

OK, well there might be a better way than this PR, which is trying to bring Tornado's implementation almost untouched.

@agronholm
Owner

I think Tornado had different needs, hence their more elaborate solution.

@minrk

minrk commented Nov 11, 2024

Why would we have to shoehorn these into the async event loop?

I don't really understand what you mean by shoehorning into the loop. The selector is not being added to the loop itself, it is a single Thread running select which implements the add_reader interface of the EventLoop API in a threadsafe way, to call select in a thread and run callbacks in the event loop when select returns. It is an object with a reference to the event loop and a weakref registry to store one of these threads per event loop (only once add_reader is called).

However you want to do it, I think it is appropriate to implement it optimistically as if add_reader can be assumed, because asyncio maintainers agree that it should be, even on Proactor, even though nobody has had the time to implement it yet (I still think it was a mistake to allow Proactor to become the default without this fundamental API). That way you can remove the workaround when you drop support for Pythons that didn't support it without changing ~anything. That is I think the main thing the tornado approach accomplishes because you only need to replace loop.add_reader with get_selector(loop).add_reader, which does nothing but return loop.add_reader everywhere but Windows + Proactor.

where we just spawn a thread on demand that runs select(...) on all the sockets that need to wait for read/write. Or am I missing something?

It is indeed very simple if you have a single thread per socket, but if you want to use a shared thread instead of thread-per-socket, you need to handle the fact that add_reader and friends may be called when select is already blocking in a background thread, which means needing:

  1. a threadsafe way to update the sockets for select, and
  2. a mechanism to wake the thread when the socket list is updated

which is where most of the logic in the tornado implementation resides. You could reimplement it, but it seems to me like it's going to need to be quite similar to tornado's implementation if it's going to work. But maybe you have an idea I'm missing about how to handle waking the select thread in a simpler way?
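For illustration, the two requirements above can be sketched with the standard selectors module and a socket pair as the wake-up channel. This is a minimal sketch under stated assumptions, not Tornado's or AnyIO's implementation: the SelectThread class and its method names are hypothetical, registrations are one-shot, and callbacks run in the selector thread, where a real integration would hand them to the event loop with loop.call_soon_threadsafe.

```python
import selectors
import socket
import threading
from typing import Callable


class SelectThread:
    """Hypothetical helper: one thread that select()s on many sockets."""

    def __init__(self) -> None:
        self._selector = selectors.DefaultSelector()
        self._lock = threading.Lock()
        self._pending: list = []  # queued (socket, callback) pairs
        self._closed = False
        # Writing a byte to _wake_w makes a blocking select() return.
        self._wake_r, self._wake_w = socket.socketpair()
        self._wake_r.setblocking(False)
        self._selector.register(self._wake_r, selectors.EVENT_READ, None)
        self._thread = threading.Thread(target=self._run, name="select-sketch")
        self._thread.start()

    def add_reader(self, sock: socket.socket, callback: Callable[[], None]) -> None:
        # (1) Thread-safe update: only queue the request under the lock ...
        with self._lock:
            self._pending.append((sock, callback))
        # (2) ... then wake the thread so that it applies the request.
        self._wake_w.send(b"\0")

    def close(self) -> None:
        with self._lock:
            self._closed = True
        self._wake_w.send(b"\0")
        self._thread.join()
        self._selector.close()
        self._wake_r.close()
        self._wake_w.close()

    def _run(self) -> None:
        # The selector itself is only ever touched from this thread.
        while True:
            with self._lock:
                if self._closed:
                    return
                pending, self._pending = self._pending, []
            for sock, callback in pending:
                self._selector.register(sock, selectors.EVENT_READ, callback)
            for key, _events in self._selector.select():
                if key.data is None:
                    # Wake-up socket: drain it and re-check the queue.
                    try:
                        self._wake_r.recv(1024)
                    except BlockingIOError:
                        pass
                else:
                    # One-shot: unregister before notifying the waiter.
                    self._selector.unregister(key.fileobj)
                    key.data()
```

A request that arrives while select() is blocking is picked up on the next loop iteration, because the wake-up byte makes select() return immediately.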

@davidbrochart
Contributor Author

I don't understand why test_wait_socket_readable fails on trio now.

@davidbrochart davidbrochart force-pushed the selector-thread branch 2 times, most recently from 3cccf1c to 5589633 Compare November 11, 2024 17:04
@davidbrochart
Contributor Author

And now I don't understand why the test isn't skipped for Windows/Trio :)

@agronholm
Owner

And now I don't understand why the test isn't skipped for Windows/Trio :)

Have you tried using pytest.mark.skipif to skip the test?

@davidbrochart
Contributor Author

It seems that pytest.mark.skipif doesn't support fixtures, and we need the anyio_backend_name fixture. I don't see any other way than using pytest.skip inside the test.

sock.connect(("127.0.0.1", port))
sock.sendall(b"Hello, world")

with move_on_after(0.1):
Owner

Why a timeout this strict? It's bound to cause flakiness on slow/busy systems.
Also, wouldn't fail_after eliminate the need to set a flag?

Suggested change
- with move_on_after(0.1):
+ with fail_after(5):

On another note, I think this should be on the same line as with conn:, given that it's the only place where the timeout can work.

sock.bind(("127.0.0.1", 0))
port = sock.getsockname()[1]
sock.listen()
thread = Thread(target=client, args=(port,), daemon=True)
Owner

I don't think this needs to be daemon, given how it will either succeed or fail instantly. Given the continuous effort on CPython's side to get rid of daemon threads, I'm also looking for alternatives.

Also, the thread needs to be joined unconditionally, even if the test fails. With this code, it won't be joined if the test fails.
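A minimal, stdlib-only sketch of that pattern, assuming a plain blocking server socket rather than the actual AnyIO test: the thread is non-daemon and is joined in a finally block, so the join happens whether the body succeeds or raises. The function name receive_greeting and the 5-second timeouts are illustrative choices, not taken from the PR.

```python
import socket
from threading import Thread


def client(port: int) -> None:
    # Connect, send the greeting, and close the socket on exit.
    with socket.socket() as sock:
        sock.connect(("127.0.0.1", port))
        sock.sendall(b"Hello, world")


def receive_greeting() -> bytes:
    with socket.socket() as server:
        server.settimeout(5)  # fail instead of hanging if the client never connects
        server.bind(("127.0.0.1", 0))
        port = server.getsockname()[1]
        server.listen()
        thread = Thread(target=client, args=(port,))  # not a daemon thread
        thread.start()
        try:
            conn, _addr = server.accept()
            with conn:
                conn.settimeout(5)
                chunks = []
                # Read until the client closes its end of the connection.
                while chunk := conn.recv(1024):
                    chunks.append(chunk)
                return b"".join(chunks)
        finally:
            # Runs on success and on failure alike.
            thread.join()
```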

@agronholm
Owner

It seems that pytest.mark.skipif doesn't support fixtures, and we need the anyio_backend_name fixture. I don't see any other way than using pytest.skip inside the test.

Do you have a Windows machine to run this locally on? Setting a breakpoint on the first line would probably shed some light on this.

@davidbrochart
Contributor Author

Do you have a Windows machine to run this locally on?

No, but with platform.system() == "Linux" I see that the test is skipped on my Linux machine for trio, so I really have no clue.

@agronholm
Owner

Do you have a Windows machine to run this locally on?

No, but with platform.system() == "Linux" I see that the test is skipped on my Linux machine for trio, so I really have no clue.

I have a Windows machine here, maybe I could take a look.

@agronholm
Owner

Something weird is going on here. If I only run the test on Trio, it's skipped as expected. I would say that the exception is some sort of fallout from the asyncio version of the test.

@agronholm
Owner

Specifically, it has to do with the thread that's spawned when wait_socket_readable() is called, not so much the thread that's spawned in the test code directly – that looks totally fine.

@agronholm
Owner

I checked that the ResourceWarning about the unclosed socket had nothing to do with any of the three sockets created during the test.

@davidbrochart
Contributor Author

Thanks @agronholm, I had forgotten to close the loop when removing AddThreadSelectorEventLoop.
I think the failure in PyPy is unrelated?

@@ -2683,13 +2684,24 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None:
raise BusyResourceError("reading from") from None

loop = get_running_loop()
if (
sys.platform == "win32"
Collaborator

@graingert graingert Nov 12, 2024

I think it would be better to try to use add_reader, and catch the NotImplementedError - raising if we're not on sys.platform == "win32"?

Or maybe we don't care whether sys.platform == "win32" at this point? If add_reader is not implemented on the loop, we can still work around it with the selector in a thread.

Yeah, I think if you catch the error you can register the selector independent of the platform in case someone else decides to make another incomplete EventLoop implementation like Proactor. The selector thread should work everywhere.
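A platform-independent version of that idea could look roughly like the sketch below. It is only an illustration: add_reader_or_fallback is a hypothetical name, and the fallback argument stands in for whatever selector-thread object provides its own add_reader and remove_reader.

```python
import asyncio
from typing import Any, Callable


def add_reader_or_fallback(
    loop: asyncio.AbstractEventLoop,
    fileobj: Any,
    callback: Callable[[], None],
    fallback: Any,
) -> Any:
    """Register callback and return the object that owns the registration.

    The caller later calls remove_reader() on the returned object.
    """
    try:
        loop.add_reader(fileobj, callback)
    except NotImplementedError:
        # The loop (e.g. a proactor loop) lacks add_reader: use the
        # selector thread instead, regardless of the platform.
        fallback.add_reader(fileobj, callback)
        return fallback
    return loop
```

Because the check relies on the NotImplementedError rather than on sys.platform, the same code path also covers any other event loop that does not implement add_reader.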

# can be shut down in this way (non-daemon threads would require the
# introduction of a new hook: https://bugs.python.org/issue41962)
self._thread = threading.Thread(
name="Tornado selector",
Collaborator

probably will need a different name

event = read_events[sock] = asyncio.Event()
loop.add_reader(sock, event.set)
add_reader(sock, event.set)

perhaps something like:

Suggested change
- add_reader(sock, event.set)
+ try:
+     add_reader(sock, event.set)
+ except NotImplementedError:
+     # Proactor on Windows does not yet implement add/remove reader
+     selector = _get_selector_windows(loop)
+     selector.add_reader(sock, event.set)
+     remove_reader = selector.remove_reader
+ else:
+     remove_reader = loop.remove_reader

_selector_loops.clear()


atexit.register(_atexit_callback)
Collaborator

This causes an issue for dask/distributed: they can't use Tornado on the proactor loop because they send a shutdown message on a socket and then close the event loop in an atexit handler.

The socket's add_reader/add_writer then stops working while it's still trying to send the shutdown message.

anyio already has a thread pool implementation that shuts down when run_forever completes; I think that should be used here.

Owner

To be precise, it ties itself into the life cycle of the root task.

Collaborator

Also, the thread here is closed explicitly using the async generator hook and by patching EventLoop.close, so I think it's OK to leak the daemon thread at exit if nobody explicitly closes the loop.

# this generator one step.
await self._thread_manager_handle.__anext__()

# When the loop starts, start the thread. Not too soon because we can't
Collaborator

I don't think we need this, because we know the loop has started when we create the Selector thread. We should probably also stash the task somewhere

@davidbrochart davidbrochart force-pushed the selector-thread branch 5 times, most recently from 451d3a1 to 66a9c72 Compare November 12, 2024 16:16
"""Ensure asyncio selector methods (add_reader, etc.) are available.
Running select in a thread and defining these methods on the running event loop.
Originally in tornado.platform.asyncio.
Redistributed under license Apache-2.0
Collaborator

we'll need to include the copyright statement and notice

Owner

Yeah, if we end up going with their implementation.

Contributor Author

I don't know how this plays with AnyIO's MIT license? Would it be an issue to vendor this code?

Collaborator

No, the Apache 2 license supports re-distribution under a different license

@graingert
Collaborator

graingert commented Nov 22, 2024

I think what @agronholm is looking for is something that uses the existing thread shutdown approach

https://github.com/agronholm/anyio/blob/master/src/anyio/_backends/_asyncio.py#L2417

Or maybe both should use the shutdown_asyncgens/patch loop.close hack that tornado uses

@davidbrochart
Contributor Author

I think what @agronholm is looking for is something that uses the existing thread shutdown approach

It's already here:

find_root_task().add_done_callback(_at_loop_close_callback)

But maybe you'd like to get rid of the loop patch?

# patch loop.close to also close the selector thread
loop_close = asyncio_loop.close
def _close_selector_and_loop() -> None:
    # restore original before calling selector.close,
    # which in turn calls eventloop.close!
    asyncio_loop.close = loop_close  # type: ignore[method-assign]
    _selectors.pop(asyncio_loop, None)
    selector_thread.close()
    asyncio_loop.close()
asyncio_loop.close = _close_selector_and_loop  # type: ignore[method-assign]

@agronholm
Owner

I think what @agronholm is looking for is something that uses the existing thread shutdown approach

It's already here:

find_root_task().add_done_callback(_at_loop_close_callback)

But maybe you'd like to get rid of the loop patch?

# patch loop.close to also close the selector thread
loop_close = asyncio_loop.close
def _close_selector_and_loop() -> None:
    # restore original before calling selector.close,
    # which in turn calls eventloop.close!
    asyncio_loop.close = loop_close  # type: ignore[method-assign]
    _selectors.pop(asyncio_loop, None)
    selector_thread.close()
    asyncio_loop.close()
asyncio_loop.close = _close_selector_and_loop  # type: ignore[method-assign]

If we could hook it into the life cycle of the root task, that would be the ideal solution from the AnyIO PoV I think.
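As a rough sketch of what hooking into the root task could mean with plain asyncio: a done callback on the task passed to asyncio.run() fires when that task finishes, so a helper thread can be told to stop without patching loop.close. This assumes the helper is started from the root task itself. start_helper_thread is a hypothetical name, and asyncio.current_task() stands in for AnyIO's find_root_task().

```python
import asyncio
import threading


def start_helper_thread() -> threading.Thread:
    """Start a thread that stops when the calling task finishes."""
    stop = threading.Event()
    # Stand-in for the selector thread: it just waits for the stop signal.
    thread = threading.Thread(target=stop.wait, name="helper-sketch")
    thread.start()

    task = asyncio.current_task()
    assert task is not None, "must be called from inside a running task"
    # Tie the thread's lifetime to the task: signal it once the task is done.
    task.add_done_callback(lambda _task: stop.set())
    return thread


async def main() -> threading.Thread:
    thread = start_helper_thread()
    await asyncio.sleep(0)
    assert thread.is_alive()  # still running while the root task is alive
    return thread
```

With asyncio.run(main()), the callback runs as part of the loop's final iterations, so the helper thread is released before the loop is closed.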

@graingert
Collaborator

graingert commented Nov 25, 2024

Do we also want to close the selector thread after 5 seconds of not waiting for any sockets? Might not be worth bothering in this PR but maybe in a subsequent one?

@agronholm
Owner

I think I'd like to take a stab at using the selectors API for this to potentially simplify the implementation (and to not have to think about the licensing stuff).
