From c65a897733e77da44b9032393e8ead3d0a8bf264 Mon Sep 17 00:00:00 2001 From: Thomas Steinacher Date: Thu, 7 Dec 2023 12:12:45 +0000 Subject: [PATCH] Purge errored tasks even if task object is not found --- tasktiger/task.py | 43 +++++++++++++++++++++++++++++++----------- tasktiger/tasktiger.py | 7 ++++++- tests/test_base.py | 11 +++++++++++ 3 files changed, 49 insertions(+), 12 deletions(-) diff --git a/tasktiger/task.py b/tasktiger/task.py index 4f7b83c2..b2aa9a3a 100644 --- a/tasktiger/task.py +++ b/tasktiger/task.py @@ -492,14 +492,24 @@ def tasks_from_queue( skip: int = 0, limit: int = 1000, load_executions: int = 0, + include_not_found: bool = False, ) -> Tuple[int, List["Task"]]: """ - Returns a tuple with the following information: - * total items in the queue - * tasks from the given queue in the given state, latest first. - - An integer may be passed in the load_executions parameter to indicate - how many executions should be loaded (starting from the latest). + Return tasks from a queue. + + Args: + tiger: TaskTiger instance. + queue: Name of the queue. + state: State of the task (QUEUED, ACTIVE, SCHEDULED, ERROR). + limit: Maximum number of tasks to return. + load_executions: Maximum number of executions to load for each task + (starting from the latest). + include_not_found: Whether to include tasks that cannot be loaded. + + Returns: + Tuple with the following information: + * total items in the queue + * tasks from the given queue in the given state, latest first. """ key = tiger._key(state, queue) @@ -509,6 +519,7 @@ def tasks_from_queue( n, items = pipeline.execute() tasks = [] + data: List[Optional[Dict[str, Any]]] if items: tss = [ @@ -525,10 +536,14 @@ def tasks_from_queue( ) results = pipeline.execute() - for serialized_data, serialized_executions, ts in zip( - results[0], results[1:], tss + for idx, serialized_data, serialized_executions, ts in zip( + range(len(items)), results[0], results[1:], tss ): - data = json.loads(serialized_data) + if serialized_data is None and include_not_found: + data = {"id": items[idx][0]} + else: + data = json.loads(serialized_data) + executions = [ json.loads(e) for e in serialized_executions if e ] @@ -547,8 +562,14 @@ def tasks_from_queue( data = tiger.connection.mget( [tiger._key("task", item[0]) for item in items] ) - for serialized_data, ts in zip(data, tss): - data = json.loads(serialized_data) + for idx, serialized_data, ts in zip( + range(len(items)), data, tss + ): + if serialized_data is None and include_not_found: + data = {"id": items[idx][0]} + else: + data = json.loads(serialized_data) + task = Task( tiger, queue=queue, _data=data, _state=state, _ts=ts ) diff --git a/tasktiger/tasktiger.py b/tasktiger/tasktiger.py index 943ccbb5..54f6c251 100644 --- a/tasktiger/tasktiger.py +++ b/tasktiger/tasktiger.py @@ -611,7 +611,12 @@ def errored_tasks() -> Iterable[Task]: task_limit = 5000 while total_tasks is None or skip < total_tasks: total_tasks, tasks = Task.tasks_from_queue( - self, queue, ERROR, skip=skip, limit=task_limit + self, + queue, + ERROR, + skip=skip, + limit=task_limit, + include_not_found=True, ) for task in tasks: if ( diff --git a/tests/test_base.py b/tests/test_base.py index d6d04b41..c8774999 100644 --- a/tests/test_base.py +++ b/tests/test_base.py @@ -965,6 +965,17 @@ def test_purge_errored_tasks_only_errored_unique_task(self): assert 1 == self.tiger.purge_errored_tasks() self._ensure_queues(queued={"default": 1}, error={"default": 0}) + def test_purge_errored_tasks_if_task_not_found(self): + task = self.tiger.delay(exception_task) + + Worker(self.tiger).run(once=True) + self._ensure_queues(error={"default": 1}) + + self.tiger.connection.delete(self.tiger._key("task", task.id)) + + self.tiger.purge_errored_tasks() + self._ensure_queues(error={"default": 0}) + class TestTasks(BaseTestCase): """