add autotuner example and args
caozhou committed Jun 6, 2024
1 parent 53e9dcc commit 08efd6e
Showing 9 changed files with 142 additions and 83 deletions.
43 changes: 43 additions & 0 deletions examples/aquila/conf/config_auto_tuner.yaml
@@ -0,0 +1,43 @@
+defaults:
+  - train: demo
+  - _self_
+
+experiment:
+  exp_name: aquila2
+  exp_dir: ./outputs
+  task:
+    type: train
+    backend: megatron
+    entrypoint: ./flagscale/train/train_aquila.py
+  runner:
+    backend: torchrun
+    nnodes: 1
+    nproc_per_node: 8
+  envs:
+    CUDA_VISIBLE_DEVICES: 0,1,2,3,4,5,6,7
+    CUDA_DEVICE_MAX_CONNECTIONS: 1
+  auto_tuner:
+    space:
+      data_parallel_size: "auto"
+      use_distributed_optimizer: [true, false]
+      tensor_model_parallel_size: [2, 4, 8]
+      sequence_parallel: [true]
+      pipeline_model_parallel_size: "auto"
+      num_layers_per_virtual_pipeline_stage: [1]
+      context_parallel_size: "auto"
+      expert_model_parallel_size: [1]
+      micro_batch_size: "auto"
+      use_recompute: [true]
+      recompute_method: "auto"
+      recompute_granularity: "auto"
+      recompute_num_layers: "auto"
+    control:
+      max_time_per_task: 300
+      train_iters: 5
+      max_time: 600
+
+action: run
+
+hydra:
+  run:
+    dir: ${experiment.exp_dir}/hydra
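Besides the space and control sections used above, the searcher and recorder changed below also read a few optional keys under experiment.auto_tuner. The following sketch is illustrative only and is not part of this commit; the values shown are the defaults the code falls back to when a key is absent.

    experiment:
      auto_tuner:
        algo:
          name: grid            # grid search is the only algorithm handled in build_algo below
          priority: null        # optional per-dimension sort priority, e.g. "memory"
        performance:
          name: 'elapsed time per iteration \(ms\):'   # regex grepped from the last rank's log
          order: ascend         # or descend
        platform:
          airs_switch: false    # when true, results are passed back to the platform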
12 changes: 6 additions & 6 deletions flagscale/auto_tuner/generate.py
@@ -7,8 +7,8 @@ class Generator:
     def __init__(self, config):
         self.config = config
         # TODO: Just a temporary solution, need to be configurated by user
-        if "args_mapping" in config.auto_tuner:
-            self.args_mapping = config.auto_tuner.args_mapping
+        if "args_mapping" in config.experiment.auto_tuner:
+            self.args_mapping = config.experiment.auto_tuner.args_mapping
         else:
             self.args_mapping = {
                 "data_parallel_size": "data_parallel_size",
@@ -50,8 +50,8 @@ def gen(self, strategy):
         config.experiment.runner.tee = 3
         config.experiment.runner.redirects = 3

-        # FLAGSCALE_AUTOTUNER should be true, it will not save ckpt when train ended and report memory every iteration
-        config.experiment.envs.FLAGSCALE_AUTOTUNER = True
+        # auto_tune should be true, it will not save ckpt when train ended and report memory every iteration
+        config.train.system.auto_tune = True

         # Del lr_warmup_samples and train_samples to run megatron.
         assert "optimizer" in config.train.model
@@ -79,8 +79,8 @@ def gen(self, strategy):
         config.train.system.checkpoint.save_interval = 2000

         # Set train_iters of each task
-        if "control" in config.auto_tuner:
-            config.train.model.train_iters = config.auto_tuner.control.get(
+        if "control" in config.experiment.auto_tuner:
+            config.train.model.train_iters = config.experiment.auto_tuner.control.get(
                 "train_iters", 5)
         else:
             config.train.model.train_iters = 5
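Because the Generator now looks args_mapping up under experiment.auto_tuner (first hunk above), the mapping from search-space dimension names to backend argument names can also be supplied from the YAML config. A hedged sketch follows; only the data_parallel_size entry is taken from the default mapping visible above, the rest of that default is elided in this diff, and a user-supplied args_mapping replaces the built-in default entirely rather than merging with it.

    experiment:
      auto_tuner:
        args_mapping:
          data_parallel_size: data_parallel_size   # search dimension -> backend argument name
          # ... one entry per tuned dimension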
12 changes: 6 additions & 6 deletions flagscale/auto_tuner/record/recorder.py
@@ -15,15 +15,15 @@ def __init__(self, config):
             "history.csv",
         )
         # Metric to grep in the last rank of last node log file
-        if "auto_tuner" in self.config and "performance" in self.config.auto_tuner:
-            self.metric = self.config.auto_tuner.performance.get(
+        if "auto_tuner" in self.config and "performance" in self.config.experiment.auto_tuner:
+            self.metric = self.config.experiment.auto_tuner.performance.get(
                 "name", "elapsed time per iteration \(ms\):")
         else:
             self.metric = "elapsed time per iteration \(ms\):"

         # Sort order of performance, order just in [ascend, and descend], default ascend
-        if "auto_tuner" in self.config and "performance" in self.config.auto_tuner:
-            self.sorted_order = self.config.auto_tuner.performance.get(
+        if "auto_tuner" in self.config and "performance" in self.config.experiment.auto_tuner:
+            self.sorted_order = self.config.experiment.auto_tuner.performance.get(
                 "order", "ascend")
         else:
             self.sorted_order = "ascend"
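The default metric above is a regular-expression pattern grepped from the training log of the last rank on the last node. A minimal sketch of that matching, assuming a Megatron-style timing line; the capture group around the number is added here for illustration and is not part of the stored pattern:

    import re

    pattern = r"elapsed time per iteration \(ms\): *([0-9.]+)"
    log_line = "iteration 5/5 | elapsed time per iteration (ms): 1523.4 | ..."
    match = re.search(pattern, log_line)
    if match:
        elapsed_ms = float(match.group(1))  # 1523.4; with order: ascend, faster strategies sort first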
@@ -66,8 +66,8 @@ def record(self, task, strategy):
             strategy["error"] = None

         # Pass back to platform if need
-        if ("airs_switch" in self.config.auto_tuner.platform
-                and self.config.auto_tuner.platform.airs_switch
+        if ("airs_switch" in self.config.experiment.auto_tuner.platform
+                and self.config.experiment.auto_tuner.platform.airs_switch
                 and strategy["performance"]):
             self.pass_back_to_platform(strategy)

98 changes: 49 additions & 49 deletions flagscale/auto_tuner/search/searcher.py
@@ -83,33 +83,33 @@ def _sort(self, key, dim, priority=None):
     def build_space(self, config):
         """Set value of each dim and sort."""
         space = {}
-        cards = config.auto_tuner.cards
-        cards_per_node = config.auto_tuner.nproc_per_node
+        cards = config.experiment.auto_tuner.cards
+        cards_per_node = config.experiment.auto_tuner.nproc_per_node
         num_layers = config.train.model.num_layers
         gbs = config.train.model.global_batch_size
-        if "space" not in config.auto_tuner:
-            config.auto_tuner.space = {}
+        if "space" not in config.experiment.auto_tuner:
+            config.experiment.auto_tuner.space = {}

-        if "algo" not in self.config.auto_tuner:
-            self.config.auto_tuner.algo = {"name": "grid", "priority": None}
-        priority = config.auto_tuner.algo.get("priority", None)
-        if config.auto_tuner.platform.get("airs_switch", False):
+        if "algo" not in self.config.experiment.auto_tuner:
+            self.config.experiment.auto_tuner.algo = {"name": "grid", "priority": None}
+        priority = config.experiment.auto_tuner.algo.get("priority", None)
+        if config.experiment.auto_tuner.platform.get("airs_switch", False):
             priority = "memory"
         # Set data parallel degree
         space["data_parallel_size"] = (
             [i for i in range(1, cards + 1)]
-            if "data_parallel_size" not in config.auto_tuner.space
-            or config.auto_tuner.space.data_parallel_size == "auto"
-            else config.auto_tuner.space.data_parallel_size
+            if "data_parallel_size" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.data_parallel_size == "auto"
+            else config.experiment.auto_tuner.space.data_parallel_size
         )
         self._sort("data_parallel_size", space["data_parallel_size"], priority)

         # Set distributed optimizer
         space["use_distributed_optimizer"] = (
             [True, False]
-            if "use_distributed_optimizer" not in config.auto_tuner.space
-            or config.auto_tuner.space.use_distributed_optimizer == "auto"
-            else config.auto_tuner.space.use_distributed_optimizer
+            if "use_distributed_optimizer" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.use_distributed_optimizer == "auto"
+            else config.experiment.auto_tuner.space.use_distributed_optimizer
         )
         self._sort(
             "use_distributed_optimizer", space["use_distributed_optimizer"], priority
@@ -118,9 +118,9 @@ def build_space(self, config):
         # Set tensor parallel degree
         space["tensor_model_parallel_size"] = (
             [i for i in range(1, cards_per_node + 1)]
-            if "tensor_model_parallel_size" not in config.auto_tuner.space
-            or config.auto_tuner.space.tensor_model_parallel_size == "auto"
-            else config.auto_tuner.space.tensor_model_parallel_size
+            if "tensor_model_parallel_size" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.tensor_model_parallel_size == "auto"
+            else config.experiment.auto_tuner.space.tensor_model_parallel_size
         )
         self._sort(
             "tensor_model_parallel_size", space["tensor_model_parallel_size"], priority
@@ -129,18 +129,18 @@ def build_space(self, config):
         # Set sequence parallel
         space["sequence_parallel"] = (
             [True, False]
-            if "sequence_parallel" not in config.auto_tuner.space
-            or config.auto_tuner.space.sequence_parallel == "auto"
-            else config.auto_tuner.space.sequence_parallel
+            if "sequence_parallel" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.sequence_parallel == "auto"
+            else config.experiment.auto_tuner.space.sequence_parallel
         )
         self._sort("sequence_parallel", space["sequence_parallel"], priority)

         # Set pipeline parallel degree
         space["pipeline_model_parallel_size"] = (
             [i for i in range(1, cards + 1)]
-            if "pipeline_model_parallel_size" not in config.auto_tuner.space
-            or config.auto_tuner.space.pipeline_model_parallel_size == "auto"
-            else config.auto_tuner.space.pipeline_model_parallel_size
+            if "pipeline_model_parallel_size" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.pipeline_model_parallel_size == "auto"
+            else config.experiment.auto_tuner.space.pipeline_model_parallel_size
         )
         self._sort(
             "pipeline_model_parallel_size",
@@ -151,9 +151,9 @@ def build_space(self, config):
         # Set virtual pipeline parallel degree
         space["num_layers_per_virtual_pipeline_stage"] = (
             [i for i in range(1, num_layers + 1)]
-            if "num_layers_per_virtual_pipeline_stage" not in config.auto_tuner.space
-            or config.auto_tuner.space.num_layers_per_virtual_pipeline_stage == "auto"
-            else config.auto_tuner.space.num_layers_per_virtual_pipeline_stage
+            if "num_layers_per_virtual_pipeline_stage" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage == "auto"
+            else config.experiment.auto_tuner.space.num_layers_per_virtual_pipeline_stage
         )
         self._sort(
             "num_layers_per_virtual_pipeline_stage",
@@ -164,63 +164,63 @@ def build_space(self, config):
         # Set use recompute
         space["use_recompute"] = (
             [True, False]
-            if "use_recompute" not in config.auto_tuner.space
-            or config.auto_tuner.space.use_recompute == "auto"
-            else config.auto_tuner.space.use_recompute
+            if "use_recompute" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.use_recompute == "auto"
+            else config.experiment.auto_tuner.space.use_recompute
         )
         self._sort("use_recompute", space["use_recompute"], priority)
         # Set recompute method
         space["recompute_method"] = (
             ["uniform", "block"]
-            if "recompute_method" not in config.auto_tuner.space
-            or config.auto_tuner.space.recompute_method == "auto"
-            else config.auto_tuner.space.recompute_method
+            if "recompute_method" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.recompute_method == "auto"
+            else config.experiment.auto_tuner.space.recompute_method
         )
         self._sort("recompute_method", space["recompute_method"], priority)

         # Set recompute granularity
         space["recompute_granularity"] = (
             ["full", "selective"]
-            if "recompute_granularity" not in config.auto_tuner.space
-            or config.auto_tuner.space.recompute_granularity == "auto"
-            else config.auto_tuner.space.recompute_granularity
+            if "recompute_granularity" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.recompute_granularity == "auto"
+            else config.experiment.auto_tuner.space.recompute_granularity
         )
         self._sort("recompute_granularity", space["recompute_granularity"], priority)

         # Set recompute num layers
         space["recompute_num_layers"] = (
             [i for i in range(1, num_layers + 1)]
-            if "recompute_num_layers" not in config.auto_tuner.space
-            or config.auto_tuner.space.recompute_num_layers == "auto"
-            else config.auto_tuner.space.recompute_num_layers
+            if "recompute_num_layers" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.recompute_num_layers == "auto"
+            else config.experiment.auto_tuner.space.recompute_num_layers
         )
         self._sort("recompute_num_layers", space["recompute_num_layers"], priority)

         # Set micro batch size
         space["micro_batch_size"] = (
             [i for i in range(1, gbs + 1)]
-            if "micro_batch_size" not in config.auto_tuner.space
-            or config.auto_tuner.space.micro_batch_size == "auto"
-            else config.auto_tuner.space.micro_batch_size
+            if "micro_batch_size" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.micro_batch_size == "auto"
+            else config.experiment.auto_tuner.space.micro_batch_size
         )
         self._sort("micro_batch_size", space["micro_batch_size"], priority)

         # Set context parallel degree
         space["context_parallel_size"] = (
             [i for i in range(1, cards + 1)]
-            if "context_parallel_size" not in config.auto_tuner.space
-            or config.auto_tuner.space.context_parallel_size == "auto"
-            else config.auto_tuner.space.context_parallel_size
+            if "context_parallel_size" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.context_parallel_size == "auto"
+            else config.experiment.auto_tuner.space.context_parallel_size
         )
         self._sort("context_parallel_size", space["context_parallel_size"], priority)

         # Set expert parallel degree
         # NOTE: Expert parallel degree is not supported now
         space["expert_model_parallel_size"] = (
             [1]
-            if "expert_model_parallel_size" not in config.auto_tuner.space
-            or config.auto_tuner.space.expert_model_parallel_size == "auto"
-            else config.auto_tuner.space.expert_model_parallel_size
+            if "expert_model_parallel_size" not in config.experiment.auto_tuner.space
+            or config.experiment.auto_tuner.space.expert_model_parallel_size == "auto"
+            else config.experiment.auto_tuner.space.expert_model_parallel_size
         )
         self._sort(
             "expert_model_parallel_size", space["expert_model_parallel_size"], priority
@@ -240,7 +240,7 @@ def build_strategies(self, space, config):
         return recompute_part

     def build_algo(self, strategies, config):
-        name = self.config.auto_tuner.algo.name
+        name = self.config.experiment.auto_tuner.algo.name
         if name == "grid":
             from .algorithm import GridAlgo
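For the example config_auto_tuner.yaml added in this commit (1 node x 8 GPUs, so presumably cards = 8 and cards_per_node = 8), build_space would come out roughly as sketched below; this is illustrative only, micro_batch_size and recompute_num_layers depend on global_batch_size and num_layers from the train config, which are not shown in this diff, and the lists are shown before any priority-based sorting. A grid algorithm then amounts to enumerating the Cartesian product of these dimensions; the real GridAlgo and build_strategies also prune and order candidates.

    import itertools

    # Rough shape of the space built from the example YAML ("auto" expands to a
    # full 1..cards or 1..cards_per_node range; explicit lists are taken verbatim).
    space = {
        "data_parallel_size": [1, 2, 3, 4, 5, 6, 7, 8],            # "auto"
        "use_distributed_optimizer": [True, False],
        "tensor_model_parallel_size": [2, 4, 8],
        "sequence_parallel": [True],
        "pipeline_model_parallel_size": [1, 2, 3, 4, 5, 6, 7, 8],  # "auto"
        "num_layers_per_virtual_pipeline_stage": [1],
        "context_parallel_size": [1, 2, 3, 4, 5, 6, 7, 8],         # "auto"
        "expert_model_parallel_size": [1],
        "use_recompute": [True],
        "recompute_method": ["uniform", "block"],                   # "auto"
        "recompute_granularity": ["full", "selective"],             # "auto"
        # micro_batch_size: 1..global_batch_size, recompute_num_layers: 1..num_layers
    }

    def grid(space):
        # Yield one candidate strategy per combination of dimension values.
        keys = list(space)
        for values in itertools.product(*(space[k] for k in keys)):
            yield dict(zip(keys, values))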

@@ -251,7 +251,7 @@ def _product_parallel_dims(self, space, config):
     def _product_parallel_dims(self, space, config):
         # Avoid space explosion after product
         product_parallelism_dims = []
-        cards = config.auto_tuner.cards
+        cards = config.experiment.auto_tuner.cards
         for data_parallel_size in space["data_parallel_size"]:
             dims = {}
             if not divisible(cards, data_parallel_size):
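divisible is imported from the tuner's utilities and its definition is not part of this diff. The assumed behavior, a plain remainder check, is what lets _product_parallel_dims skip data-parallel sizes that do not evenly divide the total number of cards:

    def divisible(total, factor):
        # Assumed helper: True when factor evenly divides total.
        return total % factor == 0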
(Diffs for the remaining changed files were not loaded on this page.)