Skip to content

Commit

Permalink
feat: optimize executors more
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Dec 16, 2024
1 parent ca5d981 commit 6db88f8
Showing 1 changed file with 28 additions and 20 deletions.
48 changes: 28 additions & 20 deletions a_sync/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,21 +34,6 @@
Initializer = Callable[..., object]


def _copy_future_state(cf_fut: concurrent.futures.Future, fut: asyncio.Future):
"""Internal helper to copy state from another Future.
The other Future may be a concurrent.futures.Future.
"""
# check this again in case it was cancelled since the last check
if fut.cancelled():
return
exception = cf_fut.exception()
if exception is None:
fut.set_result(cf_fut.result())
else:
fut.set_exception(_convert_future_exc(exception))


class _AsyncExecutorMixin(concurrent.futures.Executor, _DebugDaemonMixin):
"""
A mixin for Executors to provide asynchronous run and submit methods.
Expand Down Expand Up @@ -125,31 +110,31 @@ def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> "asyn
fut = self._create_future()
if self.sync_mode:
try:
fut.set_result(fn(*args, **kwargs))
_set_fut_result(fut, fn(*args, **kwargs))
except Exception as e:
fut.set_exception(e)
_set_fut_exception(fut, e)
else:
self._ensure_debug_daemon(fut, fn, *args, **kwargs)

cf_fut = self._super_submit(fn, *args, **kwargs)

# TODO: implement logic to actually cancel the job, not just the future which is useless for our use case
# def _call_check_cancel(destination: asyncio.Future):
# if destination.cancelled():
# if _fut_is_cancelled(destination):
# cf_fut.cancel()
#
# fut.add_done_callback(_call_check_cancel)

def _call_copy_future_state(cf_fut: "concurrent.futures.Future"):
if fut.cancelled():
if _fut_is_cancelled(fut):
return
self._call_soon_threadsafe(
_copy_future_state,
cf_fut,
fut,
)

cf_fut.add_done_callback(_call_copy_future_state)
_add_done_callback(cf_fut, _call_copy_future_state)

return fut

Expand Down Expand Up @@ -496,6 +481,29 @@ def weakref_cb(_, q=self._work_queue):
thread._threads_queues[t] = self._work_queue


def _copy_future_state(cf_fut: concurrent.futures.Future, fut: asyncio.Future):
"""Internal helper to copy state from another Future.
The other Future may be a concurrent.futures.Future.
"""
# check this again in case it was cancelled since the last check
if _fut_is_cancelled(fut):
return
exception = _get_cf_fut_exception(cf_fut)
if exception is None:
_set_fut_result(fut, _get_cf_fut_result(cf_fut))
else:
_set_fut_exception(fut, _convert_future_exc(exception))


_fut_is_cancelled = asyncio.Future.cancelled
_get_cf_fut_result = concurrent.futures.Future.result
_get_cf_fut_exception = concurrent.futures.Future.exception
_set_fut_result = asyncio.Future.set_result
_set_fut_exception = asyncio.Future.set_exception
_add_done_callback = concurrent.futures.Future.add_done_callback


executor = PruningThreadPoolExecutor(128)

__all__ = [
Expand Down

0 comments on commit 6db88f8

Please sign in to comment.