From 0c627793e0698ef4e9eff991a9f02e9d248e1273 Mon Sep 17 00:00:00 2001 From: Robert Washbourne Date: Sun, 22 Sep 2024 21:15:19 +0000 Subject: [PATCH] changes --- mcts/generate.py | 108 +++++++++++++++++ mcts/reward.py | 119 ++++++++++++++++++ mcts/test.py | 51 ++++++++ mcts/train_reward.py | 283 +++++++++++++++++++++++++++++++++++++++++++ mcts/util.py | 28 +++++ modal_reward.py | 28 ++--- modal_vllm.py | 175 ++++++++++++++++++++++++++ readme.md | 15 +++ 8 files changed, 793 insertions(+), 14 deletions(-) create mode 100644 mcts/generate.py create mode 100644 mcts/reward.py create mode 100644 mcts/test.py create mode 100644 mcts/train_reward.py create mode 100644 mcts/util.py create mode 100644 modal_vllm.py diff --git a/mcts/generate.py b/mcts/generate.py new file mode 100644 index 0000000..ed12c10 --- /dev/null +++ b/mcts/generate.py @@ -0,0 +1,108 @@ +import asyncio +from openai import AsyncOpenAI +import json +from typing import List, Tuple +from datasets import load_dataset +from util import split_and_clean_steps, quality_filter, SEED +from tqdm import tqdm + +client = AsyncOpenAI( + api_key="9FF74944EED19865193F979942FB1", + base_url="https://rawsh--vllm-qwen-serve.modal.run/v1" +) + +def format_thoughts(thoughts: List[str]) -> str: + return "\n".join(f"## Step {i}:\n{thought}" for i, thought in enumerate(thoughts, 1)) + +template = "<|im_start|>system\nYou are Qwen, created by Alibaba Cloud. You are a helpful assistant.<|im_end|>\n\ +<|im_start|>user\n{user}<|im_end|>\n<|im_start|>assistant\n{assistant_partial}" + +class ReasoningTrace: + def __init__(self, question: str, previous_thoughts: List[str], next_step: int): + self.question = question + self.previous_thoughts = previous_thoughts + self.next_step = next_step + +class ProcessedReasoningTrace: + def __init__(self, question: str, thoughts: List[str]): + self.question = question + self.thoughts = thoughts + +async def generate_thought_batched(batch: List[ReasoningTrace]) -> List[ProcessedReasoningTrace]: + prompts = [] + for trace in batch: + formatted_thoughts = format_thoughts(trace.previous_thoughts) + prompt = template.format(user=trace.question, assistant_partial=f"{formatted_thoughts}\n## Step {trace.next_step}:\n") + prompts.append(prompt) + + params = { + "model": "Qwen/Qwen2.5-0.5B-Instruct", + "prompt": prompts, + "max_tokens": 200, + "temperature": 0.7, + "stop": ["\n## Step"], + "timeout": 600 + } + + try: + response = await client.completions.create(**params) + processed = [ + ProcessedReasoningTrace( + question=batch[i].question, + thoughts=batch[i].previous_thoughts + [response.choices[i].text.strip()] + ) for i in range(len(batch)) + ] + return processed + except Exception as e: + print(f"An error occurred: {str(e)}") + return None + +async def format_thought_chain(question: str, chain: List[str]) -> List[ReasoningTrace]: + return [ReasoningTrace(question, chain[:i], i+1) for i in range(1, len(chain))] + +async def process_batch(batch: List[ReasoningTrace], semaphore: asyncio.Semaphore) -> List[ProcessedReasoningTrace]: + async with semaphore: + return await generate_thought_batched(batch) + +async def process_all_thought_chains_batched(thought_chains: List[Tuple[str, List[str]]]) -> List[ProcessedReasoningTrace]: + batch_size = 200 + all_traces = [] + + for question, chain in thought_chains: + all_traces.extend(await format_thought_chain(question, chain)) + + results = [] + semaphore = asyncio.Semaphore(10) # Limit to 10 concurrent batches + tasks = [] + + for i in range(0, len(all_traces), batch_size): + batch = all_traces[i:i + batch_size] + task = asyncio.create_task(process_batch(batch, semaphore)) + tasks.append(task) + + for task in tqdm(asyncio.as_completed(tasks), total=len(tasks), desc="Processing batches"): + processed_batch = await task + if processed_batch: + results.extend(processed_batch) + + return results + +async def main(): + ds = load_dataset("argilla/magpie-ultra-v0.1") + filtered_ds = ds.filter(quality_filter) + split_ds = filtered_ds['train'].train_test_split(test_size=0.1, seed=SEED) + train_ds = split_ds['train'] + correct_traces = [(row["instruction"], split_and_clean_steps(row["response"])) for row in train_ds] + + # correct_traces = correct_traces[:1000] + generated_thoughts = await process_all_thought_chains_batched(correct_traces) + + with open("out.jsonl", "w") as f: + for chain in generated_thoughts: + json.dump(chain.__dict__, f) + f.write("\n") + + print(f"Results written to out.jsonl") + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/mcts/reward.py b/mcts/reward.py new file mode 100644 index 0000000..9408fec --- /dev/null +++ b/mcts/reward.py @@ -0,0 +1,119 @@ +from datasets import load_dataset +import numpy as np +from util import split_and_clean_steps, quality_filter, SEED +import json + +def initialize_prm(traces, last_step_correct=True): + """ + Initialize the Process Reward Model (PRM) using sets of reasoning traces. + + Args: + traces (list of list of str): Reasoning traces + correct (bool): Whether the traces are correct (True) or incorrect (False) + + Returns: + dict: Initialized PRM with quality values and weighted rewards + """ + # prm = {} + prm_data = [] + + for i, trace_tuple in enumerate(traces): + question, trace = trace_tuple + K = len(trace) # Total number of reasoning steps + + # Initialize trace + prm_example = {"steps": [], "quality_values": [], "weighted_rewards": []} + v_prev = 0 + for k, step in enumerate(trace, 1): + penalize = (not last_step_correct) and k == len(trace) + m_k = K - k if (not penalize) else K - k + 1 # One more step needed to correct mistake if incorrect + r_s_k = 0 if (not penalize) else 1 # 0 for correct steps, 1 for incorrect steps + w_s_k = (1 - v_prev) / (m_k + 1) * (1 - 2 * r_s_k) + v_k = max(v_prev + w_s_k, 0) + + prm_example["question"] = question + prm_example["steps"].append(step) + prm_example["quality_values"].append(v_k) + prm_example["weighted_rewards"].append(w_s_k) + v_prev = v_k + + prm_data.append(prm_example) + + return prm_data + + +# Load and filter the dataset, then apply the 90:10 split +ds = load_dataset("argilla/magpie-ultra-v0.1") +# Filter the dataset +filtered_ds = ds.filter(quality_filter) +# Apply the 90:10 split on the filtered training data +split_ds = filtered_ds['train'].train_test_split(test_size=0.1, seed=SEED) +train_ds = split_ds['train'] +print(len(train_ds)) +# "Correct" traces generated by 405B +correct_traces = [(row["instruction"], split_and_clean_steps(row["response"])) for row in train_ds] + +# Example usage: +# correct_traces = [ +# ["Step 1: Correct", "Step 2: Correct", "Step 3: Correct"], +# ["Step 1: Correct", "Step 2: Correct"] +# ] + +with open('out.jsonl') as f: + last_step_incorrect_data = [json.loads(line) for line in f] + last_step_incorrect_traces = [(ex["question"], ex["thoughts"]) for ex in last_step_incorrect_data] + +# incorrect_traces = [['Identify all the possible outcomes of tossing four coins simultaneously. When tossing four coins simultaneously, each coin has 2 possible outcomes (heads or tails). Therefore, for four coins, the total number of possible outcomes is $2^4 = 16$.', 'List all the outcomes that result in more heads than tails. There are 4 outcomes that meet this criterion: HTHT, HHTT, THTH, TTHH. This gives us a total of 4 favorable outcomes.'], ['Identify all the possible outcomes of tossing four coins simultaneously. When tossing four coins simultaneously, each coin has 2 possible outcomes (heads or tails). Therefore, for four coins, the total number of possible outcomes is $2^4 = 16$.', 'Determine the favorable outcomes. We want more heads than tails, which means we need 3 heads and 1 tail, or 4 heads.', 'Count the number of outcomes with 3 heads and 1 tail. For 3 heads, there is only 1 way to arrange them (HHH). For 1 tail, there are 2 ways to arrange them (TTH and THT). So, there are a total of 1 + 2 = 3 favorable outcomes.'], ['Recognize that this is an arithmetic sequence with a common difference of 1.', 'To find the sum of the first 100 positive integers, we can use the formula for the sum of an arithmetic series, which is given by S = n/2 * (a1 + an), where n is the number of terms, a1 is the first term, and an is the last term.']] + +# initialized_prm = initialize_prm(correct_traces) +# print(initialized_prm) +# print(initialized_prm["trace_1000"]) + +correct_prm_data = initialize_prm(correct_traces, last_step_correct=True) +print(len(correct_prm_data)) +total_length = 0 +correct_prm_data_step_values = [] +for ex in correct_prm_data: + total_length += len(ex["steps"]) + for i in range(len(ex["steps"])): + question = ex["question"] + partial_steps = ex["steps"][:i+1] + partial_reward = ex["quality_values"][i] + correct_prm_data_step_values.append({ + "question": question, + "steps": partial_steps, + "final_step_reward": partial_reward + }) + +print("corr total # step values", total_length) + +last_step_incorrect_prm_data = initialize_prm(last_step_incorrect_traces, last_step_correct=False) +print(len(last_step_incorrect_prm_data)) + +last_step_incorrect_prm_data_step_values = [] +for ex in last_step_incorrect_prm_data: + i = len(ex["steps"]) - 1 + question = ex["question"] + partial_steps = ex["steps"][:i+1] + partial_reward = ex["quality_values"][i] + last_step_incorrect_prm_data_step_values.append({ + "question": question, + "steps": partial_steps, + "final_step_reward": partial_reward + }) + +print("last step incorr total # step values", len(last_step_incorrect_prm_data_step_values)) + +# print(initialized_prm) +# print(last_step_incorrect_prm_data[1000]) + +with open("reward.jsonl", "w") as f: + for prm_examples in correct_prm_data_step_values: + json.dump(prm_examples, f) + f.write("\n") + + for prm_examples in last_step_incorrect_prm_data_step_values: + json.dump(prm_examples, f) + f.write("\n") + +print(f"Results written to reward.jsonl") \ No newline at end of file diff --git a/mcts/test.py b/mcts/test.py new file mode 100644 index 0000000..034b683 --- /dev/null +++ b/mcts/test.py @@ -0,0 +1,51 @@ +import re + +def split_and_clean_steps(text): + # Use regex to split the text into steps + steps = re.split(r'(?=##\s*Step\s+\d+:)', text) + + # Remove any leading/trailing whitespace, empty steps, and the "## Step n:" prefix + cleaned_steps = [] + for step in steps: + # Strip whitespace and check if step is not empty + step = step.strip() + if step: + # Remove the "## Step n:" prefix + step = re.sub(r'^##\s*Step\s+\d+:\s*', '', step) + cleaned_steps.append(step) + + return cleaned_steps + +# Example usage +text1 = """## Step 1: First step +Content of first step. +## Step 2: Second step +Content of second step. +## Step 10: Tenth step +Content of tenth step. +## Step 11: Eleventh step +Content of eleventh step. +sdfsdfsdfsdf + + + +sdfsdfsd + +step ## Step 12: Test""" + +text2 = """## Step 1: Short step +Brief content. +## Step 99: Large step number +Content of step 99. +## Step 100: Three-digit step +Content of step 100.""" + +# Test with both examples +for i, text in enumerate([text1, text2], 1): + # print(f"Test case {i}:") + result = split_and_clean_steps(text) + for j, step in enumerate(result, 1): + print(f"Step {j}:") + print(step) + print() + print("---\n") \ No newline at end of file diff --git a/mcts/train_reward.py b/mcts/train_reward.py new file mode 100644 index 0000000..76b1515 --- /dev/null +++ b/mcts/train_reward.py @@ -0,0 +1,283 @@ +######################## +# This script is modified from the TRL package https://github.com/huggingface/trl/blob/main/examples/research_projects/stack_llama/scripts/reward_modeling.py +# This script is designed for the reward modeling with Gemma model but can also be applied to any models with a chat template and an official pad token +# If you have any question, feel free to send me an email via wx13@illinois.edu +######################## +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional, Union + +# import evaluate +import numpy as np +import torch +import torch.nn as nn +from datasets import load_dataset +# from peft import LoraConfig, TaskType, get_peft_model +from transformers import ( + AutoModelForSequenceClassification, + AutoTokenizer, + HfArgumentParser, + PreTrainedTokenizerBase, + Trainer, + TrainerCallback, + TrainingArguments, +) +from transformers.utils import PaddingStrategy + +import pdb + + +# Define and parse arguments. +@dataclass +class ScriptArguments: + """ + These arguments vary depending on how many GPUs you have, what their capacity and features are, and what size model you want to train. + """ + local_rank: Optional[int] = field( + default=-1, metadata={"help": "Used for multi-gpu"}) + + deepspeed: Optional[str] = field( + # default="dp3.json", + default=None, + metadata={ + "help": "Path to deepspeed config if using deepspeed. You may need this if the model that you want to train doesn't fit on a single GPU." + }, + ) + per_device_train_batch_size: Optional[int] = field(default=1) + per_device_eval_batch_size: Optional[int] = field(default=1) + gradient_accumulation_steps: Optional[int] = field(default=32) + learning_rate: Optional[float] = field(default=1e-5) + weight_decay: Optional[float] = field(default=0.001) + model_name: Optional[str] = field( + default="google/gemma-2b-it", #"mistralai/Mistral-7B-Instruct-v0.2", + metadata={ + "help": "The model that you want to train from the Hugging Face hub. E.g. gpt2, gpt2-xl, bert, etc." + }, + ) + bf16: Optional[bool] = field( + default=True, + metadata={ + "help": "This essentially cuts the training time in half if you want to sacrifice a little precision and have a supported GPU." + }, + ) + num_train_epochs: Optional[int] = field( + default=1, + metadata={"help": "The number of training epochs for the reward model."}, + ) + train_set_path: Optional[str] = field( + default="hendrydong/preference_700K", + metadata={"help": "The dir of the subset of the training data to use"}, + ) + eval_set_path: Optional[str] = field( + default="hendrydong/preference_700K", + metadata={"help": "The dir of the subset of the eval data to use"}, + ) + output_path: Optional[str] = field( + default="./bt_models/gemma2b_rm", + metadata={"help": "The dir for output model"}, + ) + gradient_checkpointing: Optional[bool] = field( + default=True, + metadata={"help": "Enables gradient checkpointing."}, + ) + optim: Optional[str] = field( + # default="adamw_hf", + default="paged_adamw_32bit", + # default="adamw_torch_fused", + metadata={"help": "The optimizer to use."}, + ) + lr_scheduler_type: Optional[str] = field( + default="cosine", + metadata={"help": "The lr scheduler"}, + ) + max_length: Optional[int] = field(default=4096) + + save_every_steps: Optional[int] = field( + default=999999, + metadata={"help": "Save the model every x steps"}, + ) + eval_every_steps: Optional[int] = field( + default=999999, + metadata={"help": "Eval the model every x steps"}, + ) + +parser = HfArgumentParser(ScriptArguments) +script_args = parser.parse_args_into_dataclasses()[0] + +# Load the value-head model and tokenizer. +tokenizer_name = script_args.model_name +tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, use_auth_token=True) + +# Adjusted according to the base model +# Need to do this for the models that don't have an official pad token. +tokenizer.truncation_side = "left" +tokenizer.model_max_length = script_args.max_length + +# Get the dataset +train_path = script_args.train_set_path +eval_path = script_args.eval_set_path +output_name = script_args.output_path + +def build_dataset(tokenizer, train_path, eval_path): + + def tokenize(sample): + + sample['positive'] = tokenizer.apply_chat_template( + sample['chosen'], tokenize=False, add_generation_prompt=False).replace(tokenizer.bos_token, "") + sample['negative'] = tokenizer.apply_chat_template( + sample['rejected'], tokenize=False, add_generation_prompt=False).replace(tokenizer.bos_token, "") + + tokenized_pos = tokenizer(sample['positive'], truncation=True) + tokenized_neg = tokenizer(sample['negative'], truncation=True) + sample["input_ids_j"] = tokenized_pos["input_ids"] + sample["attention_mask_j"] = tokenized_pos["attention_mask"] + sample["input_ids_k"] = tokenized_neg["input_ids"] + sample["attention_mask_k"] = tokenized_neg["attention_mask"] + return sample + + ds = load_dataset(train_path, split="train").shuffle(seed=42) + #ds = ds.select(range(2000)) + ds = ds.map(tokenize, num_proc=8) + + eval_dataset = None + + train_dataset = ds + eval_dataset = load_dataset(eval_path, split="train").shuffle(seed=42).select(range(500)) + #eval_dataset = ds.select(range(500)) + return train_dataset, eval_dataset + + +train_dataset, eval_dataset = build_dataset(tokenizer, train_path, eval_path) +print("Training set: ", len(train_dataset), " Eval set: ", len(eval_dataset)) + +# Define the trainer +training_args = TrainingArguments( + output_dir=output_name, + learning_rate=script_args.learning_rate, + per_device_train_batch_size=script_args.per_device_train_batch_size, + per_device_eval_batch_size=script_args.per_device_eval_batch_size, + num_train_epochs=script_args.num_train_epochs, + weight_decay=script_args.weight_decay, + evaluation_strategy="steps", + eval_steps=script_args.eval_every_steps, + save_strategy="steps", + save_steps=script_args.save_every_steps, + gradient_accumulation_steps=script_args.gradient_accumulation_steps, + gradient_checkpointing=script_args.gradient_checkpointing, + deepspeed=script_args.deepspeed, + local_rank=script_args.local_rank, + remove_unused_columns=False, + label_names=[], + bf16=script_args.bf16, + logging_strategy="steps", + logging_steps=10, + optim=script_args.optim, + lr_scheduler_type=script_args.lr_scheduler_type, + warmup_ratio=0.03, + report_to='wandb' +) + +# enable if you want to train with lora +# peft_config = LoraConfig( +# task_type=TaskType.SEQ_CLS, +# inference_mode=False, +# r=8, +# lora_alpha=32, +# lora_dropout=0.1, +# ) + +model = AutoModelForSequenceClassification.from_pretrained( + script_args.model_name, num_labels=1, torch_dtype=torch.bfloat16, use_flash_attention_2=True, +) +# model = get_peft_model(model, peft_config) +# model.print_trainable_parameters() + +model.config.use_cache = not script_args.gradient_checkpointing +num_proc = 24 # Can adjust to be higher if you have more processors. +original_columns = train_dataset.column_names + + +# We need to define a special data collator that batches the data in our j vs k format. +@dataclass +class RewardDataCollatorWithPadding: + tokenizer: AutoTokenizer + padding: Union[bool, str, PaddingStrategy] = True + max_length: Optional[int] = None + pad_to_multiple_of: Optional[int] = None + return_tensors: str = "pt" + + def __call__(self, features: List[Dict[str, Any]]) -> Dict[str, Any]: + merged_features = [] + for feature in features: + merged_features.append( + { + "input_ids": feature["input_ids_j"], + "attention_mask": feature["attention_mask_j"], + } + ) + merged_features.append( + { + "input_ids": feature["input_ids_k"], + "attention_mask": feature["attention_mask_k"], + } + ) + batch = self.tokenizer.pad( + merged_features, + padding=self.padding, + max_length=self.max_length, + pad_to_multiple_of=self.pad_to_multiple_of, + return_tensors=self.return_tensors, + ) + batch = { + "input_ids": batch["input_ids"], + "attention_mask": batch["attention_mask"], + "return_loss": True, + } + return batch + + +# Define the trainer +def compute_metrics(eval_pred): + result = {} + pos_predictions_scores = eval_pred.predictions[0] + neg_predictions_scores = eval_pred.predictions[1] + # We assume that the first sample is preferred by default in groundtruth + result['accuracy'] = np.sum( + pos_predictions_scores > neg_predictions_scores) / len(pos_predictions_scores) + return result + + +class RewardTrainer(Trainer): + def compute_loss(self, model, inputs, return_outputs=False): + rewards = model( + input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"] + )[0] + bsz = rewards.size(0) + jidx = torch.arange(0, bsz, 2) + kidx = jidx + 1 + rewards_j = rewards[jidx] + rewards_k = rewards[kidx] + loss = -nn.functional.logsigmoid(rewards_j - rewards_k).mean() + if return_outputs: + return loss, {"rewards_j": rewards_j, "rewards_k": rewards_k} + return loss + + +# Train the model, woohoo. +trainer = RewardTrainer( + model=model, + args=training_args, + train_dataset=train_dataset, + eval_dataset=eval_dataset, + compute_metrics=compute_metrics, + data_collator=RewardDataCollatorWithPadding( + tokenizer=tokenizer, max_length=script_args.max_length), +) + + +trainer.train() + + +print("Saving last checkpoint of the model") +#model.save_pretrained(output_name + "/last_checkpoint") +trainer.save_model(output_name + "/last_checkpoint") +tokenizer.save_pretrained(output_name + "/last_checkpoint") \ No newline at end of file diff --git a/mcts/util.py b/mcts/util.py new file mode 100644 index 0000000..2107742 --- /dev/null +++ b/mcts/util.py @@ -0,0 +1,28 @@ + +import re + +SEED = 42 + +def split_and_clean_steps(text): + # Use regex to split the text into steps + steps = re.split(r'(?=##\s*Step\s+\d+:)', text) + + # Remove any leading/trailing whitespace, empty steps, and the "## Step n:" prefix + cleaned_steps = [] + for step in steps: + # Strip whitespace and check if step is not empty + step = step.strip() + if step: + # Remove the "## Step n:" prefix + step = re.sub(r'^##\s*Step\s+\d+:\s*', '', step) + cleaned_steps.append(step) + + return cleaned_steps + +def quality_filter(example): + response_quality = example['score'] >= 0.32 # arbitrary af + # TODO: check correctness of chain + # math_and_reasoning = example['primary_tag'] in ['Math', 'Reasoning'] + instruction_quality = example['quality'] in ['excellent', 'good'] + response_format = "## Step 1: " in example['response'] + return response_quality and instruction_quality and response_format \ No newline at end of file diff --git a/modal_reward.py b/modal_reward.py index ac4e158..e67f516 100644 --- a/modal_reward.py +++ b/modal_reward.py @@ -22,8 +22,8 @@ @app.cls( gpu=modal.gpu.A10G(), - container_idle_timeout=10, - volumes={"/data": modal.Volume.from_name("my-test-volume")} + container_idle_timeout=30, + # volumes={"/data": modal.Volume.from_name("my-test-volume")} ) class Embedder: model_id = "RLHFlow/ArmoRM-Llama3-8B-v0.1" @@ -49,11 +49,11 @@ def build(self): elapsed = pc() - start print(f"[build] compile model took {elapsed} seconds") - print("[build] save model") - start = pc() - model.save_pretrained("/data/saved_model", safe_serialization=True) - elapsed = pc() - start - print(f"[build] saving model took {elapsed} seconds") + # print("[build] save model") + # start = pc() + # model.save_pretrained("/data/saved_model", safe_serialization=True) + # elapsed = pc() - start + # print(f"[build] saving model took {elapsed} seconds") # @modal.enter(snap=False) @modal.enter() @@ -67,20 +67,20 @@ def setup(self): with torch.device("cuda"): print("[setup] loading model") start = pc() - self.model = AutoModelForSequenceClassification.from_pretrained("/data/saved_model", + self.model = AutoModelForSequenceClassification.from_pretrained(self.model_id, trust_remote_code=True, torch_dtype=dtype, use_safetensors=True) elapsed = pc() - start print(f"[setup] loading model took {elapsed} seconds") - print("[setup] compile model") - start = pc() - model = torch.compile(model) - elapsed = pc() - start - print(f"[setup] compile model took {elapsed} seconds") + # print("[setup] compile model") + # start = pc() + # self.model = torch.compile(self.model) + # elapsed = pc() - start + # print(f"[setup] compile model took {elapsed} seconds") print("[setup] loading tokenizer") start = pc() - self.tokenizer = AutoTokenizer.from_pretrained("/data/saved_model", use_fast=True) + self.tokenizer = AutoTokenizer.from_pretrained(self.model_id, use_fast=True) elapsed = pc() - start print(f"[setup] loading tokenizer took {elapsed} seconds") diff --git a/modal_vllm.py b/modal_vllm.py new file mode 100644 index 0000000..ab7bf74 --- /dev/null +++ b/modal_vllm.py @@ -0,0 +1,175 @@ +import modal + +def download_model_to_image(model_dir, model_name, model_revision): + import os + from huggingface_hub import snapshot_download + from transformers.utils import move_cache + + os.makedirs(model_dir, exist_ok=True) + + snapshot_download( + model_name, + revision=model_revision, + local_dir=model_dir, + ignore_patterns=["*.pt", "*.bin"], # Using safetensors + ) + move_cache() + +MODEL_DIR = "/qwen" +MODEL_NAME = "Qwen/Qwen2.5-0.5B-Instruct" +MODEL_REVISION = "a8b602d9dafd3a75d382e62757d83d89fca3be54" + +vllm_image = ( + modal.Image.debian_slim(python_version="3.10") + .pip_install( + "vllm==0.6.1.post2", + "torch==2.4.0", + "transformers==4.44.2", + "ray==2.36.0", + "hf-transfer==0.1.8", + "huggingface_hub==0.25.0", + ) + .env({"HF_HUB_ENABLE_HF_TRANSFER": "1"}) + .run_function( + download_model_to_image, + timeout=60 * 20, + kwargs={ + "model_dir": MODEL_DIR, + "model_name": MODEL_NAME, + "model_revision": MODEL_REVISION, + }, + ) +) + +app = modal.App("vllm-qwen") + +N_GPU = 1 # tip: for best results, first upgrade to more powerful GPUs, and only then increase GPU count + +MINUTES = 60 # seconds +HOURS = 60 * MINUTES + +# key: 9FF74944EED19865193F979942FB1 + +@app.function( + image=vllm_image, + # gpu=modal.gpu.H100(count=N_GPU), + # gpu=modal.gpu.A100(count=N_GPU, size="40GB"), + gpu=modal.gpu.A10G(count=N_GPU), + container_idle_timeout=1 * MINUTES, + timeout=20 * MINUTES, + allow_concurrent_inputs=1, + secrets=[modal.Secret.from_name("vllm-token")] + # volumes={MODELS_DIR: volume}, +) +@modal.asgi_app() +def serve(): + import os + import fastapi + import vllm.entrypoints.openai.api_server as api_server + from vllm.engine.arg_utils import AsyncEngineArgs + from vllm.engine.async_llm_engine import AsyncLLMEngine + from vllm.entrypoints.logger import RequestLogger + from vllm.entrypoints.openai.serving_chat import OpenAIServingChat + from vllm.entrypoints.openai.serving_completion import ( + OpenAIServingCompletion, + ) + from vllm.usage.usage_lib import UsageContext + + def get_model_config(engine): + import asyncio + + try: # adapted from vLLM source -- https://github.com/vllm-project/vllm/blob/507ef787d85dec24490069ffceacbd6b161f4f72/vllm/entrypoints/openai/api_server.py#L235C1-L247C1 + event_loop = asyncio.get_running_loop() + except RuntimeError: + event_loop = None + + if event_loop is not None and event_loop.is_running(): + # If the current is instanced by Ray Serve, + # there is already a running event loop + model_config = event_loop.run_until_complete(engine.get_model_config()) + else: + # When using single vLLM without engine_use_ray + model_config = asyncio.run(engine.get_model_config()) + + return model_config + + # volume.reload() # ensure we have the latest version of the weights + + # create a fastAPI app that uses vLLM's OpenAI-compatible router + web_app = fastapi.FastAPI( + title=f"OpenAI-compatible {MODEL_NAME} server", + description="Run an OpenAI-compatible LLM server with vLLM on modal.com", + version="0.0.1", + docs_url="/docs", + ) + + # security: CORS middleware for external requests + http_bearer = fastapi.security.HTTPBearer( + scheme_name="Bearer Token", + description="See code for authentication details.", + ) + web_app.add_middleware( + fastapi.middleware.cors.CORSMiddleware, + allow_origins=["*"], + allow_credentials=True, + allow_methods=["*"], + allow_headers=["*"], + ) + + # security: inject dependency on authed routes + TOKEN = os.environ["API_TOKEN"] + async def is_authenticated(api_key: str = fastapi.Security(http_bearer)): + if api_key.credentials != TOKEN: + raise fastapi.HTTPException( + status_code=fastapi.status.HTTP_401_UNAUTHORIZED, + detail="Invalid authentication credentials", + ) + return {"username": "authenticated_user"} + + router = fastapi.APIRouter(dependencies=[fastapi.Depends(is_authenticated)]) + + # wrap vllm's router in auth router + router.include_router(api_server.router) + # add authed vllm to our fastAPI app + web_app.include_router(router) + + engine_args = AsyncEngineArgs( + # model=MODELS_DIR + "/" + MODEL_NAME, + model=MODEL_DIR, + tensor_parallel_size=N_GPU, + gpu_memory_utilization=0.90, + max_model_len=8096, + # enforce_eager=True, + enforce_eager=False, # capture the graph for faster inference, but slower cold starts (30s > 20s) + enable_prefix_caching=True + ) + + engine = AsyncLLMEngine.from_engine_args( + engine_args, usage_context=UsageContext.OPENAI_API_SERVER + ) + + model_config = get_model_config(engine) + + request_logger = RequestLogger(max_log_len=2048) + + api_server.openai_serving_chat = OpenAIServingChat( + engine, + model_config=model_config, + served_model_names=[MODEL_NAME], + chat_template=None, + response_role="assistant", + lora_modules=[], + prompt_adapters=[], + request_logger=request_logger, + ) + api_server.openai_serving_completion = OpenAIServingCompletion( + engine, + model_config=model_config, + served_model_names=[MODEL_NAME], + lora_modules=[], + prompt_adapters=[], + request_logger=request_logger, + ) + + return web_app + diff --git a/readme.md b/readme.md index 8408361..2f7b3cc 100644 --- a/readme.md +++ b/readme.md @@ -1,5 +1,20 @@ . + + +``` +curl -X 'POST' 'https://rawsh--vllm-qwen-serve.modal.run/v1/completions' -H 'accept: application/json' -H 'Authorization: Bearer 9FF74944EED19865193F979942FB1' -H 'Content-Type: application/json' -d '{ + "model": "Qwen/Qwen2.5-0.5B-Instruct", + "prompt": [ + "<|im_start|>user\nIf four coins are tossed simultaneously, what is the probability of obtaining more heads than tails, in the coins that land face up? Express your answer as a common fraction.\n<|im_end|>\n<|im_start|>assistant\n## Step 1:\nIdentify all the possible outcomes of tossing four coins simultaneously. When tossing four coins simultaneously, each coin has 2 possible outcomes (heads or tails). Therefore, for four coins, the total number of possible outcomes is $2^4 = 16$.\n\n## Step 2:\n" + ], + "max_tokens": 200, + "stop": ["\n\n## Step "], + "temperature": 1 +}' +``` + + optimize cot preference tuning on high quality and low quality chains?