diff --git a/src/ert/dark_storage/common.py b/src/ert/dark_storage/common.py index a8b4002d362..07c3b7698ae 100644 --- a/src/ert/dark_storage/common.py +++ b/src/ert/dark_storage/common.py @@ -47,11 +47,11 @@ def data_for_key( elif key in ensemble.get_gen_kw_keyset(): t= time.perf_counter() data = ensemble.load_all_gen_kw_data(key.split(":")[0], realization_index) + print(f"data_for_key 2 {time.perf_counter() -t }") if data.empty: return pd.DataFrame() data = data[key].to_frame().dropna() data.columns = pd.Index([0]) - print(f"data_for_key 2 {time.perf_counter() -t }") elif key in ensemble.get_gen_data_keyset(): t= time.perf_counter() key_parts = key.split("@") @@ -66,8 +66,10 @@ def data_for_key( ).T print(f"data_for_key 3 {time.perf_counter() -t }") except (ValueError, KeyError): + print(f"data_for_key 3 except {time.perf_counter() -t }") return pd.DataFrame() else: + print("data_for_key not found") return pd.DataFrame() try: diff --git a/src/ert/dark_storage/endpoints/ensembles.py b/src/ert/dark_storage/endpoints/ensembles.py index 8d0a53fc02c..d0739d89557 100644 --- a/src/ert/dark_storage/endpoints/ensembles.py +++ b/src/ert/dark_storage/endpoints/ensembles.py @@ -42,6 +42,26 @@ def get_ensemble( ) +@router.get("/ensembles/{ensemble_id}/small", response_model=js.EnsembleOut) +def get_ensemble_small( + *, + storage: StorageAccessor = DEFAULT_STORAGE, + ensemble_id: UUID, +) -> js.EnsembleOut: + ensemble = storage.get_ensemble(ensemble_id) + return js.EnsembleOut( + id=ensemble_id, + children=[], + parent=None, + experiment_id=ensemble.experiment_id, + userdata={"name": ensemble.name}, + size=ensemble.ensemble_size, + parameter_names=[], + response_names=[], + child_ensemble_ids=[], + ) + + @router.put("/ensembles/{ensemble_id}/userdata") async def replace_ensemble_userdata( *, diff --git a/src/ert/dark_storage/endpoints/records.py b/src/ert/dark_storage/endpoints/records.py index 884d62a6c4f..f2aa28f575e 100644 --- a/src/ert/dark_storage/endpoints/records.py +++ b/src/ert/dark_storage/endpoints/records.py @@ -175,7 +175,7 @@ async def get_ensemble_record( name: str, ensemble_id: UUID, accept: Annotated[Union[str, None], Header()] = None, - realization_index: Optional[int] = None, + realization_index: Optional[int] = None, # remove this?? Is it used anywhere? label: Optional[str] = None, ) -> Any: import time @@ -192,16 +192,16 @@ async def get_ensemble_record( dataframe.columns = [str(s) for s in dataframe.columns] stream = io.BytesIO() dataframe.to_parquet(stream) - print(f"rest {time.perf_counter()- t}") + print(f"rest1 {name} {time.perf_counter()- t}") return Response( content=stream.getvalue(), media_type="application/x-parquet", ) elif media_type == "application/json": - print(f"rest {time.perf_counter()- t}") + print(f"rest2 {time.perf_counter()- t}") return Response(dataframe.to_json(), media_type="application/json") else: - print(f"rest {time.perf_counter()- t}") + print(f"rest3 {time.perf_counter()- t}") return Response( content=dataframe.to_csv().encode(), media_type="text/csv", @@ -262,6 +262,8 @@ def get_ensemble_responses( for obs in res.get_observations(): name_dict[obs.observation_key] = obs.observation_type + import time + t= time.perf_counter() for name in ens.get_summary_keyset(): response_map[str(name)] = js.RecordOut( id=UUID(int=0), @@ -269,7 +271,9 @@ def get_ensemble_responses( userdata={"data_origin": "Summary"}, has_observations=name in name_dict, ) + print(f"get_ensemble_responses 1 {time.perf_counter() - t}") + t= time.perf_counter() for name in res.get_gen_data_keys(): obs_keys = res.observation_keys(name) response_map[str(name)] = js.RecordOut( @@ -278,5 +282,6 @@ def get_ensemble_responses( userdata={"data_origin": "GEN_DATA"}, has_observations=len(obs_keys) != 0, ) + print(f"get_ensemble_responses 2 {time.perf_counter() - t}") return response_map diff --git a/src/ert/dark_storage/enkf.py b/src/ert/dark_storage/enkf.py index d10d6dc1f63..45dd69033da 100644 --- a/src/ert/dark_storage/enkf.py +++ b/src/ert/dark_storage/enkf.py @@ -35,8 +35,14 @@ def init_facade() -> LibresFacade: def get_res(*, _: None = DEFAULT_SECURITY) -> LibresFacade: + print("get_res") if _libres_facade is None: - return init_facade() + import time + t = time.perf_counter() + a= init_facade() + print(f"get_res - init_facede, t={time.perf_counter() -t}") + return a + return _libres_facade @@ -52,7 +58,11 @@ def get_storage(*, res: LibresFacade = DEFAULT_LIBRESFACADE) -> StorageReader: def reset_res(*, _: None = DEFAULT_SECURITY) -> None: + import time + print("reset_res") + t= time.perf_counter() global _libres_facade # noqa: PLW0603 if _libres_facade is not None: _libres_facade = None + print(f"t {time.perf_counter()-t}") return _libres_facade diff --git a/src/ert/gui/tools/plot/plot_api.py b/src/ert/gui/tools/plot/plot_api.py index cc7f73969fe..03b486cb7b7 100644 --- a/src/ert/gui/tools/plot/plot_api.py +++ b/src/ert/gui/tools/plot/plot_api.py @@ -3,7 +3,7 @@ from itertools import combinations as combi from json.decoder import JSONDecodeError from typing import List, Optional - +import time import httpx import pandas as pd import requests @@ -21,8 +21,12 @@ def __init__(self): self._reset_storage_facade() def _reset_storage_facade(self): + t= time.perf_counter() with StorageService.session() as client: + print(f"_reset_storage_facade time1 {time.perf_counter()-t}") + t= time.perf_counter() client.post("/updates/facade", timeout=self._timeout) + print(f"_reset_storage_facade time2 {time.perf_counter()-t}") def _get_case(self, name: str) -> Optional[dict]: for e in self._get_all_cases(): @@ -43,7 +47,7 @@ def _get_all_cases(self) -> List[dict]: for experiment in experiments: for ensemble_id in experiment["ensemble_ids"]: response = client.get( - f"/ensembles/{ensemble_id}", timeout=self._timeout + f"/ensembles/{ensemble_id}/small", timeout=self._timeout ) self._check_response(response) response_json = response.json() @@ -69,11 +73,16 @@ def _check_response(response: requests.Response): ) def _get_experiments(self) -> dict: + import time + t= time.perf_counter() with StorageService.session() as client: + print(f"_get_experiments1 {time.perf_counter()- t}") + t= time.perf_counter() response: requests.Response = client.get( "/experiments", timeout=self._timeout ) self._check_response(response) + print(f"_get_experiments3 {time.perf_counter()- t}") return response.json() def _get_ensembles(self, experiement_id) -> List: @@ -97,8 +106,14 @@ def all_data_type_keys(self) -> List: import time t1= time.perf_counter() with StorageService.session() as client: - print(f"plotAPI - all_data_type_keys - session {time.perf_counter()- t1}") - for experiment in self._get_experiments(): + t= time.perf_counter() + response: requests.Response = client.get( + "/experiments", timeout=self._timeout + ) + self._check_response(response) + print(f"_get_experiments3 {time.perf_counter()- t}") + + for experiment in response.json(): for ensemble in self._get_ensembles(experiment["id"]): t= time.perf_counter() response: requests.Response = client.get( @@ -131,7 +146,8 @@ def all_data_type_keys(self) -> List: "log_scale": key.startswith("LOG10_"), } print(f"plotAPI - all_data_type_keys2 {time.perf_counter()- t}") - print(f"plotAPI - all_data_type_keys {time.perf_counter()- t1}") + print(f"plotAPI - all_data_type_keys {time.perf_counter()- t1}") + print(f"plotAPI - all_data_type_keys -session end {time.perf_counter()- t1}") return list(all_keys.values()) def get_all_cases_not_running(self) -> List: @@ -210,6 +226,8 @@ def history_data(self, key, case=None) -> pd.DataFrame: given data key, if any. The row index is the index/date and the column index is the key.""" + print(f"history data {key}") + if ":" in key: head, tail = key.split(":", 2) history_key = f"{head}H:{tail}" diff --git a/src/ert/gui/tools/plot/plot_window.py b/src/ert/gui/tools/plot/plot_window.py index 381eedc8e58..65c31ee51d8 100644 --- a/src/ert/gui/tools/plot/plot_window.py +++ b/src/ert/gui/tools/plot/plot_window.py @@ -42,18 +42,23 @@ class PlotWindow(QMainWindow): def __init__(self, config_file, parent): QMainWindow.__init__(self, parent) - + import time + t_total= time.perf_counter() logger.info("PlotWindow __init__") print("PlotWindow 1") self.setMinimumWidth(850) self.setMinimumHeight(650) + self.setWindowTitle(f"Plotting - {config_file}") self.activateWindow() QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor) try: + print("plotapi init") + t= time.perf_counter() self._api = PlotApi() + print(f"PlotApi init {time.perf_counter()-t}") self._key_definitions = self._api.all_data_type_keys() except (RequestError, TimeoutError) as e: logger.exception(e) @@ -89,7 +94,9 @@ def __init__(self, config_file, parent): QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor) try: + t= time.perf_counter() cases = self._api.get_all_cases_not_running() + print(f" Plotwindow2.2 {time.perf_counter()-t }") except (RequestError, TimeoutError) as e: logger.exception(e) QMessageBox.critical(self, "Request Failed", f"{e}") @@ -108,7 +115,7 @@ def __init__(self, config_file, parent): current_plot_widget = self._plot_widgets[self._central_tab.currentIndex()] self._data_type_keys_widget.selectDefault() self._updateCustomizer(current_plot_widget) - print("PlotWindow done") + print(f"PlotWindow init done, time= {time.perf_counter() -t_total }s") def currentPlotChanged(self): key_def = self.getSelectedKey() @@ -116,6 +123,8 @@ def currentPlotChanged(self): return key = key_def["key"] + print(f"currentPlotChanged {key}") + plot_widget = self._central_tab.currentWidget() if plot_widget._plotter.dimensionality == key_def["dimensionality"]: @@ -124,7 +133,10 @@ def currentPlotChanged(self): case_to_data_map = {} for case in cases: try: + import time + t= time.perf_counter() case_to_data_map[case] = self._api.data_for_key(case, key) + print(f"currentPlotChanged data_for_key= {time.perf_counter() -t}") except (RequestError, TimeoutError) as e: logger.exception(e) msg = f"{e}" @@ -134,7 +146,10 @@ def currentPlotChanged(self): observations = None if key_def["observations"] and cases: try: + import time + t= time.perf_counter() observations = self._api.observations_for_key(cases[0], key) + print(f"currentPlotChanged obs_forkey= {time.perf_counter() -t}") except (RequestError, TimeoutError) as e: logger.exception(e) msg = f"{e}" diff --git a/src/ert/services/storage_service.py b/src/ert/services/storage_service.py index e3cc7b10a1a..f8e9e0e1650 100644 --- a/src/ert/services/storage_service.py +++ b/src/ert/services/storage_service.py @@ -59,6 +59,7 @@ def fetch_url(self) -> str: for url in self.fetch_conn_info()["urls"]: try: + print("fetch_url") resp = requests.get(f"{url}/healthcheck", auth=self.fetch_auth()) if resp.status_code == 200: self._url = url diff --git a/src/ert/storage/local_ensemble.py b/src/ert/storage/local_ensemble.py index cb268e5a6be..7f86fdf14e1 100644 --- a/src/ert/storage/local_ensemble.py +++ b/src/ert/storage/local_ensemble.py @@ -105,8 +105,8 @@ def get_realization_mask_without_parent_failure(self) -> npt.NDArray[np.bool_]: def get_realization_mask_with_parameters(self) -> npt.NDArray[np.bool_]: return np.array([self._get_parameter(i) for i in range(self.ensemble_size)]) - def get_realization_mask_with_responses(self) -> npt.NDArray[np.bool_]: - return np.array([self._get_response(i) for i in range(self.ensemble_size)]) + def get_realization_mask_with_responses(self, key: Optional[str] = None) -> npt.NDArray[np.bool_]: + return np.array([self._get_response(i, key) for i in range(self.ensemble_size)]) def _get_parameter(self, realization: int) -> bool: if not self.experiment.parameter_configuration: @@ -117,10 +117,14 @@ def _get_parameter(self, realization: int) -> bool: for parameter in self.experiment.parameter_configuration ) - def _get_response(self, realization: int) -> bool: + def _get_response(self, realization: int, key: Optional[str] = None) -> bool: if not self.experiment.response_configuration: return False path = self.mount_point / f"realization-{realization}" + + if key: + return (path / f"{key}.nc").exists() + return all( (path / f"{response}.nc").exists() for response in self._filter_response_configuration() @@ -180,9 +184,9 @@ def realizations_initialized(self, realizations: List[int]) -> bool: return all((responses[real] or parameters[real]) for real in realizations) - def get_realization_list_with_responses(self) -> List[int]: + def get_realization_list_with_responses(self, key: Optional[str] = None) -> List[int]: return [ - idx for idx, b in enumerate(self.get_realization_mask_with_responses()) if b + idx for idx, b in enumerate(self.get_realization_mask_with_responses(key)) if b ] def set_failure( @@ -251,23 +255,21 @@ def _get_gen_data_config(self, key: str) -> GenDataConfig: assert isinstance(config, GenDataConfig) return config - @deprecated("Check the experiment for registered responses") + #@deprecated("Check the experiment for registered responses") def get_gen_data_keyset(self) -> List[str]: - keylist = [ - k - for k, v in self.experiment.response_info.items() - if "_ert_kind" in v and v["_ert_kind"] == "GenDataConfig" - ] - + import time + t= time.perf_counter() gen_data_list = [] - for key in keylist: - gen_data_config = self._get_gen_data_config(key) - if gen_data_config.report_steps is None: - gen_data_list.append(f"{key}@0") - else: - for report_step in gen_data_config.report_steps: - gen_data_list.append(f"{key}@{report_step}") - return sorted(gen_data_list, key=lambda k: k.lower()) + for k,v in self.experiment.response_configuration.items(): + if isinstance(v, GenDataConfig): + if v.report_steps is None: + gen_data_list.append(f"{k}@0") + else: + for report_step in v.report_steps: + gen_data_list.append(f"{k}@{report_step}") + ss= sorted(gen_data_list, key=lambda k: k.lower()) + print(f"get_gen_data_keyset {time.perf_counter()-t}") + return ss @deprecated("Check the experiment for registered parameters") def get_gen_kw_keyset(self) -> List[str]: @@ -293,7 +295,7 @@ def load_gen_data( report_step: int, realization_index: Optional[int] = None, ) -> pd.DataFrame: - realizations = self.get_realization_list_with_responses() + realizations = self.get_realization_list_with_responses(key) if realization_index is not None: if realization_index not in realizations: raise IndexError(f"No such realization {realization_index}") @@ -355,6 +357,8 @@ def load_parameters( def load_responses( self, key: str, realizations: npt.NDArray[np.int_] ) -> xr.Dataset: + import time + t= time.perf_counter() if key not in self.experiment.response_configuration: raise ValueError(f"{key} is not a response") loaded = [] @@ -362,10 +366,11 @@ def load_responses( input_path = self.mount_point / f"realization-{realization}" / f"{key}.nc" if not input_path.exists(): raise KeyError(f"No response for key {key}, realization: {realization}") - ds = xr.open_dataset(input_path, engine="netcdf4") + ds = xr.open_dataset(input_path, engine="scipy") loaded.append(ds) response = xr.combine_nested(loaded, concat_dim="realization") assert isinstance(response, xr.Dataset) + print(f"load_response {time.perf_counter() -t}") return response @@ -375,14 +380,10 @@ def load_responses_summary( loaded = [] for realization in realizations: input_path = self.mount_point / f"realization-{realization}" / f"summary.nc" - if not input_path.exists(): - raise KeyError(f"No response for summary, realization: {realization}") - - ## test using xarray.open_mfdataset() - ds = xr.open_dataset(input_path, engine="scipy") - ds= ds.query(name=f'name=="{key}"') - - loaded.append(ds) + if input_path.exists(): + ds = xr.open_dataset(input_path, engine="scipy") + ds= ds.query(name=f'name=="{key}"') + loaded.append(ds) response = xr.combine_nested(loaded, concat_dim="realization") assert isinstance(response, xr.Dataset) return response @@ -390,15 +391,19 @@ def load_responses_summary( def load_summary(self, key:str, realization_index: Optional[int] = None,) -> pd.DataFrame: - realizations = self.get_realization_list_with_responses() + import time + t = time.perf_counter() + #realizations = self.get_realization_list_with_responses("summary") + #realizations = self.get_realization_list_with_responses() + realizations = [i for i in range(self.ensemble_size)] + if realization_index is not None: if realization_index not in realizations: raise IndexError(f"No such realization {realization_index}") realizations = [realization_index] - import time + try: - t = time.perf_counter() df = self.load_responses_summary(key, tuple(realizations)).to_dataframe() print(f"load_summary - Load time used {time.perf_counter() - t}") except (ValueError, KeyError): diff --git a/test.py b/test.py index 11386e453e9..18395db87a1 100644 --- a/test.py +++ b/test.py @@ -1,7 +1,8 @@ -from typing import Optional, Dict +from typing import Optional, Dict, List from ert.config.enkf_observation_implementation_type import EnkfObservationImplementationType +from ert.config.gen_data_config import GenDataConfig from ert.storage import StorageReader, open_storage, EnsembleReader from ert.libres_facade import LibresFacade import pandas as pd @@ -22,19 +23,16 @@ def data_for_key( if key in ensemble.get_summary_keyset(): data = ensemble.load_summary(key) data = data[key].unstack(level="Date") - print(f"Done-2") elif key in ensemble.get_gen_kw_keyset(): data = ensemble.load_all_gen_kw_data(key.split(":")[0], realization_index) if data.empty: return pd.DataFrame() data = data[key].to_frame().dropna() data.columns = pd.Index([0]) - print(f"Done-3") elif key in ensemble.get_gen_data_keyset(): key_parts = key.split("@") key = key_parts[0] report_step = int(key_parts[1]) if len(key_parts) > 1 else 0 - print(f"Done-3") try: data = ensemble.load_gen_data( key, @@ -58,41 +56,102 @@ def get_ensemble_responses( db: StorageReader, ensemble_id, ): - res= LibresFacade.from_config_file("/home/frode/poly_ex2/poly/poly.ert") + res= LibresFacade.from_config_file("/home/frode/poly_ex3/poly/poly.ert") t= time.perf_counter() ens = db.get_ensemble(ensemble_id) print(f"Summary keyset {len(ens.get_summary_keyset())}") print(f"Observations: {len(res.get_observations())}") - + print(f"Gen data keys: {len(res.get_gen_data_keys())}") + print(f"Gen kw: {len(res.get_gen_kw())}") + ll= [] name_dict={} for obs in res.get_observations(): name_dict[obs.observation_key] = obs.observation_type for name in ens.get_summary_keyset(): - has_observation=False if name in name_dict: #if obs.observation_type == EnkfObservationImplementationType.SUMMARY_OBS and obs.observation_key == name: - has_observation=True ll.append(name) print(f"has obs {len(ll)}") print(f"time1 {time.perf_counter() - t}") t= time.perf_counter() - for name in res.get_gen_data_keys(): - has_observation=False - for obs in res.get_observations(): - if obs.observation_type == EnkfObservationImplementationType.GEN_OBS and obs.observation_key == name: - has_observation=True - break + gd=[] + for key in res.get_gen_data_keys(): + key_parts = key.split("@") + data_key = key_parts[0] + data_report_step = int(key_parts[1]) if len(key_parts) > 1 else 0 + + obs_key = None + + enkf_obs = res.config.enkf_obs + for obs_vector in enkf_obs: + if EnkfObservationImplementationType.GEN_OBS: + report_step = min(obs_vector.observations.keys()) + key = obs_vector.data_key + + if key == data_key and report_step == data_report_step: + obs_key = obs_vector.observation_key + if obs_key is not None: + gd.append(obs_key) + #else: + # return [] + + + print(f"has obs {len(gd)}") print(f"time2 {time.perf_counter() - t}") +#@deprecated("Check the experiment for registered responses") +def get_gen_data_keyset(ensemble: EnsembleReader) -> List[str]: + import time + t= time.perf_counter() + keylist = [ + k + for k, v in ensemble.experiment.response_info.items() + if "_ert_kind" in v and v["_ert_kind"] == "GenDataConfig" + ] + print(f"get_gen_data_keyset 1 {time.perf_counter()-t}") + + keylist2 = [ + k + for k, v in ensemble.experiment.response_configuration.items() + if isinstance(v, GenDataConfig) + ] + + + t= time.perf_counter() + gen_data_list = [] + for k,v in ensemble.experiment.response_configuration.items(): + if isinstance(v, GenDataConfig): + if v.report_steps is None: + gen_data_list.append(f"{k}@0") + else: + for report_step in v.report_steps: + gen_data_list.append(f"{k}@{report_step}") + print(f"get_gen_data_keyset 2 {time.perf_counter()-t}") + t= time.perf_counter() + ss= sorted(gen_data_list, key=lambda k: k.lower()) + print(f"get_gen_data_keyset 3 {time.perf_counter()-t}") + return ss + + + + + +from ert.shared.storage.extraction import create_priors +def get_experiments( + db: StorageReader, +) : + t= time.perf_counter() + res= LibresFacade.from_config_file("/home/frode/poly_ex2/poly/poly.ert") + print(f"time1 {time.perf_counter() - t}") @@ -100,12 +159,16 @@ def get_ensemble_responses( -with open_storage("/home/frode/poly_ex2/poly/storage", mode="r") as storage: +with open_storage("/home/frode/poly_ex3/poly/storage", mode="r") as storage: ensemble= storage.get_ensemble_by_name("default_1") - #df= data_for_key(ensemble, "PSUM99") + df= data_for_key(ensemble, "PSUM99") + #df= data_for_key(ensemble, "POLY_RES_666@0") + #df= data_for_key(ensemble, "COEFFS_9:COEFF_0") #print(df) - get_ensemble_responses(storage, ensemble_id=ensemble.id) + #get_ensemble_responses(storage, ensemble_id=ensemble.id) + #get_gen_data_keyset(ensemble) + #get_experiments(storage) #with open_storage("/home/frode/code/ert/test-data/snake_oil/storage/snake_oil/ensemble", mode="r") as storage: #ensemble= storage.get_ensemble_by_name("default") diff --git a/tests/performance_tests/performance_utils.py b/tests/performance_tests/performance_utils.py index 772a84c204a..b30b4968549 100644 --- a/tests/performance_tests/performance_utils.py +++ b/tests/performance_tests/performance_utils.py @@ -152,9 +152,9 @@ def dark_storage_app(monkeypatch): folder = py.path.local(tempfile.mkdtemp()) make_poly_example( folder, - "../../test-data/poly_template", - gen_data_count=34, - gen_data_entries=15, + "test-data/poly_template", + gen_data_count=3400, + gen_data_entries=150, summary_data_entries=100, reals=200, summary_data_count=4000, @@ -163,7 +163,7 @@ def dark_storage_app(monkeypatch): sum_obs_every=10, gen_obs_every=1, parameter_entries=10, - parameter_count=8, + parameter_count=10, update_steps=1, ) print(folder)