diff --git a/docs/versionhistory.rst b/docs/versionhistory.rst
index 05e69a09..1d547b22 100644
--- a/docs/versionhistory.rst
+++ b/docs/versionhistory.rst
@@ -10,10 +10,13 @@ This library adheres to `Semantic Versioning 2.0 `_.
- Added the ``wait_readable()`` and ``wait_writable()`` functions which will accept
an object with a ``.fileno()`` method or an integer handle, and deprecated
their now obsolete versions (``wait_socket_readable()`` and
- ``wait_socket_writable()`` (PR by @davidbrochart)
+ ``wait_socket_writable()``) (PR by @davidbrochart)
- Fixed the return type annotations of ``readinto()`` and ``readinto1()`` methods in the
``anyio.AsyncFile`` class
(`#825 `_)
+- Ported ``ThreadSelectorEventLoop`` from Tornado to allow
+ ``anyio.wait_readable()`` and ``anyio.wait_writable()`` to work on Windows with a
+ ``ProactorEventLoop``.
**4.6.2**
diff --git a/src/anyio/_backends/_asyncio.py b/src/anyio/_backends/_asyncio.py
index fe648a6f..e2715ce7 100644
--- a/src/anyio/_backends/_asyncio.py
+++ b/src/anyio/_backends/_asyncio.py
@@ -2691,12 +2691,23 @@ async def wait_readable(cls, obj: HasFileno | int) -> None:
loop = get_running_loop()
event = read_events[obj] = asyncio.Event()
- loop.add_reader(obj, event.set)
+ try:
+ loop.add_reader(obj, event.set)
+ except NotImplementedError:
+ # Proactor on Windows does not yet implement add/remove reader
+ from ._selector_thread import _get_selector_windows
+
+ selector = _get_selector_windows(loop)
+ selector.add_reader(obj, event.set)
+ remove_reader = selector.remove_reader
+ else:
+ remove_reader = loop.remove_reader
+
try:
await event.wait()
finally:
if read_events.pop(obj, None) is not None:
- loop.remove_reader(obj)
+ remove_reader(obj)
readable = True
else:
readable = False
@@ -2721,12 +2732,23 @@ async def wait_writable(cls, obj: HasFileno | int) -> None:
loop = get_running_loop()
event = write_events[obj] = asyncio.Event()
- loop.add_writer(obj, event.set)
+ try:
+ loop.add_writer(obj, event.set)
+ except NotImplementedError:
+ # Proactor on Windows does not yet implement add/remove writer
+ from ._selector_thread import _get_selector_windows
+
+ selector = _get_selector_windows(loop)
+ selector.add_writer(obj, event.set)
+ remove_writer = selector.remove_writer
+ else:
+ remove_writer = loop.remove_writer
+
try:
await event.wait()
finally:
if write_events.pop(obj, None) is not None:
- loop.remove_writer(obj)
+ remove_writer(obj)
writable = True
else:
writable = False
diff --git a/src/anyio/_backends/_selector_thread.py b/src/anyio/_backends/_selector_thread.py
new file mode 100644
index 00000000..943c8755
--- /dev/null
+++ b/src/anyio/_backends/_selector_thread.py
@@ -0,0 +1,257 @@
+"""Ensure asyncio selector methods (add_reader, etc.) are available.
+Running select in a thread and defining these methods on the running event loop.
+Originally in tornado.platform.asyncio.
+Redistributed under license Apache-2.0
+"""
+
+from __future__ import annotations
+
+import errno
+from asyncio import AbstractEventLoop, Future
+from functools import partial
+from select import select
+from socket import socketpair
+from threading import Condition, Thread
+from typing import (
+ TYPE_CHECKING,
+ Any,
+ Callable,
+)
+from weakref import WeakKeyDictionary
+
+from ._asyncio import find_root_task
+
+if TYPE_CHECKING:
+ from _typeshed import HasFileno
+
+ _FileDescriptorLike = HasFileno | int
+
+
+# Registry of asyncio loop : selector thread
+_selectors: WeakKeyDictionary = WeakKeyDictionary()
+
+# Collection of selector threads to shut down on exit
+_selector_threads: set[SelectorThread] = set()
+
+
+def _loop_close_callback(asyncio_loop: AbstractEventLoop, future: Future) -> None:
+ selector_thread = _selectors.pop(asyncio_loop)
+ selector_thread.close()
+
+
+# SelectorThread from tornado 6.4.0
+class SelectorThread:
+ """Define ``add_reader`` methods to be called in a background select thread.
+
+ Instances of this class start a second thread to run a selector.
+ This thread is completely hidden from the user;
+ all callbacks are run on the wrapped event loop's thread.
+
+ Typically used via ``AddThreadSelectorEventLoop``,
+ but can be attached to a running asyncio loop.
+ """
+
+ _closed = False
+
+ def __init__(self, real_loop: AbstractEventLoop) -> None:
+ self._real_loop = real_loop
+
+ self._select_cond = Condition()
+ self._select_args: (
+ tuple[list[_FileDescriptorLike], list[_FileDescriptorLike]] | None
+ ) = None
+ self._closing_selector = False
+ self._thread: Thread | None = None
+
+ self._readers: dict[_FileDescriptorLike, Callable] = {}
+ self._writers: dict[_FileDescriptorLike, Callable] = {}
+
+ # Writing to _waker_w will wake up the selector thread, which
+ # watches for _waker_r to be readable.
+ self._waker_r, self._waker_w = socketpair()
+ self._waker_r.setblocking(False)
+ self._waker_w.setblocking(False)
+ _selector_threads.add(self)
+ self.add_reader(self._waker_r, self._consume_waker)
+ self._thread_manager()
+
+ def close(self) -> None:
+ if self._closed:
+ return
+ with self._select_cond:
+ self._closing_selector = True
+ self._select_cond.notify()
+ self._wake_selector()
+ if self._thread is not None:
+ self._thread.join()
+ _selector_threads.discard(self)
+ self.remove_reader(self._waker_r)
+ self._waker_r.close()
+ self._waker_w.close()
+ self._closed = True
+
+ def _thread_manager(self) -> None:
+ # Create a thread to run the select system call. We manage this thread
+ # manually so we can trigger a clean shutdown at loop teardown. Note
+ # that due to the order of operations at shutdown, only daemon threads
+ # can be shut down in this way (non-daemon threads would require the
+ # introduction of a new hook: https://bugs.python.org/issue41962)
+ self._thread = Thread(
+ name="AnyIO selector",
+ daemon=True,
+ target=self._run_select,
+ )
+ self._thread.start()
+ self._start_select()
+
+ def _wake_selector(self) -> None:
+ if self._closed:
+ return
+ try:
+ self._waker_w.send(b"a")
+ except BlockingIOError:
+ pass
+
+ def _consume_waker(self) -> None:
+ try:
+ self._waker_r.recv(1024)
+ except BlockingIOError:
+ pass
+
+ def _start_select(self) -> None:
+ # Capture reader and writer sets here in the event loop
+ # thread to avoid any problems with concurrent
+ # modification while the select loop uses them.
+ with self._select_cond:
+ assert self._select_args is None
+ self._select_args = (list(self._readers.keys()), list(self._writers.keys()))
+ self._select_cond.notify()
+
+ def _run_select(self) -> None:
+ while True:
+ with self._select_cond:
+ while self._select_args is None and not self._closing_selector:
+ self._select_cond.wait()
+ if self._closing_selector:
+ return
+ assert self._select_args is not None
+ to_read, to_write = self._select_args
+ self._select_args = None
+
+ # We use the simpler interface of the select module instead of
+ # the more stateful interface in the selectors module because
+ # this class is only intended for use on windows, where
+ # select.select is the only option. The selector interface
+ # does not have well-documented thread-safety semantics that
+ # we can rely on so ensuring proper synchronization would be
+ # tricky.
+ try:
+ # On windows, selecting on a socket for write will not
+ # return the socket when there is an error (but selecting
+ # for reads works). Also select for errors when selecting
+ # for writes, and merge the results.
+ #
+ # This pattern is also used in
+ # https://github.com/python/cpython/blob/v3.8.0/Lib/selectors.py#L312-L317
+ rs, ws, xs = select(to_read, to_write, to_write)
+ ws = ws + xs
+ except OSError as e:
+ # After remove_reader or remove_writer is called, the file
+ # descriptor may subsequently be closed on the event loop
+ # thread. It's possible that this select thread hasn't
+ # gotten into the select system call by the time that
+ # happens in which case (at least on macOS), select may
+ # raise a "bad file descriptor" error. If we get that
+ # error, check and see if we're also being woken up by
+ # polling the waker alone. If we are, just return to the
+ # event loop and we'll get the updated set of file
+ # descriptors on the next iteration. Otherwise, raise the
+ # original error.
+ if e.errno == getattr(errno, "WSAENOTSOCK", errno.EBADF):
+ rs, _, _ = select([self._waker_r.fileno()], [], [], 0)
+ if rs:
+ ws = []
+ else:
+ raise
+ else:
+ raise
+
+ try:
+ self._real_loop.call_soon_threadsafe(self._handle_select, rs, ws)
+ except RuntimeError:
+ # "Event loop is closed". Swallow the exception for
+ # consistency with PollIOLoop (and logical consistency
+ # with the fact that we can't guarantee that an
+ # add_callback that completes without error will
+ # eventually execute).
+ pass
+ except AttributeError:
+ # ProactorEventLoop may raise this instead of RuntimeError
+ # if call_soon_threadsafe races with a call to close().
+ # Swallow it too for consistency.
+ pass
+
+ def _handle_select(
+ self, rs: list[_FileDescriptorLike], ws: list[_FileDescriptorLike]
+ ) -> None:
+ for r in rs:
+ self._handle_event(r, self._readers)
+ for w in ws:
+ self._handle_event(w, self._writers)
+ self._start_select()
+
+ def _handle_event(
+ self,
+ fd: _FileDescriptorLike,
+ cb_map: dict[_FileDescriptorLike, Callable],
+ ) -> None:
+ try:
+ callback = cb_map[fd]
+ except KeyError:
+ return
+ callback()
+
+ def add_reader(
+ self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+ ) -> None:
+ self._readers[fd] = partial(callback, *args)
+ self._wake_selector()
+
+ def add_writer(
+ self, fd: _FileDescriptorLike, callback: Callable[..., None], *args: Any
+ ) -> None:
+ self._writers[fd] = partial(callback, *args)
+ self._wake_selector()
+
+ def remove_reader(self, fd: _FileDescriptorLike) -> bool:
+ try:
+ del self._readers[fd]
+ except KeyError:
+ return False
+ self._wake_selector()
+ return True
+
+ def remove_writer(self, fd: _FileDescriptorLike) -> bool:
+ try:
+ del self._writers[fd]
+ except KeyError:
+ return False
+ self._wake_selector()
+ return True
+
+
+def _get_selector_windows(
+ asyncio_loop: AbstractEventLoop,
+) -> SelectorThread:
+ """Get selector-compatible loop.
+
+ Sets ``add_reader`` family of methods on the asyncio loop.
+
+ Workaround Windows proactor removal of *reader methods.
+ """
+ if asyncio_loop in _selectors:
+ return _selectors[asyncio_loop]
+
+ find_root_task().add_done_callback(partial(_loop_close_callback, asyncio_loop))
+ selector_thread = _selectors[asyncio_loop] = SelectorThread(asyncio_loop)
+ return selector_thread
diff --git a/src/anyio/_core/_sockets.py b/src/anyio/_core/_sockets.py
index 12da4c5c..31827195 100644
--- a/src/anyio/_core/_sockets.py
+++ b/src/anyio/_core/_sockets.py
@@ -609,9 +609,6 @@ def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
Wait until the given socket has data to be read.
- This does **NOT** work on Windows when using the asyncio backend with a proactor
- event loop (default on py3.8+).
-
.. warning:: Only use this on raw sockets that have not been wrapped by any higher
level constructs like socket streams!
diff --git a/tests/test_sockets.py b/tests/test_sockets.py
index a1189bb8..b6108850 100644
--- a/tests/test_sockets.py
+++ b/tests/test_sockets.py
@@ -1858,16 +1858,7 @@ async def test_connect_tcp_getaddrinfo_context() -> None:
@pytest.mark.parametrize("socket_type", ["socket", "fd"])
@pytest.mark.parametrize("event", ["readable", "writable"])
-async def test_wait_socket(
- anyio_backend_name: str, event: str, socket_type: str
-) -> None:
- if anyio_backend_name == "asyncio" and platform.system() == "Windows":
- import asyncio
-
- policy = asyncio.get_event_loop_policy()
- if policy.__class__.__name__ == "WindowsProactorEventLoopPolicy":
- pytest.skip("Does not work on asyncio/Windows/ProactorEventLoop")
-
+async def test_wait_socket(event: str, socket_type: str) -> None:
wait = wait_readable if event == "readable" else wait_writable
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as server_sock: