From 76d46394674cf593dbb29456bbef1a9df2752dc0 Mon Sep 17 00:00:00 2001
From: porteratzo
Date: Fri, 29 Sep 2023 10:26:38 -0700
Subject: [PATCH] llm tutorial

---
 openfl-tutorials/Federated_PyTorch_LLM.ipynb | 473 +++++++++++++++++++
 1 file changed, 473 insertions(+)
 create mode 100644 openfl-tutorials/Federated_PyTorch_LLM.ipynb

diff --git a/openfl-tutorials/Federated_PyTorch_LLM.ipynb b/openfl-tutorials/Federated_PyTorch_LLM.ipynb
new file mode 100644
index 0000000000..7c030a7011
--- /dev/null
+++ b/openfl-tutorials/Federated_PyTorch_LLM.ipynb
@@ -0,0 +1,473 @@
+{
+ "cells": [
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "# Federated PyTorch LLM Tutorial"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "This notebook is an example of transfer learning: a pretrained `roberta-large` model is fine-tuned on the GLUE MRPC task using parameter-efficient LoRA adapters.\n",
+    "\n",
+    "A custom DataLoader is used with the OpenFL Python API."
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#Install dependencies if not already installed\n",
+    "!pip install torch torchvision peft transformers sentencepiece huggingface_hub accelerate datasets evaluate seqeval\n",
+    "%load_ext autoreload\n",
+    "%autoreload 2"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "from typing import Any, Mapping\n",
+    "import numpy as np\n",
+    "import openfl.native as fx\n",
+    "import torch\n",
+    "import torch as pt\n",
+    "from accelerate import Accelerator\n",
+    "from datasets import Dataset, load_dataset, load_metric\n",
+    "from openfl.federated import PyTorchTaskRunner, TaskRunner\n",
+    "from openfl.federated.task.runner_pt import change_tags\n",
+    "from openfl.utilities import Metric, TensorKey\n",
+    "from openfl.utilities.data_splitters import EqualNumPyDataSplitter\n",
+    "from peft import LoraConfig, TaskType, get_peft_model\n",
+    "from peft.utils import get_peft_model_state_dict, set_peft_model_state_dict\n",
+    "from torch.nn import BCEWithLogitsLoss, CrossEntropyLoss, MSELoss\n",
+    "from torch.optim import AdamW\n",
+    "from torch.utils.data import DataLoader\n",
+    "from tqdm import tqdm\n",
+    "import torch.nn as nn\n",
+    "\n",
+    "from transformers import (AutoConfig, AutoModelForSequenceClassification,\n",
+    "                          AutoTokenizer, DataCollatorWithPadding)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "After importing the required packages, the next step is setting up our OpenFL workspace. To do this, simply run the `fx.init()` command as follows:"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#Setup default workspace, logging, etc.\n",
+    "fx.init('torch_cnn_mnist')"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "Now we are ready to define the dataset and model to perform federated learning on. We fine-tune a pretrained `roberta-large` model on the GLUE MRPC dataset, which contains pairs of sentences annotated as semantically equivalent or not. 
" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Download the data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def get_glue_mrpc_dataset(tokenizer):\n", + " dataset = load_dataset(\"glue\", \"mrpc\")\n", + "\n", + " def tokenize_function(examples):\n", + " # max_length=None => use the model max length (it's actually the default)\n", + " outputs = tokenizer(\n", + " examples[\"sentence1\"],\n", + " examples[\"sentence2\"],\n", + " truncation=True,\n", + " max_length=None,\n", + " )\n", + " return outputs\n", + "\n", + " tokenized_datasets = dataset.map(\n", + " tokenize_function,\n", + " batched=True,\n", + " remove_columns=[\"idx\", \"sentence1\", \"sentence2\"],\n", + " )\n", + " tokenized_datasets = tokenized_datasets.rename_column(\"label\", \"labels\")\n", + " tokenized_datasets.set_format(\"torch\")\n", + " data_collator = DataCollatorWithPadding(tokenizer=tokenizer, padding=\"longest\")\n", + " return data_collator, tokenized_datasets\n", + "\n", + "base_model_name = \"roberta-large\"\n", + "padding_side = \"right\"\n", + "tokenizer = AutoTokenizer.from_pretrained(base_model_name, padding_side=padding_side)\n", + "if getattr(tokenizer, \"pad_token_id\") is None:\n", + " tokenizer.pad_token_id = tokenizer.eos_token_id\n", + "data_collator, tokenized_datasets = get_glue_mrpc_dataset(tokenizer)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Describe the dataset" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class GlueMrpc(Dataset):\n", + " \"\"\"\n", + " Has 5.8k pairs of sentences with annotations if the two sentences are equivalent\n", + " \"\"\" \n", + " def get_shape(self):\n", + " \n", + " if not hasattr(self, 'saved_shape'):\n", + " self.saved_shape = max([len(i) for i in self.data['input_ids']])\n", + " return self.saved_shape\n", + "\n", + "train_set = GlueMrpc.from_dict(tokenized_datasets['train'].to_dict())\n", + "valid_set = GlueMrpc.from_dict(tokenized_datasets['test'].to_dict())\n" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Implement Federated dataset\n", + "We have to implement `split` method" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class GlueMrpcFederatedDataset(DataLoader):\n", + " def __init__(self, train_set, valid_set, batch_size, data_collator=None):\n", + " self.data_splitter = EqualNumPyDataSplitter()\n", + " if isinstance(train_set,Dataset):\n", + " self.train_set = GlueMrpc.from_dict(train_set.to_dict())\n", + " else:\n", + " self.train_set = train_set\n", + " \n", + " if isinstance(valid_set,Dataset):\n", + " self.valid_set = GlueMrpc.from_dict(valid_set.to_dict())\n", + " else:\n", + " self.valid_set = valid_set \n", + " \n", + " self.batch_size = batch_size\n", + " self.data_collator = data_collator\n", + " \n", + " def split(self, num_collaborators):\n", + " train_split = self.data_splitter.split(self.train_set, num_collaborators)\n", + " valid_split = self.data_splitter.split(self.valid_set, num_collaborators)\n", + " return [\n", + " GlueMrpcFederatedDataset(\n", + " self.train_set.select(train_split[i]),\n", + " self.valid_set.select(valid_split[i]),\n", + " self.batch_size\n", + " )\n", + " for i in range(num_collaborators)\n", + " ]\n", + " \n", + " def get_feature_shape(self):\n", + " return self.train_set.get_shape()\n", + " \n", + " def 
get_train_loader(self, num_batches=None):\n", + " return DataLoader(self.train_set, batch_size=self.batch_size, collate_fn=data_collator)\n", + " \n", + " def get_valid_loader(self):\n", + " return DataLoader(self.valid_set, collate_fn=data_collator)\n", + " \n", + " def get_train_data_size(self):\n", + " return len(self.train_set)\n", + " \n", + " def get_valid_data_size(self):\n", + " return len(self.valid_set)\n", + " \n", + "fl_data = GlueMrpcFederatedDataset(train_set, valid_set, batch_size=32)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Define model" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "class LLMTaskRunner(PyTorchTaskRunner):\n", + " def __init__(self, base_model_name, data_loader, device=None, metric=None, **kwargs):\n", + " kwargs['data_loader'] = data_loader\n", + " super().__init__(device, **kwargs)\n", + " self.base_model_name = base_model_name\n", + " self.metric = metric\n", + " self._init_model()\n", + " self._init_optimizer()\n", + " \n", + " def _init_model(self):\n", + " model = AutoModelForSequenceClassification.from_pretrained(\n", + " self.base_model_name, return_dict=True)\n", + " peft_config = LoraConfig(task_type=TaskType.SEQ_CLS, inference_mode=False, r=16, lora_alpha=16, lora_dropout=0.1, bias=\"all\")\n", + " self.model = get_peft_model(model, peft_config)\n", + " \n", + " def _init_optimizer(self):\n", + " no_decay = [\"bias\", \"LayerNorm.weight\"]\n", + " optimizer_grouped_parameters = [\n", + " {\n", + " \"params\": [p for n, p in self.model.named_parameters() if not any(nd in n for nd in no_decay)],\n", + " \"weight_decay\": 0.01,\n", + " },\n", + " {\n", + " \"params\": [p for n, p in self.model.named_parameters() if any(nd in n for nd in no_decay)],\n", + " \"weight_decay\": 0.0,\n", + " },\n", + " ]\n", + " self.optimizer = AdamW(optimizer_grouped_parameters, lr=0.01)\n", + " \n", + " self.training_round_completed = False\n", + " self.initialize_tensorkeys_for_functions()\n", + " \n", + " def state_dict(self):\n", + " return get_peft_model_state_dict(self.model)\n", + " \n", + " def load_state_dict(self, state_dict: Mapping[str, Any], strict: bool = True):\n", + " return set_peft_model_state_dict(\n", + " self.model, state_dict\n", + " )\n", + " \n", + " def validate(self, col_name, round_num, input_tensor_dict,\n", + " use_tqdm=False, **kwargs):\n", + " \"\"\"Validate.\n", + "\n", + " Run validation of the model on the local data.\n", + "\n", + " Args:\n", + " col_name: Name of the collaborator\n", + " round_num: What round is it\n", + " input_tensor_dict: Required input tensors (for model)\n", + " use_tqdm (bool): Use tqdm to print a progress bar (Default=True)\n", + "\n", + " Returns:\n", + " global_output_dict: Tensors to send back to the aggregator\n", + " local_output_dict: Tensors to maintain in the local TensorDB\n", + "\n", + " \"\"\"\n", + " self.rebuild_model(round_num, input_tensor_dict, validation=True)\n", + " self.model.eval()\n", + " self.model.to(self.device)\n", + " val_score = 0\n", + " total_samples = 0\n", + "\n", + " loader = self.data_loader.get_valid_loader()\n", + " if use_tqdm:\n", + " loader = tqdm(loader, desc='validate')\n", + "\n", + " with pt.no_grad():\n", + " for sample in loader:\n", + " samples = sample['input_ids'].shape[0]\n", + " total_samples += samples\n", + " output = self.model(**sample)\n", + " # get the index of the max log-probability\n", + " logits = output.logits\n", + " predictions = 
torch.argmax(logits, dim=-1)\n",
+    "                self.metric.add_batch(predictions=predictions, references=sample['labels'])\n",
+    "        val_score = self.metric.compute()['accuracy']\n",
+    "\n",
+    "        origin = col_name\n",
+    "        suffix = 'validate'\n",
+    "        if kwargs['apply'] == 'local':\n",
+    "            suffix += '_local'\n",
+    "        else:\n",
+    "            suffix += '_agg'\n",
+    "        tags = ('metric',)\n",
+    "        tags = change_tags(tags, add_field=suffix)\n",
+    "        output_tensor_dict = {\n",
+    "            TensorKey('acc', origin, round_num, True, tags):\n",
+    "                np.array(val_score)\n",
+    "        }\n",
+    "\n",
+    "        # Empty list represents metrics that should only be stored locally\n",
+    "        return output_tensor_dict, {}\n",
+    "\n",
+    "    def train_epoch(self, batch_generator) -> Metric:\n",
+    "        \"\"\"Train single epoch.\n",
+    "\n",
+    "        Override this function in order to use custom training.\n",
+    "\n",
+    "        Args:\n",
+    "            batch_generator: Train dataset batch generator. Yields (samples, targets) tuples of\n",
+    "            size = `self.data_loader.batch_size`.\n",
+    "        Returns:\n",
+    "            Metric: An object containing name and np.ndarray value.\n",
+    "        \"\"\"\n",
+    "        losses = []\n",
+    "        for sample in batch_generator:\n",
+    "            self.optimizer.zero_grad()\n",
+    "            output = self.model(**sample)\n",
+    "            loss = output.loss\n",
+    "            loss.backward()\n",
+    "            self.optimizer.step()\n",
+    "            losses.append(loss.detach().cpu().numpy())\n",
+    "        loss = np.mean(losses)\n",
+    "        if self.model.config.problem_type == \"regression\":\n",
+    "            loss_fct = MSELoss()\n",
+    "        elif self.model.config.problem_type == \"single_label_classification\":\n",
+    "            loss_fct = CrossEntropyLoss()\n",
+    "        elif self.model.config.problem_type == \"multi_label_classification\":\n",
+    "            loss_fct = BCEWithLogitsLoss()\n",
+    "        return Metric(name=loss_fct._get_name(), value=np.array(loss))\n",
+    "\n",
+    "    def save_native(self, filepath, model_state_dict_key='model_state_dict',\n",
+    "                    optimizer_state_dict_key='optimizer_state_dict', **kwargs):\n",
+    "        \"\"\"\n",
+    "        Save model and optimizer states in a pickled file specified by the \\\n",
+    "        filepath. model_/optimizer_state_dicts are stored in the keys provided. \\\n",
+    "        Uses pt.save().\n",
+    "\n",
+    "        Args:\n",
+    "            filepath (string)                 : Path to pickle file to be\n",
+    "                                                created by pt.save().\n",
+    "            model_state_dict_key (string)     : key for model state dict\n",
+    "                                                in pickled file.\n",
+    "            optimizer_state_dict_key (string) : key for optimizer state\n",
+    "                                                dict in pickled file.\n",
+    "            kwargs                            : unused\n",
+    "\n",
+    "        Returns:\n",
+    "            None\n",
+    "        \"\"\"\n",
+    "        pickle_dict = {\n",
+    "            model_state_dict_key: get_peft_model_state_dict(self.model),\n",
+    "            optimizer_state_dict_key: self.optimizer.state_dict()\n",
+    "        }\n",
+    "        pt.save(pickle_dict, filepath)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "num_collaborators = 2\n",
+    "metric = load_metric('glue', \"mrpc\")\n",
+    "collaborator_models = [\n",
+    "    LLMTaskRunner(\n",
+    "        base_model_name,\n",
+    "        data_loader=data_slice,\n",
+    "        metric=metric\n",
+    "    )\n",
+    "    for data_slice in fl_data.split(num_collaborators)]\n",
+    "collaborators = {'one':collaborator_models[0],'two':collaborator_models[1]}"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#Original GLUE MRPC dataset\n",
+    "print(f'Original training data size: {len(fl_data.train_set)}')\n",
+    "print(f'Original validation data size: {len(fl_data.valid_set)}\\n')\n",
+    "\n",
+    "#Each collaborator's data\n",
+    "for i, model in enumerate(collaborator_models):\n",
+    "    print(f'Collaborator {i}\\'s training data size: {len(model.data_loader.train_set)}')\n",
+    "    print(f'Collaborator {i}\\'s validation data size: {len(model.data_loader.valid_set)}\\n')"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#Run experiment, return trained FederatedModel\n",
+    "final_fl_model = fx.run_experiment(collaborators,{'aggregator.settings.rounds_to_train':3})"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "#Save final model\n",
+    "final_fl_model.save_native('final_model.pth')"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "llama-env",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.8.0"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 4
+}