diff --git a/arq/main.py b/arq/main.py index 66fc3873..38102dce 100644 --- a/arq/main.py +++ b/arq/main.py @@ -81,8 +81,7 @@ async def shutdown(self): def _bind_concurrent(self): for attr_name in dir(self.__class__): v = getattr(self.__class__, attr_name) - if isinstance(v, Concurrent): - setattr(self, attr_name, v.bind(self)) + isinstance(v, Concurrent) and v.bind(self) async def enqueue_job(self, func_name: str, *args, queue: str=None, **kwargs): """ @@ -128,30 +127,30 @@ class Concurrent: """ __slots__ = ['_func', '_dft_queue', '_self_obj'] - def __init__(self, func, dft_queue=None, self_obj=None): - if self_obj is None: + def __init__(self, *, func, dft_queue=None, self_obj=None): + self._self_obj = self_obj + # if we're already bound we assume func is of the correct type and skip repeat logging + if not self.bound: if not inspect.iscoroutinefunction(func): raise TypeError('{} is not a coroutine function'.format(func.__qualname__)) main_logger.debug('registering concurrent function %s', func.__qualname__) self._func = func self._dft_queue = dft_queue - self._self_obj = self_obj - def bind(self, obj: object) -> object: + def bind(self, obj: object): """ - Equivalent of binding a normal function to an object. - - A new instance of Concurrent needs to exist for each function it's bound to. + Equivalent of binding a normal function to an object. A new instance of Concurrent is created and then + the reference on the parent object updated to that. :param obj: object to bind the function to eg. "self" in the eyes of func. - :return: instance of Concurrent, self if it's not yet bound, otherwise a new instance """ - if self._self_obj is None: - self._self_obj = obj - return self - else: - return Concurrent(func=self._func, dft_queue=self._dft_queue, self_obj=obj) + new_inst = Concurrent(func=self._func, dft_queue=self._dft_queue, self_obj=obj) + setattr(obj, self._func.__name__, new_inst) + + @property + def bound(self): + return self._self_obj is not None async def __call__(self, *args, **kwargs): return await self.defer(*args, **kwargs)