From c8c18d2d9f3892c9793f74a3e95c9d5e69074b60 Mon Sep 17 00:00:00 2001 From: ndhuynh Date: Mon, 11 Mar 2024 13:24:34 -0400 Subject: [PATCH] [Refactor] Clean up jobs with toml file (#22) * [Refactor] Remove extra configs * [Reactor] Clean up ltune data gen * [Refactor] Clean up toml file descriptions * [Refactor] Remove individual data generators * [Refactor] Clean up LTuneTrain building * [Refactor] Add to descriptions in the `endure.toml` default example --- endure.py | 6 +- endure.toml | 154 +++++++++++--------------- endure/ltune/data/dataset.py | 22 ++-- endure/ltune/data/generator.py | 20 ++-- endure/ltune/loss.py | 6 +- jobs/config/LCMDataGen.toml | 57 ---------- jobs/{lcm_data_gen.py => data_gen.py} | 94 +++++++++------- jobs/ltune_data_gen.py | 136 ----------------------- jobs/ltune_train.py | 134 ++++++++++------------ 9 files changed, 201 insertions(+), 428 deletions(-) delete mode 100644 jobs/config/LCMDataGen.toml rename jobs/{lcm_data_gen.py => data_gen.py} (51%) mode change 100755 => 100644 delete mode 100644 jobs/ltune_data_gen.py diff --git a/endure.py b/endure.py index e42562b..aad7c9b 100755 --- a/endure.py +++ b/endure.py @@ -4,9 +4,8 @@ import toml import sys -from jobs.lcm_data_gen import LCMDataGenJob from jobs.lcm_train import LCMTrainJob -from jobs.ltune_data_gen import LTuneDataGenJob +from jobs.data_gen import DataGenJob from jobs.ltune_train import LTuneTrainJob from jobs.bayesian_pipeline import BayesianPipeline @@ -27,9 +26,8 @@ def run(self): self.log.info(f'Staring app {self.config["app"]["name"]}') jobs = { - "LCMDataGen": LCMDataGenJob, + "DataGen": DataGenJob, "LCMTrain": LCMTrainJob, - "LTuneDataGen": LTuneDataGenJob, "LTuneTrain": LTuneTrainJob, "BayesianBaseline": BayesianPipeline, } diff --git a/endure.toml b/endure.toml index 439fc72..b038528 100644 --- a/endure.toml +++ b/endure.toml @@ -1,17 +1,27 @@ # ============================================================================= # ENDURE Configuration File -# Every job will contain it's own HEADER with settings appropiate -# for the job. -# +# Following subsections are available +# APP +# LOGGER - output setting +# IO - base directory for IO +# LSM - Log structured merge tree assumptions and settings +# JOB - all job specific settings +# LCM - Learned cost model specifics +# LTune - Learned tuner specifics +# SCHEDULERS - ML learning rate schduler kwargs +# OPTIMIZERS - ML optimizer kwargs +# LOSS - ML Loss function kwargs +# ============================================================================= + +# ============================================================================= # HEADER APP # Logic of app including jobs list to run # ============================================================================= [app] name = "ENDURE" run = [ - # "LCMDataGen", + # "DataGen", # "LCMTrain", - # "LTuneDataGen", # "LTuneTrain", # "BayesianBaseline" ] @@ -34,6 +44,40 @@ disable_tqdm = false [io] data_dir = "/data" +# ============================================================================= +# HEADER LSM +# Generic LSM settings including maximum bounds, system settings, starting +# budget for memory, number of elements, etc +# ============================================================================= +[lsm] +# Design will effect everything else down stream (e.g. 
choice of neural network +# architecture for learned cost model) +# Tiering +# Leveling +# Classic - Considers both leveing and tiering +# QFixed - Levels 1 -> L = Q +# YZHybrid - Levels 1 -> (L-1) = Q, Level L = Z +# KHybrid - Each level has own K_i decision +design = 'KHybrid' + +[lsm.bounds] +max_considered_levels = 20 # Max number of levels to consider +size_ratio_range = [2, 31] # low, high of size ratios to consider +page_sizes = [4, 8, 16] # KB pages +entry_sizes = [1024, 2048, 4096, 8192] # bits +memory_budget_range = [5, 20] # low, high, bits per element +selectivity_range = [1e-7, 1e-9] # low, high +elements_range = [100000000, 1000000000] # element range + +# Default system values if not generating random systems +[lsm.system] +E = 8192 # size of a single entry in bits +s = 2e-7 # range query selectivity, 1 implies the full key range per query +B = 4 # number of physical entries per page +N = 1000000000 # total number of key-val pairs for LSM tree +H = 10 # total memory budget in bits per element +phi = 1 # read/write asymmetry coefficient, 1 implies w/r cost the same + # ============================================================================= # HEADER JOB # Settings for each individual job (executable) @@ -42,24 +86,22 @@ data_dir = "/data" use_gpu_if_avail = false # ----------------------------------------------------------------------------- -[job.LCMDataGen] +[job.DataGen] # ----------------------------------------------------------------------------- -dir = "test-data/kcost-t30" -file_prefix = "kcost" -num_workers = -1 # -1 forces all cores to be used -num_files = 2 -samples = 1024 # per file sample -overwrite_if_exists = true +dir = "lcm/test/std" +generator = "LTuner" # Select between data for tuner (LTuner) or LCM +file_prefix = "tuner" # all files named file_prefix_000X.parquet +num_workers = -1 # -1 forces all cores to be used +num_files = 2 # number of files to generate +samples = 1024 # per file sample +overwrite_if_exists = true # if files exist overwrite with new data # ----------------------------------------------------------------------------- [job.LCMTrain] # ----------------------------------------------------------------------------- max_epochs = 5 save_dir = "models/lcm/kcost" - -# Model selection, picking "Auto" will give automatically select the model -# associated with the LSM design in the configuration file -model = "Auto" +no_checkpoint = false # Different loss functions to train via # MSE - Mean squared error @@ -71,21 +113,15 @@ model = "Auto" loss_fn = "MSE" # Supported optimizers -# SGD - Stochastic gradient descent -# Adam -# Adagrad +# [SGD, Adam, Adagrad] optimizer = "Adam" # Learning rate schedulers # [CosineAnnealing, Exponential, Constant, None] lr_scheduler = "Constant" -# Stop checkpointing to improve training -no_checkpoint = false - [job.LCMTrain.train] dir = "train-data/kcost-t30" -format = "parquet" batch_size = 32 shuffle = true num_workers = 2 @@ -93,23 +129,11 @@ drop_last = true [job.LCMTrain.test] dir = "test-data/kcost-t30" -format = "parquet" batch_size = 1024 shuffle = false num_workers = 4 drop_last = true -# ----------------------------------------------------------------------------- -[job.LTuneDataGen] -# ----------------------------------------------------------------------------- -format = "parquet" -dir = "test-data/ltune/std" -file_prefix = "wl" -num_workers = 4 # -1 forces all cores to be used -num_files = 2 -samples = 1024 # per file sample -overwrite_if_exists = true - # 
----------------------------------------------------------------------------- [job.LTuneTrain] # ----------------------------------------------------------------------------- @@ -119,10 +143,11 @@ save_dir = "models/ltune/klsm" # Learned cost model is our loss, input full path to checkpoint or model file loss_fn_path = "models/lcm/kcost" -# Check train.optimizer for available options +# Optimizer settings in header.optimizer +# [SGD, Adam, Adagrad] optimizer = "Adam" -# Learning rate schedulers +# Learning rate schedulers, settings in header.scheduler # [CosineAnnealing, Exponential, Constant, None] lr_scheduler = "Constant" @@ -130,7 +155,6 @@ no_checkpoint = false [job.LTuneTrain.train] dir = "train-data/ltune/std" -format = "parquet" batch_size = 2 shuffle = true num_workers = 1 @@ -138,7 +162,6 @@ drop_last = true [job.LTuneTrain.test] dir = "test-data/ltune/std" -format = "parquet" batch_size = 2 shuffle = false num_workers = 1 @@ -198,40 +221,6 @@ z1 = 0.190 q = 0.545 w = 0.202 -# ============================================================================= -# HEADER LSM -# Generic LSM settings including maximum bounds, system settings, starting -# budget for memory, number of elements, etc -# ============================================================================= -[lsm] -# Design will effect everything else down stream (e.g. choice of neural network -# architecture for learned cost model) -# Tiering -# Leveling -# Classic - Considers both leveing and tiering -# QFixed - Levels 1 -> L = Q -# YZHybrid - Levels 1 -> (L-1) = Q, Level L = Z -# KHybrid - Each level has own K_i decision -design = 'KHybrid' - -[lsm.bounds] -max_considered_levels = 20 # Max number of levels to consider -size_ratio_range = [2, 31] # low, high of size ratios to consider -page_sizes = [4, 8, 16] # KB pages -entry_sizes = [1024, 2048, 4096, 8192] # bits -memory_budget_range = [5, 20] # low, high, bits per element -selectivity_range = [1e-7, 1e-9] # low, high -elements_range = [100000000, 1000000000] # element range - -# Default system values if not generating random systems -[lsm.system] -E = 8192 # size of a single entry in bits -s = 2e-7 # range query selectivity, 1 implies the full key range per query -B = 4 # number of physical entries per page -N = 1000000000 # total number of key-val pairs for LSM tree -H = 10 # total memory budget in bits per element -phi = 1 # read/write asymmetry coefficient, 1 implies w/r cost the same - # ============================================================================= # HEADER LCM # Add configurations related to learned cost models @@ -245,12 +234,8 @@ embedding_size = 8 hidden_length = 3 hidden_width = 32 decision_dim = 64 - -# Dropout percentage -dropout = 0.0 - -# Batch or Layer norm -norm_layer = "Batch" +dropout = 0.0 # dropout percentage +norm_layer = "Batch" # "Batch" or "Layer" norm # Used only for classic models, generally smaller than embedding size policy_embedding_size = 4 @@ -282,14 +267,9 @@ hard = true [ltune.model] hidden_length = 1 hidden_width = 64 - -# Dropout percentage -dropout = 0 - -# Batch or Layer norm -norm_layer = "Batch" - -categorical_mode = "reinmax" +dropout = 0 # dropout percentage +norm_layer = "Batch" # batch or layer norm +categorical_mode = "reinmax" # reinmax or gumbel # ============================================================================= # END LTUNE diff --git a/endure/ltune/data/dataset.py b/endure/ltune/data/dataset.py index ef13d34..1a95790 100644 --- a/endure/ltune/data/dataset.py +++ b/endure/ltune/data/dataset.py 
@@ -1,7 +1,6 @@ import glob import numpy as np import os -import pandas as pd import pyarrow.parquet as pa import torch import torch.utils.data @@ -13,33 +12,30 @@ class LTuneDataSet(torch.utils.data.IterableDataset): def __init__( self, folder: str, - format: str = "parquet", shuffle: bool = False, ) -> None: self._format = format - self._fnames = glob.glob(os.path.join(folder, "*." + format)) + self._fnames = glob.glob(os.path.join(folder, "*.parquet")) self._shuffle = shuffle def _get_input_cols(self): return kINPUT_FEATS def _load_data(self, fname): - if self._format == "parquet": - df = pa.read_table(fname).to_pandas() - else: - df = pd.read_csv(fname) + df = pa.read_table(fname).to_pandas() return df def __iter__(self): worker_info = torch.utils.data.get_worker_info() - if worker_info is None: - files = self._fnames - else: - file_bins = np.array_split(self._fnames, worker_info.num_workers) + files = self._fnames + if self._shuffle: + np.random.shuffle(files) + + if worker_info is not None: + file_bins = np.array_split(files, worker_info.num_workers) files = file_bins[worker_info.id] - if self._shuffle: - np.random.shuffle(files) + for file in files: df = self._load_data(file) inputs = torch.from_numpy(df[self._get_input_cols()].values).float() diff --git a/endure/ltune/data/generator.py b/endure/ltune/data/generator.py index 75ed251..ce8bc02 100644 --- a/endure/ltune/data/generator.py +++ b/endure/ltune/data/generator.py @@ -2,25 +2,23 @@ import numpy as np -from endure.lsm.types import System +from endure.lsm.types import LSMBounds, System from endure.ltune.data.input_features import kSYSTEM_HEADER, kWORKLOAD_HEADER class LTuneDataGenerator: def __init__( self, - page_sizes: List[int] = [4, 8, 16], - entry_sizes: List[int] = [1024, 2048, 4096, 8192], - memory_budget_range: Tuple[float, float] = (5.0, 20.0), - selectivity_range: Tuple[float, float] = (1e-7, 1e-9), - elements_range: Tuple[int, int] = (100000000, 1000000000), + bounds: LSMBounds, precision: int = 3, ) -> None: - self.entry_sizes = entry_sizes - self.memory_budget_range = memory_budget_range - self.page_sizes = page_sizes - self.selectivity_range = selectivity_range - self.elements_range = elements_range + self.entry_sizes = bounds.entry_sizes + self.memory_budget_range = bounds.memory_budget_range + self.page_sizes = bounds.page_sizes + self.selectivity_range = bounds.selectivity_range + self.elements_range = bounds.elements_range + + self.bounds = bounds self.precision = precision def _sample_workload(self, dimensions: int) -> list: diff --git a/endure/ltune/loss.py b/endure/ltune/loss.py index 24e6e28..308c525 100644 --- a/endure/ltune/loss.py +++ b/endure/ltune/loss.py @@ -6,7 +6,7 @@ import toml from endure.lcm.model.builder import LearnedCostModelBuilder -from endure.lsm.types import STR_POLICY_DICT +from endure.lsm.types import Policy class LearnedCostModelLoss(torch.nn.Module): @@ -25,9 +25,7 @@ def __init__(self, config: dict[str, Any], model_path: str): max_levels=lcm_cfg["lsm"]["max_levels"], **lcm_cfg["lcm"]["model"], ) - lcm_model = STR_POLICY_DICT.get(lcm_cfg["lsm"]["design"], None) - if lcm_model is None: - raise TypeError(f"Illegal LCM model choice: {lcm_model=}") + lcm_model = getattr(Policy, lcm_cfg["lsm"]["design"]) self.model = self.lcm_builder.build_model(lcm_model) data = torch.load( diff --git a/jobs/config/LCMDataGen.toml b/jobs/config/LCMDataGen.toml deleted file mode 100644 index 86648e3..0000000 --- a/jobs/config/LCMDataGen.toml +++ /dev/null @@ -1,57 +0,0 @@ -# 
============================================================================= -# HEADER LCMDataGen -# Settings for data generation job -# ============================================================================= - -# Directory to save all parquet data files -dir = "/data/test-data/kcost-t30" - -# Each file will contain the following prefix -# e.g. _0000.parquet -file_prefix = "kcost" - -# Generator dictates which LSM design to generate data for -# Classic - Generates both leveling and tiering items -# Tiering -# Leveling -# QFixed -# KHybrid -generator = "KHybrid" - -# Parallel workers, -1 issues all cores to be used -num_workers = 1 - -# Number of total files to generate -num_files = 2 - -# Number of samples per file -samples = 1024 - -# If we generate a file and see a conflicting file in the directory, we can -# either overwrite or skip creating the file all together -overwrite_if_exists = true - -# ============================================================================= -# HEADER LOG -# Bounds for data creation -# ============================================================================= -[log] -name = 'endure-logger' -format = "[%(levelname)s][%(asctime)-15s][%(filename)s] %(message)s" -datefmt = '%d-%m-%y:%H:%M:%S' -level = "DEBUG" -disable_tqdm = false - -# ============================================================================= -# HEADER BOUNDS -# Bounds for data creation -# ============================================================================= -[bounds] -max_considered_levels = 20 # Max number of levels to consider -size_ratio_range = [2, 31] # low, high of size ratios to consider -page_sizes = [4, 8, 16] # KB pages -entry_sizes = [1024, 2048, 4096, 8192] # bits -memory_budget_range = [5, 20] # low, high, bits per element -selectivity_range = [1e-7, 1e-9] # low, high -elements_range = [100000000, 1000000000] # element range - diff --git a/jobs/lcm_data_gen.py b/jobs/data_gen.py old mode 100755 new mode 100644 similarity index 51% rename from jobs/lcm_data_gen.py rename to jobs/data_gen.py index 8ccb85f..8d9a112 --- a/jobs/lcm_data_gen.py +++ b/jobs/data_gen.py @@ -10,24 +10,40 @@ from endure.data.io import Reader from endure.lsm.types import Policy, LSMBounds +from endure.ltune.data.generator import LTuneDataGenerator import endure.lcm.data.generator as Generators -class LCMDataGenJob: +class DataGenJob: def __init__(self, config): self.log = logging.getLogger(config["log"]["name"]) self.log.info("Running Data Generator Job") + self.output_dir = os.path.join( + config["io"]["data_dir"], + config["job"]["DataGen"]["dir"], + ) + self.bounds = LSMBounds(**config["lsm"]["bounds"]) + self.design = getattr(Policy, config["lsm"]["design"]) + self.config = config - self.job_config = config["job"]["LCMDataGen"] + self.jconfig = config["job"]["DataGen"] + if self.jconfig["generator"] == "LTuner": + self.log.info("Generating data for Learned Tuner") + elif self.jconfig["generator"] == "LCM": + self.log.info("Generating data for Learned Cost Models") + else: + self.log.critical("Invalid generator type") + raise KeyError def create_bounds(self) -> LSMBounds: return LSMBounds(**self.config["lsm"]["bounds"]) - def _choose_generator(self) -> Generators.LCMDataGenerator: - design_enum = getattr(Policy, self.config["lsm"]["design"]) - bounds = self.create_bounds() - self.log.info(f"Generator: {design_enum.name}") - self.log.info(f"{bounds=}") + def _choose_generator(self) -> Generators.LCMDataGenerator | LTuneDataGenerator: + if self.jconfig["generator"] == "LTuner": + return 
LTuneDataGenerator(self.bounds) + + self.log.info(f"Generator: {self.design.name}") + self.log.info(f"{self.bounds=}") generators = { Policy.Tiering: Generators.ClassicGenerator, Policy.Leveling: Generators.ClassicGenerator, @@ -35,57 +51,49 @@ def _choose_generator(self) -> Generators.LCMDataGenerator: Policy.QFixed: Generators.QCostGenerator, Policy.KHybrid: Generators.KHybridGenerator, } - generator = generators.get(design_enum, None) + generator = generators.get(self.design, None) if generator is None: raise TypeError("Invalid generator choice") - gen_kwargs: dict[str, Any] = { - "bounds": bounds, - } - if design_enum in [Policy.Tiering, Policy.Leveling]: - gen_kwargs["policies"] = [design_enum] - elif design_enum == Policy.Classic: + gen_kwargs: dict[str, Any] = {"bounds": self.bounds} + if self.design in [Policy.Tiering, Policy.Leveling]: + gen_kwargs["policies"] = [self.design] + elif self.design == Policy.Classic: gen_kwargs["policies"] = [Policy.Tiering, Policy.Leveling] generator = generator(**gen_kwargs) return generator def generate_parquet_file( - self, generator: Generators.LCMDataGenerator, idx: int, pos: int + self, + generator: Generators.LCMDataGenerator | LTuneDataGenerator, + idx: int, + pos: int, ) -> int: - fname_prefix = self.job_config["file_prefix"] + fname_prefix = self.jconfig["file_prefix"] fname = f"{fname_prefix}_{idx:04}.parquet" - fpath = os.path.join( - self.config["io"]["data_dir"], self.job_config["dir"], fname - ) - if os.path.exists(fpath) and (not self.job_config["overwrite_if_exists"]): + fpath = os.path.join(self.output_dir, fname) + + if os.path.exists(fpath) and (not self.jconfig["overwrite_if_exists"]): self.log.info(f"{fpath} exists, exiting.") return -1 - samples = range(int(self.job_config["samples"])) - table = [] - for _ in tqdm( - samples, + pbar = tqdm( + range(int(self.jconfig["samples"])), desc=fname, position=pos, ncols=80, disable=self.config["log"]["disable_tqdm"], - ): - table.append(generator.generate_row()) + ) + table = [generator.generate_row() for _ in pbar] table = pa.Table.from_pylist(table) pq.write_table(table, fpath) return idx - def generate_file_single_thread(self) -> None: - generator = self._choose_generator() - - for idx in range(self.job_config["num_files"]): - self.generate_parquet_file(generator, idx, 0) - - def generate_file(self, idx: int) -> int: + def generate_file(self, idx: int, single_threaded: bool = False) -> int: pos = 0 - if len(mp.current_process()._identity) > 0: + if len(mp.current_process()._identity) > 0 and not single_threaded: pos = mp.current_process()._identity[0] - 1 generator = self._choose_generator() @@ -94,21 +102,21 @@ def generate_file(self, idx: int) -> int: return idx def run(self) -> None: - data_dir = os.path.join(self.config["io"]["data_dir"], self.job_config["dir"]) - os.makedirs(data_dir, exist_ok=True) - self.log.info(f"Writing all files to {data_dir}") + self.log.info("Creating workload data") + os.makedirs(self.output_dir, exist_ok=True) + self.log.info(f"Writing all files to {self.output_dir}") - inputs = list(range(0, self.job_config["num_files"])) - threads = self.job_config["num_workers"] + inputs = list(range(0, self.jconfig["num_files"])) + threads = self.jconfig["num_workers"] if threads == -1: threads = mp.cpu_count() - if threads > self.job_config["num_files"]: + if threads > self.jconfig["num_files"]: self.log.info("Num threads > num files, scaling down") - threads = self.job_config["num_files"] + threads = self.jconfig["num_files"] self.log.debug(f"Using {threads=}") if 
threads == 1: - self.generate_file_single_thread() + self.generate_file(0, single_threaded=True) else: with mp.Pool( threads, initializer=tqdm.set_lock, initargs=(mp.RLock(),) @@ -127,5 +135,5 @@ def run(self) -> None: log = logging.getLogger(config["log"]["name"]) log.setLevel(config["log"]["level"]) - job = LCMDataGenJob(config) + job = DataGenJob(config) job.run() diff --git a/jobs/ltune_data_gen.py b/jobs/ltune_data_gen.py deleted file mode 100644 index 8969dc3..0000000 --- a/jobs/ltune_data_gen.py +++ /dev/null @@ -1,136 +0,0 @@ -#!/usr/bin/env python -import csv -import logging -import multiprocessing as mp -import os - -from tqdm import tqdm -import pyarrow as pa -import pyarrow.parquet as pq - -from endure.data.io import Reader -from endure.ltune.data.generator import LTuneDataGenerator - - -class LTuneDataGenJob: - def __init__(self, config): - self.log = logging.getLogger(config["log"]["name"]) - self.log.info("Starting LTuneDataGenJob") - self.config = config - self.setting = config["job"]["LTuneDataGen"] - self.output_dir = os.path.join( - self.config["io"]["data_dir"], self.setting["dir"] - ) - - def _choose_generator(self): - return LTuneDataGenerator() - - def generate_csv_file(self, generator, idx: int, pos: int) -> int: - fname_prefix = self.setting["file_prefix"] - fname = f"{fname_prefix}-{idx:04}.csv" - fpath = os.path.join(self.output_dir, fname) - - if os.path.exists(fpath) and not self.setting["overwrite_if_exists"]: - self.log.info(f"{fpath} exists, exiting.") - return -1 - - samples = range(self.setting["samples"]) - header = generator.generate_header() - with open(fpath, "w") as fid: - writer = csv.writer(fid) - writer.writerow(header) - for _ in tqdm( - samples, - desc=fname, - position=pos, - ncols=80, - disable=self.config["log"]["disable_tqdm"], - ): - row = generator.generate_row() - writer.writerow(row) - - return idx - - def generate_parquet_file( - self, generator: LTuneDataGenerator, idx: int, pos: int - ) -> int: - fname_prefix = self.setting["file_prefix"] - fname = f"{fname_prefix}-{idx:04}.parquet" - fpath = os.path.join(self.output_dir, fname) - - if os.path.exists(fpath) and not self.setting["overwrite_if_exists"]: - self.log.info(f"{fpath} exists, exiting.") - return -1 - - samples = range(self.setting["samples"]) - table = [] - for _ in tqdm( - samples, - desc=fname, - position=pos, - ncols=80, - disable=self.config["log"]["disable_tqdm"], - ): - table.append(generator.generate_row()) - table = pa.Table.from_pylist(table) - pq.write_table(table, fpath) - - return idx - - def generate_file(self, idx: int) -> int: - pos = 0 - if len(mp.current_process()._identity) > 0: - pos = mp.current_process()._identity[0] - 1 - generator = self._choose_generator() - - if self.setting["format"] == "parquet": - self.generate_parquet_file(generator, idx, pos) - else: # format == 'csv' - self.generate_csv_file(generator, idx, pos) - - return idx - - def generate_file_single_thread(self) -> None: - generator = self._choose_generator() - - if self.setting["format"] == "parquet": - file_gen = self.generate_parquet_file - else: # format == 'csv' - file_gen = self.generate_csv_file - - for idx in range(self.setting["num_files"]): - file_gen(generator, idx, 0) - - def run(self) -> None: - self.log.info("Creating workload data") - os.makedirs(self.output_dir, exist_ok=True) - self.log.info(f"Writing all files to {self.output_dir}") - - inputs = list(range(0, self.setting["num_files"])) - threads = self.setting["num_workers"] - if threads == -1: - threads = mp.cpu_count() 
- self.log.info(f"{threads=}") - - if threads == 1: - self.generate_file_single_thread() - else: - with mp.Pool( - threads, initializer=tqdm.set_lock, initargs=(mp.RLock(),) - ) as p: - p.map(self.generate_file, inputs) - - return - - -if __name__ == "__main__": - config = Reader.read_config("endure.toml") - - logging.basicConfig( - format=config["log"]["format"], datefmt=config["log"]["datefmt"] - ) - log = logging.getLogger(config["log"]["name"]) - log.setLevel(config["log"]["level"]) - - a = LTuneDataGenJob(config) - a.run() diff --git a/jobs/ltune_train.py b/jobs/ltune_train.py index bea541d..f42a768 100644 --- a/jobs/ltune_train.py +++ b/jobs/ltune_train.py @@ -8,7 +8,7 @@ import torch.optim as Opt from torch.utils.data import DataLoader -from endure.lsm.types import STR_POLICY_DICT +from endure.lsm.types import LSMBounds, Policy from endure.ltune.data.dataset import LTuneDataSet from endure.ltune.loss import LearnedCostModelLoss from endure.ltune.model.builder import LTuneModelBuilder @@ -19,70 +19,66 @@ class LTuneTrainJob: def __init__(self, config: dict[str, Any]) -> None: - self._config = config - self._setting = config["job"]["LTuneTrain"] - self.log = logging.getLogger(self._config["log"]["name"]) + self.log = logging.getLogger(self.config["log"]["name"]) self.log.info("Running Training Job") + self.use_gpu = config["job"]["use_gpu_if_avail"] + self.save_dir = os.path.join( + config["io"]["data_dir"], + config["job"]["LTuneTrain"]["save_dir"], + ) + self.design = getattr(Policy, config["lsm"]["design"]) + self.bounds = LSMBounds(**config["lsm"]["bounds"]) + self.opt_builder = OptimizerBuilder(config["optimizer"]) + self.schedule_builder = LRSchedulerBuilder(config["scheduler"]) + self.model_builder = LTuneModelBuilder( + size_ratio_range=self.bounds.size_ratio_range, + max_levels=self.bounds.max_considered_levels, + **config["ltune"]["model"], + ) - policy = STR_POLICY_DICT.get(self._config["lsm"]["design"], None) - if policy is None: - raise TypeError(f"Invalid LSM Design") - self.policy = policy + self.config = config + self.jconfig = config["job"]["LTuneTrain"] def _build_loss_fn(self) -> torch.nn.Module: - model = LearnedCostModelLoss(self._config, self._setting["loss_fn_path"]) - if self._setting["use_gpu_if_avail"] and torch.cuda.is_available(): + model = LearnedCostModelLoss(self.config, self.jconfig["loss_fn_path"]) + if self.jconfig["use_gpu_if_avail"] and torch.cuda.is_available(): model.to("cuda") return model def _build_model(self) -> torch.nn.Module: - size_ratio_min = self._config["lsm"]["size_ratio"]["min"] - size_ratio_max = self._config["lsm"]["size_ratio"]["max"] - builder = LTuneModelBuilder( - hidden_width=self._config["ltune"]["model"]["hidden_width"], - hidden_length=self._config["ltune"]["model"]["hidden_length"], - dropout=self._config["ltune"]["model"]["dropout"], - norm_layer=self._config["ltune"]["model"]["norm_layer"], - categorical_mode=self._config["ltune"]["model"]["categorical_mode"], - size_ratio_range=(size_ratio_min, size_ratio_max), - max_levels=self._config["lsm"]["max_levels"], - ) - model = builder.build_model(self.policy) - if self._setting["use_gpu_if_avail"] and torch.cuda.is_available(): + model = self.model_builder.build_model(self.design) + if self.use_gpu and torch.cuda.is_available(): model.to("cuda") return model - def _build_optimizer(self, model) -> Opt.Optimizer: - builder = OptimizerBuilder(self._config) - choice = self._setting["optimizer"] + def _build_optimizer(self, model: torch.nn.Module) -> Opt.Optimizer: + choice = 
self.jconfig["optimizer"] - return builder.build_optimizer(choice, model) + return self.opt_builder.build_optimizer(choice, model) def _build_scheduler( self, optimizer: Opt.Optimizer ) -> Optional[Opt.lr_scheduler._LRScheduler]: - builder = LRSchedulerBuilder(self._config) - choice = self._setting["lr_scheduler"] + choice = self.jconfig["lr_scheduler"] - return builder.build_scheduler(optimizer, choice) + return self.schedule_builder.build_scheduler(optimizer, choice) def _build_train(self) -> DataLoader: train_dir = os.path.join( - self._config["io"]["data_dir"], - self._setting["train"]["dir"], + self.config["io"]["data_dir"], + self.jconfig["train"]["dir"], ) train_data = LTuneDataSet( folder=train_dir, - shuffle=self._setting["train"]["shuffle"], - format=self._setting["train"]["format"], + shuffle=self.jconfig["train"]["shuffle"], ) train = DataLoader( train_data, - batch_size=self._setting["train"]["batch_size"], - drop_last=self._setting["train"]["drop_last"], - num_workers=self._setting["train"]["num_workers"], + batch_size=self.jconfig["train"]["batch_size"], + drop_last=self.jconfig["train"]["drop_last"], + num_workers=self.jconfig["train"]["num_workers"], pin_memory=True, prefetch_factor=10, ) @@ -91,39 +87,33 @@ def _build_train(self) -> DataLoader: def _build_test(self) -> DataLoader: test_dir = os.path.join( - self._config["io"]["data_dir"], - self._setting["test"]["dir"], + self.config["io"]["data_dir"], + self.jconfig["test"]["dir"], ) test_data = LTuneDataSet( folder=test_dir, - shuffle=self._setting["test"]["shuffle"], - format=self._setting["test"]["format"], + shuffle=self.jconfig["test"]["shuffle"], ) test = DataLoader( test_data, - batch_size=self._setting["test"]["batch_size"], - drop_last=self._setting["test"]["drop_last"], - num_workers=self._setting["test"]["num_workers"], + batch_size=self.jconfig["test"]["batch_size"], + drop_last=self.jconfig["test"]["drop_last"], + num_workers=self.jconfig["test"]["num_workers"], ) return test - def _make_save_dir(self) -> Optional[str]: - self.log.info(f'Saving model in: {self._setting["save_dir"]}') - save_dir = os.path.join( - self._config["io"]["data_dir"], - self._setting["save_dir"], - ) + def _make_save_dir(self) -> bool: + self.log.info(f"Saving model in: {self.save_dir}") try: - os.makedirs(save_dir, exist_ok=False) + os.makedirs(self.save_dir, exist_ok=False) except FileExistsError: - return None + return False - # dump configuration file - with open(os.path.join(save_dir, "endure.toml"), "w") as fid: - toml.dump(self._config, fid) + with open(os.path.join(self.save_dir, "endure.toml"), "w") as fid: + toml.dump(self.config, fid) - return save_dir + return True @staticmethod def gumbel_temp_schedule( @@ -150,16 +140,16 @@ def reinmax_temp_schedule( return def get_train_callback(self) -> Optional[Callable[[dict], None]]: - if not self._config["lsm"]["design"] == "KLSM": + if not self.design == Policy.KHybrid: return None - if self._config["ltune"]["model"]["categorical_mode"] == "reinmax": + if self.config["ltune"]["model"]["categorical_mode"] == "reinmax": return lambda train_kwargs: self.reinmax_temp_schedule(train_kwargs) - + # default train_callback will be gumbel softmax return lambda train_kwargs: self.gumbel_temp_schedule(train_kwargs) def run(self) -> Optional[Trainer]: - model_base_dir = self._make_save_dir() - if model_base_dir is None: + dir_success = self._make_save_dir() + if not dir_success: self.log.info("Model directory already exists, exiting...") return None @@ -169,7 +159,7 @@ def run(self) -> 
Optional[Trainer]: train_data = self._build_train() test_data = self._build_test() loss_fn = self._build_loss_fn() - disable_tqdm = self._config["log"]["disable_tqdm"] + disable_tqdm = self.config["log"]["disable_tqdm"] callback = self.get_train_callback() trainer = Trainer( @@ -180,14 +170,14 @@ def run(self) -> Optional[Trainer]: train_data=train_data, test_data=test_data, scheduler=scheduler, - max_epochs=self._setting["max_epochs"], - use_gpu_if_avail=self._setting["use_gpu_if_avail"], - model_dir=model_base_dir, - model_train_kwargs=self._config["ltune"]["model"]["train_kwargs"], - model_test_kwargs=self._config["ltune"]["model"]["test_kwargs"], + max_epochs=self.jconfig["max_epochs"], + use_gpu_if_avail=self.use_gpu, + model_dir=self.save_dir, + model_train_kwargs=self.config["ltune"]["model"]["train_kwargs"], + model_test_kwargs=self.config["ltune"]["model"]["test_kwargs"], disable_tqdm=disable_tqdm, - no_checkpoint=self._config["job"]["LTuneTrain"]["no_checkpoint"], - train_callback=callback + no_checkpoint=self.jconfig["no_checkpoint"], + train_callback=callback, ) trainer.run() @@ -198,13 +188,11 @@ def run(self) -> Optional[Trainer]: from endure.data.io import Reader config = Reader.read_config("endure.toml") - logging.basicConfig( format=config["log"]["format"], datefmt=config["log"]["datefmt"] ) - log = logging.getLogger(config["log"]["name"]) log.setLevel(config["log"]["level"]) - a = LTuneTrainJob(config) - a.run() + job = LTuneTrainJob(config) + job.run()
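
A minimal sketch of exercising the consolidated job from this patch outside of `endure.py`, mirroring the `__main__` block of `jobs/data_gen.py`. It assumes the `endure.toml` shown above sits in the working directory; the `generator` override is an illustrative choice (not part of the diff), and `toml.load` is used in place of `Reader.read_config` purely for brevity.

# Sketch: run the unified DataGenJob directly (assumptions noted above).
import logging

import toml

from jobs.data_gen import DataGenJob

# Load the same config endure.py reads, then pick which data to generate:
# "LTuner" emits tuner training rows, "LCM" emits cost-model rows for the
# design selected under [lsm].
config = toml.load("endure.toml")
config["job"]["DataGen"]["generator"] = "LCM"  # or "LTuner"

logging.basicConfig(
    format=config["log"]["format"], datefmt=config["log"]["datefmt"]
)
logging.getLogger(config["log"]["name"]).setLevel(config["log"]["level"])

# Parquet files land under io.data_dir joined with job.DataGen.dir,
# named <file_prefix>_0000.parquet, <file_prefix>_0001.parquet, ...
DataGenJob(config).run()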