Skip to content

Commit

Permalink
Implement all collectors
Browse files Browse the repository at this point in the history
  • Loading branch information
esloch committed Jun 28, 2024
1 parent b5145e9 commit 8084508
Show file tree
Hide file tree
Showing 12 changed files with 347 additions and 31 deletions.
66 changes: 43 additions & 23 deletions examples/redis_queue_gathering_serial_task/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,31 @@

# Setup Redis connection and logging
redis_client = redis.Redis(host="localhost", port=6379, db=0)

if not logging.getLogger().hasHandlers():
# Configuring the basic logger settings
logging.basicConfig(
level=logging.INFO,
format="[%(asctime)s: %(levelname)s/MainProcess] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S"
)

log = logging.getLogger(__name__)
# logging.basicConfig(level=logging.ERROR)]

# Configure logging to display errors and custom formatted warnings
logging.basicConfig(level=logging.INFO)
handler = logging.StreamHandler()
formatter = logging.Formatter(
"[%(asctime)s: %(levelname)s/MainProcess] %(message)s",
datefmt="%Y-%m-%d %H:%M:%S",
datefmt="%Y-%m-%d %H:%M:%S"
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
log.addHandler(handler)
handler.setLevel(logging.INFO) # Set this as needed

if not log.handlers:
log.addHandler(handler)

log.setLevel(logging.INFO)


task_manager = MyTaskManager()
task_manager.start()

Expand Down Expand Up @@ -144,41 +155,50 @@ def collectors_name(
"""Handle collector requests by processing a series of tasks sequentially."""
COLLECTORS = {
"ARXIV": "arXiv",
# "BIORXIV": "bioRxiv",
# "MEDRXIV": "medRxiv",
# "PMC": "PubMed Central",
# "PUBMED": "PubMed",
# "EMBASE_RIS": "Embase RIS",
# "WOS_RIS": "WoS RIS",
"BIORXIV": "bioRxiv",
"MEDRXIV": "medRxiv",
"PMC": "PubMed Central",
"PUBMED": "PubMed",
"EMBASE_RIS": "Embase RIS",
"WOS_RIS": "WoS RIS",
}

collectors_names = {
COLLECTORS["ARXIV"]: "ArXiv",
# COLLECTORS["BIORXIV"]: "BiorXiv",
# COLLECTORS["MEDRXIV"]: "MedrXiv",
# COLLECTORS["PMC"]: "PubMedCentral",
# COLLECTORS["PUBMED"]: "PubMed",
COLLECTORS["BIORXIV"]: "BiorXiv",
COLLECTORS["MEDRXIV"]: "MedrXiv",
COLLECTORS["PMC"]: "PubMedCentral",
COLLECTORS["PUBMED"]: "PubMed",
}

article = 0

gathering_ids = []
for collector_name, collector_title in collectors_names.items():
logging.info(
f"Starting {collector_title}GetMaxArticle task with search: {search}"
)

if collector_name not in collectors:
continue

task_name = f"{collector_title}GetMaxArticle"

try:
task = task_manager.get_task(task_name)
logging.info(f"Starting {task} task")

log.info(f"Starting {task} task")

task_id = task.request(search=search, begin=begin, end=end) # type: ignore[attr-defined]
logging.info(

gathering_ids.append(f"{task_name}-{task_id}")

log.info(
f"{task_name} task {task_id} started with search: {search}"
)

article += task.result.get(task_id, timeout=5)[0] # type: ignore[attr-defined]
logging.info(

log.info(
f"{task_name} task {task_id} completed with result: {article}, status: completed"
)

except Exception as e:
log.warning(
f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}: {e}: {task_name}"
Expand Down
10 changes: 8 additions & 2 deletions examples/redis_queue_gathering_serial_task/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,10 @@

from retsu import TaskManager

from .parallel import ArXivGetMaxArticlesTask
from .get_max_article import ArXivGetMaxArticlesTask, PubMedGetMaxArticlesTask, MedrXivGetMaxArticlesTask, BiorXivGetMaxArticlesTask, EmbaseGetMaxArticlesTask, WebOfScienceGetMaxArticlesTask

# from .parallel import MyParallelTask1
from .serial import (
from .get_back_process import (
ArticleTask,
CleaningTask,
ClusteringTask,
Expand All @@ -27,5 +27,11 @@ def __init__(self) -> None:
"plotting": PlottingTask(),
"preparing": PreparingTask(),
"preprocessing": PreprocessingTask(),
# tasks.collectors.CollectorsGatheringTask(list(task_id))
"ArXivGetMaxArticle": ArXivGetMaxArticlesTask(),
"PubMedGetMaxArticle": PubMedGetMaxArticlesTask(),
"MedrXivGetMaxArticle": MedrXivGetMaxArticlesTask(),
"BiorXivGetMaxArticle": BiorXivGetMaxArticlesTask(),
"EmbaseGetMaxArticle": EmbaseGetMaxArticlesTask(),
"WebOfScienceGetMaxArticle": WebOfScienceGetMaxArticlesTask(),
}
4 changes: 2 additions & 2 deletions examples/redis_queue_gathering_serial_task/tasks/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Module for the celery app."""

from .parallel import * # noqa: F403
from .serial import * # noqa: F403
from .get_max_article import * # noqa: F403
from .get_back_process import * # noqa: F403
197 changes: 197 additions & 0 deletions examples/redis_queue_gathering_serial_task/tasks/get_max_article.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
"""Example of usage retsu with a celery app."""

import logging
from datetime import date, datetime
from typing import Any, Optional

import celery

from celery_app import app # type: ignore

from retsu.celery import SerialCeleryTask, ParallelCeleryTask

from .libs.collectors.arxiv import ArXivCollector
from .libs.collectors.biorxiv import BiorXivCollector
from .libs.collectors.embase import EmbaseRISCollector
from .libs.collectors.medrxiv import MedrXivCollector
from .libs.collectors.pmc import PMCCollector
from .libs.collectors.pubmed import PubMedCollector
from .libs.collectors.wos import WoSRISCollector


# Module-level logger named after this module, per stdlib logging convention.
logger = logging.getLogger(__name__)


@app.task  # type: ignore
def task_arxiv_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of arXiv articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # NOTE: annotated `date` (the class); the previous `datetime.date` resolved
    # to the bound-method descriptor because of `from datetime import datetime`.
    collector = ArXivCollector()
    return collector.get_max_articles(search, begin, end)

@app.task  # type: ignore
def task_biorxiv_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of bioRxiv articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # `date` replaces the incorrect `datetime.date` annotation (method, not class).
    collector = BiorXivCollector()
    return collector.get_max_articles(search, begin, end)

@app.task  # type: ignore
def task_embase_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of Embase RIS articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # `date` replaces the incorrect `datetime.date` annotation (method, not class).
    collector = EmbaseRISCollector()
    return collector.get_max_articles(search, begin, end)

@app.task  # type: ignore
def task_medrxiv_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of medRxiv articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # `date` replaces the incorrect `datetime.date` annotation (method, not class).
    collector = MedrXivCollector()
    return collector.get_max_articles(search, begin, end)

@app.task  # type: ignore
def task_pubmed_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of PubMed articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # `date` replaces the incorrect `datetime.date` annotation (method, not class).
    collector = PubMedCollector()
    return collector.get_max_articles(search, begin, end)

@app.task  # type: ignore
def task_pmc_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of PubMed Central articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # `date` replaces the incorrect `datetime.date` annotation (method, not class).
    collector = PMCCollector()
    return collector.get_max_articles(search, begin, end)

@app.task  # type: ignore
def task_wos_get_max_articles(
    search: str, begin: date, end: date, task_id: str
) -> int:
    """Return the max number of WoS RIS articles for *search* in [begin, end].

    ``task_id`` is the retsu task id; accepted for tracking but unused here.
    """
    # `date` replaces the incorrect `datetime.date` annotation (method, not class).
    collector = WoSRISCollector()
    return collector.get_max_articles(search, begin, end)


class WebOfScienceGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the Web of Science max-articles query."""

    def get_group_tasks(  # type: ignore
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_wos_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]


class PubMedCentralGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the PubMed Central max-articles query."""

    def get_group_tasks(  # type: ignore
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_pmc_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]


class PubMedGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the PubMed max-articles query."""

    def get_group_tasks(  # type: ignore
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_pubmed_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]


class MedrXivGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the medRxiv max-articles query."""

    def get_group_tasks(  # type: ignore
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_medrxiv_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]


class EmbaseGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the Embase RIS max-articles query."""

    def get_group_tasks(  # type: ignore
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_embase_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]


class BiorXivGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the bioRxiv max-articles query."""

    def get_group_tasks(  # type: ignore
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_biorxiv_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]


class ArXivGetMaxArticlesTask(SerialCeleryTask):
    """Serial retsu task wrapping the arXiv max-articles query."""

    def get_group_tasks(
        self, *args, **kwargs
    ) -> list[celery.Signature]:
        """Build the celery signature list for the chord from keyword args."""
        return [
            task_arxiv_get_max_articles.s(
                kwargs.get("search"),
                kwargs.get("begin"),
                kwargs.get("end"),
                kwargs.get("task_id"),
            )
        ]

Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
from typing import Any
from datetime import datetime


class ArXivCollector:
    """ArXivCollector.

    Stub collector: returns a fixed article count for development/testing.
    """

    def get_max_articles(
        self, search: str, begin: datetime.date, end: datetime.date
    ) -> int:
        """Get max number of articles (placeholder fixed value)."""
        # NOTE(review): with `from datetime import datetime` at the top of this
        # file, `datetime.date` is the instance method, not the `date` class —
        # the import should be fixed to `from datetime import date`. TODO confirm.
        return 750
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
from datetime import datetime


class BiorXivCollector:
    """Stub collector for bioRxiv: returns a fixed article count."""

    def __init__(self) -> None:
        """Initialize BiorXivCollector."""
        # Fixed: docstring previously said "EmBaseCollector" (copy-paste error).

    def get_max_articles(
        self, search: str, begin: datetime.date, end: datetime.date
    ) -> int:
        """Get the max number of articles (placeholder fixed value)."""
        # NOTE(review): with `from datetime import datetime` above,
        # `datetime.date` is the instance method, not the `date` class —
        # the import should become `from datetime import date`. TODO confirm.
        return 1000
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Any
import datetime


class EmbaseRISCollector:
    """EmbaseRISCollector.

    Stub collector: returns a fixed article count for development/testing.
    """

    def __init__(self) -> None:
        """Initialize EmbaseRISCollector."""
        # Fixed: docstring previously said "EmBaseCollector" (wrong class name).

    def get_max_articles(
        self, search: str, begin: datetime.date, end: datetime.date
    ) -> int:
        """Get the max number of articles (placeholder fixed value)."""
        return 250
Loading

0 comments on commit 8084508

Please sign in to comment.