Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include cancelling long running jobs #596

Closed
wants to merge 22 commits into from
Closed
Changes from 1 commit
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 45 additions & 1 deletion openeo/extra/job_management.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ def start_job(
"""

def __init__(
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "."
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = ".", max_running_duration: Optional[int] = 720
HansVRP marked this conversation as resolved.
Show resolved Hide resolved
):
"""Create a MultiBackendJobManager.

Expand All @@ -93,6 +93,10 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path

:param max_running_duration:
Maximum duration, in minutes, that a job is allowed to run before it is automatically cancelled.
Defaults to 720 minutes (12 hours).
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
Expand All @@ -101,6 +105,9 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.max_running_duration = datetime.timedelta(minutes=max_running_duration)


def add_backend(
self,
name: str,
Expand Down Expand Up @@ -202,6 +209,39 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
df = df.assign(**new_columns)

return df

def _check_and_stop_long_running_jobs(self, df: pd.DataFrame):
    """Cancel running jobs that have exceeded ``self.max_running_duration``.

    For every row of ``df`` whose ``status`` is ``"running"``, the job
    metadata is fetched from its backend and the time elapsed since the
    job's ``created`` timestamp is compared with
    ``self.max_running_duration``. Note that the elapsed time is measured
    from job creation, so it includes any time spent before the job
    actually started running.

    Jobs over the limit are stopped, their row status is set to
    ``"cancelled_prolonged_job"`` and ``self.on_job_error`` is called with
    the job and its row.

    This check is best effort: a failure while handling one job is logged
    and does not prevent the remaining jobs from being checked.

    :param df: job tracking dataframe with at least the columns ``status``,
        ``id`` and ``backend_name``. It is updated in place.
    """
    utc = datetime.timezone.utc
    max_duration = self.max_running_duration
    # Express the limit in minutes for the log message; formatting the
    # timedelta itself would print something like "12:00:00 minutes".
    max_minutes = max_duration.total_seconds() / 60

    for i in df.index[df.status == "running"]:
        try:
            job_id = df.loc[i, "id"]
            backend_name = df.loc[i, "backend_name"]
            the_job = self._get_connection(backend_name).job(job_id)

            # The expected format ends in a literal "Z" (UTC), so the parsed
            # value must be compared against the current time in UTC, not
            # against naive local time.
            created = datetime.datetime.strptime(
                the_job.describe()["created"], "%Y-%m-%dT%H:%M:%SZ"
            ).replace(tzinfo=utc)
            elapsed = datetime.datetime.now(tz=utc) - created
            if elapsed <= max_duration:
                continue

            try:
                _log.info(
                    f"Cancelling job {job_id} on backend {backend_name} as it has been running for more than {max_minutes:g} minutes"
                )
                the_job.stop_job()
                df.loc[i, "status"] = "cancelled_prolonged_job"
                self.on_job_error(the_job, df.loc[i])
            except OpenEoApiError as e:
                _log.error(f"Error Cancelling long-running job {job_id!r} on backend {backend_name}: {e}")

        except Exception as e:
            # Keep the best-effort behaviour (one bad job must not abort the
            # whole check), but leave a trace instead of failing silently.
            # Unlike a bare ``except``, this lets KeyboardInterrupt and
            # SystemExit propagate.
            _log.warning(f"Failed to check running duration of job at row {i!r}: {e!r}")

def run_jobs(
self,
Expand Down Expand Up @@ -277,9 +317,13 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "cancelled_prolonged_job")
].size
> 0
):
# Check if any job is running longer than maximally allowed
self._check_and_stop_long_running_jobs(df)

with ignore_connection_errors(context="get statuses"):
self._update_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
Expand Down