Skip to content

Commit

Permalink
Add monitoring of failed jobs
Browse files Browse the repository at this point in the history
  • Loading branch information
oyvindeide committed Jan 29, 2025
1 parent b446199 commit c373dd0
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 6 deletions.
23 changes: 17 additions & 6 deletions src/everest/bin/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import os
import sys
import traceback
from collections import defaultdict
from dataclasses import dataclass, field
from itertools import groupby
from typing import ClassVar
Expand Down Expand Up @@ -113,6 +114,7 @@ class JobProgress:
JOB_FAILURE: [], # contains failed simulation numbers i.e [5,6]
}
)
errors: defaultdict[list] = field(default_factory=lambda: defaultdict(list))
STATUS_COLOR: ClassVar = {
JOB_RUNNING: Fore.BLUE,
JOB_SUCCESS: Fore.GREEN,
Expand Down Expand Up @@ -253,7 +255,7 @@ def _get_progress_summary(status):

@classmethod
def _get_job_states(cls, snapshot: EnsembleSnapshot, show_all_jobs: bool):
print_lines = ""
print_lines = []
jobs_status = cls._get_jobs_status(snapshot)
if not show_all_jobs:
jobs_status = cls._filter_jobs(jobs_status)
Expand All @@ -265,11 +267,18 @@ def _get_job_states(cls, snapshot: EnsembleSnapshot, show_all_jobs: bool):
for state in [JOB_RUNNING, JOB_SUCCESS, JOB_FAILURE]
}
width = _get_max_width([item.name for item in jobs_status])
print_lines = cls._join_one_newline_indent(
f"{item.name:>{width}}: {item.progress_str(max_widths)}{Fore.RESET}"
for item in jobs_status
)
return print_lines
for job in jobs_status:
print_lines.append(
f"{job.name:>{width}}: {job.progress_str(max_widths)}{Fore.RESET}"
)
if job.errors:
print_lines.extend(
[
f"{Fore.RED}{job.name:>{width}}: Failed: {err}, realizations: {_format_list(job.errors[err])}{Fore.RESET}"
for err in job.errors
]
)
return cls._join_one_newline_indent(print_lines)

@staticmethod
def _get_jobs_status(snapshot: EnsembleSnapshot) -> list[JobProgress]:
Expand All @@ -280,6 +289,8 @@ def _get_jobs_status(snapshot: EnsembleSnapshot) -> list[JobProgress]:
status = job["status"]
if status in {JOB_RUNNING, JOB_SUCCESS, JOB_FAILURE}:
job_progress[job_idx].status[status].append(int(realization))
if error := job.get("error"):
job_progress[job_idx].errors[error].append(int(realization))
return list(job_progress.values())

@staticmethod
Expand Down
2 changes: 2 additions & 0 deletions tests/ert/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ def add_fm_step(
end_time: datetime | None = None,
stdout: str | None = None,
stderr: str | None = None,
error: str | None = None,
) -> "SnapshotBuilder":
self.fm_steps[fm_step_id] = _filter_nones(
FMStepSnapshot(
Expand All @@ -83,6 +84,7 @@ def add_fm_step(
stderr=stderr,
current_memory_usage=current_memory_usage,
max_memory_usage=max_memory_usage,
error=error,
)
)
return self
57 changes: 57 additions & 0 deletions tests/everest/test_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,63 @@ def snapshot_update_event():
yield json.dumps(jsonable_encoder(event))


@pytest.fixture
def snapshot_update_failure_event():
    """Yield a JSON-serialized SnapshotUpdateEvent for a failed forward model.

    The snapshot contains a single forward-model step (id "0") in the
    FAILURE state for realization "1", carrying a MAX_RUNTIME error
    message, so consumers can exercise the failed-job reporting path.
    """
    failed_snapshot = (
        SnapshotBuilder(metadata=METADATA)
        .add_fm_step(
            fm_step_id="0",
            name=None,
            index="0",
            status=state.FORWARD_MODEL_STATE_FAILURE,
            end_time=datetime(2019, 1, 1),
            error="The run is cancelled due to reaching MAX_RUNTIME",
        )
        .build(real_ids=["1"], status=state.REALIZATION_STATE_FAILED)
    )
    update_event = SnapshotUpdateEvent(
        snapshot=failed_snapshot,
        iteration_label="Foo",
        total_iterations=1,
        progress=0.5,
        realization_count=4,
        status_count={"Finished": 0, "Running": 0, "Unknown": 0, "Failed": 1},
        iteration=0,
    )
    yield json.dumps(jsonable_encoder(update_event))


def test_failed_jobs_monitor(
    monkeypatch, full_snapshot_event, snapshot_update_failure_event, capsys
):
    """Verify the monitor prints a per-error line for failed realizations.

    Feeds the detached monitor a full snapshot, a snapshot update with a
    failed forward-model step, and a terminal EndEvent via a mocked
    websocket connection, then checks the rendered output (ignoring all
    whitespace so column alignment does not matter).
    """
    server_mock = MagicMock()
    connection_mock = MagicMock(spec=ClientConnection)
    # Events delivered by the mocked connection, in the order the monitor
    # will receive them; the EndEvent terminates the monitoring loop.
    connection_mock.recv.side_effect = [
        full_snapshot_event,
        snapshot_update_failure_event,
        json.dumps(jsonable_encoder(EndEvent(failed=True, msg="Failed"))),
    ]
    server_mock.return_value.__enter__.return_value = connection_mock
    monkeypatch.setattr(everest.detached, "_query_server", MagicMock(return_value={}))
    monkeypatch.setattr(everest.detached, "connect", server_mock)
    monkeypatch.setattr(everest.detached, "ssl", MagicMock())
    # Shorten the polling interval so the test does not wait the default.
    patched = partial(everest.detached.start_monitor, polling_interval=0.1)
    with patch("everest.bin.utils.start_monitor", patched):
        run_detached_monitor(("some/url", None, None), "output", False)
    captured = capsys.readouterr()
    expected = [
        "===================== Running forward models (Batch #0) ======================\n",
        " Waiting: 0 | Pending: 0 | Running: 0 | Complete: 0 | Failed: 1\n",
        # NOTE: comma was missing here, which silently concatenated this
        # line with the next via implicit string literal concatenation.
        " fm_step_0: 0/0/1 | Failed: 1\n",
        # Must match _get_job_states in everest/bin/utils.py, which formats
        # error lines as "Failed: <error>" (the previous expectation said
        # "Failure:", which the monitor never emits).
        " fm_step_0: Failed: The run is cancelled due to reaching MAX_RUNTIME, realizations: 1\n",
        "Failed\n",
    ]
    # Ignore whitespace
    assert captured.out.translate({ord(c): None for c in string.whitespace}) == "".join(
        expected
    ).translate({ord(c): None for c in string.whitespace})


@pytest.mark.parametrize("show_all_jobs", [True, False])
def test_monitor(
monkeypatch, full_snapshot_event, snapshot_update_event, capsys, show_all_jobs
Expand Down

0 comments on commit c373dd0

Please sign in to comment.