Skip to content

Commit

Permalink
Add provision of cron expressions for periodic task scheduling. (#307)
Browse files Browse the repository at this point in the history
  • Loading branch information
vishesh10 authored Nov 7, 2023
1 parent 84f948e commit 7496a77
Show file tree
Hide file tree
Showing 5 changed files with 117 additions and 5 deletions.
13 changes: 12 additions & 1 deletion README.rst
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ The following options can be only specified in the task decorator:
the initial task execution date when a worker is initialized, and to determine
the next execution date when the task is about to get executed.

For most common scenarios, the ``periodic`` built-in function can be passed:
For most common scenarios, the below mentioned built-in functions can be passed:

- ``periodic(seconds=0, minutes=0, hours=0, days=0, weeks=0, start_date=None,
end_date=None)``
Expand All @@ -401,6 +401,17 @@ The following options can be only specified in the task decorator:
every Sunday at 4am UTC, you could use
``schedule=periodic(weeks=1, start_date=datetime.datetime(2000, 1, 2, 4))``.

- ``cron_expr(expr, start_date=None, end_date=None)``

``start_date``, to specify the periodic task start date. It defaults to
``2000-01-01T00:00Z``, a Saturday, if not given.
``end_date``, to specify the periodic task end date. The task repeats
forever if ``end_date`` is not given.
For example, to run a task every hour indefinitely,
use ``schedule=cron_expr("0 * * * *")``. To run a task every Sunday at
4am UTC, you could use ``schedule=cron_expr("0 4 * * 0")``.


Custom retrying
---------------

Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
click==8.1.3
redis==4.5.2
structlog==22.3.0
croniter
3 changes: 2 additions & 1 deletion tasktiger/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
TaskNotFound,
)
from .retry import exponential, fixed, linear
from .schedule import periodic
from .schedule import cron_expr, periodic
from .task import Task
from .tasktiger import TaskTiger, run_worker
from .worker import Worker
Expand All @@ -31,6 +31,7 @@
"exponential",
# Schedules
"periodic",
"cron_expr",
]


Expand Down
59 changes: 57 additions & 2 deletions tasktiger/schedule.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
from typing import Callable, Optional, Tuple

__all__ = ["periodic"]
__all__ = ["periodic", "cron_expr"]

START_DATE = datetime.datetime(2000, 1, 1)


def _periodic(
Expand Down Expand Up @@ -54,5 +56,58 @@ def periodic(
assert period > 0, "Must specify a positive period."
if not start_date:
# Saturday at midnight
start_date = datetime.datetime(2000, 1, 1)
start_date = START_DATE
return (_periodic, (period, start_date, end_date))


def _cron_expr(
dt: datetime.datetime,
expr: str,
start_date: datetime.datetime,
end_date: Optional[datetime.datetime] = None,
) -> Optional[datetime.datetime]:
import croniter # type: ignore
import pytz # type: ignore

localize = pytz.utc.localize

if end_date and dt >= end_date:
return None

if dt < start_date:
return start_date

assert croniter.croniter.is_valid(expr), "Cron expression is not valid."

start_date = localize(start_date)
dt = localize(dt)

next_utc = croniter.croniter(expr, dt).get_next(ret_type=datetime.datetime)
next_utc = next_utc.replace(tzinfo=None)

# Make sure the time is still within bounds.
if end_date and next_utc > end_date:
return None

return next_utc


def cron_expr(
expr: str,
start_date: Optional[datetime.datetime] = None,
end_date: Optional[datetime.datetime] = None,
) -> Tuple[Callable[..., Optional[datetime.datetime]], Tuple]:
"""
Periodic task schedule via cron expression: Use to schedule a task to run periodically,
starting from start_date (or None to be active immediately) until end_date
(or None to repeat forever).
This function behaves similar to the cron jobs, which run with a minimum of 1 minute
granularity. So specifying "* * * * *" expression will the run the task every
minute.
For more details, see README.
"""
if not start_date:
start_date = START_DATE
return (_cron_expr, (expr, start_date, end_date))
46 changes: 45 additions & 1 deletion tests/test_periodic.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import datetime
import time

from tasktiger import Task, Worker, periodic
from tasktiger import Task, Worker, cron_expr, periodic
from tasktiger._internal import (
QUEUED,
SCHEDULED,
Expand Down Expand Up @@ -64,6 +64,50 @@ def test_periodic_schedule(self):
f = periodic(minutes=1, end_date=dt)
assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None

def test_cron_schedule(self):
"""
Test the cron_expr() schedule function.
"""
dt = datetime.datetime(2010, 1, 1)

f = cron_expr("* * * * *")
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 0, 1)

f = cron_expr("0 * * * *")
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 1)

f = cron_expr("0 0 * * *")
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2)

f = cron_expr("0 0 * * 6")
# 2010-01-02 is a Saturday
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 2)

f = cron_expr("0 0 * * 0", start_date=datetime.datetime(2000, 1, 2))
# 2000-01-02 is a Sunday and 2010-01-02 is a Saturday
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 3)

f = cron_expr("2 3 * * *", start_date=dt)
assert f[0](dt, *f[1]) == datetime.datetime(2010, 1, 1, 3, 2)
# Make sure we return the start_date if the current date is earlier.
assert f[0](datetime.datetime(1990, 1, 1), *f[1]) == dt

f = cron_expr("* * * * *", end_date=dt)
assert f[0](
datetime.datetime(2009, 12, 31, 23, 58), *f[1]
) == datetime.datetime(2009, 12, 31, 23, 59)

f = cron_expr("* * * * *", end_date=dt)
assert f[0](
datetime.datetime(2009, 12, 31, 23, 59), *f[1]
) == datetime.datetime(2010, 1, 1, 0, 0)

f = cron_expr("* * * * *", end_date=dt)
assert f[0](datetime.datetime(2010, 1, 1, 0, 0), *f[1]) is None

f = cron_expr("* * * * *", end_date=dt)
assert f[0](datetime.datetime(2010, 1, 1, 0, 1), *f[1]) is None

def test_periodic_execution(self):
"""
Test periodic task execution.
Expand Down

0 comments on commit 7496a77

Please sign in to comment.