diff --git a/ert3/engine/_record.py b/ert3/engine/_record.py
index cc25ca2ae3d..56dbba8dd34 100644
--- a/ert3/engine/_record.py
+++ b/ert3/engine/_record.py
@@ -47,4 +47,5 @@ def sample_record(
         record_name=record_name,
         ensemble_record=ensrecord,
         experiment_name=experiment_name,
+        is_parameter=True,
     )
diff --git a/ert3/engine/_run.py b/ert3/engine/_run.py
index 39a25dff4c6..b4bb0e66d61 100644
--- a/ert3/engine/_run.py
+++ b/ert3/engine/_run.py
@@ -9,15 +9,21 @@ def _prepare_experiment(
     experiment_name: str,
     ensemble: ert3.config.EnsembleConfig,
     ensemble_size: int,
+    parameters_config: ert3.config.ParametersConfig,
 ) -> None:
     if ert3.workspace.experiment_has_run(workspace_root, experiment_name):
         raise ValueError(f"Experiment {experiment_name} have been carried out.")
 
-    parameter_names = [elem.record for elem in ensemble.input]
+    parameter_names = {elem.record for elem in ensemble.input}
+    parameters = {
+        param.name: list(param.variables)
+        for param in parameters_config
+        if param.name in parameter_names
+    }
     ert3.storage.init_experiment(
         workspace=workspace_root,
         experiment_name=experiment_name,
-        parameters=parameter_names,
+        parameters=parameters,
         ensemble_size=ensemble_size,
     )
 
@@ -64,7 +70,9 @@ def _prepare_evaluation(
 
     # This reassures mypy that the ensemble size is defined
     assert ensemble.size is not None
 
-    _prepare_experiment(workspace_root, experiment_name, ensemble, ensemble.size)
+    _prepare_experiment(
+        workspace_root, experiment_name, ensemble, ensemble.size, parameters_config
+    )
 
     for input_record in ensemble.input:
         record_name = input_record.record
@@ -111,7 +119,9 @@ def _prepare_sensitivity(
     )
     input_records = ert3.algorithms.one_at_the_time(parameter_distributions)
 
-    _prepare_experiment(workspace_root, experiment_name, ensemble, len(input_records))
+    _prepare_experiment(
+        workspace_root, experiment_name, ensemble, len(input_records), parameters_config
+    )
 
     parameters: Dict[str, List[ert3.data.Record]] = {
         param.record: [] for param in ensemble.input
diff --git a/ert3/storage/_storage.py b/ert3/storage/_storage.py
index ff869ec845a..3d9cdb42e8b 100644
--- a/ert3/storage/_storage.py
+++ b/ert3/storage/_storage.py
@@ -1,6 +1,6 @@
 import json
 from pathlib import Path
-from typing import Any, Dict, Iterable, Optional, Set
+from typing import Any, Dict, Iterable, Mapping, MutableMapping, Optional, Set, List
 import io
 import logging
 import pandas as pd
@@ -130,7 +130,7 @@ def init(*, workspace: Path) -> None:
         _init_experiment(
             workspace=workspace,
             experiment_name=f"{workspace}.{special_key}",
-            parameters=[],
+            parameters={},
             ensemble_size=-1,
         )
 
@@ -139,7 +139,7 @@ def init_experiment(
     *,
     workspace: Path,
     experiment_name: str,
-    parameters: Iterable[str],
+    parameters: Mapping[str, Iterable[str]],
     ensemble_size: int,
 ) -> None:
     if ensemble_size <= 0:
@@ -157,7 +157,7 @@ def _init_experiment(
     *,
     workspace: Path,
     experiment_name: str,
-    parameters: Iterable[str],
+    parameters: Mapping[str, Iterable[str]],
     ensemble_size: int,
 ) -> None:
     if not experiment_name:
@@ -171,11 +171,16 @@
     exp_response = _post_to_server(path="experiments", json={"name": experiment_name})
     exp_id = exp_response.json()["id"]
     response = _post_to_server(
-        path=f"experiments/{exp_id}/ensembles",
+        f"experiments/{exp_id}/ensembles",
         json={
-            "parameter_names": list(parameters),
+            "parameter_names": [
+                f"{record}.{param}"
+                for record, params in parameters.items()
+                for param in params
+            ],
             "response_names": [],
             "size": ensemble_size,
+            "metadata": {"name": experiment_name},
         },
     )
     if response.status_code != 200:
@@ -222,24 +227,64 @@ def _add_numerical_data(
     ensemble_id = experiment["ensembles"][0]  # currently just one ens per exp
 
     record_url = f"ensembles/{ensemble_id}/records/{record_name}"
-    for idx, record in enumerate(ensemble_record.records):
-        df = pd.DataFrame([record.data], columns=record.index, index=[idx])
+    dataframe = pd.DataFrame(
+        [r.data for r in ensemble_record.records],
+        columns=ensemble_record.records[0].index,
+    )
+
+    response = _post_to_server(
+        f"{record_url}/matrix",
+        data=dataframe.to_csv().encode(),
+        headers={"content-type": "text/csv"},
+    )
+
+    if response.status_code == 409:
+        raise ert3.exceptions.ElementExistsError("Record already exists")
+
+    if response.status_code != 200:
+        raise ert3.exceptions.StorageError(response.text)
+
+    meta_response = _put_to_server(f"{record_url}/metadata", json=metadata.dict())
+
+    if meta_response.status_code != 200:
+        raise ert3.exceptions.StorageError(meta_response.text)
+
+
+def _add_parameter_numerical_data(
+    workspace: Path,
+    experiment_name: str,
+    record_name: str,
+    ensemble_record: ert3.data.EnsembleRecord,
+) -> None:
+    experiment = _get_experiment_by_name(experiment_name)
+    if experiment is None:
+        raise ert3.exceptions.NonExistantExperiment(
+            f"Cannot add {record_name} data to "
+            f"non-existing experiment: {experiment_name}"
+        )
+
+    metadata = _NumericalMetaData(
+        ensemble_size=ensemble_record.ensemble_size,
+        record_type=_get_record_type(ensemble_record),
+    )
+
+    ensemble_id = experiment["ensembles"][0]  # currently just one ens per exp
+
+    dataframe = pd.DataFrame([r.data for r in ensemble_record.records])
+    for param_name in dataframe:
+        record_url = f"ensembles/{ensemble_id}/records/{record_name}.{param_name}"
         response = _post_to_server(
-            path=f"{record_url}/matrix",
-            params={"realization_index": idx},
-            data=df.to_csv().encode(),
+            f"{record_url}/matrix",
+            data=dataframe[param_name].to_csv().encode(),
             headers={"content-type": "text/csv"},
         )
 
         if response.status_code == 409:
             raise ert3.exceptions.ElementExistsError("Record already exists")
-
         if response.status_code != 200:
             raise ert3.exceptions.StorageError(response.text)
 
         meta_response = _put_to_server(
-            path=f"{record_url}/metadata",
-            params={"realization_index": idx},
+            f"{record_url}/metadata",
            json=metadata.dict(),
         )
 
@@ -247,33 +292,22 @@ def _add_numerical_data(
             raise ert3.exceptions.StorageError(meta_response.text)
 
 
-def _response2record(
-    response_content: bytes, record_type: ert3.data.RecordType, realization_id: int
-) -> ert3.data.Record:
+def _response2records(
+    response_content: bytes, record_type: ert3.data.RecordType
+) -> List[ert3.data.Record]:
     dataframe = pd.read_csv(
         io.BytesIO(response_content), index_col=0, float_precision="round_trip"
     )
-    raw_index = tuple(dataframe.columns)
     if record_type == ert3.data.RecordType.LIST_FLOAT:
-        array_data = tuple(
-            float(dataframe.loc[realization_id][raw_idx]) for raw_idx in raw_index
-        )
-        return ert3.data.Record(data=array_data)
+        return [ert3.data.Record(data=row.to_list()) for _, row in dataframe.iterrows()]
     elif record_type == ert3.data.RecordType.MAPPING_INT_FLOAT:
-        int_index = tuple(int(e) for e in dataframe.columns)
-        idata = {
-            idx: float(dataframe.loc[realization_id][raw_idx])
-            for raw_idx, idx in zip(raw_index, int_index)
-        }
-        return ert3.data.Record(data=idata)
+        return [
+            ert3.data.Record(data={int(k): v for k, v in row.to_dict().items()})
+            for _, row in dataframe.iterrows()
+        ]
     elif record_type == ert3.data.RecordType.MAPPING_STR_FLOAT:
-        str_index = tuple(str(e) for e in dataframe.columns)
-        sdata = {
-            idx: float(dataframe.loc[realization_id][raw_idx])
-            for raw_idx, idx in zip(raw_index, str_index)
-        }
-        return ert3.data.Record(data=sdata)
+        return [ert3.data.Record(data=row.to_dict()) for _, row in dataframe.iterrows()]
     else:
         raise ValueError(
            f"Unexpected record type when loading numerical record: {record_type}"
         )
@@ -282,8 +316,7 @@ def _response2record(
 
 def _get_numerical_metadata(ensemble_id: str, record_name: str) -> _NumericalMetaData:
     response = _get_from_server(
-        path=f"ensembles/{ensemble_id}/records/{record_name}/metadata",
-        params={"realization_index": 0},  # This assumes there is a realization 0
+        f"ensembles/{ensemble_id}/records/{record_name}/metadata"
     )
 
     if response.status_code == 404:
@@ -309,30 +342,48 @@ def _get_numerical_data(
     ensemble_id = experiment["ensembles"][0]  # currently just one ens per exp
 
     metadata = _get_numerical_metadata(ensemble_id, record_name)
-    records = []
-    for real_id in range(metadata.ensemble_size):
-        response = _get_from_server(
-            path=f"ensembles/{ensemble_id}/records/{record_name}",
-            params={"realization_index": real_id},
-            headers={"accept": "text/csv"},
+    response = _get_from_server(
+        f"ensembles/{ensemble_id}/records/{record_name}",
+        headers={"accept": "text/csv"},
+    )
+
+    if response.status_code == 404:
+        raise ert3.exceptions.ElementMissingError(
+            f"No {record_name} data for experiment: {experiment_name}"
         )
 
-        if response.status_code == 404:
-            raise ert3.exceptions.ElementMissingError(
-                f"No {record_name} data for experiment: {experiment_name}"
-            )
+    if response.status_code != 200:
+        raise ert3.exceptions.StorageError(response.text)
+
+    records = _response2records(
+        response.content,
+        metadata.record_type,
+    )
+
+    return ert3.data.EnsembleRecord(records=records)
 
-        if response.status_code != 200:
-            raise ert3.exceptions.StorageError(response.text)
 
-        record = _response2record(
-            response.content,
-            metadata.record_type,
-            real_id,
+def _get_experiment_parameters(
+    workspace: Path, experiment_name: str
+) -> Mapping[str, Iterable[str]]:
+    experiment = _get_experiment_by_name(experiment_name)
+    if experiment is None:
+        raise ert3.exceptions.NonExistantExperiment(
+            f"Cannot get parameters from non-existing experiment: {experiment_name}"
         )
-        records.append(record)
 
-    return ert3.data.EnsembleRecord(records=records)
+    ensemble_id = experiment["ensembles"][0]  # currently just one ens per exp
+    response = _get_from_server(f"ensembles/{ensemble_id}/parameters")
+    if response.status_code != 200:
+        raise ert3.exceptions.StorageError(response.text)
+    parameters: MutableMapping[str, List[str]] = {}
+    for name in response.json():
+        key, val = name.split(".")
+        if key in parameters:
+            parameters[key].append(val)
+        else:
+            parameters[key] = [val]
+    return parameters
 
 
 def add_ensemble_record(
@@ -341,11 +392,17 @@
     record_name: str,
     ensemble_record: ert3.data.EnsembleRecord,
     experiment_name: Optional[str] = None,
+    is_parameter: bool = False,
 ) -> None:
     if experiment_name is None:
         experiment_name = f"{workspace}.{_ENSEMBLE_RECORDS}"
 
-    _add_numerical_data(workspace, experiment_name, record_name, ensemble_record)
+    if is_parameter:
+        _add_parameter_numerical_data(
+            workspace, experiment_name, record_name, ensemble_record
+        )
+    else:
+        _add_numerical_data(workspace, experiment_name, record_name, ensemble_record)
 
 
 def get_ensemble_record(
@@ -356,8 +413,52 @@
 ) -> ert3.data.EnsembleRecord:
     if experiment_name is None:
         experiment_name = f"{workspace}.{_ENSEMBLE_RECORDS}"
+    experiment = _get_experiment_by_name(experiment_name)
+    if experiment is None:
+        raise ert3.exceptions.NonExistantExperiment(
+            f"Cannot get {record_name} data, no experiment named: {experiment_name}"
+        )
 
-    return _get_numerical_data(workspace, experiment_name, record_name)
+    param_names = _get_experiment_parameters(workspace, experiment_name)
+    if record_name in param_names:
+        ensemble_records = [
+            _get_numerical_data(
+                workspace, experiment_name, f"{record_name}.{param_name}"
+            )
+            for param_name in param_names[record_name]
+        ]
+
+        # Combine records into the first ensemble record
+        combined_records: List[ert3.data.Record] = []
+        for record_idx, _ in enumerate(ensemble_records[0].records):
+            record0 = ensemble_records[0].records[record_idx]
+
+            if isinstance(record0.data, list):
+                ldata = [
+                    val
+                    for data in (
+                        ensemble_record.records[record_idx].data
+                        for ensemble_record in ensemble_records
+                    )
+                    if isinstance(data, list)
+                    for val in data
+                ]
+                combined_records.append(ert3.data.Record(data=ldata))
+            elif isinstance(record0.data, dict):
+                ddata = {
+                    key: val
+                    for data in (
+                        ensemble_record.records[record_idx].data
+                        for ensemble_record in ensemble_records
+                    )
+                    if isinstance(data, dict)
+                    for key, val in data.items()
+                }
+                combined_records.append(ert3.data.Record(data=ddata))
+
+        return ert3.data.EnsembleRecord(records=combined_records)
+    else:
+        return _get_numerical_data(workspace, experiment_name, record_name)
 
 
 def get_ensemble_record_names(
@@ -372,7 +473,7 @@
     )
 
     ensemble_id = experiment["ensembles"][0]  # currently just one ens per exp
-    response = _get_from_server(path=f"ensembles/{ensemble_id}/records")
+    response = _get_from_server(f"ensembles/{ensemble_id}/records")
     if response.status_code != 200:
         raise ert3.exceptions.StorageError(response.text)
     return list(response.json().keys())
@@ -381,18 +482,7 @@ def get_ensemble_record_names(
 def get_experiment_parameters(
     *, workspace: Path, experiment_name: str
 ) -> Iterable[str]:
-
-    experiment = _get_experiment_by_name(experiment_name)
-    if experiment is None:
-        raise ert3.exceptions.NonExistantExperiment(
-            f"Cannot get parameters from non-existing experiment: {experiment_name}"
-        )
-
-    ensemble_id = experiment["ensembles"][0]  # currently just one ens per exp
-    response = _get_from_server(path=f"ensembles/{ensemble_id}/parameters")
-    if response.status_code != 200:
-        raise ert3.exceptions.StorageError(response.text)
-    return list(response.json())
+    return list(_get_experiment_parameters(workspace, experiment_name))
 
 
 def delete_experiment(*, workspace: Path, experiment_name: str) -> None:
diff --git a/tests/ert3/storage/test_storage.py b/tests/ert3/storage/test_storage.py
index ba6846d96c7..cca7340c7f2 100644
--- a/tests/ert3/storage/test_storage.py
+++ b/tests/ert3/storage/test_storage.py
@@ -25,7 +25,7 @@ def test_ensemble_size_zero(tmpdir, ert_storage):
         ert3.storage.init_experiment(
             workspace=tmpdir,
             experiment_name="my_experiment",
-            parameters=[],
+            parameters={},
             ensemble_size=0,
         )
 
@@ -37,7 +37,7 @@ def test_none_as_experiment_name(tmpdir, ert_storage):
         ert3.storage.init_experiment(
             workspace=tmpdir,
             experiment_name=None,
-            parameters=[],
+            parameters={},
             ensemble_size=10,
         )
 
@@ -48,7 +48,7 @@ def test_double_add_experiment(tmpdir, ert_storage):
     ert3.storage.init_experiment(
         workspace=tmpdir,
         experiment_name="my_experiment",
-        parameters=[],
+        parameters={},
         ensemble_size=42,
     )
     with pytest.raises(
@@ -58,7 +58,7 @@ def test_double_add_experiment(tmpdir, ert_storage):
         ert3.storage.init_experiment(
             workspace=tmpdir,
             experiment_name="my_experiment",
-            parameters=[],
+            parameters={},
             ensemble_size=42,
         )
 
@@ -68,14 +68,17 @@ def test_add_experiments(tmpdir, ert_storage):
     ert3.storage.init(workspace=tmpdir)
 
     experiment_names = ["a", "b", "c", "super-experiment", "explosions"]
-    experiment_parameters = [
+    experiment_parameter_records = [
         ["x"],
         ["a", "b"],
         ["alpha", "beta"],
         ["oxygen", "heat", "fuel"],
     ]
-    experiments = zip(experiment_names, experiment_parameters)
-    for idx, (experiment_name, experiment_parameters) in enumerate(experiments):
+    experiments = zip(experiment_names, experiment_parameter_records)
+    for idx, (experiment_name, experiment_parameter_records) in enumerate(experiments):
+        experiment_parameters = {
+            key: ["some_coeff"] for key in experiment_parameter_records
+        }
         ert3.storage.init_experiment(
             workspace=tmpdir,
             experiment_name=experiment_name,
@@ -89,7 +92,7 @@ def test_add_experiments(tmpdir, ert_storage):
         parameters = ert3.storage.get_experiment_parameters(
             workspace=tmpdir, experiment_name=experiment_name
         )
-        assert experiment_parameters == parameters
+        assert experiment_parameter_records == parameters
 
 
 @pytest.mark.requires_ert_storage
@@ -134,8 +137,11 @@ def test_add_and_get_ensemble_record(tmpdir, raw_ensrec, ert_storage):
 
     ensrecord = ert3.data.EnsembleRecord(records=raw_ensrec)
     ert3.storage.add_ensemble_record(
-        workspace=tmpdir, record_name="my_ensemble_record", ensemble_record=ensrecord
+        workspace=tmpdir,
+        record_name="my_ensemble_record",
+        ensemble_record=ensrecord,
     )
+
     retrieved_ensrecord = ert3.storage.get_ensemble_record(
         workspace=tmpdir, record_name="my_ensemble_record"
     )
@@ -143,6 +149,54 @@ def test_add_and_get_ensemble_record(tmpdir, raw_ensrec, ert_storage):
     assert ensrecord == retrieved_ensrecord
 
 
+@pytest.mark.requires_ert_storage
+@pytest.mark.parametrize(
+    "raw_ensrec",
+    (
+        [{"data": [i + 0.5, i + 1.1, i + 2.2]} for i in range(3)],
+        [{"data": {"a": i + 0.5, "b": i + 1.1, "c": i + 2.2}} for i in range(5)],
+        [{"data": {2: i + 0.5, 5: i + 1.1, 7: i + 2.2}} for i in range(2)],
+    ),
+)
+def test_add_and_get_ensemble_parameter_record(tmpdir, raw_ensrec, ert_storage):
+    raw_data = raw_ensrec[0]["data"]
+    assert isinstance(raw_data, (list, dict))
+    if isinstance(raw_data, list):
+        indices = [str(x) for x in range(len(raw_data))]
+    else:
+        indices = [str(x) for x in raw_data]
+
+    ert3.storage.init(workspace=tmpdir)
+    ert3.storage.init_experiment(
+        workspace=tmpdir,
+        experiment_name="experiment_name",
+        parameters={"my_ensemble_record": indices},
+        ensemble_size=len(raw_ensrec),
+    )
+
+    ensrecord = ert3.data.EnsembleRecord(records=raw_ensrec)
+    ert3.storage.add_ensemble_record(
+        workspace=tmpdir,
+        experiment_name="experiment_name",
+        record_name="my_ensemble_record",
+        ensemble_record=ensrecord,
+        is_parameter=True,
+    )
+
+    record_names = ert3.storage.get_ensemble_record_names(
+        workspace=tmpdir, experiment_name="experiment_name"
+    )
+    assert {f"my_ensemble_record.{x}" for x in indices} == set(record_names)
+
+    retrieved_ensrecord = ert3.storage.get_ensemble_record(
+        workspace=tmpdir,
+        experiment_name="experiment_name",
+        record_name="my_ensemble_record",
+    )
+
+    assert ensrecord == retrieved_ensrecord
+
+
 @pytest.mark.requires_ert_storage
 def test_add_ensemble_record_twice(tmpdir, ert_storage):
     ert3.storage.init(workspace=tmpdir)
@@ -183,7 +237,7 @@ def test_add_and_get_experiment_ensemble_record(tmpdir, ert_storage):
         ert3.storage.init_experiment(
             workspace=tmpdir,
             experiment_name=experiment,
-            parameters=[],
+            parameters={},
             ensemble_size=ensemble_size,
         )
         for nid in range(1, 3):
@@ -256,7 +310,7 @@ def test_get_record_names(tmpdir, ert_storage):
         ert3.storage.init_experiment(
             workspace=tmpdir,
             experiment_name=experiment,
-            parameters=[],
+            parameters={},
             ensemble_size=ensemble_size,
         )
         for nid in range(1, 3):
@@ -297,7 +351,7 @@ def test_delete_experiment(tmpdir, ert_storage):
     ert3.storage.init_experiment(
         workspace=tmpdir,
         experiment_name="test",
-        parameters=[],
+        parameters={},
        ensemble_size=42,
     )
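For reference, below is a minimal usage sketch of the changed `ert3.storage` API (not part of the diff). The workspace path, experiment name, record name, and values are made up for illustration, and the snippet assumes a running ert-storage server, as the tests' `ert_storage` fixture provides.

```python
from pathlib import Path

import ert3

# Hypothetical workspace root, for illustration only.
workspace = Path("my_workspace")

ert3.storage.init(workspace=workspace)

# `parameters` is now a mapping from record name to its variable names,
# rather than a flat list of record names.
ert3.storage.init_experiment(
    workspace=workspace,
    experiment_name="demo_experiment",
    parameters={"coefficients": ["a", "b", "c"]},
    ensemble_size=2,
)

ensrecord = ert3.data.EnsembleRecord(
    records=[
        {"data": {"a": 0.1, "b": 1.0, "c": 2.0}},
        {"data": {"a": 0.2, "b": 1.1, "c": 2.1}},
    ]
)

# With is_parameter=True the record is stored per variable, i.e. as
# "coefficients.a", "coefficients.b" and "coefficients.c".
ert3.storage.add_ensemble_record(
    workspace=workspace,
    experiment_name="demo_experiment",
    record_name="coefficients",
    ensemble_record=ensrecord,
    is_parameter=True,
)

# get_ensemble_record recombines the per-variable records transparently.
retrieved = ert3.storage.get_ensemble_record(
    workspace=workspace,
    experiment_name="demo_experiment",
    record_name="coefficients",
)
assert retrieved == ensrecord
```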