Skip to content

Commit

Permalink
Merge branch 'main' into cli_dev
Browse files: browse the repository at this point in the history
  • Loading branch information
tomuram authored May 15, 2023
2 parents 6112efe + d1f1595 commit 689858b
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 8 deletions.
2 changes: 2 additions & 0 deletions balsam/platform/scheduler/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from .lsf_sched import LsfScheduler
from .pbs_sched import PBSScheduler
from .scheduler import (
DelayedSubmitFail,
SchedulerDeleteError,
SchedulerError,
SchedulerInterface,
Expand All @@ -22,4 +23,5 @@
"SchedulerSubmitError",
"SchedulerDeleteError",
"SchedulerNonZeroReturnCode",
"DelayedSubmitFail",
]
10 changes: 9 additions & 1 deletion balsam/platform/scheduler/pbs_sched.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
from balsam.util import parse_to_utc

from .scheduler import (
DelayedSubmitFail,
SchedulerBackfillWindow,
SchedulerJobLog,
SchedulerJobStatus,
SchedulerNonZeroReturnCode,
SubprocessSchedulerInterface,
scheduler_subproc,
)
Expand Down Expand Up @@ -320,7 +322,13 @@ def _parse_logs(scheduler_id: int, job_script_path: Optional[PathLike]) -> Sched
args += ["-x", "-f", "-F", "json"]
args += [str(scheduler_id)]
logger.info(f"_parse_logs issuing qstat: {str(args)}")
stdout = scheduler_subproc(args)
try:
stdout = scheduler_subproc(args)
except SchedulerNonZeroReturnCode as e:
if "Unknown Job Id" in str(e):
logger.warning(f"Batch Job {scheduler_id} not found in PBS")
raise DelayedSubmitFail
return SchedulerJobLog()
json_output = json.loads(stdout)
# logger.info(f"_parse_logs json_output: {json_output}")
if len(json_output["Jobs"]) == 0:
Expand Down
4 changes: 4 additions & 0 deletions balsam/platform/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,10 @@ class SchedulerNonZeroReturnCode(SchedulerError):
pass


class DelayedSubmitFail(SchedulerError):
    """Signal that an already-submitted batch job is unknown to the scheduler.

    Raised by the PBS log parser when its ``qstat`` call fails with an
    "Unknown Job Id" message. The site scheduler service catches this
    exception and moves the batch job to the ``submit_failed`` state.
    """


class SchedulerSubmitError(SchedulerError):
    """A ``SchedulerError`` subclass that adds no state or behavior of its own.

    NOTE(review): presumably raised when submitting a job to the scheduler
    fails; the raise sites are not visible in this hunk -- confirm.
    """

Expand Down
21 changes: 14 additions & 7 deletions balsam/site/service/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from typing import TYPE_CHECKING, Dict, List, Type

from balsam.platform.scheduler import (
DelayedSubmitFail,
SchedulerDeleteError,
SchedulerError,
SchedulerNonZeroReturnCode,
Expand Down Expand Up @@ -154,13 +155,19 @@ def run_cycle(self) -> None:
job.state = BatchJobState.finished
assert job.scheduler_id is not None
assert job.status_info is not None
job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None))
start_time = job_log.start_time
end_time = job_log.end_time
if start_time:
job.start_time = start_time
if end_time:
job.end_time = end_time
try:
job_log = self.scheduler.parse_logs(job.scheduler_id, job.status_info.get("submit_script", None))

start_time = job_log.start_time
end_time = job_log.end_time
if start_time:
job.start_time = start_time
if end_time:
job.end_time = end_time

except DelayedSubmitFail:
job.state = BatchJobState.submit_failed

elif job.state != scheduler_jobs[job.scheduler_id].state:
job.state = scheduler_jobs[job.scheduler_id].state
logger.info(f"Job {job.id} (sched_id {job.scheduler_id}) advanced to state {job.state}")
Expand Down

0 comments on commit 689858b

Please sign in to comment.