diff --git a/tasktiger/worker.py b/tasktiger/worker.py index de83feb2..6170fb6c 100644 --- a/tasktiger/worker.py +++ b/tasktiger/worker.py @@ -160,6 +160,17 @@ def _filter_queues(self, queues): ) ] + def _worker_perform_secondary_tasks(self): + # We should queue scheduled tasks every QUEUE_SCHEDULED_TASKS_TIME time + # and expired tasks every REQUEUE_EXPIRED_TASKS_INTERVAL time. Use only + # one Redis query to enter this block since every single worker calls + # this every second. + # XXX: Ideally, we should keep track of workers and take turns. + key = self._key("lock", "secondary_tasks") + if self.connection.set(key, "1", ex=1, nx=True): + self._worker_queue_scheduled_tasks() + self._worker_queue_expired_tasks() + def _worker_queue_scheduled_tasks(self): """ Helper method that takes due tasks from the SCHEDULED queue and puts @@ -1046,8 +1057,7 @@ def _worker_run(self): time.time() - self._last_task_check > self.config['SELECT_TIMEOUT'] and not self._stop_requested ): - self._worker_queue_scheduled_tasks() - self._worker_queue_expired_tasks() + self._worker_perform_secondary_tasks() self._last_task_check = time.time() def _queue_periodic_tasks(self):