diff --git a/src/ert/__main__.py b/src/ert/__main__.py index 65040fc1b59..1d97b09b5e9 100755 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -414,6 +414,12 @@ def get_ert_parser(parser: Optional[ArgumentParser] = None) -> ArgumentParser: help="Name of the ensemble where the results for the " "updated parameters will be stored.", ) + ensemble_smoother_parser.add_argument( + "--experiment-name", + type=str, + default="ensemble-experiment", + help="Name of the experiment", + ) ensemble_smoother_parser.add_argument( "--realizations", type=valid_realizations, diff --git a/src/ert/gui/ertnotifier.py b/src/ert/gui/ertnotifier.py index 6c925f353bc..f4043b1b152 100644 --- a/src/ert/gui/ertnotifier.py +++ b/src/ert/gui/ertnotifier.py @@ -1,20 +1,22 @@ -from typing import Optional +from typing import List, Optional from qtpy.QtCore import QObject, Signal, Slot -from ert.storage import Ensemble, Storage +from ert.storage import LocalEnsemble, LocalExperiment, LocalStorage class ErtNotifier(QObject): ertChanged = Signal() storage_changed = Signal(object, name="storageChanged") + current_experiment_changed = Signal(object, name="currentExperimentChanged") current_ensemble_changed = Signal(object, name="currentEnsembleChanged") def __init__(self, config_file: str): QObject.__init__(self) self._config_file = config_file - self._storage: Optional[Storage] = None - self._current_ensemble: Optional[Ensemble] = None + self._storage: Optional[LocalStorage] = None + self._current_experiment: Optional[LocalExperiment] = None + self._current_ensemble: Optional[LocalEnsemble] = None self._is_simulation_running = False @property @@ -22,7 +24,7 @@ def is_storage_available(self) -> bool: return self._storage is not None @property - def storage(self) -> Storage: + def storage(self) -> LocalStorage: assert self.is_storage_available return self._storage # type: ignore @@ -31,9 +33,17 @@ def config_file(self) -> str: return self._config_file @property - def current_ensemble(self) -> Optional[Ensemble]: - if self._current_ensemble is None and self._storage is not None: - ensembles = list(self._storage.ensembles) + def current_experiment(self) -> Optional[LocalExperiment]: + if self._current_experiment is None and self._storage is not None: + experiments = list(self._storage.experiments) + if experiments: + self._current_experiment = experiments[0] + return self._current_experiment + + @property + def current_ensemble(self) -> Optional[LocalEnsemble]: + if self._current_ensemble is None and self._current_experiment is not None: + ensembles = list(self._current_experiment.ensembles) if ensembles: self._current_ensemble = ensembles[0] return self._current_ensemble @@ -53,15 +63,30 @@ def emitErtChange(self) -> None: self.ertChanged.emit() @Slot(object) - def set_storage(self, storage: Storage) -> None: + def set_storage(self, storage: LocalStorage) -> None: self._storage = storage + self._current_experiment = None + self._current_ensemble = None self.storage_changed.emit(storage) @Slot(object) - def set_current_ensemble(self, ensemble: Optional[Ensemble] = None) -> None: + def set_current_experiment( + self, experiment: Optional[LocalExperiment] = None + ) -> None: + self._current_experiment = experiment + self._current_ensemble = None + self.current_experiment_changed.emit(experiment) + + @Slot(object) + def set_current_ensemble(self, ensemble: Optional[LocalEnsemble] = None) -> None: self._current_ensemble = ensemble self.current_ensemble_changed.emit(ensemble) @Slot(bool) def set_is_simulation_running(self, is_running: bool) -> None: self._is_simulation_running = is_running + + def get_current_experiment_ensembles(self) -> List[LocalEnsemble]: + if self._current_experiment is None: + return [] + return list(self._current_experiment.ensembles) diff --git a/src/ert/gui/ertwidgets/ensembleselector.py b/src/ert/gui/ertwidgets/ensembleselector.py index db0d977c449..d465f01136f 100644 --- a/src/ert/gui/ertwidgets/ensembleselector.py +++ b/src/ert/gui/ertwidgets/ensembleselector.py @@ -9,7 +9,7 @@ from ert.storage.realization_storage_state import RealizationStorageState if TYPE_CHECKING: - from ert.storage import Ensemble + from ert.storage import LocalEnsemble class EnsembleSelector(QComboBox): @@ -24,83 +24,71 @@ def __init__( ): super().__init__() self.notifier = notifier - - # If true current ensemble of ert will be change self._update_ert = update_ert - # only show initialized ensembles self._show_only_undefined = show_only_undefined - # If True, we filter out any ensembles which have children - # One use case is if a user wants to rerun because of failures - # not related to parameterization. We can allow that, but only - # if the ensemble has not been used in an update, as that would - # invalidate the result self._show_only_no_children = show_only_no_children self.setSizeAdjustPolicy(QComboBox.AdjustToContents) - self.setEnabled(False) if update_ert: - # Update ERT when this combo box is changed self.currentIndexChanged[int].connect(self._on_current_index_changed) - - # Update this combo box when ERT is changed notifier.current_ensemble_changed.connect( self._on_global_current_ensemble_changed ) notifier.ertChanged.connect(self.populate) notifier.storage_changed.connect(self.populate) + notifier.current_experiment_changed.connect(self.populate) if notifier.is_storage_available: self.populate() @property - def selected_ensemble(self) -> Ensemble: + def selected_ensemble(self) -> LocalEnsemble: return self.itemData(self.currentIndex()) def populate(self) -> None: block = self.blockSignals(True) - self.clear() - if self._ensemble_list(): self.setEnabled(True) - for ensemble in self._ensemble_list(): self.addItem(ensemble.name, userData=ensemble) - current_index = self.findData( self.notifier.current_ensemble, Qt.ItemDataRole.UserRole ) - self.setCurrentIndex(max(current_index, 0)) - self.blockSignals(block) - self.ensemble_populated.emit() - def _ensemble_list(self) -> Iterable[Ensemble]: + def _ensemble_list(self) -> Iterable[LocalEnsemble]: + if self.notifier.current_experiment is None: + return [] + + ensembles = self.notifier.current_experiment.ensembles + if self._show_only_undefined: ensembles = ( ensemble - for ensemble in self.notifier.storage.ensembles + for ensemble in ensembles if all( e == RealizationStorageState.UNDEFINED for e in ensemble.get_ensemble_state() ) ) - else: - ensembles = self.notifier.storage.ensembles + ensemble_list = list(ensembles) + if self._show_only_no_children: - parents = [ - ens.parent for ens in self.notifier.storage.ensembles if ens.parent - ] + parents = [ens.parent for ens in ensemble_list if ens.parent] ensemble_list = [val for val in ensemble_list if val.id not in parents] + return sorted(ensemble_list, key=lambda x: x.started_at, reverse=True) def _on_current_index_changed(self, index: int) -> None: self.notifier.set_current_ensemble(self.itemData(index)) - def _on_global_current_ensemble_changed(self, data: Optional[Ensemble]) -> None: + def _on_global_current_ensemble_changed( + self, data: Optional[LocalEnsemble] + ) -> None: self.setCurrentIndex(max(self.findData(data, Qt.ItemDataRole.UserRole), 0)) diff --git a/src/ert/storage/__init__.py b/src/ert/storage/__init__.py index dd4d8fd04a6..618ecf7711c 100644 --- a/src/ert/storage/__init__.py +++ b/src/ert/storage/__init__.py @@ -1,3 +1,8 @@ +""" +Hierarchical Structure: +The code follows a clear hierarchical structure: Storage -> Experiment -> Ensemble. +""" + from __future__ import annotations import os diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index d55ce295d4e..e69acdfa312 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -1,7 +1,6 @@ from __future__ import annotations import json -from datetime import datetime from functools import cached_property from pathlib import Path from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional @@ -20,7 +19,6 @@ SummaryConfig, SurfaceConfig, ) -from ert.config.parsing.context_values import ContextBoolEncoder from ert.config.response_config import ResponseConfig from ert.storage.mode import BaseMode, Mode, require_write @@ -86,76 +84,6 @@ def __init__( (path / "index.json").read_text(encoding="utf-8") ) - @classmethod - def create( - cls, - storage: LocalStorage, - uuid: UUID, - path: Path, - *, - parameters: Optional[List[ParameterConfig]] = None, - responses: Optional[List[ResponseConfig]] = None, - observations: Optional[Dict[str, xr.Dataset]] = None, - simulation_arguments: Optional[Dict[Any, Any]] = None, - name: Optional[str] = None, - ) -> LocalExperiment: - """ - Create a new LocalExperiment and store its configuration data. - - Parameters - ---------- - storage : LocalStorage - Storage instance for experiment creation. - uuid : UUID - Unique identifier for the new experiment. - path : Path - File system path for storing experiment data. - parameters : list of ParameterConfig, optional - List of parameter configurations. - responses : list of ResponseConfig, optional - List of response configurations. - observations : dict of str: xr.Dataset, optional - Observations dictionary. - simulation_arguments : SimulationArguments, optional - Simulation arguments for the experiment. - name : str, optional - Experiment name. Defaults to current date if None. - - Returns - ------- - local_experiment : LocalExperiment - Instance of the newly created experiment. - """ - if name is None: - name = datetime.today().strftime("%Y-%m-%d") - - (path / "index.json").write_text(_Index(id=uuid, name=name).model_dump_json()) - - parameter_data = {} - for parameter in parameters or []: - parameter.save_experiment_data(path) - parameter_data.update({parameter.name: parameter.to_dict()}) - with open(path / cls._parameter_file, "w", encoding="utf-8") as f: - json.dump(parameter_data, f, indent=2) - - response_data = {} - for response in responses or []: - response_data.update({response.name: response.to_dict()}) - with open(path / cls._responses_file, "w", encoding="utf-8") as f: - json.dump(response_data, f, default=str, indent=2) - - if observations: - output_path = path / "observations" - output_path.mkdir() - for obs_name, dataset in observations.items(): - dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy") - - with open(path / cls._metadata_file, "w", encoding="utf-8") as f: - simulation_data = simulation_arguments if simulation_arguments else {} - json.dump(simulation_data, f, cls=ContextBoolEncoder) - - return cls(storage, path, Mode.WRITE) - @require_write def create_ensemble( self, @@ -194,6 +122,41 @@ def create_ensemble( prior_ensemble=prior_ensemble, ) + def get_ensemble(self, uuid: UUID) -> LocalEnsemble: + """ + Retrieves an ensemble by UUID. + + Parameters + ---------- + uuid : UUID + The UUID of the ensemble to retrieve. + + Returns + local_ensemble : LocalEnsemble + The ensemble associated with the given UUID. + """ + return self.ensembles[uuid] + + def get_ensemble_by_name(self, name: str) -> LocalEnsemble: + """ + Retrieves an ensemble by name. + + Parameters + ---------- + name : str + The name of the ensemble to retrieve. + + Returns + ------- + local_ensemble : LocalEnsemble + The ensemble associated with the given name. + """ + + for ens in self.ensembles: + if ens.name == name: + return ens + raise KeyError(f"Ensemble with name '{name}' not found") + @property def ensembles(self) -> Generator[LocalEnsemble, None, None]: yield from ( diff --git a/src/ert/storage/local_storage.py b/src/ert/storage/local_storage.py index 660d6d18d3c..c0174d4c918 100644 --- a/src/ert/storage/local_storage.py +++ b/src/ert/storage/local_storage.py @@ -26,6 +26,7 @@ from pydantic import BaseModel, Field from ert.config import ErtConfig +from ert.config.parsing.context_values import ContextBoolEncoder from ert.shared import __version__ from ert.storage.local_ensemble import LocalEnsemble from ert.storage.local_experiment import LocalExperiment @@ -34,7 +35,6 @@ Mode, require_write, ) -from ert.storage.realization_storage_state import RealizationStorageState if TYPE_CHECKING: from ert.config import ParameterConfig, ResponseConfig @@ -66,6 +66,9 @@ class LocalStorage(BaseMode): through file locks. """ + _parameter_file = Path("parameter.json") + _responses_file = Path("responses.json") + _metadata_file = Path("metadata.json") LOCK_TIMEOUT = 5 def __init__( @@ -130,6 +133,51 @@ def refresh(self) -> None: self._ensembles = self._load_ensembles() self._experiments = self._load_experiments() + @require_write + def create_experiment( + self, + parameters: Optional[List[ParameterConfig]] = None, + responses: Optional[List[ResponseConfig]] = None, + observations: Optional[Dict[str, xr.Dataset]] = None, + simulation_arguments: Optional[Dict[Any, Any]] = None, + name: Optional[str] = None, + ) -> LocalExperiment: + exp_id = uuid4() + path = self._experiment_path(exp_id) + path.mkdir(parents=True, exist_ok=False) + + if name is None: + name = datetime.now().strftime("%Y-%m-%d") + + (path / "index.json").write_text(_Index(id=exp_id, name=name).model_dump_json()) + + parameter_data = {} + for parameter in parameters or []: + parameter.save_experiment_data(path) + parameter_data.update({parameter.name: parameter.to_dict()}) + with open(path / self._parameter_file, "w", encoding="utf-8") as f: + json.dump(parameter_data, f, indent=2) + + response_data = {} + for response in responses or []: + response_data.update({response.name: response.to_dict()}) + with open(path / self._responses_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, default=str, indent=2) + + if observations: + output_path = path / "observations" + output_path.mkdir() + for obs_name, dataset in observations.items(): + dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy") + + with open(path / self._metadata_file, "w", encoding="utf-8") as f: + simulation_data = simulation_arguments if simulation_arguments else {} + json.dump(simulation_data, f, cls=ContextBoolEncoder) + + exp = LocalExperiment(self, path, Mode.WRITE) + self._experiments[exp_id] = exp + return exp + def get_experiment(self, uuid: UUID) -> LocalExperiment: """ Retrieves an experiment by UUID. @@ -147,51 +195,34 @@ def get_experiment(self, uuid: UUID) -> LocalExperiment: return self._experiments[uuid] - def get_ensemble(self, uuid: Union[UUID, str]) -> LocalEnsemble: + def get_experiment_by_name(self, name: str) -> LocalExperiment: """ - Retrieves an ensemble by UUID. - - Parameters - ---------- - uuid : UUID - The UUID of the ensemble to retrieve. - - Returns - local_ensemble : LocalEnsemble - The ensemble associated with the given UUID. - """ - if isinstance(uuid, str): - uuid = UUID(uuid) - return self._ensembles[uuid] - - def get_ensemble_by_name(self, name: str) -> LocalEnsemble: - """ - Retrieves an ensemble by name. + Retrieves an experiment by name. Parameters ---------- name : str - The name of the ensemble to retrieve. + The name of the experiment to retrieve. Returns ------- - local_ensemble : LocalEnsemble - The ensemble associated with the given name. - """ + local_experiment : LocalExperiment + The experiment associated with the given name. - for ens in self._ensembles.values(): - if ens.name == name: - return ens - raise KeyError(f"Ensemble with name '{name}' not found") + Raises + ------ + KeyError + If no experiment with the given name is found. + """ + for exp in self._experiments.values(): + if exp.name == name: + return exp + raise KeyError(f"Experiment with name '{name}' not found") @property def experiments(self) -> Generator[LocalExperiment, None, None]: yield from self._experiments.values() - @property - def ensembles(self) -> Generator[LocalEnsemble, None, None]: - yield from self._ensembles.values() - def _load_index(self) -> _Index: try: return _Index.model_validate_json( @@ -227,9 +258,6 @@ def _load_experiments(self) -> Dict[UUID, LocalExperiment]: for exp_id in experiment_ids } - def _ensemble_path(self, ensemble_id: UUID) -> Path: - return self.path / "ensembles" / str(ensemble_id) - def _experiment_path(self, experiment_id: UUID) -> Path: return self.path / "experiments" / str(experiment_id) @@ -287,135 +315,6 @@ def _release_lock(self) -> None: self._lock.release() (self.path / "storage.lock").unlink() - @require_write - def create_experiment( - self, - parameters: Optional[List[ParameterConfig]] = None, - responses: Optional[List[ResponseConfig]] = None, - observations: Optional[Dict[str, xr.Dataset]] = None, - simulation_arguments: Optional[Dict[Any, Any]] = None, - name: Optional[str] = None, - ) -> LocalExperiment: - """ - Creates a new experiment in the storage. - - Parameters - ---------- - parameters : list of ParameterConfig, optional - The parameters for the experiment. - responses : list of ResponseConfig, optional - The responses for the experiment. - observations : dict of str to Dataset, optional - The observations for the experiment. - simulation_arguments : SimulationArguments, optional - The simulation arguments for the experiment. - name : str, optional - The name of the experiment. - - Returns - ------- - local_experiment : LocalExperiment - The newly created experiment. - """ - - exp_id = uuid4() - path = self._experiment_path(exp_id) - path.mkdir(parents=True, exist_ok=False) - - exp = LocalExperiment.create( - self, - exp_id, - path, - parameters=parameters, - responses=responses, - observations=observations, - simulation_arguments=simulation_arguments, - name=name, - ) - - self._experiments[exp.id] = exp - return exp - - @require_write - def create_ensemble( - self, - experiment: Union[LocalExperiment, UUID], - *, - ensemble_size: int, - iteration: int = 0, - name: Optional[str] = None, - prior_ensemble: Union[LocalEnsemble, UUID, None] = None, - ) -> LocalEnsemble: - """ - Creates a new ensemble in the storage. - - Raises a ValueError if the ensemble size is larger than the prior - ensemble. - - Parameters - ---------- - experiment : {LocalExperiment, UUID} - The experiment for which the ensemble is created. - ensemble_size : int - The number of realizations in the ensemble. - iteration : int, optional - The iteration index for the ensemble. - name : str, optional - The name of the ensemble. - prior_ensemble : {LocalEnsemble, UUID}, optional - An optional ensemble to use as a prior. - - Returns - ------- - local_ensemble : LocalEnsemble - The newly created ensemble. - """ - - experiment_id = experiment if isinstance(experiment, UUID) else experiment.id - - uuid = uuid4() - path = self._ensemble_path(uuid) - path.mkdir(parents=True, exist_ok=False) - - prior_ensemble_id: Optional[UUID] = None - if isinstance(prior_ensemble, UUID): - prior_ensemble_id = prior_ensemble - elif isinstance(prior_ensemble, LocalEnsemble): - prior_ensemble_id = prior_ensemble.id - prior_ensemble = ( - self.get_ensemble(prior_ensemble_id) if prior_ensemble_id else None - ) - if prior_ensemble and ensemble_size > prior_ensemble.ensemble_size: - raise ValueError( - f"New ensemble ({ensemble_size}) must be of equal or " - f"smaller size than parent ensemble ({prior_ensemble.ensemble_size})" - ) - ens = LocalEnsemble.create( - self, - path, - uuid, - ensemble_size=ensemble_size, - experiment_id=experiment_id, - iteration=iteration, - name=str(name), - prior_ensemble_id=prior_ensemble_id, - ) - if prior_ensemble: - for realization, state in enumerate(prior_ensemble.get_ensemble_state()): - if state in [ - RealizationStorageState.LOAD_FAILURE, - RealizationStorageState.PARENT_FAILURE, - RealizationStorageState.UNDEFINED, - ]: - ens.set_failure( - realization, - RealizationStorageState.PARENT_FAILURE, - f"Failure from prior: {state}", - ) - - self._ensembles[ens.id] = ens - return ens - @require_write def _add_migration_information( self, from_version: int, to_version: int, name: str diff --git a/tests/integration_tests/analysis/test_es_update.py b/tests/integration_tests/analysis/test_es_update.py index 1af6e595b1f..3b121dc0a7d 100644 --- a/tests/integration_tests/analysis/test_es_update.py +++ b/tests/integration_tests/analysis/test_es_update.py @@ -57,13 +57,16 @@ def test_that_posterior_has_lower_variance_than_prior(): "--disable-monitor", "--realizations", "1-50", + "--experiment-name", + "testing", "poly.ert", ) facade = LibresFacade.from_config_file("poly.ert") with open_storage(facade.enspath) as storage: - prior_ensemble = storage.get_ensemble_by_name("iter-0") + experiment = storage.get_experiment_by_name("testing") + prior_ensemble = experiment.get_ensemble_by_name("iter-0") df_default = prior_ensemble.load_all_gen_kw_data() - posterior_ensemble = storage.get_ensemble_by_name("iter-1") + posterior_ensemble = experiment.get_ensemble_by_name("iter-1") df_target = posterior_ensemble.load_all_gen_kw_data() # The std for the ensemble should decrease