Include cancelling long running jobs #596

Closed
wants to merge 22 commits
Changes from 6 commits
107 changes: 76 additions & 31 deletions openeo/extra/job_management.py
@@ -1,5 +1,5 @@
import contextlib
import datetime
from datetime import datetime, timedelta, timezone
import json
import logging
import time
@@ -14,7 +14,8 @@

from openeo import BatchJob, Connection
from openeo.rest import OpenEoApiError
from openeo.util import deep_get
from openeo.util import deep_get, rfc3339


_log = logging.getLogger(__name__)

@@ -30,6 +31,7 @@ class _Backend(NamedTuple):

MAX_RETRIES = 5


class MultiBackendJobManager:
"""
Tracker for multiple jobs on multiple backends.
@@ -76,7 +78,10 @@ def start_job(
"""

def __init__(
self, poll_sleep: int = 60, root_dir: Optional[Union[str, Path]] = "."
self,
poll_sleep: int = 60,
root_dir: Optional[Union[str, Path]] = ".",
max_running_duration: Optional[int] = None,
):
"""Create a MultiBackendJobManager.

@@ -93,6 +98,10 @@ def __init__(
- get_job_dir
- get_error_log_path
- get_job_metadata_path

:param max_running_duration:
Maximum duration, in seconds, that a job is allowed to stay in the "running" state
before it is automatically canceled (for example 12 * 60 * 60 for 12 hours).
Defaults to None, which disables this check.
"""
self.backends: Dict[str, _Backend] = {}
self.poll_sleep = poll_sleep
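
With the additions above, a job manager can now be created with a run-time limit. A minimal usage sketch, assuming the existing run_jobs(df, start_job, output_file) entry point of this class; the backend URL, collection ids and the body of start_job are hypothetical placeholders:

import pandas as pd

import openeo
from openeo.extra.job_management import MultiBackendJobManager

# Cancel any job that stays in "running" state for longer than 6 hours (21600 s).
manager = MultiBackendJobManager(poll_sleep=60, max_running_duration=6 * 60 * 60)
manager.add_backend("example", connection=openeo.connect("openeo.example.org"), parallel_jobs=2)

def start_job(row, connection, **kwargs):
    # Called by the manager for each "not_started" row; must return a BatchJob.
    cube = connection.load_collection(row["collection"])
    return cube.create_job(title=f"Job for {row['collection']}")

df = pd.DataFrame({"collection": ["EXAMPLE_COLLECTION_A", "EXAMPLE_COLLECTION_B"]})
manager.run_jobs(df=df, start_job=start_job, output_file="job_tracking.csv")
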
@@ -101,6 +110,11 @@ def __init__(
# An explicit None or "" should also default to "."
self._root_dir = Path(root_dir or ".")

self.max_running_duration = (
timedelta(seconds=max_running_duration) if max_running_duration is not None else None
)


def add_backend(
self,
name: str,
@@ -125,9 +139,7 @@ def add_backend(
c = connection
connection = lambda: c
assert callable(connection)
self.backends[name] = _Backend(
get_connection=connection, parallel_jobs=parallel_jobs
)
self.backends[name] = _Backend(get_connection=connection, parallel_jobs=parallel_jobs)

def _get_connection(self, backend_name: str, resilient: bool = True) -> Connection:
"""Get a connection for the backend and optionally make it resilient (adds retry behavior)
@@ -184,19 +196,21 @@ def _normalize_df(self, df: pd.DataFrame) -> pd.DataFrame:
:param df: The dataframe to normalize.
:return: a new dataframe that is normalized.
"""
pass

# check for some required columns.
required_with_default = [
("status", "not_started"),
("id", None),
("start_time", None),
("start_time", None),
("running_time", None),
# TODO: columns "cpu", "memory", "duration" are not referenced directly
# within MultiBackendJobManager making it confusing to claim they are required.
# However, they are through assumptions about job "usage" metadata in `_update_statuses`.
# However, they are through assumptions about job "usage" metadata in `_track_statuses`.
("cpu", None),
("memory", None),
("duration", None),
("backend_name", None),
("backend_name", None)
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
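
For illustration only: the effect of these defaults on a bare input frame, using the column list from required_with_default above (the single-row frame is hypothetical):

import pandas as pd

df = pd.DataFrame({"collection": ["EXAMPLE_A"]})
required_with_default = [
    ("status", "not_started"), ("id", None), ("start_time", None), ("running_time", None),
    ("cpu", None), ("memory", None), ("duration", None), ("backend_name", None),
]
new_columns = {col: val for (col, val) in required_with_default if col not in df.columns}
df = df.assign(**new_columns)
print(list(df.columns))
# ['collection', 'status', 'id', 'start_time', 'running_time', 'cpu', 'memory', 'duration', 'backend_name']
print(df.loc[0, "status"])  # not_started
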
@@ -277,30 +291,26 @@ def run_jobs(
& (df.status != "skipped")
& (df.status != "start_failed")
& (df.status != "error")
& (df.status != "canceled")
].size
> 0
):

with ignore_connection_errors(context="get statuses"):
self._update_statuses(df)
self._track_statuses(df)
status_histogram = df.groupby("status").size().to_dict()
_log.info(f"Status histogram: {status_histogram}")
job_db.persist(df)

if len(df[df.status == "not_started"]) > 0:
# Check number of jobs running at each backend
running = df[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
_log.info(f"Running per backend: {per_backend}")
for backend_name in self.backends:
backend_load = per_backend.get(backend_name, 0)
if backend_load < self.backends[backend_name].parallel_jobs:
to_add = (
self.backends[backend_name].parallel_jobs - backend_load
)
to_add = self.backends[backend_name].parallel_jobs - backend_load
to_launch = df[df.status == "not_started"].iloc[0:to_add]
for i in to_launch.index:
self._launch_job(start_job, df, i, backend_name)
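
Note that the reworked loop condition above now also treats "canceled" as a terminal state, so jobs canceled for running too long no longer keep run_jobs alive. The per-backend capacity bookkeeping is plain pandas; a small sketch with a hypothetical snapshot of the tracking frame:

import pandas as pd

df = pd.DataFrame({
    "status": ["running", "queued", "not_started", "not_started", "canceled"],
    "backend_name": ["foo", "foo", None, None, "foo"],
})
running = df[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
per_backend = running.groupby("backend_name").size().to_dict()
print(per_backend)  # {'foo': 2}
# With parallel_jobs=3 configured for backend "foo", one more job would be launched:
to_add = 3 - per_backend.get("foo", 0)
print(to_add)  # 1
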
@@ -348,7 +358,7 @@ def _launch_job(self, start_job, df, i, backend_name):
_log.warning(f"Failed to start job for {row.to_dict()}", exc_info=True)
df.loc[i, "status"] = "start_failed"
else:
df.loc[i, "start_time"] = datetime.datetime.now().isoformat()
df.loc[i, "start_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')
if job:
df.loc[i, "id"] = job.job_id
with ignore_connection_errors(context="get status"):
@@ -386,6 +396,7 @@ def on_job_done(self, job: BatchJob, row):
with open(metadata_path, "w") as f:
json.dump(job_metadata, f, ensure_ascii=False)


def on_job_error(self, job: BatchJob, row):
"""
Handles jobs that stopped with errors. Can be overridden to provide custom behaviour.
@@ -404,6 +415,32 @@ def on_job_error(self, job: BatchJob, row):
self.ensure_job_dir_exists(job.job_id)
error_log_path.write_text(json.dumps(error_logs, indent=2))

def on_job_cancel(self, job: BatchJob, row):
"""
Handles jobs that were cancelled. Can be overridden to provide custom behaviour.

Default implementation does not do anything.

:param job: The job that was cancelled.
:param row: DataFrame row containing the job's metadata.
"""
# TODO: param `row` is never accessed in this method. Remove it? Is this intended for future use?


def _cancel_prolonged_job(self, job: BatchJob, row):
"""Cancel the job if it has been running for too long."""
job_running_timestamp = rfc3339.parse_datetime(row["running_time"], with_timezone=True)
current_time = datetime.now(timezone.utc)

if current_time > job_running_timestamp + self.max_running_duration:
try:
job.stop()
_log.info(
f"Cancelling job {job.job_id} as it has been running for more than {self.max_running_duration}"
)
except OpenEoApiError as e:
_log.error(f"Error Cancelling long-running job {job.job_id}: {e}")

def get_job_dir(self, job_id: str) -> Path:
"""Path to directory where job metadata, results and error logs are be saved."""
return self._root_dir / f"job_{job_id}"
@@ -422,13 +459,10 @@ def ensure_job_dir_exists(self, job_id: str) -> Path:
if not job_dir.exists():
job_dir.mkdir(parents=True)

def _update_statuses(self, df: pd.DataFrame):
"""Update status (and stats) of running jobs (in place)."""
active = df.loc[
(df.status == "created")
| (df.status == "queued")
| (df.status == "running")
]

def _track_statuses(self, df: pd.DataFrame):
"""tracks status (and stats) of running jobs (in place). Optinally cancels jobs when running too long"""
active = df.loc[(df.status == "created") | (df.status == "queued") | (df.status == "running")]
for i in active.index:
job_id = df.loc[i, "id"]
backend_name = df.loc[i, "backend_name"]
@@ -437,15 +471,25 @@ def _update_statuses(self, df: pd.DataFrame):
con = self._get_connection(backend_name)
the_job = con.job(job_id)
job_metadata = the_job.describe()
_log.info(
f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}"
)
if job_metadata["status"] == "finished":
self.on_job_done(the_job, df.loc[i])
_log.info(f"Status of job {job_id!r} (on backend {backend_name}) is {job_metadata['status']!r}")

if (df.loc[i, "status"] == "created" or df.loc[i, "status"] == "queued" or df.loc[i, "status"] == "started") and job_metadata["status"] == "running":
df.loc[i, "running_time"] = datetime.now(timezone.utc).strftime('%Y-%m-%dT%H:%M:%S.%fZ')

if self.max_running_duration and job_metadata["status"] == "running":
self._cancel_prolonged_job(the_job, df.loc[i])

if df.loc[i, "status"] != "error" and job_metadata["status"] == "error":
self.on_job_error(the_job, df.loc[i])

if job_metadata["status"] == "finished":
self.on_job_done(the_job, df.loc[i])

if job_metadata["status"] == "canceled":
self.on_job_cancel(the_job, df.loc[i])

df.loc[i, "status"] = job_metadata["status"]

# TODO: there is well hidden coupling here with "cpu", "memory" and "duration" from `_normalize_df`
for key in job_metadata.get("usage", {}).keys():
df.loc[i, key] = _format_usage_stat(job_metadata, key)
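
When a tracked job ends up in the "canceled" state (for example after _cancel_prolonged_job stops it), _track_statuses calls on_job_cancel, which does nothing by default. A rough sketch of a subclass hook; the class name and logging behaviour are illustrative and not part of this PR:

import logging

from openeo import BatchJob
from openeo.extra.job_management import MultiBackendJobManager

_log = logging.getLogger(__name__)

class AuditingJobManager(MultiBackendJobManager):
    """Hypothetical subclass that records canceled jobs."""

    def on_job_cancel(self, job: BatchJob, row):
        # row is the dataframe row of the canceled job (id, backend_name, running_time, ...).
        _log.warning(
            "Job %s on backend %s was canceled (running since %s)",
            job.job_id, row["backend_name"], row["running_time"],
        )
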
@@ -501,6 +545,7 @@ def read(self) -> pd.DataFrame:
):
df["geometry"] = df["geometry"].apply(shapely.wkt.loads)
return df

def persist(self, df: pd.DataFrame):
self.path.parent.mkdir(parents=True, exist_ok=True)
df.to_csv(self.path, index=False)