Skip to content

Commit

Permalink
More improvements for larger gen_data
Browse files Browse the repository at this point in the history
  • Loading branch information
frode-aarstad committed Jan 16, 2024
1 parent d72b9fd commit acf47bd
Show file tree
Hide file tree
Showing 10 changed files with 206 additions and 67 deletions.
4 changes: 3 additions & 1 deletion src/ert/dark_storage/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ def data_for_key(
elif key in ensemble.get_gen_kw_keyset():
t= time.perf_counter()
data = ensemble.load_all_gen_kw_data(key.split(":")[0], realization_index)
print(f"data_for_key 2 {time.perf_counter() -t }")
if data.empty:
return pd.DataFrame()
data = data[key].to_frame().dropna()
data.columns = pd.Index([0])
print(f"data_for_key 2 {time.perf_counter() -t }")
elif key in ensemble.get_gen_data_keyset():
t= time.perf_counter()
key_parts = key.split("@")
Expand All @@ -66,8 +66,10 @@ def data_for_key(
).T
print(f"data_for_key 3 {time.perf_counter() -t }")
except (ValueError, KeyError):
print(f"data_for_key 3 except {time.perf_counter() -t }")
return pd.DataFrame()
else:
print("data_for_key not found")
return pd.DataFrame()

try:
Expand Down
20 changes: 20 additions & 0 deletions src/ert/dark_storage/endpoints/ensembles.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,26 @@ def get_ensemble(
)


@router.get("/ensembles/{ensemble_id}/small", response_model=js.EnsembleOut)
def get_ensemble_small(
    *,
    storage: StorageAccessor = DEFAULT_STORAGE,
    ensemble_id: UUID,
) -> js.EnsembleOut:
    """Return a reduced description of the ensemble ``ensemble_id``.

    Only the experiment id, the name (exposed as ``userdata["name"]``) and
    the ensemble size are read from the stored ensemble. The parent, the
    children and the parameter/response name lists are always returned empty.
    """
    stored = storage.get_ensemble(ensemble_id)

    # Read the three stored attributes up front, in the same order as before.
    experiment_id = stored.experiment_id
    userdata = {"name": stored.name}
    size = stored.ensemble_size

    return js.EnsembleOut(
        id=ensemble_id,
        experiment_id=experiment_id,
        userdata=userdata,
        size=size,
        parent=None,
        children=[],
        child_ensemble_ids=[],
        parameter_names=[],
        response_names=[],
    )


@router.put("/ensembles/{ensemble_id}/userdata")
async def replace_ensemble_userdata(
*,
Expand Down
13 changes: 9 additions & 4 deletions src/ert/dark_storage/endpoints/records.py
Original file line number Diff line number Diff line change
Expand Up @@ -175,7 +175,7 @@ async def get_ensemble_record(
name: str,
ensemble_id: UUID,
accept: Annotated[Union[str, None], Header()] = None,
realization_index: Optional[int] = None,
realization_index: Optional[int] = None, # remove this?? Is it used anywhere?
label: Optional[str] = None,
) -> Any:
import time
Expand All @@ -192,16 +192,16 @@ async def get_ensemble_record(
dataframe.columns = [str(s) for s in dataframe.columns]
stream = io.BytesIO()
dataframe.to_parquet(stream)
print(f"rest {time.perf_counter()- t}")
print(f"rest1 {name} {time.perf_counter()- t}")
return Response(
content=stream.getvalue(),
media_type="application/x-parquet",
)
elif media_type == "application/json":
print(f"rest {time.perf_counter()- t}")
print(f"rest2 {time.perf_counter()- t}")
return Response(dataframe.to_json(), media_type="application/json")
else:
print(f"rest {time.perf_counter()- t}")
print(f"rest3 {time.perf_counter()- t}")
return Response(
content=dataframe.to_csv().encode(),
media_type="text/csv",
Expand Down Expand Up @@ -262,14 +262,18 @@ def get_ensemble_responses(
for obs in res.get_observations():
name_dict[obs.observation_key] = obs.observation_type

import time
t= time.perf_counter()
for name in ens.get_summary_keyset():
response_map[str(name)] = js.RecordOut(
id=UUID(int=0),
name=name,
userdata={"data_origin": "Summary"},
has_observations=name in name_dict,
)
print(f"get_ensemble_responses 1 {time.perf_counter() - t}")

t= time.perf_counter()
for name in res.get_gen_data_keys():
obs_keys = res.observation_keys(name)
response_map[str(name)] = js.RecordOut(
Expand All @@ -278,5 +282,6 @@ def get_ensemble_responses(
userdata={"data_origin": "GEN_DATA"},
has_observations=len(obs_keys) != 0,
)
print(f"get_ensemble_responses 2 {time.perf_counter() - t}")

return response_map
12 changes: 11 additions & 1 deletion src/ert/dark_storage/enkf.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,14 @@ def init_facade() -> LibresFacade:


def get_res(*, _: None = DEFAULT_SECURITY) -> LibresFacade:
    """Return the module-level facade, creating one when none is cached.

    Args:
        _: Unused by the body; defaults to ``DEFAULT_SECURITY``.

    Returns:
        ``_libres_facade`` when it is set, otherwise the result of
        ``init_facade()``.
    """
    if _libres_facade is not None:
        return _libres_facade

    # Local imports keep this block self-contained; only the slow path needs
    # them.
    import logging
    import time

    # Time the facade construction and report it through logging instead of
    # printing to stdout.
    start = time.perf_counter()
    facade = init_facade()
    logging.getLogger(__name__).debug(
        "get_res: init_facade took %.3fs", time.perf_counter() - start
    )
    return facade


Expand All @@ -52,7 +58,11 @@ def get_storage(*, res: LibresFacade = DEFAULT_LIBRESFACADE) -> StorageReader:


def reset_res(*, _: None = DEFAULT_SECURITY) -> None:
    """Clear the module-level facade.

    After this call ``_libres_facade`` is ``None``, so the next ``get_res``
    call goes through ``init_facade()`` again.

    Args:
        _: Unused by the body; defaults to ``DEFAULT_SECURITY``.
    """
    global _libres_facade  # noqa: PLW0603
    # Unconditional assignment: the previous ``is not None`` guard and the
    # trailing ``return _libres_facade`` always produced ``None`` anyway.
    _libres_facade = None
28 changes: 23 additions & 5 deletions src/ert/gui/tools/plot/plot_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
from itertools import combinations as combi
from json.decoder import JSONDecodeError
from typing import List, Optional

import time
import httpx
import pandas as pd
import requests
Expand All @@ -21,8 +21,12 @@ def __init__(self):
self._reset_storage_facade()

def _reset_storage_facade(self):
    """POST to ``/updates/facade`` on the storage service.

    The time spent opening the session and the time spent on the request are
    reported at debug level rather than printed to stdout.
    """
    import logging

    log = logging.getLogger(__name__)

    start = time.perf_counter()
    with StorageService.session() as client:
        log.debug(
            "_reset_storage_facade: session opened in %.3fs",
            time.perf_counter() - start,
        )
        start = time.perf_counter()
        client.post("/updates/facade", timeout=self._timeout)
        log.debug(
            "_reset_storage_facade: POST /updates/facade took %.3fs",
            time.perf_counter() - start,
        )

def _get_case(self, name: str) -> Optional[dict]:
for e in self._get_all_cases():
Expand All @@ -43,7 +47,7 @@ def _get_all_cases(self) -> List[dict]:
for experiment in experiments:
for ensemble_id in experiment["ensemble_ids"]:
response = client.get(
f"/ensembles/{ensemble_id}", timeout=self._timeout
f"/ensembles/{ensemble_id}/small", timeout=self._timeout
)
self._check_response(response)
response_json = response.json()
Expand All @@ -69,11 +73,16 @@ def _check_response(response: requests.Response):
)

def _get_experiments(self) -> dict:
    """Fetch ``/experiments`` from the storage service and return its JSON.

    The response is passed through ``self._check_response`` before decoding.
    Session-open and request timings are reported at debug level rather than
    printed to stdout.

    Returns:
        The decoded JSON body of the ``/experiments`` response.
    """
    # NOTE(review): annotated as ``dict``, but callers in this file iterate
    # the result as a sequence of experiment mappings — confirm and adjust
    # the annotation separately.
    import logging

    log = logging.getLogger(__name__)

    start = time.perf_counter()
    with StorageService.session() as client:
        log.debug(
            "_get_experiments: session opened in %.3fs",
            time.perf_counter() - start,
        )
        start = time.perf_counter()
        response: requests.Response = client.get(
            "/experiments", timeout=self._timeout
        )
        self._check_response(response)
        log.debug(
            "_get_experiments: GET /experiments took %.3fs",
            time.perf_counter() - start,
        )
        return response.json()

def _get_ensembles(self, experiement_id) -> List:
Expand All @@ -97,8 +106,14 @@ def all_data_type_keys(self) -> List:
import time
t1= time.perf_counter()
with StorageService.session() as client:
print(f"plotAPI - all_data_type_keys - session {time.perf_counter()- t1}")
for experiment in self._get_experiments():
t= time.perf_counter()
response: requests.Response = client.get(
"/experiments", timeout=self._timeout
)
self._check_response(response)
print(f"_get_experiments3 {time.perf_counter()- t}")

for experiment in response.json():
for ensemble in self._get_ensembles(experiment["id"]):
t= time.perf_counter()
response: requests.Response = client.get(
Expand Down Expand Up @@ -131,7 +146,8 @@ def all_data_type_keys(self) -> List:
"log_scale": key.startswith("LOG10_"),
}
print(f"plotAPI - all_data_type_keys2 {time.perf_counter()- t}")
print(f"plotAPI - all_data_type_keys {time.perf_counter()- t1}")
print(f"plotAPI - all_data_type_keys {time.perf_counter()- t1}")
print(f"plotAPI - all_data_type_keys -session end {time.perf_counter()- t1}")
return list(all_keys.values())

def get_all_cases_not_running(self) -> List:
Expand Down Expand Up @@ -210,6 +226,8 @@ def history_data(self, key, case=None) -> pd.DataFrame:
given data key, if any. The row index is the index/date and the column
index is the key."""

print(f"history data {key}")

if ":" in key:
head, tail = key.split(":", 2)
history_key = f"{head}H:{tail}"
Expand Down
19 changes: 17 additions & 2 deletions src/ert/gui/tools/plot/plot_window.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,18 +42,23 @@
class PlotWindow(QMainWindow):
def __init__(self, config_file, parent):
QMainWindow.__init__(self, parent)

import time
t_total= time.perf_counter()
logger.info("PlotWindow __init__")
print("PlotWindow 1")
self.setMinimumWidth(850)
self.setMinimumHeight(650)


self.setWindowTitle(f"Plotting - {config_file}")
self.activateWindow()

QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
try:
print("plotapi init")
t= time.perf_counter()
self._api = PlotApi()
print(f"PlotApi init {time.perf_counter()-t}")
self._key_definitions = self._api.all_data_type_keys()
except (RequestError, TimeoutError) as e:
logger.exception(e)
Expand Down Expand Up @@ -89,7 +94,9 @@ def __init__(self, config_file, parent):

QApplication.setOverrideCursor(Qt.CursorShape.WaitCursor)
try:
t= time.perf_counter()
cases = self._api.get_all_cases_not_running()
print(f" Plotwindow2.2 {time.perf_counter()-t }")
except (RequestError, TimeoutError) as e:
logger.exception(e)
QMessageBox.critical(self, "Request Failed", f"{e}")
Expand All @@ -108,14 +115,16 @@ def __init__(self, config_file, parent):
current_plot_widget = self._plot_widgets[self._central_tab.currentIndex()]
self._data_type_keys_widget.selectDefault()
self._updateCustomizer(current_plot_widget)
print("PlotWindow done")
print(f"PlotWindow init done, time= {time.perf_counter() -t_total }s")

def currentPlotChanged(self):
key_def = self.getSelectedKey()
if key_def is None:
return
key = key_def["key"]

print(f"currentPlotChanged {key}")

plot_widget = self._central_tab.currentWidget()

if plot_widget._plotter.dimensionality == key_def["dimensionality"]:
Expand All @@ -124,7 +133,10 @@ def currentPlotChanged(self):
case_to_data_map = {}
for case in cases:
try:
import time
t= time.perf_counter()
case_to_data_map[case] = self._api.data_for_key(case, key)
print(f"currentPlotChanged data_for_key= {time.perf_counter() -t}")
except (RequestError, TimeoutError) as e:
logger.exception(e)
msg = f"{e}"
Expand All @@ -134,7 +146,10 @@ def currentPlotChanged(self):
observations = None
if key_def["observations"] and cases:
try:
import time
t= time.perf_counter()
observations = self._api.observations_for_key(cases[0], key)
print(f"currentPlotChanged obs_forkey= {time.perf_counter() -t}")
except (RequestError, TimeoutError) as e:
logger.exception(e)
msg = f"{e}"
Expand Down
1 change: 1 addition & 0 deletions src/ert/services/storage_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ def fetch_url(self) -> str:

for url in self.fetch_conn_info()["urls"]:
try:
print("fetch_url")
resp = requests.get(f"{url}/healthcheck", auth=self.fetch_auth())
if resp.status_code == 200:
self._url = url
Expand Down
Loading

0 comments on commit acf47bd

Please sign in to comment.