diff --git a/src/ert/__main__.py b/src/ert/__main__.py index 65040fc1b59..1d97b09b5e9 100755 --- a/src/ert/__main__.py +++ b/src/ert/__main__.py @@ -414,6 +414,12 @@ def get_ert_parser(parser: Optional[ArgumentParser] = None) -> ArgumentParser: help="Name of the ensemble where the results for the " "updated parameters will be stored.", ) + ensemble_smoother_parser.add_argument( + "--experiment-name", + type=str, + default="ensemble-experiment", + help="Name of the experiment", + ) ensemble_smoother_parser.add_argument( "--realizations", type=valid_realizations, diff --git a/src/ert/gui/ertnotifier.py b/src/ert/gui/ertnotifier.py index 6c925f353bc..d603c681e0f 100644 --- a/src/ert/gui/ertnotifier.py +++ b/src/ert/gui/ertnotifier.py @@ -1,8 +1,8 @@ -from typing import Optional +from typing import List, Optional from qtpy.QtCore import QObject, Signal, Slot -from ert.storage import Ensemble, Storage +from ert.storage import LocalEnsemble, LocalStorage class ErtNotifier(QObject): @@ -13,8 +13,8 @@ class ErtNotifier(QObject): def __init__(self, config_file: str): QObject.__init__(self) self._config_file = config_file - self._storage: Optional[Storage] = None - self._current_ensemble: Optional[Ensemble] = None + self._storage: Optional[LocalStorage] = None + self._current_ensemble: Optional[LocalEnsemble] = None self._is_simulation_running = False @property @@ -22,7 +22,7 @@ def is_storage_available(self) -> bool: return self._storage is not None @property - def storage(self) -> Storage: + def storage(self) -> LocalStorage: assert self.is_storage_available return self._storage # type: ignore @@ -31,11 +31,11 @@ def config_file(self) -> str: return self._config_file @property - def current_ensemble(self) -> Optional[Ensemble]: - if self._current_ensemble is None and self._storage is not None: - ensembles = list(self._storage.ensembles) - if ensembles: - self._current_ensemble = ensembles[0] + def current_ensemble(self) -> Optional[LocalEnsemble]: + if self._current_ensemble is None and self.is_storage_available: + all_ensembles = self.get_all_ensembles() + if all_ensembles: + self._current_ensemble = all_ensembles[0] return self._current_ensemble @property @@ -53,15 +53,24 @@ def emitErtChange(self) -> None: self.ertChanged.emit() @Slot(object) - def set_storage(self, storage: Storage) -> None: + def set_storage(self, storage: LocalStorage) -> None: self._storage = storage + self._current_ensemble = None self.storage_changed.emit(storage) @Slot(object) - def set_current_ensemble(self, ensemble: Optional[Ensemble] = None) -> None: + def set_current_ensemble(self, ensemble: Optional[LocalEnsemble] = None) -> None: self._current_ensemble = ensemble self.current_ensemble_changed.emit(ensemble) @Slot(bool) def set_is_simulation_running(self, is_running: bool) -> None: self._is_simulation_running = is_running + + def get_all_ensembles(self) -> List[LocalEnsemble]: + if not self.is_storage_available: + return [] + all_ensembles = [] + for experiment in self.storage.experiments: + all_ensembles.extend(list(experiment.ensembles)) + return all_ensembles diff --git a/src/ert/gui/ertwidgets/ensembleselector.py b/src/ert/gui/ertwidgets/ensembleselector.py index db0d977c449..1ba9893189f 100644 --- a/src/ert/gui/ertwidgets/ensembleselector.py +++ b/src/ert/gui/ertwidgets/ensembleselector.py @@ -9,7 +9,7 @@ from ert.storage.realization_storage_state import RealizationStorageState if TYPE_CHECKING: - from ert.storage import Ensemble + from ert.storage import LocalEnsemble class EnsembleSelector(QComboBox): @@ -24,26 +24,14 @@ def __init__( ): super().__init__() self.notifier = notifier - - # If true current ensemble of ert will be change self._update_ert = update_ert - # only show initialized ensembles self._show_only_undefined = show_only_undefined - # If True, we filter out any ensembles which have children - # One use case is if a user wants to rerun because of failures - # not related to parameterization. We can allow that, but only - # if the ensemble has not been used in an update, as that would - # invalidate the result self._show_only_no_children = show_only_no_children self.setSizeAdjustPolicy(QComboBox.AdjustToContents) - self.setEnabled(False) if update_ert: - # Update ERT when this combo box is changed self.currentIndexChanged[int].connect(self._on_current_index_changed) - - # Update this combo box when ERT is changed notifier.current_ensemble_changed.connect( self._on_global_current_ensemble_changed ) @@ -55,52 +43,51 @@ def __init__( self.populate() @property - def selected_ensemble(self) -> Ensemble: + def selected_ensemble(self) -> LocalEnsemble: return self.itemData(self.currentIndex()) def populate(self) -> None: block = self.blockSignals(True) - self.clear() - if self._ensemble_list(): self.setEnabled(True) - for ensemble in self._ensemble_list(): self.addItem(ensemble.name, userData=ensemble) - current_index = self.findData( self.notifier.current_ensemble, Qt.ItemDataRole.UserRole ) - self.setCurrentIndex(max(current_index, 0)) - self.blockSignals(block) - self.ensemble_populated.emit() - def _ensemble_list(self) -> Iterable[Ensemble]: + def _ensemble_list(self) -> Iterable[LocalEnsemble]: + if not self.notifier.is_storage_available: + return [] + + all_ensembles = [] + for experiment in self.notifier.storage.experiments: + all_ensembles.extend(experiment.ensembles) + if self._show_only_undefined: - ensembles = ( + all_ensembles = [ ensemble - for ensemble in self.notifier.storage.ensembles + for ensemble in all_ensembles if all( e == RealizationStorageState.UNDEFINED for e in ensemble.get_ensemble_state() ) - ) - else: - ensembles = self.notifier.storage.ensembles - ensemble_list = list(ensembles) - if self._show_only_no_children: - parents = [ - ens.parent for ens in self.notifier.storage.ensembles if ens.parent ] - ensemble_list = [val for val in ensemble_list if val.id not in parents] - return sorted(ensemble_list, key=lambda x: x.started_at, reverse=True) + + if self._show_only_no_children: + parents = [ens.parent for ens in all_ensembles if ens.parent] + all_ensembles = [val for val in all_ensembles if val.id not in parents] + + return sorted(all_ensembles, key=lambda x: x.started_at, reverse=True) def _on_current_index_changed(self, index: int) -> None: self.notifier.set_current_ensemble(self.itemData(index)) - def _on_global_current_ensemble_changed(self, data: Optional[Ensemble]) -> None: + def _on_global_current_ensemble_changed( + self, data: Optional[LocalEnsemble] + ) -> None: self.setCurrentIndex(max(self.findData(data, Qt.ItemDataRole.UserRole), 0)) diff --git a/src/ert/run_models/base_run_model.py b/src/ert/run_models/base_run_model.py index c9c0633f3a7..7d4efcb3634 100644 --- a/src/ert/run_models/base_run_model.py +++ b/src/ert/run_models/base_run_model.py @@ -68,7 +68,7 @@ ) from ert.mode_definitions import MODULE_MODE from ert.runpaths import Runpaths -from ert.storage import Ensemble, Storage +from ert.storage import Ensemble, LocalEnsemble, Storage from ert.workflow_runner import WorkflowRunner from ..config.analysis_config import UpdateSettings @@ -671,8 +671,8 @@ def __init__( ) def update( - self, prior: Ensemble, posterior_name: str, weight: float = 1.0 - ) -> Ensemble: + self, prior: LocalEnsemble, posterior_name: str, weight: float = 1.0 + ) -> LocalEnsemble: self.validate() self.send_event( RunModelUpdateBeginEvent(iteration=prior.iteration, run_id=prior.id) @@ -684,11 +684,10 @@ def update( msg="Creating posterior ensemble..", ) ) - posterior = self._storage.create_ensemble( - prior.experiment, + posterior = prior.experiment.create_ensemble( + name=posterior_name, ensemble_size=prior.ensemble_size, iteration=prior.iteration + 1, - name=posterior_name, prior_ensemble=prior, ) if prior.iteration == 0: diff --git a/src/ert/run_models/ensemble_experiment.py b/src/ert/run_models/ensemble_experiment.py index 6a813fb26a3..2e4c8e12195 100644 --- a/src/ert/run_models/ensemble_experiment.py +++ b/src/ert/run_models/ensemble_experiment.py @@ -7,7 +7,7 @@ from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig -from ert.storage import Ensemble, Experiment, Storage +from ert.storage import LocalEnsemble, LocalExperiment, LocalStorage from ..run_arg import create_run_arguments from .base_run_model import BaseRunModel, StatusEvents @@ -31,15 +31,14 @@ def __init__( minimum_required_realizations: int, random_seed: Optional[int], config: ErtConfig, - storage: Storage, + storage: LocalStorage, queue_config: QueueConfig, status_queue: SimpleQueue[StatusEvents], ): self.ensemble_name = ensemble_name self.experiment_name = experiment_name - self.experiment: Experiment | None = None - self.ensemble: Ensemble | None = None - + self.experiment: LocalExperiment | None = None + self.ensemble: LocalEnsemble | None = None super().__init__( config, storage, @@ -63,8 +62,8 @@ def run_experiment( observations=self.ert_config.observations, responses=self.ert_config.ensemble_config.response_configuration, ) - self.ensemble = self._storage.create_ensemble( - self.experiment, + assert self.experiment + self.ensemble = self.experiment.create_ensemble( name=self.ensemble_name, ensemble_size=self.ensemble_size, ) @@ -82,6 +81,7 @@ def run_experiment( np.array(self.active_realizations, dtype=bool), ensemble=self.ensemble, ) + sample_prior( self.ensemble, np.where(self.active_realizations)[0], diff --git a/src/ert/run_models/ensemble_smoother.py b/src/ert/run_models/ensemble_smoother.py index d23f7238f3d..e3720a33dd3 100644 --- a/src/ert/run_models/ensemble_smoother.py +++ b/src/ert/run_models/ensemble_smoother.py @@ -9,7 +9,7 @@ from ert.config import ErtConfig from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig -from ert.storage import Storage +from ert.storage import LocalStorage from ..config.analysis_config import UpdateSettings from ..config.analysis_module import ESSettings @@ -19,7 +19,6 @@ if TYPE_CHECKING: from ert.config import QueueConfig - logger = logging.getLogger(__name__) @@ -32,7 +31,7 @@ def __init__( minimum_required_realizations: int, random_seed: Optional[int], config: ErtConfig, - storage: Storage, + storage: LocalStorage, queue_config: QueueConfig, es_settings: ESSettings, update_settings: UpdateSettings, @@ -53,7 +52,6 @@ def __init__( ) self.target_ensemble_format = target_ensemble self.experiment_name = experiment_name - self.support_restart = False def run_experiment( @@ -67,39 +65,36 @@ def run_experiment( responses=self.ert_config.ensemble_config.response_configuration, name=self.experiment_name, ) - self.set_env_key("_ERT_EXPERIMENT_ID", str(experiment.id)) - prior = self._storage.create_ensemble( - experiment, - ensemble_size=self.ensemble_size, + + prior = experiment.create_ensemble( name=ensemble_format % 0, + ensemble_size=self.ensemble_size, ) self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id)) + prior_args = create_run_arguments( self.run_paths, np.array(self.active_realizations, dtype=bool), ensemble=prior, ) - sample_prior( prior, np.where(self.active_realizations)[0], random_seed=self.random_seed, ) - self._evaluate_and_postprocess( prior_args, prior, evaluator_server_config, ) - posterior = self.update(prior, ensemble_format % 1) + posterior = self.update(prior, ensemble_format % 1) posterior_args = create_run_arguments( self.run_paths, np.array(self.active_realizations, dtype=bool), ensemble=posterior, ) - self._evaluate_and_postprocess( posterior_args, posterior, diff --git a/src/ert/run_models/iterated_ensemble_smoother.py b/src/ert/run_models/iterated_ensemble_smoother.py index bb389505c95..39090084aab 100644 --- a/src/ert/run_models/iterated_ensemble_smoother.py +++ b/src/ert/run_models/iterated_ensemble_smoother.py @@ -133,10 +133,9 @@ def run_experiment( responses=self.ert_config.ensemble_config.response_configuration, name=self.experiment_name, ) - prior = self._storage.create_ensemble( - experiment=experiment, - ensemble_size=self.ensemble_size, + prior = experiment.create_ensemble( name=target_ensemble_format % 0, + ensemble_size=self.ensemble_size, ) self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id)) self.set_env_key("_ERT_EXPERIMENT_ID", str(experiment.id)) @@ -172,9 +171,8 @@ def run_experiment( ) ) - posterior = self._storage.create_ensemble( - experiment, - name=target_ensemble_format % (prior_iter + 1), # noqa + posterior = experiment.create_ensemble( + name=target_ensemble_format % (prior_iter + 1), ensemble_size=prior.ensemble_size, iteration=prior_iter + 1, prior_ensemble=prior, @@ -194,8 +192,6 @@ def run_experiment( initial_mask=initial_mask, ) - # sies iteration starts at 1, we keep iters at 0, - # so we subtract sies to be 0-indexed analysis_success = prior_iter < (self.sies_iteration - 1) if analysis_success: update_success = True diff --git a/src/ert/run_models/multiple_data_assimilation.py b/src/ert/run_models/multiple_data_assimilation.py index df2558b8c89..64a900650e1 100644 --- a/src/ert/run_models/multiple_data_assimilation.py +++ b/src/ert/run_models/multiple_data_assimilation.py @@ -10,7 +10,7 @@ from ert.config import ErtConfig from ert.enkf_main import sample_prior from ert.ensemble_evaluator import EvaluatorServerConfig -from ert.storage import Ensemble, Storage +from ert.storage import LocalEnsemble, Storage from ..config.analysis_config import UpdateSettings from ..config.analysis_module import ESSettings @@ -87,7 +87,7 @@ def run_experiment( experiment = prior.experiment self.set_env_key("_ERT_EXPERIMENT_ID", str(experiment.id)) self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id)) - assert isinstance(prior, Ensemble) + assert isinstance(prior, LocalEnsemble) if self.start_iteration != prior.iteration + 1: raise ValueError( f"Experiment misconfigured, got starting iteration: {self.start_iteration}," @@ -105,11 +105,10 @@ def run_experiment( name=self.experiment_name, ) - prior = self._storage.create_ensemble( - experiment, + prior = experiment.create_ensemble( + name=self.target_ensemble_format % 0, ensemble_size=self.ensemble_size, iteration=0, - name=self.target_ensemble_format % 0, ) self.set_env_key("_ERT_EXPERIMENT_ID", str(experiment.id)) self.set_env_key("_ERT_ENSEMBLE_ID", str(prior.id)) diff --git a/src/ert/simulator/batch_simulator.py b/src/ert/simulator/batch_simulator.py index 8beb945141f..b37f5448881 100644 --- a/src/ert/simulator/batch_simulator.py +++ b/src/ert/simulator/batch_simulator.py @@ -9,7 +9,10 @@ from .batch_simulator_context import BatchContext if TYPE_CHECKING: - from ert.storage import Ensemble, Storage + from ert.storage import ( + Ensemble, + LocalStorage, + ) class BatchSimulator: @@ -167,7 +170,7 @@ def start( self, case_name: str, case_data: List[Tuple[int, Dict[str, Dict[str, Any]]]], - storage: Storage, + storage: LocalStorage, ) -> BatchContext: """Start batch simulation, return a simulation context @@ -230,8 +233,7 @@ def start( parameters=self.ert_config.ensemble_config.parameter_configuration, responses=self.ert_config.ensemble_config.response_configuration, ) - ensemble = storage.create_ensemble( - experiment.id, + ensemble = experiment.create_ensemble( name=case_name, ensemble_size=self.ert_config.model_config.num_realizations, ) diff --git a/src/ert/storage/__init__.py b/src/ert/storage/__init__.py index dd4d8fd04a6..618ecf7711c 100644 --- a/src/ert/storage/__init__.py +++ b/src/ert/storage/__init__.py @@ -1,3 +1,8 @@ +""" +Hierarchical Structure: +The code follows a clear hierarchical structure: Storage -> Experiment -> Ensemble. +""" + from __future__ import annotations import os diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index 79377c32264..4311ac041fb 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -7,7 +7,7 @@ from functools import lru_cache from pathlib import Path from typing import TYPE_CHECKING, Dict, List, Optional, Tuple, Union -from uuid import UUID +from uuid import UUID, uuid4 import numpy as np import pandas as pd @@ -59,26 +59,43 @@ def __init__( storage: LocalStorage, path: Path, mode: Mode, + *, + ensemble_size: Optional[int] = None, + experiment_id: Optional[UUID] = None, + iteration: int = 0, + name: Optional[str] = None, + prior_ensemble_id: Optional[UUID] = None, ): - """ - Initialize a LocalEnsemble instance. - - Parameters - ---------- - storage : LocalStorage - Local storage instance. - path : Path - File system path to ensemble data. - mode : Mode - Access mode for the ensemble (read/write). - """ - super().__init__(mode) self._storage = storage self._path = path - self._index = _Index.model_validate_json( - (path / "index.json").read_text(encoding="utf-8") - ) + + if mode == Mode.WRITE and not (path / "index.json").exists(): + # Creating a new ensemble + if ensemble_size is None or experiment_id is None or name is None: + raise ValueError( + "ensemble_size, experiment_id, and name are required when creating a new ensemble" + ) + + (path / "experiment").mkdir(parents=True, exist_ok=False) + index = _Index( + id=uuid4(), + ensemble_size=ensemble_size, + experiment_id=experiment_id, + iteration=iteration, + name=name, + prior_ensemble_id=prior_ensemble_id, + started_at=datetime.now(), + ) + with open(path / "index.json", mode="w", encoding="utf-8") as f: + print(index.model_dump_json(), file=f) + else: + # Loading an existing ensemble + index = _Index.model_validate_json( + (path / "index.json").read_text(encoding="utf-8") + ) + + self._index = index self._error_log_name = "error.json" @lru_cache(maxsize=None) @@ -87,64 +104,6 @@ def create_realization_dir(realization: int) -> Path: self._realization_dir = create_realization_dir - @classmethod - def create( - cls, - storage: LocalStorage, - path: Path, - uuid: UUID, - *, - ensemble_size: int, - experiment_id: UUID, - iteration: int = 0, - name: str, - prior_ensemble_id: Optional[UUID], - ) -> LocalEnsemble: - """ - Create a new ensemble in local storage. - - Parameters - ---------- - storage : LocalStorage - Local storage instance. - path : Path - File system path for ensemble data. - uuid : UUID - Unique identifier for the new ensemble. - ensemble_size : int - Number of realizations. - experiment_id : UUID - Identifier of associated experiment. - iteration : int - Iteration number of ensemble. - name : str - Name of ensemble. - prior_ensemble_id : UUID, optional - Identifier of prior ensemble. - - Returns - ------- - local_ensemble : LocalEnsemble - Instance of the newly created ensemble. - """ - - (path / "experiment").mkdir(parents=True, exist_ok=False) - - index = _Index( - id=uuid, - ensemble_size=ensemble_size, - experiment_id=experiment_id, - iteration=iteration, - name=name, - prior_ensemble_id=prior_ensemble_id, - started_at=datetime.now(), - ) - - with open(path / "index.json", mode="w", encoding="utf-8") as f: - print(index.model_dump_json(), file=f) - - return cls(storage, path, Mode.WRITE) - @property def mount_point(self) -> Path: return self._path diff --git a/src/ert/storage/local_experiment.py b/src/ert/storage/local_experiment.py index d55ce295d4e..7a088f4d8af 100644 --- a/src/ert/storage/local_experiment.py +++ b/src/ert/storage/local_experiment.py @@ -1,10 +1,9 @@ from __future__ import annotations import json -from datetime import datetime from functools import cached_property from pathlib import Path -from typing import TYPE_CHECKING, Any, Dict, Generator, List, Optional +from typing import TYPE_CHECKING, Any, Dict, List, Optional from uuid import UUID import numpy as np @@ -20,8 +19,8 @@ SummaryConfig, SurfaceConfig, ) -from ert.config.parsing.context_values import ContextBoolEncoder from ert.config.response_config import ResponseConfig +from ert.storage.local_ensemble import LocalEnsemble from ert.storage.mode import BaseMode, Mode, require_write if TYPE_CHECKING: @@ -85,76 +84,28 @@ def __init__( self._index = _Index.model_validate_json( (path / "index.json").read_text(encoding="utf-8") ) - - @classmethod - def create( - cls, - storage: LocalStorage, - uuid: UUID, - path: Path, - *, - parameters: Optional[List[ParameterConfig]] = None, - responses: Optional[List[ResponseConfig]] = None, - observations: Optional[Dict[str, xr.Dataset]] = None, - simulation_arguments: Optional[Dict[Any, Any]] = None, - name: Optional[str] = None, - ) -> LocalExperiment: - """ - Create a new LocalExperiment and store its configuration data. - - Parameters - ---------- - storage : LocalStorage - Storage instance for experiment creation. - uuid : UUID - Unique identifier for the new experiment. - path : Path - File system path for storing experiment data. - parameters : list of ParameterConfig, optional - List of parameter configurations. - responses : list of ResponseConfig, optional - List of response configurations. - observations : dict of str: xr.Dataset, optional - Observations dictionary. - simulation_arguments : SimulationArguments, optional - Simulation arguments for the experiment. - name : str, optional - Experiment name. Defaults to current date if None. - - Returns - ------- - local_experiment : LocalExperiment - Instance of the newly created experiment. - """ - if name is None: - name = datetime.today().strftime("%Y-%m-%d") - - (path / "index.json").write_text(_Index(id=uuid, name=name).model_dump_json()) - - parameter_data = {} - for parameter in parameters or []: - parameter.save_experiment_data(path) - parameter_data.update({parameter.name: parameter.to_dict()}) - with open(path / cls._parameter_file, "w", encoding="utf-8") as f: - json.dump(parameter_data, f, indent=2) - - response_data = {} - for response in responses or []: - response_data.update({response.name: response.to_dict()}) - with open(path / cls._responses_file, "w", encoding="utf-8") as f: - json.dump(response_data, f, default=str, indent=2) - - if observations: - output_path = path / "observations" - output_path.mkdir() - for obs_name, dataset in observations.items(): - dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy") - - with open(path / cls._metadata_file, "w", encoding="utf-8") as f: - simulation_data = simulation_arguments if simulation_arguments else {} - json.dump(simulation_data, f, cls=ContextBoolEncoder) - - return cls(storage, path, Mode.WRITE) + self._ensembles: List[LocalEnsemble] = [] + self._load_ensembles() + + def _load_ensembles(self) -> None: + ensembles_path = self.mount_point / "ensembles" + if not ensembles_path.exists(): + self._ensembles = [] + return + + ensembles: List[LocalEnsemble] = [] + for ensemble_path in ensembles_path.iterdir(): + try: + ensemble = LocalEnsemble( + storage=self._storage, path=ensemble_path, mode=self.mode + ) + if ensemble.experiment_id == self.id: + ensembles.append(ensemble) + except FileNotFoundError: + continue + + # Sort ensembles by started_at in reverse order + self._ensembles = sorted(ensembles, key=lambda x: x.started_at, reverse=True) @require_write def create_ensemble( @@ -185,20 +136,57 @@ def create_ensemble( local_ensemble : LocalEnsemble The newly created ensemble instance. """ - - return self._storage.create_ensemble( - self, + ensemble = LocalEnsemble( + storage=self._storage, + path=self.mount_point / "ensembles" / name, + mode=Mode.WRITE, ensemble_size=ensemble_size, + experiment_id=self.id, iteration=iteration, name=name, - prior_ensemble=prior_ensemble, + prior_ensemble_id=prior_ensemble.id if prior_ensemble else None, ) + self._ensembles.append(ensemble) + return ensemble + + def get_ensemble(self, uuid: UUID) -> LocalEnsemble: + """ + Retrieves an ensemble by UUID. + + Parameters + ---------- + uuid : UUID + The UUID of the ensemble to retrieve. + + Returns + local_ensemble : LocalEnsemble + The ensemble associated with the given UUID. + """ + return self.ensembles[uuid] + + def get_ensemble_by_name(self, name: str) -> LocalEnsemble: + """ + Retrieves an ensemble by name. + + Parameters + ---------- + name : str + The name of the ensemble to retrieve. + + Returns + ------- + local_ensemble : LocalEnsemble + The ensemble associated with the given name. + """ + + for ens in self.ensembles: + if ens.name == name: + return ens + raise KeyError(f"Ensemble with name '{name}' not found") @property - def ensembles(self) -> Generator[LocalEnsemble, None, None]: - yield from ( - ens for ens in self._storage.ensembles if ens.experiment_id == self.id - ) + def ensembles(self) -> List[LocalEnsemble]: + return self._ensembles @property def metadata(self) -> Dict[str, Any]: diff --git a/src/ert/storage/local_storage.py b/src/ert/storage/local_storage.py index 660d6d18d3c..eedec0779d2 100644 --- a/src/ert/storage/local_storage.py +++ b/src/ert/storage/local_storage.py @@ -26,6 +26,7 @@ from pydantic import BaseModel, Field from ert.config import ErtConfig +from ert.config.parsing.context_values import ContextBoolEncoder from ert.shared import __version__ from ert.storage.local_ensemble import LocalEnsemble from ert.storage.local_experiment import LocalExperiment @@ -34,7 +35,6 @@ Mode, require_write, ) -from ert.storage.realization_storage_state import RealizationStorageState if TYPE_CHECKING: from ert.config import ParameterConfig, ResponseConfig @@ -54,6 +54,13 @@ class _Migrations(BaseModel): class _Index(BaseModel): version: int = _LOCAL_STORAGE_VERSION migrations: MutableSequence[_Migrations] = Field(default_factory=list) + id: Optional[UUID] = None + name: Optional[str] = None + + class Config: + extra = ( + "allow" # This allows additional fields that aren't defined in the model + ) class LocalStorage(BaseMode): @@ -66,6 +73,9 @@ class LocalStorage(BaseMode): through file locks. """ + _parameter_file = Path("parameter.json") + _responses_file = Path("responses.json") + _metadata_file = Path("metadata.json") LOCK_TIMEOUT = 5 def __init__( @@ -130,6 +140,51 @@ def refresh(self) -> None: self._ensembles = self._load_ensembles() self._experiments = self._load_experiments() + @require_write + def create_experiment( + self, + parameters: Optional[List[ParameterConfig]] = None, + responses: Optional[List[ResponseConfig]] = None, + observations: Optional[Dict[str, xr.Dataset]] = None, + simulation_arguments: Optional[Dict[Any, Any]] = None, + name: Optional[str] = None, + ) -> LocalExperiment: + exp_id = uuid4() + path = self._experiment_path(exp_id) + path.mkdir(parents=True, exist_ok=False) + + if name is None: + name = datetime.now().strftime("%Y-%m-%d") + + (path / "index.json").write_text(_Index(id=exp_id, name=name).model_dump_json()) + + parameter_data = {} + for parameter in parameters or []: + parameter.save_experiment_data(path) + parameter_data.update({parameter.name: parameter.to_dict()}) + with open(path / self._parameter_file, "w", encoding="utf-8") as f: + json.dump(parameter_data, f, indent=2) + + response_data = {} + for response in responses or []: + response_data.update({response.name: response.to_dict()}) + with open(path / self._responses_file, "w", encoding="utf-8") as f: + json.dump(response_data, f, default=str, indent=2) + + if observations: + output_path = path / "observations" + output_path.mkdir() + for obs_name, dataset in observations.items(): + dataset.to_netcdf(output_path / f"{obs_name}", engine="scipy") + + with open(path / self._metadata_file, "w", encoding="utf-8") as f: + simulation_data = simulation_arguments if simulation_arguments else {} + json.dump(simulation_data, f, cls=ContextBoolEncoder) + + exp = LocalExperiment(self, path, Mode.WRITE) + self._experiments[exp_id] = exp + return exp + def get_experiment(self, uuid: UUID) -> LocalExperiment: """ Retrieves an experiment by UUID. @@ -147,51 +202,34 @@ def get_experiment(self, uuid: UUID) -> LocalExperiment: return self._experiments[uuid] - def get_ensemble(self, uuid: Union[UUID, str]) -> LocalEnsemble: - """ - Retrieves an ensemble by UUID. - - Parameters - ---------- - uuid : UUID - The UUID of the ensemble to retrieve. - - Returns - local_ensemble : LocalEnsemble - The ensemble associated with the given UUID. - """ - if isinstance(uuid, str): - uuid = UUID(uuid) - return self._ensembles[uuid] - - def get_ensemble_by_name(self, name: str) -> LocalEnsemble: + def get_experiment_by_name(self, name: str) -> LocalExperiment: """ - Retrieves an ensemble by name. + Retrieves an experiment by name. Parameters ---------- name : str - The name of the ensemble to retrieve. + The name of the experiment to retrieve. Returns ------- - local_ensemble : LocalEnsemble - The ensemble associated with the given name. - """ + local_experiment : LocalExperiment + The experiment associated with the given name. - for ens in self._ensembles.values(): - if ens.name == name: - return ens - raise KeyError(f"Ensemble with name '{name}' not found") + Raises + ------ + KeyError + If no experiment with the given name is found. + """ + for exp in self._experiments.values(): + if exp.name == name: + return exp + raise KeyError(f"Experiment with name '{name}' not found") @property def experiments(self) -> Generator[LocalExperiment, None, None]: yield from self._experiments.values() - @property - def ensembles(self) -> Generator[LocalEnsemble, None, None]: - yield from self._ensembles.values() - def _load_index(self) -> _Index: try: return _Index.model_validate_json( @@ -221,14 +259,26 @@ def _load_ensembles(self) -> Dict[UUID, LocalEnsemble]: } def _load_experiments(self) -> Dict[UUID, LocalExperiment]: - experiment_ids = {ens.experiment_id for ens in self._ensembles.values()} - return { - exp_id: LocalExperiment(self, self._experiment_path(exp_id), self.mode) - for exp_id in experiment_ids - } + experiments_path = self.path / "experiments" + if not experiments_path.exists(): + return {} - def _ensemble_path(self, ensemble_id: UUID) -> Path: - return self.path / "ensembles" / str(ensemble_id) + experiments: Dict[UUID, LocalExperiment] = {} + for experiment_path in experiments_path.iterdir(): + try: + exp_id = UUID(experiment_path.name) + experiment = LocalExperiment(self, experiment_path, self.mode) + experiments[exp_id] = experiment + except ValueError: + logger.warning( + f"Invalid experiment directory name: {experiment_path.name}" + ) + except FileNotFoundError: + logger.exception( + f"Failed to load an experiment from path: {experiment_path}" + ) + continue + return experiments def _experiment_path(self, experiment_id: UUID) -> Path: return self.path / "experiments" / str(experiment_id) @@ -287,135 +337,6 @@ def _release_lock(self) -> None: self._lock.release() (self.path / "storage.lock").unlink() - @require_write - def create_experiment( - self, - parameters: Optional[List[ParameterConfig]] = None, - responses: Optional[List[ResponseConfig]] = None, - observations: Optional[Dict[str, xr.Dataset]] = None, - simulation_arguments: Optional[Dict[Any, Any]] = None, - name: Optional[str] = None, - ) -> LocalExperiment: - """ - Creates a new experiment in the storage. - - Parameters - ---------- - parameters : list of ParameterConfig, optional - The parameters for the experiment. - responses : list of ResponseConfig, optional - The responses for the experiment. - observations : dict of str to Dataset, optional - The observations for the experiment. - simulation_arguments : SimulationArguments, optional - The simulation arguments for the experiment. - name : str, optional - The name of the experiment. - - Returns - ------- - local_experiment : LocalExperiment - The newly created experiment. - """ - - exp_id = uuid4() - path = self._experiment_path(exp_id) - path.mkdir(parents=True, exist_ok=False) - - exp = LocalExperiment.create( - self, - exp_id, - path, - parameters=parameters, - responses=responses, - observations=observations, - simulation_arguments=simulation_arguments, - name=name, - ) - - self._experiments[exp.id] = exp - return exp - - @require_write - def create_ensemble( - self, - experiment: Union[LocalExperiment, UUID], - *, - ensemble_size: int, - iteration: int = 0, - name: Optional[str] = None, - prior_ensemble: Union[LocalEnsemble, UUID, None] = None, - ) -> LocalEnsemble: - """ - Creates a new ensemble in the storage. - - Raises a ValueError if the ensemble size is larger than the prior - ensemble. - - Parameters - ---------- - experiment : {LocalExperiment, UUID} - The experiment for which the ensemble is created. - ensemble_size : int - The number of realizations in the ensemble. - iteration : int, optional - The iteration index for the ensemble. - name : str, optional - The name of the ensemble. - prior_ensemble : {LocalEnsemble, UUID}, optional - An optional ensemble to use as a prior. - - Returns - ------- - local_ensemble : LocalEnsemble - The newly created ensemble. - """ - - experiment_id = experiment if isinstance(experiment, UUID) else experiment.id - - uuid = uuid4() - path = self._ensemble_path(uuid) - path.mkdir(parents=True, exist_ok=False) - - prior_ensemble_id: Optional[UUID] = None - if isinstance(prior_ensemble, UUID): - prior_ensemble_id = prior_ensemble - elif isinstance(prior_ensemble, LocalEnsemble): - prior_ensemble_id = prior_ensemble.id - prior_ensemble = ( - self.get_ensemble(prior_ensemble_id) if prior_ensemble_id else None - ) - if prior_ensemble and ensemble_size > prior_ensemble.ensemble_size: - raise ValueError( - f"New ensemble ({ensemble_size}) must be of equal or " - f"smaller size than parent ensemble ({prior_ensemble.ensemble_size})" - ) - ens = LocalEnsemble.create( - self, - path, - uuid, - ensemble_size=ensemble_size, - experiment_id=experiment_id, - iteration=iteration, - name=str(name), - prior_ensemble_id=prior_ensemble_id, - ) - if prior_ensemble: - for realization, state in enumerate(prior_ensemble.get_ensemble_state()): - if state in [ - RealizationStorageState.LOAD_FAILURE, - RealizationStorageState.PARENT_FAILURE, - RealizationStorageState.UNDEFINED, - ]: - ens.set_failure( - realization, - RealizationStorageState.PARENT_FAILURE, - f"Failure from prior: {state}", - ) - - self._ensembles[ens.id] = ens - return ens - @require_write def _add_migration_information( self, from_version: int, to_version: int, name: str diff --git a/tests/integration_tests/analysis/test_es_update.py b/tests/integration_tests/analysis/test_es_update.py index 1af6e595b1f..3b121dc0a7d 100644 --- a/tests/integration_tests/analysis/test_es_update.py +++ b/tests/integration_tests/analysis/test_es_update.py @@ -57,13 +57,16 @@ def test_that_posterior_has_lower_variance_than_prior(): "--disable-monitor", "--realizations", "1-50", + "--experiment-name", + "testing", "poly.ert", ) facade = LibresFacade.from_config_file("poly.ert") with open_storage(facade.enspath) as storage: - prior_ensemble = storage.get_ensemble_by_name("iter-0") + experiment = storage.get_experiment_by_name("testing") + prior_ensemble = experiment.get_ensemble_by_name("iter-0") df_default = prior_ensemble.load_all_gen_kw_data() - posterior_ensemble = storage.get_ensemble_by_name("iter-1") + posterior_ensemble = experiment.get_ensemble_by_name("iter-1") df_target = posterior_ensemble.load_all_gen_kw_data() # The std for the ensemble should decrease diff --git a/tests/performance_tests/test_memory_usage.py b/tests/performance_tests/test_memory_usage.py index 7eb28ad9af3..0ba4d18b9df 100644 --- a/tests/performance_tests/test_memory_usage.py +++ b/tests/performance_tests/test_memory_usage.py @@ -44,15 +44,20 @@ def poly_template(monkeypatch): def test_memory_smoothing(poly_template): ert_config = ErtConfig.from_file("poly.ert") fill_storage_with_data(poly_template, ert_config) + with open_storage(poly_template / "ensembles", mode="w") as storage: - prior_ens = storage.get_ensemble_by_name("prior") - posterior_ens = storage.create_ensemble( - prior_ens.experiment_id, + # Assuming the experiment is named "default" as in the fill_storage_with_data function + experiment = storage.get_experiment_by_name("default") + + prior_ens = experiment.get_ensemble_by_name("prior") + + posterior_ens = experiment.create_ensemble( ensemble_size=prior_ens.ensemble_size, iteration=1, name="posterior", prior_ensemble=prior_ens, ) + smoother_update( prior_ens, posterior_ens, @@ -65,15 +70,18 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None: path = Path(poly_template) / "ensembles" with open_storage(path, mode="w") as storage: ens_config = ert_config.ensemble_config - experiment_id = storage.create_experiment( + experiment = storage.create_experiment( parameters=ens_config.parameter_configuration, responses=ens_config.response_configuration, observations=ert_config.observations, + name="default", ) - source = storage.create_ensemble(experiment_id, name="prior", ensemble_size=100) + + source = experiment.create_ensemble(name="prior", ensemble_size=100) summary_obs_keys = ens_config.getKeylistFromImplType(SummaryConfig) realizations = list(range(ert_config.model_config.num_realizations)) + for _, obs in ert_config.observations.items(): data_key = obs.attrs["response"] for real in realizations: @@ -94,8 +102,7 @@ def fill_storage_with_data(poly_template: Path, ert_config: ErtConfig) -> None: sample_prior(source, realizations, ens_config.parameters) - storage.create_ensemble( - source.experiment_id, + experiment.create_ensemble( ensemble_size=source.ensemble_size, iteration=1, name="target_ens",