Skip to content

Commit

Permalink
Rename job to step in _ert/forward_model_runner
Browse files Browse the repository at this point in the history
This commit will change almost all occurences of the string job
to the correct term step, except for where it is regarded as
a potentially breaking change for users.
  • Loading branch information
berland committed Feb 10, 2025
1 parent ef417b6 commit 9afe260
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 127 deletions.
76 changes: 38 additions & 38 deletions src/_ert/forward_model_runner/forward_model_step.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,35 +85,35 @@ class ForwardModelStep:
MEMORY_POLL_PERIOD = 5 # Seconds between memory polls

def __init__(
self, job_data: ForwardModelStepJSON, index: int, sleep_interval: int = 1
self, step_data: ForwardModelStepJSON, index: int, sleep_interval: int = 1
) -> None:
self.sleep_interval = sleep_interval
self.job_data = job_data
self.step_data = step_data
self.index = index
self.std_err = job_data.get("stderr")
self.std_out = job_data.get("stdout")
self.std_err = step_data.get("stderr")
self.std_out = step_data.get("stdout")

def run(self) -> Generator[Start | Exited | Running]:
try:
yield from self._run()
except Exception as e:
yield Exited(self, exit_code=1).with_error(str(e))

def create_start_message_and_check_job_files(self) -> Start:
def create_start_message_and_check_step_files(self) -> Start:
start_message = Start(self)

errors = self._check_job_files()
errors = self._check_step_files()
errors.extend(self._assert_arg_list())

if errors:
start_message = start_message.with_error("\n".join(errors))
return start_message

def _build_arg_list(self) -> list[str]:
executable = self.job_data.get("executable")
executable = self.step_data.get("executable")
# assert executable is not None
combined_arg_list = [executable]
if arg_list := self.job_data.get("argList"):
if arg_list := self.step_data.get("argList"):
combined_arg_list += arg_list
return combined_arg_list

Expand All @@ -122,8 +122,8 @@ def _open_file_handles(
) -> tuple[
io.TextIOWrapper | None, io.TextIOWrapper | None, io.TextIOWrapper | None
]:
if self.job_data.get("stdin"):
stdin = open(cast(Path, self.job_data.get("stdin")), encoding="utf-8") # noqa
if self.step_data.get("stdin"):
stdin = open(cast(Path, self.step_data.get("stdin")), encoding="utf-8") # noqa
else:
stdin = None

Expand All @@ -149,12 +149,12 @@ def _open_file_handles(

def _create_environment(self) -> dict[str, str] | None:
combined_environment = None
if environment := self.job_data.get("environment"):
if environment := self.step_data.get("environment"):
combined_environment = {**os.environ, **environment}
return combined_environment

def _run(self) -> Generator[Start | Exited | Running]:
start_message = self.create_start_message_and_check_job_files()
start_message = self.create_start_message_and_check_step_files()

yield start_message
if not start_message.success():
Expand All @@ -165,7 +165,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
(stdin, stdout, stderr) = self._open_file_handles()
# stdin/stdout/stderr are closed at the end of this function

target_file = self.job_data.get("target_file")
target_file = self.step_data.get("target_file")
target_file_mtime: int | None = _get_target_file_ntime(target_file)
run_start_time = dt.now()
try:
Expand Down Expand Up @@ -217,7 +217,7 @@ def _run(self) -> Generator[Start | Exited | Running]:
rss=memory_rss,
max_rss=max_memory_usage,
fm_step_id=self.index,
fm_step_name=self.job_data.get("name"),
fm_step_name=self.step_data.get("name"),
cpu_seconds=cpu_seconds_processtree.total_cpu_seconds(),
oom_score=oom_score,
),
Expand All @@ -242,11 +242,11 @@ def _create_exited_message_based_on_exit_code(
return exited_message

exited_message = Exited(self, exit_code)
if self.job_data.get("error_file") and os.path.exists(
self.job_data["error_file"]
if self.step_data.get("error_file") and os.path.exists(
self.step_data["error_file"]
):
return exited_message.with_error(
f"Found the error file:{self.job_data['error_file']} - job failed."
f"Found the error file:{self.step_data['error_file']} - step failed."
)

if target_file_mtime:
Expand All @@ -268,7 +268,7 @@ def _create_exited_msg_for_non_zero_exit_code(

if killed_by_oom(fm_step_pids):
return exited_message.with_error(
f"Forward model step {self.job_data.get('name')} "
f"Forward model step {self.step_data.get('name')} "
f"was killed due to out-of-memory on {socket.gethostname()}. "
"Max memory usage recorded by Ert for the "
f"realization was {max_memory_usage // 1024 // 1024} MB. "
Expand All @@ -282,7 +282,7 @@ def _create_exited_msg_for_non_zero_exit_code(
def handle_process_timeout_and_create_exited_msg(
self, exit_code: int | None, proc: Popen[Process], run_start_time: dt
) -> Exited | None:
max_running_minutes = self.job_data.get("max_running_minutes")
max_running_minutes = self.step_data.get("max_running_minutes")

run_time = dt.now() - run_start_time
if max_running_minutes is None or run_time.seconds < max_running_minutes * 60:
Expand All @@ -300,7 +300,7 @@ def handle_process_timeout_and_create_exited_msg(
os.killpg(process_group_id, signal.SIGKILL)

return Exited(self, exit_code).with_error(
f"Job:{self.name()} has been running "
f"Step:{self.name()} has been running "
f"for more than {max_running_minutes} "
"minutes - explicitly killed."
)
Expand All @@ -320,28 +320,28 @@ def _handle_process_io_error_and_create_exited_message(
return Exited(self, e.errno).with_error(msg)

def name(self) -> str:
return self.job_data["name"]
return self.step_data["name"]

def _check_job_files(self) -> list[str]:
def _check_step_files(self) -> list[str]:
"""
Returns the empty list if no failed checks, or a list of errors in case
of failed checks.
"""
errors = []
if self.job_data.get("stdin") and not os.path.exists(self.job_data["stdin"]):
errors.append(f"Could not locate stdin file: {self.job_data['stdin']}")
if self.step_data.get("stdin") and not os.path.exists(self.step_data["stdin"]):
errors.append(f"Could not locate stdin file: {self.step_data['stdin']}")

if self.job_data.get("start_file") and not os.path.exists(
cast(Path, self.job_data["start_file"])
if self.step_data.get("start_file") and not os.path.exists(
cast(Path, self.step_data["start_file"])
):
errors.append(f"Could not locate start_file:{self.job_data['start_file']}")
errors.append(f"Could not locate start_file:{self.step_data['start_file']}")

if self.job_data.get("error_file") and os.path.exists(
cast(Path, self.job_data.get("error_file"))
if self.step_data.get("error_file") and os.path.exists(
cast(Path, self.step_data.get("error_file"))
):
os.unlink(cast(Path, self.job_data.get("error_file")))
os.unlink(cast(Path, self.step_data.get("error_file")))

if executable_error := check_executable(self.job_data.get("executable")):
if executable_error := check_executable(self.step_data.get("executable")):
errors.append(executable_error)

return errors
Expand All @@ -354,10 +354,10 @@ def _check_target_file_is_written(
case of success, an error message in the case of failure.
"""
# no target file is expected at all, indicate success
if "target_file" not in self.job_data:
if "target_file" not in self.step_data:
return None

target_file = self.job_data["target_file"]
target_file = self.step_data["target_file"]

start_time = time.time()
while True:
Expand All @@ -383,23 +383,23 @@ def _check_target_file_is_written(

def _assert_arg_list(self):
errors = []
if "arg_types" in self.job_data:
arg_types = self.job_data["arg_types"]
arg_list = self.job_data.get("argList")
if "arg_types" in self.step_data:
arg_types = self.step_data["arg_types"]
arg_list = self.step_data.get("argList")
for index, arg_type in enumerate(arg_types):
if arg_type == "RUNTIME_FILE":
file_path = os.path.join(os.getcwd(), arg_list[index])
if not os.path.isfile(file_path):
errors.append(
f"In job {self.name()}: RUNTIME_FILE {arg_list[index]} "
f"In step {self.name()}: RUNTIME_FILE {arg_list[index]} "
"does not exist."
)
if arg_type == "RUNTIME_INT":
try:
int(arg_list[index])
except ValueError:
errors.append(
f"In job {self.name()}: argument with index {index} "
f"In step {self.name()}: argument with index {index} "
"is of incorrect type, should be integer."
)
return errors
Expand Down
2 changes: 1 addition & 1 deletion src/_ert/forward_model_runner/reporting/__init__.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
"""
The reporting package provides classes for reporting the results of forward
model jobs.
model steps.
"""

from .base import Reporter
Expand Down
44 changes: 22 additions & 22 deletions src/_ert/forward_model_runner/reporting/event.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
from _ert.forward_model_runner.client import Client, ClientConnectionError
from _ert.forward_model_runner.reporting.base import Reporter
from _ert.forward_model_runner.reporting.message import (
_JOB_EXIT_FAILED_STRING,
_STEP_EXIT_FAILED_STRING,
Checksum,
Exited,
Finish,
Expand All @@ -39,7 +39,7 @@ class EventSentinel:

class Event(Reporter):
"""
The Event reporter forwards events, coming from the running job, added with
The Event reporter forwards events, coming from the running step, added with
"report" to the given connection information.
An Init event must be provided as the first message, which starts reporting,
Expand All @@ -48,7 +48,7 @@ class Event(Reporter):
If event fails to be sent (e.g. due to connection error) it does not proceed to the
next event but instead tries to re-send the same event.
Whenever the Finish event (when all the jobs have exited) is provided
Whenever the Finish event (when all the steps have exited) is provided
the reporter will try to send all remaining events for a maximum of 60 seconds
before stopping the reporter. Any remaining events will not be sent.
"""
Expand All @@ -68,7 +68,7 @@ def __init__(

self._statemachine = StateMachine()
self._statemachine.add_handler((Init,), self._init_handler)
self._statemachine.add_handler((Start, Running, Exited), self._job_handler)
self._statemachine.add_handler((Start, Running, Exited), self._step_handler)
self._statemachine.add_handler((Checksum,), self._checksum_handler)
self._statemachine.add_handler((Finish,), self._finished_handler)

Expand Down Expand Up @@ -140,48 +140,48 @@ def _init_handler(self, msg: Init):
self._real_id = str(msg.real_id)
self._event_publisher_thread.start()

def _job_handler(self, msg: Start | Running | Exited):
assert msg.job
job_name = msg.job.name()
job_msg = {
def _step_handler(self, msg: Start | Running | Exited):
assert msg.step
step_name = msg.step.name()
step_msg = {
"ensemble": self._ens_id,
"real": self._real_id,
"fm_step": str(msg.job.index),
"fm_step": str(msg.step.index),
}
if isinstance(msg, Start):
logger.debug(f"Job {job_name} was successfully started")
logger.debug(f"Step {step_name} was successfully started")
event = ForwardModelStepStart(
**job_msg,
std_out=str(Path(msg.job.std_out).resolve()),
std_err=str(Path(msg.job.std_err).resolve()),
**step_msg,
std_out=str(Path(msg.step.std_out).resolve()),
std_err=str(Path(msg.step.std_err).resolve()),
)
self._dump_event(event)
if not msg.success():
logger.error(f"Job {job_name} FAILED to start")
event = ForwardModelStepFailure(**job_msg, error_msg=msg.error_message)
logger.error(f"Step {step_name} FAILED to start")
event = ForwardModelStepFailure(**step_msg, error_msg=msg.error_message)
self._dump_event(event)

elif isinstance(msg, Exited):
if msg.success():
logger.debug(f"Job {job_name} exited successfully")
self._dump_event(ForwardModelStepSuccess(**job_msg))
logger.debug(f"Step {step_name} exited successfully")
self._dump_event(ForwardModelStepSuccess(**step_msg))
else:
logger.error(
_JOB_EXIT_FAILED_STRING.format(
job_name=msg.job.name(),
_STEP_EXIT_FAILED_STRING.format(
step_name=msg.step.name(),
exit_code=msg.exit_code,
error_message=msg.error_message,
)
)
event = ForwardModelStepFailure(
**job_msg, exit_code=msg.exit_code, error_msg=msg.error_message
**step_msg, exit_code=msg.exit_code, error_msg=msg.error_message
)
self._dump_event(event)

elif isinstance(msg, Running):
logger.debug(f"{job_name} job is running")
logger.debug(f"{step_name} step is running")
event = ForwardModelStepRunning(
**job_msg,
**step_msg,
max_memory_usage=msg.memory_status.max_rss,
current_memory_usage=msg.memory_status.rss,
cpu_seconds=msg.memory_status.cpu_seconds,
Expand Down
Loading

0 comments on commit 9afe260

Please sign in to comment.