Skip to content

Commit

Permalink
Implement collectors for executing tasks in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
esloch committed Jun 28, 2024
1 parent 9fa7dfb commit b5145e9
Show file tree
Hide file tree
Showing 6 changed files with 123 additions and 79 deletions.
100 changes: 100 additions & 0 deletions examples/redis_queue_gathering_serial_task/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,106 @@ def check_task_completion(
return None, "timeout"


@app.route("/collectors/<search>/<begin>/<end>/<collectors>")
def collectors_name(
    search: str,
    begin: datetime.date,
    end: datetime.date,
    collectors: list[str],
) -> dict[str, Any]:
    """Sum the max-article counts of each requested collector, serially.

    All parameters arrive from the URL, so Flask actually passes them as
    strings; ``collectors`` is presumably a comma-separated list of
    collector titles (e.g. ``"arXiv,bioRxiv"``) — TODO confirm against
    callers.  A collector whose task fails is logged and skipped, so the
    returned total is best-effort.

    Returns a JSON-serializable ``{"article": <total>}`` mapping.
    """
    # NOTE(review): only arXiv is wired up so far; uncomment entries as
    # their ``<Title>GetMaxArticle`` tasks are registered.
    COLLECTORS = {
        "ARXIV": "arXiv",
        # "BIORXIV": "bioRxiv",
        # "MEDRXIV": "medRxiv",
        # "PMC": "PubMed Central",
        # "PUBMED": "PubMed",
        # "EMBASE_RIS": "Embase RIS",
        # "WOS_RIS": "WoS RIS",
    }

    collectors_names = {
        COLLECTORS["ARXIV"]: "ArXiv",
        # COLLECTORS["BIORXIV"]: "BiorXiv",
        # COLLECTORS["MEDRXIV"]: "MedrXiv",
        # COLLECTORS["PMC"]: "PubMedCentral",
        # COLLECTORS["PUBMED"]: "PubMed",
    }

    # Accept either a real list or the raw comma-separated URL segment.
    if isinstance(collectors, str):
        requested = {name.strip() for name in collectors.split(",")}
    else:
        requested = set(collectors)

    article = 0

    for collector_name, collector_title in collectors_names.items():
        # The ``collectors`` parameter was previously ignored; honor it,
        # matching the reference implementation below.
        if collector_name not in requested:
            continue
        task_name = f"{collector_title}GetMaxArticle"
        logging.info("Starting %s task with search: %s", task_name, search)

        try:
            task = task_manager.get_task(task_name)
            logging.info("Starting %s task", task)
            task_id = task.request(search=search, begin=begin, end=end)  # type: ignore[attr-defined]
            logging.info(
                "%s task %s started with search: %s", task_name, task_id, search
            )
            article += task.result.get(task_id, timeout=5)[0]  # type: ignore[attr-defined]
            logging.info(
                "%s task %s completed with result: %s, status: completed",
                task_name,
                task_id,
                article,
            )
        except Exception as e:
            # Was ``log.warning`` (a NameError inside the handler) with an
            # unbalanced "[" and a hand-rolled timestamp; ``logging``
            # already stamps records.
            logging.warning("%s failed: %s", task_name, e)

    return {"article": article}


# NOTE(review): dead reference implementation parked inside a module-level
# string literal — it evades linters and will silently rot.  It differs
# from the live route above by filtering on ``collectors``; delete it (or
# port the filter) once the route covers all collectors.
"""
COLLECTORS = {
    "ARXIV": "arXiv",
    "BIORXIV": "bioRxiv",
    "MEDRXIV": "medRxiv",
    "PMC": "PubMed Central",
    "PUBMED": "PubMed",
    "EMBASE_RIS": "Embase RIS",
    "WOS_RIS": "WoS RIS",
}
def max_article(
    search: str,
    begin: datetime.date,
    end: datetime.date,
    collectors: list[str],
) -> int:
    collectors_names = {
        COLLECTORS["ARXIV"]: "ArXiv",
        COLLECTORS["BIORXIV"]: "BiorXiv",
        COLLECTORS["MEDRXIV"]: "MedrXiv",
        COLLECTORS["PMC"]: "PubMedCentral",
        COLLECTORS["PUBMED"]: "PubMed",
    }
    article = 0
    for collector_name, collector_title in collectors_names.items():
        task_name = f"{collector_title}GetMaxArticle"
        if collector_name not in collectors:
            continue
        try:
            task = tasks_manager.get_task(task_name)
            task_id = task.request(search=search, begin=begin, end=end) # type: ignore[attr-defined]
            article += task.result.get(task_id, timeout=5)[0] # type: ignore[attr-defined]
        except Exception as e:
            logging.error(f"* Error while obtaining {collector_name}")
            logging.error(e)
    # TODO: count max articles for files collectors
    return article
"""


if __name__ == "__main__":
try:
app.run(
Expand Down
13 changes: 11 additions & 2 deletions examples/redis_queue_gathering_serial_task/tasks/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,17 @@

from retsu import TaskManager

# from .parallel import MyParallelTask1
from .serial import ArticleTask, CleaningTask, ClusteringTask, PlottingTask, PreparingTask, PreprocessingTask
from .parallel import ArXivGetMaxArticlesTask

# from .parallel import MyParallelTask1
from .serial import (
ArticleTask,
CleaningTask,
ClusteringTask,
PlottingTask,
PreparingTask,
PreprocessingTask,
)


class MyTaskManager(TaskManager):
Expand All @@ -19,4 +27,5 @@ def __init__(self) -> None:
"plotting": PlottingTask(),
"preparing": PreparingTask(),
"preprocessing": PreprocessingTask(),
"ArXivGetMaxArticle": ArXivGetMaxArticlesTask(),
}
2 changes: 1 addition & 1 deletion examples/redis_queue_gathering_serial_task/tasks/app.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
"""Module for the celery app."""

# from .parallel import * # noqa: F403
from .parallel import * # noqa: F403
from .serial import * # noqa: F403
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
from datetime import date, datetime
from typing import Any

class ArXivCollector:
    """Collector that reports article counts from the arXiv service."""

    def get_max_articles(self, search: str, begin: date, end: date) -> int:
        """Return the maximum number of articles for *search* in [begin, end].

        NOTE(review): stub — always returns 400 regardless of the
        arguments; replace with a real arXiv API query.  The previous
        ``datetime.date`` annotation resolved to the *method* of the
        imported ``datetime`` class, not the ``date`` type.
        """
        return 400
77 changes: 1 addition & 76 deletions examples/redis_queue_gathering_serial_task/tasks/serial.py
Original file line number Diff line number Diff line change
@@ -1,87 +1,13 @@
"""My retsu tasks."""

from __future__ import annotations

from time import sleep

import celery

from retsu.celery import SerialCeleryTask

from .config import app, redis_client


@app.task
def task_serial_a_plus_b(a: int, b: int, task_id: str) -> int:  # type: ignore
    """Add ``a`` and ``b``, cache the sum in redis, and return it."""
    total = a + b
    sleep(total)  # simulate a long-running job proportional to the inputs
    print("running task_serial_a_plus_b")
    redis_client.set(f"serial-result-a-plus-b-{task_id}", total)
    return total


@app.task
def task_serial_result_plus_10(task_id: str) -> int:  # type: ignore
    """Wait for the a-plus-b result, add 10, store it in redis, return it.

    Previously the loop slept one second even *after* the value had
    arrived, adding needless latency to every run; now it only backs off
    between unsuccessful polls.
    """
    print("running task_serial_result_plus_10")
    # Poll until the upstream task has published its result.
    # TODO(review): unbounded wait — consider a timeout/retry budget.
    while (
        previous_result := redis_client.get(f"serial-result-a-plus-b-{task_id}")
    ) is None:
        sleep(1)

    result = int(previous_result) + 10
    redis_client.set(f"serial-result-plus-10-{task_id}", result)
    return result


@app.task
def task_serial_result_square(results, task_id: str) -> int:  # type: ignore
    """Wait for the plus-10 result and return its square.

    ``results`` is the chord header's result list and is unused here —
    the value travels through redis instead.  As with the plus-10 task,
    the loop no longer sleeps after the value has arrived.
    """
    print("running task_serial_result_square")
    # TODO(review): unbounded wait — consider a timeout/retry budget.
    while (
        previous_result := redis_client.get(f"serial-result-plus-10-{task_id}")
    ) is None:
        sleep(1)

    return int(previous_result) ** 2


class MySerialTask1(SerialCeleryTask):
    """Serial task chaining a+b, then +10, then squaring the result."""

    def request(self, a: int, b: int) -> str:
        """Submit a processing request with operands *a* and *b*."""
        return super().request(a=a, b=b)

    def get_chord_tasks(
        self, a: int, b: int, task_id: str
    ) -> tuple[list[celery.Signature], celery.Signature]:
        """Build the celery chord: header signatures plus the callback."""
        header = [
            task_serial_a_plus_b.s(a, b, task_id),
            task_serial_result_plus_10.s(task_id),
        ]
        callback = task_serial_result_square.s(task_id)
        return (header, callback)
"""Example of usage retsu with a celery app."""

import logging
import time

from typing import Any

import celery


# from .config import app, redis_client
from celery_app import app # type: ignore
from retsu import TaskManager
from retsu.celery import ParallelCeleryTask, SerialCeleryTask
from retsu.celery import SerialCeleryTask

from .libs.back_cleaning import clean_intermediate_files
from .libs.back_clustering import cluster_preprocessed_corpuses
Expand Down Expand Up @@ -197,4 +123,3 @@ def get_group_tasks(
return [
task_clean_intermediate_files.s(research, task_id),
]

0 comments on commit b5145e9

Please sign in to comment.