Skip to content

Commit

Permalink
feat: optimize asyncio.Future.set_result
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler committed Dec 12, 2024
1 parent 421929e commit 49d4005
Show file tree
Hide file tree
Showing 6 changed files with 88 additions and 32 deletions.
10 changes: 5 additions & 5 deletions a_sync/_smart.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ cdef inline bint _is_not_done(fut: asyncio.Future):
"""
return <str>fut._state == "PENDING"

cdef inline bint cancelled(fut: asyncio.Future):
cdef inline bint _is_cancelled(fut: asyncio.Future):
"""Return True if the future was cancelled."""
return <str>fut._state == "CANCELLED"

Expand All @@ -195,7 +195,7 @@ cdef object _get_result(fut: asyncio.Future):
raise fut._make_cancelled_error()
raise asyncio.exceptions.InvalidStateError('Result is not ready.')

def _get_exception(fut: asyncio.Future):
cdef object _get_exception(fut: asyncio.Future):
"""Return the exception that was set on this future.
The exception (or None if no exception was set) is returned only if
Expand Down Expand Up @@ -481,13 +481,13 @@ def shield(
waiters.add(outer)

def _inner_done_callback(inner):
if cancelled(outer):
if not cancelled(inner):
if _is_cancelled(outer):
if not _is_cancelled(inner):
# Mark inner's result as retrieved.
_get_exception(inner)
return

if cancelled(inner):
if _is_cancelled(inner):
outer.cancel()
else:
exc = _get_exception(inner)
Expand Down
9 changes: 5 additions & 4 deletions a_sync/a_sync/function.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import functools
import inspect
import logging
import sys
from libc.stdint cimport uintptr_t

from async_lru import _LRUCacheWrapper
from async_property.base import AsyncPropertyDescriptor # type: ignore [import]
Expand Down Expand Up @@ -120,10 +121,10 @@ cpdef void _validate_wrapped_fn(fn: Callable):

cdef object _function_type = type(logging.getLogger)

cdef set[Py_ssize_t] _argspec_validated = set()
cdef set[uintptr_t] _argspec_validated = set()

cdef void _validate_argspec_cached(fn: Callable):
cdef Py_ssize_t fid = id(fn)
cdef uintptr_t fid = id(fn)
if fid not in _argspec_validated:
_validate_argspec(fn)
_argspec_validated.add(fid)
Expand Down Expand Up @@ -971,10 +972,10 @@ class ASyncDecorator(_ModifiedMixin):
return ASyncFunctionSyncDefault(func, **self.modifiers)


cdef set[Py_ssize_t] _is_genfunc_cache = set()
cdef set[uintptr_t] _is_genfunc_cache = set()

cdef void _check_not_genfunc_cached(func: Callable):
cdef Py_ssize_t fid = id(func)
cdef uintptr_t fid = id(func)
if fid not in _is_genfunc_cache:
_check_not_genfunc(func)
_is_genfunc_cache.add(fid)
Expand Down
50 changes: 34 additions & 16 deletions a_sync/asyncio/create_task.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -134,28 +134,46 @@ cdef void __prune_persisted_tasks():
cdef object task
cdef dict context
for task in tuple(__persisted_tasks):
if _is_done(task) and (e := task.exception()):
# force exceptions related to this lib to bubble up
if not isinstance(e, exceptions.PersistedTaskException):
c_logger.exception(e)
raise e
# we have to manually log the traceback that asyncio would usually log
# since we already got the exception from the task and the usual handler will now not run
context = {
"message": f"{task.__class__.__name__} exception was never retrieved",
"exception": e,
"future": task,
}
if task._source_traceback:
context["source_traceback"] = task._source_traceback
task._loop.call_exception_handler(context)
__persisted_tasks.discard(task)
if _is_done(task):
if e := _get_exception(task):
# force exceptions related to this lib to bubble up
if not isinstance(e, exceptions.PersistedTaskException):
c_logger.exception(e)
raise e
# we have to manually log the traceback that asyncio would usually log
# since we already got the exception from the task and the usual handler will now not run
context = {
"message": f"{task.__class__.__name__} exception was never retrieved",
"exception": e,
"future": task,
}
if task._source_traceback:
context["source_traceback"] = task._source_traceback
task._loop.call_exception_handler(context)
__persisted_tasks.discard(task)
cdef inline bint _is_done(fut: asyncio.Future):
return <str>fut._state != "PENDING"
cdef object _get_exception(fut: asyncio.Future):
"""Return the exception that was set on this future.

The exception (or None if no exception was set) is returned only if
the future is done. If the future has been cancelled, raises
CancelledError. If the future isn't done yet, raises
InvalidStateError.
"""
cdef str state = fut._state
if state == "FINISHED":
fut._Future__log_traceback = False
return fut._exception
if state == "CANCELLED":
raise fut._make_cancelled_error()
raise asyncio.exceptions.InvalidStateError('Exception is not set.')
async def __persisted_task_exc_wrap(task: "asyncio.Task[T]") -> T:
"""
Wrap a task to handle its exception in a specialized manner.
Expand Down
16 changes: 14 additions & 2 deletions a_sync/primitives/locks/event.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ cdef class CythonEvent(_DebugDaemonMixin):

for fut in self._waiters:
if _is_not_done(fut):
fut.set_result(True)
_set_result_true(fut)

cpdef void clear(self):
"""Reset the internal flag to false. Subsequently, coroutines calling
Expand Down Expand Up @@ -184,4 +184,16 @@ cdef class CythonEvent(_DebugDaemonMixin):


cdef inline bint _is_not_done(fut: asyncio.Future):
return <str>fut._state == "PENDING"
return <str>fut._state == "PENDING"

cdef void _set_result_true(fut: asyncio.Future):
"""Mark the future done and set its result.
If the future is already done when this method is called, raises
InvalidStateError.
"""
if <str>fut._state != "PENDING":
raise asyncio.exceptions.InvalidStateError(f'{fut._state}: {fut!r}')
fut._result = True
fut._state = "FINISHED"
fut._Future__schedule_callbacks()
19 changes: 16 additions & 3 deletions a_sync/primitives/locks/prio_semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
waiter = manager._Semaphore__waiters.popleft()
self._potential_lost_waiters.remove(waiter)
if _is_not_done(waiter):
waiter.set_result(None)
_set_result_none(waiter)
woke_up = True
if debug_logs:
c_logger._log(DEBUG, "woke up %s", (waiter, ))
Expand All @@ -261,20 +261,33 @@ cdef class _AbstractPrioritySemaphore(Semaphore):
while self._potential_lost_waiters:
waiter = self._potential_lost_waiters.pop(0)
if _is_not_done(waiter):
waiter.set_result(None)
_set_result_none(waiter)
return
return

while self._potential_lost_waiters:
waiter = self._potential_lost_waiters.pop(0)
c_logger._log(DEBUG, "we found a lost waiter %s", (waiter, ))
if _is_not_done(waiter):
waiter.set_result(None)
_set_result_none(waiter)
c_logger._log(DEBUG, "woke up lost waiter %s", (waiter, ))
return

c_logger._log(DEBUG, "%s has no waiters to wake", (self, ))


cdef void _set_result_none(fut: asyncio.Future):
"""Mark the future done and set its result.
If the future is already done when this method is called, raises
InvalidStateError.
"""
if <str>fut._state != "PENDING":
raise asyncio.exceptions.InvalidStateError(f'{fut._state}: {fut!r}')
fut._result = None
fut._state = "FINISHED"
fut._Future__schedule_callbacks()


cdef class _AbstractPrioritySemaphoreContextManager(Semaphore):
"""
Expand Down
16 changes: 14 additions & 2 deletions a_sync/primitives/locks/semaphore.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ cdef class Semaphore(_DebugDaemonMixin):
for fut in self._Semaphore__waiters:
if _is_not_done(fut):
self._Semaphore__value -= 1
fut.set_result(True)
_set_result_true(fut)
return

cdef void _c_wake_up_next(self):
Expand All @@ -287,7 +287,7 @@ cdef class Semaphore(_DebugDaemonMixin):
for fut in self._Semaphore__waiters:
if _is_not_done(fut):
self._Semaphore__value -= 1
fut.set_result(True)
_set_result_true(fut)
return

async def _debug_daemon(self) -> None:
Expand Down Expand Up @@ -319,6 +319,18 @@ cdef inline bint _is_not_done(fut: asyncio.Future):
cdef inline bint _is_not_cancelled(fut: asyncio.Future):
return <str>fut._state != "CANCELLED"

cdef void _set_result_true(fut: asyncio.Future):
"""Mark the future done and set its result.
If the future is already done when this method is called, raises
InvalidStateError.
"""
if <str>fut._state != "PENDING":
raise asyncio.exceptions.InvalidStateError(f'{fut._state}: {fut!r}')
fut._result = True
fut._state = "FINISHED"
fut._Future__schedule_callbacks()


cdef class DummySemaphore(Semaphore):
"""
Expand Down

0 comments on commit 49d4005

Please sign in to comment.