Deepspeed #898

Closed
wants to merge 43 commits into from
43 commits
67ad7ad  llm tutorial  (porteratzo, Oct 5, 2023)
600bfc9  small fix  (porteratzo, Oct 5, 2023)
3cacb71  fixes  (porteratzo, Oct 17, 2023)
00a7d92  changes  (porteratzo, Oct 26, 2023)
7f51d62  f  (porteratzo, Oct 26, 2023)
3c13023  f  (porteratzo, Oct 26, 2023)
eb1e2ed  r  (porteratzo, Oct 26, 2023)
636adb0  c  (porteratzo, Oct 26, 2023)
6c0ae79  f  (porteratzo, Oct 26, 2023)
4cc88df  f  (porteratzo, Oct 26, 2023)
bd52231  f  (porteratzo, Oct 26, 2023)
fade608  f  (porteratzo, Oct 26, 2023)
5bc4ed5  f  (porteratzo, Oct 26, 2023)
533059c  f  (porteratzo, Oct 30, 2023)
6856c52  f  (porteratzo, Oct 30, 2023)
43f0ed2  f  (porteratzo, Oct 30, 2023)
a5d6c04  f  (porteratzo, Oct 30, 2023)
3b69cf9  f  (porteratzo, Oct 30, 2023)
563f6ac  f  (porteratzo, Oct 30, 2023)
7ab3311  f  (porteratzo, Oct 30, 2023)
8e4f407  f  (porteratzo, Oct 30, 2023)
ea919a9  d  (porteratzo, Oct 30, 2023)
67f8d5d  f  (porteratzo, Oct 30, 2023)
ec97691  f  (porteratzo, Oct 30, 2023)
d609797  d  (porteratzo, Oct 30, 2023)
25c42f4  f  (porteratzo, Oct 30, 2023)
d6fb0ee  g  (porteratzo, Oct 30, 2023)
7ac2f4f  f  (porteratzo, Oct 30, 2023)
181daad  f  (porteratzo, Oct 30, 2023)
f07e173  f  (porteratzo, Nov 11, 2023)
70f26d7  Merge branch 'develop' of https://github.com/securefederatedai/openfl…  (porteratzo, Nov 11, 2023)
33b913f  g  (porteratzo, Nov 14, 2023)
5d3b2b6  tests  (porteratzo, Nov 23, 2023)
2c3871f  f  (porteratzo, Nov 23, 2023)
f7b7f4f  f  (porteratzo, Nov 23, 2023)
67aa2cc  f  (porteratzo, Nov 23, 2023)
5130ef9  f  (porteratzo, Nov 24, 2023)
9ceaf16  f  (porteratzo, Nov 24, 2023)
76ed2df  f  (porteratzo, Nov 29, 2023)
d4c882a  f  (porteratzo, Nov 29, 2023)
8767f24  f  (porteratzo, Nov 29, 2023)
2632b28  f  (porteratzo, Nov 29, 2023)
0ca51f9  f  (porteratzo, Nov 29, 2023)
511 changes: 511 additions & 0 deletions openfl-tutorials/Federated_PyTorch_LLM.ipynb

Large diffs are not rendered by default.

341 changes: 341 additions & 0 deletions openfl-tutorials/experimental/Federeated_Pytorch_LLM_Deepspeed.py
@@ -0,0 +1,341 @@

from typing import Any, Mapping
import numpy as np
import openfl.native as fx
import torch
import torch as pt
from datasets import Dataset, load_dataset, load_metric
from openfl.federated import PyTorchTaskRunner
from openfl.federated.task.runner_pt import change_tags
from openfl.utilities import Metric, TensorKey
from openfl.utilities.data_splitters import EqualNumPyDataSplitter
from peft import LoraConfig, TaskType, get_peft_model
from peft.utils import get_peft_model_state_dict, set_peft_model_state_dict
from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss
from torch.optim import AdamW
from torch.utils.data import DataLoader
from tqdm import tqdm
import torch.nn as nn
from transformers.trainer_pt_utils import get_parameter_names
from transformers import (AutoModelForSequenceClassification,
AutoTokenizer, DataCollatorWithPadding, get_scheduler)
import argparse
import deepspeed

def get_glue_mrpc_dataset(tokenizer):
dataset = load_dataset("glue", "mrpc")

def tokenize_function(examples):
# max_length=None => use the model max length (it's actually the default)
outputs = tokenizer(
examples["sentence1"],
examples["sentence2"],
truncation=True,
max_length=None,
)
return outputs

tokenized_datasets = dataset.map(
tokenize_function,
batched=True,
remove_columns=["idx", "sentence1", "sentence2"],
)
tokenized_datasets = tokenized_datasets.rename_column("label", "labels")
tokenized_datasets.set_format("torch")
data_collator = DataCollatorWithPadding(tokenizer=tokenizer, padding="longest")
return data_collator, tokenized_datasets

class GlueMrpc(Dataset):
"""
Has 5.8k pairs of sentences with annotations if the two sentences are equivalent
"""
def get_shape(self):

if not hasattr(self, 'saved_shape'):
self.saved_shape = max([len(i) for i in self.data['input_ids']])
return self.saved_shape

class GlueMrpcFederatedDataset(DataLoader):
def __init__(self, train_set, valid_set, batch_size, data_collator=None):
self.data_splitter = EqualNumPyDataSplitter(shuffle=True)
if isinstance(train_set,Dataset):
self.train_set = GlueMrpc.from_dict(train_set.to_dict())
else:
self.train_set = train_set

if isinstance(valid_set,Dataset):
self.valid_set = GlueMrpc.from_dict(valid_set.to_dict())
else:
self.valid_set = valid_set

self.batch_size = batch_size
self.data_collator = data_collator

def split(self, num_collaborators):
train_split = self.data_splitter.split(self.train_set, num_collaborators)
valid_split = self.data_splitter.split(self.valid_set, num_collaborators)
return [
GlueMrpcFederatedDataset(
self.train_set.select(train_split[i]),
self.valid_set.select(valid_split[i]),
self.batch_size, self.data_collator
)
for i in range(num_collaborators)
]

def get_feature_shape(self):
return self.train_set.get_shape()

def get_train_loader(self, num_batches=None):
return DataLoader(self.train_set, batch_size=self.batch_size, collate_fn=self.data_collator)

def get_valid_loader(self):
return DataLoader(self.valid_set, batch_size=self.batch_size, collate_fn=self.data_collator)

def get_train_data_size(self):
return len(self.train_set)

def get_valid_data_size(self):
return len(self.valid_set)


class LLMTaskRunner(PyTorchTaskRunner):
def __init__(
self, base_model_name, data_loader, device=None, metric=None, args=None, **kwargs
):
kwargs["data_loader"] = data_loader
super().__init__(device, **kwargs)
self.base_model_name = base_model_name
self.metric = metric
self._init_model()
self._init_params()
self.model, self.optimizer, _, self.scheduler = deepspeed.initialize(args=args,
model=self.model,
model_parameters=self.optimizer_grouped_parameters)

self.save_models = []

def _init_model(self):
model = AutoModelForSequenceClassification.from_pretrained(
self.base_model_name, return_dict=True
)
peft_config = LoraConfig(
task_type=TaskType.SEQ_CLS,
inference_mode=False,
r=16,
lora_alpha=16,
lora_dropout=0.1,
bias="lora_only",
)
self.model = get_peft_model(model, peft_config)

def _init_params(self):
ALL_LAYERNORM_LAYERS = [nn.LayerNorm]
decay_parameters = get_parameter_names(self.model, ALL_LAYERNORM_LAYERS)
decay_parameters = [name for name in decay_parameters if "bias" not in name]

optimizer_grouped_parameters = [
{
"params": [
p
for n, p in self.model.named_parameters()
if (n in decay_parameters and p.requires_grad)
],
"weight_decay": 0.01,
},
{
"params": [
p
for n, p in self.model.named_parameters()
if (n not in decay_parameters and p.requires_grad)
],
"weight_decay": 0.0,
},
]
self.optimizer_grouped_parameters = optimizer_grouped_parameters
self.initialize_tensorkeys_for_functions()

def train(self):
return self.model.train()

def state_dict(self):
return get_peft_model_state_dict(self.model)

def load_state_dict(self, state_dict: Mapping[str, Any], strict: bool = True):
return set_peft_model_state_dict(self.model, state_dict)

def validate(
self, col_name, round_num, input_tensor_dict, use_tqdm=False, **kwargs
):
"""Validate.

Run validation of the model on the local data.

Args:
col_name: Name of the collaborator
round_num: The current round number
input_tensor_dict: Required input tensors (for model)
use_tqdm (bool): Use tqdm to print a progress bar (Default=False)

Returns:
global_output_dict: Tensors to send back to the aggregator
local_output_dict: Tensors to maintain in the local TensorDB

"""
self.save_models.append(input_tensor_dict.copy())
self.rebuild_model(round_num, input_tensor_dict, validation=True)
self.model.eval()


self.model.to(self.device)
val_score = 0
total_samples = 0

loader = self.data_loader.get_valid_loader()
if use_tqdm:
loader = tqdm(loader, desc="validate")

with pt.no_grad():
for sample in loader:
samples = sample["input_ids"].shape[0]
total_samples += samples
output = self.model(**sample)
# get the index of the max log-probability
logits = output.logits
predictions = torch.argmax(logits, dim=-1)
self.metric.add_batch(predictions=predictions, references=sample["labels"])
val_score = self.metric.compute()["accuracy"]

origin = col_name
suffix = "validate"
if kwargs["apply"] == "local":
suffix += "_local"
else:
suffix += "_agg"
tags = ("metric",)
tags = change_tags(tags, add_field=suffix)
# TODO figure out a better way to pass in metric for this pytorch
# validate function
output_tensor_dict = {
TensorKey("acc", origin, round_num, True, tags): np.array(val_score)
}

# Empty list represents metrics that should only be stored locally
return output_tensor_dict, {}

def train_epoch(self, batch_generator) -> Metric:
"""Train single epoch.

Override this function in order to use custom training.

Args:
batch_generator: Train dataset batch generator. Yields batches (dicts produced by the data collator,
containing input_ids, attention_mask and labels) of size `self.data_loader.batch_size`.
Returns:
Metric: An object containing name and np.ndarray value.
"""
losses = []
for sample in batch_generator:
self.model.zero_grad()
output = self.model(**sample)
loss = output.loss
self.model.backward(loss)
torch.nn.utils.clip_grad_norm_(self.model.parameters(), 1.0)
self.model.step()
losses.append(loss.detach().cpu().numpy())
loss = np.mean(losses)
if self.model.config.problem_type == "regression":
loss_fct = MSELoss()
elif self.model.config.problem_type == "single_label_classification":
loss_fct = CrossEntropyLoss()
elif self.model.config.problem_type == "multi_label_classification":
loss_fct = BCEWithLogitsLoss()
return Metric(name=loss_fct._get_name(), value=np.array(loss))

def save_native(
self,
filepath,
model_state_dict_key="model_state_dict",
optimizer_state_dict_key="optimizer_state_dict",
**kwargs,
):
"""
Save model and optimizer states in a pickled file specified by the \
filepath. model_/optimizer_state_dicts are stored in the keys provided. \
Uses pt.save().

Args:
filepath (string) : Path to pickle file to be
created by pt.save().
model_state_dict_key (string) : key for model state dict
in pickled file.
optimizer_state_dict_key (string) : key for optimizer state
dict in pickled file.
kwargs : unused

Returns:
None
"""
pickle_dict = {
model_state_dict_key: get_peft_model_state_dict(self.model),
optimizer_state_dict_key: self.optimizer.state_dict(),
}
pt.save(pickle_dict, filepath)

def get_arguments():
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank",
type=int,
default=-1,
help="local_rank for distributed training on gpus")
# Include DeepSpeed configuration arguments
parser = deepspeed.add_config_arguments(parser)

args = parser.parse_args()

return args

def get_dataset(base_model_name = "roberta-base", padding_side = "right"):
tokenizer = AutoTokenizer.from_pretrained(base_model_name, padding_side=padding_side)
if getattr(tokenizer, "pad_token_id") is None:
tokenizer.pad_token_id = tokenizer.eos_token_id
data_collator, tokenized_datasets = get_glue_mrpc_dataset(tokenizer)

train_set = GlueMrpc.from_dict(tokenized_datasets['train'].to_dict())
valid_set = GlueMrpc.from_dict(tokenized_datasets['test'].to_dict())

fl_data = GlueMrpcFederatedDataset(train_set, valid_set, batch_size=32)
return fl_data

def main():
args = get_arguments()
fx.init('torch_llm')
base_model_name = "roberta-base"
padding_side = "right"
fl_data = get_dataset(base_model_name, padding_side)
metric = load_metric('glue', "mrpc")

num_collaborators = 2
collaborator_models = [
LLMTaskRunner(
base_model_name,
data_loader=data_slice,
metric=metric, args=args
)
for data_slice in fl_data.split(num_collaborators)]
collaborators = {'one':collaborator_models[0],'two':collaborator_models[1]}#, 'three':collaborator_models[2]}

# %%
# Original GLUE MRPC dataset
print(f'Original training data size: {len(fl_data.train_set)}')
print(f'Original validation data size: {len(fl_data.valid_set)}\n')

# Each collaborator's data split
for i, model in enumerate(collaborator_models):
print(f'Collaborator {i}\'s training data size: {len(model.data_loader.train_set)}')
print(f'Collaborator {i}\'s validation data size: {len(model.data_loader.valid_set)}\n')
final_fl_model = fx.run_experiment(collaborators, {'aggregator.settings.rounds_to_train': 10, 'tasks.train.kwargs.epochs': 2})


if __name__ == "__main__":
main()
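
Note: the script above expects a DeepSpeed runtime configuration to be supplied at launch through the --deepspeed_config flag registered by deepspeed.add_config_arguments(); no config file is included in this diff. As a rough, hypothetical sketch only (the ds_config.json file name, the config values, and the launch command below are illustrative assumptions, not part of this PR), a minimal setup might look like:

# Illustrative sketch only: writes a minimal DeepSpeed config whose micro batch
# size matches the batch_size=32 used by GlueMrpcFederatedDataset above.
import json

ds_config = {
    "train_micro_batch_size_per_gpu": 32,
    "gradient_accumulation_steps": 1,
    "optimizer": {"type": "AdamW", "params": {"lr": 2e-5}},
    "fp16": {"enabled": False},
    "zero_optimization": {"stage": 0},
}

with open("ds_config.json", "w") as f:
    json.dump(ds_config, f, indent=2)

# Hypothetical single-GPU launch (the DeepSpeed launcher supplies --local_rank):
#   deepspeed --num_gpus=1 openfl-tutorials/experimental/Federeated_Pytorch_LLM_Deepspeed.py \
#       --deepspeed --deepspeed_config ds_config.json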