Skip to content

Commit

Permalink
Use ThreadSelectorEventLoop on Windows with ProactorEventLoop
Browse files Browse the repository at this point in the history
  • Loading branch information
davidbrochart committed Nov 9, 2024
1 parent 4d3dd26 commit c5ff14d
Show file tree
Hide file tree
Showing 5 changed files with 412 additions and 4 deletions.
2 changes: 2 additions & 0 deletions docs/versionhistory.rst
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ This library adheres to `Semantic Versioning 2.0 <http://semver.org/>`_.

- Fixed a misleading ``ValueError`` in the context of DNS failures
(`#815 <https://github.com/agronholm/anyio/issues/815>`_; PR by @graingert)
- Ported ``ThreadSelectorEventLoop`` from Tornado to allow
``anyio.wait_socket_readable(sock)`` to work on Windows with a ``ProactorEventLoop``.

**4.6.2**

Expand Down
47 changes: 46 additions & 1 deletion src/anyio/_backends/_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,44 @@
from ..lowlevel import RunVar
from ..streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream

# registry of asyncio loop : selector thread
_selectors: WeakKeyDictionary = WeakKeyDictionary()


def _get_selector_windows(
asyncio_loop: AbstractEventLoop,
) -> AbstractEventLoop:
"""Get selector-compatible loop.
Sets ``add_reader`` family of methods on the asyncio loop.
Workaround Windows proactor removal of *reader methods.
"""

if asyncio_loop in _selectors:
return _selectors[asyncio_loop]

from ._selector_thread import AddThreadSelectorEventLoop

selector_loop = _selectors[asyncio_loop] = AddThreadSelectorEventLoop( # type: ignore[abstract]
asyncio_loop
)

# patch loop.close to also close the selector thread
loop_close = asyncio_loop.close

def _close_selector_and_loop() -> None:
# restore original before calling selector.close,
# which in turn calls eventloop.close!
asyncio_loop.close = loop_close # type: ignore[method-assign]
_selectors.pop(asyncio_loop, None)
selector_loop.close()

asyncio_loop.close = _close_selector_and_loop # type: ignore[method-assign]

return selector_loop


if sys.version_info >= (3, 10):
from typing import ParamSpec
else:
Expand Down Expand Up @@ -2682,7 +2720,14 @@ async def wait_socket_readable(cls, sock: socket.socket) -> None:
if read_events.get(sock):
raise BusyResourceError("reading from") from None

loop = get_running_loop()
if (
sys.platform == "win32"
and asyncio.get_event_loop_policy().__class__.__name__
== "WindowsProactorEventLoopPolicy"
):
loop = _get_selector_windows(loop)
else:
loop = get_running_loop()
event = read_events[sock] = asyncio.Event()
loop.add_reader(sock, event.set)
try:
Expand Down
Loading

0 comments on commit c5ff14d

Please sign in to comment.