update with platform

caozhou committed Jun 4, 2024
1 parent 245e6c1 commit 32b3e67

Showing 6 changed files with 127 additions and 42 deletions.
4 changes: 1 addition & 3 deletions .github/workflows/test.yml
@@ -13,9 +13,7 @@ jobs:
   container-test-job:
     runs-on: self-hosted
     container:
-      image: localhost:5000/flagscale_cicd:v1.1
-      env:
-        NODE_ENV: development
+      image: localhost:5000/flagscale_cicd:v1.2
       ports:
         - 80
       options: --gpus all --hostname flagscale_cicd
4 changes: 2 additions & 2 deletions flagscale/auto_tuner/generate.py
@@ -81,9 +81,9 @@ def gen(self, strategy):
         # Set train_iters of each task
         if "control" in config.auto_tuner:
             config.train.model.train_iters = config.auto_tuner.control.get(
-                "train_iters", 10)
+                "train_iters", 5)
         else:
-            config.train.model.train_iters = 10
+            config.train.model.train_iters = 5
 
         # log dir
         config.experiment.exp_dir = os.path.join(config.experiment.exp_dir,
92 changes: 80 additions & 12 deletions flagscale/auto_tuner/record/recorder.py
@@ -1,6 +1,7 @@
 import os
 import re
 import logging
+import subprocess
 import pandas as pd
 
 
@@ -65,6 +66,81 @@ def record(self, task, strategy):
             strategy["performance"] = performace
             strategy["error"] = None
 
+        # Pass back to platform if need
+        if (
+            "airs_switch" in self.config.auto_tuner.platform
+            and self.config.auto_tuner.platform.airs_switch
+            and strategy["performance"]
+        ):
+            self.pass_back_to_platform(strategy)
+
+    def pass_back_to_platform(self, strategy):
+        gbs = int(self.config.train.model.global_batch_size)
+        seq_len = int(self.config.train.model.seq_length)
+        throughput = gbs * seq_len / (strategy["performance"] / 1000)
+        day = round(
+            self.config.train.model.train_samples
+            * seq_len
+            / (throughput * 60 * 60 * 24),
+            2,
+        )
+        command = [
+            "airsctl job performance",
+            "-D",
+            f"{strategy['data_parallel_size']}",
+            "--distributed_optimizer",
+            (
+                f"{strategy['use_distributed_optimizer']}"
+                if strategy["use_distributed_optimizer"] is not None
+                else "False"
+            ),
+            "-E",
+            f"{strategy['expert_model_parallel_size']}",
+            "-M",
+            f"{strategy['micro_batch_size']}",
+            "-L",
+            f"{strategy['pipeline_model_parallel_size']}",
+            "-G",
+            (
+                f"{strategy['recompute_granularity']}"
+                if strategy["recompute_granularity"]
+                else ""
+            ),
+            "-R",
+            (
+                f"{strategy['recompute_method']}"
+                if strategy["recompute_granularity"]
+                else ""
+            ),
+            "-N",
+            (
+                f"{strategy['recompute_num_layers']}"
+                if strategy["recompute_num_layers"]
+                else "0"
+            ),
+            "-S",
+            (
+                f"{strategy['sequence_parallel']}"
+                if strategy["sequence_parallel"] is not None
+                else "False"
+            ),
+            "-T",
+            f"{strategy['tensor_model_parallel_size']}",
+            "-V",
+            (
+                f"{strategy['num_layers_per_virtual_pipeline_stage']}"
+                if strategy["num_layers_per_virtual_pipeline_stage"]
+                else "0"
+            ),
+            "--throughput",
+            f"{throughput}",
+            "--day",
+            f"{day}",
+            "--time",
+            f"{int(strategy['performance'])}",
+        ]
+        subprocess.run(command, shell=True, check=True)
+
     def grep_max_memory(self, path, pattern="max reserved"):
         """Read the log file and return the max memory."""
         if not os.path.exists(path):
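To make the arithmetic in pass_back_to_platform concrete: strategy["performance"] is an iteration time in milliseconds, so the first expression is tokens per second and the second an estimated training duration in days. A standalone recomputation with invented numbers (none of these values come from the commit):

    # Invented example values, for illustration only.
    gbs, seq_len = 1024, 2048        # global_batch_size, seq_length
    elapsed_ms = 2000.0              # strategy["performance"]: ms per iteration
    train_samples = 100_000_000

    throughput = gbs * seq_len / (elapsed_ms / 1000)  # tokens per second
    day = round(train_samples * seq_len / (throughput * 60 * 60 * 24), 2)
    print(throughput, day)           # 1048576.0 2.26
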
@@ -95,9 +171,7 @@ def grep_max_memory(self, path, pattern="max reserved"):
                 except:
                     continue
         assert value is not None, "Can't grep the max memory"
-        self.logger.info(
-            f"task_{self.cur_strategy['idx']} max_memory: {max_memory}"
-        )
+        self.logger.info(f"task_{self.cur_strategy['idx']} max_memory: {max_memory}")
         return max_memory
 
     def get_performance_and_host_path(self, task):
@@ -166,9 +240,7 @@ def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
                     continue
         assert value is not None, "Can't grep the performance"
         if not performance:
-            self.logger.info(
-                f"task_{self.cur_strategy['idx']} performance: {None}"
-            )
+            self.logger.info(f"task_{self.cur_strategy['idx']} performance: {None}")
             return None
         if len(performance) == 1:
             self.logger.info(
@@ -177,9 +249,7 @@ def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
             return round(performance[0], 3)
         else:
             average = sum(performance[1:]) / (len(performance) - 1)
-            self.logger.info(
-                f"task_{self.cur_strategy['idx']} performance: {average}"
-            )
+            self.logger.info(f"task_{self.cur_strategy['idx']} performance: {average}")
             return round(average, 3)
 
     def grep_error(self, path, pattern="Error"):
@@ -207,9 +277,7 @@ def grep_error(self, path, pattern="Error"):
             else:
                 errors_info.add(line)
 
-        self.logger.info(
-            f"task_{self.cur_strategy['idx']} error: {errors_info}"
-        )
+        self.logger.info(f"task_{self.cur_strategy['idx']} error: {errors_info}")
         return errors_info
 
     def sort(self, history):
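A note on the subprocess call in pass_back_to_platform: on POSIX systems, passing a list to subprocess.run together with shell=True makes the shell execute only the first list element as the command line; the remaining elements become positional parameters of the shell itself rather than arguments of airsctl. A minimal sketch of the distinction, using echo so it runs anywhere:

    import subprocess

    # One string with shell=True: the shell parses the whole command line.
    subprocess.run("echo -D 4 --throughput 1048576.0", shell=True, check=True)

    # A list without shell=True: argv goes straight to the program.
    subprocess.run(["echo", "-D", "4", "--throughput", "1048576.0"], check=True)

    # A list with shell=True: only the first element runs as a command line;
    # "--throughput" becomes the shell's $0 and never reaches echo.
    subprocess.run(["echo -D 4", "--throughput"], shell=True, check=True)
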
7 changes: 5 additions & 2 deletions flagscale/auto_tuner/tuner.py
@@ -64,7 +64,7 @@ def __init__(self, config: DictConfig):
         # Set platform envs
         if "platform" not in self.config.auto_tuner:
             self.config.auto_tuner.platform = {}
-        if os.environ.get("AIRS_SWITCH", False):
+        if os.environ.get("AIRS_SWITCH", None) != "False":
             self.config.auto_tuner.platform.airs_switch = True
 
         if os.environ.get("AIRS_SIZE", None):
@@ -86,6 +86,9 @@ def __init__(self, config: DictConfig):
             self.config.experiment.runner.nproc_per_node = int(
                 os.environ["AIRS_ACCELERATOR_COUNT"])
 
+        if os.environ.get("AIRS_FBMEM", None):
+            self.config.auto_tuner.memory = int(os.environ["AIRS_FBMEM"])
+
         if os.environ.get("AIRS_HOSTFILE_PATH", None):
             # Set original config
             self.orig_config.experiment.runner.hostfile = os.environ[
@@ -238,7 +241,7 @@ def monitor(self):
             end_time = time.time()
             # To increase the time to 600s for the first task with data processing and cache.
             if self.idx == 1:
-                max_time_per_task = 600
+                max_time_per_task = 2 * self.max_time_per_task
             else:
                 max_time_per_task = self.max_time_per_task
             if end_time - self.task_start_time > max_time_per_task:
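One subtlety of the new AIRS_SWITCH test: os.environ.get("AIRS_SWITCH", None) returns None when the variable is unset, and None != "False" evaluates to True, so the platform flag is also enabled when AIRS_SWITCH is absent. If stricter parsing were wanted, a minimal sketch (the env_flag helper is hypothetical, not part of FlagScale):

    import os

    def env_flag(name: str, default: bool = False) -> bool:
        """Unset -> default; otherwise accept common true spellings only."""
        value = os.environ.get(name)
        if value is None:
            return default
        return value.strip().lower() in ("1", "true", "yes", "on")

    # env_flag("AIRS_SWITCH") is False when unset, True for "True", "1", "yes".
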
30 changes: 23 additions & 7 deletions flagscale/auto_tuner/utils.py
@@ -26,23 +26,39 @@ def beside(keys, strategy, history):
 
 def sort_by_memory(strategy):
     return (
-        -strategy["tensor_model_parallel_size"],
-        -strategy["pipeline_model_parallel_size"],
         -strategy["use_recompute"],
+        -strategy["tensor_model_parallel_size"],
+        (
+            -strategy["sequence_parallel"]
+            if strategy["sequence_parallel"] is not None
+            else -float("inf")
+        ),
         strategy["micro_batch_size"],
+        -strategy["pipeline_model_parallel_size"],
         strategy["data_parallel_size"],
+        (
+            -strategy["use_distributed_optimizer"]
+            if strategy["use_distributed_optimizer"] is not None
+            else -float("inf")
+        ),
     )
 
 
 def sort_by_performance(strategy):
     magic_number = 4
     return (
-        -strategy["use_recompute"],
-        (strategy["tensor_model_parallel_size"] % magic_number),
-        (strategy["micro_batch_size"] % magic_number),
+        strategy["use_recompute"],
+        -strategy["tensor_model_parallel_size"],
+        (
+            -strategy["sequence_parallel"]
+            if strategy["sequence_parallel"] is not None
+            else -float("inf")
+        ),
+        strategy["micro_batch_size"],
         strategy["pipeline_model_parallel_size"],
         -strategy["data_parallel_size"],
         (
             strategy["recompute_num_layers"]
-            if strategy["recompute_num_layers"]
+            if strategy["recompute_num_layers"] is not None
             else float("inf")
         ),
     )
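Both sort keys return tuples, which Python compares element-wise: each field only breaks ties left by the previous one, and negating a numeric field sorts it descending. A toy usage sketch with invented field values:

    # Two made-up candidate strategies, trimmed to the fields sort_by_memory reads.
    a = {"use_recompute": True, "tensor_model_parallel_size": 4,
         "sequence_parallel": True, "micro_batch_size": 1,
         "pipeline_model_parallel_size": 2, "data_parallel_size": 1,
         "use_distributed_optimizer": True}
    b = {"use_recompute": False, "tensor_model_parallel_size": 2,
         "sequence_parallel": None, "micro_batch_size": 4,
         "pipeline_model_parallel_size": 1, "data_parallel_size": 4,
         "use_distributed_optimizer": None}

    ordered = sorted([a, b], key=sort_by_memory)
    # a comes first: recompute enabled and larger TP both tend to lower memory.
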
32 changes: 16 additions & 16 deletions flagscale/launcher/runner.py
@@ -351,30 +351,31 @@ def _get_runner_cmd(
     return runner_cmd
 
 
-def _generate_run_script(config,
-                         host,
-                         node_rank,
-                         cmd,
-                         background=True,
-                         with_test=False):
+def _generate_run_script(
+    config, host, node_rank, cmd, background=True, with_test=False
+):
     system_config = config.train.system
     logging_config = config.train.system.logging
 
     no_shared_fs = config.experiment.runner.get("no_shared_fs", False)
     if no_shared_fs:
         host_output_file = os.path.join(logging_config.log_dir, f"host.output")
     else:
-        host_output_file = os.path.join(logging_config.log_dir,
-                                        f"host_{node_rank}_{host}.output")
-    host_run_script_file = os.path.join(logging_config.scripts_dir,
-                                        f"host_{node_rank}_{host}_run.sh")
-    host_pid_file = os.path.join(logging_config.pids_dir,
-                                 f"host_{node_rank}_{host}.pid")
+        host_output_file = os.path.join(
+            logging_config.log_dir, f"host_{node_rank}_{host}.output"
+        )
+    host_run_script_file = os.path.join(
+        logging_config.scripts_dir, f"host_{node_rank}_{host}_run.sh"
+    )
+    host_pid_file = os.path.join(
+        logging_config.pids_dir, f"host_{node_rank}_{host}.pid"
+    )
 
     os.makedirs(logging_config.scripts_dir, exist_ok=True)
 
     root_dir = os.path.dirname(
-        os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
+        os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
+    )
     megatron_dir = os.path.join(root_dir, "megatron")
     cmds_config = config.experiment.get("cmds", None)
     if cmds_config:
@@ -706,10 +707,9 @@ def _query_status(self):
             node_rank = host_list.index(host)
             result = self._query_each(host, node_rank)
             results.append(result)
-
-        if all(status != "" for status in results):
+        if all((status != "" and status != "Z") for status in results):
             job_status = JobStatus.RUNNING
-        elif all(status == "" for status in results):
+        elif all((status == "" or status == "Z") for status in results):
             job_status = JobStatus.COMPLETED_OR_IDLE
         else:
             job_status = JobStatus.TRANSITIONAL
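The new "Z" checks suggest each per-host status is a ps-style process state code ("Z" marks a zombie: a process that has exited but not yet been reaped), so the change counts zombies as finished rather than running. A small sketch of the predicate under that assumption:

    def host_is_running(status: str) -> bool:
        """Empty string: no matching process; 'Z': exited but unreaped."""
        return status != "" and status != "Z"

    assert not host_is_running("")   # completed or idle
    assert not host_is_running("Z")  # zombie: treat as done
    assert host_is_running("S")      # sleeping, i.e. still alive
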
