From b4eed4f0b81671131120574637f8e3dd170f0283 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 11 Nov 2024 22:25:18 +0530 Subject: [PATCH 01/10] create the makim scheduler --- src/makim/logs.py | 3 + src/makim/scheduler.py | 171 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 174 insertions(+) create mode 100644 src/makim/scheduler.py diff --git a/src/makim/logs.py b/src/makim/logs.py index fb025ec..4b0009f 100644 --- a/src/makim/logs.py +++ b/src/makim/logs.py @@ -28,6 +28,9 @@ class MakimError(Enum): SSH_CONNECTION_ERROR = 16 SSH_EXECUTION_ERROR = 17 REMOTE_HOST_NOT_FOUND = 18 + SCHEDULER_JOB_ERROR = 19 + SCHEDULER_JOB_NOT_FOUND = 20 + SCHEDULER_INVALID_SCHEDULE = 21 class MakimLogs: diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py new file mode 100644 index 0000000..66448f0 --- /dev/null +++ b/src/makim/scheduler.py @@ -0,0 +1,171 @@ +"""Manages scheduled tasks for Makim using APScheduler.""" + +import asyncio + +from pathlib import Path +from typing import Any + +from apscheduler.executors.pool import ThreadPoolExecutor +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.schedulers.background import BackgroundScheduler + +from makim.core import Makim +from makim.logs import MakimError, MakimLogs + + +class MakimScheduler: + """Manages scheduled tasks for Makim using APScheduler.""" + + def __init__(self, makim_instance: 'Makim'): + self.makim = makim_instance + self.db_path = Path.home() / '.makim' / 'jobs.db' + self._ensure_db_directory() + + # Configure job stores and executors + jobstores = { + 'default': SQLAlchemyJobStore(url=f'sqlite:///{self.db_path}') + } + executors = {'default': ThreadPoolExecutor(20)} + + # Initialize scheduler with SQLite storage + self.scheduler = BackgroundScheduler( + jobstores=jobstores, + executors=executors, + job_defaults={'coalesce': False, 'max_instances': 3}, + ) + + def _ensure_db_directory(self) -> None: + """Ensure the database directory exists.""" + self.db_path.parent.mkdir(parents=True, exist_ok=True) + + def _execute_task(self, task_name: str, args: dict) -> None: + """Execute a Makim task within the scheduler.""" + try: + # Create a new event loop for this thread + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + + # Run the task + self.makim.run({'task': task_name, **args}) + except Exception as e: + MakimLogs.print_error( + f'Error executing scheduled task {task_name}: {e!s}' + ) + finally: + loop.close() + + def add_job( + self, + job_id: str, + task_name: str, + schedule: str, + args: dict[Any, Any] = None, + ) -> None: + """ + Add a new scheduled job. + + Parameters + ---------- + job_id : str + Unique identifier for the job + task_name : str + Name of the Makim task to execute + schedule : str + Cron schedule expression + args : dict[Any, Any], optional + Arguments to pass to the task + """ + try: + self.scheduler.add_job( + func=self._execute_task, + trigger='cron', + args=[task_name, args or {}], + id=job_id, + **self._parse_schedule(schedule), + replace_existing=True, + ) + MakimLogs.print_info(f"Successfully scheduled job '{job_id}'") + except Exception as e: + MakimLogs.raise_error( + f"Failed to schedule job '{job_id}': {e!s}", + MakimError.SCHEDULER_JOB_ERROR, + ) + + def remove_job(self, job_id: str) -> None: + """Remove a scheduled job.""" + try: + self.scheduler.remove_job(job_id) + MakimLogs.print_info(f"Successfully removed job '{job_id}'") + except Exception as e: + MakimLogs.raise_error( + f"Failed to remove job '{job_id}': {e!s}", + MakimError.SCHEDULER_JOB_ERROR, + ) + + def get_job_status(self, job_id: str) -> dict: + """Get the status of a scheduled job.""" + job = self.scheduler.get_job(job_id) + if not job: + MakimLogs.raise_error( + f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND + ) + + return { + 'id': job.id, + 'next_run': job.next_run_time, + 'schedule': str(job.trigger), + 'active': job.next_run_time is not None, + } + + def list_jobs(self) -> list[dict[str, Any]]: + """List all scheduled jobs.""" + return [ + { + 'id': job.id, + 'next_run': job.next_run_time, + 'schedule': str(job.trigger), + } + for job in self.scheduler.get_jobs() + ] + + def start(self) -> None: + """Start the scheduler.""" + if not self.scheduler.running: + self.scheduler.start() + + def stop(self) -> None: + """Stop the scheduler.""" + if self.scheduler.running: + self.scheduler.shutdown() + + def _parse_schedule(self, schedule: str) -> dict: + """Parse schedule string into APScheduler arguments.""" + schedule = schedule.lower() + + if schedule in ('hourly', 'daily', 'weekly', 'monthly', 'yearly'): + if schedule == 'hourly': + return {'hour': '*'} + elif schedule == 'daily': + return {'day': '*'} + elif schedule == 'weekly': + return {'day_of_week': '0'} # Sunday + elif schedule == 'monthly': + return {'day': '1'} + elif schedule == 'yearly': + return {'month': '1', 'day': '1'} + + # Handle cron expressions + try: + minute, hour, day, month, day_of_week = schedule.split()[:5] + return { + 'minute': minute, + 'hour': hour, + 'day': day, + 'month': month, + 'day_of_week': day_of_week, + } + except ValueError: + MakimLogs.raise_error( + f'Invalid schedule format: {schedule}', + MakimError.SCHEDULER_INVALID_SCHEDULE, + ) From 5ffc7c307d35ef8cacb354a1f0b0b939ba13986f Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 18 Nov 2024 19:46:33 +0530 Subject: [PATCH 02/10] use croniter for parse scheduler and singleton pattern --- poetry.lock | 28 +++++++++++- pyproject.toml | 1 + src/makim/scheduler.py | 101 +++++++++++++++++++++-------------------- 3 files changed, 80 insertions(+), 50 deletions(-) diff --git a/poetry.lock b/poetry.lock index 15f892d..db66657 100644 --- a/poetry.lock +++ b/poetry.lock @@ -656,6 +656,21 @@ tomli = {version = "*", optional = true, markers = "python_full_version <= \"3.1 [package.extras] toml = ["tomli"] +[[package]] +name = "croniter" +version = "5.0.1" +description = "croniter provides iteration for datetime object with cron like format" +optional = false +python-versions = "!=3.0.*,!=3.1.*,!=3.2.*,!=3.3.*,>=2.6" +files = [ + {file = "croniter-5.0.1-py2.py3-none-any.whl", hash = "sha256:eb28439742291f6c10b181df1a5ecf421208b1fc62ef44501daec1780a0b09e9"}, + {file = "croniter-5.0.1.tar.gz", hash = "sha256:7d9b1ef25b10eece48fdf29d8ac52f9b6252abff983ac614ade4f3276294019e"}, +] + +[package.dependencies] +python-dateutil = "*" +pytz = ">2021.1" + [[package]] name = "cryptography" version = "43.0.3" @@ -2570,6 +2585,17 @@ files = [ [package.dependencies] Levenshtein = "0.25.1" +[[package]] +name = "pytz" +version = "2024.2" +description = "World timezone definitions, modern and historical" +optional = false +python-versions = "*" +files = [ + {file = "pytz-2024.2-py2.py3-none-any.whl", hash = "sha256:31c7c1817eb7fae7ca4b8c7ee50c72f93aa2dd863de768e1ef4245d426aa0725"}, + {file = "pytz-2024.2.tar.gz", hash = "sha256:2aa355083c50a0f93fa581709deac0c9ad65cca8a9e9beac660adcbd493c798a"}, +] + [[package]] name = "pywin32" version = "306" @@ -3696,4 +3722,4 @@ testing = ["big-O", "jaraco.functools", "jaraco.itertools", "more-itertools", "p [metadata] lock-version = "2.0" python-versions = ">=3.9,<4" -content-hash = "0fbec82975dc0c59328c69a1cbbee16adfd2fcbed57e4e3b282b07c6e4ace666" +content-hash = "a0f3c1619bf69b745b0adf7f30e3d4fddf84d3522f7b87e2a55ca3c49e0d20b7" diff --git a/pyproject.toml b/pyproject.toml index d770eea..63f09bc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -39,6 +39,7 @@ rich = ">=10.11.0" shellingham = ">=1.5.4" jsonschema = ">=4" paramiko = "^3.5.0" +croniter = "^5.0.1" [tool.poetry.group.dev.dependencies] containers-sugar = ">=1.11.1" diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py index 66448f0..8dd3217 100644 --- a/src/makim/scheduler.py +++ b/src/makim/scheduler.py @@ -2,12 +2,15 @@ import asyncio +from datetime import datetime from pathlib import Path from typing import Any +from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.background import BackgroundScheduler +from croniter import croniter from makim.core import Makim from makim.logs import MakimError, MakimLogs @@ -16,6 +19,13 @@ class MakimScheduler: """Manages scheduled tasks for Makim using APScheduler.""" + _instance = None # Singleton pattern for scheduler instance + + def __new__(cls, *args, **kwargs): + if cls._instance is None: + cls._instance = super(MakimScheduler, cls).__new__(cls) + return cls._instance + def __init__(self, makim_instance: 'Makim'): self.makim = makim_instance self.db_path = Path.home() / '.makim' / 'jobs.db' @@ -34,6 +44,11 @@ def __init__(self, makim_instance: 'Makim'): job_defaults={'coalesce': False, 'max_instances': 3}, ) + # Listen for job events + self.scheduler.add_listener( + self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR + ) + def _ensure_db_directory(self) -> None: """Ensure the database directory exists.""" self.db_path.parent.mkdir(parents=True, exist_ok=True) @@ -46,7 +61,9 @@ def _execute_task(self, task_name: str, args: dict) -> None: asyncio.set_event_loop(loop) # Run the task - self.makim.run({'task': task_name, **args}) + default_args = self.makim.global_data.get('default_args', {}) + merged_args = {**default_args, **(args or {})} + self.makim.run({'task': task_name, **merged_args}) except Exception as e: MakimLogs.print_error( f'Error executing scheduled task {task_name}: {e!s}' @@ -54,6 +71,37 @@ def _execute_task(self, task_name: str, args: dict) -> None: finally: loop.close() + def _log_job_event(self, event) -> None: + """Log job execution success or failure.""" + job = self.scheduler.get_job(event.job_id) + if event.exception: + MakimLogs.print_error(f'Job {job.id} failed: {event.exception}') + else: + MakimLogs.print_info(f'Job {job.id} executed successfully.') + + def _validate_and_parse_schedule(self, schedule: str) -> dict: + """Validate and parse cron expressions.""" + try: + # Use croniter to validate and compute next run time + base_time = datetime.now() + iter = croniter(schedule, base_time) + + # Get parsed schedule for APScheduler + next_time = iter.get_next(datetime) + cron_params = { + 'minute': iter.next_exact('minute'), + 'hour': iter.next_exact('hour'), + 'day': iter.next_exact('day'), + 'month': iter.next_exact('month'), + 'day_of_week': iter.next_exact('weekday'), + } + return cron_params + except ValueError: + MakimLogs.raise_error( + f'Invalid cron expression: {schedule}', + MakimError.SCHEDULER_INVALID_SCHEDULE, + ) + def add_job( self, job_id: str, @@ -75,13 +123,14 @@ def add_job( args : dict[Any, Any], optional Arguments to pass to the task """ + cron_params = self._validate_and_parse_schedule(schedule) try: self.scheduler.add_job( func=self._execute_task, trigger='cron', args=[task_name, args or {}], id=job_id, - **self._parse_schedule(schedule), + **cron_params, replace_existing=True, ) MakimLogs.print_info(f"Successfully scheduled job '{job_id}'") @@ -102,27 +151,13 @@ def remove_job(self, job_id: str) -> None: MakimError.SCHEDULER_JOB_ERROR, ) - def get_job_status(self, job_id: str) -> dict: - """Get the status of a scheduled job.""" - job = self.scheduler.get_job(job_id) - if not job: - MakimLogs.raise_error( - f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND - ) - - return { - 'id': job.id, - 'next_run': job.next_run_time, - 'schedule': str(job.trigger), - 'active': job.next_run_time is not None, - } - def list_jobs(self) -> list[dict[str, Any]]: """List all scheduled jobs.""" return [ { 'id': job.id, 'next_run': job.next_run_time, + 'last_run': job.last_run_time, # Include last run 'schedule': str(job.trigger), } for job in self.scheduler.get_jobs() @@ -137,35 +172,3 @@ def stop(self) -> None: """Stop the scheduler.""" if self.scheduler.running: self.scheduler.shutdown() - - def _parse_schedule(self, schedule: str) -> dict: - """Parse schedule string into APScheduler arguments.""" - schedule = schedule.lower() - - if schedule in ('hourly', 'daily', 'weekly', 'monthly', 'yearly'): - if schedule == 'hourly': - return {'hour': '*'} - elif schedule == 'daily': - return {'day': '*'} - elif schedule == 'weekly': - return {'day_of_week': '0'} # Sunday - elif schedule == 'monthly': - return {'day': '1'} - elif schedule == 'yearly': - return {'month': '1', 'day': '1'} - - # Handle cron expressions - try: - minute, hour, day, month, day_of_week = schedule.split()[:5] - return { - 'minute': minute, - 'hour': hour, - 'day': day, - 'month': month, - 'day_of_week': day_of_week, - } - except ValueError: - MakimLogs.raise_error( - f'Invalid schedule format: {schedule}', - MakimError.SCHEDULER_INVALID_SCHEDULE, - ) From 787976df44e102ac65791f5169f971d6ce3bd47a Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 18 Nov 2024 20:00:49 +0530 Subject: [PATCH 03/10] join the scheduler class with the core file --- src/makim/core.py | 31 +++++++++++++++++++++++++++++++ src/makim/scheduler.py | 32 +++++++++++++++++++++----------- 2 files changed, 52 insertions(+), 11 deletions(-) diff --git a/src/makim/core.py b/src/makim/core.py index f5ae971..1bbd107 100644 --- a/src/makim/core.py +++ b/src/makim/core.py @@ -33,6 +33,7 @@ from makim.console import get_terminal_size from makim.logs import MakimError, MakimLogs +from makim.scheduler import MakimScheduler MAKIM_CURRENT_PATH = Path(__file__).parent @@ -145,6 +146,7 @@ def __init__(self) -> None: self.shell_app = sh.xonsh self.shell_args: list[str] = [] self.tmp_suffix: str = '.makim' + self.scheduler = MakimScheduler(self) # Initialize the scheduler def _call_shell_app(self, cmd: str) -> None: self._load_shell_app() @@ -659,6 +661,34 @@ def _generate_matrix_combinations( return combinations + # scheduler methods + def add_scheduled_job(self, job_id: str, task_name: str, schedule: str, args: dict = None) -> None: + """Add a new scheduled job.""" + self.scheduler.add_job(job_id, task_name, schedule, args) + + def remove_scheduled_job(self, job_id: str) -> None: + """Remove a scheduled job.""" + self.scheduler.remove_job(job_id) + + def list_scheduled_jobs(self) -> list[dict]: + """List all scheduled jobs.""" + return self.scheduler.list_jobs() + + # def get_scheduled_job_status(self, job_id: str) -> dict: + # """Get the status of a scheduled job.""" + # return self.scheduler.get_job_status(job_id) + + def load_scheduled_tasks(self) -> None: + """Load scheduled tasks from the configuration.""" + if 'scheduler' in self.global_data: + scheduler_config = self.global_data['scheduler'] + for job_id, job_config in scheduler_config.items(): + task_name = job_config.get('task') + schedule = job_config.get('schedule') + args = job_config.get('args', {}) + if task_name and schedule: + self.add_scheduled_job(job_id, task_name, schedule, args) + # run commands def _run_hooks(self, args: dict[str, Any], hook_type: str) -> None: if not self.task_data.get('hooks', {}).get(hook_type): @@ -854,6 +884,7 @@ def run(self, args: dict[str, Any]) -> None: self._verify_args() self._change_task(args['task']) self._load_task_args() + self.load_scheduled_tasks() # commands if self.task_data.get('if') and not self._verify_task_conditional( diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py index 8dd3217..2b941c9 100644 --- a/src/makim/scheduler.py +++ b/src/makim/scheduler.py @@ -1,21 +1,17 @@ """Manages scheduled tasks for Makim using APScheduler.""" import asyncio - from datetime import datetime from pathlib import Path from typing import Any - -from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from croniter import croniter - from makim.core import Makim from makim.logs import MakimError, MakimLogs - class MakimScheduler: """Manages scheduled tasks for Makim using APScheduler.""" @@ -45,9 +41,7 @@ def __init__(self, makim_instance: 'Makim'): ) # Listen for job events - self.scheduler.add_listener( - self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR - ) + self.scheduler.add_listener(self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) def _ensure_db_directory(self) -> None: """Ensure the database directory exists.""" @@ -75,9 +69,9 @@ def _log_job_event(self, event) -> None: """Log job execution success or failure.""" job = self.scheduler.get_job(event.job_id) if event.exception: - MakimLogs.print_error(f'Job {job.id} failed: {event.exception}') + MakimLogs.print_error(f"Job {job.id} failed: {event.exception}") else: - MakimLogs.print_info(f'Job {job.id} executed successfully.') + MakimLogs.print_info(f"Job {job.id} executed successfully.") def _validate_and_parse_schedule(self, schedule: str) -> dict: """Validate and parse cron expressions.""" @@ -98,7 +92,7 @@ def _validate_and_parse_schedule(self, schedule: str) -> dict: return cron_params except ValueError: MakimLogs.raise_error( - f'Invalid cron expression: {schedule}', + f"Invalid cron expression: {schedule}", MakimError.SCHEDULER_INVALID_SCHEDULE, ) @@ -151,6 +145,22 @@ def remove_job(self, job_id: str) -> None: MakimError.SCHEDULER_JOB_ERROR, ) + # def get_job_status(self, job_id: str) -> dict: + # """Get the status of a scheduled job.""" + # job = self.scheduler.get_job(job_id) + # if not job: + # MakimLogs.raise_error( + # f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND + # ) + + # return { + # 'id': job.id, + # 'next_run': job.next_run_time, + # 'last_run': job.last_run_time, # New attribute added + # 'schedule': str(job.trigger), + # 'active': job.next_run_time is not None, + # } + def list_jobs(self) -> list[dict[str, Any]]: """List all scheduled jobs.""" return [ From 7a5f95703d6ddf32b08070ce9d73f7b420d2c5c6 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Thu, 5 Dec 2024 20:29:16 +0530 Subject: [PATCH 04/10] add schema for scheduler --- src/makim/schema.json | 29 +++++++++++++++++++++++++++++ 1 file changed, 29 insertions(+) diff --git a/src/makim/schema.json b/src/makim/schema.json index ba3bafb..c484dfc 100644 --- a/src/makim/schema.json +++ b/src/makim/schema.json @@ -270,6 +270,35 @@ } }, "additionalProperties": false + }, + "scheduler": { + "type": "object", + "description": "Scheduler configuration for tasks", + "patternProperties": { + "^[a-zA-Z0-9_-]+$": { + "type": "object", + "required": ["schedule", "task"], + "properties": { + "schedule": { + "type": "string", + "description": "Cron-style schedule for the task (e.g., '* * * * *' for every minute)" + }, + "task": { + "type": "string", + "description": "Full task path in the format 'group.task'" + }, + "args": { + "type": "object", + "description": "Arguments to pass to the scheduled task", + "additionalProperties": { + "type": ["string", "boolean"] + } + } + }, + "additionalProperties": false + } + }, + "additionalProperties": false } }, "additionalProperties": false From 8450256de09ad03ea22745abeb4415d5a5627d28 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Thu, 5 Dec 2024 20:30:02 +0530 Subject: [PATCH 05/10] solve the loop import error --- src/makim/scheduler.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py index 2b941c9..663c68e 100644 --- a/src/makim/scheduler.py +++ b/src/makim/scheduler.py @@ -9,9 +9,12 @@ from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from croniter import croniter -from makim.core import Makim from makim.logs import MakimError, MakimLogs +from typing import TYPE_CHECKING +if TYPE_CHECKING: + from makim import Makim + class MakimScheduler: """Manages scheduled tasks for Makim using APScheduler.""" From 5f43752555c92fd98c8aec73577b96cc565ebbcc Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Sun, 22 Dec 2024 16:00:13 +0530 Subject: [PATCH 06/10] add support for cron commands --- src/makim/cli/__init__.py | 96 +++++++++++++++++++++------------------ 1 file changed, 52 insertions(+), 44 deletions(-) diff --git a/src/makim/cli/__init__.py b/src/makim/cli/__init__.py index 4860c90..0c0763f 100644 --- a/src/makim/cli/__init__.py +++ b/src/makim/cli/__init__.py @@ -1,14 +1,12 @@ -"""Cli functions to define the arguments and to call Makim.""" +"""CLI functions to define the arguments and call Makim.""" from __future__ import annotations import os import sys - from typing import Any, cast import typer - from makim import __version__ from makim.cli.auto_generator import ( create_dynamic_command, @@ -20,51 +18,50 @@ app = typer.Typer( help=( - 'Makim is a tool that helps you to organize ' - 'and simplify your helper commands.' + "Makim is a tool that helps you to organize " + "and simplify your helper commands." ), epilog=( - 'If you have any problem, open an issue at: ' - 'https://github.com/osl-incubator/makim' + "If you have any problem, open an issue at: " + "https://github.com/osl-incubator/makim" ), ) makim: Makim = Makim() - @app.callback(invoke_without_command=True) def main( ctx: typer.Context, version: bool = typer.Option( None, - '--version', - '-v', + "--version", + "-v", is_flag=True, - help='Show the version and exit', + help="Show the version and exit", ), file: str = typer.Option( - '.makim.yaml', - '--file', - help='Makim config file', + ".makim.yaml", + "--file", + help="Makim config file", ), dry_run: bool = typer.Option( None, - '--dry-run', + "--dry-run", is_flag=True, - help='Execute the command in dry mode', + help="Execute the command in dry mode", ), verbose: bool = typer.Option( None, - '--verbose', + "--verbose", is_flag=True, - help='Execute the command in verbose mode', + help="Execute the command in verbose mode", ), ) -> None: - """Process envers for specific flags, otherwise show the help menu.""" - typer.echo(f'Makim file: {file}') + """Process top-level flags; otherwise, show the help menu.""" + typer.echo(f"Makim file: {file}") if version: - typer.echo(f'Version: {__version__}') + typer.echo(f"Version: {__version__}") raise typer.Exit() if ctx.invoked_subcommand is None: @@ -79,14 +76,14 @@ def _get_command_from_cli() -> str: This function is based on `CLI_ROOT_FLAGS_VALUES_COUNT`. """ params = sys.argv[1:] - command = '' + command = "" try: idx = 0 while idx < len(params): arg = params[idx] if arg not in CLI_ROOT_FLAGS_VALUES_COUNT: - command = f'flag `{arg}`' if arg.startswith('--') else arg + command = f"flag `{arg}`" if arg.startswith("--") else arg break idx += 1 + CLI_ROOT_FLAGS_VALUES_COUNT[arg] @@ -97,59 +94,70 @@ def _get_command_from_cli() -> str: def run_app() -> None: - """Run the typer app.""" + """Run the Typer app.""" root_config = extract_root_config() - config_file_path = cast(str, root_config.get('file', '.makim.yaml')) + config_file_path = cast(str, root_config.get("file", ".makim.yaml")) cli_completion_words = [ - w for w in os.getenv('COMP_WORDS', '').split('\n') if w + w for w in os.getenv("COMP_WORDS", "").split("\n") if w ] if not makim._check_makim_file(config_file_path) and cli_completion_words: - # autocomplete call + # Autocomplete call root_config = extract_root_config(cli_completion_words) - config_file_path = cast(str, root_config.get('file', '.makim.yaml')) + config_file_path = cast(str, root_config.get("file", ".makim.yaml")) if not makim._check_makim_file(config_file_path): return makim.load( file=config_file_path, - dry_run=cast(bool, root_config.get('dry_run', False)), - verbose=cast(bool, root_config.get('verbose', False)), + dry_run=cast(bool, root_config.get("dry_run", False)), + verbose=cast(bool, root_config.get("verbose", False)), ) - # create tasks data + # Create tasks data tasks: dict[str, Any] = {} - for group_name, group_data in makim.global_data.get('groups', {}).items(): - for task_name, task_data in group_data.get('tasks', {}).items(): - tasks[f'{group_name}.{task_name}'] = task_data + for group_name, group_data in makim.global_data.get("groups", {}).items(): + for task_name, task_data in group_data.get("tasks", {}).items(): + tasks[f"{group_name}.{task_name}"] = task_data - # Add dynamically cron commands to Typer app - if 'scheduler' in makim.global_data: + # Add dynamically created cron commands + if "scheduler" in makim.global_data: typer_cron = typer.Typer( - help='Tasks Scheduler', + help="Tasks Scheduler", invoke_without_command=True, ) for schedule_name, schedule_params in makim.global_data.get( - 'scheduler', {} + "scheduler", {} ).items(): create_dynamic_command_cron( makim, typer_cron, schedule_name, schedule_params or {} ) - # Add cron command - app.add_typer(typer_cron, name='cron', rich_help_panel='Extensions') + @typer_cron.command(help="List all scheduled tasks") + def list(): + print("list") + + @typer_cron.command(help="Start a scheduler by its name") + def start(name: str): + print("start") + + @typer_cron.command(help="Stop a scheduler by its name") + def stop(name: str): + print("Stop") + + app.add_typer(typer_cron, name="cron", rich_help_panel="Extensions") - # Add dynamically commands to Typer app + # Add dynamically created commands to the Typer app for name, args in tasks.items(): create_dynamic_command(makim, app, name, args) try: app() except SystemExit as e: - # code 2 means code not found + # Code 2 means command not found error_code = 2 if e.code != error_code: raise e @@ -163,11 +171,11 @@ def run_app() -> None: typer.secho( f"Command {command_used} not found. Did you mean '{suggestion}'?", - fg='red', + fg="red", ) raise e -if __name__ == '__main__': +if __name__ == "__main__": run_app() From b24979328916a7dfcee11f4ea51f5ac5bb629890 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Sun, 22 Dec 2024 19:17:16 +0530 Subject: [PATCH 07/10] change the scheduler implementation --- src/makim/core.py | 57 ++++---- src/makim/scheduler.py | 304 +++++++++++++++++++---------------------- 2 files changed, 165 insertions(+), 196 deletions(-) diff --git a/src/makim/core.py b/src/makim/core.py index b24434f..8cebc45 100644 --- a/src/makim/core.py +++ b/src/makim/core.py @@ -133,6 +133,7 @@ class Makim: task_name: str = '' task_data: dict[str, Any] = {} ssh_config: dict[str, Any] = {} + scheduler: Optional[MakimScheduler] = None def __init__(self) -> None: """Prepare the Makim class with the default configuration.""" @@ -146,7 +147,7 @@ def __init__(self) -> None: self.shell_app = sh.xonsh self.shell_args: list[str] = [] self.tmp_suffix: str = '.makim' - self.scheduler = MakimScheduler(self) # Initialize the scheduler + self.scheduler = None def _call_shell_app(self, cmd: str) -> None: self._load_shell_app() @@ -387,7 +388,28 @@ def _load_config_data(self) -> None: self.ssh_config = self.global_data.get('hosts', {}) self._validate_config() - + + if 'scheduler' in self.global_data: + if self.scheduler is None: + self.scheduler = MakimScheduler(self) + + # Load scheduler configurations + for name, config in self.global_data['scheduler'].items(): + schedule = config.get('schedule') + task = config.get('task') + args = config.get('args', {}) + + if schedule and task: + try: + self.scheduler.add_job(name, schedule, task, args) + except Exception as e: + MakimLogs.print_info(f"Failed to load scheduler {name}: {e}") + + def shutdown(self) -> None: + """Cleanup resources before exit.""" + if self.scheduler: + self.scheduler.shutdown() + def _resolve_working_directory(self, scope: str) -> Optional[Path]: scope_options = ('global', 'group', 'task') if scope not in scope_options: @@ -661,34 +683,6 @@ def _generate_matrix_combinations( return combinations - # scheduler methods - def add_scheduled_job(self, job_id: str, task_name: str, schedule: str, args: dict = None) -> None: - """Add a new scheduled job.""" - self.scheduler.add_job(job_id, task_name, schedule, args) - - def remove_scheduled_job(self, job_id: str) -> None: - """Remove a scheduled job.""" - self.scheduler.remove_job(job_id) - - def list_scheduled_jobs(self) -> list[dict]: - """List all scheduled jobs.""" - return self.scheduler.list_jobs() - - # def get_scheduled_job_status(self, job_id: str) -> dict: - # """Get the status of a scheduled job.""" - # return self.scheduler.get_job_status(job_id) - - def load_scheduled_tasks(self) -> None: - """Load scheduled tasks from the configuration.""" - if 'scheduler' in self.global_data: - scheduler_config = self.global_data['scheduler'] - for job_id, job_config in scheduler_config.items(): - task_name = job_config.get('task') - schedule = job_config.get('schedule') - args = job_config.get('args', {}) - if task_name and schedule: - self.add_scheduled_job(job_id, task_name, schedule, args) - # run commands def _run_hooks(self, args: dict[str, Any], hook_type: str) -> None: if not self.task_data.get('hooks', {}).get(hook_type): @@ -884,7 +878,6 @@ def run(self, args: dict[str, Any]) -> None: self._verify_args() self._change_task(args['task']) self._load_task_args() - self.load_scheduled_tasks() # commands if self.task_data.get('if') and not self._verify_task_conditional( @@ -897,4 +890,4 @@ def run(self, args: dict[str, Any]) -> None: self._run_hooks(args, 'pre-run') self._run_command(args) - self._run_hooks(args, 'post-run') + self._run_hooks(args, 'post-run') \ No newline at end of file diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py index 663c68e..0abeef4 100644 --- a/src/makim/scheduler.py +++ b/src/makim/scheduler.py @@ -1,187 +1,163 @@ -"""Manages scheduled tasks for Makim using APScheduler.""" +from __future__ import annotations -import asyncio +import json +import os +import subprocess from datetime import datetime from pathlib import Path -from typing import Any -from apscheduler.executors.pool import ThreadPoolExecutor -from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore -from apscheduler.schedulers.background import BackgroundScheduler -from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR -from croniter import croniter -from makim.logs import MakimError, MakimLogs -from typing import TYPE_CHECKING - -if TYPE_CHECKING: - from makim import Makim - -class MakimScheduler: - """Manages scheduled tasks for Makim using APScheduler.""" +from typing import Any, Dict, Optional - _instance = None # Singleton pattern for scheduler instance - - def __new__(cls, *args, **kwargs): - if cls._instance is None: - cls._instance = super(MakimScheduler, cls).__new__(cls) - return cls._instance - - def __init__(self, makim_instance: 'Makim'): - self.makim = makim_instance - self.db_path = Path.home() / '.makim' / 'jobs.db' - self._ensure_db_directory() +from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.triggers.cron import CronTrigger +from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore +from apscheduler.job import Job - # Configure job stores and executors +class MakimScheduler: + """Handles task scheduling for Makim.""" + + def __init__(self, makim_instance: Any): + """Initialize the scheduler with configuration.""" + self.config_file = makim_instance.file + self.scheduler = None + self.job_store_path = Path.home() / '.makim' / 'jobs.sqlite' + self.job_history_path = Path.home() / '.makim' / 'history.json' + self._setup_directories() + self._initialize_scheduler() + self.job_history: Dict[str, list[Dict[str, Any]]] = self._load_history() + + def _setup_directories(self) -> None: + """Create necessary directories for job storage.""" + self.job_store_path.parent.mkdir(parents=True, exist_ok=True) + + def _initialize_scheduler(self) -> None: + """Initialize the APScheduler with SQLite backend.""" jobstores = { - 'default': SQLAlchemyJobStore(url=f'sqlite:///{self.db_path}') + 'default': SQLAlchemyJobStore(url=f'sqlite:///{self.job_store_path}') } - executors = {'default': ThreadPoolExecutor(20)} - - # Initialize scheduler with SQLite storage - self.scheduler = BackgroundScheduler( - jobstores=jobstores, - executors=executors, - job_defaults={'coalesce': False, 'max_instances': 3}, - ) - - # Listen for job events - self.scheduler.add_listener(self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) - - def _ensure_db_directory(self) -> None: - """Ensure the database directory exists.""" - self.db_path.parent.mkdir(parents=True, exist_ok=True) - - def _execute_task(self, task_name: str, args: dict) -> None: - """Execute a Makim task within the scheduler.""" + self.scheduler = BackgroundScheduler(jobstores=jobstores) + self.scheduler.start() + + def _load_history(self) -> Dict[str, list[Dict[str, Any]]]: + """Load job execution history from file.""" + if self.job_history_path.exists(): + with open(self.job_history_path, 'r') as f: + return json.load(f) + return {} + + def _save_history(self) -> None: + """Save job execution history to file.""" + with open(self.job_history_path, 'w') as f: + json.dump(self.job_history, f) + + def _log_execution(self, name: str, event: str, result: Optional[str] = None, error: Optional[str] = None) -> None: + """Log execution details to history.""" + if name not in self.job_history: + self.job_history[name] = [] + + self.job_history[name].append({ + 'timestamp': datetime.now().isoformat(), + 'event': event, + 'result': result, + 'error': error + }) + self._save_history() + + @staticmethod + def _run_makim_task(config_file: str, task: str, args: Dict[str, Any]) -> None: + """Static method to execute a Makim task.""" + cmd = ['makim', '--file', config_file, task] + + # Convert args to command line arguments + for key, value in (args or {}).items(): + if isinstance(value, bool): + if value: + cmd.append(f'--{key}') + else: + cmd.extend([f'--{key}', str(value)]) + try: - # Create a new event loop for this thread - loop = asyncio.new_event_loop() - asyncio.set_event_loop(loop) - - # Run the task - default_args = self.makim.global_data.get('default_args', {}) - merged_args = {**default_args, **(args or {})} - self.makim.run({'task': task_name, **merged_args}) - except Exception as e: - MakimLogs.print_error( - f'Error executing scheduled task {task_name}: {e!s}' - ) - finally: - loop.close() - - def _log_job_event(self, event) -> None: - """Log job execution success or failure.""" - job = self.scheduler.get_job(event.job_id) - if event.exception: - MakimLogs.print_error(f"Job {job.id} failed: {event.exception}") - else: - MakimLogs.print_info(f"Job {job.id} executed successfully.") - - def _validate_and_parse_schedule(self, schedule: str) -> dict: - """Validate and parse cron expressions.""" - try: - # Use croniter to validate and compute next run time - base_time = datetime.now() - iter = croniter(schedule, base_time) - - # Get parsed schedule for APScheduler - next_time = iter.get_next(datetime) - cron_params = { - 'minute': iter.next_exact('minute'), - 'hour': iter.next_exact('hour'), - 'day': iter.next_exact('day'), - 'month': iter.next_exact('month'), - 'day_of_week': iter.next_exact('weekday'), - } - return cron_params - except ValueError: - MakimLogs.raise_error( - f"Invalid cron expression: {schedule}", - MakimError.SCHEDULER_INVALID_SCHEDULE, - ) + result = subprocess.run(cmd, capture_output=True, text=True, check=True) + return result.stdout + except subprocess.CalledProcessError as e: + raise Exception(f"Job execution failed: {e.stderr}") + + def add_job(self, name: str, schedule: str, task: str, args: Optional[Dict[str, Any]] = None) -> None: + """Add a new scheduled job.""" + if not self.scheduler: + raise RuntimeError("Scheduler not initialized") - def add_job( - self, - job_id: str, - task_name: str, - schedule: str, - args: dict[Any, Any] = None, - ) -> None: - """ - Add a new scheduled job. - - Parameters - ---------- - job_id : str - Unique identifier for the job - task_name : str - Name of the Makim task to execute - schedule : str - Cron schedule expression - args : dict[Any, Any], optional - Arguments to pass to the task - """ - cron_params = self._validate_and_parse_schedule(schedule) try: + # Create trigger from schedule + trigger = CronTrigger.from_crontab(schedule) + + # Add the job using the static method self.scheduler.add_job( - func=self._execute_task, - trigger='cron', - args=[task_name, args or {}], - id=job_id, - **cron_params, + func=self._run_makim_task, + trigger=trigger, + args=[self.config_file, task, args or {}], + id=name, + name=name, replace_existing=True, + misfire_grace_time=None ) - MakimLogs.print_info(f"Successfully scheduled job '{job_id}'") + + self._log_execution(name, 'scheduled') + except Exception as e: - MakimLogs.raise_error( - f"Failed to schedule job '{job_id}': {e!s}", - MakimError.SCHEDULER_JOB_ERROR, - ) + self._log_execution(name, 'schedule_failed', error=str(e)) + raise - def remove_job(self, job_id: str) -> None: + def remove_job(self, name: str) -> None: """Remove a scheduled job.""" + if not self.scheduler: + raise RuntimeError("Scheduler not initialized") + try: - self.scheduler.remove_job(job_id) - MakimLogs.print_info(f"Successfully removed job '{job_id}'") + self.scheduler.remove_job(name) + self._log_execution(name, 'removed') except Exception as e: - MakimLogs.raise_error( - f"Failed to remove job '{job_id}': {e!s}", - MakimError.SCHEDULER_JOB_ERROR, - ) + self._log_execution(name, 'remove_failed', error=str(e)) + raise + + def get_job(self, name: str) -> Optional[Job]: + """Get a job by name.""" + if not self.scheduler: + raise RuntimeError("Scheduler not initialized") + + return self.scheduler.get_job(name) - # def get_job_status(self, job_id: str) -> dict: - # """Get the status of a scheduled job.""" - # job = self.scheduler.get_job(job_id) - # if not job: - # MakimLogs.raise_error( - # f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND - # ) - - # return { - # 'id': job.id, - # 'next_run': job.next_run_time, - # 'last_run': job.last_run_time, # New attribute added - # 'schedule': str(job.trigger), - # 'active': job.next_run_time is not None, - # } - - def list_jobs(self) -> list[dict[str, Any]]: + def list_jobs(self) -> list[Dict[str, Any]]: """List all scheduled jobs.""" - return [ - { + if not self.scheduler: + raise RuntimeError("Scheduler not initialized") + + jobs = [] + for job in self.scheduler.get_jobs(): + job_info = { 'id': job.id, - 'next_run': job.next_run_time, - 'last_run': job.last_run_time, # Include last run + 'name': job.name, + 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, 'schedule': str(job.trigger), } - for job in self.scheduler.get_jobs() - ] - - def start(self) -> None: - """Start the scheduler.""" - if not self.scheduler.running: - self.scheduler.start() - - def stop(self) -> None: - """Stop the scheduler.""" - if self.scheduler.running: - self.scheduler.shutdown() + jobs.append(job_info) + return jobs + + def get_job_status(self, name: str) -> Dict[str, Any]: + """Get detailed status of a specific job.""" + job = self.get_job(name) + if not job: + return {'error': 'Job not found'} + + history = self.job_history.get(name, []) + + return { + 'name': name, + 'next_run_time': job.next_run_time.isoformat() if job.next_run_time else None, + 'schedule': str(job.trigger), + 'history': history + } + + def shutdown(self) -> None: + """Shutdown the scheduler.""" + if self.scheduler: + self.scheduler.shutdown() \ No newline at end of file From ee0801a5831f15e45fab98c4c277706a59595fdd Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 23 Dec 2024 19:29:38 +0530 Subject: [PATCH 08/10] update the cli init file --- src/makim/cli/__init__.py | 42 +++++++++++++++++++++++++++++++++++++-- 1 file changed, 40 insertions(+), 2 deletions(-) diff --git a/src/makim/cli/__init__.py b/src/makim/cli/__init__.py index 0c0763f..bc6d462 100644 --- a/src/makim/cli/__init__.py +++ b/src/makim/cli/__init__.py @@ -15,6 +15,8 @@ ) from makim.cli.config import CLI_ROOT_FLAGS_VALUES_COUNT, extract_root_config from makim.core import Makim +from rich.table import Table +from rich.console import Console app = typer.Typer( help=( @@ -138,11 +140,47 @@ def run_app() -> None: @typer_cron.command(help="List all scheduled tasks") def list(): - print("list") + """List all scheduled tasks.""" + if not makim.scheduler: + typer.echo("No scheduled tasks configured.") + return + + console = Console() + table = Table(show_header=True, header_style="bold") + table.add_column("Name") + table.add_column("Next Run") + + jobs = makim.scheduler.list_jobs() + for job in jobs: + table.add_row( + job['name'], + job['next_run_time'] or "Not scheduled" + ) + + console.print(table) @typer_cron.command(help="Start a scheduler by its name") def start(name: str): - print("start") + """Start (enable) a scheduled task.""" + if not makim.scheduler: + typer.echo("No scheduler configured.") + return + + try: + schedule_config = makim.global_data.get("scheduler", {}).get(name) + if not schedule_config: + typer.echo(f"No configuration found for schedule '{name}'") + return + + makim.scheduler.add_job( + name=name, + schedule=schedule_config["schedule"], + task=schedule_config["task"], + args=schedule_config.get("args", {}) + ) + typer.echo(f"Successfully started schedule '{name}'") + except Exception as e: + typer.echo(f"Failed to start schedule '{name}': {e}", err=True) @typer_cron.command(help="Stop a scheduler by its name") def stop(name: str): From 05d81d45d6fcc22c18f492a786255e875fa29f82 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 23 Dec 2024 20:50:05 +0530 Subject: [PATCH 09/10] add the functionality for stopping the task --- src/makim/cli/__init__.py | 19 ++++++++++++++----- 1 file changed, 14 insertions(+), 5 deletions(-) diff --git a/src/makim/cli/__init__.py b/src/makim/cli/__init__.py index bc6d462..31930dc 100644 --- a/src/makim/cli/__init__.py +++ b/src/makim/cli/__init__.py @@ -139,7 +139,7 @@ def run_app() -> None: ) @typer_cron.command(help="List all scheduled tasks") - def list(): + def list() -> None: """List all scheduled tasks.""" if not makim.scheduler: typer.echo("No scheduled tasks configured.") @@ -160,7 +160,7 @@ def list(): console.print(table) @typer_cron.command(help="Start a scheduler by its name") - def start(name: str): + def start(name: str) -> None: """Start (enable) a scheduled task.""" if not makim.scheduler: typer.echo("No scheduler configured.") @@ -183,10 +183,19 @@ def start(name: str): typer.echo(f"Failed to start schedule '{name}': {e}", err=True) @typer_cron.command(help="Stop a scheduler by its name") - def stop(name: str): - print("Stop") + def stop(name: str) -> None: + """Stop (disable) a scheduled task.""" + if not makim.scheduler: + typer.echo("No scheduler configured.") + return + + try: + makim.scheduler.remove_job(name) + typer.echo(f"Successfully stopped schedule '{name}'") + except Exception as e: + typer.echo(f"Failed to stop schedule '{name}': {e}", err=True) - app.add_typer(typer_cron, name="cron", rich_help_panel="Extensions") + app.add_typer(typer_cron, name="cron", rich_help_panel="Extensions") # Add dynamically created commands to the Typer app for name, args in tasks.items(): From bcbb3199774ea1895633aefc99c64501c4d68cc6 Mon Sep 17 00:00:00 2001 From: Abhijeet Saroha Date: Mon, 23 Dec 2024 20:52:09 +0530 Subject: [PATCH 10/10] unlink the scheduler from the previous .yaml file --- src/makim/scheduler.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py index 0abeef4..89e8400 100644 --- a/src/makim/scheduler.py +++ b/src/makim/scheduler.py @@ -24,6 +24,28 @@ def __init__(self, makim_instance: Any): self._setup_directories() self._initialize_scheduler() self.job_history: Dict[str, list[Dict[str, Any]]] = self._load_history() + self._sync_jobs_with_config(makim_instance.global_data.get('scheduler', {})) + + def _sync_jobs_with_config(self, config_jobs: Dict[str, Any]) -> None: + """Synchronize scheduler jobs with current config file.""" + if not self.scheduler: + return + + # Remove jobs not in current config + current_jobs = set(self.scheduler.get_jobs()) + config_job_ids = set(config_jobs.keys()) + + for job in current_jobs: + if job.id not in config_job_ids: + self.scheduler.remove_job(job.id) + + # Clear history for removed jobs + self.job_history = { + name: history + for name, history in self.job_history.items() + if name in config_job_ids + } + self._save_history() def _setup_directories(self) -> None: """Create necessary directories for job storage.""" @@ -31,6 +53,9 @@ def _setup_directories(self) -> None: def _initialize_scheduler(self) -> None: """Initialize the APScheduler with SQLite backend.""" + if Path(self.job_store_path).exists(): + Path(self.job_store_path).unlink() + jobstores = { 'default': SQLAlchemyJobStore(url=f'sqlite:///{self.job_store_path}') }