Skip to content

Commit

Permalink
slightly improve binding logic
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelcolvin committed Feb 11, 2017
1 parent 96881c3 commit 810168c
Showing 1 changed file with 14 additions and 15 deletions.
29 changes: 14 additions & 15 deletions arq/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ async def shutdown(self):
def _bind_concurrent(self):
for attr_name in dir(self.__class__):
v = getattr(self.__class__, attr_name)
if isinstance(v, Concurrent):
setattr(self, attr_name, v.bind(self))
isinstance(v, Concurrent) and v.bind(self)

async def enqueue_job(self, func_name: str, *args, queue: str=None, **kwargs):
"""
Expand Down Expand Up @@ -128,30 +127,30 @@ class Concurrent:
"""
__slots__ = ['_func', '_dft_queue', '_self_obj']

def __init__(self, func, dft_queue=None, self_obj=None):
if self_obj is None:
def __init__(self, *, func, dft_queue=None, self_obj=None):
self._self_obj = self_obj
# if we're already bound we assume func is of the correct type and skip repeat logging
if not self.bound:
if not inspect.iscoroutinefunction(func):
raise TypeError('{} is not a coroutine function'.format(func.__qualname__))

main_logger.debug('registering concurrent function %s', func.__qualname__)
self._func = func
self._dft_queue = dft_queue
self._self_obj = self_obj

def bind(self, obj: object) -> object:
def bind(self, obj: object):
"""
Equivalent of binding a normal function to an object.
A new instance of Concurrent needs to exist for each function it's bound to.
Equivalent of binding a normal function to an object. A new instance of Concurrent is created and then
the reference on the parent object updated to that.
:param obj: object to bind the function to eg. "self" in the eyes of func.
:return: instance of Concurrent, self if it's not yet bound, otherwise a new instance
"""
if self._self_obj is None:
self._self_obj = obj
return self
else:
return Concurrent(func=self._func, dft_queue=self._dft_queue, self_obj=obj)
new_inst = Concurrent(func=self._func, dft_queue=self._dft_queue, self_obj=obj)
setattr(obj, self._func.__name__, new_inst)

@property
def bound(self):
return self._self_obj is not None

async def __call__(self, *args, **kwargs):
return await self.defer(*args, **kwargs)
Expand Down

0 comments on commit 810168c

Please sign in to comment.