Skip to content

Commit

Permalink
Add initial structure for testing with celery
Browse files Browse the repository at this point in the history
  • Loading branch information
xmnlab committed May 31, 2024
1 parent 02640a6 commit 670e365
Show file tree
Hide file tree
Showing 4 changed files with 30 additions and 19 deletions.
3 changes: 2 additions & 1 deletion example/tasks/config.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
"""Configuration for Celery app."""

import os
import sys

import redis

Expand Down Expand Up @@ -47,4 +48,4 @@
print("Redis connection is working.")
except redis.ConnectionError as e:
print(f"Failed to connect to Redis: {e}")
exit(1)
sys.exit(1)
3 changes: 3 additions & 0 deletions src/retsu/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import celery

from celery import chain, chord
from public import public

from retsu.core import ParallelTask, SerialTask

Expand Down Expand Up @@ -63,12 +64,14 @@ def get_chain_tasks( # type: ignore
return chain_tasks


@public
class ParallelCeleryTask(CeleryTask, ParallelTask):
"""Parallel Task for Celery."""

...


@public
class SerialCeleryTask(CeleryTask, SerialTask):
"""Serial Task for Celery."""

Expand Down
3 changes: 2 additions & 1 deletion tests/celery_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from __future__ import annotations

import os
import sys

from datetime import datetime
from time import sleep
Expand Down Expand Up @@ -52,7 +53,7 @@
print("Redis connection is working.")
except redis.ConnectionError as e:
print(f"Failed to connect to Redis: {e}")
exit(1)
sys.exit(1)


@app.task # type: ignore
Expand Down
40 changes: 23 additions & 17 deletions tests/test_task_celery_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,40 @@

from __future__ import annotations

from datetime import datetime
from time import sleep
from typing import Any, Generator
from typing import Generator

import pytest

from retsu import SerialTask, Task
from retsu.celery import SerialCeleryTask

from .celery_tasks import task_sum

class MyResultTask(SerialTask):

class MyResultTask(SerialCeleryTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Return the sum of the given 2 numbers."""
a = kwargs.pop("a", 0)
b = kwargs.pop("b", 0)
result = a + b
return result
def get_chord_tasks(self, *args, **kwargs) -> list[celery.Signature]:
"""Define the list of tasks for celery chord."""
x = kwargs.get("x")
y = kwargs.get("y")
task_id = kwargs.get("task_id")
return (
[task_sum.s(x, y, task_id)],
None,
)


class MyTimestampTask(SerialTask):
class MyTimestampTask(SerialCeleryTask):
"""Task for the test."""

def task(self, *args, task_id: str, **kwargs) -> Any: # type: ignore
"""Sleep the given seconds, and return the current timestamp."""
sleep_time = kwargs.pop("sleep", 0)
sleep(sleep_time)
return datetime.now().timestamp()
def get_chord_tasks(self, *args, **kwargs) -> list[celery.Signature]:
"""Define the list of tasks for celery chord."""
seconds = kwargs.get("seconds")
task_id = kwargs.get("task_id")
return (
[task_sum.s(x, y, task_id, task_id)],
None,
)


@pytest.fixture
Expand Down

0 comments on commit 670e365

Please sign in to comment.