Commit

set context parallel default value
caozhou committed Jun 4, 2024
1 parent 32b3e67 commit d9e80ad
Showing 2 changed files with 227 additions and 203 deletions.
114 changes: 53 additions & 61 deletions flagscale/auto_tuner/record/recorder.py
@@ -17,16 +17,14 @@ def __init__(self, config):
         # Metric to grep in the last rank of last node log file
         if "auto_tuner" in self.config and "performance" in self.config.auto_tuner:
             self.metric = self.config.auto_tuner.performance.get(
-                "name", "elapsed time per iteration \(ms\):"
-            )
+                "name", "elapsed time per iteration \(ms\):")
         else:
             self.metric = "elapsed time per iteration \(ms\):"
 
         # Sort order of performance, order just in [ascend, and descend], default ascend
         if "auto_tuner" in self.config and "performance" in self.config.auto_tuner:
             self.sorted_order = self.config.auto_tuner.performance.get(
-                "order", "ascend"
-            )
+                "order", "ascend")
         else:
             self.sorted_order = "ascend"
 
@@ -48,7 +46,8 @@ def record(self, task, strategy):
 
         # If task is stopped by autotuner, task may not be failed,just hang or too slow.
         elif self.cur_strategy.get("stopped_by_tuner", False):
-            performace = self.grep_performance(peformance_path, self.metric)
+            performace = self.grep_performance(peformance_path,
+                                               self.metric)
             strategy["performance"] = performace
             strategy["max_mem"] = self.grep_max_memory(host_path)
             strategy["error"] = None
@@ -67,79 +66,65 @@ def record(self, task, strategy):
             strategy["error"] = None
 
         # Pass back to platform if need
-        if (
-            "airs_switch" in self.config.auto_tuner.platform
-            and self.config.auto_tuner.platform.airs_switch
-            and strategy["performance"]
-        ):
+        if ("airs_switch" in self.config.auto_tuner.platform
+                and self.config.auto_tuner.platform.airs_switch
+                and strategy["performance"]):
             self.pass_back_to_platform(strategy)
 
     def pass_back_to_platform(self, strategy):
         gbs = int(self.config.train.model.global_batch_size)
         seq_len = int(self.config.train.model.seq_length)
         throughput = gbs * seq_len / (strategy["performance"] / 1000)
         day = round(
-            self.config.train.model.train_samples
-            * seq_len
-            / (throughput * 60 * 60 * 24),
+            self.config.train.model.train_samples * seq_len /
+            (throughput * 60 * 60 * 24),
             2,
         )
         command = [
             "airsctl job performance",
             "-D",
             f"{strategy['data_parallel_size']}",
             "--distributed_optimizer",
-            (
-                f"{strategy['use_distributed_optimizer']}"
-                if strategy["use_distributed_optimizer"] is not None
-                else "False"
-            ),
+            (f"{strategy['use_distributed_optimizer']}" if
+             strategy["use_distributed_optimizer"] is not None else "False"),
             "-E",
             f"{strategy['expert_model_parallel_size']}",
+            "-C",
+            f"{strategy['context_parallel_size']}",
             "-M",
             f"{strategy['micro_batch_size']}",
             "-L",
             f"{strategy['pipeline_model_parallel_size']}",
             "-G",
-            (
-                f"{strategy['recompute_granularity']}"
-                if strategy["recompute_granularity"]
-                else ""
-            ),
+            (f"{strategy['recompute_granularity']}"
+             if strategy["recompute_granularity"] else "None"),
             "-R",
-            (
-                f"{strategy['recompute_method']}"
-                if strategy["recompute_granularity"]
-                else ""
-            ),
+            (f"{strategy['recompute_method']}"
+             if strategy["recompute_granularity"] else "None"),
             "-N",
-            (
-                f"{strategy['recompute_num_layers']}"
-                if strategy["recompute_num_layers"]
-                else "0"
-            ),
+            (f"{strategy['recompute_num_layers']}"
+             if strategy["recompute_num_layers"] else "0"),
             "-S",
-            (
-                f"{strategy['sequence_parallel']}"
-                if strategy["sequence_parallel"] is not None
-                else "False"
-            ),
+            (f"{strategy['sequence_parallel']}"
+             if strategy["sequence_parallel"] is not None else "False"),
             "-T",
             f"{strategy['tensor_model_parallel_size']}",
             "-V",
-            (
-                f"{strategy['num_layers_per_virtual_pipeline_stage']}"
-                if strategy["num_layers_per_virtual_pipeline_stage"]
-                else "0"
-            ),
+            (f"{strategy['num_layers_per_virtual_pipeline_stage']}"
+             if strategy["num_layers_per_virtual_pipeline_stage"] else "0"),
             "--throughput",
-            f"{throughput}",
+            f"{int(throughput)}",
             "--day",
             f"{day}",
             "--time",
             f"{int(strategy['performance'])}",
         ]
-        subprocess.run(command, shell=True, check=True)
+        joined_command = " ".join(command)
+        self.logger.info(f"Pass back to platform: {joined_command}")
+        try:
+            subprocess.run(joined_command, shell=True, check=True)
+        except Exception as e:
+            self.logger.info(f"Failed to pass back to platform: {e}")
 
     def grep_max_memory(self, path, pattern="max reserved"):
         """Read the log file and return the max memory."""
@@ -171,7 +156,8 @@ def grep_max_memory(self, path, pattern="max reserved"):
                     except:
                         continue
         assert value is not None, "Can't grep the max memory"
-        self.logger.info(f"task_{self.cur_strategy['idx']} max_memory: {max_memory}")
+        self.logger.info(
+            f"task_{self.cur_strategy['idx']} max_memory: {max_memory}")
         return max_memory
 
     def get_performance_and_host_path(self, task):
@@ -197,10 +183,10 @@ def get_performance_and_host_path(self, task):
         outputs = os.listdir(os.path.join(details, max_host))
         assert len(outputs) == 1, f"the sub dir of {outputs} must be just one."
         new_outputs = os.listdir(os.path.join(details, max_host, outputs[0]))
-        assert len(new_outputs) == 1, f"the sub dir of {new_outputs} must be just one."
-        last_path = os.path.join(
-            details, max_host, outputs[0], new_outputs[0], "attempt_0"
-        )
+        assert len(new_outputs
+                   ) == 1, f"the sub dir of {new_outputs} must be just one."
+        last_path = os.path.join(details, max_host, outputs[0], new_outputs[0],
+                                 "attempt_0")
         last_dir = None
         last_dir_rank = 0
         for item in os.listdir(last_path):
@@ -216,7 +202,9 @@ def get_performance_and_host_path(self, task):
             raise ValueError("The log file does not exist.")
         return log_path, logs
 
-    def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
+    def grep_performance(self,
+                         path,
+                         pattern="elapsed time per iteration \(ms\):"):
         """Read the log file and return the performance."""
         metric_pattern = pattern + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + pattern
         if not path or not os.path.exists(path):
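
For context on the pattern above: metric_pattern accepts the value either after the metric name (the usual Megatron-style log line) or before it. A minimal standalone sketch of how the regex picks the number out of a hypothetical log line:

import re

# Same construction as in the method above; the log line is hypothetical.
pattern = r"elapsed time per iteration \(ms\):"
metric_pattern = pattern + r":* *(\d+(\.\d*)?)|(\d+(\.\d*)?) *" + pattern

line = "iteration 20/1000 | elapsed time per iteration (ms): 1137.3 | ..."
matches = re.findall(metric_pattern, line)
# findall returns one tuple per match; the value sits in group 1 when the
# number follows the metric name, or in group 3 when it precedes it.
value = float(matches[0][0] or matches[0][2])
print(value)  # 1137.3
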
@@ -240,7 +228,8 @@ def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
                     continue
         assert value is not None, "Can't grep the performance"
         if not performance:
-            self.logger.info(f"task_{self.cur_strategy['idx']} performance: {None}")
+            self.logger.info(
+                f"task_{self.cur_strategy['idx']} performance: {None}")
             return None
         if len(performance) == 1:
             self.logger.info(
@@ -249,7 +238,8 @@ def grep_performance(self, path, pattern="elapsed time per iteration \(ms\):"):
             return round(performance[0], 3)
         else:
             average = sum(performance[1:]) / (len(performance) - 1)
-            self.logger.info(f"task_{self.cur_strategy['idx']} performance: {average}")
+            self.logger.info(
+                f"task_{self.cur_strategy['idx']} performance: {average}")
             return round(average, 3)
 
     def grep_error(self, path, pattern="Error"):
@@ -277,7 +267,8 @@ def grep_error(self, path, pattern="Error"):
             else:
                 errors_info.add(line)
 
-        self.logger.info(f"task_{self.cur_strategy['idx']} error: {errors_info}")
+        self.logger.info(
+            f"task_{self.cur_strategy['idx']} error: {errors_info}")
         return errors_info
 
     def sort(self, history):
@@ -290,20 +281,21 @@ def sort(self, history):
         if self.sorted_order == "ascend":
             sorted_history = sorted(
                 no_pruned_history,
-                key=lambda x: (
-                    x["performance"] if x["performance"] is not None else float("inf")
-                ),
+                key=lambda x:
+                (x["performance"]
+                 if x["performance"] is not None else float("inf")),
             )
         elif self.sorted_order == "descend":
             sorted_history = sorted(
                 no_pruned_history,
-                key=lambda x: (
-                    x["performance"] if x["performance"] is not None else float("-inf")
-                ),
+                key=lambda x:
+                (x["performance"]
+                 if x["performance"] is not None else float("-inf")),
                 reverse=True,
             )
         else:
-            raise ValueError(f"The sorted order {self.sorted_order} is not supported.")
+            raise ValueError(
+                f"The sorted order {self.sorted_order} is not supported.")
         assert sorted_history is not None
         return sorted_history
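
The rewrapped lambdas above keep the original behaviour: entries whose performance is None map to float("inf") when sorting ascending (or float("-inf") when descending), so they always fall to the end of the ranking. A small standalone sketch over made-up history entries:

history = [
    {"idx": 0, "performance": 1210.4},
    {"idx": 1, "performance": None},   # e.g. a run that hung or was stopped early
    {"idx": 2, "performance": 998.7},
]

# Ascending order: smallest elapsed time per iteration first, None entries last.
ascend = sorted(
    history,
    key=lambda x: x["performance"] if x["performance"] is not None else float("inf"),
)
print([h["idx"] for h in ascend])  # [2, 0, 1]
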
