Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add factories #349

Open
wants to merge 6 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions taskiq/abc/broker_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod

from taskiq.abc.broker import AsyncBroker


class BrokerFactory(ABC):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need a class for that? Shouldn't just path to function be enough?

"""BrokerFactory class."""

@abstractmethod
def get_broker(self) -> AsyncBroker:
"""
Get broker instance.

:return: AsyncBroker instance
"""
15 changes: 15 additions & 0 deletions taskiq/abc/scheduler_factory.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
from abc import ABC, abstractmethod

from taskiq import TaskiqScheduler


class TaskiqSchedulerFactory(ABC):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question.

"""SchedulerFactory class."""

@abstractmethod
def get_scheduler(self) -> TaskiqScheduler:
"""
Get scheduler instance.

:return: TaskiqScheduler instance
"""
2 changes: 1 addition & 1 deletion taskiq/brokers/inmemory_broker.py
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ async def set_progress(
progress: TaskProgress[Any],
) -> None:
"""
Set progress of task exection.
Set progress of task execution.

:param task_id: task id
:param progress: task execution progress
Expand Down
20 changes: 18 additions & 2 deletions taskiq/cli/scheduler/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
from dataclasses import dataclass
from typing import List, Optional, Sequence, Union

from taskiq.abc.scheduler_factory import TaskiqSchedulerFactory
from taskiq.cli.common_args import LogLevel
from taskiq.scheduler.scheduler import TaskiqScheduler

Expand All @@ -10,8 +11,9 @@
class SchedulerArgs:
"""Arguments for scheduler."""

scheduler: Union[str, TaskiqScheduler]
modules: List[str]
scheduler: Union[str, TaskiqScheduler] = ""
scheduler_factory: Union[str, TaskiqSchedulerFactory] = ""
log_level: str = LogLevel.INFO.name
configure_logging: bool = True
fs_discover: bool = False
Expand All @@ -32,7 +34,21 @@ def from_cli(cls, args: Optional[Sequence[str]] = None) -> "SchedulerArgs":
formatter_class=ArgumentDefaultsHelpFormatter,
description="Subcommand to run scheduler",
)
parser.add_argument("scheduler", help="Path to scheduler")
parser.add_argument(
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We don't need two parameters. Specify only factory. We can check if the imported scheduler is a factory after we import it.

"--scheduler",
default=None,
help="Path to scheduler",
)
parser.add_argument(
"--scheduler-factory",
"-sf",
default=None,
help=(
"Where to search for SchedulerFactory. "
"This string must be specified in "
"'module.module:ClassName' format."
),
)
parser.add_argument(
"modules",
help="List of modules where to look for tasks.",
Expand Down
45 changes: 41 additions & 4 deletions taskiq/cli/scheduler/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@
import sys
from datetime import datetime, timedelta
from logging import basicConfig, getLevelName, getLogger
from typing import Dict, List, Optional
from typing import Dict, List, Optional, Union

import pytz
from pycron import is_now

from taskiq.abc.schedule_source import ScheduleSource
from taskiq.abc.scheduler_factory import TaskiqSchedulerFactory
from taskiq.cli.scheduler.args import SchedulerArgs
from taskiq.cli.utils import import_object, import_tasks
from taskiq.scheduler.scheduled_task import ScheduledTask
Expand Down Expand Up @@ -189,10 +190,18 @@ async def run_scheduler(args: SchedulerArgs) -> None:
),
)
getLogger("taskiq").setLevel(level=getLevelName(args.log_level))
if isinstance(args.scheduler, str):
scheduler = import_object(args.scheduler)

if not args.scheduler and not args.scheduler_factory:
raise ValueError("You must specified `scheduler` or `scheduler_factory`")

if args.scheduler:
if isinstance(args.scheduler, str):
scheduler = import_object(args.scheduler)
else:
scheduler = args.scheduler
else:
scheduler = args.scheduler
scheduler = get_scheduler_from_factory(args.scheduler_factory)

if not isinstance(scheduler, TaskiqScheduler):
logger.error(
"Imported scheduler is not a subclass of TaskiqScheduler.",
Expand Down Expand Up @@ -223,3 +232,31 @@ async def run_scheduler(args: SchedulerArgs) -> None:
for source in scheduler.sources:
await source.shutdown()
logger.info("Scheduler shut down. Good bye!")


def get_scheduler_from_factory(
scheduler_factory: Union[str, TaskiqSchedulerFactory],
) -> TaskiqScheduler:
"""
Get a TaskiqScheduler instance from a factory.

This function either imports the factory using its string representation
or uses an existing TaskiqSchedulerFactory instance to obtain a
TaskiqScheduler.

:param scheduler_factory: Either the string path of the factory or
an instance of TaskiqSchedulerFactory.
:return: An instance of TaskiqScheduler.
:raises TypeError: If the factory is not an instance of TaskiqSchedulerFactory.
"""
if isinstance(scheduler_factory, str):
scheduler_factory = import_object(scheduler_factory)()

if not isinstance(scheduler_factory, TaskiqSchedulerFactory):
error_msg = (
"scheduler_factory should be an instance of TaskiqSchedulerFactory "
"after importing."
)
raise TypeError(error_msg)

return scheduler_factory.get_scheduler()
16 changes: 14 additions & 2 deletions taskiq/cli/worker/args.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@ def receiver_arg_type(string: str) -> Tuple[str, str]:
class WorkerArgs:
"""Taskiq worker CLI arguments."""

broker: str
modules: List[str]
broker: Optional[str] = None
broker_factory: Optional[str] = None
tasks_pattern: Sequence[str] = ("**/tasks.py",)
fs_discover: bool = False
configure_logging: bool = True
Expand Down Expand Up @@ -59,13 +60,24 @@ def from_cli(
"""
parser = ArgumentParser(formatter_class=ArgumentDefaultsHelpFormatter)
parser.add_argument(
"broker",
"--broker",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same question as for the scheduler.

default=None,
help=(
"Where to search for broker. "
"This string must be specified in "
"'module.module:variable' format."
),
)
parser.add_argument(
"--broker-factory",
"-bf",
default=None,
help=(
"Where to search for BrokerFactory. "
"This string must be specified in "
"'module.module:ClassName' format."
),
)
parser.add_argument(
"--receiver",
default="taskiq.receiver:Receiver",
Expand Down
26 changes: 24 additions & 2 deletions taskiq/cli/worker/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
from concurrent.futures import ThreadPoolExecutor
from multiprocessing import set_start_method
from sys import platform
from typing import Any, Optional, Type
from typing import Any, Optional, Type, cast

from taskiq.abc.broker import AsyncBroker
from taskiq.cli.utils import import_object, import_tasks
Expand Down Expand Up @@ -121,7 +121,15 @@ def interrupt_handler(signum: int, _frame: Any) -> None:
# broker is running as a worker.
# We must set this field before importing tasks,
# so broker will remember all tasks it's related to.
broker = import_object(args.broker)

if not args.broker and not args.broker_factory:
raise ValueError("You must specified `broker` or `broker_factory`")

if args.broker:
broker = import_object(args.broker)
else:
args.broker_factory = cast(str, args.broker_factory)
broker = get_broker_from_factory(args.broker_factory)
if not isinstance(broker, AsyncBroker):
raise ValueError("Unknown broker type. Please use AsyncBroker instance.")
broker.is_worker_process = True
Expand Down Expand Up @@ -199,3 +207,17 @@ def run_worker(args: WorkerArgs) -> Optional[int]:
observer.stop()

return status


def get_broker_from_factory(broker_factory_path: str) -> AsyncBroker:
"""
Get a AsyncBroker instance from a factory.

This function either imports the factory using its string representation
to obtain a AsyncBroker.

:param broker_factory_path: Either the string path of the factory.
:return: An instance of AsyncBroker.
"""
broker_factory = import_object(broker_factory_path)
return broker_factory().get_broker()