Skip to content

Commit

Permalink
chore: run black on all PRs (#331)
Browse files Browse the repository at this point in the history
* chore: run black on all PRs

* chore: `black .`

* Update prio_semaphore.py

* Create pyproject.yaml

* Update __init__.py

* Update future.py

* Update property.py

* chore: `black .`

* Update task.py

* Update task.py

* Update prio_semaphore.py

* chore: `black .`

* Update property.py

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
  • Loading branch information
BobTheBuidler and github-actions[bot] authored Nov 10, 2024
1 parent 9ad8caf commit 687b369
Show file tree
Hide file tree
Showing 70 changed files with 3,505 additions and 1,431 deletions.
45 changes: 45 additions & 0 deletions .github/workflows/black.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
name: Black Formatter

on:
pull_request:
branches:
- master

jobs:
format:
runs-on: ubuntu-latest

steps:
- name: Checkout code
uses: actions/checkout@v3
with:
ref: ${{ github.head_ref }} # Check out the PR branch

- name: Set up Python
uses: actions/setup-python@v4
with:
python-version: '3.12'

- name: Install Black
run: pip install black

- name: Run Black
run: black .

- name: Check for changes
id: changes
run: |
if [[ -n $(git status --porcelain) ]]; then
echo "changes_detected=true" >> $GITHUB_ENV
else
echo "changes_detected=false" >> $GITHUB_ENV
fi
- name: Commit changes
if: env.changes_detected == 'true'
run: |
git config --local user.name "github-actions[bot]"
git config --local user.email "github-actions[bot]@users.noreply.github.com"
git add .
git commit -m "chore: \`black .\`"
git push
7 changes: 4 additions & 3 deletions a_sync/ENVIRONMENT_VARIABLES.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@

from typed_envs import EnvVarFactory

envs = EnvVarFactory("EZASYNC")

# We have some envs here to help you debug your custom class implementations

# If you're only interested in debugging a specific class, set this to the class name
DEBUG_CLASS_NAME = envs.create_env("DEBUG_CLASS_NAME", str, default='', verbose=False)
DEBUG_CLASS_NAME = envs.create_env("DEBUG_CLASS_NAME", str, default="", verbose=False)

# Set this to enable debug mode on all classes
DEBUG_MODE = envs.create_env("DEBUG_MODE", bool, default=DEBUG_CLASS_NAME, verbose=False)
DEBUG_MODE = envs.create_env(
"DEBUG_MODE", bool, default=DEBUG_CLASS_NAME, verbose=False
)
21 changes: 8 additions & 13 deletions a_sync/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,12 @@

from a_sync import aliases, exceptions, iter, task
from a_sync.a_sync import ASyncGenericBase, ASyncGenericSingleton, a_sync
from a_sync.a_sync.modifiers.semaphores import apply_semaphore
from a_sync.a_sync.property import ASyncCachedPropertyDescriptor, ASyncPropertyDescriptor, cached_property, property
from a_sync.a_sync.property import (
ASyncCachedPropertyDescriptor,
ASyncPropertyDescriptor,
cached_property,
property,
)
from a_sync.asyncio import as_completed, create_task, gather
from a_sync.executor import *
from a_sync.executor import AsyncThreadPoolExecutor as ThreadPoolExecutor
Expand All @@ -29,16 +33,13 @@
"exceptions",
"iter",
"task",

# builtins
"sorted",
"filter",

# asyncio
"create_task",
"gather",
"gather",
"as_completed",

# functions
"a_sync",
"all",
Expand All @@ -47,33 +48,27 @@
"exhaust_iterator",
"exhaust_iterators",
"map",

# classes
"ASyncIterable",
"ASyncIterator",
"ASyncGenericSingleton",
"TaskMapping",

"TaskMapping",
# property
"cached_property",
"property",
"ASyncPropertyDescriptor",
"ASyncCachedPropertyDescriptor",

# semaphores
"Semaphore",
"PrioritySemaphore",
"ThreadsafeSemaphore",

# queues
"Queue",
"ProcessingQueue",
"SmartProcessingQueue",

# locks
"CounterLock",
"Event",

# executors
"AsyncThreadPoolExecutor",
"PruningThreadPoolExecutor",
Expand Down
79 changes: 54 additions & 25 deletions a_sync/_smart.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@

import asyncio
import logging
import warnings
Expand All @@ -17,10 +16,12 @@

logger = logging.getLogger(__name__)


class _SmartFutureMixin(Generic[T]):
_queue: Optional["SmartProcessingQueue[Any, Any, T]"] = None
_key: _Key
_waiters: "weakref.WeakSet[SmartTask[T]]"

def __await__(self: Union["SmartFuture", "SmartTask"]) -> Generator[Any, None, T]:
if self.done():
return self.result() # May raise too.
Expand All @@ -32,17 +33,22 @@ def __await__(self: Union["SmartFuture", "SmartTask"]) -> Generator[Any, None, T
if not self.done():
raise RuntimeError("await wasn't used with future")
return self.result() # May raise too.

@property
def num_waiters(self: Union["SmartFuture", "SmartTask"]) -> int:
# NOTE: we check .done() because the callback may not have ran yet and its very lightweight
if self.done():
# if there are any waiters left, there won't be once the event loop runs once
return 0
return sum(getattr(waiter, 'num_waiters', 1) or 1 for waiter in self._waiters)
def _waiter_done_cleanup_callback(self: Union["SmartFuture", "SmartTask"], waiter: "SmartTask") -> None:
return sum(getattr(waiter, "num_waiters", 1) or 1 for waiter in self._waiters)

def _waiter_done_cleanup_callback(
self: Union["SmartFuture", "SmartTask"], waiter: "SmartTask"
) -> None:
"Removes the waiter from _waiters, and _queue._futs if applicable"
if not self.done():
self._waiters.remove(waiter)

def _self_done_cleanup_callback(self: Union["SmartFuture", "SmartTask"]) -> None:
self._waiters.clear()
if queue := self._queue:
Expand All @@ -52,11 +58,12 @@ def _self_done_cleanup_callback(self: Union["SmartFuture", "SmartTask"]) -> None
class SmartFuture(_SmartFutureMixin[T], asyncio.Future):
_queue = None
_key = None

def __init__(
self,
*,
queue: Optional["SmartProcessingQueue[Any, Any, T]"],
key: Optional[_Key] = None,
self,
*,
queue: Optional["SmartProcessingQueue[Any, Any, T]"],
key: Optional[_Key] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> None:
super().__init__(loop=loop)
Expand All @@ -66,63 +73,74 @@ def __init__(
self._key = key
self._waiters = weakref.WeakSet()
self.add_done_callback(SmartFuture._self_done_cleanup_callback)

def __repr__(self):
return f"<{type(self).__name__} key={self._key} waiters={self.num_waiters} {self._state}>"

def __lt__(self, other: "SmartFuture[T]") -> bool:
"""heap considers lower values as higher priority so a future with more waiters will be 'less than' a future with less waiters."""
#other = other_ref()
#if other is None:
# other = other_ref()
# if other is None:
# # garbage collected refs should always process first so they can be popped from the queue
# return False
return self.num_waiters > other.num_waiters


def create_future(
*,
queue: Optional["SmartProcessingQueue"] = None,
key: Optional[_Key] = None,
queue: Optional["SmartProcessingQueue"] = None,
key: Optional[_Key] = None,
loop: Optional[asyncio.AbstractEventLoop] = None,
) -> SmartFuture[V]:
return SmartFuture(queue=queue, key=key, loop=loop or asyncio.get_event_loop())


class SmartTask(_SmartFutureMixin[T], asyncio.Task):
def __init__(
self,
coro: Awaitable[T],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
self,
coro: Awaitable[T],
*,
loop: Optional[asyncio.AbstractEventLoop] = None,
name: Optional[str] = None,
) -> None:
super().__init__(coro, loop=loop, name=name)
self._waiters: Set["asyncio.Task[T]"] = set()
self.add_done_callback(SmartTask._self_done_cleanup_callback)

def smart_task_factory(loop: asyncio.AbstractEventLoop, coro: Awaitable[T]) -> SmartTask[T]:

def smart_task_factory(
loop: asyncio.AbstractEventLoop, coro: Awaitable[T]
) -> SmartTask[T]:
"""
Task factory function that an event loop calls to create new tasks.
This factory function utilizes ez-a-sync's custom :class:`~SmartTask` implementation.
Args:
loop: The event loop.
coro: The coroutine to run in the task.
Returns:
A SmartTask instance running the provided coroutine.
"""
return SmartTask(coro, loop=loop)


def set_smart_task_factory(loop: asyncio.AbstractEventLoop = None) -> None:
"""
Set the event loop's task factory to :func:`~smart_task_factory` so all tasks will be SmartTask instances.
Args:
loop: Optional; the event loop. If None, the current event loop is used.
"""
if loop is None:
loop = a_sync.asyncio.get_event_loop()
loop.set_task_factory(smart_task_factory)

def shield(arg: Awaitable[T], *, loop: Optional[asyncio.AbstractEventLoop] = None) -> SmartFuture[T]:

def shield(
arg: Awaitable[T], *, loop: Optional[asyncio.AbstractEventLoop] = None
) -> SmartFuture[T]:
"""
Wait for a future, shielding it from cancellation.
Expand Down Expand Up @@ -150,9 +168,12 @@ def shield(arg: Awaitable[T], *, loop: Optional[asyncio.AbstractEventLoop] = Non
res = None
"""
if loop is not None:
warnings.warn("The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning, stacklevel=2)
warnings.warn(
"The loop argument is deprecated since Python 3.8, "
"and scheduled for removal in Python 3.10.",
DeprecationWarning,
stacklevel=2,
)
inner = asyncio.ensure_future(arg, loop=loop)
if inner.done():
# Shortcut.
Expand All @@ -162,6 +183,7 @@ def shield(arg: Awaitable[T], *, loop: Optional[asyncio.AbstractEventLoop] = Non
# special handling to connect SmartFutures to SmartTasks if enabled
if (waiters := getattr(inner, "_waiters", None)) is not None:
waiters.add(outer)

def _inner_done_callback(inner):
if outer.cancelled():
if not inner.cancelled():
Expand All @@ -187,4 +209,11 @@ def _outer_done_callback(outer):
return outer


__all__ = ["create_future", "shield", "SmartFuture", "SmartTask", "smart_task_factory", "set_smart_task_factory"]
__all__ = [
"create_future",
"shield",
"SmartFuture",
"SmartTask",
"smart_task_factory",
"set_smart_task_factory",
]
Loading

0 comments on commit 687b369

Please sign in to comment.