diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml
index be232e994..ce1a652ba 100644
--- a/.github/workflows/test.yml
+++ b/.github/workflows/test.yml
@@ -13,9 +13,7 @@ jobs:
   container-test-job:
     runs-on: self-hosted
     container:
-      image: localhost:5000/flagscale_cicd:v1.1
-      env:
-        NODE_ENV: development
+      image: localhost:5000/flagscale_cicd:v1.2
       ports:
         - 80
       options: --gpus all --hostname flagscale_cicd
diff --git a/flagscale/auto_tuner/generate.py b/flagscale/auto_tuner/generate.py
index 9e1eb9607..1a6b5439e 100644
--- a/flagscale/auto_tuner/generate.py
+++ b/flagscale/auto_tuner/generate.py
@@ -81,9 +81,9 @@ def gen(self, strategy):
         # Set train_iters of each task
         if "control" in config.auto_tuner:
             config.train.model.train_iters = config.auto_tuner.control.get(
-                "train_iters", 10)
+                "train_iters", 5)
         else:
-            config.train.model.train_iters = 10
+            config.train.model.train_iters = 5
 
         # log dir
         config.experiment.exp_dir = os.path.join(config.experiment.exp_dir,
diff --git a/flagscale/auto_tuner/record/recorder.py b/flagscale/auto_tuner/record/recorder.py
index 1efd06526..3c8fabdd7 100644
--- a/flagscale/auto_tuner/record/recorder.py
+++ b/flagscale/auto_tuner/record/recorder.py
@@ -1,6 +1,7 @@
 import os
 import re
 import logging
+import subprocess
 
 import pandas as pd
 
@@ -65,6 +66,81 @@ def record(self, task, strategy):
             strategy["performance"] = performace
             strategy["error"] = None
 
+        # Pass back to platform if need
+        if (
+            "airs_switch" in self.config.auto_tuner.platform
+            and self.config.auto_tuner.platform.airs_switch
+            and strategy["performance"]
+        ):
+            self.pass_back_to_platform(strategy)
+
+    def pass_back_to_platform(self, strategy):
+        gbs = int(self.config.train.model.global_batch_size)
+        seq_len = int(self.config.train.model.seq_length)
+        throughput = gbs * seq_len / (strategy["performance"] / 1000)
+        day = round(
+            self.config.train.model.train_samples
+            * seq_len
+            / (throughput * 60 * 60 * 24),
+            2,
+        )
+        command = [
+            "airsctl job performance",
+            "-D",
+            f"{strategy['data_parallel_size']}",
+            "--distributed_optimizer",
+            (
+                f"{strategy['use_distributed_optimizer']}"
+                if strategy["use_distributed_optimizer"] is not None
+                else "False"
+            ),
+            "-E",
+            f"{strategy['expert_model_parallel_size']}",
+            "-M",
+            f"{strategy['micro_batch_size']}",
+            "-L",
+            f"{strategy['pipeline_model_parallel_size']}",
+            "-G",
+            (
+                f"{strategy['recompute_granularity']}"
+                if strategy["recompute_granularity"]
+                else ""
+            ),
+            "-R",
+            (
+                f"{strategy['recompute_method']}"
+                if strategy["recompute_granularity"]
+                else ""
+            ),
+            "-N",
+            (
+                f"{strategy['recompute_num_layers']}"
+                if strategy["recompute_num_layers"]
+                else "0"
+            ),
+            "-S",
+            (
+                f"{strategy['sequence_parallel']}"
+                if strategy["sequence_parallel"] is not None
+                else "False"
+            ),
+            "-T",
+            f"{strategy['tensor_model_parallel_size']}",
+            "-V",
+            (
+                f"{strategy['num_layers_per_virtual_pipeline_stage']}"
+                if strategy["num_layers_per_virtual_pipeline_stage"]
+                else "0"
+            ),
+            "--throughput",
+            f"{throughput}",
+            "--day",
+            f"{day}",
+            "--time",
+            f"{int(strategy['performance'])}",
+        ]
+        subprocess.run(command, shell=True, check=True)
+
     def grep_max_memory(self, path, pattern="max reserved"):
         """Read the log file and return the max memory."""
         if not os.path.exists(path):
@@ -95,9 +171,7 @@ def grep_max_memory(self, path, pattern="max reserved"):
                 except:
                     continue
         assert value is not None, "Can't grep the max memory"
-        self.logger.info(
-            f"task_{self.cur_strategy['idx']} max_memory: {max_memory}"
-        )
+        self.logger.info(f"task_{self.cur_strategy['idx']} max_memory: {max_memory}")
         return max_memory
 
     def get_performance_and_host_path(self, task):
@@ -166,9 +240,7 @@ def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
                     continue
         assert value is not None, "Can't grep the performance"
         if not performance:
-            self.logger.info(
-                f"task_{self.cur_strategy['idx']} performance: {None}"
-            )
+            self.logger.info(f"task_{self.cur_strategy['idx']} performance: {None}")
             return None
         if len(performance) == 1:
             self.logger.info(
@@ -177,9 +249,7 @@ def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
             return round(performance[0], 3)
         else:
             average = sum(performance[1:]) / (len(performance) - 1)
-            self.logger.info(
-                f"task_{self.cur_strategy['idx']} performance: {average}"
-            )
+            self.logger.info(f"task_{self.cur_strategy['idx']} performance: {average}")
             return round(average, 3)
 
     def grep_error(self, path, pattern="Error"):
@@ -207,9 +277,7 @@ def grep_error(self, path, pattern="Error"):
                 else:
                     errors_info.add(line)
 
-        self.logger.info(
-            f"task_{self.cur_strategy['idx']} error: {errors_info}"
-        )
+        self.logger.info(f"task_{self.cur_strategy['idx']} error: {errors_info}")
        return errors_info
 
     def sort(self, history):
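For reference, the arithmetic behind the new pass_back_to_platform report can be checked in isolation. The sketch below is a standalone approximation, not part of the change: the sample numbers are made up, an available airsctl CLI is assumed, and only a subset of the flags is shown. It derives tokens-per-second throughput from the elapsed milliseconds per iteration, estimates training days from train_samples, and joins the argument list into one string before calling subprocess.run with shell=True, since on POSIX a list passed with shell=True executes only its first element.

    import subprocess

    # Made-up inputs: global batch size, sequence length, elapsed ms per
    # iteration, and total training samples.
    gbs, seq_len, elapsed_ms, train_samples = 1024, 4096, 8500.0, 100_000_000

    # Tokens processed per second at the measured iteration time.
    throughput = gbs * seq_len / (elapsed_ms / 1000)

    # Estimated wall-clock days to finish train_samples at that rate.
    day = round(train_samples * seq_len / (throughput * 60 * 60 * 24), 2)

    # Hypothetical subset of the flags assembled above.
    command = [
        "airsctl job performance",
        "--throughput", f"{throughput}",
        "--day", f"{day}",
        "--time", f"{int(elapsed_ms)}",
    ]

    # Joined into a single string so the shell sees every flag.
    subprocess.run(" ".join(command), shell=True, check=True)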
diff --git a/flagscale/auto_tuner/tuner.py b/flagscale/auto_tuner/tuner.py
index 6718e7711..809e7219d 100644
--- a/flagscale/auto_tuner/tuner.py
+++ b/flagscale/auto_tuner/tuner.py
@@ -64,7 +64,7 @@ def __init__(self, config: DictConfig):
         # Set platform envs
         if "platform" not in self.config.auto_tuner:
             self.config.auto_tuner.platform = {}
-        if os.environ.get("AIRS_SWITCH", False):
+        if os.environ.get("AIRS_SWITCH", None) != "False":
             self.config.auto_tuner.platform.airs_switch = True
 
         if os.environ.get("AIRS_SIZE", None):
@@ -86,6 +86,9 @@ def __init__(self, config: DictConfig):
             self.config.experiment.runner.nproc_per_node = int(
                 os.environ["AIRS_ACCELERATOR_COUNT"])
 
+        if os.environ.get("AIRS_FBMEM", None):
+            self.config.auto_tuner.memory = int(os.environ["AIRS_FBMEM"])
+
         if os.environ.get("AIRS_HOSTFILE_PATH", None):
             # Set original config
             self.orig_config.experiment.runner.hostfile = os.environ[
@@ -238,7 +241,7 @@ def monitor(self):
             end_time = time.time()
             # To increase the time to 600s for the first task with data processing and cache.
             if self.idx == 1:
-                max_time_per_task = 600
+                max_time_per_task = 2 * self.max_time_per_task
             else:
                 max_time_per_task = self.max_time_per_task
             if end_time - self.task_start_time > max_time_per_task:
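Note on the AIRS_SWITCH change above: the platform flag is now considered on unless the variable is set to the literal string "False"; in particular, an unset variable also enables it. A minimal sketch of that behavior (the env_flag helper is hypothetical, not part of tuner.py):

    import os

    def env_flag(name):
        # Mirrors the updated AIRS_SWITCH check: anything other than the
        # literal string "False" (including an unset variable) counts as on.
        return os.environ.get(name, None) != "False"

    os.environ["AIRS_SWITCH"] = "False"
    print(env_flag("AIRS_SWITCH"))  # False: explicitly disabled
    os.environ["AIRS_SWITCH"] = "1"
    print(env_flag("AIRS_SWITCH"))  # True: any other value enables it
    del os.environ["AIRS_SWITCH"]
    print(env_flag("AIRS_SWITCH"))  # True: unset also enables it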
diff --git a/flagscale/auto_tuner/utils.py b/flagscale/auto_tuner/utils.py
index 21cdda8f3..44f218aee 100644
--- a/flagscale/auto_tuner/utils.py
+++ b/flagscale/auto_tuner/utils.py
@@ -26,23 +26,39 @@ def beside(keys, strategy, history):
 
 def sort_by_memory(strategy):
     return (
-        -strategy["tensor_model_parallel_size"],
-        -strategy["pipeline_model_parallel_size"],
         -strategy["use_recompute"],
+        -strategy["tensor_model_parallel_size"],
+        (
+            -strategy["sequence_parallel"]
+            if strategy["sequence_parallel"] is not None
+            else -float("inf")
+        ),
         strategy["micro_batch_size"],
+        -strategy["pipeline_model_parallel_size"],
+        strategy["data_parallel_size"],
+        (
+            -strategy["use_distributed_optimizer"]
+            if strategy["use_distributed_optimizer"] is not None
+            else -float("inf")
+        ),
     )
 
 
 def sort_by_performance(strategy):
-    magic_number = 4
     return (
-        -strategy["use_recompute"],
-        (strategy["tensor_model_parallel_size"] % magic_number),
-        (strategy["micro_batch_size"] % magic_number),
+        strategy["use_recompute"],
+        -strategy["tensor_model_parallel_size"],
+        (
+            -strategy["sequence_parallel"]
+            if strategy["sequence_parallel"] is not None
+            else -float("inf")
+        ),
+        strategy["micro_batch_size"],
         strategy["pipeline_model_parallel_size"],
+        -strategy["data_parallel_size"],
         (
             strategy["recompute_num_layers"]
-            if strategy["recompute_num_layers"]
+            if strategy["recompute_num_layers"] is not None
             else float("inf")
         ),
     )
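The reworked key functions above order candidate strategies lexicographically over the returned tuples, with None-valued fields falling back to -float("inf") or float("inf") so they still compare cleanly. A minimal usage sketch, assuming FlagScale is importable and using two made-up candidates that carry only the keys sort_by_memory reads:

    from flagscale.auto_tuner.utils import sort_by_memory

    candidates = [
        {"use_recompute": False, "tensor_model_parallel_size": 2,
         "sequence_parallel": None, "micro_batch_size": 2,
         "pipeline_model_parallel_size": 1, "data_parallel_size": 4,
         "use_distributed_optimizer": True},
        {"use_recompute": True, "tensor_model_parallel_size": 4,
         "sequence_parallel": True, "micro_batch_size": 1,
         "pipeline_model_parallel_size": 1, "data_parallel_size": 2,
         "use_distributed_optimizer": None},
    ]

    # Ascending sort on the key tuple puts the more memory-friendly
    # candidate first: recompute enabled wins the first tiebreak here.
    candidates.sort(key=sort_by_memory)
    print([c["tensor_model_parallel_size"] for c in candidates])  # [4, 2]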
diff --git a/flagscale/launcher/runner.py b/flagscale/launcher/runner.py
index c2c1fd5e8..2b4acd207 100644
--- a/flagscale/launcher/runner.py
+++ b/flagscale/launcher/runner.py
@@ -351,12 +351,9 @@ def _get_runner_cmd(
     return runner_cmd
 
 
-def _generate_run_script(config,
-                         host,
-                         node_rank,
-                         cmd,
-                         background=True,
-                         with_test=False):
+def _generate_run_script(
+    config, host, node_rank, cmd, background=True, with_test=False
+):
     system_config = config.train.system
     logging_config = config.train.system.logging
 
@@ -364,17 +361,21 @@ def _generate_run_script(config,
     if no_shared_fs:
         host_output_file = os.path.join(logging_config.log_dir, f"host.output")
     else:
-        host_output_file = os.path.join(logging_config.log_dir,
-                                        f"host_{node_rank}_{host}.output")
-    host_run_script_file = os.path.join(logging_config.scripts_dir,
-                                        f"host_{node_rank}_{host}_run.sh")
-    host_pid_file = os.path.join(logging_config.pids_dir,
-                                 f"host_{node_rank}_{host}.pid")
+        host_output_file = os.path.join(
+            logging_config.log_dir, f"host_{node_rank}_{host}.output"
+        )
+    host_run_script_file = os.path.join(
+        logging_config.scripts_dir, f"host_{node_rank}_{host}_run.sh"
+    )
+    host_pid_file = os.path.join(
+        logging_config.pids_dir, f"host_{node_rank}_{host}.pid"
+    )
 
     os.makedirs(logging_config.scripts_dir, exist_ok=True)
 
     root_dir = os.path.dirname(
-        os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+    )
     megatron_dir = os.path.join(root_dir, "megatron")
     cmds_config = config.experiment.get("cmds", None)
     if cmds_config:
@@ -706,10 +707,9 @@ def _query_status(self):
             node_rank = host_list.index(host)
             result = self._query_each(host, node_rank)
             results.append(result)
-
-        if all(status != "" for status in results):
+        if all((status != "" and status != "Z") for status in results):
             job_status = JobStatus.RUNNING
-        elif all(status == "" for status in results):
+        elif all((status == "" or status == "Z") for status in results):
             job_status = JobStatus.COMPLETED_OR_IDLE
         else:
             job_status = JobStatus.TRANSITIONAL
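The _query_status change above additionally treats a "Z" result from a host (presumably a zombie process state) the same as an empty result when deciding whether the job is still alive, so a run whose processes have all exited or turned into zombies is reported as COMPLETED_OR_IDLE rather than RUNNING. A condensed sketch of that aggregation, with JobStatus and the sample result lists as stand-ins for the real launcher types:

    from enum import Enum

    class JobStatus(Enum):
        RUNNING = "running"
        COMPLETED_OR_IDLE = "completed_or_idle"
        TRANSITIONAL = "transitional"

    def aggregate(results):
        # Mirrors the updated conditions: "Z" no longer counts as running.
        if all((status != "" and status != "Z") for status in results):
            return JobStatus.RUNNING
        elif all((status == "" or status == "Z") for status in results):
            return JobStatus.COMPLETED_OR_IDLE
        else:
            return JobStatus.TRANSITIONAL

    print(aggregate(["S", "S"]))  # JobStatus.RUNNING
    print(aggregate(["", "Z"]))   # JobStatus.COMPLETED_OR_IDLE
    print(aggregate(["S", "Z"]))  # JobStatus.TRANSITIONAL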