Skip to content

Commit

Permalink
feat: cythonic primitives (#412)
Browse files Browse the repository at this point in the history
  • Loading branch information
BobTheBuidler authored Nov 21, 2024
1 parent 07ac20c commit cb58991
Show file tree
Hide file tree
Showing 20 changed files with 570 additions and 203 deletions.
1 change: 1 addition & 0 deletions a_sync/__init__.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from a_sync.primitives import *
16 changes: 15 additions & 1 deletion a_sync/asyncio/gather.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ This module provides an enhanced version of :func:`asyncio.gather`.
from typing import Any, Awaitable, Dict, List, Mapping, Union, overload

from a_sync._typing import *
from a_sync.asyncio.as_completed import _exc_wrap
from a_sync.asyncio.as_completed cimport as_completed_mapping

try:
Expand Down Expand Up @@ -202,3 +201,18 @@ async def gather_mapping(

cdef bint _is_mapping(object awaitables):
return len(awaitables) == 1 and isinstance(awaitables[0], Mapping)


async def _exc_wrap(awaitable: Awaitable[T]) -> Union[T, Exception]:
"""Wraps an awaitable to catch exceptions and return them instead of raising.
Args:
awaitable: The awaitable to wrap.
Returns:
The result of the awaitable or the exception if one is raised.
"""
try:
return await awaitable
except Exception as e:
return e
2 changes: 1 addition & 1 deletion a_sync/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ def submit(self, fn: Callable[P, T], *args: P.args, **kwargs: P.kwargs) -> "asyn
- :meth:`run` for running functions with the executor.
"""
if self.sync_mode:
fut = asyncio.get_event_loop().create_future()
fut = self._get_loop().create_future()
try:
fut.set_result(fn(*args, **kwargs))
except Exception as e:
Expand Down
3 changes: 2 additions & 1 deletion a_sync/iter.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -726,7 +726,8 @@ class ASyncSorter(_ASyncView[T]):
items.append(obj)
sort_tasks.append(a_sync.asyncio.create_task(self._function(obj)))
for sort_value, obj in sorted(
zip(await asyncio.gather(*sort_tasks), items), reverse=reverse
zip(await asyncio.gather(*sort_tasks), items),
reverse=reverse,
):
yield obj
else:
Expand Down
1 change: 1 addition & 0 deletions a_sync/primitives/__init__.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
from a_sync.primitives.locks cimport *
9 changes: 9 additions & 0 deletions a_sync/primitives/_debug.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from a_sync.primitives._loggable cimport _LoggerMixin

cdef class _LoopBoundMixin(_LoggerMixin):
cdef object __loop
cpdef object _get_loop(self)
cdef object _c_get_loop(self)

cdef class _DebugDaemonMixin(_LoopBoundMixin):
cdef object _daemon
2 changes: 1 addition & 1 deletion a_sync/primitives/_debug.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ The mixin provides a framework for managing a debug daemon task, which can be us
import abc
from a_sync.primitives._loggable import _LoggerMixin

class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
class _DebugDaemonMixin(_LoggerMixin):
"""
A mixin class that provides a framework for debugging capabilities using a daemon task.
Expand Down
79 changes: 67 additions & 12 deletions a_sync/primitives/_debug.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,71 @@ This module provides a mixin class used to facilitate the creation of debugging
The mixin provides a framework for managing a debug daemon task, which can be used to emit rich debug logs from subclass instances whenever debug logging is enabled. Subclasses must implement the specific logging behavior.
"""

import abc
import asyncio
from asyncio.events import _running_loop
from threading import Lock
from typing import Optional

from a_sync.a_sync._helpers cimport get_event_loop
from a_sync.asyncio.create_task cimport ccreate_task_simple
from a_sync.primitives._loggable import _LoggerMixin


class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
cdef extern from "unistd.h":
int getpid()


_global_lock = Lock()


cdef object _get_running_loop():
"""Return the running event loop or None.
This is a low-level function intended to be used by event loops.
This function is thread-specific.
"""
cdef object running_loop, pid
running_loop, pid = _running_loop.loop_pid
if running_loop is not None and <int>pid == getpid():
return running_loop


cdef class _LoopBoundMixin(_LoggerMixin):
def __cinit__(self):
self.__loop = None
def __init__(self, *, loop=None):
if loop is not None:
raise TypeError(
'The loop parameter is not supported. '
'As of 3.10, the *loop* parameter was removed'
'{}() since it is no longer necessary.'.format(type(self).__name__)
)
@property
def _loop(self) -> asyncio.AbstractEventLoop:
return self.__loop
@_loop.setter
def _loop(self, loop: asyncio.AbstractEventLoop):
self.__loop = loop
cpdef object _get_loop(self):
return self._c_get_loop()
cdef object _c_get_loop(self):
cdef object loop = _get_running_loop()
if self.__loop is None:
with _global_lock:
if self.__loop is None:
self.__loop = loop
if loop is None:
return get_event_loop()
elif loop is not self.__loop:
raise RuntimeError(
f'{self!r} is bound to a different event loop',
"running loop: ".format(loop),
"bound to: ".format(self.__loop),
)
return loop


cdef class _DebugDaemonMixin(_LoopBoundMixin):
"""
A mixin class that provides a framework for debugging capabilities using a daemon task.
Expand All @@ -23,9 +78,6 @@ class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
:class:`_LoggerMixin` for logging capabilities.
"""

__slots__ = ("_daemon",)

@abc.abstractmethod
async def _debug_daemon(self, fut: asyncio.Future, fn, *args, **kwargs) -> None:
"""
Abstract method to define the debug daemon's behavior.
Expand All @@ -49,6 +101,7 @@ class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
self.logger.debug("Debugging...")
await asyncio.sleep(1)
"""
raise NotImplementedError

def _start_debug_daemon(self, *args, **kwargs) -> "asyncio.Future[None]":
"""
Expand All @@ -74,8 +127,8 @@ class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
See Also:
:meth:`_ensure_debug_daemon` for ensuring the daemon is running.
"""
cdef object loop = get_event_loop()
if self.debug_logs_enabled and loop.is_running():
cdef object loop = self._c_get_loop()
if self.check_debug_logs_enabled() and loop.is_running():
return ccreate_task_simple(self._debug_daemon(*args, **kwargs))
return loop.create_future()

Expand Down Expand Up @@ -103,11 +156,13 @@ class _DebugDaemonMixin(_LoggerMixin, metaclass=abc.ABCMeta):
See Also:
:meth:`_start_debug_daemon` for starting the daemon.
"""
if not self.debug_logs_enabled:
self._daemon = get_event_loop().create_future()
if not hasattr(self, "_daemon") or self._daemon is None:
self._daemon = self._start_debug_daemon(*args, **kwargs)
self._daemon.add_done_callback(self._stop_debug_daemon)
cdef object daemon = self._daemon
if daemon is None:
if self.check_debug_logs_enabled():
self._daemon = self._start_debug_daemon(*args, **kwargs)
self._daemon.add_done_callback(self._stop_debug_daemon)
else:
self._daemon = get_event_loop().create_future()
return self._daemon

def _stop_debug_daemon(self, t: Optional[asyncio.Task] = None) -> None:
Expand Down
4 changes: 4 additions & 0 deletions a_sync/primitives/_loggable.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
cdef class _LoggerMixin:
cdef object _logger
cdef object get_logger(self)
cdef bint check_debug_logs_enabled(self)
27 changes: 19 additions & 8 deletions a_sync/primitives/_loggable.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,10 @@
This module provides a mixin class to add debug logging capabilities to other classes.
"""

from functools import cached_property
from logging import Logger, getLogger, DEBUG


class _LoggerMixin:
cdef class _LoggerMixin:
"""
A mixin class that adds logging capabilities to other classes.
Expand All @@ -17,7 +16,7 @@ class _LoggerMixin:
- :class:`logging.Logger`
"""

@cached_property
@property
def logger(self) -> Logger:
"""
Provides a logger instance specific to the class using this mixin.
Expand Down Expand Up @@ -48,10 +47,19 @@ class _LoggerMixin:
- :func:`logging.getLogger`
- :class:`logging.Logger`
"""
logger_id = f"{type(self).__module__}.{type(self).__qualname__}"
if hasattr(self, "_name") and self._name:
logger_id += f".{self._name}"
return getLogger(logger_id)
return self.get_logger()
cdef object get_logger(self):
cdef str logger_id
cdef object logger, cls
logger = self._logger
if not logger:
cls = type(self)
logger_id = "{}.{}".format(cls.__module__, cls.__qualname__)
if hasattr(self, "_name") and self._name:
logger_id += ".{}".format(self._name)
logger = getLogger(logger_id)
return logger
@property
def debug_logs_enabled(self) -> bool:
Expand All @@ -69,4 +77,7 @@ class _LoggerMixin:
See Also:
- :attr:`logging.Logger.isEnabledFor`
"""
return self.logger.isEnabledFor(DEBUG)
return self.get_logger().isEnabledFor(DEBUG)
cdef bint check_debug_logs_enabled(self):
return self.get_logger().isEnabledFor(DEBUG)
2 changes: 2 additions & 0 deletions a_sync/primitives/locks/__init__.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from a_sync.primitives.locks.counter import CounterLock
from a_sync.primitives.locks.event cimport CythonEvent as Event
2 changes: 1 addition & 1 deletion a_sync/primitives/locks/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from a_sync.primitives.locks.counter import CounterLock
from a_sync.primitives.locks.event import Event
from a_sync.primitives.locks.event import CythonEvent as Event
from a_sync.primitives.locks.semaphore import (
DummySemaphore,
Semaphore,
Expand Down
11 changes: 11 additions & 0 deletions a_sync/primitives/locks/counter.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@

from a_sync.primitives._debug cimport _DebugDaemonMixin
from a_sync.primitives.locks.event cimport CythonEvent as Event

cdef class CounterLock(_DebugDaemonMixin):
cdef char* __name
cdef int _value
cdef dict[int, Event] _events
cdef object is_ready
cpdef void set(self, int value)
cdef void c_set(self, int value)
Loading

0 comments on commit cb58991

Please sign in to comment.