Skip to content

Commit

Permalink
deepspeed-fork content for 1.18.0
Browse files Browse the repository at this point in the history
Signed-off-by: SW publisher <[email protected]>
  • Loading branch information
SW publisher authored and Jenkins committed Oct 10, 2024
1 parent d254d75 commit 907611f
Show file tree
Hide file tree
Showing 156 changed files with 12,028 additions and 768 deletions.
89 changes: 0 additions & 89 deletions .pre-commit-config.yaml

This file was deleted.

46 changes: 35 additions & 11 deletions accelerator/hpu_accelerator.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@

# DeepSpeed Team

import functools
import os
import pkgutil
import importlib
Expand All @@ -17,6 +18,7 @@ def __init__(self):
self._name = 'hpu'
self._communication_backend_name = 'hccl'
self._compile_backend = "hpu_backend"
self.apply_hpu_workarounds()
try:
import habana_frameworks.torch.hpu as hpu
hpu.setDeterministic(True)
Expand All @@ -27,6 +29,15 @@ def __init__(self):

self.fp16_supported = None

def apply_hpu_workarounds(self):
    """Set default values for the HPU workaround environment variables.

    Each variable is written only when it is absent from ``os.environ``, so a
    value already exported by the user is never overridden.
    """
    workaround_defaults = {
        "PT_HPU_LAZY_ACC_PAR_MODE": "0",
        "PT_HPU_ENABLE_REFINE_DYNAMIC_SHAPES": "0",
    }
    for key, value in workaround_defaults.items():
        # setdefault leaves an existing value untouched.
        os.environ.setdefault(key, value)

# Device APIs
def is_synchronized_device(self):
return False
Expand All @@ -41,9 +52,8 @@ def handles_memory_backpressure(self):
return True

def device_name(self, device_index=None):
if device_index is None:
return 'hpu'
return 'hpu:{}'.format(device_index)
# ignoring device_index.
return 'hpu'

def device(self, device_index=None):
return torch.device(self.device_name(device_index))
Expand Down Expand Up @@ -194,33 +204,34 @@ def replay_graph(self, graph):
return

# Tensor operations
# TODO(SW-192865): Remove WA for tensor wrappers.
@property
def BFloat16Tensor(self):
return self.hpu.BFloat16Tensor
return functools.partial(torch.tensor, dtype=torch.bfloat16, device='hpu')

@property
def ByteTensor(self):
return self.hpu.ByteTensor
return functools.partial(torch.tensor, dtype=torch.uint8, device='hpu')

@property
def DoubleTensor(self):
return self.hpu.DoubleTensor
return functools.partial(torch.tensor, dtype=torch.double, device='hpu')

@property
def FloatTensor(self):
return self.hpu.FloatTensor
return functools.partial(torch.tensor, dtype=torch.float, device='hpu')

@property
def HalfTensor(self):
return self.hpu.HalfTensor
return functools.partial(torch.tensor, dtype=torch.half, device='hpu')

@property
def IntTensor(self):
return self.hpu.IntTensor
return functools.partial(torch.tensor, dtype=torch.int, device='hpu')

@property
def LongTensor(self):
return self.hpu.LongTensor
return functools.partial(torch.tensor, dtype=torch.long, device='hpu')

def pin_memory(self, tensor, align_bytes=1):
return tensor.pin_memory(self.device())
Expand Down Expand Up @@ -289,6 +300,14 @@ def get_op_builder(self, class_name):
else:
return self.class_dict['NotImplementedBuilder'] if 'NotImplementedBuilder' in self.class_dict else None

# Shall be removed once we move to torch.compile.
def wrap_in_hpu_graph(self, module):
    """Wrap ``module`` in an HPU graph when ``self.hpu`` reports lazy mode.

    When ``self.hpu.is_lazy()`` is false, a warning is printed and ``module``
    is returned unchanged.
    """
    if not self.hpu.is_lazy():
        print("Warning: hpu graphs in eager mode is not supported, ignoring")
        return module
    return self.hpu.wrap_in_hpu_graph(module)

def build_extension(self):
from torch.utils.cpp_extension import BuildExtension
return BuildExtension
Expand All @@ -297,7 +316,12 @@ def export_envs(self):
return []

def visible_devices_envs(self):
return ['HABANA_VISIBLE_MODULES']
# TODO SW-195658: remove this WA (not returning HABANA_VISIBLE_MODULES) once SW-195657 is resolved.
# The way DeepSpeed currently sets this env var does not work on all HPU instances.
# Users have to follow the instructions in:
# https://docs.habana.ai/en/latest/PyTorch/Reference/PT_Multiple_Tenants_on_HPU/Multiple_Workloads_Single_Docker.html
# Keeping CUDA_VISIBLE_DEVICES in the meantime.
return ['CUDA_VISIBLE_DEVICES'] #['HABANA_VISIBLE_MODULES']

def set_visible_devices_envs(self, current_env, local_accelerator_ids):
for env in self.visible_devices_envs():
Expand Down
1 change: 1 addition & 0 deletions build.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
+hpu.synapse.v1.18.0
9 changes: 5 additions & 4 deletions csrc/transformer/inference/csrc/pt_binding.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -452,15 +452,16 @@ std::vector<at::Tensor> ds_softmax_context(at::Tensor& query_key_value,
unsigned layer_id,
unsigned num_layers,
at::Tensor& alibi,
float rope_theta)
float rope_theta,
bool is_prompt,
std::optional<at::Tensor> token_idx,
std::optional<at::Tensor> position_ids)
{
unsigned bsz = query_key_value.size(0);
unsigned seq_len = query_key_value.size(1);
int k = query_key_value.size(2) / (heads + 2 * (num_kv > 0 ? num_kv : heads));
unsigned hidden_dim = heads * k;

bool is_prompt = (seq_len > 1);

if (is_prompt) InferenceContext::Instance().reset_tokens(seq_len);
unsigned soft_len = InferenceContext::Instance().current_tokens();

Expand Down Expand Up @@ -2028,7 +2029,7 @@ PYBIND11_MODULE(TORCH_EXTENSION_NAME, m)
"DeepSpeed memory allocation for GPT inference with " #_name " (CUDA)"); \
m.def("dequantize_" #_name, \
&ds_dequantize<_dtype>, \
"DeepSpeed dequantize with " #_name " (CUDA)")
"DeepSpeed dequantize with " #_name " (CUDA)");

DEF_OPS(fp32, float);
DEF_OPS(fp16, __half);
Expand Down
2 changes: 1 addition & 1 deletion deepspeed/checkpoint/zero_checkpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ def _strip_tensor_paddings(self, sd):
if group_paddings[key] == 0:
continue
for state_name, state_value in group_state.items():
if state_name != "step" and torch.is_tensor(state_value):
if state_name != "step" and torch.is_tensor(state_value) and state_value.dim():
raw_length = state_value.numel() - group_paddings[key]
group_state[state_name] = torch.narrow(state_value, 0, 0, raw_length).clone()
else:
Expand Down
21 changes: 0 additions & 21 deletions deepspeed/comm/torch.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,18 +146,15 @@ def init_process_group(self, backend, timeout, init_method, rank, world_size):
world_size=world_size)
self.using_mpi = torch.distributed.get_backend() == 'mpi'

@compiler.disable
def all_reduce(self, tensor, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False):
op = self._reduce_op(op)
return torch.distributed.all_reduce(tensor=tensor, op=op, group=group, async_op=async_op)

@compiler.disable
def inference_all_reduce(self, tensor, op, group=None):
if self.shm_comm_op == None or self.shm_comm_op.inference_all_reduce(tensor, op) == -1:
op = self._reduce_op(op)
return torch.distributed.all_reduce(tensor=tensor, op=op, group=group, async_op=False)

@compiler.disable
def all_reduce_coalesced(self, tensors, op=torch.distributed.ReduceOp.SUM, group=None, async_op=False):
""" proxy func to torch.distributed.all_reduce_coalesced,
which is included in PyTorch 1.13 and above
Expand All @@ -168,15 +165,13 @@ def all_reduce_coalesced(self, tensors, op=torch.distributed.ReduceOp.SUM, group
op = self._reduce_op(op)
return torch.distributed.all_reduce_coalesced(tensors=tensors, op=op, group=group, async_op=async_op)

@compiler.disable
def reduce(self, tensor, dst, op=ReduceOp.SUM, group=None, async_op=False):
if DS_COMM_REDUCE_OFF:
if int(os.getenv('RANK', '0')) == 0:
utils.logger.warning("REDUCE is OFF")
return Noop()
return torch.distributed.reduce(tensor=tensor, dst=dst, op=self._reduce_op(op), group=group, async_op=async_op)

@compiler.disable
def reduce_scatter(self, output, input_list, op=ReduceOp.SUM, group=None, async_op=False):
if DS_COMM_REDUCE_SCATTER_OFF:
if int(os.getenv('RANK', '0')) == 0:
Expand All @@ -189,7 +184,6 @@ def reduce_scatter(self, output, input_list, op=ReduceOp.SUM, group=None, async_
group=group,
async_op=async_op)

@compiler.disable
def broadcast(self, tensor, src, group=None, async_op=False):
if DS_COMM_BROADCAST_OFF:
if int(os.getenv('RANK', '0')) == 0:
Expand All @@ -198,7 +192,6 @@ def broadcast(self, tensor, src, group=None, async_op=False):
else:
return torch.distributed.broadcast(tensor=tensor, src=src, group=group, async_op=async_op)

@compiler.disable
def all_gather(self, tensor_list, tensor, group=None, async_op=False):
if DS_COMM_ALL_GATHER_OFF:
if int(os.getenv('RANK', '0')) == 0:
Expand All @@ -207,15 +200,13 @@ def all_gather(self, tensor_list, tensor, group=None, async_op=False):
else:
return torch.distributed.all_gather(tensor_list=tensor_list, tensor=tensor, group=group, async_op=async_op)

@compiler.disable
def all_gather_into_tensor(self, output_tensor, input_tensor, group=None, async_op=False):
if self.has_all_gather_into_tensor():
return self.all_gather_function(output_tensor=output_tensor,
input_tensor=input_tensor,
group=group,
async_op=async_op)

@compiler.disable
def all_gather_base(self, output_tensor, input_tensor, group=None, async_op=False):
if DS_COMM_ALL_GATHER_OFF:
if int(os.getenv('RANK', '0')) == 0:
Expand All @@ -233,7 +224,6 @@ def all_gather_base(self, output_tensor, input_tensor, group=None, async_op=Fals
"please consider upgrading your pytorch installation.")
pass

@compiler.disable
def all_gather_coalesced(self, output_tensors, input_tensors, group=None, async_op=False):
""""""
assert len(output_tensors) == len(input_tensors), ""
Expand All @@ -257,7 +247,6 @@ def all_gather_coalesced(self, output_tensors, input_tensors, group=None, async_
else:
reqs[-1].wait()

@compiler.disable
def reduce_scatter_tensor(self, output_tensor, input_tensor, op=ReduceOp.SUM, group=None, async_op=False):
if self.has_reduce_scatter_tensor():
return self.reduce_scatter_function(output_tensor,
Expand All @@ -271,7 +260,6 @@ def reduce_scatter_tensor(self, output_tensor, input_tensor, op=ReduceOp.SUM, gr
"please consider upgrading your pytorch installation.")
pass

@compiler.disable
def all_to_all_single(self,
output,
input,
Expand All @@ -286,49 +274,40 @@ def all_to_all_single(self,
group=group,
async_op=async_op)

@compiler.disable
def all_to_all(self, output_tensor_list, input_tensor_list, group=None, async_op=False):
return torch.distributed.all_to_all(output_tensor_list, input_tensor_list, group=group, async_op=async_op)

@compiler.disable
def send(self, tensor, dst, group=None, tag=0):
return torch.distributed.send(tensor=tensor, dst=dst, group=group, tag=tag)

@compiler.disable
def recv(self, tensor, src=None, group=None, tag=0):
return torch.distributed.recv(tensor=tensor, src=src, group=group, tag=tag)

@compiler.disable
def isend(self, tensor, dst, group=None, tag=0):
return torch.distributed.isend(tensor=tensor, dst=dst, group=group, tag=tag)

@compiler.disable
def irecv(self, tensor, src=None, group=None, tag=0):
return torch.distributed.irecv(tensor=tensor, src=src, group=group, tag=tag)

@compiler.disable
def gather(self, tensor, gather_list=None, dst=0, group=None, async_op=False):
return torch.distributed.gather(tensor=tensor,
gather_list=gather_list,
dst=dst,
group=group,
async_op=async_op)

@compiler.disable
def scatter(self, tensor, scatter_list=None, src=0, group=None, async_op=False):
return torch.distributed.scatter(tensor=tensor,
scatter_list=scatter_list,
src=src,
group=group,
async_op=async_op)

@compiler.disable
def barrier(self, group=torch.distributed.GroupMember.WORLD, async_op=False, device_ids=None):
    """Forward to ``torch.distributed.barrier``, substituting the WORLD group when ``group`` is None."""
    target_group = torch.distributed.GroupMember.WORLD if group is None else group
    return torch.distributed.barrier(group=target_group, async_op=async_op, device_ids=device_ids)

@compiler.disable
def monitored_barrier(self, group=torch.distributed.GroupMember.WORLD, timeout=None, wait_all_ranks=False):
if group is None:
group = torch.distributed.GroupMember.WORLD
Expand Down
Loading

0 comments on commit 907611f

Please sign in to comment.