From cebf3928325abe803d4088c3c9a0dca5608024ed Mon Sep 17 00:00:00 2001 From: ephoris Date: Mon, 11 Mar 2024 15:41:22 -0400 Subject: [PATCH 1/4] [WIP] Cleaning up bayesian pipeline --- jobs/bayesian_pipeline.py | 388 ++++++++++++++++++++++++++++---------- 1 file changed, 286 insertions(+), 102 deletions(-) diff --git a/jobs/bayesian_pipeline.py b/jobs/bayesian_pipeline.py index 0e7ab63..931db3a 100644 --- a/jobs/bayesian_pipeline.py +++ b/jobs/bayesian_pipeline.py @@ -16,13 +16,29 @@ from endure.lsm.cost import EndureCost from endure.data.io import Reader -from endure.lsm.types import LSMDesign, System, Policy, Workload, LSMBounds, STR_POLICY_DICT -from endure.lcm.data.generator import ClassicGenerator, QCostGenerator, YZCostGenerator, KHybridGenerator +from endure.lsm.types import ( + LSMDesign, + System, + Policy, + Workload, + LSMBounds, +) +from endure.lcm.data.generator import ( + ClassicGenerator, + QCostGenerator, + YZCostGenerator, + KHybridGenerator, +) from endure.lsm.solver.classic_solver import ClassicSolver from endure.lsm.solver.qlsm_solver import QLSMSolver from endure.lsm.solver.yzlsm_solver import YZLSMSolver from endure.lsm.solver.klsm_solver import KLSMSolver -from jobs.infra.db_log import initialize_database, log_new_run, log_design_cost, log_run_details +from jobs.infra.db_log import ( + initialize_database, + log_new_run, + log_design_cost, + log_run_details, +) def print_best_designs(best_designs: List[Tuple[LSMDesign, float]]) -> None: @@ -31,74 +47,113 @@ def print_best_designs(best_designs: List[Tuple[LSMDesign, float]]) -> None: for design, cost in sorted_designs[:1]: if design.policy == Policy.KHybrid: k_values_str = ", ".join(str(k) for k in design.K) - print(f"Design: h={design.h}, T={design.T}, Policy={design.policy}, K=[{k_values_str}], Cost={cost}") + print( + f"Design: h={design.h}, T={design.T}, Policy={design.policy}, K=[{k_values_str}], Cost={cost}" + ) else: - print(f"Design: h={design.h}, T={design.T}, Policy={design.policy}, Q={design.Q}, Y={design.Y}," - f" Z={design.Z}, Cost={cost}") - with open('best_designs.txt', 'w') as file: + print( + f"Design: h={design.h}, T={design.T}, Policy={design.policy}, Q={design.Q}, Y={design.Y}," + f" Z={design.Z}, Cost={cost}" + ) + with open("best_designs.txt", "w") as file: file.write("All Best Designs Found:\n") for design, cost in best_designs: - file.write(f"Design: h={design.h}, T={design.T}, Policy={design.policy}, Q={design.Q}, " - f"Y={design.Y}, Z={design.Z}, Cost={cost}\n") + file.write( + f"Design: h={design.h}, T={design.T}, Policy={design.policy}, Q={design.Q}, " + f"Y={design.Y}, Z={design.Z}, Cost={cost}\n" + ) class BayesianPipeline: - def __init__(self, conf: dict) -> None: + def __init__(self, config: dict) -> None: self.end_time = None self.start_time = None - self.config: dict = conf - self.bayesian_setting: dict = self.config["job"]["BayesianOptimization"] - self.bounds = LSMBounds() - self.max_levels = self.bounds.max_considered_levels - self.cf: EndureCost = EndureCost(self.max_levels) - self.log: logging.Logger = logging.getLogger(self.config["log"]["name"]) - - self.system: System = System(**self.bayesian_setting["system"]) - self.workload: Workload = Workload(**self.bayesian_setting["workload"]) - self.initial_samples: int = self.bayesian_setting["initial_samples"] - self.acquisition_function: str = self.bayesian_setting["acquisition_function"] - self.q: int = self.bayesian_setting["batch_size"] - self.num_restarts: int = self.bayesian_setting["num_restarts"] - self.raw_samples: int = 
self.bayesian_setting["raw_samples"] - self.num_iterations: int = self.bayesian_setting["num_iterations"] - self.beta_value: float = self.bayesian_setting["beta_value"] self.conn = None self.run_id: int = 0 - self.write_to_db = self.bayesian_setting["database"]["write_to_db"] + self.log: logging.Logger = logging.getLogger(config["log"]["name"]) + + jconfig: dict = config["job"]["BayesianOptimization"] + self.bounds: LSMBounds = LSMBounds(**config["lsm"]["bounds"]) + self.cf: EndureCost = EndureCost(self.bounds.max_considered_levels) + + self.system: System = System(**jconfig["system"]) + self.workload: Workload = Workload(**jconfig["workload"]) + self.initial_samples: int = jconfig["initial_samples"] + self.acquisition_function: str = jconfig["acquisition_function"] + self.q: int = jconfig["batch_size"] + self.num_restarts: int = jconfig["num_restarts"] + self.raw_samples: int = jconfig["raw_samples"] + self.num_iterations: int = jconfig["num_iterations"] + self.beta_value: float = jconfig["beta_value"] + self.write_to_db = jconfig["database"]["write_to_db"] self.output_dir = os.path.join( - self.bayesian_setting["database"]["data_dir"], self.bayesian_setting["database"]["db_path"] + jconfig["database"]["data_dir"], + jconfig["database"]["db_path"], ) - self.db_path = os.path.join(self.output_dir, self.bayesian_setting["database"]["db_name"]) - model_type_str = self.bayesian_setting.get('model_type', 'Classic') - self.model_type = STR_POLICY_DICT.get(model_type_str, Policy.Classic) - self.num_k_values = self.bayesian_setting["num_k_values"] + self.db_path = os.path.join(self.output_dir, jconfig["database"]["db_name"]) + model_type_str = self.jconfig.get("model_type", "Classic") + self.model_type = getattr(Policy, model_type_str) + self.num_k_values = jconfig["num_k_values"] + + self.config: dict = config + self.jconfig: dict = jconfig - def run(self, system: Optional[System] = None, workload: Optional[Workload] = None, num_iterations: Optional[int] = None, - sample_size: Optional[int] = None, acqf: Optional[str] = None) -> Tuple[Optional[LSMDesign], Optional[float]]: + def run( + self, + system: Optional[System] = None, + workload: Optional[Workload] = None, + num_iterations: Optional[int] = None, + sample_size: Optional[int] = None, + acqf: Optional[str] = None, + ) -> Tuple[Optional[LSMDesign], Optional[float]]: self.start_time = time.time() self.initialize_environment(system, workload, num_iterations, sample_size, acqf) train_x, train_y, best_y = self._generate_initial_data(self.initial_samples) best_designs = self.optimization_loop(train_x, train_y, best_y) - best_design, best_cost, elapsed_time = self.finalize_optimization(best_designs) + best_design, best_cost, _ = self.finalize_optimization(best_designs) + return best_design, best_cost - def initialize_environment(self, system: Optional[System], workload: Optional[Workload], num_iterations: Optional[int], - sample_size: Optional[int], acqf: Optional[str]): + def initialize_environment( + self, + system: Optional[System], + workload: Optional[Workload], + num_iterations: Optional[int], + sample_size: Optional[int], + acqf: Optional[str], + ): os.makedirs(self.output_dir, exist_ok=True) self.conn = initialize_database(self.db_path) self.system = system if system is not None else self.system - self.initial_samples = sample_size if sample_size is not None else self.initial_samples + self.initial_samples = ( + sample_size if sample_size is not None else self.initial_samples + ) self.workload = workload if workload is not None else 
self.workload - self.acquisition_function = acqf if acqf is not None else self.acquisition_function - self.num_iterations = num_iterations if num_iterations is not None else self.num_iterations - self.run_id = log_new_run(self.conn, self.system, self.workload, self.num_iterations, - self.initial_samples, self.acquisition_function) + self.acquisition_function = ( + acqf if acqf is not None else self.acquisition_function + ) + self.num_iterations = ( + num_iterations if num_iterations is not None else self.num_iterations + ) + self.run_id = log_new_run( + self.conn, + self.system, + self.workload, + self.num_iterations, + self.initial_samples, + self.acquisition_function, + ) def generate_initial_bounds(self, system: System) -> torch.Tensor: - h_bounds = torch.tensor([self.bounds.bits_per_elem_range[0], min(np.floor(system.H) - , self.bounds.bits_per_elem_range[1])], dtype=torch.float) + h_bounds = torch.tensor( + [ + self.bounds.bits_per_elem_range[0], + max(np.floor(system.H), self.bounds.bits_per_elem_range[1]), + ], + dtype=torch.float, + ) - t_bounds = torch.tensor([self.bounds.size_ratio_range[0], self.bounds.size_ratio_range[1]]) + t_bounds = torch.tensor(self.bounds.size_ratio_range) policy_bounds = torch.tensor([0, 1]) if self.model_type == Policy.QFixed: q_bounds = torch.tensor([1, self.bounds.size_ratio_range[1] - 1]) @@ -108,15 +163,19 @@ def generate_initial_bounds(self, system: System) -> torch.Tensor: z_bounds = torch.tensor([1, self.bounds.size_ratio_range[1] - 1]) bounds = torch.stack([h_bounds, t_bounds, y_bounds, z_bounds], dim=-1) elif self.model_type == Policy.KHybrid: - lower_limits = [self.bounds.bits_per_elem_range[0], self.bounds.size_ratio_range[0]] +\ - [1] * self.num_k_values - upper_limits = [min(np.floor(system.H), self.bounds.bits_per_elem_range[1]), - self.bounds.size_ratio_range[1]] + \ - [self.bounds.size_ratio_range[1] - 1] * self.num_k_values + lower_limits = [ + self.bounds.bits_per_elem_range[0], + self.bounds.size_ratio_range[0], + ] + [1] * self.num_k_values + upper_limits = [ + max(np.floor(system.H), self.bounds.bits_per_elem_range[1]), + self.bounds.size_ratio_range[1], + ] + [self.bounds.size_ratio_range[1] - 1] * self.num_k_values new_bounds_list = [lower_limits, upper_limits] bounds = torch.tensor(new_bounds_list, dtype=torch.float64) else: bounds = torch.stack([h_bounds, t_bounds, policy_bounds], dim=-1) + return bounds def optimization_loop(self, train_x, train_y, best_y): @@ -124,11 +183,19 @@ def optimization_loop(self, train_x, train_y, best_y): fixed_feature_list = self._initialize_feature_list(bounds) best_designs = [] for i in range(self.num_iterations): - new_candidates = self.get_next_points(train_x, train_y, best_y, bounds, fixed_feature_list, - self.acquisition_function, 1) - new_designs, costs = self.evaluate_new_candidates(new_candidates) - train_x, train_y, best_y, best_designs = self.update_training_data(train_x, train_y, new_candidates, costs, - best_designs) + new_candidates = self.get_next_points( + train_x, + train_y, + best_y, + bounds, + fixed_feature_list, + self.acquisition_function, + 1, + ) + _, costs = self.evaluate_new_candidates(new_candidates) + train_x, train_y, best_y, best_designs = self.update_training_data( + train_x, train_y, new_candidates, costs, best_designs + ) self.log.debug(f"Iteration {i + 1}/{self.num_iterations} complete") self.log.debug("Bayesian Optimization completed") return best_designs @@ -156,31 +223,48 @@ def _initialize_feature_list(self, bounds): param_values = [range(1, upper_t_bound)] * 
self.num_k_values for combination in product(*param_values): fixed_feature = {1: t} - fixed_feature.update({i + 2: combination[i] for i in range(len(combination))}) + fixed_feature.update( + {i + 2: combination[i] for i in range(len(combination))} + ) fixed_features_list.append(fixed_feature) + return fixed_features_list def evaluate_new_candidates(self, new_candidates): new_designs = self.create_designs_from_candidates(new_candidates) - costs = [self.cf.calc_cost(design, self.system, self.workload.z0, self.workload.z1, self.workload.q, - self.workload.w) for design in new_designs] + costs = [ + self.cf.calc_cost( + design, + self.system, + self.workload.z0, + self.workload.z1, + self.workload.q, + self.workload.w, + ) + for design in new_designs + ] for design, cost in zip(new_designs, costs): log_design_cost(self.conn, self.run_id, design, cost) return new_designs, costs - def update_training_data(self, train_x, train_y, new_candidates, costs, best_designs): + def update_training_data( + self, train_x, train_y, new_candidates, costs, best_designs + ): new_target = torch.tensor(costs).unsqueeze(-1) train_x = torch.cat([train_x, new_candidates]) train_y = torch.cat([train_y, new_target]) best_y = train_y.min().item() - best_designs = self._update_best_designs(best_designs, new_candidates, new_target) + best_designs = self._update_best_designs( + best_designs, new_candidates, new_target + ) return train_x, train_y, best_y, best_designs def create_designs_from_candidates(self, candidates): for candidate in candidates: new_designs = self._generate_new_designs_helper(candidate) + return new_designs def _generate_new_designs_helper(self, candidate): @@ -191,23 +275,41 @@ def _generate_new_designs_helper(self, candidate): if self.model_type == Policy.QFixed: size_ratio, q_val = candidate[1].item(), candidate[2].item() policy = Policy.QFixed - new_designs = [LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, Q=int(q_val))] + new_designs = [ + LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, Q=int(q_val)) + ] # Uncomment the following lines of code if you want the q value to be the same # through all levels and behave like KLSM # policy = Policy.KHybrid - k_values = [q_val for _ in range(1, self.max_levels)] - new_designs = [LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, K=k_values)] + k_values = [q_val for _ in range(1, self.bounds.max_considered_levels)] + new_designs = [ + LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, K=k_values) + ] elif self.model_type == Policy.YZHybrid: - size_ratio, y_val, z_val = candidate[1].item(), candidate[2].item(), candidate[3].item() + size_ratio, y_val, z_val = ( + candidate[1].item(), + candidate[2].item(), + candidate[3].item(), + ) policy = Policy.YZHybrid - new_designs = [LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, Y=int(y_val), Z=int(z_val))] + new_designs = [ + LSMDesign( + h=h, + T=np.ceil(size_ratio), + policy=policy, + Y=int(y_val), + Z=int(z_val), + ) + ] elif self.model_type == Policy.KHybrid: size_ratio = candidate[1].item() k_values = [cand.item() for cand in candidate[2:]] policy = Policy.KHybrid - if len(k_values) < self.max_levels: - k_values += [1] * (self.max_levels - len(k_values)) - new_designs.append(LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, K=k_values)) + if len(k_values) < self.bounds.max_considered_levels: + k_values += [1] * (self.bounds.max_considered_levels - len(k_values)) + new_designs.append( + LSMDesign(h=h, T=np.ceil(size_ratio), policy=policy, K=k_values) + ) else: size_ratio, policy_val = 
candidate[1].item(), candidate[2].item() policy = Policy.Leveling if policy_val < 0.5 else Policy.Tiering @@ -218,41 +320,72 @@ def _generate_new_designs_helper(self, candidate): def finalize_optimization(self, best_designs): elapsed_time = time.time() - self.start_time sorted_designs = sorted(best_designs, key=lambda x: x[1]) - analaytical_design, analytical_cost = self._find_analytical_results(self.system, self.workload) + analaytical_design, analytical_cost = self._find_analytical_results( + self.system, self.workload + ) best_design, best_cost = sorted_designs[0][0], sorted_designs[0][1] - log_run_details(self.conn, self.run_id, elapsed_time, analytical_cost, best_cost, analaytical_design, - best_design) + log_run_details( + self.conn, + self.run_id, + elapsed_time, + analytical_cost, + best_cost, + analaytical_design, + best_design, + ) self.conn.close() if sorted_designs: return best_design, best_cost, elapsed_time else: return None, None, elapsed_time - def get_next_points(self, x: torch.Tensor, y: torch.Tensor, best_y: float, bounds: torch.Tensor, - fixed_features_list: List, acquisition_function: str = "ExpectedImprovement", n_points: int = 1, ) -> torch.Tensor: + def get_next_points( + self, + x: torch.Tensor, + y: torch.Tensor, + best_y: float, + bounds: torch.Tensor, + fixed_features_list: List, + acquisition_function: str = "ExpectedImprovement", + n_points: int = 1, + ) -> torch.Tensor: if self.model_type == Policy.QFixed or self.model_type == Policy.Classic: - single_model = MixedSingleTaskGP(x, y, cat_dims=[1, 2], input_transform=Normalize(d=x.shape[1], - bounds=bounds), - outcome_transform=Standardize(m=1)) + single_model = MixedSingleTaskGP( + x, + y, + cat_dims=[1, 2], + input_transform=Normalize(d=x.shape[1], bounds=bounds), + outcome_transform=Standardize(m=1), + ) elif self.model_type == Policy.YZHybrid: - single_model = MixedSingleTaskGP(x, y, cat_dims=[1, 2, 3], input_transform=Normalize(d=x.shape[1], - bounds=bounds), - outcome_transform=Standardize(m=1)) + single_model = MixedSingleTaskGP( + x, + y, + cat_dims=[1, 2, 3], + input_transform=Normalize(d=x.shape[1], bounds=bounds), + outcome_transform=Standardize(m=1), + ) elif self.model_type == Policy.KHybrid: # the self.num_k_values represents the number of categorical values the model # is predicting out of the self.max_levels. The +2 is because this is the list of indices # and the first 2 indices represent the 'h' value and then the 'T'value. 
So everything from index 1 # till the size of num_k_values + 2 is a categorical value cat_dims = list(range(1, self.num_k_values + 2)) - single_model = MixedSingleTaskGP(x, y, cat_dims=cat_dims, input_transform=Normalize(d=x.shape[1], - bounds=bounds), - outcome_transform=Standardize(m=1)) + single_model = MixedSingleTaskGP( + x, + y, + cat_dims=cat_dims, + input_transform=Normalize(d=x.shape[1], bounds=bounds), + outcome_transform=Standardize(m=1), + ) else: raise ValueError(f"Unsupported model type: {self.model_type}") mll = ExactMarginalLogLikelihood(single_model.likelihood, single_model) fit_gpytorch_model(mll) if acquisition_function == "ExpectedImprovement": - acqf = ExpectedImprovement(model=single_model, best_f=best_y, maximize=False) + acqf = ExpectedImprovement( + model=single_model, best_f=best_y, maximize=False + ) elif acquisition_function == "UpperConfidenceBound": beta = self.beta_value acqf = UpperConfidenceBound(model=single_model, beta=beta, maximize=False) @@ -266,7 +399,7 @@ def get_next_points(self, x: torch.Tensor, y: torch.Tensor, best_y: float, bound q=n_points, num_restarts=self.num_restarts, raw_samples=self.raw_samples, - fixed_features_list=fixed_features_list + fixed_features_list=fixed_features_list, ) return candidates @@ -284,20 +417,25 @@ def _generate_initial_data(self, n: int = 30) -> Tuple[torch.Tensor, torch.Tenso for _ in range(n): design = generator._sample_design(self.system) if self.model_type == Policy.Classic: - if design.policy == Policy.Leveling: - policy = 0 - elif design.policy == Policy.Tiering: - policy = 1 + policy = 0 if design.policy == Policy.Tiering else 1 x_values = np.array([design.h, design.T, policy]) elif self.model_type == Policy.QFixed: x_values = np.array([design.h, design.T, design.Q]) elif self.model_type == Policy.YZHybrid: x_values = np.array([design.h, design.T, design.Y, design.Z]) elif self.model_type == Policy.KHybrid: - k_values_padded = (design.K + [1] * self.num_k_values)[:self.num_k_values] + k_values_padded = (design.K + [1] * self.num_k_values)[ + : self.num_k_values + ] x_values = np.array([design.h, design.T] + k_values_padded) - cost = self.cf.calc_cost(design, self.system, self.workload.z0, self.workload.z1, - self.workload.q, self.workload.w) + cost = self.cf.calc_cost( + design, + self.system, + self.workload.z0, + self.workload.z1, + self.workload.q, + self.workload.w, + ) log_design_cost(self.conn, self.run_id, design, cost) train_x.append(x_values) train_y.append(cost) @@ -305,24 +443,52 @@ def _generate_initial_data(self, n: int = 30) -> Tuple[torch.Tensor, torch.Tenso train_x = torch.tensor(train_x) train_y = torch.tensor(train_y, dtype=torch.float64).unsqueeze(-1) best_y = train_y.min().item() + return train_x, train_y, best_y - def _update_best_designs(self, best_designs: List[Tuple[LSMDesign, float]], new_x: torch.Tensor, - new_y: torch.Tensor) -> List[Tuple[LSMDesign, float]]: + def _update_best_designs( + self, + best_designs: List[Tuple[LSMDesign, float]], + new_x: torch.Tensor, + new_y: torch.Tensor, + ) -> List[Tuple[LSMDesign, float]]: for x, y in zip(new_x, new_y): if self.model_type == Policy.QFixed: h, size_ratio, qvalue = x[0], x[1], x[2] - best_designs.append((LSMDesign(h=h.item(), T=np.ceil(size_ratio.item()), policy=Policy.QFixed, - Q=qvalue.item()), y.item())) + best_designs.append( + ( + LSMDesign( + h=h.item(), + T=np.ceil(size_ratio.item()), + policy=Policy.QFixed, + Q=qvalue.item(), + ), + y.item(), + ) + ) elif self.model_type == Policy.YZHybrid: h, size_ratio, yvalue, zvalue = 
x[0], x[1], x[2], x[3] best_designs.append( - (LSMDesign(h=h.item(), T=np.ceil(size_ratio.item()), policy=Policy.YZHybrid, Y=yvalue.item(), - Z=zvalue.item()), y.item())) + ( + LSMDesign( + h=h.item(), + T=np.ceil(size_ratio.item()), + policy=Policy.YZHybrid, + Y=yvalue.item(), + Z=zvalue.item(), + ), + y.item(), + ) + ) elif self.model_type == Policy.KHybrid: h, size_ratio = x[0], x[1] k_values = x[2:].tolist() - design = LSMDesign(h=h.item(), T=np.ceil(size_ratio.item()), policy=Policy.KHybrid, K=k_values) + design = LSMDesign( + h=h.item(), + T=np.ceil(size_ratio.item()), + policy=Policy.KHybrid, + K=k_values, + ) best_designs.append((design, y.item())) else: h, size_ratio, policy = x[0], x[1], x[2] @@ -330,10 +496,17 @@ def _update_best_designs(self, best_designs: List[Tuple[LSMDesign, float]], new_ pol = Policy.Leveling else: pol = Policy.Tiering - best_designs.append((LSMDesign(h=h.item(), T=np.ceil(size_ratio.item()), policy=pol), y.item())) + best_designs.append( + ( + LSMDesign(h=h.item(), T=np.ceil(size_ratio.item()), policy=pol), + y.item(), + ) + ) return best_designs - def _find_analytical_results(self, system: System, workload: Workload, bounds: Optional[LSMBounds] = None) -> Tuple[LSMDesign, float]: + def _find_analytical_results( + self, system: System, workload: Workload, bounds: Optional[LSMBounds] = None + ) -> Tuple[LSMDesign, float]: bounds = bounds if bounds is not None else self.bounds if self.model_type == Policy.Classic: solver = ClassicSolver(bounds) @@ -344,7 +517,9 @@ def _find_analytical_results(self, system: System, workload: Workload, bounds: O elif self.model_type == Policy.KHybrid: solver = KLSMSolver(bounds) z0, z1, q, w = workload.z0, workload.z1, workload.q, workload.w - nominal_design, nominal_solution = solver.get_nominal_design(system, z0, z1, q, w) + nominal_design, nominal_solution = solver.get_nominal_design( + system, z0, z1, q, w + ) if self.model_type == Policy.Classic: x = np.array([[nominal_design.h, nominal_design.T]]) @@ -354,7 +529,16 @@ def _find_analytical_results(self, system: System, workload: Workload, bounds: O x = np.array([[nominal_design.h, nominal_design.T, nominal_design.Q]]) cost = solver.nominal_objective(x[0], system, z0, z1, q, w) elif self.model_type == Policy.YZHybrid: - x = np.array([[nominal_design.h, nominal_design.T, nominal_design.Y, nominal_design.Z]]) + x = np.array( + [ + [ + nominal_design.h, + nominal_design.T, + nominal_design.Y, + nominal_design.Z, + ] + ] + ) cost = solver.nominal_objective(x[0], system, z0, z1, q, w) elif self.model_type == Policy.KHybrid: k_values = nominal_design.K From 84411a04e1b54b851cce3038b912555e13667f7a Mon Sep 17 00:00:00 2001 From: ndhuynh Date: Wed, 13 Mar 2024 13:00:08 -0400 Subject: [PATCH 2/4] [Refactor] Add leveling and tiering specific generators with a selection func --- endure/lcm/data/generator.py | 30 ++++- endure/lsm/data_generator.py | 220 +++++++++++++++++++++++++++++++++ endure/lsm/solver/__init__.py | 20 +++ jobs/bayesian_pipeline.py | 223 +++++++++++++++------------------- jobs/infra/db_log.py | 180 +++++++++++++++++++++------ 5 files changed, 509 insertions(+), 164 deletions(-) create mode 100644 endure/lsm/data_generator.py diff --git a/endure/lcm/data/generator.py b/endure/lcm/data/generator.py index faa599a..e67b180 100644 --- a/endure/lcm/data/generator.py +++ b/endure/lcm/data/generator.py @@ -1,5 +1,5 @@ import random -from typing import List, Optional +from typing import List, Optional, Type from itertools import combinations_with_replacement import numpy as 
np @@ -188,6 +188,18 @@ def _gen_row_data(self) -> list: return line +class TieringGenerator(ClassicGenerator): + def __init__(self, bounds: LSMBounds, **kwargs): + super().__init__(bounds, **kwargs) + self.policies = [Policy.Tiering] + + +class LevelingGenerator(ClassicGenerator): + def __init__(self, bounds: LSMBounds, **kwargs): + super().__init__(bounds, **kwargs) + self.policies = [Policy.Leveling] + + class KHybridGenerator(LCMDataGenerator): def __init__(self, bounds: LSMBounds, **kwargs): super().__init__(bounds, **kwargs) @@ -344,3 +356,19 @@ def _gen_row_data(self) -> list: design.Z, ] return line + + +def get_generator(choice: Policy) -> Type[LCMDataGenerator]: + choices = { + Policy.Tiering: TieringGenerator, + Policy.Leveling: LevelingGenerator, + Policy.Classic: ClassicGenerator, + Policy.QFixed: QCostGenerator, + Policy.YZHybrid: YZCostGenerator, + Policy.KHybrid: KHybridGenerator, + } + generator = choices.get(choice, None) + if generator is None: + raise KeyError + + return generator diff --git a/endure/lsm/data_generator.py b/endure/lsm/data_generator.py new file mode 100644 index 0000000..23e2b8c --- /dev/null +++ b/endure/lsm/data_generator.py @@ -0,0 +1,220 @@ +import random +from typing import List, Optional, override +from itertools import combinations_with_replacement + +import numpy as np + +from endure.lsm.types import LSMDesign, System, Policy, LSMBounds, Workload +from endure.lsm.cost import EndureCost + + +class LSMDataGenerator: + # Memory budget to prevent bits_per_elem from hitting too close to max, and + # always ensuring write_buffer > 0 + MEM_EPSILON = 0.1 + + def __init__( + self, + bounds: LSMBounds, + precision: int = 3, + ) -> None: + self.precision = precision + self.bounds = bounds + self.max_levels = bounds.max_considered_levels + self.cf = EndureCost(max_levels=bounds.max_considered_levels) + + def _sample_size_ratio(self) -> int: + low, high = self.bounds.size_ratio_range + return np.random.randint(low=low, high=high) + + def _sample_bloom_filter_bits(self, max: Optional[float] = None) -> float: + if max is None: + max = self.bounds.bits_per_elem_range[1] + min = self.bounds.bits_per_elem_range[0] + sample = (max - min) * np.random.rand() + min + return np.around(sample, self.precision) + + # TODO: Will want to configure environment to simulate larger ranges over + # potential system values + def _sample_entry_per_page(self, entry_size: int = 8192) -> int: + # Potential page sizes are 4KB, 8KB, 16KB + KB_TO_BITS = 8 * 1024 + page_sizes = np.array(self.bounds.page_sizes) + entries_per_page = (page_sizes * KB_TO_BITS) / entry_size + return np.random.choice(entries_per_page) + + def _sample_selectivity(self) -> float: + low, high = self.bounds.selectivity_range + return (high - low) * np.random.rand() + low + + def _sample_entry_size(self) -> int: + return np.random.choice(self.bounds.entry_sizes) + + def _sample_memory_budget(self) -> float: + low, high = self.bounds.memory_budget_range + return (high - low) * np.random.rand() + low + + def _sample_total_elements(self) -> int: + low, high = self.bounds.elements_range + return np.random.randint(low=low, high=high) + + def sample_system(self) -> System: + E = self._sample_entry_size() + B = self._sample_entry_per_page(entry_size=E) + s = self._sample_selectivity() + H = self._sample_memory_budget() + N = self._sample_total_elements() + system = System(E, s, B, N, H) + + return system + + def sample_design( + self, + system: System, + ) -> LSMDesign: + h = self._sample_bloom_filter_bits(max=(system.H 
- self.MEM_EPSILON)) + T = self._sample_size_ratio() + lsm = LSMDesign(h, T) + + return lsm + + def sample_workload(self, dimensions: int) -> Workload: + # See stackoverflow thread for why the simple solution is not uniform + # https://stackoverflow.com/questions/8064629 + workload = np.around(np.random.rand(dimensions - 1), self.precision) + workload = np.concatenate((workload, np.array([0, 1]))) + workload = np.sort(workload) + + workload = [b - a for a, b in zip(workload, workload[1:])] + return Workload(*workload) + + +class TieringGenerator(LSMDataGenerator): + def __init__( + self, + bounds: LSMBounds, + policies: List[Policy] = [Policy.Tiering, Policy.Leveling], + **kwargs, + ): + super().__init__(bounds, **kwargs) + self.policies = policies + + @override + def sample_design( + self, + system: System, + ) -> LSMDesign: + h = self._sample_bloom_filter_bits(max=(system.H - self.MEM_EPSILON)) + T = self._sample_size_ratio() + lsm = LSMDesign(h, T, policy=Policy.Tiering) + + return lsm + + +class LevelingGenerator(LSMDataGenerator): + def __init__( + self, + bounds: LSMBounds, + policies: List[Policy] = [Policy.Tiering, Policy.Leveling], + **kwargs, + ): + super().__init__(bounds, **kwargs) + self.policies = policies + + @override + def sample_design( + self, + system: System, + ) -> LSMDesign: + h = self._sample_bloom_filter_bits(max=(system.H - self.MEM_EPSILON)) + T = self._sample_size_ratio() + lsm = LSMDesign(h, T, policy=Policy.Leveling) + + return lsm + + +class ClassicGenerator(LSMDataGenerator): + def __init__( + self, + bounds: LSMBounds, + **kwargs, + ): + super().__init__(bounds, **kwargs) + + @override + def sample_design( + self, + system: System, + ) -> LSMDesign: + h = self._sample_bloom_filter_bits(max=(system.H - self.MEM_EPSILON)) + T = self._sample_size_ratio() + policy = random.choice((Policy.Tiering, Policy.Leveling)) + lsm = LSMDesign(h=h, T=T, policy=policy) + + return lsm + + +class KHybridGenerator(LSMDataGenerator): + def __init__(self, bounds: LSMBounds, **kwargs): + super().__init__(bounds, **kwargs) + + def _gen_k_levels(self, levels: int, max_size_ratio: int) -> list: + arr = combinations_with_replacement(range(max_size_ratio, 0, -1), levels) + + return list(arr) + + @override + def sample_design(self, system: System) -> LSMDesign: + design = super().sample_design(system) + h = design.h + T = design.T + levels = int(self.cf.L(design, system, ceil=True)) + k = np.random.randint(low=1, high=int(T), size=(levels)) + remaining = np.ones(self.max_levels - len(k)) + k = np.concatenate([k, remaining]) + design = LSMDesign(h=h, T=T, policy=Policy.KHybrid, K=k.tolist()) + + return design + + +class QCostGenerator(LSMDataGenerator): + def __init__(self, bounds: LSMBounds, **kwargs): + super().__init__(bounds, **kwargs) + + def _sample_q(self, max_size_ratio: int) -> int: + return np.random.randint( + low=self.bounds.size_ratio_range[0] - 1, + high=max_size_ratio, + ) + + @override + def sample_design(self, system: System) -> LSMDesign: + design = super().sample_design(system) + h = design.h + T = design.T + Q = self._sample_q(int(T)) + design = LSMDesign(h=h, T=T, policy=Policy.QFixed, Q=Q) + + return design + + +class YZCostGenerator(LSMDataGenerator): + def __init__(self, bounds: LSMBounds, **kwargs): + super().__init__(bounds, **kwargs) + + def _sample_capacity(self, max_size_ratio: int) -> int: + return np.random.randint( + low=self.bounds.size_ratio_range[0] - 1, + high=max_size_ratio, + ) + + @override + def sample_design(self, system: System) -> LSMDesign: + 
design = super().sample_design(system) + h = design.h + T = design.T + Y = self._sample_capacity(int(T)) + Z = self._sample_capacity(int(T)) + design = LSMDesign(h=h, T=T, policy=Policy.YZHybrid, Y=Y, Z=Z) + + return design diff --git a/endure/lsm/solver/__init__.py b/endure/lsm/solver/__init__.py index 813da9a..af148e6 100644 --- a/endure/lsm/solver/__init__.py +++ b/endure/lsm/solver/__init__.py @@ -1,4 +1,24 @@ +from typing import Type +from endure.lsm.types import Policy from .classic_solver import ClassicSolver from .qlsm_solver import QLSMSolver from .klsm_solver import KLSMSolver from .yzlsm_solver import YZLSMSolver + + +def get_solver( + choice: Policy, +) -> Type[ClassicSolver | QLSMSolver | KLSMSolver | YZLSMSolver]: + choices = { + Policy.Tiering: ClassicSolver, + Policy.Leveling: ClassicSolver, + Policy.Classic: ClassicSolver, + Policy.QFixed: QLSMSolver, + Policy.YZHybrid: YZLSMSolver, + Policy.KHybrid: KLSMSolver, + } + solver = choices.get(choice, None) + if solver is None: + raise KeyError + + return solver diff --git a/jobs/bayesian_pipeline.py b/jobs/bayesian_pipeline.py index 931db3a..985c949 100644 --- a/jobs/bayesian_pipeline.py +++ b/jobs/bayesian_pipeline.py @@ -13,26 +13,13 @@ from botorch.acquisition.monte_carlo import qExpectedImprovement from botorch.optim import optimize_acqf_mixed from botorch.models.transforms import Normalize, Standardize +from torch.types import Number -from endure.lsm.cost import EndureCost from endure.data.io import Reader -from endure.lsm.types import ( - LSMDesign, - System, - Policy, - Workload, - LSMBounds, -) -from endure.lcm.data.generator import ( - ClassicGenerator, - QCostGenerator, - YZCostGenerator, - KHybridGenerator, -) -from endure.lsm.solver.classic_solver import ClassicSolver -from endure.lsm.solver.qlsm_solver import QLSMSolver -from endure.lsm.solver.yzlsm_solver import YZLSMSolver -from endure.lsm.solver.klsm_solver import KLSMSolver +from endure.lsm.cost import EndureCost +from endure.lsm.types import LSMDesign, System, Policy, Workload, LSMBounds +import endure.lcm.data.generator as Gen +import endure.lsm.solver as Solver from jobs.infra.db_log import ( initialize_database, log_new_run, @@ -48,28 +35,32 @@ def print_best_designs(best_designs: List[Tuple[LSMDesign, float]]) -> None: if design.policy == Policy.KHybrid: k_values_str = ", ".join(str(k) for k in design.K) print( - f"Design: h={design.h}, T={design.T}, Policy={design.policy}, K=[{k_values_str}], Cost={cost}" + f"Design: h={design.h}, T={design.T}, " + f"Policy={design.policy}, K=[{k_values_str}], " + f"Cost={cost}" ) else: print( - f"Design: h={design.h}, T={design.T}, Policy={design.policy}, Q={design.Q}, Y={design.Y}," + f"Design: h={design.h}, T={design.T}, " + f"Policy={design.policy}, Q={design.Q}, Y={design.Y}," f" Z={design.Z}, Cost={cost}" ) with open("best_designs.txt", "w") as file: file.write("All Best Designs Found:\n") for design, cost in best_designs: file.write( - f"Design: h={design.h}, T={design.T}, Policy={design.policy}, Q={design.Q}, " + f"Design: h={design.h}, T={design.T}, " + f"Policy={design.policy}, Q={design.Q}, " f"Y={design.Y}, Z={design.Z}, Cost={cost}\n" ) class BayesianPipeline: def __init__(self, config: dict) -> None: - self.end_time = None - self.start_time = None - self.conn = None + self.end_time: float = 0 + self.start_time: float = 0 self.run_id: int = 0 + self.conn = None self.log: logging.Logger = logging.getLogger(config["log"]["name"]) jconfig: dict = config["job"]["BayesianOptimization"] @@ -91,7 +82,7 @@ def 
__init__(self, config: dict) -> None: jconfig["database"]["db_path"], ) self.db_path = os.path.join(self.output_dir, jconfig["database"]["db_name"]) - model_type_str = self.jconfig.get("model_type", "Classic") + model_type_str = jconfig.get("model_type", "Classic") self.model_type = getattr(Policy, model_type_str) self.num_k_values = jconfig["num_k_values"] @@ -121,7 +112,7 @@ def initialize_environment( num_iterations: Optional[int], sample_size: Optional[int], acqf: Optional[str], - ): + ) -> None: os.makedirs(self.output_dir, exist_ok=True) self.conn = initialize_database(self.db_path) self.system = system if system is not None else self.system @@ -135,6 +126,7 @@ def initialize_environment( self.num_iterations = ( num_iterations if num_iterations is not None else self.num_iterations ) + assert self.conn is not None self.run_id = log_new_run( self.conn, self.system, @@ -178,10 +170,13 @@ def generate_initial_bounds(self, system: System) -> torch.Tensor: return bounds - def optimization_loop(self, train_x, train_y, best_y): + def optimization_loop( + self, train_x: torch.Tensor, train_y: torch.Tensor, best_y: Number + ) -> List[Tuple[LSMDesign, Number]]: bounds = self.generate_initial_bounds(self.system) fixed_feature_list = self._initialize_feature_list(bounds) best_designs = [] + for i in range(self.num_iterations): new_candidates = self.get_next_points( train_x, @@ -198,9 +193,10 @@ def optimization_loop(self, train_x, train_y, best_y): ) self.log.debug(f"Iteration {i + 1}/{self.num_iterations} complete") self.log.debug("Bayesian Optimization completed") + return best_designs - def _initialize_feature_list(self, bounds): + def _initialize_feature_list(self, bounds: torch.Tensor) -> List: t_bounds = bounds[:, 1] lower_t_bound = int(np.floor(t_bounds[0].item())) upper_t_bound = int(np.ceil(t_bounds[1].item())) @@ -230,7 +226,9 @@ def _initialize_feature_list(self, bounds): return fixed_features_list - def evaluate_new_candidates(self, new_candidates): + def evaluate_new_candidates( + self, new_candidates: torch.Tensor + ) -> Tuple[List[LSMDesign], List[float]]: new_designs = self.create_designs_from_candidates(new_candidates) costs = [ @@ -244,6 +242,7 @@ def evaluate_new_candidates(self, new_candidates): ) for design in new_designs ] + assert self.conn is not None for design, cost in zip(new_designs, costs): log_design_cost(self.conn, self.run_id, design, cost) @@ -251,7 +250,7 @@ def evaluate_new_candidates(self, new_candidates): def update_training_data( self, train_x, train_y, new_candidates, costs, best_designs - ): + ) -> Tuple[torch.Tensor, torch.Tensor, Number, List[Tuple[LSMDesign, Number]]]: new_target = torch.tensor(costs).unsqueeze(-1) train_x = torch.cat([train_x, new_candidates]) train_y = torch.cat([train_y, new_target]) @@ -259,15 +258,19 @@ def update_training_data( best_designs = self._update_best_designs( best_designs, new_candidates, new_target ) + return train_x, train_y, best_y, best_designs - def create_designs_from_candidates(self, candidates): + def create_designs_from_candidates( + self, candidates: torch.Tensor + ) -> List[LSMDesign]: + new_designs = [] for candidate in candidates: - new_designs = self._generate_new_designs_helper(candidate) + new_designs += self._generate_new_designs_helper(candidate) return new_designs - def _generate_new_designs_helper(self, candidate): + def _generate_new_designs_helper(self, candidate: torch.Tensor) -> List[LSMDesign]: new_designs = [] h = candidate[0].item() if h == self.system.H: @@ -324,6 +327,7 @@ def 
finalize_optimization(self, best_designs): self.system, self.workload ) best_design, best_cost = sorted_designs[0][0], sorted_designs[0][1] + assert self.conn is not None log_run_details( self.conn, self.run_id, @@ -334,10 +338,11 @@ def finalize_optimization(self, best_designs): best_design, ) self.conn.close() + if sorted_designs: return best_design, best_cost, elapsed_time - else: - return None, None, elapsed_time + + return None, None, elapsed_time def get_next_points( self, @@ -393,6 +398,7 @@ def get_next_points( acqf = qExpectedImprovement(model=single_model, best_f=-best_y) else: raise ValueError(f"Unknown acquisition function: {acquisition_function}") + candidates, _ = optimize_acqf_mixed( acq_function=acqf, bounds=bounds, @@ -403,31 +409,29 @@ def get_next_points( ) return candidates - def _generate_initial_data(self, n: int = 30) -> Tuple[torch.Tensor, torch.Tensor]: + def _generate_initial_data( + self, n: int = 30 + ) -> Tuple[torch.Tensor, torch.Tensor, Number]: train_x = [] train_y = [] - if self.model_type == Policy.QFixed: - generator = QCostGenerator(self.bounds) - elif self.model_type == Policy.YZHybrid: - generator = YZCostGenerator(self.bounds) - elif self.model_type == Policy.KHybrid: - generator = KHybridGenerator(self.bounds) - else: - generator = ClassicGenerator(self.bounds) + + generator_class = Gen.get_generator(self.model_type) + generator = generator_class(self.bounds) + for _ in range(n): design = generator._sample_design(self.system) + x_vals = np.array([design.h, design.T]) if self.model_type == Policy.Classic: policy = 0 if design.policy == Policy.Tiering else 1 - x_values = np.array([design.h, design.T, policy]) + x_vals = np.concatenate((x_vals, [policy])) elif self.model_type == Policy.QFixed: - x_values = np.array([design.h, design.T, design.Q]) + x_vals = np.concatenate((x_vals, [design.Q])) elif self.model_type == Policy.YZHybrid: - x_values = np.array([design.h, design.T, design.Y, design.Z]) + x_vals = np.array((x_vals, [design.Y, design.Z])) elif self.model_type == Policy.KHybrid: - k_values_padded = (design.K + [1] * self.num_k_values)[ - : self.num_k_values - ] - x_values = np.array([design.h, design.T] + k_values_padded) + k_values_padded = design.K + [1] * self.num_k_values + k_values_padded = k_values_padded[: self.num_k_values] + x_vals = np.concatenate((x_vals, k_values_padded)) cost = self.cf.calc_cost( design, self.system, @@ -436,9 +440,11 @@ def _generate_initial_data(self, n: int = 30) -> Tuple[torch.Tensor, torch.Tenso self.workload.q, self.workload.w, ) + assert self.conn is not None log_design_cost(self.conn, self.run_id, design, cost) - train_x.append(x_values) + train_x.append(x_vals) train_y.append(cost) + train_x = np.array(train_x) train_x = torch.tensor(train_x) train_y = torch.tensor(train_y, dtype=torch.float64).unsqueeze(-1) @@ -453,55 +459,23 @@ def _update_best_designs( new_y: torch.Tensor, ) -> List[Tuple[LSMDesign, float]]: for x, y in zip(new_x, new_y): + kwargs = { + "h": x[0].item(), + "T": np.ceil(x[1].item()), + "policy": self.model_type, + } if self.model_type == Policy.QFixed: - h, size_ratio, qvalue = x[0], x[1], x[2] - best_designs.append( - ( - LSMDesign( - h=h.item(), - T=np.ceil(size_ratio.item()), - policy=Policy.QFixed, - Q=qvalue.item(), - ), - y.item(), - ) - ) + kwargs["Q"] = x[2].item() elif self.model_type == Policy.YZHybrid: - h, size_ratio, yvalue, zvalue = x[0], x[1], x[2], x[3] - best_designs.append( - ( - LSMDesign( - h=h.item(), - T=np.ceil(size_ratio.item()), - policy=Policy.YZHybrid, - 
Y=yvalue.item(), - Z=zvalue.item(), - ), - y.item(), - ) - ) + kwargs["Y"] = x[2].item() + kwargs["Z"] = x[3].item() elif self.model_type == Policy.KHybrid: - h, size_ratio = x[0], x[1] - k_values = x[2:].tolist() - design = LSMDesign( - h=h.item(), - T=np.ceil(size_ratio.item()), - policy=Policy.KHybrid, - K=k_values, - ) - best_designs.append((design, y.item())) - else: - h, size_ratio, policy = x[0], x[1], x[2] - if policy.item() < 0.5: - pol = Policy.Leveling - else: - pol = Policy.Tiering - best_designs.append( - ( - LSMDesign(h=h.item(), T=np.ceil(size_ratio.item()), policy=pol), - y.item(), - ) - ) + kwargs["K"] = x[2:].tolist() + else: # self.model_type == Policy.Classic + pol = Policy.Leveling if x[2].item() < 0.5 else Policy.Tiering + kwargs["policy"] = pol + best_designs.append((LSMDesign(**kwargs), y.item())) + return best_designs def _find_analytical_results( @@ -509,44 +483,43 @@ def _find_analytical_results( ) -> Tuple[LSMDesign, float]: bounds = bounds if bounds is not None else self.bounds if self.model_type == Policy.Classic: - solver = ClassicSolver(bounds) + solver = Solver.ClassicSolver(bounds) elif self.model_type == Policy.QFixed: - solver = QLSMSolver(bounds) + solver = Solver.QLSMSolver(bounds) elif self.model_type == Policy.YZHybrid: - solver = YZLSMSolver(bounds) + solver = Solver.YZLSMSolver(bounds) elif self.model_type == Policy.KHybrid: - solver = KLSMSolver(bounds) + solver = Solver.KLSMSolver(bounds) + else: + raise KeyError(f"Solver for {self.model_type} not implemented") + z0, z1, q, w = workload.z0, workload.z1, workload.q, workload.w - nominal_design, nominal_solution = solver.get_nominal_design( - system, z0, z1, q, w - ) + opt_design, _ = solver.get_nominal_design(system, z0, z1, q, w) if self.model_type == Policy.Classic: - x = np.array([[nominal_design.h, nominal_design.T]]) - policy = nominal_design.policy - cost = solver.nominal_objective(x[0], policy, system, z0, z1, q, w) + x = np.array([opt_design.h, opt_design.T]) + policy = opt_design.policy + assert isinstance(solver, Solver.ClassicSolver) + cost = solver.nominal_objective(x, policy, system, z0, z1, q, w) elif self.model_type == Policy.QFixed: - x = np.array([[nominal_design.h, nominal_design.T, nominal_design.Q]]) - cost = solver.nominal_objective(x[0], system, z0, z1, q, w) + x = np.array([opt_design.h, opt_design.T, opt_design.Q]) + assert isinstance(solver, Solver.QLSMSolver) + cost = solver.nominal_objective(x, system, z0, z1, q, w) elif self.model_type == Policy.YZHybrid: - x = np.array( - [ - [ - nominal_design.h, - nominal_design.T, - nominal_design.Y, - nominal_design.Z, - ] - ] - ) - cost = solver.nominal_objective(x[0], system, z0, z1, q, w) + x = np.array([opt_design.h, opt_design.T, opt_design.Y, opt_design.Z]) + assert isinstance(solver, Solver.YZLSMSolver) + cost = solver.nominal_objective(x, system, z0, z1, q, w) elif self.model_type == Policy.KHybrid: - k_values = nominal_design.K - x = np.array([nominal_design.h, nominal_design.T] + k_values) + x = np.array([opt_design.h, opt_design.T] + opt_design.K) + assert isinstance(solver, Solver.KLSMSolver) cost = solver.nominal_objective(x, system, z0, z1, q, w) + else: + raise KeyError(f"Unknown model type {self.model_type}") + print("Cost for the nominal design using analytical solver: ", cost) - print("Nominal Design suggested by analytical solver: ", nominal_design) - return nominal_design, cost + print("Nominal Design suggested by analytical solver: ", opt_design) + + return opt_design, cost if __name__ == "__main__": diff --git 
a/jobs/infra/db_log.py b/jobs/infra/db_log.py index 73b3161..ff1b6c5 100644 --- a/jobs/infra/db_log.py +++ b/jobs/infra/db_log.py @@ -1,12 +1,13 @@ import sqlite3 from endure.data.io import Reader -from endure.lsm.types import LSMDesign, Policy +from endure.lsm.types import LSMDesign, Policy, System, Workload -def initialize_database(db_path='cost_log.db'): +def initialize_database(db_path: str = "cost_log.db") -> sqlite3.Connection: connector = sqlite3.connect(db_path) cursor = connector.cursor() - cursor.execute(''' + cursor.execute( + """ CREATE TABLE IF NOT EXISTS runs ( run_id INTEGER PRIMARY KEY AUTOINCREMENT, empty_reads REAL, @@ -22,8 +23,10 @@ def initialize_database(db_path='cost_log.db'): iterations INT, sample_size INT, acquisition_function TEXT - );''') - cursor.execute(''' + );""" + ) + cursor.execute( + """ CREATE TABLE IF NOT EXISTS design_costs ( idx INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER, @@ -39,8 +42,10 @@ def initialize_database(db_path='cost_log.db'): k16 REAL, k17 REAL, k18 REAL, k19 REAL, k20 REAL, cost REAL, FOREIGN KEY (run_id) REFERENCES runs(run_id) - );''') - cursor.execute(''' + );""" + ) + cursor.execute( + """ CREATE TABLE IF NOT EXISTS run_details ( run_id INTEGER PRIMARY KEY, duration_secs INTEGER, @@ -59,8 +64,10 @@ def initialize_database(db_path='cost_log.db'): bayesian_Y INTEGER, bayesian_Z INTEGER, FOREIGN KEY (run_id) REFERENCES runs(run_id) - );''') - cursor.execute(''' + );""" + ) + cursor.execute( + """ CREATE TABLE IF NOT EXISTS run_details_k_values ( idx INTEGER PRIMARY KEY AUTOINCREMENT, run_id INTEGER, @@ -69,56 +76,153 @@ def initialize_database(db_path='cost_log.db'): k11 REAL, k12 REAL, k13 REAL, k14 REAL, k15 REAL, k16 REAL, k17 REAL, k18 REAL, k19 REAL, k20 REAL, FOREIGN KEY (run_id) REFERENCES runs(run_id) - );''') + );""" + ) connector.commit() + return connector -def log_new_run(connector, system, workload, iterations, sample, acqf): +def log_new_run( + connector: sqlite3.Connection, + system: System, + workload: Workload, + iterations: int, + sample: int, + acqf: str, +) -> int: cursor = connector.cursor() - cursor.execute('INSERT INTO runs (empty_reads, non_empty_reads, range_queries, writes, ' - 'max_bits_per_element, physical_entries_per_page, range_selectivity, ' - 'entries_per_page, total_elements, read_write_asymmetry, iterations, sample_size, ' - 'acquisition_function) ' - 'VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)', - (workload.z0, workload.z1, workload.q, workload.w, system.H, system.E, system.s, system.B, - system.N, system.phi, iterations, sample, acqf)) + cursor.execute( + """ + INSERT INTO runs ( + empty_reads, + non_empty_reads, + range_queries, + writes, + max_bits_per_element, + physical_entries_per_page, + range_selectivity, + entries_per_page, + total_elements, + read_write_asymmetry, + iterations, + sample_size, + acquisition_function + ) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
+ """, + ( + workload.z0, + workload.z1, + workload.q, + workload.w, + system.H, + system.E, + system.s, + system.B, + system.N, + system.phi, + iterations, + sample, + acqf, + ), + ) connector.commit() + + assert cursor.lastrowid is not None return cursor.lastrowid -def log_design_cost(connector, run_id, design, cost): +def log_design_cost( + connector: sqlite3.Connection, + run_id: int, + design: LSMDesign, + cost: float, +) -> None: cursor = connector.cursor() policy = design.policy - k_values = design.K + [None] * (20 - len(design.K)) # TODO replace this with the max_levels - sql_command = ('INSERT INTO design_costs (run_id, bits_per_element, size_ratio, policy, Q, Y, Z, cost, ' + - ', '.join([f'k{i+1}' for i in range(20)]) + ') ' + 'VALUES (?, ?, ?, ?, ?, ?, ?, ?'', ' + - ', '.join(['?']*20) + ')') - cursor.execute(sql_command, (run_id, design.h, design.T, policy.name, design.Q, design.Y, design.Z, cost) + - tuple(k_values)) + k_values = design.K + [None] * ( + 20 - len(design.K) + ) # TODO replace this with the max_levels + sql_command = ( + "INSERT INTO design_costs (run_id, bits_per_element, size_ratio, policy, Q, Y, Z, cost, " + + ", ".join([f"k{i+1}" for i in range(20)]) + + ") " + + "VALUES (?, ?, ?, ?, ?, ?, ?, ?" + ", " + ", ".join(["?"] * 20) + ")" + ) + cursor.execute( + sql_command, + (run_id, design.h, design.T, policy.name, design.Q, design.Y, design.Z, cost) + + tuple(k_values), + ) connector.commit() -def log_run_details(connector, run_id, duration, analytical_cost, bayesian_cost, analytical_design, bayesian_design): +def log_run_details( + connector: sqlite3.Connection, + run_id: int, + duration: float, + analytical_cost: float, + bayesian_cost: float, + analytical_design: LSMDesign, + bayesian_design: LSMDesign, +) -> None: cursor = connector.cursor() analytical_policy = analytical_design.policy print(analytical_policy) bayesian_policy = bayesian_design.policy - cursor.execute(''' - INSERT INTO run_details (run_id, duration_secs, analytical_cost, bayesian_cost, analytical_h, analytical_T, - analytical_policy, analytical_Q, analytical_Y, analytical_Z, bayesian_h, bayesian_T, bayesian_policy, bayesian_Q, - bayesian_Y, bayesian_Z) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', - (run_id, duration, analytical_cost, bayesian_cost, analytical_design.h, analytical_design.T, - analytical_policy.name, analytical_design.Q, analytical_design.Y, analytical_design.Z, - bayesian_design.h, bayesian_design.T, bayesian_policy.name, bayesian_design.Q, - bayesian_design.Y, bayesian_design.Z)) + cursor.execute( + """ + INSERT INTO run_details ( + run_id, + duration_secs, + analytical_cost, + bayesian_cost, + analytical_h, + analytical_T, + analytical_policy, + analytical_Q, + analytical_Y, + analytical_Z, + bayesian_h, + bayesian_T, + bayesian_policy, + bayesian_Q, + bayesian_Y, + bayesian_Z + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)""", + ( + run_id, + duration, + analytical_cost, + bayesian_cost, + analytical_design.h, + analytical_design.T, + analytical_policy.name, + analytical_design.Q, + analytical_design.Y, + analytical_design.Z, + bayesian_design.h, + bayesian_design.T, + bayesian_policy.name, + bayesian_design.Q, + bayesian_design.Y, + bayesian_design.Z, + ), + ) if bayesian_policy == Policy.KHybrid: k_values = bayesian_design.K + [None] * (20 - len(bayesian_design.K)) - cursor.execute(''' - INSERT INTO run_details_k_values (run_id, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, k11, k12, k13, k14, k15, - k16, k17, k18, k19, k20) VALUES (?, ?, ?, ?, ?, ?, 
?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)''', - (run_id,) + tuple(k_values)) + cursor.execute( + """ + INSERT INTO run_details_k_values ( + run_id, k1, k2, k3, k4, k5, k6, k7, k8, k9, k10, + k11, k12, k13, k14, k15, k16, k17, k18, k19, k20 + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + """, + (run_id,) + tuple(k_values), + ) connector.commit() From 0cd84331af33202453b54bc15fb48e94bf9ec7b7 Mon Sep 17 00:00:00 2001 From: ndhuynh Date: Fri, 22 Mar 2024 12:38:33 -0400 Subject: [PATCH 3/4] [Linter] Running black on files --- jobs/bayesian_pipeline.py | 45 ++++++++++++----------- jobs/infra/bo_job_runs.py | 71 ++++++++++++++++++++++++++++--------- jobs/lcm_hyperparam_tune.py | 2 +- 3 files changed, 77 insertions(+), 41 deletions(-) diff --git a/jobs/bayesian_pipeline.py b/jobs/bayesian_pipeline.py index 985c949..c20dc8f 100644 --- a/jobs/bayesian_pipeline.py +++ b/jobs/bayesian_pipeline.py @@ -67,23 +67,18 @@ def __init__(self, config: dict) -> None: self.bounds: LSMBounds = LSMBounds(**config["lsm"]["bounds"]) self.cf: EndureCost = EndureCost(self.bounds.max_considered_levels) - self.system: System = System(**jconfig["system"]) - self.workload: Workload = Workload(**jconfig["workload"]) + self.system: System = System(**config["lsm"]["system"]) + self.workload: Workload = Workload(**config["lsm"]["workload"]) self.initial_samples: int = jconfig["initial_samples"] self.acquisition_function: str = jconfig["acquisition_function"] - self.q: int = jconfig["batch_size"] self.num_restarts: int = jconfig["num_restarts"] - self.raw_samples: int = jconfig["raw_samples"] self.num_iterations: int = jconfig["num_iterations"] - self.beta_value: float = jconfig["beta_value"] - self.write_to_db = jconfig["database"]["write_to_db"] self.output_dir = os.path.join( jconfig["database"]["data_dir"], jconfig["database"]["db_path"], ) self.db_path = os.path.join(self.output_dir, jconfig["database"]["db_name"]) - model_type_str = jconfig.get("model_type", "Classic") - self.model_type = getattr(Policy, model_type_str) + self.model_type = getattr(Policy, config["lsm"]["design"]) self.num_k_values = jconfig["num_k_values"] self.config: dict = config @@ -96,7 +91,7 @@ def run( num_iterations: Optional[int] = None, sample_size: Optional[int] = None, acqf: Optional[str] = None, - ) -> Tuple[Optional[LSMDesign], Optional[float]]: + ) -> Tuple[LSMDesign, float]: self.start_time = time.time() self.initialize_environment(system, workload, num_iterations, sample_size, acqf) train_x, train_y, best_y = self._generate_initial_data(self.initial_samples) @@ -171,13 +166,18 @@ def generate_initial_bounds(self, system: System) -> torch.Tensor: return bounds def optimization_loop( - self, train_x: torch.Tensor, train_y: torch.Tensor, best_y: Number - ) -> List[Tuple[LSMDesign, Number]]: + self, + train_x: torch.Tensor, + train_y: torch.Tensor, + best_y: Number, + ) -> list[tuple[LSMDesign, Number]]: bounds = self.generate_initial_bounds(self.system) fixed_feature_list = self._initialize_feature_list(bounds) best_designs = [] + self.log.debug(f"{best_y=}") - for i in range(self.num_iterations): + epochs = self.num_iterations + for i in range(epochs): new_candidates = self.get_next_points( train_x, train_y, @@ -187,11 +187,12 @@ def optimization_loop( self.acquisition_function, 1, ) + self.log.debug(f"[it {i + 1}/{epochs}] {new_candidates=}") _, costs = self.evaluate_new_candidates(new_candidates) train_x, train_y, best_y, best_designs = self.update_training_data( train_x, train_y, new_candidates, costs, 
best_designs ) - self.log.debug(f"Iteration {i + 1}/{self.num_iterations} complete") + self.log.debug(f"[it {i + 1}/{epochs}] {costs=}") self.log.debug("Bayesian Optimization completed") return best_designs @@ -339,10 +340,7 @@ def finalize_optimization(self, best_designs): ) self.conn.close() - if sorted_designs: - return best_design, best_cost, elapsed_time - - return None, None, elapsed_time + return best_design, best_cost, elapsed_time def get_next_points( self, @@ -371,10 +369,11 @@ def get_next_points( outcome_transform=Standardize(m=1), ) elif self.model_type == Policy.KHybrid: - # the self.num_k_values represents the number of categorical values the model - # is predicting out of the self.max_levels. The +2 is because this is the list of indices - # and the first 2 indices represent the 'h' value and then the 'T'value. So everything from index 1 - # till the size of num_k_values + 2 is a categorical value + # the self.num_k_values represents the number of categorical values + # the model is predicting out of the self.max_levels. The +2 is + # because this is the list of indices and the first 2 indices + # represent the 'h' value and then the 'T'value. So everything from + # index 1 till the size of num_k_values + 2 is a categorical value cat_dims = list(range(1, self.num_k_values + 2)) single_model = MixedSingleTaskGP( x, @@ -392,7 +391,7 @@ def get_next_points( model=single_model, best_f=best_y, maximize=False ) elif acquisition_function == "UpperConfidenceBound": - beta = self.beta_value + beta = self.jconfig["beta_value"] acqf = UpperConfidenceBound(model=single_model, beta=beta, maximize=False) elif acquisition_function == "qExpectedImprovement": acqf = qExpectedImprovement(model=single_model, best_f=-best_y) @@ -404,7 +403,7 @@ def get_next_points( bounds=bounds, q=n_points, num_restarts=self.num_restarts, - raw_samples=self.raw_samples, + raw_samples=self.jconfig["raw_samples"], fixed_features_list=fixed_features_list, ) return candidates diff --git a/jobs/infra/bo_job_runs.py b/jobs/infra/bo_job_runs.py index 467f096..978f083 100644 --- a/jobs/infra/bo_job_runs.py +++ b/jobs/infra/bo_job_runs.py @@ -4,7 +4,7 @@ import toml import torch -sys.path.append(os.path.join(sys.path[0], '../../')) +sys.path.append(os.path.join(sys.path[0], "../../")) from endure.lsm.types import LSMBounds, Workload from endure.lcm.data.generator import LCMDataGenerator @@ -25,29 +25,45 @@ def to_cuda(obj, seen=None): seen.add(obj_id) for attr_name in dir(obj): - if attr_name.startswith('__'): + if attr_name.startswith("__"): continue try: attr_value = getattr(obj, attr_name) if isinstance(attr_value, torch.Tensor): setattr(obj, attr_name, attr_value.to(device)) - elif hasattr(attr_value, '__dict__') or isinstance(attr_value, (list, dict)): + elif hasattr(attr_value, "__dict__") or isinstance( + attr_value, (list, dict) + ): to_cuda(attr_value, seen) - except Exception as e: + except Exception: pass else: print("CUDA not available") -def compare_designs(n_runs=100, csv_filename='yz_design_comparison.csv'): +def compare_designs(n_runs=100, csv_filename="yz_design_comparison.csv"): """Compare Bayesian and analytical designs.""" - with open(csv_filename, mode='w', newline='') as file: + with open(csv_filename, mode="w", newline="") as file: writer = csv.writer(file) - writer.writerow(['Entries per page(E)', 'Physical Entries per page(B)', 'Selectivity(s)', - 'Max bits per element(H)', 'Total elements (N)', 'Empty Reads', 'Non-Empty Reads', - 'Range Queries', 'Writes', 'BO Design', 'Analytical Design', 
'BO Cost', - 'Analytical Cost', 'Diff(Analytical-Bayesian)']) + writer.writerow( + [ + "Entries per page(E)", + "Physical Entries per page(B)", + "Selectivity(s)", + "Max bits per element(H)", + "Total elements (N)", + "Empty Reads", + "Non-Empty Reads", + "Range Queries", + "Writes", + "BO Design", + "Analytical Design", + "BO Cost", + "Analytical Cost", + "Diff(Analytical-Bayesian)", + ] + ) for i in range(n_runs): print(f"Iteration {i + 1}/{n_runs} running") @@ -55,22 +71,43 @@ def compare_designs(n_runs=100, csv_filename='yz_design_comparison.csv'): z0, z1, q, w = generator._sample_workload(4) workload = Workload(z0=z0, z1=z1, q=q, w=w) bo_design, bo_cost = bayesian_optimizer.run(system, workload) - analytical_design, analytical_cost = bayesian_optimizer._find_analytical_results(system, workload) - writer.writerow([system.E, system.B, system.s, system.H, system.N, workload.z0, workload.z1, workload.q, workload.w, - bo_design, analytical_design, bo_cost, analytical_cost, analytical_cost - bo_cost]) + analytical_design, analytical_cost = ( + bayesian_optimizer._find_analytical_results(system, workload) + ) + writer.writerow( + [ + system.E, + system.B, + system.s, + system.H, + system.N, + workload.z0, + workload.z1, + workload.q, + workload.w, + bo_design, + analytical_design, + bo_cost, + analytical_cost, + analytical_cost - bo_cost, + ] + ) if __name__ == "__main__": file_dir = os.path.dirname(__file__) - config_path = os.path.join(file_dir, "BayesianBaseline.toml") + config_path = os.path.join(file_dir, "endure.toml") with open(config_path) as fid: config = toml.load(fid) bayesian_optimizer = BayesianPipeline(config) - bounds = LSMBounds() + bounds = LSMBounds(**config["lsm"]["bounds"]) generator = LCMDataGenerator(bounds) - cf = EndureCost(config) + cf = EndureCost(max_levels=bounds.max_considered_levels) to_cuda(bayesian_optimizer) to_cuda(generator) to_cuda(cf) - compare_designs(config["job"]["BayesianOptimization"]["multi_jobs_number"], config["job"]["BayesianOptimization"]["multi_job_file"]) + compare_designs( + config["job"]["BayesianOptimization"]["multi_jobs_number"], + config["job"]["BayesianOptimization"]["multi_job_file"], + ) diff --git a/jobs/lcm_hyperparam_tune.py b/jobs/lcm_hyperparam_tune.py index 5c9c23e..131eef2 100755 --- a/jobs/lcm_hyperparam_tune.py +++ b/jobs/lcm_hyperparam_tune.py @@ -21,7 +21,7 @@ def build_train(cfg, lsm_design: Policy) -> LCMDataSet: - train_dir = os.path.join( + train_dir: str = os.path.join( cfg["io"]["data_dir"], cfg["job"]["LCMTrain"]["train"]["dir"], ) From cc99ddd6afe353c65f4b86c47f83bb3ec84fb9f2 Mon Sep 17 00:00:00 2001 From: ndhuynh Date: Mon, 22 Apr 2024 11:21:40 -0400 Subject: [PATCH 4/4] [Toml] Add default lsm workload --- endure.py | 11 ++++++----- endure.toml | 37 ++++++++++++++----------------------- 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/endure.py b/endure.py index aad7c9b..6de96f2 100755 --- a/endure.py +++ b/endure.py @@ -1,8 +1,9 @@ #!/usr/bin/env python import logging import os -import toml import sys +import toml +from typing import Any from jobs.lcm_train import LCMTrainJob from jobs.data_gen import DataGenJob @@ -11,13 +12,13 @@ class EndureDriver: - def __init__(self, conf): - self.config = conf + def __init__(self, config: dict[str, Any]) -> None: + self.config = config logging.basicConfig( format=config["log"]["format"], datefmt=config["log"]["datefmt"] ) - self.log = logging.getLogger(config["log"]["name"]) + self.log: logging.Logger = logging.getLogger(config["log"]["name"]) 
self.log.setLevel(logging.getLevelName(config["log"]["level"])) log_level = logging.getLevelName(self.log.getEffectiveLevel()) self.log.debug(f"Log level: {log_level}") @@ -38,7 +39,7 @@ def run(self): self.log.warn(f"No job associated with {job_name}") continue job = job(config) - job.run() + _ = job.run() self.log.info("All jobs finished, exiting") diff --git a/endure.toml b/endure.toml index b038528..3aac93d 100644 --- a/endure.toml +++ b/endure.toml @@ -58,7 +58,7 @@ data_dir = "/data" # QFixed - Levels 1 -> L = Q # YZHybrid - Levels 1 -> (L-1) = Q, Level L = Z # KHybrid - Each level has own K_i decision -design = 'KHybrid' +design = 'QFixed' [lsm.bounds] max_considered_levels = 20 # Max number of levels to consider @@ -71,12 +71,19 @@ elements_range = [100000000, 1000000000] # element range # Default system values if not generating random systems [lsm.system] -E = 8192 # size of a single entry in bits -s = 2e-7 # range query selectivity, 1 implies the full key range per query -B = 4 # number of physical entries per page -N = 1000000000 # total number of key-val pairs for LSM tree -H = 10 # total memory budget in bits per element -phi = 1 # read/write asymmetry coefficient, 1 implies w/r cost the same +E = 1024 # size of a single entry in bits +s = 1.905581e-8 # range query selectivity, 1 implies the full key range per query +B = 64.0 # number of physical entries per page +N = 522365629 # total number of key-val pairs for LSM tree +H = 5.705814 # total memory budget in bits per element +phi = 1.0 # read/write asymmetry coefficient, 1 implies w/r cost the same + +# Default workload if not generating from random distribution +[lsm.workload] +z0 = 0.063 +z1 = 0.190 +q = 0.545 +w = 0.202 # ============================================================================= # HEADER JOB @@ -191,8 +198,6 @@ batch_size = 1 # [ExpectedImprovement, UpperConfidenceBound, qExpectedImprovement] acquisition_function = "ExpectedImprovement" beta_value = 0.3 -# model_type can take values - "Classic", "QFixed", "YZHybrid", "KHybrid" -model_type = "KHybrid" # determines how many workloads do we want to test using the bayesian pipeline multi_jobs_number = 100 multi_job_file = "design_comparison.csv" @@ -207,20 +212,6 @@ db_path = "yz_databases" # This must be a .db file for code to function. It will create a sqllite database db_name = "yz_db_cost.db" -[job.BayesianOptimization.system] -E = 1024 -s = 1.905581e-8 -B = 64.0 -N = 522365629 -H = 5.705814 -phi = 1.0 - -[job.BayesianOptimization.workload] -z0 = 0.063 -z1 = 0.190 -q = 0.545 -w = 0.202 - # ============================================================================= # HEADER LCM # Add configurations related to learned cost models
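
Taken together, the series leaves the Bayesian pipeline reading all of its defaults from the [lsm] tables in endure.toml (bounds, system, workload, and the design policy) instead of the per-job BayesianOptimization blocks. A minimal sketch of driving the pipeline under that layout follows; the BayesianPipeline import path and the positional run(system, workload) call mirror jobs/infra/bo_job_runs.py, while the entry-point script itself is illustrative and not part of the patches.

    # Minimal sketch, assuming the post-series endure.toml layout shown above.
    import toml

    from endure.lsm.types import LSMBounds, Policy, System, Workload
    from jobs.bayesian_pipeline import BayesianPipeline

    config = toml.load("endure.toml")

    # Defaults now live under [lsm.*] rather than [job.BayesianOptimization.*];
    # the pipeline builds these same objects internally from config, they are
    # constructed here only to show which keys are read.
    bounds = LSMBounds(**config["lsm"]["bounds"])
    system = System(**config["lsm"]["system"])
    workload = Workload(**config["lsm"]["workload"])

    # 'design' replaces the old model_type key, e.g. 'QFixed' -> Policy.QFixed.
    policy = getattr(Policy, config["lsm"]["design"])

    pipeline = BayesianPipeline(config)
    design, cost = pipeline.run(system, workload)
    print(f"best design: {design}, cost: {cost}")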