Skip to content

Commit

Permalink
Add some timing and improve some loading
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Jan 13, 2024
1 parent 99af83b commit d72b9fd
Show file tree
Hide file tree
Showing 6 changed files with 200 additions and 9 deletions.
9 changes: 8 additions & 1 deletion src/ert/dark_storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,18 +36,24 @@ def data_for_key(
given case. The row index is the realization number, and the columns are an
index over the indexes/dates"""

import time
if key.startswith("LOG10_"):
key = key[6:]
if key in ensemble.get_summary_keyset():
data = ensemble.load_all_summary_data([key], realization_index)
t= time.perf_counter()
data = ensemble.load_summary(key, realization_index)
data = data[key].unstack(level="Date")
print(f"data_for_key 1 {time.perf_counter() -t }")
elif key in ensemble.get_gen_kw_keyset():
t= time.perf_counter()
data = ensemble.load_all_gen_kw_data(key.split(":")[0], realization_index)
if data.empty:
return pd.DataFrame()
data = data[key].to_frame().dropna()
data.columns = pd.Index([0])
print(f"data_for_key 2 {time.perf_counter() -t }")
elif key in ensemble.get_gen_data_keyset():
t= time.perf_counter()
key_parts = key.split("@")
key = key_parts[0]
report_step = int(key_parts[1]) if len(key_parts) > 1 else 0
Expand All @@ -58,6 +64,7 @@ def data_for_key(
report_step,
realization_index,
).T
print(f"data_for_key 3 {time.perf_counter() -t }")
except (ValueError, KeyError):
return pd.DataFrame()
else:
Expand Down
14 changes: 12 additions & 2 deletions src/ert/dark_storage/endpoints/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
from fastapi import APIRouter, Body, Depends, File, Header, Request, UploadFile, status
from fastapi.responses import Response
from typing_extensions import Annotated
from ert.config.enkf_observation_implementation_type import EnkfObservationImplementationType

from ert.dark_storage import json_schema as js
from ert.dark_storage.common import (
Expand Down Expand Up @@ -177,6 +178,8 @@ async def get_ensemble_record(
realization_index: Optional[int] = None,
label: Optional[str] = None,
) -> Any:
import time
t= time.perf_counter()
dataframe = data_for_key(db.get_ensemble(ensemble_id), name, realization_index)
if realization_index is not None:
# dataframe.loc returns a Series, and when we reconstruct a DataFrame
Expand All @@ -189,13 +192,16 @@ async def get_ensemble_record(
dataframe.columns = [str(s) for s in dataframe.columns]
stream = io.BytesIO()
dataframe.to_parquet(stream)
print(f"rest {time.perf_counter()- t}")
return Response(
content=stream.getvalue(),
media_type="application/x-parquet",
)
elif media_type == "application/json":
print(f"rest {time.perf_counter()- t}")
return Response(dataframe.to_json(), media_type="application/json")
else:
print(f"rest {time.perf_counter()- t}")
return Response(
content=dataframe.to_csv().encode(),
media_type="text/csv",
Expand Down Expand Up @@ -251,13 +257,17 @@ def get_ensemble_responses(
response_map: Dict[str, js.RecordOut] = {}

ens = db.get_ensemble(ensemble_id)

name_dict={}
for obs in res.get_observations():
name_dict[obs.observation_key] = obs.observation_type

for name in ens.get_summary_keyset():
obs_keys = res.observation_keys(name)
response_map[str(name)] = js.RecordOut(
id=UUID(int=0),
name=name,
userdata={"data_origin": "Summary"},
has_observations=len(obs_keys) != 0,
has_observations=name in name_dict,
)

for name in res.get_gen_data_keys():
Expand Down
12 changes: 9 additions & 3 deletions src/ert/gui/tools/plot/plot_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -94,9 +94,13 @@ def all_data_type_keys(self) -> List:
the key"""

all_keys = {}
import time
t1= time.perf_counter()
with StorageService.session() as client:
print(f"plotAPI - all_data_type_keys - session {time.perf_counter()- t1}")
for experiment in self._get_experiments():
for ensemble in self._get_ensembles(experiment["id"]):
t= time.perf_counter()
response: requests.Response = client.get(
f"/ensembles/{ensemble['id']}/responses", timeout=self._timeout
)
Expand All @@ -110,7 +114,8 @@ def all_data_type_keys(self) -> List:
"metadata": value["userdata"],
"log_scale": key.startswith("LOG10_"),
}

print(f"plotAPI - all_data_type_keys1 {time.perf_counter()- t}")
t= time.perf_counter()
response: requests.Response = client.get(
f"/ensembles/{ensemble['id']}/parameters", timeout=self._timeout
)
Expand All @@ -125,15 +130,16 @@ def all_data_type_keys(self) -> List:
"metadata": e["userdata"],
"log_scale": key.startswith("LOG10_"),
}

print(f"plotAPI - all_data_type_keys2 {time.perf_counter()- t}")
print(f"plotAPI - all_data_type_keys {time.perf_counter()- t1}")
return list(all_keys.values())

def get_all_cases_not_running(self) -> List:
    """Return a list of info dicts for all cases, running or not.

    The ensemble information from the storage API currently gives no hint
    as to whether a case is running, so every case is returned regardless
    of its run state.
    """
    return self._get_all_cases()

def data_for_key(self, case_name, key) -> pd.DataFrame:
Expand Down
7 changes: 4 additions & 3 deletions src/ert/gui/tools/plot/plot_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ def __init__(self, config_file, parent):
QMainWindow.__init__(self, parent)

logger.info("PlotWindow __init__")

print("PlotWindow 1")
self.setMinimumWidth(850)
self.setMinimumHeight(650)

Expand All @@ -60,7 +60,7 @@ def __init__(self, config_file, parent):
QMessageBox.critical(self, "Request Failed", f"{e}")
self._key_definitions = []
QApplication.restoreOverrideCursor()

print("PlotWindow 2")
self._plot_customizer = PlotCustomizer(self, self._key_definitions)

self._plot_customizer.settingsChanged.connect(self.keySelected)
Expand Down Expand Up @@ -95,7 +95,7 @@ def __init__(self, config_file, parent):
QMessageBox.critical(self, "Request Failed", f"{e}")
cases = []
QApplication.restoreOverrideCursor()

print("PlotWindow 3")
case_names = [case["name"] for case in cases if not case["hidden"]]

self._data_type_keys_widget = DataTypeKeysWidget(self._key_definitions)
Expand All @@ -108,6 +108,7 @@ def __init__(self, config_file, parent):
current_plot_widget = self._plot_widgets[self._central_tab.currentIndex()]
self._data_type_keys_widget.selectDefault()
self._updateCustomizer(current_plot_widget)
print("PlotWindow done")

def currentPlotChanged(self):
key_def = self.getSelectedKey()
Expand Down
51 changes: 51 additions & 0 deletions src/ert/storage/local_ensemble.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,12 +362,59 @@ def load_responses(
input_path = self.mount_point / f"realization-{realization}" / f"{key}.nc"
if not input_path.exists():
raise KeyError(f"No response for key {key}, realization: {realization}")
ds = xr.open_dataset(input_path, engine="netcdf4")
loaded.append(ds)
response = xr.combine_nested(loaded, concat_dim="realization")
assert isinstance(response, xr.Dataset)
return response


def load_responses_summary(
    self, key: str, realizations: npt.NDArray[np.int_]
) -> "xr.Dataset":
    """Load the summary response for one summary key across realizations.

    Opens ``summary.nc`` under each realization directory, filters each
    dataset down to rows whose ``name`` coordinate equals ``key``, and
    concatenates the per-realization datasets along a new ``realization``
    dimension.

    Args:
        key: the summary key to select.
        realizations: realization numbers to load.

    Raises:
        KeyError: if any requested realization has no summary response.
    """
    loaded = []
    for realization in realizations:
        input_path = self.mount_point / f"realization-{realization}" / "summary.nc"
        if not input_path.exists():
            raise KeyError(f"No response for summary, realization: {realization}")

        # NOTE(review): engine="scipy" differs from the "netcdf4" engine used
        # by load_responses — confirm both engines can read the stored files.
        ds = xr.open_dataset(input_path, engine="scipy")
        # Filter to the single requested key before combining, so memory use
        # is proportional to one key rather than the whole summary set.
        loaded.append(ds.query(name=f'name=="{key}"'))

    response = xr.combine_nested(loaded, concat_dim="realization")
    assert isinstance(response, xr.Dataset)
    return response



def load_summary(
    self,
    key: str,
    realization_index: Optional[int] = None,
) -> pd.DataFrame:
    """Load summary data for ``key`` as a pandas DataFrame.

    The frame is indexed by ``(Realization, Date)`` with one column per
    summary key (here just ``key``). An empty DataFrame is returned when
    the responses cannot be loaded.

    Args:
        key: summary key to load.
        realization_index: if given, restrict loading to this realization.

    Raises:
        IndexError: if ``realization_index`` has no stored responses.
    """
    realizations = self.get_realization_list_with_responses()
    if realization_index is not None:
        if realization_index not in realizations:
            raise IndexError(f"No such realization {realization_index}")
        realizations = [realization_index]

    try:
        df = self.load_responses_summary(key, tuple(realizations)).to_dataframe()
    except (ValueError, KeyError):
        return pd.DataFrame()

    # Pivot the long-format frame so each summary key becomes a column, then
    # rename and reorder the index levels to the (Realization, Date)
    # convention used by the dark-storage API.
    df = df.unstack(level="name")
    df.columns = [col[1] for col in df.columns.values]
    df.index = df.index.rename(
        {"time": "Date", "realization": "Realization"}
    ).reorder_levels(["Realization", "Date"])

    return df

@deprecated("Use load_responses")
def load_all_summary_data(
self,
Expand All @@ -383,9 +430,13 @@ def load_all_summary_data(
summary_keys = self.get_summary_keyset()

try:
import time
t = time.perf_counter()
df = self.load_responses("summary", tuple(realizations)).to_dataframe()
print(f"load_all_summary_data - Time used {time.perf_counter() - t}")
except (ValueError, KeyError):
return pd.DataFrame()
#df= df.query(f'name == "{key}"')
df = df.unstack(level="name")
df.columns = [col[1] for col in df.columns.values]
df.index = df.index.rename(
Expand Down
116 changes: 116 additions & 0 deletions test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@


from typing import Optional, Dict
from ert.config.enkf_observation_implementation_type import EnkfObservationImplementationType
from ert.storage import StorageReader, open_storage, EnsembleReader
from ert.libres_facade import LibresFacade
import pandas as pd
import time


def data_for_key(
    ensemble: "EnsembleReader",
    key: str,
    realization_index: Optional[int] = None,
) -> pd.DataFrame:
    """Return a pandas DataFrame with the datapoints for ``key``.

    The row index is the realization number, and the columns are an index
    over the indexes/dates. An empty DataFrame is returned for unknown keys
    or when no data is available.

    Args:
        ensemble: ensemble to read responses/parameters from.
        key: response or parameter key; a ``LOG10_`` prefix is stripped,
            and gen-data keys may carry an ``@<report_step>`` suffix.
        realization_index: if given, restrict to this realization.
    """
    if key.startswith("LOG10_"):
        key = key[6:]
    if key in ensemble.get_summary_keyset():
        data = ensemble.load_summary(key)
        data = data[key].unstack(level="Date")
    elif key in ensemble.get_gen_kw_keyset():
        # Gen-kw data is loaded per parameter group (text before ":").
        data = ensemble.load_all_gen_kw_data(key.split(":")[0], realization_index)
        if data.empty:
            return pd.DataFrame()
        data = data[key].to_frame().dropna()
        data.columns = pd.Index([0])
    elif key in ensemble.get_gen_data_keyset():
        key_parts = key.split("@")
        key = key_parts[0]
        report_step = int(key_parts[1]) if len(key_parts) > 1 else 0
        try:
            data = ensemble.load_gen_data(
                key,
                report_step,
                realization_index,
            ).T
        except (ValueError, KeyError):
            return pd.DataFrame()
    else:
        return pd.DataFrame()

    try:
        # Plotting expects numeric data; fall back to the raw frame when the
        # values cannot be represented as floats.
        return data.astype(float)
    except ValueError:
        return data



def get_ensemble_responses(
    db: "StorageReader",
    ensemble_id,
):
    """Benchmark building the response/observation mapping for an ensemble.

    Ad-hoc profiling helper: times how long it takes to match summary and
    gen-data response names against the configured observations, printing
    the elapsed time of each phase.
    """
    # NOTE(review): hard-coded config path — this script only works on the
    # author's machine.
    res = LibresFacade.from_config_file("/home/frode/poly_ex2/poly/poly.ert")

    t = time.perf_counter()
    ens = db.get_ensemble(ensemble_id)

    print(f"Summary keyset {len(ens.get_summary_keyset())}")
    print(f"Observations: {len(res.get_observations())}")

    # Build a key -> observation-type lookup once, so the summary phase below
    # is a dict membership test instead of a scan over all observations.
    name_dict = {}
    for obs in res.get_observations():
        name_dict[obs.observation_key] = obs.observation_type

    ll = [name for name in ens.get_summary_keyset() if name in name_dict]

    print(f"has obs {len(ll)}")
    print(f"time1 {time.perf_counter() - t}")
    t = time.perf_counter()

    # Gen-data phase deliberately scans the observation list per name
    # (short-circuiting on the first match) so its timing can be compared
    # against the dict-lookup approach above.
    for name in res.get_gen_data_keys():
        any(
            obs.observation_type == EnkfObservationImplementationType.GEN_OBS
            and obs.observation_key == name
            for obs in res.get_observations()
        )

    print(f"time2 {time.perf_counter() - t}")









# Ad-hoc driver: open the author's local storage read-only and run the
# benchmark above against the "default_1" ensemble.
# NOTE(review): hard-coded path — only runs on the author's machine.
with open_storage("/home/frode/poly_ex2/poly/storage", mode="r") as storage:
    ensemble= storage.get_ensemble_by_name("default_1")
    #df= data_for_key(ensemble, "PSUM99")
    #print(df)

    get_ensemble_responses(storage, ensemble_id=ensemble.id)

# Alternative local dataset kept for reference:
#with open_storage("/home/frode/code/ert/test-data/snake_oil/storage/snake_oil/ensemble", mode="r") as storage:
    #ensemble= storage.get_ensemble_by_name("default")
    #df= data_for_key(ensemble, "FOPR")
    #print(df)
    #get_ensemble_responses(storage, ensemble_id=ensemble.id)

0 comments on commit d72b9fd

Please sign in to comment.