Skip to content

Commit

Permalink
added job for mlos bo run
Browse files Browse the repository at this point in the history
  • Loading branch information
aquaorifice committed May 7, 2024
1 parent e057d12 commit 877e5b9
Show file tree
Hide file tree
Showing 4 changed files with 163 additions and 3 deletions.
6 changes: 4 additions & 2 deletions endure.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,8 @@
from jobs.lcm_train import LCMTrainJob
from jobs.data_gen import DataGenJob
from jobs.ltune_train import LTuneTrainJob
from jobs.bayesian_pipeline import BayesianPipeline
from jobs.botorch_bo import BayesianPipeline
from jobs.mlos_bo import BayesianPipelineMlos


class EndureDriver:
Expand All @@ -30,7 +31,8 @@ def run(self):
"DataGen": DataGenJob,
"LCMTrain": LCMTrainJob,
"LTuneTrain": LTuneTrainJob,
"BayesianBaseline": BayesianPipeline,
"BayesianPipelineBoTorch": BayesianPipeline,
"BayesianPipelineMLOS": BayesianPipelineMlos,
}
jobs_list = self.config["app"]["run"]
for job_name in jobs_list:
Expand Down
8 changes: 7 additions & 1 deletion endure.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@ run = [
# "DataGen",
# "LCMTrain",
# "LTuneTrain",
# "BayesianBaseline"
# "BayesianPipelineBoTorch",
# "BayesianPipelineMLOS"
]

# =============================================================================
Expand Down Expand Up @@ -212,6 +213,11 @@ db_path = "yz_databases"
# This must be a .db file for the code to function. It will create a SQLite database.
db_name = "yz_db_cost.db"

[job.BayesianOptimization.mlos]
# Allowed values: "Flaml", "Smac", "Random"
optimizer = "Smac"
num_runs = 1000
iteration = 50
# =============================================================================
# HEADER LCM
# Add configurations related to learned cost models
Expand Down
File renamed without changes.
152 changes: 152 additions & 0 deletions jobs/mlos_bo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
import os
import toml
import logging
import ConfigSpace as CS
import numpy as np
import pandas as pd

import mlos_core.optimizers
from endure.lsm.cost import EndureCost
from endure.lcm.data.generator import KHybridGenerator
from endure.lsm.solver import KLSMSolver
from endure.lsm.types import LSMDesign, System, Policy, Workload, LSMBounds
from endure.data.io import Reader


class BayesianPipelineMlos:
    """Bayesian-optimization pipeline for LSM-tree tuning backed by mlos_core.

    Each run samples a random system/workload pair, searches the LSM design
    space (bits-per-element ``h``, size ratio ``t``, per-level ``k_i``) with
    the configured mlos optimizer, and also computes the analytical optimum
    via the solver so the two can be compared.
    """

    def __init__(self, conf: dict) -> None:
        """Build the pipeline from the parsed ``endure.toml`` configuration dict.

        Reads ``conf["lsm"]["bounds"]`` for the search bounds and
        ``conf["job"]["BayesianOptimization"]`` (plus its ``mlos`` sub-table)
        for optimizer choice, run count and per-run iteration budget.
        """
        self.conf = conf
        self.start_time: float = 0
        self.end_time: float = 0
        self.run_id: int = 0
        self.conn = None  # NOTE(review): never assigned in this file — presumably a DB handle set elsewhere
        self.log: logging.Logger = logging.getLogger(conf["log"]["name"])

        bounds_conf = conf["lsm"]["bounds"]
        # NOTE(review): bits_per_elem_range deliberately mirrors
        # memory_budget_range here (original carried an "if you decide to use
        # it here" remark) — confirm this aliasing is intended.
        memory_range = (
            bounds_conf["memory_budget_range"][0],
            bounds_conf["memory_budget_range"][1],
        )
        self.bounds: LSMBounds = LSMBounds(
            max_considered_levels=bounds_conf["max_considered_levels"],
            bits_per_elem_range=memory_range,
            size_ratio_range=(
                bounds_conf["size_ratio_range"][0],
                bounds_conf["size_ratio_range"][1],
            ),
            page_sizes=tuple(bounds_conf["page_sizes"]),
            entry_sizes=tuple(bounds_conf["entry_sizes"]),
            memory_budget_range=memory_range,
            selectivity_range=(
                bounds_conf["selectivity_range"][0],
                bounds_conf["selectivity_range"][1],
            ),
            elements_range=(
                bounds_conf["elements_range"][0],
                bounds_conf["elements_range"][1],
            ),
        )
        self.cf: EndureCost = EndureCost(self.bounds.max_considered_levels)

        job_conf = conf["job"]["BayesianOptimization"]
        self.num_k_values = job_conf["num_k_values"]
        # Was a pair of bare print()s — route through the job logger instead.
        self.log.debug("LSM bounds: %s", self.bounds)
        self.generator = KHybridGenerator(self.bounds)

        mlos_conf = job_conf["mlos"]
        self.optimizer = mlos_conf["optimizer"]
        self.n_runs = mlos_conf["num_runs"]
        self.num_iterations = mlos_conf["iteration"]

    def run(self):
        """Execute ``num_runs`` independent optimization experiments.

        Returns:
            Tuple of six parallel lists, aligned by run index:
            (mlos_costs, analytical_costs, mlos_designs,
            analytical_designs, systems, workloads).
        """
        mlos_costs, mlos_designs = [], []
        analytical_costs, analytical_designs = [], []
        systems, workloads = [], []
        for i in range(self.n_runs):
            self.log.info("Iteration %d/%d running", i + 1, self.n_runs)
            # Fresh random problem instance for this run.
            system = self.generator._sample_system()
            z0, z1, q, w = self.generator._sample_workload(4)
            workload = Workload(z0=z0, z1=z1, q=q, w=w)
            systems.append(system)
            workloads.append(workload)

            # BO search over a config space bounded by this system's budget.
            input_space = define_config_space(self.num_k_values, system, self.bounds)
            optimizer = self.select_optimizer(self.optimizer, input_space)
            best_observation = self.run_optimization_loop(
                self.num_iterations, optimizer, system, workload
            )
            best_design = self.interpret_optimizer_result(best_observation)
            mlos_costs.append(self.target_function(best_design, system, workload))
            mlos_designs.append(best_design)

            # Analytical baseline for the same system/workload.
            design_analytical, cost_analytical = find_analytical_results(
                system, workload, self.bounds
            )
            analytical_costs.append(cost_analytical)
            analytical_designs.append(design_analytical)
        return (mlos_costs, analytical_costs, mlos_designs,
                analytical_designs, systems, workloads)

    @staticmethod
    def _scalar(value):
        """Collapse a one-element pandas Series to its scalar; pass scalars through."""
        return value.iloc[0] if isinstance(value, pd.Series) else value

    def interpret_optimizer_result(self, observation) -> LSMDesign:
        """Translate an optimizer observation (mapping of columns, possibly
        pandas Series-valued) into an ``LSMDesign``."""
        h = self._scalar(observation['h'])
        size_ratio = self._scalar(observation['t'])
        k_values = [
            self._scalar(observation[f'k_{i}']) for i in range(self.num_k_values)
        ]
        return LSMDesign(h=h, T=size_ratio, K=k_values)

    def select_optimizer(self, optimizer: str, input_space):
        """Instantiate the mlos_core optimizer named by *optimizer*.

        Args:
            optimizer: one of ``"Smac"``, ``"Flaml"``, ``"Random"``.
            input_space: the ConfigSpace the optimizer searches over.

        Raises:
            ValueError: if *optimizer* names no supported backend.
        """
        factories = {
            "Smac": mlos_core.optimizers.SmacOptimizer,
            "Flaml": mlos_core.optimizers.FlamlOptimizer,
            "Random": mlos_core.optimizers.RandomOptimizer,
        }
        try:
            factory = factories[optimizer]
        except KeyError:
            # Bug fix: report the argument actually validated, not self.optimizer.
            raise ValueError(f"Unsupported optimizer type: {optimizer}") from None
        return factory(parameter_space=input_space)

    def target_function(self, design: LSMDesign, system: System, workload: Workload) -> float:
        """Objective: cost of *design* on *system* under *workload* (lower is better)."""
        return self.cf.calc_cost(
            design, system, workload.z0, workload.z1, workload.q, workload.w
        )

    def run_optimization_loop(self, iteration, optimizer, system: System, workload: Workload):
        """Run *iteration* suggest/evaluate/register rounds; return the best observation."""
        for _ in range(iteration):
            optimizer = self.run_optimization(optimizer, system, workload)
        return optimizer.get_best_observation()

    def run_optimization(self, optimizer, system: System, workload: Workload):
        """One BO step: suggest a config, score it, register the score; returns the optimizer."""
        suggestion = optimizer.suggest()
        design = self.interpret_optimizer_result(suggestion)
        cost = self.target_function(design, system, workload)
        # mlos_core.register expects a one-row DataFrame of the config plus a
        # matching Series of scores.
        row = {'h': [design.h], 't': [design.T]}
        row.update({f'k_{i}': [k] for i, k in enumerate(design.K)})
        optimizer.register(pd.DataFrame(row), pd.Series([cost]))
        return optimizer


def define_config_space(num_k_values: int, system: System, bounds: LSMBounds) -> CS.ConfigurationSpace:
    """Build the search space for the mlos optimizers.

    The space holds a float ``h`` (bits per element, capped by the system's
    memory budget ``system.H``), an integer size ratio ``t``, and
    ``num_k_values`` integer ``k_i`` parameters in ``[1, t_max - 1]``.
    Seeded with 1234 for reproducibility.
    """
    space = CS.ConfigurationSpace(seed=1234)

    # h may not exceed the (floored) per-element memory budget of this system.
    h_upper = min(np.floor(system.H), bounds.bits_per_elem_range[1])
    params = [
        CS.UniformFloatHyperparameter(
            name='h',
            lower=bounds.bits_per_elem_range[0],
            upper=h_upper,
        ),
        CS.UniformIntegerHyperparameter(
            name='t',
            lower=bounds.size_ratio_range[0],
            upper=bounds.size_ratio_range[1],
        ),
    ]
    params.extend(
        CS.UniformIntegerHyperparameter(
            name=f'k_{i}',
            lower=1,
            upper=bounds.size_ratio_range[1] - 1,
        )
        for i in range(num_k_values)
    )
    for param in params:
        space.add_hyperparameter(param)
    return space


def find_analytical_results(system: System, workload: Workload, bounds: LSMBounds):
    """Solve for the nominal (analytical) design and its cost.

    Returns:
        Tuple of (nominal_design, cost) as produced by ``KLSMSolver``.
    """
    solver = KLSMSolver(bounds)
    nominal_design, _nominal_solution = solver.get_nominal_design(
        system, workload.z0, workload.z1, workload.q, workload.w
    )
    # The objective takes a flat vector: [h, T, k_0, ..., k_{L-1}].
    design_vector = np.concatenate(
        (np.array([nominal_design.h, nominal_design.T]), nominal_design.K)
    )
    cost_ana = solver.nominal_objective(
        design_vector, system, workload.z0, workload.z1, workload.q, workload.w
    )
    print("Cost for the nominal design using analytical solver:", cost_ana)
    print("Nominal Design suggested by analytical solver:", nominal_design)
    return nominal_design, cost_ana


if __name__ == "__main__":
    # Script entry point: load the TOML configuration and run the MLOS-based
    # Bayesian optimization pipeline end-to-end.
    config = Reader.read_config("endure.toml")

    log = logging.getLogger(config["log"]["name"])
    log.info("Initializing Bayesian Optimization Job")

    bayesian_optimizer = BayesianPipelineMlos(config)
    bayesian_optimizer.run()

0 comments on commit 877e5b9

Please sign in to comment.