Skip to content

Commit

Permalink
feat: BI-0 async_run for inside async contexts (#609)
Browse files Browse the repository at this point in the history
feat: async_run for inside async contexts
  • Loading branch information
ovsds authored Sep 16, 2024
1 parent c108067 commit dbb71cd
Showing 1 changed file with 38 additions and 0 deletions.
38 changes: 38 additions & 0 deletions lib/dl_utils/dl_utils/aio.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@
import contextvars
from functools import partial
import logging
import threading
from types import TracebackType
import typing
from typing import (
TYPE_CHECKING,
Any,
Expand Down Expand Up @@ -93,6 +95,42 @@ def await_sync(coro: Awaitable[_WRAPPED_RT], loop: Optional[asyncio.AbstractEven
return loop.run_until_complete(coro)


class RunThread(threading.Thread):
def __init__(self, coro: typing.Coroutine[typing.Any, typing.Any, _WRAPPED_RT]):
self.coro = coro
self.result: _WRAPPED_RT | None = None
super().__init__()

def run(self) -> None:
self.result = asyncio.run(self.coro)


# Threading version of await_sync, needed to be able to run async code in sync code, which is running in async context.
# This is needed due to constant mixing of context in the codebase.
# Should be avoided by making context async and using await_sync on higher level of sync counterparts.
# USE IT ONLY IN THE RAREST OF OCCASIONS, WHEN YOU HAVE NO OTHER CHOICE.
# Please, lord, have mercy on my soul, for I have sinned.
def async_run(
coro: typing.Coroutine[typing.Any, typing.Any, _WRAPPED_RT],
loop: typing.Optional[asyncio.AbstractEventLoop] = None,
) -> _WRAPPED_RT:
if loop is None:
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None

if loop is None or not loop.is_running():
return asyncio.run(coro)

thread = RunThread(coro)
thread.start()
thread.join()

assert thread.result is not None
return thread.result


_ITERABLE_T = TypeVar("_ITERABLE_T")


Expand Down

0 comments on commit dbb71cd

Please sign in to comment.