Skip to content

Commit

Permalink
Add GUI event for STOP_LONG_RUNNING
Browse files Browse the repository at this point in the history
This makes the effect of STOP_LONG_RUNNING visible in the GUI.
  • Loading branch information
berland committed Feb 10, 2025
1 parent 210f25b commit eca7f83
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 3 deletions.
9 changes: 9 additions & 0 deletions src/_ert/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ class Id:
REALIZATION_UNKNOWN_TYPE = Literal["realization.unknown"]
REALIZATION_WAITING_TYPE = Literal["realization.waiting"]
REALIZATION_TIMEOUT_TYPE = Literal["realization.timeout"]
REALIZATION_STOPPED_LONG_RUNNING_TYPE = Literal["realization.stoppedlongrunning"]
REALIZATION_RESUBMIT_TYPE = Literal["realization.resubmit"]
REALIZATION_FAILURE: Final = "realization.failure"
REALIZATION_PENDING: Final = "realization.pending"
Expand All @@ -31,6 +32,7 @@ class Id:
REALIZATION_UNKNOWN: Final = "realization.unknown"
REALIZATION_WAITING: Final = "realization.waiting"
REALIZATION_TIMEOUT: Final = "realization.timeout"
REALIZATION_STOPPED_LONG_RUNNING: Final = "realization.stoppedlongrunning"
REALIZATION_RESUBMIT: Final = "realization.resubmit"

ENSEMBLE_STARTED_TYPE = Literal["ensemble.started"]
Expand Down Expand Up @@ -142,6 +144,12 @@ class RealizationTimeout(RealizationBaseEvent):
event_type: Id.REALIZATION_TIMEOUT_TYPE = Id.REALIZATION_TIMEOUT


class RealizationStoppedLongRunning(RealizationBaseEvent):
event_type: Id.REALIZATION_STOPPED_LONG_RUNNING_TYPE = (
Id.REALIZATION_STOPPED_LONG_RUNNING
)


class EnsembleBaseEvent(BaseEvent):
ensemble: str | None = None

Expand Down Expand Up @@ -200,6 +208,7 @@ class EEUserDone(BaseEvent):
| RealizationSuccess
| RealizationFailed
| RealizationTimeout
| RealizationStoppedLongRunning
| RealizationUnknown
| RealizationWaiting
| RealizationResubmit
Expand Down
20 changes: 20 additions & 0 deletions src/ert/ensemble_evaluator/snapshot.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
RealizationPending,
RealizationResubmit,
RealizationRunning,
RealizationStoppedLongRunning,
RealizationSuccess,
RealizationTimeout,
RealizationUnknown,
Expand All @@ -51,6 +52,7 @@ class UnsupportedOperationException(ValueError):
RealizationSuccess: state.REALIZATION_STATE_FINISHED,
RealizationUnknown: state.REALIZATION_STATE_UNKNOWN,
RealizationTimeout: state.REALIZATION_STATE_FAILED,
RealizationStoppedLongRunning: state.REALIZATION_STATE_FAILED,
RealizationResubmit: state.REALIZATION_STATE_WAITING, # For consistency since realization will turn to waiting state when resubmitted
ForwardModelStepStart: state.FORWARD_MODEL_STATE_START,
ForwardModelStepRunning: state.FORWARD_MODEL_STATE_RUNNING,
Expand Down Expand Up @@ -315,6 +317,24 @@ def update_from_event(
"reaching MAX_RUNTIME",
)
)
elif e_type is RealizationStoppedLongRunning:
for (
fm_step_id,
fm_step,
) in source_snapshot.get_fm_steps_for_real(event.real).items():
if fm_step.get(ids.STATUS) != state.FORWARD_MODEL_STATE_FINISHED:
fm_idx = (event.real, fm_step_id)
if fm_idx not in source_snapshot._fm_step_snapshots:
self._fm_step_snapshots[fm_idx] = FMStepSnapshot()
self._fm_step_snapshots[fm_idx].update(
FMStepSnapshot(
status=state.FORWARD_MODEL_STATE_FAILURE,
end_time=end_time,
error="The run is cancelled due to "
"excessive runtime, 25% more than the average "
"runtime (check keyword STOP_LONG_RUNNING)",
)
)
elif e_type is RealizationResubmit:
for fm_step_id in source_snapshot.get_fm_steps_for_real(event.real):
fm_idx = (event.real, fm_step_id)
Expand Down
15 changes: 13 additions & 2 deletions src/ert/scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
import orjson
from pydantic.dataclasses import dataclass

from _ert.events import Event, ForwardModelStepChecksum, Id, event_from_dict
from _ert.events import (
Event,
ForwardModelStepChecksum,
Id,
RealizationStoppedLongRunning,
event_from_dict,
)

from .driver import Driver
from .event import FinishedEvent, StartedEvent
Expand Down Expand Up @@ -142,12 +148,17 @@ async def _stop_long_running_jobs(
> long_running_factor * self._average_job_runtime
and not task.done()
):
logger.info(
logger.error(
f"Stopping realization {iens} as its running duration "
f"{self._jobs[iens].running_duration}s is longer than "
f"the factor {long_running_factor} multiplied with the "
f"average runtime {self._average_job_runtime}s."
)
await self._events.put(
RealizationStoppedLongRunning(
real=str(iens), ensemble=self._ens_id
)
)
task.cancel()
with suppress(asyncio.CancelledError):
await task
Expand Down
44 changes: 43 additions & 1 deletion tests/ert/unit_tests/scheduler/test_scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,12 @@

import pytest

from _ert.events import Id, RealizationFailed, RealizationTimeout
from _ert.events import (
Id,
RealizationFailed,
RealizationStoppedLongRunning,
RealizationTimeout,
)
from ert.config import QueueConfig, QueueSystem
from ert.ensemble_evaluator import Realization
from ert.load_status import LoadResult, LoadStatus
Expand Down Expand Up @@ -222,6 +227,43 @@ async def wait():
assert "Realization 0 stopped due to MAX_RUNTIME=1 seconds" in caplog.text


@pytest.mark.integration_test
@pytest.mark.timeout(10)
async def test_stop_long_running(mock_driver, caplog, tmp_path, storage):
wait_started = asyncio.Event()

ensemble = storage.create_experiment().create_ensemble(name="foo", ensemble_size=2)
realizations = [
create_stub_realization(ensemble, tmp_path, 0),
create_stub_realization(ensemble, tmp_path, 1),
]

async def wait(iens):
wait_started.set()
if iens == 0:
return
await asyncio.sleep(100)

sch = scheduler.Scheduler(mock_driver(wait=wait), realizations)

result = await asyncio.create_task(sch.execute(min_required_realizations=1))
assert wait_started.is_set()
assert result == Id.ENSEMBLE_SUCCEEDED

stop_long_running_event_found = False
while not stop_long_running_event_found and not sch._events.empty():
event = await sch._events.get()
if type(event) is RealizationStoppedLongRunning:
stop_long_running_event_found = True
assert stop_long_running_event_found

assert "Stopping realization 1 as its running duration" in caplog.text
assert (
"is longer than the factor 1.25 multiplied with the average runtime"
in caplog.text
)


@pytest.mark.integration_test
async def test_no_resubmit_on_max_runtime_kill(realization, mock_driver):
retries = 0
Expand Down

0 comments on commit eca7f83

Please sign in to comment.