Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix workflows running from gui's tools panel not working #9954

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion src/ert/cli/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
from typing import TYPE_CHECKING

from ert.runpaths import Runpaths
from ert.workflow_runner import WorkflowRunner

if TYPE_CHECKING:
Expand All @@ -20,7 +21,14 @@ def execute_workflow(
msg = "Workflow {} is not in the list of available workflows"
logger.error(msg.format(workflow_name))
return
runner = WorkflowRunner(workflow=workflow, storage=storage, ert_config=ert_config)
run_paths = Runpaths(
jobname_format=ert_config.model_config.jobname_format_string,
runpath_format=ert_config.model_config.runpath_format_string,
filename=str(ert_config.runpath_file),
substitutions=ert_config.substitutions,
eclbase=ert_config.model_config.eclbase_format_string,
)
runner = WorkflowRunner(workflow=workflow, storage=storage, run_paths=run_paths)
runner.run_blocking()
if not all(v["completed"] for v in runner.workflowReport().values()):
logger.error(f"Workflow {workflow_name} failed!")
2 changes: 1 addition & 1 deletion src/ert/gui/tools/export/exporter.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def run_export(self, parameters: list[Any]) -> None:

export_job_runner = WorkflowJobRunner(self.export_job)
user_warn = export_job_runner.run(
fixtures={"storage": self._notifier.storage, "ert_config": self.config},
fixtures={"storage": self._notifier.storage},
arguments=parameters,
)
if export_job_runner.hasFailed():
Expand Down
12 changes: 5 additions & 7 deletions src/ert/gui/tools/plugins/plugin_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,26 +6,25 @@

from _ert.threading import ErtThread
from ert.config import CancelPluginException
from ert.runpaths import Runpaths
from ert.workflow_runner import WorkflowJobRunner

from .process_job_dialog import ProcessJobDialog

if TYPE_CHECKING:
from ert.config import ErtConfig
from ert.storage import LocalStorage

from .plugin import Plugin


class PluginRunner:
def __init__(
self, plugin: Plugin, ert_config: ErtConfig, storage: LocalStorage
self, plugin: Plugin, run_paths: Runpaths, storage: LocalStorage
) -> None:
super().__init__()
self.ert_config = ert_config
self.run_paths = run_paths
self.storage = storage
self.__plugin = plugin

self.__plugin_finished_callback: Callable[[], None] = lambda: None

self.__result = None
Expand All @@ -35,17 +34,16 @@ def __init__(
def run(self) -> None:
try:
plugin = self.__plugin

arguments = plugin.getArguments(
fixtures={"storage": self.storage, "ert_config": self.ert_config}
fixtures={"storage": self.storage, "run_paths": self.run_paths}
)
dialog = ProcessJobDialog(plugin.getName(), plugin.getParentWindow())
dialog.setObjectName("process_job_dialog")

dialog.cancelConfirmed.connect(self.cancel)
fixtures = {
k: getattr(self, k)
for k in ["storage", "ert_config"]
for k in ["storage", "run_paths"]
if getattr(self, k)
}
workflow_job_thread = ErtThread(
Expand Down
10 changes: 9 additions & 1 deletion src/ert/gui/tools/plugins/plugins_tool.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from PyQt6.QtWidgets import QMenu

from ert.gui.tools import Tool
from ert.runpaths import Runpaths

from .plugin_runner import PluginRunner

Expand Down Expand Up @@ -35,8 +36,15 @@ def __init__(
self.__plugins = {}

self.menu = QMenu("&Plugins")
runpaths = Runpaths(
jobname_format=ert_config.model_config.jobname_format_string,
runpath_format=ert_config.model_config.runpath_format_string,
filename=str(ert_config.runpath_file),
substitutions=ert_config.substitutions,
eclbase=ert_config.model_config.eclbase_format_string,
)
for plugin in plugin_handler:
plugin_runner = PluginRunner(plugin, ert_config, notifier.storage)
plugin_runner = PluginRunner(plugin, runpaths, notifier.storage)
plugin_runner.setPluginFinishedCallback(self.trigger)

self.__plugins[plugin] = plugin_runner
Expand Down
8 changes: 8 additions & 0 deletions src/ert/gui/tools/workflows/run_workflow_widget.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from _ert.threading import ErtThread
from ert.gui.ertwidgets import EnsembleSelector
from ert.gui.tools.workflows.workflow_dialog import WorkflowDialog
from ert.runpaths import Runpaths
from ert.workflow_runner import WorkflowRunner

if TYPE_CHECKING:
Expand Down Expand Up @@ -128,6 +129,13 @@ def startWorkflow(self) -> None:
workflow,
storage=self.storage,
ensemble=self.source_ensemble_selector.currentData(),
run_paths=Runpaths(
jobname_format=self.config.model_config.jobname_format_string,
runpath_format=self.config.model_config.runpath_format_string,
filename=str(self.config.runpath_file),
substitutions=self.config.substitutions,
eclbase=self.config.model_config.eclbase_format_string,
),
)
self._workflow_runner.run()

Expand Down
23 changes: 11 additions & 12 deletions src/ert/plugins/hook_implementations/workflows/csv_export.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
import json
import os
from collections.abc import Sequence
from typing import TYPE_CHECKING

import pandas as pd

from ert import ErtScript, LibresFacade
from ert.config import ErtConfig
from ert.storage import Storage

if TYPE_CHECKING:
from ert.storage import Ensemble


def loadDesignMatrix(filename: str) -> pd.DataFrame:
Expand Down Expand Up @@ -52,23 +54,20 @@ def getDescription() -> str:

def run(
self,
ert_config: ErtConfig,
storage: Storage,
workflow_args: Sequence[str],
) -> str:
output_file = workflow_args[0]
ensemble_data_as_json = None if len(workflow_args) < 2 else workflow_args[1]
design_matrix_path = None if len(workflow_args) < 3 else workflow_args[2]
_ = True if len(workflow_args) < 4 else workflow_args[3]
drop_const_cols = False if len(workflow_args) < 5 else workflow_args[4]
facade = LibresFacade(ert_config)

ensemble_data_as_dict = (
json.loads(ensemble_data_as_json) if ensemble_data_as_json else {}
)

# Use the keys (UUIDs as strings) to get ensembles
ensembles = []
ensembles: list[Ensemble] = []
for ensemble_id in ensemble_data_as_dict:
assert self.storage is not None
ensemble = self.storage.get_ensemble(ensemble_id)
Expand Down Expand Up @@ -96,12 +95,12 @@ def run(
if not design_matrix_data.empty:
ensemble_data = ensemble_data.join(design_matrix_data, how="outer")

misfit_data = facade.load_all_misfit_data(ensemble)
misfit_data = LibresFacade.load_all_misfit_data(ensemble)
if not misfit_data.empty:
ensemble_data = ensemble_data.join(misfit_data, how="outer")

summary_data = ensemble.load_all_summary_data()
if not summary_data.empty:
realizations = ensemble.get_realization_list_with_responses()
summary_data = ensemble.load_responses("summary", tuple(realizations))
if not summary_data.is_empty():
ensemble_data = ensemble_data.join(summary_data, how="outer")
else:
ensemble_data["Date"] = None
Expand All @@ -114,8 +113,8 @@ def run(
)

data = pd.concat([data, ensemble_data])

data = data.reorder_levels(["Realization", "Iteration", "Date", "Ensemble"])
if not data.empty:
data = data.reorder_levels(["Realization", "Iteration", "Date", "Ensemble"])
if drop_const_cols:
data = data.loc[:, (data != data.iloc[0]).any()]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from ert.exceptions import StorageError

if TYPE_CHECKING:
from ert.config import ErtConfig
from ert.storage import Ensemble


Expand All @@ -23,17 +22,14 @@ class ExportMisfitDataJob(ErtScript):
((response_value - observation_data) / observation_std)**2
"""

def run(
self, ert_config: ErtConfig, ensemble: Ensemble, workflow_args: list[Any]
) -> None:
def run(self, ensemble: Ensemble, workflow_args: list[Any]) -> None:
target_file = "misfit.hdf" if not workflow_args else workflow_args[0]

realizations = ensemble.get_realization_list_with_responses()

from ert import LibresFacade # noqa: PLC0415 (circular import)

facade = LibresFacade(ert_config)
misfit = facade.load_all_misfit_data(ensemble)
misfit = LibresFacade.load_all_misfit_data(ensemble)
if len(realizations) == 0 or misfit.empty:
raise StorageError("No responses loaded")
misfit.columns = pd.Index([val.split(":")[1] for val in misfit.columns])
Expand Down
20 changes: 9 additions & 11 deletions src/ert/plugins/hook_implementations/workflows/export_runpath.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from ert.validation import rangestring_to_list

if TYPE_CHECKING:
from ert.config import ErtConfig
from ert.storage import Ensemble


class ExportRunpathJob(ErtScript):
Expand All @@ -33,20 +33,18 @@ class ExportRunpathJob(ErtScript):
file.
"""

def run(self, ert_config: ErtConfig, workflow_args: list[Any]) -> None:
def run(
self, run_paths: Runpaths, ensemble: Ensemble, workflow_args: list[Any]
) -> None:
args = " ".join(workflow_args).split() # Make sure args is a list of words
run_paths = Runpaths(
jobname_format=ert_config.model_config.jobname_format_string,
runpath_format=ert_config.model_config.runpath_format_string,
filename=str(ert_config.runpath_file),
substitutions=ert_config.substitutions,
eclbase=ert_config.model_config.eclbase_format_string,
)
assert ensemble
iter = ensemble.iteration
reals = ensemble.ensemble_size
run_paths.write_runpath_list(
*self.get_ranges(
args,
ert_config.analysis_config.num_iterations,
ert_config.model_config.num_realizations,
iter,
reals,
)
)

Expand Down
15 changes: 8 additions & 7 deletions src/ert/run_models/base_run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -677,11 +677,12 @@ def validate_successful_realizations_count(self) -> None:
def run_workflows(
self,
runtime: HookRuntime,
storage: Storage | None = None,
ensemble: Ensemble | None = None,
) -> None:
for workflow in self._hooked_workflows[runtime]:
WorkflowRunner(workflow, storage, ensemble).run_blocking()
WorkflowRunner(
workflow, self._storage, ensemble, self.run_paths
).run_blocking()

def _evaluate_and_postprocess(
self,
Expand All @@ -703,7 +704,7 @@ def _evaluate_and_postprocess(
context_env=self._context_env,
)

self.run_workflows(HookRuntime.PRE_SIMULATION, self._storage, ensemble)
self.run_workflows(HookRuntime.PRE_SIMULATION, ensemble)
successful_realizations = self.run_ensemble_evaluator(
run_args,
ensemble,
Expand All @@ -729,7 +730,7 @@ def _evaluate_and_postprocess(
f"{self.ensemble_size - num_successful_realizations}"
)
logger.info(f"Experiment run finished in: {self.get_runtime()}s")
self.run_workflows(HookRuntime.POST_SIMULATION, self._storage, ensemble)
self.run_workflows(HookRuntime.POST_SIMULATION, ensemble)

return num_successful_realizations

Expand Down Expand Up @@ -802,8 +803,8 @@ def update(
prior_ensemble=prior,
)
if prior.iteration == 0:
self.run_workflows(HookRuntime.PRE_FIRST_UPDATE, self._storage, prior)
self.run_workflows(HookRuntime.PRE_UPDATE, self._storage, prior)
self.run_workflows(HookRuntime.PRE_FIRST_UPDATE, prior)
self.run_workflows(HookRuntime.PRE_UPDATE, prior)
try:
smoother_update(
prior,
Expand All @@ -825,5 +826,5 @@ def update(
"Update algorithm failed for iteration:"
f"{posterior.iteration}. The following error occurred: {e}"
) from e
self.run_workflows(HookRuntime.POST_UPDATE, self._storage, prior)
self.run_workflows(HookRuntime.POST_UPDATE, prior)
return posterior
4 changes: 2 additions & 2 deletions src/ert/run_models/ensemble_experiment.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,6 @@ def run_experiment(
raise ErtRunError(str(exc)) from exc

if not restart:
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
self.experiment = self._storage.create_experiment(
name=self.experiment_name,
parameters=(
Expand All @@ -109,6 +108,7 @@ def run_experiment(
name=self.ensemble_name,
ensemble_size=self.ensemble_size,
)
self.run_workflows(HookRuntime.PRE_EXPERIMENT, self.ensemble)
else:
self.active_realizations = self._create_mask_from_failed_realizations()

Expand Down Expand Up @@ -143,7 +143,7 @@ def run_experiment(
self.ensemble,
evaluator_server_config,
)
self.run_workflows(HookRuntime.POST_EXPERIMENT)
self.run_workflows(HookRuntime.POST_EXPERIMENT, self.ensemble)

@classmethod
def name(cls) -> str:
Expand Down
4 changes: 2 additions & 2 deletions src/ert/run_models/ensemble_smoother.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,6 @@ def run_experiment(
) -> None:
self.log_at_startup()
self.restart = restart
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
ensemble_format = self.target_ensemble_format
experiment = self._storage.create_experiment(
parameters=self._parameter_configuration,
Expand All @@ -89,6 +88,7 @@ def run_experiment(
ensemble_size=self.ensemble_size,
name=ensemble_format % 0,
)
self.run_workflows(HookRuntime.PRE_EXPERIMENT, prior)
self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id))
prior_args = create_run_arguments(
self.run_paths,
Expand Down Expand Up @@ -120,7 +120,7 @@ def run_experiment(
posterior,
evaluator_server_config,
)
self.run_workflows(HookRuntime.POST_EXPERIMENT)
self.run_workflows(HookRuntime.POST_EXPERIMENT, posterior)

@classmethod
def name(cls) -> str:
Expand Down
4 changes: 2 additions & 2 deletions src/ert/run_models/multiple_data_assimilation.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def run_experiment(
f"Prior ensemble with ID: {id} does not exists"
) from err
else:
self.run_workflows(HookRuntime.PRE_EXPERIMENT)
sim_args = {"weights": self._relative_weights}
experiment = self._storage.create_experiment(
parameters=self._parameter_configuration,
Expand All @@ -132,6 +131,7 @@ def run_experiment(
iteration=0,
name=self.target_ensemble_format % 0,
)
self.run_workflows(HookRuntime.PRE_EXPERIMENT, prior)
self.set_env_key("_ERT_EXPERIMENT_ID", str(experiment.id))
self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id))
prior_args = create_run_arguments(
Expand Down Expand Up @@ -171,7 +171,7 @@ def run_experiment(
)
prior = posterior

self.run_workflows(HookRuntime.POST_EXPERIMENT)
self.run_workflows(HookRuntime.POST_EXPERIMENT, prior)

@staticmethod
def parse_weights(weights: str) -> list[float]:
Expand Down
Loading
Loading