This repository has been archived by the owner on Nov 10, 2024. It is now read-only.

[FEATURE] support daily/ periodic task scheduling #246

Open
PabloRuizCuevas opened this issue Oct 30, 2023 · 0 comments
Labels
enhancement New feature or request

Comments


PabloRuizCuevas commented Oct 30, 2023

Is your feature request related to a problem? Please describe.
Repeating a task every x seconds is useful, but it is a poor fit for daily tasks: the time of day at which the task runs is determined by the time of deployment plus whatever delay was added.

Describe the solution you'd like
Include a decorator for repeating a task periodically, for example once a day at a fixed time.

As an example, I adapted the current decorator to schedule a task daily:

import asyncio
import datetime
import logging
from functools import wraps
from traceback import format_exception
from typing import Any, Callable, Coroutine, Union

from starlette.concurrency import run_in_threadpool

NoArgsNoReturnFuncT = Callable[[], None]
NoArgsNoReturnAsyncFuncT = Callable[[], Coroutine[Any, Any, None]]
NoArgsNoReturnDecorator = Callable[[Union[NoArgsNoReturnFuncT, NoArgsNoReturnAsyncFuncT]], NoArgsNoReturnAsyncFuncT]


def repeat_every_day(*, time: datetime.time, logger: logging.Logger | None = None,
                     raise_exceptions: bool = False,
                     max_repetitions: int | None = None) -> NoArgsNoReturnDecorator:
    """Run the decorated function once a day at the given local wall-clock time."""

    def decorator(func: NoArgsNoReturnAsyncFuncT | NoArgsNoReturnFuncT) -> NoArgsNoReturnAsyncFuncT:
        is_coroutine = asyncio.iscoroutinefunction(func)

        @wraps(func)
        async def wrapped() -> None:
            repetitions = 0

            async def loop() -> None:
                nonlocal repetitions
                while max_repetitions is None or repetitions < max_repetitions:
                    # Recompute the target on every pass so a slow func() never shifts later runs.
                    now = datetime.datetime.now()
                    target_datetime = datetime.datetime.combine(now.date(), time)
                    if now.time() > time:
                        # Today's slot has already passed, so aim for tomorrow.
                        target_datetime += datetime.timedelta(days=1)
                    sleep_seconds = (target_datetime - now).total_seconds()
                    if logger is not None:
                        logger.info("Sleeping for %s seconds", sleep_seconds)
                    await asyncio.sleep(sleep_seconds)
                    try:
                        if is_coroutine:
                            await func()
                        else:
                            # Run sync functions in a worker thread to keep the event loop free.
                            await run_in_threadpool(func)
                    except Exception as exc:
                        if logger is not None:
                            formatted_exception = "".join(format_exception(type(exc), exc, exc.__traceback__))
                            logger.error(formatted_exception)
                        if raise_exceptions:
                            raise
                    repetitions += 1
            await loop()
        return wrapped
    return decorator

# Example Usage:


@repeat_every_day(time=datetime.time(16, 28))
async def daily_task():
    print("Running daily task at", datetime.datetime.now())

Note that very similar behavior can be obtained with the current decorator, by making it wait the number of seconds left until the target time and then repeating every 60 * 60 * 24 seconds. That naive approach would have failed today (change of hour in Europe), and if func() takes a long time it may delay each day's execution by a small amount that can accumulate over time.
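The drift argument can be illustrated with a small pure helper. This is only a sketch: `seconds_until_next` is a hypothetical name, not part of the library, and it assumes naive local datetimes exactly like the decorator above, so it measures a wall-clock difference rather than elapsed real time.

```python
import datetime


def seconds_until_next(now: datetime.datetime, at: datetime.time) -> float:
    """Seconds from `now` until the next wall-clock occurrence of `at`.

    Hypothetical helper; assumes naive local datetimes.
    """
    target = datetime.datetime.combine(now.date(), at)
    if now.time() > at:
        # Today's slot has already passed, so aim for tomorrow.
        target += datetime.timedelta(days=1)
    return (target - now).total_seconds()


at = datetime.time(16, 28)

# Suppose the task started at 16:28:00 and took 90 seconds to finish.
finished = datetime.datetime(2023, 10, 30, 16, 29, 30)

# Recomputing the target aims at 16:28:00 tomorrow again.
recomputed_sleep = seconds_until_next(finished, at)

# A fixed 24-hour sleep would instead wake at 16:29:30 tomorrow,
# and the 90-second delay would be carried into every later run.
fixed_sleep = 24 * 60 * 60

print(recomputed_sleep, fixed_sleep)
```

Because the target is recomputed from the current time on every pass, any delay from the previous run is absorbed rather than accumulated. A sleep that spans a clock change can still be off by the size of the shift for that one run, since naive datetimes carry no time zone information, but the error does not persist into the following day.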

@PabloRuizCuevas PabloRuizCuevas added the enhancement New feature or request label Oct 30, 2023