From 3f841bad5d4a030d3242e16ef60177e0fbdb1372 Mon Sep 17 00:00:00 2001 From: zzhhjjj Date: Tue, 2 Jul 2024 14:20:31 +0000 Subject: [PATCH] update tests to add SP process group --- examples/llama/tests/test_conversion.py | 22 +++--- tests/helpers/utils.py | 10 +-- .../nanoset/test_build_nanoset_dataloader.py | 30 ++++---- tests/test_clip_grads.py | 8 +-- tests/test_data_parallel.py | 2 +- tests/test_distributed.py | 12 ++-- tests/test_optimizer_params_groups.py | 2 +- tests/test_p2p.py | 2 +- ..._parameters_accumulate_gradient_in_fp32.py | 6 +- tests/test_pipeline_parallel.py | 10 +-- tests/test_serialize.py | 68 ++++++++++--------- tests/test_tie_weights.py | 8 +-- tests/test_zero.py | 2 +- 13 files changed, 97 insertions(+), 85 deletions(-) diff --git a/examples/llama/tests/test_conversion.py b/examples/llama/tests/test_conversion.py index b5ce3529..7e86de6e 100644 --- a/examples/llama/tests/test_conversion.py +++ b/examples/llama/tests/test_conversion.py @@ -105,7 +105,7 @@ def _test_nt_to_hf(parallel_context: ParallelContext, input_ids: torch.Tensor): def test_nt_to_hf(input_ids: torch.Tensor): - init_distributed(tp=1, dp=1, pp=1)(_test_nt_to_hf)(input_ids=input_ids) + init_distributed(tp=1, dp=1, pp=1, sp=1)(_test_nt_to_hf)(input_ids=input_ids) def _test_nt_to_hf_with_files(parallel_context: ParallelContext, input_ids: torch.Tensor, test_context: TestContext): @@ -130,7 +130,9 @@ def _test_nt_to_hf_with_files(parallel_context: ParallelContext, input_ids: torc def test_nt_to_hf_with_files(input_ids: torch.Tensor): - init_distributed(tp=1, dp=1, pp=1)(_test_nt_to_hf_with_files)(input_ids=input_ids, test_context=TestContext()) + init_distributed(tp=1, dp=1, pp=1, sp=1)(_test_nt_to_hf_with_files)( + input_ids=input_ids, test_context=TestContext() + ) def _test_hf_to_nt(parallel_context: ParallelContext, input_ids: torch.Tensor): @@ -141,11 +143,11 @@ def _test_hf_to_nt(parallel_context: ParallelContext, input_ids: torch.Tensor): logits_nt = model_nt.model(input_ids, input_mask).permute(1, 0, 2) logits_hf = model_hf(input_ids).logits assert logits_nt.size() == logits_hf.size() - torch.testing.assert_allclose(logits_hf, logits_nt, atol=ATOL) + torch.testing.assert_allclose(logits_hf, logits_nt, atol=ATOL) def test_hf_to_nt(input_ids: torch.Tensor): - init_distributed(tp=1, dp=1, pp=1)(_test_hf_to_nt)(input_ids=input_ids) + init_distributed(tp=1, dp=1, pp=1, sp=1)(_test_hf_to_nt)(input_ids=input_ids) def _test_hf_to_nt_with_files(parallel_context: ParallelContext, input_ids: torch.Tensor, test_context: TestContext): @@ -168,7 +170,9 @@ def _test_hf_to_nt_with_files(parallel_context: ParallelContext, input_ids: torc def test_hf_to_nt_with_files(input_ids: torch.Tensor): - init_distributed(tp=1, dp=1, pp=1)(_test_hf_to_nt_with_files)(input_ids=input_ids, test_context=TestContext()) + init_distributed(tp=1, dp=1, pp=1, sp=1)(_test_hf_to_nt_with_files)( + input_ids=input_ids, test_context=TestContext() + ) def _test_composed_conversion(parallel_context: ParallelContext): @@ -196,7 +200,7 @@ def _test_composed_conversion(parallel_context: ParallelContext): def test_composed_conversion(): - init_distributed(tp=1, dp=1, pp=1)(_test_composed_conversion)() + init_distributed(tp=1, dp=1, pp=1, sp=1)(_test_composed_conversion)() def _save_parallel_nanotron(parallel_context: ParallelContext, input_ids: torch.Tensor, nt_path: Path): @@ -239,9 +243,11 @@ def test_tensor_parallel_conversion(input_ids: torch.Tensor): hf_path = root / "nanotron" # Launch both parts. 
- init_distributed(tp=2, dp=1, pp=1)(_save_parallel_nanotron)(input_ids=input_ids, nt_path=nt_path) + init_distributed(tp=2, dp=1, pp=1, sp=1)(_save_parallel_nanotron)(input_ids=input_ids, nt_path=nt_path) assert (nt_path / "logits.pt").exists() - init_distributed(tp=1, dp=1, pp=1)(_convert_from_parallel)(input_ids=input_ids, nt_path=nt_path, hf_path=hf_path) + init_distributed(tp=1, dp=1, pp=1, sp=1)(_convert_from_parallel)( + input_ids=input_ids, nt_path=nt_path, hf_path=hf_path + ) assert (hf_path / "logits.pt").exists() # Load logits and verify they match. diff --git a/tests/helpers/utils.py b/tests/helpers/utils.py index 44813cac..b8f6488a 100644 --- a/tests/helpers/utils.py +++ b/tests/helpers/utils.py @@ -107,8 +107,8 @@ def is_dict_equal(first: Dict, second: Dict, sub_paths: Optional[List[str]] = No return True, None -def get_all_3d_configurations(gpus: int) -> List[Tuple[int, int, int]]: - """Given a number of gpus, we want all 3d configurations possible such that pp * dp * tp = gpus""" +def get_all_4d_configurations(gpus: int) -> List[Tuple[int, int, int, int]]: + """Given a number of gpus, we want all 4d configurations possible such that pp * dp * tp * sp = gpus""" result = [] for tp in range(1, gpus + 1): if gpus % tp != 0: @@ -121,8 +121,10 @@ def get_all_3d_configurations(gpus: int) -> List[Tuple[int, int, int]]: for pp in range(1, gpus_left_after_dp + 1): if gpus_left_after_dp % pp != 0: continue - if tp * dp * pp == gpus: - result.append((pp, dp, tp)) + gpus_left_after_pp = gpus_left_after_dp // pp + for sp in range(1, gpus_left_after_pp + 1): + if tp * dp * pp * sp == gpus: + result.append((tp, dp, pp, sp)) return result diff --git a/tests/nanoset/test_build_nanoset_dataloader.py b/tests/nanoset/test_build_nanoset_dataloader.py index 2c3ff542..331e4f64 100644 --- a/tests/nanoset/test_build_nanoset_dataloader.py +++ b/tests/nanoset/test_build_nanoset_dataloader.py @@ -2,9 +2,6 @@ from math import isclose from pathlib import Path -package_path = Path(__file__).parent.parent -sys.path.append(str(package_path)) - import numpy as np import pytest from helpers.context import TestContext @@ -16,7 +13,7 @@ create_dummy_json_dataset, preprocess_dummy_dataset, ) -from helpers.utils import available_gpus, get_all_3d_configurations, init_distributed, rerun_if_address_is_in_use +from helpers.utils import available_gpus, get_all_4d_configurations, init_distributed, rerun_if_address_is_in_use from nanotron.data.dataloader_builder import build_nanoset_dataloader from nanotron.data.nanoset import Nanoset from nanotron.data.utils import count_dataset_indexes, normalize @@ -24,13 +21,16 @@ from nanotron.utils import main_rank_first from transformers import AutoTokenizer +package_path = Path(__file__).parent.parent +sys.path.append(str(package_path)) + @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @pytest.mark.parametrize("train_steps", [5, 100]) @@ -38,7 +38,7 @@ @pytest.mark.parametrize("tokenizer_name_or_path", ["openai-community/gpt2", "unsloth/llama-3-8b-bnb-4bit"]) @rerun_if_address_is_in_use() def test_build_nanoset_dataloader( - tp: int, dp: int, pp: int, train_steps: int, sequence_length: int, tokenizer_name_or_path: str + tp: int, dp: int, pp: int, sp: int, train_steps: int, sequence_length: int, tokenizer_name_or_path: str ): test_context = 
TestContext() @@ -49,7 +49,7 @@ def test_build_nanoset_dataloader( for idx, json_path in enumerate(json_paths): create_dummy_json_dataset(path_to_json=json_path, dummy_text=f"Nanoset {idx}!", n_samples=(idx + 1) * 50000) - init_distributed(tp=tp, dp=dp, pp=pp)(_test_build_nanoset_dataloader)( + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_build_nanoset_dataloader)( json_paths=json_paths, path_to_mmap_files=mmap_dataset_paths, train_steps=train_steps, @@ -155,17 +155,19 @@ def _test_build_nanoset_dataloader( @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @pytest.mark.parametrize("skipped_batches", [20, 50]) @pytest.mark.parametrize("tokenizer_name_or_path", ["openai-community/gpt2", "unsloth/llama-3-8b-bnb-4bit"]) @rerun_if_address_is_in_use() -def test_recover_nanoset_dataloader(tp: int, dp: int, pp: int, skipped_batches: int, tokenizer_name_or_path: str): +def test_recover_nanoset_dataloader( + tp: int, dp: int, pp: int, sp: int, skipped_batches: int, tokenizer_name_or_path: str +): test_context = TestContext() # Create dataset files @@ -175,7 +177,7 @@ def test_recover_nanoset_dataloader(tp: int, dp: int, pp: int, skipped_batches: for idx, json_path in enumerate(json_paths): create_dummy_json_dataset(path_to_json=json_path, dummy_text=f"Nanoset {idx}!", n_samples=(idx + 1) * 50000) - init_distributed(tp=tp, dp=dp, pp=pp)(_test_recover_nanoset_dataloader)( + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_recover_nanoset_dataloader)( json_paths=json_paths, path_to_mmap_files=mmap_dataset_paths, skipped_batches=skipped_batches, diff --git a/tests/test_clip_grads.py b/tests/test_clip_grads.py index b4682875..66940294 100644 --- a/tests/test_clip_grads.py +++ b/tests/test_clip_grads.py @@ -34,7 +34,7 @@ @pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0]) @rerun_if_address_is_in_use() def test_clip_grads_with_pp(norm_type: float): - init_distributed(tp=1, dp=1, pp=2)(_test_clip_grads_with_pp)(norm_type=norm_type) + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_clip_grads_with_pp)(norm_type=norm_type) def _test_clip_grads_with_pp(parallel_context: ParallelContext, norm_type: float): @@ -203,7 +203,7 @@ def _test_clip_grads_with_pp(parallel_context: ParallelContext, norm_type: float @pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0]) @rerun_if_address_is_in_use() def test_clip_grads_with_tp(tp_mode: TensorParallelLinearMode, async_communication: bool, norm_type: float): - init_distributed(tp=2, dp=1, pp=1)(_test_clip_grads_with_tp)( + init_distributed(tp=2, dp=1, pp=1, sp=1)(_test_clip_grads_with_tp)( tp_mode=tp_mode, async_communication=async_communication, norm_type=norm_type ) @@ -345,7 +345,7 @@ def _test_clip_grads_with_tp( @pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0]) @rerun_if_address_is_in_use() def test_clip_grads_tied_weights(norm_type: float): - init_distributed(tp=1, dp=1, pp=2)(_test_clip_grads_tied_weights)(norm_type=norm_type) + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_clip_grads_tied_weights)(norm_type=norm_type) def _test_clip_grads_tied_weights(parallel_context: ParallelContext, norm_type: float): @@ -438,7 +438,7 @@ def _test_clip_grads_tied_weights(parallel_context: ParallelContext, norm_type: @pytest.mark.parametrize("norm_type", [math.inf, 1.0, 2.0]) @rerun_if_address_is_in_use() def 
test_clip_grads_fp32_accumulator(norm_type: float, half_precision: torch.dtype): - init_distributed(tp=1, dp=1, pp=2)(_test_clip_grads_fp32_accumulator)( + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_clip_grads_fp32_accumulator)( norm_type=norm_type, half_precision=half_precision ) diff --git a/tests/test_data_parallel.py b/tests/test_data_parallel.py index 21ae191a..bc367470 100644 --- a/tests/test_data_parallel.py +++ b/tests/test_data_parallel.py @@ -17,7 +17,7 @@ @pytest.mark.parametrize("accumulation_steps", [1, 3]) @rerun_if_address_is_in_use() def test_ddp_with_afab(accumulation_steps): - init_distributed(tp=1, dp=2, pp=1)(_test_ddp_with_afab)(accumulation_steps=accumulation_steps) + init_distributed(tp=1, dp=2, pp=1, sp=1)(_test_ddp_with_afab)(accumulation_steps=accumulation_steps) def _test_ddp_with_afab(parallel_context: ParallelContext, accumulation_steps: int): diff --git a/tests/test_distributed.py b/tests/test_distributed.py index 7c0d2462..241dd56e 100644 --- a/tests/test_distributed.py +++ b/tests/test_distributed.py @@ -3,7 +3,7 @@ import torch.distributed as dist from helpers.utils import ( available_gpus, - get_all_3d_configurations, + get_all_4d_configurations, init_distributed, rerun_if_address_is_in_use, ) @@ -36,13 +36,13 @@ def _test_init_parallel_context(parallel_context: ParallelContext): @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_init_parallel_context(tp: int, dp: int, pp: int): - init_distributed(tp=tp, dp=dp, pp=pp)(_test_init_parallel_context)() \ No newline at end of file +def test_init_parallel_context(tp: int, dp: int, pp: int, sp: int): + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_init_parallel_context)() diff --git a/tests/test_optimizer_params_groups.py b/tests/test_optimizer_params_groups.py index fa835e1c..60d541c3 100644 --- a/tests/test_optimizer_params_groups.py +++ b/tests/test_optimizer_params_groups.py @@ -496,7 +496,7 @@ def optimizer_builder(inp_param_groups): def test_ddp_optimizer_grad_accumulation_lr_weight_decay_multiple_group( half_precision: torch.dtype, accumulation_steps: int ): - init_distributed(tp=1, dp=2, pp=1)(_test_ddp_optimizer_grad_accumulation_lr_weight_decay_multiple_group)( + init_distributed(tp=1, dp=2, pp=1, sp=1)(_test_ddp_optimizer_grad_accumulation_lr_weight_decay_multiple_group)( half_precision=half_precision, accumulation_steps=accumulation_steps, ) diff --git a/tests/test_p2p.py b/tests/test_p2p.py index ed8245a8..9b260c12 100644 --- a/tests/test_p2p.py +++ b/tests/test_p2p.py @@ -14,7 +14,7 @@ @pytest.mark.parametrize("full", [True, False]) @rerun_if_address_is_in_use() def test_check_send_recv_tensor(send_contiguous: bool, full: bool): - init_distributed(tp=1, dp=1, pp=2)(_test_check_send_recv_tensor)(send_contiguous=send_contiguous, full=full) + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_check_send_recv_tensor)(send_contiguous=send_contiguous, full=full) def _test_check_send_recv_tensor(parallel_context: ParallelContext, send_contiguous: bool, full: bool): diff --git a/tests/test_parameters_accumulate_gradient_in_fp32.py b/tests/test_parameters_accumulate_gradient_in_fp32.py index ba0debd6..0c7eb152 100644 --- a/tests/test_parameters_accumulate_gradient_in_fp32.py +++ 
b/tests/test_parameters_accumulate_gradient_in_fp32.py @@ -143,7 +143,7 @@ def test_optimizer_can_step_gradient_in_fp32(half_precision: torch.dtype): @pytest.mark.parametrize("train_iterations", [1, 3]) @rerun_if_address_is_in_use() def test_ddp_with_grad_accum_in_fp32(half_precision: torch.dtype, accumulation_steps: int, train_iterations: int): - init_distributed(tp=1, dp=2, pp=1)(_test_ddp_with_grad_accum_in_fp32)( + init_distributed(tp=1, dp=2, pp=1, sp=1)(_test_ddp_with_grad_accum_in_fp32)( half_precision=half_precision, accumulation_steps=accumulation_steps, train_iterations=train_iterations, @@ -257,7 +257,7 @@ def _test_ddp_with_grad_accum_in_fp32( accumulator.backward(loss_fp32_accum) for name, param in model_ddp_fp32_accum.named_parameters(): - # Check that half grads has been set to None in sync step, to avoid it being uncorrectly used + # Check that half grads has been set to None in sync step, to avoid it being incorrectly used half_grad = param.grad assert half_grad is None, f"{half_grad} != None" @@ -310,7 +310,7 @@ def _test_ddp_with_grad_accum_in_fp32( @pytest.mark.parametrize("reduce_scatter", [True, False]) @rerun_if_address_is_in_use() def test_tied_weights_sync_with_grad_accum_in_fp32(pipeline_engine: PipelineEngine, reduce_scatter: bool): - init_distributed(tp=1, dp=2, pp=2)(_test_tied_weights_sync_with_grad_accum_in_fp32)( + init_distributed(tp=1, dp=2, pp=2, sp=1)(_test_tied_weights_sync_with_grad_accum_in_fp32)( pipeline_engine=pipeline_engine, reduce_scatter=reduce_scatter ) diff --git a/tests/test_pipeline_parallel.py b/tests/test_pipeline_parallel.py index a7f8008f..ac8a18a6 100644 --- a/tests/test_pipeline_parallel.py +++ b/tests/test_pipeline_parallel.py @@ -22,7 +22,7 @@ @pytest.mark.skipif(available_gpus() < 2, reason="Testing build_and_set_rank requires at least 2 gpus") @rerun_if_address_is_in_use() def test_build_and_set_rank(): - init_distributed(tp=1, dp=1, pp=2)(_test_build_and_set_rank)() + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_build_and_set_rank)() def _test_build_and_set_rank(parallel_context: ParallelContext): @@ -72,7 +72,7 @@ def test_init_on_device_and_dtype(): @pytest.mark.parametrize("pp", list(range(2, min(4, available_gpus()) + 1))) @rerun_if_address_is_in_use() def test_pipeline_engine(pipeline_engine: PipelineEngine, pp: int): - init_distributed(tp=1, dp=1, pp=pp)(_test_pipeline_engine)(pipeline_engine=pipeline_engine) + init_distributed(tp=1, dp=1, pp=pp, sp=1)(_test_pipeline_engine)(pipeline_engine=pipeline_engine) def _test_pipeline_engine(parallel_context: ParallelContext, pipeline_engine: PipelineEngine): @@ -217,7 +217,7 @@ def _test_pipeline_engine(parallel_context: ParallelContext, pipeline_engine: Pi @pytest.mark.parametrize("pp", list(range(2, min(4, available_gpus()) + 1))) @rerun_if_address_is_in_use() def test_pipeline_engine_with_tensor_that_does_not_require_grad(pipeline_engine: PipelineEngine, pp: int): - init_distributed(pp=pp, dp=1, tp=1)(_test_pipeline_engine_with_tensor_that_does_not_require_grad)( + init_distributed(pp=pp, dp=1, tp=1, sp=1)(_test_pipeline_engine_with_tensor_that_does_not_require_grad)( pipeline_engine=pipeline_engine ) @@ -448,7 +448,7 @@ def dummy_infinite_data_loader_with_non_differentiable_tensor( @pytest.mark.parametrize("pp", list(range(2, min(4, available_gpus()) + 1))) @rerun_if_address_is_in_use() def test_pipeline_forward_without_engine(pp: int): - init_distributed(pp=pp, dp=1, tp=1)(_test_pipeline_forward_without_engine)() + init_distributed(pp=pp, dp=1, tp=1, 
sp=1)(_test_pipeline_forward_without_engine)() def _test_pipeline_forward_without_engine(parallel_context: ParallelContext): @@ -623,7 +623,7 @@ def dummy_infinite_data_loader_with_non_differentiable_tensor( ) @rerun_if_address_is_in_use() def test_pipeline_engine_diamond(pipeline_engine: PipelineEngine): - init_distributed(pp=4, dp=1, tp=1)(_test_pipeline_engine_diamond)(pipeline_engine=pipeline_engine) + init_distributed(pp=4, dp=1, tp=1, sp=1)(_test_pipeline_engine_diamond)(pipeline_engine=pipeline_engine) pass diff --git a/tests/test_serialize.py b/tests/test_serialize.py index 329ff279..6b28c4c0 100644 --- a/tests/test_serialize.py +++ b/tests/test_serialize.py @@ -4,7 +4,7 @@ from helpers.dummy import dummy_infinite_data_loader, init_dummy_model from helpers.utils import ( available_gpus, - get_all_3d_configurations, + get_all_4d_configurations, init_distributed, is_dict_equal, rerun_if_address_is_in_use, @@ -42,18 +42,18 @@ def test_save_and_load_with_changed_topolgy(): @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_save_and_load_model(tp: int, dp: int, pp: int): +def test_save_and_load_model(tp: int, dp: int, pp: int, sp: int): test_context = TestContext() # We use DP=2 as we're interested in testing that one - init_distributed(tp=tp, dp=dp, pp=pp)(_test_save_and_load_model)(test_context=test_context) + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_save_and_load_model)(test_context=test_context) def _test_save_and_load_model(parallel_context: ParallelContext, test_context: TestContext): @@ -84,18 +84,18 @@ def _test_save_and_load_model(parallel_context: ParallelContext, test_context: T @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_save_and_load_optimizer(tp: int, dp: int, pp: int): +def test_save_and_load_optimizer(tp: int, dp: int, pp: int, sp: int): test_context = TestContext() # We use DP=2 as we're interested in testing that one - init_distributed(tp=tp, dp=dp, pp=pp)(_test_save_and_load_optimizer)(test_context=test_context) + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_save_and_load_optimizer)(test_context=test_context) def _test_save_and_load_optimizer(parallel_context: ParallelContext, test_context: TestContext): @@ -149,18 +149,20 @@ def _test_save_and_load_optimizer(parallel_context: ParallelContext, test_contex @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_save_zero_optimizer_and_load_optimizer(tp: int, dp: int, pp: int): +def test_save_zero_optimizer_and_load_optimizer(tp: int, dp: int, pp: int, sp: int): test_context = TestContext() # We use DP=2 as we're interested in testing that one - init_distributed(tp=tp, dp=dp, pp=pp)(_test_save_zero_optimizer_and_load_optimizer)(test_context=test_context) + 
init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_save_zero_optimizer_and_load_optimizer)( + test_context=test_context + ) def _test_save_zero_optimizer_and_load_optimizer(parallel_context: ParallelContext, test_context: TestContext): @@ -223,18 +225,18 @@ def _test_save_zero_optimizer_and_load_optimizer(parallel_context: ParallelConte @pytest.mark.skip(reason="Assumption that zero and non zero optimizer have the same serialization format doesn't hold") @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_save_zero_optimizer_and_load_data_parallel_optimizer(tp: int, dp: int, pp: int): +def test_save_zero_optimizer_and_load_data_parallel_optimizer(tp: int, dp: int, pp: int, sp: int): test_context = TestContext() # We use DP=2 as we're interested in testing that one - init_distributed(tp=tp, dp=dp, pp=pp)(_test_save_zero_optimizer_and_load_data_parallel_optimizer)( + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_save_zero_optimizer_and_load_data_parallel_optimizer)( test_context=test_context ) @@ -294,18 +296,18 @@ def _test_save_zero_optimizer_and_load_data_parallel_optimizer( @pytest.mark.skip(reason="Assumption that zero and non zero optimizer have the same serialization format doesn't hold") @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_save_data_parallel_optimizer_and_load_zero_optimizer(tp: int, dp: int, pp: int): +def test_save_data_parallel_optimizer_and_load_zero_optimizer(tp: int, dp: int, pp: int, sp: int): test_context = TestContext() # We use DP=2 as we're interested in testing that one - init_distributed(tp=tp, dp=dp, pp=pp)(_test_save_data_parallel_optimizer_and_load_zero_optimizer)( + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_save_data_parallel_optimizer_and_load_zero_optimizer)( test_context=test_context ) @@ -361,18 +363,18 @@ def _test_save_data_parallel_optimizer_and_load_zero_optimizer( @pytest.mark.parametrize( - "tp,dp,pp", + "tp,dp,pp,sp", [ - pytest.param(*all_3d_configs) + pytest.param(*all_4d_configs) for gpus in range(1, min(available_gpus(), 4) + 1) - for all_3d_configs in get_all_3d_configurations(gpus) + for all_4d_configs in get_all_4d_configurations(gpus) ], ) @rerun_if_address_is_in_use() -def test_save_optimizer_with_additional_state_dict_keys(tp: int, dp: int, pp: int): +def test_save_optimizer_with_additional_state_dict_keys(tp: int, dp: int, pp: int, sp: int): test_context = TestContext() # We use DP=2 as we're interested in testing that one - init_distributed(tp=tp, dp=dp, pp=pp)(_test_save_optimizer_with_additional_state_dict_keys)( + init_distributed(tp=tp, dp=dp, pp=pp, sp=sp)(_test_save_optimizer_with_additional_state_dict_keys)( test_context=test_context ) @@ -480,7 +482,7 @@ def _test_save_optimizer_with_additional_state_dict_keys(parallel_context: Paral def test_save_and_load_random_states(): test_context = TestContext() # We use DP=2 as we're interested in testing - init_distributed(tp=2, dp=1, pp=1)(_test_save_and_load_random_states)(test_context=test_context) + 
init_distributed(tp=2, dp=1, pp=1, sp=1)(_test_save_and_load_random_states)(test_context=test_context) def _test_save_and_load_random_states(parallel_context: ParallelContext, test_context: TestContext): @@ -519,7 +521,7 @@ def _test_save_and_load_random_states(parallel_context: ParallelContext, test_co @rerun_if_address_is_in_use() def test_serialize_deserialize_tensormetadata(): test_context = TestContext() - init_distributed(tp=2, dp=1, pp=1)(_test_serialize_deserialize_tensormetadata)(test_context=test_context) + init_distributed(tp=2, dp=1, pp=1, sp=1)(_test_serialize_deserialize_tensormetadata)(test_context=test_context) def _test_serialize_deserialize_tensormetadata(parallel_context: ParallelContext, test_context: TestContext): diff --git a/tests/test_tie_weights.py b/tests/test_tie_weights.py index eecfc097..d81aeab0 100644 --- a/tests/test_tie_weights.py +++ b/tests/test_tie_weights.py @@ -15,7 +15,7 @@ @rerun_if_address_is_in_use() def test_tie_weight_in_same_device(): - init_distributed(tp=1, dp=1, pp=1)(_test_tie_weight_in_same_device)() + init_distributed(tp=1, dp=1, pp=1, sp=1)(_test_tie_weight_in_same_device)() def _test_tie_weight_in_same_device(parallel_context: ParallelContext): @@ -49,7 +49,7 @@ def _test_tie_weight_in_same_device(parallel_context: ParallelContext): @rerun_if_address_is_in_use() def test_tie_weight_in_different_device(): - init_distributed(tp=1, dp=1, pp=2)(_test_tie_weight_in_different_device)() + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_tie_weight_in_different_device)() def _test_tie_weight_in_different_device(parallel_context: ParallelContext): @@ -120,7 +120,7 @@ def _test_tie_weight_in_different_device(parallel_context: ParallelContext): @rerun_if_address_is_in_use() def test_tie_weight_across_dp_is_impossible(): - init_distributed(tp=1, dp=2, pp=1)(_test_tie_weight_across_dp_is_impossible)() + init_distributed(tp=1, dp=2, pp=1, sp=1)(_test_tie_weight_across_dp_is_impossible)() def _test_tie_weight_across_dp_is_impossible(parallel_context: ParallelContext): @@ -158,7 +158,7 @@ def _test_tie_weight_across_dp_is_impossible(parallel_context: ParallelContext): @rerun_if_address_is_in_use() def test_tie_weight_in_different_device_have_gradients_synchronized(): - init_distributed(tp=1, dp=1, pp=2)(_test_tie_weight_in_different_device_have_gradients_synchronized)() + init_distributed(tp=1, dp=1, pp=2, sp=1)(_test_tie_weight_in_different_device_have_gradients_synchronized)() def _test_tie_weight_in_different_device_have_gradients_synchronized(parallel_context: ParallelContext): diff --git a/tests/test_zero.py b/tests/test_zero.py index b7503553..c749bc0d 100644 --- a/tests/test_zero.py +++ b/tests/test_zero.py @@ -504,7 +504,7 @@ def _test_zero_optimizer_with_tp( @rerun_if_address_is_in_use() def test_sliced_flat_tensor(): - init_distributed(1, 1, 1)(_test_sliced_flat_tensor)() + init_distributed(1, 1, 1, 1)(_test_sliced_flat_tensor)() def _test_sliced_flat_tensor(parallel_context: ParallelContext):
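
Reviewer note, not part of the patch: the standalone sketch below mirrors the enumeration logic this patch adds to tests/helpers/utils.py, so it is easier to see which (tp, dp, pp, sp) tuples the new get_all_4d_configurations helper yields for small GPU counts. It assumes only what the hunk above shows, namely that tp * dp * pp * sp must equal the GPU count and that tuples are now ordered (tp, dp, pp, sp) to match the "tp,dp,pp,sp" parametrize string used throughout the updated tests.

# Hypothetical illustration mirroring the patched helper; not the patched file itself.
from typing import List, Tuple

def get_all_4d_configurations(gpus: int) -> List[Tuple[int, int, int, int]]:
    """Enumerate all (tp, dp, pp, sp) tuples with tp * dp * pp * sp == gpus."""
    result = []
    for tp in range(1, gpus + 1):
        if gpus % tp != 0:
            continue
        gpus_left_after_tp = gpus // tp
        for dp in range(1, gpus_left_after_tp + 1):
            if gpus_left_after_tp % dp != 0:
                continue
            gpus_left_after_dp = gpus_left_after_tp // dp
            for pp in range(1, gpus_left_after_dp + 1):
                if gpus_left_after_dp % pp != 0:
                    continue
                gpus_left_after_pp = gpus_left_after_dp // pp
                for sp in range(1, gpus_left_after_pp + 1):
                    if tp * dp * pp * sp == gpus:
                        result.append((tp, dp, pp, sp))
    return result

if __name__ == "__main__":
    # 2 GPUs: the single parallel axis can be any one of tp, dp, pp, or sp.
    assert sorted(get_all_4d_configurations(2)) == [
        (1, 1, 1, 2), (1, 1, 2, 1), (1, 2, 1, 1), (2, 1, 1, 1)
    ]
    # 4 GPUs: 10 ordered factorizations of 4 into four factors.
    assert len(get_all_4d_configurations(4)) == 10

Each generated tuple is unpacked by pytest.param(*all_4d_configs) and forwarded by the parametrized tests as init_distributed(tp=tp, dp=dp, pp=pp, sp=sp), while tests with a fixed topology simply pass sp=1, as the hunks above show.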