diff --git a/src/makim/core.py b/src/makim/core.py index f5ae971..1bbd107 100644 --- a/src/makim/core.py +++ b/src/makim/core.py @@ -33,6 +33,7 @@ from makim.console import get_terminal_size from makim.logs import MakimError, MakimLogs +from makim.scheduler import MakimScheduler MAKIM_CURRENT_PATH = Path(__file__).parent @@ -145,6 +146,7 @@ def __init__(self) -> None: self.shell_app = sh.xonsh self.shell_args: list[str] = [] self.tmp_suffix: str = '.makim' + self.scheduler = MakimScheduler(self) # Initialize the scheduler def _call_shell_app(self, cmd: str) -> None: self._load_shell_app() @@ -659,6 +661,34 @@ def _generate_matrix_combinations( return combinations + # scheduler methods + def add_scheduled_job(self, job_id: str, task_name: str, schedule: str, args: dict = None) -> None: + """Add a new scheduled job.""" + self.scheduler.add_job(job_id, task_name, schedule, args) + + def remove_scheduled_job(self, job_id: str) -> None: + """Remove a scheduled job.""" + self.scheduler.remove_job(job_id) + + def list_scheduled_jobs(self) -> list[dict]: + """List all scheduled jobs.""" + return self.scheduler.list_jobs() + + # def get_scheduled_job_status(self, job_id: str) -> dict: + # """Get the status of a scheduled job.""" + # return self.scheduler.get_job_status(job_id) + + def load_scheduled_tasks(self) -> None: + """Load scheduled tasks from the configuration.""" + if 'scheduler' in self.global_data: + scheduler_config = self.global_data['scheduler'] + for job_id, job_config in scheduler_config.items(): + task_name = job_config.get('task') + schedule = job_config.get('schedule') + args = job_config.get('args', {}) + if task_name and schedule: + self.add_scheduled_job(job_id, task_name, schedule, args) + # run commands def _run_hooks(self, args: dict[str, Any], hook_type: str) -> None: if not self.task_data.get('hooks', {}).get(hook_type): @@ -854,6 +884,7 @@ def run(self, args: dict[str, Any]) -> None: self._verify_args() self._change_task(args['task']) self._load_task_args() + self.load_scheduled_tasks() # commands if self.task_data.get('if') and not self._verify_task_conditional( diff --git a/src/makim/scheduler.py b/src/makim/scheduler.py index 8dd3217..2b941c9 100644 --- a/src/makim/scheduler.py +++ b/src/makim/scheduler.py @@ -1,21 +1,17 @@ """Manages scheduled tasks for Makim using APScheduler.""" import asyncio - from datetime import datetime from pathlib import Path from typing import Any - -from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED from apscheduler.executors.pool import ThreadPoolExecutor from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore from apscheduler.schedulers.background import BackgroundScheduler +from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR from croniter import croniter - from makim.core import Makim from makim.logs import MakimError, MakimLogs - class MakimScheduler: """Manages scheduled tasks for Makim using APScheduler.""" @@ -45,9 +41,7 @@ def __init__(self, makim_instance: 'Makim'): ) # Listen for job events - self.scheduler.add_listener( - self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR - ) + self.scheduler.add_listener(self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR) def _ensure_db_directory(self) -> None: """Ensure the database directory exists.""" @@ -75,9 +69,9 @@ def _log_job_event(self, event) -> None: """Log job execution success or failure.""" job = self.scheduler.get_job(event.job_id) if event.exception: - MakimLogs.print_error(f'Job {job.id} failed: {event.exception}') + MakimLogs.print_error(f"Job {job.id} failed: {event.exception}") else: - MakimLogs.print_info(f'Job {job.id} executed successfully.') + MakimLogs.print_info(f"Job {job.id} executed successfully.") def _validate_and_parse_schedule(self, schedule: str) -> dict: """Validate and parse cron expressions.""" @@ -98,7 +92,7 @@ def _validate_and_parse_schedule(self, schedule: str) -> dict: return cron_params except ValueError: MakimLogs.raise_error( - f'Invalid cron expression: {schedule}', + f"Invalid cron expression: {schedule}", MakimError.SCHEDULER_INVALID_SCHEDULE, ) @@ -151,6 +145,22 @@ def remove_job(self, job_id: str) -> None: MakimError.SCHEDULER_JOB_ERROR, ) + # def get_job_status(self, job_id: str) -> dict: + # """Get the status of a scheduled job.""" + # job = self.scheduler.get_job(job_id) + # if not job: + # MakimLogs.raise_error( + # f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND + # ) + + # return { + # 'id': job.id, + # 'next_run': job.next_run_time, + # 'last_run': job.last_run_time, # New attribute added + # 'schedule': str(job.trigger), + # 'active': job.next_run_time is not None, + # } + def list_jobs(self) -> list[dict[str, Any]]: """List all scheduled jobs.""" return [