Commit
Save changes (not working and non-verified)
timmens committed Jan 23, 2025
1 parent e00745b commit 1a4cce7
Showing 2 changed files with 194 additions and 131 deletions.
155 changes: 103 additions & 52 deletions src/optimagic/optimization/history.py
@@ -99,7 +99,7 @@ def _get_next_batch_id(self) -> int:
     # Function data, function value, and monotone function value
     # ----------------------------------------------------------------------------------

-    def fun_data(self, cost_model: CostModel, monotone: bool) -> pd.DataFrame:
+    def fun_data(self, cost_model: CostModel, monotone: bool, dropna: bool = False) -> pd.DataFrame:
         """Return the function value data.

         Args:
@@ -116,13 +116,37 @@ def fun_data(self, cost_model: CostModel, monotone: bool) -> pd.DataFrame:
             fun: list[float | None] | NDArray[np.float64] = self.monotone_fun
         else:
             fun = self.fun
-        task = _task_as_categorical(self.task)
-        time = self._get_time(cost_model)
-        return pd.DataFrame({"fun": fun, "task": task, "time": time})
+        timings = self._get_total_timings(cost_model)
+
+        if not self.is_serial:
+            timings = _apply_reduction_to_batches(
+                data=timings,
+                batch_ids=self.batches,
+                reduction_function=cost_model.aggregate_batch_time,
+            )
+
+            min_or_max = np.nanmin if self.direction == Direction.MINIMIZE else np.nanmax
+            fun = _apply_reduction_to_batches(
+                data=fun,
+                batch_ids=self.batches,
+                reduction_function=min_or_max,
+            )
+
+        time = np.cumsum(timings)
+        data = pd.DataFrame({"fun": fun, "time": time})
+
+        if self.is_serial:
+            data["task"] = _task_to_categorical(self.task)
+
+        if dropna:
+            data = data.dropna()
+
+        return data.rename_axis("counter")

     @property
-    def fun(self) -> list[float | None]:
-        return self._fun
+    def fun(self) -> NDArray[np.float64]:
+        return np.array(self._fun, dtype=np.float64)

     @property
     def monotone_fun(self) -> NDArray[np.float64]:
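
For orientation, a minimal usage sketch of the reworked method (not part of the commit; assumes an existing `History` instance `history` and a `CostModel` instance `cost_model` as defined in this module):

    # Hypothetical usage sketch of the new fun_data signature.
    df = history.fun_data(cost_model=cost_model, monotone=True, dropna=True)
    # -> DataFrame indexed by "counter" with columns "fun" and "time";
    #    per the diff, a "task" column is only attached for serial histories.
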
@@ -155,24 +179,52 @@ def is_accepted(self) -> NDArray[np.bool_]:
     # Parameter data, params, flat params, and flat params names
     # ----------------------------------------------------------------------------------

-    def params_data(self, cost_model: CostModel) -> pd.DataFrame:
+    def params_data(self, dropna: bool = False) -> pd.DataFrame:
         """Return the parameter data.

         Args:
-            cost_model: The cost model that is used to calculate the time measure.
+            dropna: Whether to drop rows with missing values. These correspond to
+                parameters that were used to calculate a pure jacobian.

         Returns:
             pd.DataFrame: The parameter data. The columns are: 'name' (the parameter
-                names), 'value' (the parameter values), 'task', and 'time'.
+                names), 'value' (the parameter values), 'task' (the task for which the
+                parameter was used), and 'counter' (a counter that is unique for each
+                row).

         """
         wide = pd.DataFrame(self.flat_params, columns=self.flat_param_names)
-        wide["task"] = _task_as_categorical(self.task)
-        wide["time"] = self._get_time(cost_model)
-        data = pd.melt(
-            wide, var_name="name", value_name="value", id_vars=["task", "time"]
-        )
-        return data.reindex(columns=["name", "value", "task", "time"])
+        wide["task"] = _task_to_categorical(self.task)
+        wide["batches"] = self.batches
+        wide["fun"] = self.fun
+
+        # For parallel histories, keep only the row with the best function value in
+        # each batch, together with the corresponding parameters.
+        if not self.is_serial:
+            if self.direction == Direction.MINIMIZE:
+                loc = wide.groupby("batches")["fun"].idxmin()
+            elif self.direction == Direction.MAXIMIZE:
+                loc = wide.groupby("batches")["fun"].idxmax()
+            wide = wide.loc[loc]
+
+        wide = wide.drop(columns=["batches", "fun"])
+
+        # Reshape to long format and use a unique counter as index.
+        long = pd.melt(wide, var_name="name", value_name="value", id_vars=["task"])
+        data = long.reindex(columns=["name", "value", "task"])
+
+        if dropna:
+            data = data.dropna()
+
+        return data.rename_axis("counter")


@property
def params(self) -> list[PyTree]:
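
To illustrate the wide-to-long reshaping that the new params_data performs, a self-contained sketch with a toy frame (column names mirror the diff; the data are made up):

    import pandas as pd

    wide = pd.DataFrame({"a": [0.1, 0.2], "b": [1.0, 2.0]})
    wide["task"] = ["fun", "jac"]
    long = pd.melt(wide, var_name="name", value_name="value", id_vars=["task"])
    data = long.reindex(columns=["name", "value", "task"]).rename_axis("counter")
    # data has rows (a, 0.1, fun), (a, 0.2, jac), (b, 1.0, fun), (b, 2.0, jac)
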
@@ -189,17 +241,17 @@ def flat_param_names(self) -> list[str]:
     # Time
     # ----------------------------------------------------------------------------------

-    def _get_time(
+    def _get_total_timings(
         self, cost_model: CostModel | Literal["wall_time"]
     ) -> NDArray[np.float64]:
-        """Return the cumulative time measure.
+        """Return the total timings across all tasks.

         Args:
             cost_model: The cost model that is used to calculate the time measure. If
                 "wall_time", the wall time is returned.

         Returns:
-            np.ndarray: The time measure.
+            np.ndarray: The sum of the timings across all tasks.

         """
         if not isinstance(cost_model, CostModel) and cost_model != "wall_time":
@@ -208,25 +260,20 @@ def _get_time(
if cost_model == "wall_time":
return np.array(self.stop_time, dtype=np.float64) - self.start_time[0]

fun_time = self._get_time_per_task(
fun_time = self._get_timings_per_task(
task=EvalTask.FUN, cost_factor=cost_model.fun
)
jac_time = self._get_time_per_task(
jac_time = self._get_timings_per_task(
task=EvalTask.JAC, cost_factor=cost_model.jac
)
fun_and_jac_time = self._get_time_per_task(
fun_and_jac_time = self._get_timings_per_task(
task=EvalTask.FUN_AND_JAC, cost_factor=cost_model.fun_and_jac
)

time = fun_time + jac_time + fun_and_jac_time
batch_aware_time = _apply_to_batch(
data=time,
batch_ids=self.batches,
func=cost_model.aggregate_batch_time,
)
return np.cumsum(batch_aware_time)
return fun_time + jac_time + fun_and_jac_time


def _get_time_per_task(
def _get_timings_per_task(
self, task: EvalTask, cost_factor: float | None
) -> NDArray[np.float64]:
"""Return the time measure per task.
@@ -261,12 +308,16 @@ def start_time(self) -> list[float]:
     def stop_time(self) -> list[float]:
         return self._stop_time

-    # Batches
+    # Batches and fast_path
     # ----------------------------------------------------------------------------------

     @property
     def batches(self) -> list[int]:
         return self._batches

+    @property
+    def is_serial(self) -> bool:
+        return all(self.batches == np.arange(len(self.batches)))

     # Tasks
     # ----------------------------------------------------------------------------------
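
The new is_serial property holds exactly when every evaluation forms its own batch, i.e. the batch ids are 0, 1, 2, and so on; a quick check with toy ids:

    import numpy as np

    all([0, 1, 2, 3] == np.arange(4))  # True: one evaluation per batch (serial)
    all([0, 0, 1, 1] == np.arange(4))  # False: two evaluations per batch (parallel)
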
@@ -380,16 +431,16 @@ def _validate_args_are_all_none_or_lists_of_same_length(
raise ValueError("All arguments must be lists of the same length or None.")


def _task_as_categorical(task: list[EvalTask]) -> pd.Categorical:
def _task_to_categorical(task: list[EvalTask]) -> pd.Categorical:
return pd.Categorical(
[t.value for t in task], categories=[t.value for t in EvalTask]
)
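
A self-contained sketch of what _task_to_categorical produces, using a stand-in EvalTask enum (the real enum lives elsewhere in optimagic; the member values here are assumptions):

    from enum import Enum

    import pandas as pd

    class EvalTask(Enum):  # stand-in with the members used in this module
        FUN = "fun"
        JAC = "jac"
        FUN_AND_JAC = "fun_and_jac"

    task = [EvalTask.FUN, EvalTask.JAC, EvalTask.FUN]
    cat = pd.Categorical(
        [t.value for t in task], categories=[t.value for t in EvalTask]
    )
    # ['fun', 'jac', 'fun'] with fixed categories ['fun', 'jac', 'fun_and_jac']
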


-def _apply_to_batch(
+def _apply_reduction_to_batches(
     data: NDArray[np.float64],
     batch_ids: list[int],
-    func: Callable[[Iterable[float]], float],
+    reduction_function: Callable[[Iterable[float]], float],
 ) -> NDArray[np.float64]:
     """Apply a reduction operator on batches of data.
@@ -399,55 +450,55 @@ def _apply_to_batch(
         data: 1d array with data.
         batch_ids: A list with batch ids whose length is equal to the size of data.
             Values need to be sorted and can be repeated.
-        func: A reduction function that takes an iterable of floats as input (e.g., a
-            numpy.ndarray or list) and returns a scalar.
+        reduction_function: A reduction function that takes an iterable of floats as
+            input (e.g., a numpy.ndarray or list) and returns a scalar.

     Returns:
-        The transformed data. Has the same length as data. For each batch, the result
-        of the reduction operation is stored at the first index of that batch, and all
-        other values of that batch are set to zero.
+        The transformed data. Has one entry per unique batch id, equal to the result
+        of applying the reduction function to the data of that batch.

     """
-    batch_starts = _get_batch_start(batch_ids)
-    batch_stops = [*batch_starts[1:], len(data)]
+    batch_starts, batch_stops = _get_batch_starts_and_stops(batch_ids)

     batch_results: list[float] = []

     for start, stop in zip(batch_starts, batch_stops, strict=True):
         batch_data = data[start:stop]
         batch_id = batch_ids[start]

         try:
-            reduced = func(batch_data)
+            reduced = reduction_function(batch_data)
         except Exception as e:
             msg = (
-                f"Calling function {func.__name__} on batch {batch_id} of the History "
-                f"raised an Exception. Please verify that {func.__name__} is "
-                "well-defined, takes a list of floats as input, and returns a scalar."
+                f"Calling function {reduction_function.__name__} on batch {batch_id} "
+                "of the History raised an Exception. Please verify that "
+                f"{reduction_function.__name__} is well-defined, takes a list of "
+                "floats as input, and returns a scalar."
             )
             raise ValueError(msg) from e

         if not np.isscalar(reduced):
             msg = (
-                f"Function {func.__name__} did not return a scalar for batch "
-                f"{batch_id}. Please verify that {func.__name__} returns a "
-                "scalar when called on a list of floats."
+                f"Function {reduction_function.__name__} did not return a scalar for "
+                f"batch {batch_id}. Please verify that {reduction_function.__name__} "
+                "returns a scalar when called on a list of floats."
            )
             raise ValueError(msg)

         batch_results.append(reduced)  # type: ignore[arg-type]

-    out = np.zeros_like(data)
-    out[batch_starts] = batch_results
-    return out
+    return np.array(batch_results, dtype=np.float64)
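
A worked example of the new reduction semantics (one output entry per batch, per the updated docstring; the old version instead returned an array of the original length, padded with zeros):

    import numpy as np

    data = np.array([3.0, 1.0, 2.0, 5.0])
    batch_ids = [0, 0, 1, 1]  # sorted; repeated ids mark the rows of one batch
    _apply_reduction_to_batches(data, batch_ids, np.nanmin)
    # new: array([1., 2.])    old: array([1., 0., 2., 0.])
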


-def _get_batch_start(batch_ids: list[int]) -> list[int]:
-    """Get start indices of batches.
+def _get_batch_starts_and_stops(batch_ids: list[int]) -> tuple[list[int], list[int]]:
+    """Get start and stop indices of batches.

     This function assumes that batch_ids is non-empty and sorted.

     """
     ids_arr = np.array(batch_ids, dtype=np.int64)
     indices = np.where(ids_arr[:-1] != ids_arr[1:])[0] + 1
     list_indices: list[int] = indices.tolist()  # type: ignore[assignment]
-    return [0, *list_indices]
+    starts = [0, *list_indices]
+    stops = [*starts[1:], len(batch_ids)]
+    return starts, stops
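
A quick check of the start/stop pairing (each data[start:stop] slice selects exactly one batch):

    batch_ids = [0, 0, 1, 1, 1, 2]
    _get_batch_starts_and_stops(batch_ids)
    # -> ([0, 2, 5], [2, 5, 6])
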
