Skip to content

Commit

Permalink
join the scheduler class with the core file
Browse files Browse the repository at this point in the history
  • Loading branch information
abhijeetSaroha committed Nov 18, 2024
1 parent 5ffc7c3 commit 787976d
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 11 deletions.
31 changes: 31 additions & 0 deletions src/makim/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from makim.console import get_terminal_size
from makim.logs import MakimError, MakimLogs
from makim.scheduler import MakimScheduler

MAKIM_CURRENT_PATH = Path(__file__).parent

Expand Down Expand Up @@ -145,6 +146,7 @@ def __init__(self) -> None:
self.shell_app = sh.xonsh
self.shell_args: list[str] = []
self.tmp_suffix: str = '.makim'
self.scheduler = MakimScheduler(self) # Initialize the scheduler

def _call_shell_app(self, cmd: str) -> None:
self._load_shell_app()
Expand Down Expand Up @@ -659,6 +661,34 @@ def _generate_matrix_combinations(

return combinations

# scheduler methods
def add_scheduled_job(self, job_id: str, task_name: str, schedule: str, args: dict = None) -> None:
"""Add a new scheduled job."""
self.scheduler.add_job(job_id, task_name, schedule, args)

def remove_scheduled_job(self, job_id: str) -> None:
"""Remove a scheduled job."""
self.scheduler.remove_job(job_id)

def list_scheduled_jobs(self) -> list[dict]:
"""List all scheduled jobs."""
return self.scheduler.list_jobs()

# def get_scheduled_job_status(self, job_id: str) -> dict:
# """Get the status of a scheduled job."""
# return self.scheduler.get_job_status(job_id)

def load_scheduled_tasks(self) -> None:
"""Load scheduled tasks from the configuration."""
if 'scheduler' in self.global_data:
scheduler_config = self.global_data['scheduler']
for job_id, job_config in scheduler_config.items():
task_name = job_config.get('task')
schedule = job_config.get('schedule')
args = job_config.get('args', {})
if task_name and schedule:
self.add_scheduled_job(job_id, task_name, schedule, args)

# run commands
def _run_hooks(self, args: dict[str, Any], hook_type: str) -> None:
if not self.task_data.get('hooks', {}).get(hook_type):
Expand Down Expand Up @@ -854,6 +884,7 @@ def run(self, args: dict[str, Any]) -> None:
self._verify_args()
self._change_task(args['task'])
self._load_task_args()
self.load_scheduled_tasks()

# commands
if self.task_data.get('if') and not self._verify_task_conditional(
Expand Down
32 changes: 21 additions & 11 deletions src/makim/scheduler.py
Original file line number Diff line number Diff line change
@@ -1,21 +1,17 @@
"""Manages scheduled tasks for Makim using APScheduler."""

import asyncio

from datetime import datetime
from pathlib import Path
from typing import Any

from apscheduler.events import EVENT_JOB_ERROR, EVENT_JOB_EXECUTED
from apscheduler.executors.pool import ThreadPoolExecutor
from apscheduler.jobstores.sqlalchemy import SQLAlchemyJobStore
from apscheduler.schedulers.background import BackgroundScheduler
from apscheduler.events import EVENT_JOB_EXECUTED, EVENT_JOB_ERROR
from croniter import croniter

from makim.core import Makim
from makim.logs import MakimError, MakimLogs


class MakimScheduler:
"""Manages scheduled tasks for Makim using APScheduler."""

Expand Down Expand Up @@ -45,9 +41,7 @@ def __init__(self, makim_instance: 'Makim'):
)

# Listen for job events
self.scheduler.add_listener(
self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR
)
self.scheduler.add_listener(self._log_job_event, EVENT_JOB_EXECUTED | EVENT_JOB_ERROR)

def _ensure_db_directory(self) -> None:
"""Ensure the database directory exists."""
Expand Down Expand Up @@ -75,9 +69,9 @@ def _log_job_event(self, event) -> None:
"""Log job execution success or failure."""
job = self.scheduler.get_job(event.job_id)
if event.exception:
MakimLogs.print_error(f'Job {job.id} failed: {event.exception}')
MakimLogs.print_error(f"Job {job.id} failed: {event.exception}")
else:
MakimLogs.print_info(f'Job {job.id} executed successfully.')
MakimLogs.print_info(f"Job {job.id} executed successfully.")

def _validate_and_parse_schedule(self, schedule: str) -> dict:
"""Validate and parse cron expressions."""
Expand All @@ -98,7 +92,7 @@ def _validate_and_parse_schedule(self, schedule: str) -> dict:
return cron_params
except ValueError:
MakimLogs.raise_error(
f'Invalid cron expression: {schedule}',
f"Invalid cron expression: {schedule}",
MakimError.SCHEDULER_INVALID_SCHEDULE,
)

Expand Down Expand Up @@ -151,6 +145,22 @@ def remove_job(self, job_id: str) -> None:
MakimError.SCHEDULER_JOB_ERROR,
)

# def get_job_status(self, job_id: str) -> dict:
# """Get the status of a scheduled job."""
# job = self.scheduler.get_job(job_id)
# if not job:
# MakimLogs.raise_error(
# f"Job '{job_id}' not found", MakimError.SCHEDULER_JOB_NOT_FOUND
# )

# return {
# 'id': job.id,
# 'next_run': job.next_run_time,
# 'last_run': job.last_run_time, # New attribute added
# 'schedule': str(job.trigger),
# 'active': job.next_run_time is not None,
# }

def list_jobs(self) -> list[dict[str, Any]]:
"""List all scheduled jobs."""
return [
Expand Down

0 comments on commit 787976d

Please sign in to comment.