WIP cluster mode refactor
mpvanderschelling committed Feb 10, 2025
1 parent 3deba43 commit 76f224f
Showing 10 changed files with 407 additions and 252 deletions.
394 changes: 263 additions & 131 deletions src/f3dasm/_src/core.py

Large diffs are not rendered by default.

21 changes: 12 additions & 9 deletions src/f3dasm/_src/datageneration/datagenerator_factory.py
@@ -15,7 +15,7 @@
from typing import Any, Callable, Dict, List, Optional

# Local
from ..core import DataGenerator
from ..core import DataGenerator, ExperimentSample
from .functions import _DATAGENERATORS

# Authorship & Credits
@@ -74,9 +74,10 @@ def convert_function(f: Callable,
output = output if output is not None else []

class TempDataGenerator(DataGenerator):
def execute(self, **_kwargs) -> None:
def execute(self, experiment_sample: ExperimentSample,
**_kwargs) -> ExperimentSample:
_input = {input_name:
self.experiment_sample.input_data.get(input_name)
experiment_sample.input_data.get(input_name)
for input_name in input if input_name not in kwargs}
_output = f(**_input, **kwargs)

@@ -89,13 +90,15 @@ def execute(self, **_kwargs) -> None:

for name, value in zip(output, _output):
if name in to_disk:
self.experiment_sample.store(name=name,
object=value,
to_disk=True)
experiment_sample.store(name=name,
object=value,
to_disk=True)
else:
self.experiment_sample.store(name=name,
object=value,
to_disk=False)
experiment_sample.store(name=name,
object=value,
to_disk=False)

return experiment_sample

return TempDataGenerator()

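The change above moves TempDataGenerator to the stateless contract used throughout this refactor: execute() receives the ExperimentSample explicitly and returns it, rather than reading self.experiment_sample. A minimal sketch of a user-defined generator under that contract follows; it is illustrative only, the input name 'x' and output name 'y' are made up, and the import path mirrors the private module touched in this diff rather than a guaranteed public API.

# Illustrative sketch, not part of this commit.
from f3dasm._src.core import DataGenerator, ExperimentSample


class SquareGenerator(DataGenerator):
    def execute(self, experiment_sample: ExperimentSample,
                **kwargs) -> ExperimentSample:
        # Read the input value directly from the sample that was passed in
        x = experiment_sample.input_data.get('x')
        # Store the result on the same sample and hand it back to the caller
        experiment_sample.store(name='y', object=x ** 2, to_disk=False)
        return experiment_sample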
9 changes: 6 additions & 3 deletions src/f3dasm/_src/datageneration/functions/function.py
@@ -116,7 +116,8 @@ def __call__(self, input_x: np.ndarray) -> np.ndarray:

return np.array(y).reshape(-1, 1)

def execute(self, **kwargs) -> None:
def execute(self, experiment_sample: ExperimentSample, **kwargs
) -> ExperimentSample:
"""
Execute the function and store the result in the experiment sample.
@@ -130,16 +131,18 @@
>>> func = Function()
>>> func.execute()
"""
x, _ = self.experiment_sample.to_numpy()
x, _ = experiment_sample.to_numpy()

if isinstance(x, ArrayBox):
x = x._value
if isinstance(x, ArrayBox):
x = x._value
y = np.nan_to_num(self(x), nan=np.nan)
self.experiment_sample.store(
experiment_sample.store(
name="y", object=float(y.ravel().astype(np.float64)))

return experiment_sample

def _retrieve_original_input(self, x: np.ndarray):
"""
Retrieve the original input vector if the input is augmented.
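The same contract applies to the benchmark functions: execute() now takes the sample as an argument, stores 'y' on it, and returns it. A caller-side sketch follows, again illustrative only; func stands for any concrete benchmark Function instance and sample for an existing ExperimentSample, both created elsewhere.

# Illustrative sketch, not part of this commit.
sample = func.execute(experiment_sample=sample)  # the sample is returned instead of written to self.experiment_sample
x, y = sample.to_numpy()                         # same accessor that execute() itself uses above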
136 changes: 61 additions & 75 deletions src/f3dasm/_src/experimentdata.py
@@ -29,10 +29,10 @@
from hydra.utils import get_original_cwd
from omegaconf import DictConfig

# Local
from ._io import (DOMAIN_FILENAME, EXPERIMENTDATA_SUBFOLDER,
INPUT_DATA_FILENAME, JOBS_FILENAME, LOCK_FILENAME, MAX_TRIES,
OUTPUT_DATA_FILENAME, _project_dir_factory, store_to_disk)
# Local
OUTPUT_DATA_FILENAME, _project_dir_factory)
from .core import Block, DataGenerator
from .datageneration import _datagenerator_factory
from .design import Domain, _domain_factory, _sampler_factory
@@ -125,12 +125,12 @@ def __init__(
)

self.data = _data
self.domain = _domain
self._domain = _domain
self.project_dir = _project_dir

# Store to_disk objects so that the references are kept only
for id, experiment_sample in self:
self.store_experimentsample(experiment_sample, id)
experiment_sample.store_experimentsample_references(idx=id)

def __len__(self):
"""
@@ -212,7 +212,7 @@ def __eq__(self, __o: ExperimentData) -> bool:
>>> experiment_data1 == experiment_data2
True
"""
return (self.data == __o.data and self.domain == __o.domain
return (self.data == __o.data and self._domain == __o._domain
and self.project_dir == __o.project_dir)

def __getitem__(self, key: int | Iterable[int]) -> ExperimentData:
@@ -235,7 +235,7 @@ def __getitem__(self, key: int | Iterable[int]) -> ExperimentData:

return ExperimentData.from_data(
data={k: self.data[k] for k in self.index[key]},
domain=self.domain,
domain=self._domain,
project_dir=self.project_dir)

def _repr_html_(self) -> str:
@@ -299,26 +299,6 @@ def wrapper_func(project_dir: Path, *args, **kwargs) -> None:

# If the lock has been acquired:
with lock:
# tries = 0
# while tries <= MAX_TRIES:
# try:
# # Load a fresh instance of ExperimentData from file
# loaded_self = ExperimentData.from_file(
# self.project_dir)
# break
# # Catch racing conditions
# except (EmptyFileError, DecodeError):
# tries += 1
# logger.debug((
# f"Error reading a file, retrying"
# f" {tries+1}/{MAX_TRIES}"))
# sleep(random.uniform(0.5, 2.5))

# if tries >= MAX_TRIES:
# raise ReachMaximumTriesError(file_path=self.project_dir,
# max_tries=tries)

# Load a fresh instance of ExperimentData from file
loaded_self = ExperimentData.from_file(
self.project_dir)

@@ -369,12 +349,32 @@ def jobs(self) -> pd.Series:
"""
return pd.Series({id: es.job_status.name for id, es in self})

@property
def domain(self) -> Domain:
return self._domain

@domain.setter
def domain(self, domain: Domain):
"""
Sets the domain of the ExperimentData object.
Parameters
----------
domain : Domain
The domain to set.
"""
self._domain = domain
for _, es in self:
es.domain = domain

# Alternative constructors
# =========================================================================

@classmethod
def from_file(cls: Type[ExperimentData],
project_dir: Path | str) -> ExperimentData:
project_dir: Path | str,
wait_for_creation: bool = False,
max_tries: int = MAX_TRIES) -> ExperimentData:
"""
Create an ExperimentData object from .csv and .json files.
@@ -396,15 +396,19 @@ def from_file(cls: Type[ExperimentData],
project_dir = Path(project_dir)

try:
return _from_file_attempt(project_dir)
return _from_file_attempt(project_dir=project_dir,
wait_for_creation=wait_for_creation,
max_tries=max_tries)
except FileNotFoundError:
try:
filename_with_path = Path(get_original_cwd()) / project_dir
except ValueError: # get_original_cwd() hydra initialization error
raise FileNotFoundError(
f"Cannot find the folder {project_dir} !")

return _from_file_attempt(filename_with_path)
return _from_file_attempt(project_dir=filename_with_path,
wait_for_creation=wait_for_creation,
max_tries=max_tries)

@classmethod
def from_sampling(cls, sampler: Block | str | DictConfig,
@@ -510,7 +514,7 @@ def from_data(cls, data: Optional[Dict[int, ExperimentSample]] = None,
experiment_data = cls()

experiment_data.data = defaultdict(ExperimentSample, data)
experiment_data.domain = domain
experiment_data._domain = domain
experiment_data.project_dir = _project_dir_factory(project_dir)
return experiment_data

@@ -610,7 +614,7 @@ def store(self, project_dir: Optional[Path | str] = None):
(subdirectory / INPUT_DATA_FILENAME).with_suffix('.csv'))
df_output.to_csv(
(subdirectory / OUTPUT_DATA_FILENAME).with_suffix('.csv'))
self.domain.store(subdirectory / DOMAIN_FILENAME)
self._domain.store(subdirectory / DOMAIN_FILENAME)
self.jobs.to_csv((subdirectory / JOBS_FILENAME).with_suffix('.csv'))

def to_numpy(self) -> Tuple[np.ndarray, np.ndarray]:
@@ -900,7 +904,7 @@ def reset_index(self) -> ExperimentData:
"""
return ExperimentData.from_data(
data={i: v for i, v in enumerate(self.data.values())},
domain=self.domain,
domain=self._domain,
project_dir=self.project_dir)

def join(self, experiment_data: ExperimentData) -> ExperimentData:
@@ -928,7 +932,7 @@ def join(self, experiment_data: ExperimentData) -> ExperimentData:
for (i, es_self), (_, es_other) in zip(copy_self, copy_other):
copy_self.data[i] = es_self + es_other

copy_self.domain += copy_other.domain
copy_self._domain += copy_other._domain

return copy_self

@@ -945,7 +949,7 @@ def _add(self, experiment_data: ExperimentData):
copy_other.data.values())}

self.data.update(other_updated_data)
self.domain += copy_other.domain
self._domain += copy_other._domain

def _add_experiment_sample(self, experiment_sample: ExperimentSample):
last_key = max(self.index) if self else -1
@@ -964,7 +968,7 @@ def _overwrite(self, experiment_data: ExperimentData,
for (_, es), id in zip(copy_other, indices):
self.data[id] = es

self.domain += copy_other.domain
self._domain += copy_other._domain

def replace_nan(self, value: Any):
"""
@@ -1027,7 +1031,7 @@ def sort(self, criterion: Callable[[ExperimentSample], Any],
)
return ExperimentData.from_data(
data=sorted_data,
domain=self.domain,
domain=self._domain,
project_dir=self.project_dir
)

@@ -1055,7 +1059,7 @@ def get_experiment_sample(self, id: int) -> ExperimentSample:
return self.data[id]

def store_experimentsample(
self, experiment_sample: ExperimentSample, id: int):
self, experiment_sample: ExperimentSample, idx: int):
"""
Store an ExperimentSample object in the ExperimentData object.
@@ -1070,41 +1074,8 @@
--------
>>> experiment_data.store_experimentsample(sample, 0)
"""
self.domain += experiment_sample.domain

for name, value in experiment_sample._output_data.items():

# # If the output parameter is not in the domain, add it
# if name not in self.domain.output_names:
# self.domain.add_output(name=name, to_disk=True)

parameter = self.domain.output_space[name]

# If the parameter is to be stored on disk, store it
# Also check if the value is not already a reference!
if parameter.to_disk and not isinstance(value, (Path, str)):
storage_location = store_to_disk(
project_dir=self.project_dir, object=value, name=name,
id=id, store_function=parameter.store_function)

experiment_sample._output_data[name] = Path(storage_location)

for name, value in experiment_sample._input_data.items():
parameter = self.domain.input_space[name]

# If the parameter is to be stored on disk, store it
# Also check if the value is not already a reference!
if parameter.to_disk and not isinstance(value, (Path, str)):
storage_location = store_to_disk(
project_dir=self.project_dir, object=value, name=name,
id=id, store_function=parameter.store_function)

experiment_sample._input_data[name] = Path(storage_location)

# Set the experiment sample in the ExperimentData object
self.data[id] = experiment_sample

# Used in parallel mode
self._domain += experiment_sample.domain
self.data[idx] = experiment_sample

def get_open_job(self) -> Tuple[int, ExperimentSample]:
"""
@@ -1514,7 +1485,7 @@ def x0_factory(experiment_data: ExperimentData,
if mode == 'new':
x0 = ExperimentData.from_sampling(
sampler=sampler,
domain=experiment_data.domain,
domain=experiment_data._domain,
n_samples=n_samples
)

@@ -1538,15 +1509,20 @@ def x0_factory(experiment_data: ExperimentData,
return x0.reset_index()


def _from_file_attempt(project_dir: Path, max_tries: int = MAX_TRIES
def _from_file_attempt(project_dir: Path, max_tries: int = MAX_TRIES,
wait_for_creation: bool = False
) -> ExperimentData:
"""Attempt to create an ExperimentData object
from .csv and .pkl files.
Parameters
----------
path : Path
project_dir : Path
Name of the user-defined directory where the files are stored.
max_tries : int, optional
Maximum number of tries to read the files, by default MAX_TRIES
wait_for_creation : bool, optional
If True, wait for the files to be created, by default False
Returns
-------
@@ -1578,6 +1554,16 @@ def _from_file_attempt(project_dir: Path, max_tries: int = MAX_TRIES
))
sleep(random.uniform(0.5, 2.5))

except FileNotFoundError:
if not wait_for_creation:
raise FileNotFoundError(
f"File {subdirectory} not found")
tries += 1
logger.debug((
f"FileNotFoundError({subdirectory}), sleeping!"
))
sleep(random.uniform(9.5, 11.0))

raise ReachMaximumTriesError(file_path=subdirectory, max_tries=max_tries)


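Taken together with the new wait_for_creation and max_tries arguments on from_file(), the FileNotFoundError branch above appears to let a cluster worker start before the main process has written the experiment files and simply wait for them to appear. A hedged usage sketch follows; the directory name is made up and the top-level import path is assumed from f3dasm's public API.

# Illustrative sketch, not part of this commit.
from f3dasm import ExperimentData

data = ExperimentData.from_file(
    project_dir='./cluster_project',  # hypothetical shared project directory
    wait_for_creation=True,           # retry on FileNotFoundError instead of raising immediately
    max_tries=10,                     # give up with ReachMaximumTriesError after this many attempts
)

Each FileNotFoundError retry sleeps roughly ten seconds (random.uniform(9.5, 11.0)), so max_tries bounds how long a worker will wait for the main process to create the files.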