From 80845085aeaa0daa2ab71ab00b2f8c3cb787fd3c Mon Sep 17 00:00:00 2001 From: Sandro Loch Date: Fri, 28 Jun 2024 14:56:15 -0300 Subject: [PATCH] Implement all collectors --- .../redis_queue_gathering_serial_task/app.py | 66 ++++-- .../tasks/__init__.py | 10 +- .../tasks/app.py | 4 +- .../tasks/{serial.py => get_back_process.py} | 0 .../tasks/get_max_article.py | 197 ++++++++++++++++++ .../tasks/libs/collectors/arxiv.py | 9 +- .../tasks/libs/collectors/biorxiv.py | 14 ++ .../tasks/libs/collectors/embase.py | 16 ++ .../tasks/libs/collectors/medrxiv.py | 15 ++ .../tasks/libs/collectors/pmc.py | 15 ++ .../tasks/libs/collectors/pubmed.py | 16 ++ .../tasks/libs/collectors/wos.py | 16 ++ 12 files changed, 347 insertions(+), 31 deletions(-) rename examples/redis_queue_gathering_serial_task/tasks/{serial.py => get_back_process.py} (100%) create mode 100644 examples/redis_queue_gathering_serial_task/tasks/get_max_article.py create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/biorxiv.py create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/embase.py create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/medrxiv.py create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pmc.py create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pubmed.py create mode 100644 examples/redis_queue_gathering_serial_task/tasks/libs/collectors/wos.py diff --git a/examples/redis_queue_gathering_serial_task/app.py b/examples/redis_queue_gathering_serial_task/app.py index 0698e47..4a83fec 100644 --- a/examples/redis_queue_gathering_serial_task/app.py +++ b/examples/redis_queue_gathering_serial_task/app.py @@ -13,20 +13,31 @@ # Setup Redis connection and logging redis_client = redis.Redis(host="localhost", port=6379, db=0) + +if not logging.getLogger().hasHandlers(): + # Configuring the basic logger settings + logging.basicConfig( + level=logging.INFO, + format="[%(asctime)s: %(levelname)s/MainProcess] %(message)s", + datefmt="%Y-%m-%d %H:%M:%S" + ) + log = logging.getLogger(__name__) -# logging.basicConfig(level=logging.ERROR)] -# Configure logging to display errors and custom formatted warnings -logging.basicConfig(level=logging.INFO) +handler = logging.StreamHandler() formatter = logging.Formatter( "[%(asctime)s: %(levelname)s/MainProcess] %(message)s", - datefmt="%Y-%m-%d %H:%M:%S", + datefmt="%Y-%m-%d %H:%M:%S" ) -handler = logging.StreamHandler() handler.setFormatter(formatter) -log.addHandler(handler) +handler.setLevel(logging.INFO) # Set this as needed + +if not log.handlers: + log.addHandler(handler) + log.setLevel(logging.INFO) + task_manager = MyTaskManager() task_manager.start() @@ -144,41 +155,50 @@ def collectors_name( """Handle collector requests by processing a series of tasks sequentially.""" COLLECTORS = { "ARXIV": "arXiv", - # "BIORXIV": "bioRxiv", - # "MEDRXIV": "medRxiv", - # "PMC": "PubMed Central", - # "PUBMED": "PubMed", - # "EMBASE_RIS": "Embase RIS", - # "WOS_RIS": "WoS RIS", + "BIORXIV": "bioRxiv", + "MEDRXIV": "medRxiv", + "PMC": "PubMed Central", + "PUBMED": "PubMed", + "EMBASE_RIS": "Embase RIS", + "WOS_RIS": "WoS RIS", } collectors_names = { COLLECTORS["ARXIV"]: "ArXiv", - # COLLECTORS["BIORXIV"]: "BiorXiv", - # COLLECTORS["MEDRXIV"]: "MedrXiv", - # COLLECTORS["PMC"]: "PubMedCentral", - # COLLECTORS["PUBMED"]: "PubMed", + COLLECTORS["BIORXIV"]: "BiorXiv", + COLLECTORS["MEDRXIV"]: "MedrXiv", + COLLECTORS["PMC"]: "PubMedCentral", + COLLECTORS["PUBMED"]: "PubMed", } article = 0 - + gathering_ids = [] for collector_name, collector_title in collectors_names.items(): - logging.info( - f"Starting {collector_title}GetMaxArticle task with search: {search}" - ) + + if collector_name not in collectors: + continue + task_name = f"{collector_title}GetMaxArticle" try: task = task_manager.get_task(task_name) - logging.info(f"Starting {task} task") + + log.info(f"Starting {task} task") + task_id = task.request(search=search, begin=begin, end=end) # type: ignore[attr-defined] - logging.info( + + gathering_ids.append(f"{task_name}-{task_id}") + + log.info( f"{task_name} task {task_id} started with search: {search}" ) + article += task.result.get(task_id, timeout=5)[0] # type: ignore[attr-defined] - logging.info( + + log.info( f"{task_name} task {task_id} completed with result: {article}, status: completed" ) + except Exception as e: log.warning( f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}: {task_name}" diff --git a/examples/redis_queue_gathering_serial_task/tasks/__init__.py b/examples/redis_queue_gathering_serial_task/tasks/__init__.py index 81fd32d..bf21ddc 100644 --- a/examples/redis_queue_gathering_serial_task/tasks/__init__.py +++ b/examples/redis_queue_gathering_serial_task/tasks/__init__.py @@ -2,10 +2,10 @@ from retsu import TaskManager -from .parallel import ArXivGetMaxArticlesTask +from .get_max_article import ArXivGetMaxArticlesTask, PubMedGetMaxArticlesTask, MedrXivGetMaxArticlesTask, BiorXivGetMaxArticlesTask, EmbaseGetMaxArticlesTask, WebOfScienceGetMaxArticlesTask # from .parallel import MyParallelTask1 -from .serial import ( +from .get_back_process import ( ArticleTask, CleaningTask, ClusteringTask, @@ -27,5 +27,11 @@ def __init__(self) -> None: "plotting": PlottingTask(), "preparing": PreparingTask(), "preprocessing": PreprocessingTask(), + # tasks.collectors.CollectorsGatheringTask(list(task_id)) "ArXivGetMaxArticle": ArXivGetMaxArticlesTask(), + "PubMedGetMaxArticle": PubMedGetMaxArticlesTask(), + "MedrXivGetMaxArticle": MedrXivGetMaxArticlesTask(), + "BiorXivGetMaxArticle": BiorXivGetMaxArticlesTask(), + "EmbaseGetMaxArticle": EmbaseGetMaxArticlesTask(), + "WebOfScienceGetMaxArticle": WebOfScienceGetMaxArticlesTask(), } diff --git a/examples/redis_queue_gathering_serial_task/tasks/app.py b/examples/redis_queue_gathering_serial_task/tasks/app.py index c66f9b5..42309de 100644 --- a/examples/redis_queue_gathering_serial_task/tasks/app.py +++ b/examples/redis_queue_gathering_serial_task/tasks/app.py @@ -1,4 +1,4 @@ """Module for the celery app.""" -from .parallel import * # noqa: F403 -from .serial import * # noqa: F403 +from .get_max_article import * # noqa: F403 +from .get_back_process import * # noqa: F403 diff --git a/examples/redis_queue_gathering_serial_task/tasks/serial.py b/examples/redis_queue_gathering_serial_task/tasks/get_back_process.py similarity index 100% rename from examples/redis_queue_gathering_serial_task/tasks/serial.py rename to examples/redis_queue_gathering_serial_task/tasks/get_back_process.py diff --git a/examples/redis_queue_gathering_serial_task/tasks/get_max_article.py b/examples/redis_queue_gathering_serial_task/tasks/get_max_article.py new file mode 100644 index 0000000..d465dff --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/get_max_article.py @@ -0,0 +1,197 @@ +"""Example of usage retsu with a celery app.""" + +import logging +from datetime import datetime +from typing import Any, Optional + +import celery + +from celery_app import app # type: ignore + +from retsu.celery import SerialCeleryTask, ParallelCeleryTask + +from .libs.collectors.arxiv import ArXivCollector + + +from .libs.collectors.biorxiv import BiorXivCollector +from .libs.collectors.embase import EmbaseRISCollector +from .libs.collectors.medrxiv import MedrXivCollector +from .libs.collectors.pubmed import PubMedCollector +from .libs.collectors.pmc import PMCCollector +from .libs.collectors.wos import WoSRISCollector + + +logger = logging.getLogger(__name__) + + +@app.task # type: ignore +def task_arxiv_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = ArXivCollector() + return collector.get_max_articles(search, begin, end) + +@app.task # type: ignore +def task_biorxiv_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = BiorXivCollector() + return collector.get_max_articles(search, begin, end) + +@app.task # type: ignore +def task_embase_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = EmbaseRISCollector() + return collector.get_max_articles(search, begin, end) + +@app.task # type: ignore +def task_medrxiv_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = MedrXivCollector() + return collector.get_max_articles(search, begin, end) + +@app.task # type: ignore +def task_pubmed_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = PubMedCollector() + return collector.get_max_articles(search, begin, end) + +@app.task # type: ignore +def task_pmc_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = PMCCollector() + return collector.get_max_articles(search, begin, end) + +@app.task # type: ignore +def task_wos_get_max_articles( + search: str, begin: datetime.date, end: datetime.date, task_id: str +) -> int: + """Define the task for getting the max number of articles.""" + collector = WoSRISCollector() + return collector.get_max_articles(search, begin, end) + + +class WebOfScienceGetMaxArticlesTask(SerialCeleryTask): + """Task for the test.""" + + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + return [task_wos_get_max_articles.s(search, dt_begin, dt_end, task_id)] + + +class PubMedCentralGetMaxArticlesTask(SerialCeleryTask): + """Task for the test.""" + + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + return [task_pmc_get_max_articles.s(search, dt_begin, dt_end, task_id)] + + +class PubMedGetMaxArticlesTask(SerialCeleryTask): + """Task for the test.""" + + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + return [ + task_pubmed_get_max_articles.s( + search, dt_begin, dt_end, task_id + ) + ] + + +class MedrXivGetMaxArticlesTask(SerialCeleryTask): + """Task for the test.""" + + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + return [ + task_medrxiv_get_max_articles.s( + search, dt_begin, dt_end, task_id + ) + ] + + +class EmbaseGetMaxArticlesTask(SerialCeleryTask): + """Task for the test.""" + + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + return [ + task_embase_get_max_articles.s( + search, dt_begin, dt_end, task_id + ) + ] + + +class BiorXivGetMaxArticlesTask(SerialCeleryTask): + """Task for the test.""" + + def get_group_tasks( # type: ignore + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + return [ + task_biorxiv_get_max_articles.s( + search, dt_begin, dt_end, task_id + ) + ] + + +class ArXivGetMaxArticlesTask(SerialCeleryTask): + """Task to handle articles processing.""" + + def get_group_tasks( + self, *args, **kwargs + ) -> list[celery.Signature]: + """Define the list of tasks for celery chord.""" + search = kwargs.get("search") + dt_begin = kwargs.get("begin") + dt_end = kwargs.get("end") + task_id = kwargs.get("task_id") + + return [task_arxiv_get_max_articles.s(search, dt_begin, dt_end, task_id)] + diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py index 7adb813..99bbc39 100644 --- a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/arxiv.py @@ -1,10 +1,11 @@ -from typing import Any from datetime import datetime + class ArXivCollector: - """ArXivCollector.""" - def get_max_articles(self, search: str, begin: datetime.date, end: datetime.date) -> int: + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: """Get max number of articles.""" - return 400 \ No newline at end of file + return 750 diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/biorxiv.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/biorxiv.py new file mode 100644 index 0000000..9f8e54e --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/biorxiv.py @@ -0,0 +1,14 @@ +from datetime import datetime + + +class BiorXivCollector: + + def __init__(self) -> None: + """Initialize EmBaseCollector.""" + pass + + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: + """Get the max number of articles.""" + return 1000 \ No newline at end of file diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/embase.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/embase.py new file mode 100644 index 0000000..4e124f3 --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/embase.py @@ -0,0 +1,16 @@ +from typing import Any +import datetime + + +class EmbaseRISCollector: + """EmbaseRISCollector.""" + + def __init__(self) -> None: + """Initialize EmBaseCollector.""" + pass + + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: + """Get the max number of articles.""" + return 250 \ No newline at end of file diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/medrxiv.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/medrxiv.py new file mode 100644 index 0000000..1f7bd5c --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/medrxiv.py @@ -0,0 +1,15 @@ +from datetime import datetime + + +class MedrXivCollector: + """MedrXivCollector""" + + def __init__(self) -> None: + """Initialize EmBaseCollector.""" + pass + + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: + """Get max number of articles.""" + return 400 diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pmc.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pmc.py new file mode 100644 index 0000000..7f6ff68 --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pmc.py @@ -0,0 +1,15 @@ +from datetime import datetime + + +class PMCCollector: + """PMCCollector.""" + + def __init__(self) -> None: + """Initialize EmBaseCollector.""" + pass + + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: + """Get max number of articles.""" + return 800 diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pubmed.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pubmed.py new file mode 100644 index 0000000..aeec0ad --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/pubmed.py @@ -0,0 +1,16 @@ +from datetime import datetime + + +class PubMedCollector: + """PubMedCollector.""" + + + def __init__(self) -> None: + """Initialize EmBaseCollector.""" + pass + + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: + """Get max number of articles.""" + return 400 diff --git a/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/wos.py b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/wos.py new file mode 100644 index 0000000..233be6e --- /dev/null +++ b/examples/redis_queue_gathering_serial_task/tasks/libs/collectors/wos.py @@ -0,0 +1,16 @@ +from datetime import datetime + + +class WoSRISCollector: + """WoSRISCollector.""" + + + def __init__(self) -> None: + """Initialize EmBaseCollector.""" + pass + + def get_max_articles( + self, search: str, begin: datetime.date, end: datetime.date + ) -> int: + """Get max number of articles.""" + return 400