Skip to content

Commit

Permalink
Add snapshot event for STOP_LONG_RUNNING
Browse files Browse the repository at this point in the history
This makes the effect of STOP_LONG_RUNNING visible in the GUI.
  • Loading branch information
berland committed Feb 11, 2025
1 parent dd9be56 commit f7ac8b4
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 4 deletions.
9 changes: 9 additions & 0 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Id:
REALIZATION_UNKNOWN_TYPE = Literal["realization.unknown"]
REALIZATION_WAITING_TYPE = Literal["realization.waiting"]
REALIZATION_TIMEOUT_TYPE = Literal["realization.timeout"]
REALIZATION_STOPPED_LONG_RUNNING_TYPE = Literal["realization.stoppedlongrunning"]
REALIZATION_RESUBMIT_TYPE = Literal["realization.resubmit"]
REALIZATION_FAILURE: Final = "realization.failure"
REALIZATION_PENDING: Final = "realization.pending"
Expand All @@ -31,6 +32,7 @@ class Id:
REALIZATION_UNKNOWN: Final = "realization.unknown"
REALIZATION_WAITING: Final = "realization.waiting"
REALIZATION_TIMEOUT: Final = "realization.timeout"
REALIZATION_STOPPED_LONG_RUNNING: Final = "realization.stoppedlongrunning"
REALIZATION_RESUBMIT: Final = "realization.resubmit"

ENSEMBLE_STARTED_TYPE = Literal["ensemble.started"]
Expand Down Expand Up @@ -142,6 +144,12 @@ class RealizationTimeout(RealizationBaseEvent):
event_type: Id.REALIZATION_TIMEOUT_TYPE = Id.REALIZATION_TIMEOUT


class RealizationStoppedLongRunning(RealizationBaseEvent):
event_type: Id.REALIZATION_STOPPED_LONG_RUNNING_TYPE = (
Id.REALIZATION_STOPPED_LONG_RUNNING
)


class EnsembleBaseEvent(BaseEvent):
ensemble: str | None = None

Expand Down Expand Up @@ -200,6 +208,7 @@ class EEUserDone(BaseEvent):
| RealizationSuccess
| RealizationFailed
| RealizationTimeout
| RealizationStoppedLongRunning
| RealizationUnknown
| RealizationWaiting
| RealizationResubmit
Expand Down
21 changes: 21 additions & 0 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
RealizationPending,
RealizationResubmit,
RealizationRunning,
RealizationStoppedLongRunning,
RealizationSuccess,
RealizationTimeout,
RealizationUnknown,
Expand All @@ -51,6 +52,7 @@ class UnsupportedOperationException(ValueError):
RealizationSuccess: state.REALIZATION_STATE_FINISHED,
RealizationUnknown: state.REALIZATION_STATE_UNKNOWN,
RealizationTimeout: state.REALIZATION_STATE_FAILED,
RealizationStoppedLongRunning: state.REALIZATION_STATE_FAILED,
RealizationResubmit: state.REALIZATION_STATE_WAITING, # For consistency since realization will turn to waiting state when resubmitted
ForwardModelStepStart: state.FORWARD_MODEL_STATE_START,
ForwardModelStepRunning: state.FORWARD_MODEL_STATE_RUNNING,
Expand Down Expand Up @@ -285,6 +287,7 @@ def update_from_event(
RealizationSuccess,
RealizationFailed,
RealizationTimeout,
RealizationStoppedLongRunning,
}:
end_time = convert_iso8601_to_datetime(timestamp)
if type(event) is RealizationFailed:
Expand Down Expand Up @@ -315,6 +318,24 @@ def update_from_event(
"reaching MAX_RUNTIME",
)
)
elif e_type is RealizationStoppedLongRunning:
for (
fm_step_id,
fm_step,
) in source_snapshot.get_fm_steps_for_real(event.real).items():
if fm_step.get(ids.STATUS) != state.FORWARD_MODEL_STATE_FINISHED:
fm_idx = (event.real, fm_step_id)
if fm_idx not in source_snapshot._fm_step_snapshots:
self._fm_step_snapshots[fm_idx] = FMStepSnapshot()
self._fm_step_snapshots[fm_idx].update(
FMStepSnapshot(
status=state.FORWARD_MODEL_STATE_FAILURE,
end_time=end_time,
error="The run is cancelled due to "
"excessive runtime, 25% more than the average "
"runtime (check keyword STOP_LONG_RUNNING)",
)
)
elif e_type is RealizationResubmit:
for fm_step_id in source_snapshot.get_fm_steps_for_real(event.real):
fm_idx = (event.real, fm_step_id)
Expand Down
15 changes: 13 additions & 2 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
import orjson
from pydantic.dataclasses import dataclass

from _ert.events import Event, ForwardModelStepChecksum, Id, event_from_dict
from _ert.events import (
Event,
ForwardModelStepChecksum,
Id,
RealizationStoppedLongRunning,
event_from_dict,
)

from .driver import Driver
from .event import FinishedEvent, StartedEvent
Expand Down Expand Up @@ -142,12 +148,17 @@ async def _stop_long_running_jobs(
> long_running_factor * self._average_job_runtime
and not task.done()
):
logger.info(
logger.warning(
f"Stopping realization {iens} as its running duration "
f"{self._jobs[iens].running_duration}s is longer than "
f"the factor {long_running_factor} multiplied with the "
f"average runtime {self._average_job_runtime}s."
)
await self._events.put(
RealizationStoppedLongRunning(
real=str(iens), ensemble=self._ens_id
)
)
task.cancel()
with suppress(asyncio.CancelledError):
await task
Expand Down
28 changes: 26 additions & 2 deletions tests/ert/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@

import pytest

from _ert.events import Id, RealizationFailed, RealizationTimeout
from _ert.events import (
Id,
RealizationFailed,
RealizationStoppedLongRunning,
RealizationTimeout,
)
from ert.config import QueueConfig, QueueSystem
from ert.ensemble_evaluator import Realization
from ert.load_status import LoadResult, LoadStatus
Expand Down Expand Up @@ -445,7 +450,9 @@ async def kill(iens):


@pytest.mark.timeout(6)
async def test_that_long_running_jobs_were_stopped(storage, tmp_path, mock_driver):
async def test_that_long_running_jobs_were_stopped(
storage, tmp_path, mock_driver, caplog
):
killed_iens = []

async def kill(iens):
Expand Down Expand Up @@ -475,8 +482,25 @@ async def wait(iens):
)

assert await sch.execute(min_required_realizations=5) == Id.ENSEMBLE_SUCCEEDED

stop_long_running_events_found = 0
while not sch._events.empty():
event = await sch._events.get()
if type(event) is RealizationStoppedLongRunning:
stop_long_running_events_found += 1
assert stop_long_running_events_found == 4

assert killed_iens == [6, 7, 8, 9]

assert "Stopping realization 6 as its running duration" in caplog.text
assert "Stopping realization 7 as its running duration" in caplog.text
assert "Stopping realization 8 as its running duration" in caplog.text
assert "Stopping realization 9 as its running duration" in caplog.text
assert (
"is longer than the factor 1.25 multiplied with the average runtime"
in caplog.text
)


@pytest.mark.integration_test
@pytest.mark.flaky(reruns=5)
Expand Down

0 comments on commit f7ac8b4

Please sign in to comment.