From b5145e924428bb1d9a55454ae4519ee1c14144fb Mon Sep 17 00:00:00 2001
From: Sandro Loch
Date: Fri, 28 Jun 2024 10:35:02 -0300
Subject: [PATCH] Implement collectors for executing tasks in parallel

---
 .../redis_queue_gathering_serial_task/app.py  | 100 ++++++++++++++++++
 .../tasks/__init__.py                         |  13 ++-
 .../tasks/app.py                              |   2 +-
 .../tasks/libs/collectors/__init__.py         |   0
 .../tasks/libs/collectors/arxiv.py            |  10 ++
 .../tasks/serial.py                           |  77 +-------------
 6 files changed, 123 insertions(+), 79 deletions(-)
 create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/__init__.py
 create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py

diff --git a/examples/redis_queue_gathering_serial_task/app.py b/examples/redis_queue_gathering_serial_task/app.py
index 076631a..0698e47 100644
--- a/examples/redis_queue_gathering_serial_task/app.py
+++ b/examples/redis_queue_gathering_serial_task/app.py
@@ -134,6 +134,106 @@ def check_task_completion(
     return None, "timeout"
 
 
+@app.route("/collectors/<search>/<begin>/<end>/<collectors>")
+def collectors_name(
+    search: str,
+    begin: datetime.date,
+    end: datetime.date,
+    collectors: list[str],
+) -> dict[str, Any]:
+    """Handle collector requests by processing a series of tasks sequentially."""
+    COLLECTORS = {
+        "ARXIV": "arXiv",
+        # "BIORXIV": "bioRxiv",
+        # "MEDRXIV": "medRxiv",
+        # "PMC": "PubMed Central",
+        # "PUBMED": "PubMed",
+        # "EMBASE_RIS": "Embase RIS",
+        # "WOS_RIS": "WoS RIS",
+    }
+
+    collectors_names = {
+        COLLECTORS["ARXIV"]: "ArXiv",
+        # COLLECTORS["BIORXIV"]: "BiorXiv",
+        # COLLECTORS["MEDRXIV"]: "MedrXiv",
+        # COLLECTORS["PMC"]: "PubMedCentral",
+        # COLLECTORS["PUBMED"]: "PubMed",
+    }
+
+    article = 0
+
+    for collector_name, collector_title in collectors_names.items():
+        if collector_name not in collectors:
+            continue
+        task_name = f"{collector_title}GetMaxArticle"
+        logging.info(f"Starting {task_name} task with search: {search}")
+        try:
+            task = task_manager.get_task(task_name)
+            task_id = task.request(search=search, begin=begin, end=end)  # type: ignore[attr-defined]
+            logging.info(
+                f"{task_name} task {task_id} started with search: {search}"
+            )
+            article += task.result.get(task_id, timeout=5)[0]  # type: ignore[attr-defined]
+            logging.info(
+                f"{task_name} task {task_id} completed "
+                f"with result: {article}, status: completed"
+            )
+        except Exception as e:
+            logging.warning(
+                f"[{datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] "
+                f"{task_name}: {e}"
+            )
+
+    return {"article": article}
+
+
+"""
+
+COLLECTORS = {
+    "ARXIV": "arXiv",
+    "BIORXIV": "bioRxiv",
+    "MEDRXIV": "medRxiv",
+    "PMC": "PubMed Central",
+    "PUBMED": "PubMed",
+    "EMBASE_RIS": "Embase RIS",
+    "WOS_RIS": "WoS RIS",
+}
+
+def max_article(
+    search: str,
+    begin: datetime.date,
+    end: datetime.date,
+    collectors: list[str],
+) -> int:
+    collectors_names = {
+        COLLECTORS["ARXIV"]: "ArXiv",
+        COLLECTORS["BIORXIV"]: "BiorXiv",
+        COLLECTORS["MEDRXIV"]: "MedrXiv",
+        COLLECTORS["PMC"]: "PubMedCentral",
+        COLLECTORS["PUBMED"]: "PubMed",
+    }
+
+    article = 0
+
+    for collector_name, collector_title in collectors_names.items():
+        task_name = f"{collector_title}GetMaxArticle"
+
+        if collector_name not in collectors:
+            continue
+        try:
+            task = task_manager.get_task(task_name)
+            task_id = task.request(search=search, begin=begin, end=end)  # type: ignore[attr-defined]
+            article += task.result.get(task_id, timeout=5)[0]  # type: ignore[attr-defined]
+        except Exception as e:
+            logging.error(f"* Error while obtaining {collector_name}")
+            logging.error(e)
+
+    # TODO: count max articles for file collectors
+
+    return article
+"""
+
+
 if __name__ == "__main__":
     try:
         app.run(
diff --git a/examples/redis_queue_gathering_serial_task/tasks/__init__.py b/examples/redis_queue_gathering_serial_task/tasks/__init__.py
index f591f0d..81fd32d 100644
--- a/examples/redis_queue_gathering_serial_task/tasks/__init__.py
+++ b/examples/redis_queue_gathering_serial_task/tasks/__init__.py
@@ -2,9 +2,17 @@
 
 from retsu import TaskManager
 
-# from .parallel import MyParallelTask1
-from .serial import ArticleTask, CleaningTask, ClusteringTask, PlottingTask, PreparingTask, PreprocessingTask
+from .parallel import ArXivGetMaxArticlesTask
+# from .parallel import MyParallelTask1
+from .serial import (
+    ArticleTask,
+    CleaningTask,
+    ClusteringTask,
+    PlottingTask,
+    PreparingTask,
+    PreprocessingTask,
+)
 
 
 class MyTaskManager(TaskManager):
@@ -19,4 +27,5 @@ def __init__(self) -> None:
             "plotting": PlottingTask(),
             "preparing": PreparingTask(),
             "preprocessing": PreprocessingTask(),
+            "ArXivGetMaxArticle": ArXivGetMaxArticlesTask(),
         }
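Note: tasks/parallel.py is imported above but is not touched by this patch, so the
definition of ArXivGetMaxArticlesTask is not shown. For orientation, here is a
minimal sketch of what that class could look like, modeled on the SerialCeleryTask
subclasses visible in serial.py below; the ParallelCeleryTask hooks, the Celery task
function, and the wiring to ArXivCollector are assumptions, not code from this change:

    """Parallel retsu tasks (illustrative sketch, not part of this patch)."""
    import datetime

    import celery
    from celery_app import app  # type: ignore

    from retsu.celery import ParallelCeleryTask

    from .libs.collectors.arxiv import ArXivCollector


    @app.task
    def task_arxiv_get_max_articles(
        search: str, begin: datetime.date, end: datetime.date, task_id: str
    ) -> int:
        """Delegate the article count to the ArXiv collector."""
        return ArXivCollector().get_max_articles(search=search, begin=begin, end=end)


    class ArXivGetMaxArticlesTask(ParallelCeleryTask):
        """Gather the max number of articles from ArXiv."""

        def request(self, search: str, begin: datetime.date, end: datetime.date) -> str:
            """Receive the request for processing."""
            return super().request(search=search, begin=begin, end=end)

        def get_group_tasks(
            self, search: str, begin: datetime.date, end: datetime.date, task_id: str
        ) -> list[celery.Signature]:
            """Define the list of tasks for the celery group."""
            return [task_arxiv_get_max_articles.s(search, begin, end, task_id)]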
diff --git a/examples/redis_queue_gathering_serial_task/tasks/app.py b/examples/redis_queue_gathering_serial_task/tasks/app.py
index 5b4852c..c66f9b5 100644
--- a/examples/redis_queue_gathering_serial_task/tasks/app.py
+++ b/examples/redis_queue_gathering_serial_task/tasks/app.py
@@ -1,4 +1,4 @@
 """Module for the celery app."""
 
-# from .parallel import *  # noqa: F403
+from .parallel import *  # noqa: F403
 from .serial import *  # noqa: F403
diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/__init__.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/__init__.py
new file mode 100644
index 0000000..e69de29
diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py
new file mode 100644
index 0000000..7adb813
--- /dev/null
+++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py
@@ -0,0 +1,10 @@
+"""ArXiv collector."""
+
+import datetime
+
+class ArXivCollector:
+    """ArXivCollector."""
+
+    def get_max_articles(self, search: str, begin: datetime.date, end: datetime.date) -> int:
+        """Get the max number of articles; a fixed stub value for now."""
+        return 400
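The collector above is a stub: it reports a fixed count of 400 regardless of the
query. A quick interactive check (the search term and date range are arbitrary
examples, not values used anywhere in the patch):

    import datetime

    from tasks.libs.collectors.arxiv import ArXivCollector

    collector = ArXivCollector()
    print(collector.get_max_articles(
        search="dengue",
        begin=datetime.date(2023, 1, 1),
        end=datetime.date(2024, 1, 1),
    ))  # -> 400 until a real ArXiv query replaces the stub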
diff --git a/examples/redis_queue_gathering_serial_task/tasks/serial.py b/examples/redis_queue_gathering_serial_task/tasks/serial.py
index d14961e..a7d1206 100644
--- a/examples/redis_queue_gathering_serial_task/tasks/serial.py
+++ b/examples/redis_queue_gathering_serial_task/tasks/serial.py
@@ -1,87 +1,13 @@
-"""My retsu tasks."""
-
-from __future__ import annotations
-
-from time import sleep
-
-import celery
-
-from retsu.celery import SerialCeleryTask
-
-from .config import app, redis_client
-
-
-@app.task
-def task_serial_a_plus_b(a: int, b: int, task_id: str) -> int:  # type: ignore
-    """Define the task_serial_a_plus_b."""
-    sleep(a + b)
-    print("running task_serial_a_plus_b")
-    result = a + b
-    redis_client.set(f"serial-result-a-plus-b-{task_id}", result)
-    return result
-
-
-@app.task
-def task_serial_result_plus_10(task_id: str) -> int:  # type: ignore
-    """Define the task_serial_result_plus_10."""
-    print("running task_serial_result_plus_10")
-    previous_result = None
-    while previous_result is None:
-        previous_result = redis_client.get(f"serial-result-a-plus-b-{task_id}")
-        sleep(1)
-
-    previous_result_int = int(previous_result)
-    result = previous_result_int + 10
-    redis_client.set(f"serial-result-plus-10-{task_id}", result)
-    return result
-
-
-@app.task
-def task_serial_result_square(results, task_id: str) -> int:  # type: ignore
-    """Define the task_serial_result_square."""
-    print("running task_serial_result_square")
-    previous_result = None
-    while previous_result is None:
-        previous_result = redis_client.get(f"serial-result-plus-10-{task_id}")
-        sleep(1)
-
-    previous_result_int = int(previous_result)
-    result = previous_result_int**2
-    return result
-
-
-class MySerialTask1(SerialCeleryTask):
-    """MySerialTask1."""
-
-    def request(self, a: int, b: int) -> str:
-        """Receive the request for processing."""
-        return super().request(a=a, b=b)
-
-    def get_chord_tasks(
-        self, a: int, b: int, task_id: str
-    ) -> tuple[list[celery.Signature], celery.Signature]:
-        """Define the list of tasks for celery chord."""
-        return (
-            [
-                task_serial_a_plus_b.s(a, b, task_id),
-                task_serial_result_plus_10.s(task_id),
-            ],
-            task_serial_result_square.s(task_id),
-        )
-
-
 """Example of usage retsu with a celery app."""
 
 import logging
-import time
 
 from typing import Any
 
 import celery
-
-# from .config import app, redis_client
 from celery_app import app  # type: ignore
 
-from retsu import TaskManager
-from retsu.celery import ParallelCeleryTask, SerialCeleryTask
+from retsu.celery import SerialCeleryTask
 
 from .libs.back_cleaning import clean_intermediate_files
 from .libs.back_clustering import cluster_preprocessed_corpuses
@@ -197,4 +123,3 @@ def get_group_tasks(
         return [
             task_clean_intermediate_files.s(research, task_id),
         ]
-
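With the Flask-style app from app.py running, the new route can be exercised as
below. The host and port are assumptions (whatever app.run() is configured with),
and note that Flask passes URL segments to the view as plain strings, so the
datetime.date and list[str] annotations on collectors_name are not enforced
conversions:

    import requests  # assumes the requests package is installed

    resp = requests.get(
        "http://localhost:5000/collectors/dengue/2023-01-01/2024-01-01/arXiv"
    )
    print(resp.json())  # {"article": <max article count>}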