Autoscaling inference endpoints #412

Merged: 15 commits (Dec 5, 2024)
3 changes: 1 addition & 2 deletions examples/model_configs/endpoint_model.yaml
@@ -1,8 +1,7 @@
model:
type: "endpoint" # can be base, tgi, or endpoint
base_params:
endpoint_name: "llama-2-7B-lighteval" # needs to be lower case without special characters
model: "meta-llama/Llama-2-7b-hf"
model_or_endpoint_name: "meta-llama/Llama-2-7b-hf" # the model name or the endpoint name if reuse_existing is true
revision: "main"
dtype: "float16" # can be any of "awq", "eetq", "gptq", "4bit' or "8bit" (will use bitsandbytes), "bfloat16" or "float16"
reuse_existing: false # if true, ignore all params in instance, and don't delete the endpoint after evaluation
4 changes: 4 additions & 0 deletions examples/model_configs/endpoint_model_lite.yaml
@@ -0,0 +1,4 @@
model:
type: "endpoint" # can be base, tgi, or endpoint
base_params:
model_or_endpoint_name: "Qwen/Qwen2.5-72B-Instruct" #Qwen/Qwen2.5-7B"
148 changes: 106 additions & 42 deletions src/lighteval/models/endpoint_model.py
@@ -21,19 +21,22 @@
# SOFTWARE.

import asyncio
import re
from typing import Coroutine, List, Optional, Union

import torch
from huggingface_hub import (
AsyncInferenceClient,
InferenceClient,
InferenceEndpoint,
InferenceEndpointError,
InferenceEndpointTimeoutError,
TextGenerationInputGrammarType,
TextGenerationOutput,
create_inference_endpoint,
get_inference_endpoint,
)
from huggingface_hub.utils._errors import HfHubHTTPError
from torch.utils.data import DataLoader
from tqdm import tqdm
from transformers import AutoTokenizer
@@ -53,61 +56,105 @@


BATCH_SIZE = 50
MAX_RETRIES = 5

SORTED_INSTANCE_SIZES = [ # sorted by incremental overall RAM (to load models)
# type, size
("nvidia-a10g", "x1"),
("nvidia-t4", "x4"),
("nvidia-a100", "x1"),
("nvidia-a10g", "x4"),
]


class InferenceEndpointModel(LightevalModel):
"""InferenceEndpointModels can be used both with the free inference client, or with inference
endpoints, which will use text-generation-inference to deploy your model for the duration of the evaluation.
"""

def __init__(
def __init__( # noqa: C901
self, config: Union[InferenceEndpointModelConfig, InferenceModelConfig], env_config: EnvConfig
) -> None:
self.reuse_existing = getattr(config, "should_reuse_existing", True)
self._max_length = None
self.endpoint = None
if isinstance(config, InferenceEndpointModelConfig):
if config.should_reuse_existing:
self.endpoint = get_inference_endpoint(
name=config.name, token=env_config.token, namespace=config.namespace
)
else:
self.endpoint: InferenceEndpoint = create_inference_endpoint(
name=config.name,
namespace=config.namespace,
repository=config.repository,
revision=config.revision,
framework=config.framework,
task="text-generation",
accelerator=config.accelerator,
vendor=config.vendor,
region=config.region,
type=config.endpoint_type,
instance_size=config.instance_size,
instance_type=config.instance_type,
token=env_config.token,
custom_image={
"health_route": "/health",
"env": {
# Documentaiton: https://huggingface.co/docs/text-generation-inference/en/basic_tutorials/launcher
"MAX_BATCH_PREFILL_TOKENS": "2048",
"MAX_INPUT_LENGTH": "2047",
"MAX_TOTAL_TOKENS": "2048",
"MODEL_ID": "/repository",
"HF_MODEL_TRUST_REMOTE_CODE": "true",
**config.get_dtype_args(),
**config.get_custom_env_vars(),
},
"url": (config.image_url or "ghcr.io/huggingface/text-generation-inference:latest"),
},
)
hlog("Deploying your endpoint. Please wait.")
try:
self.endpoint.wait(timeout=600) # Waits for the endpoint to be deployed
except InferenceEndpointTimeoutError as e:
hlog_err("Endpoint did not start within 10 minutes, there was a timeout.")
raise e
instance_type = config.instance_type or SORTED_INSTANCE_SIZES[0][0]
instance_size = config.instance_size or SORTED_INSTANCE_SIZES[0][1]
# Endpoint names do not allow special characters
endpoint_name = re.sub("[^a-zA-Z0-9-]", "-", config.model_or_endpoint_name.lower() + "-lighteval")
for _ in range(1, MAX_RETRIES): # We allow retrying for up to 5 times
try:
if self.endpoint is None: # Endpoint does not exist yet locally
if config.should_reuse_existing:
self.endpoint = get_inference_endpoint(
name=config.model_or_endpoint_name, token=env_config.token, namespace=config.namespace
)
else:
self.endpoint: InferenceEndpoint = create_inference_endpoint(
name=endpoint_name,
namespace=config.namespace,
repository=config.model_or_endpoint_name,
revision=config.revision,
framework=config.framework,
task="text-generation",
accelerator=config.accelerator,
vendor=config.vendor,
region=config.region,
type=config.endpoint_type,
instance_size=instance_size,
instance_type=instance_type,
token=env_config.token,
custom_image={
"health_route": "/health",
"env": {
# Documentation: https://huggingface.co/docs/text-generation-inference/en/basic_tutorials/launcher
"MAX_BATCH_PREFILL_TOKENS": "2048",
"MAX_INPUT_LENGTH": "2047",
"MAX_TOTAL_TOKENS": "2048",
"MODEL_ID": "/repository",
"HF_MODEL_TRUST_REMOTE_CODE": "true",
**config.get_dtype_args(),
**config.get_custom_env_vars(),
},
"url": (
config.image_url or "ghcr.io/huggingface/text-generation-inference:latest"
),
},
)
else: # Endpoint exists and must be scaled up
self.endpoint.update(instance_size=instance_size, instance_type=instance_type)
self.endpoint.fetch()

# Waits for the endpoint to be deployed - we could also check for the status in 'updating', 'pending', 'initializing'
hlog("Trying to deploy your endpoint. Please wait.")
try:
self.endpoint.wait(timeout=1800, refresh_every=60)
except InferenceEndpointError as e:
instance_type, instance_size = self.get_larger_hardware_suggestion(
instance_type, instance_size
)

hlog(
f"Endpoint failed to start on current hardware with error {e}. Trying to autoscale to ({instance_type}, {instance_size})."
)
if self.endpoint.status == "running": # We're good! going to the next step!
break
except InferenceEndpointTimeoutError as e:
hlog_err("Endpoint did not start within 30 minutes, there was a timeout. Please inspect the logs.")
raise e
except HfHubHTTPError as e:
if "409 Client Error: Conflict for url:" in str(e):
config.model_or_endpoint_name = endpoint_name
config.should_reuse_existing = True
elif "Bad Request: Compute instance not available yet" in str(e):
self.endpoint.wait(timeout=1800, refresh_every=60)

if not self.endpoint.status == "running":
raise Exception("Did not manage to start endpoint within the elapsed time and on suggested hardware.")

hlog("Endpoint successfully deployed!")
self.name = config.repository
self.name = config.model_or_endpoint_name
self.revision = self.endpoint.revision
self.async_client: AsyncInferenceClient = self.endpoint.async_client
self.client: InferenceClient = self.endpoint.client
@@ -131,6 +178,23 @@ def __init__(
model_size=-1,
)

@staticmethod
def get_larger_hardware_suggestion(cur_instance_type: str = None, cur_instance_size: str = None):
try:
cur_instance_ix = SORTED_INSTANCE_SIZES.index((cur_instance_type, cur_instance_size))
new_instance_type = SORTED_INSTANCE_SIZES[cur_instance_ix + 1][0]
new_instance_size = SORTED_INSTANCE_SIZES[cur_instance_ix + 1][1]
return new_instance_type, new_instance_size
except ValueError:
hlog_warn(
f"Problem when scaling endpoint: the current instance combination ({cur_instance_type}, {cur_instance_size}) is unknown. Can't scale it up."
)
except IndexError:
hlog_warn(
"To avoid accidental costs, we will not upgrade the current endpoint above 4 a10g automatically, please request it explicitely."
)
return cur_instance_type, cur_instance_size

@property
def tokenizer(self):
return self._tokenizer
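Since get_larger_hardware_suggestion is a staticmethod, the escalation ladder can be exercised on its own, without deploying anything. A small illustrative sketch, assuming an environment where this patched module imports cleanly:

```python
from lighteval.models.endpoint_model import InferenceEndpointModel

# Start from the same (type, size) pair the autoscaler uses by default.
instance_type, instance_size = "nvidia-a10g", "x1"
for attempt in range(4):
    print(f"attempt {attempt}: {instance_type}, {instance_size}")
    instance_type, instance_size = InferenceEndpointModel.get_larger_hardware_suggestion(
        instance_type, instance_size
    )
# attempt 0: nvidia-a10g, x1
# attempt 1: nvidia-t4, x4
# attempt 2: nvidia-a100, x1
# attempt 3: nvidia-a10g, x4   (the next call only logs a warning and keeps this pair)
```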
76 changes: 36 additions & 40 deletions src/lighteval/models/model_config.py
@@ -257,24 +257,32 @@ class InferenceModelConfig:

@dataclass
class InferenceEndpointModelConfig:
name: str
repository: str
accelerator: str
vendor: str
region: str
instance_size: str
instance_type: str
model_dtype: str
model_or_endpoint_name: str
should_reuse_existing: bool = False
accelerator: str = "gpu"
model_dtype: str = None # if empty, we use the default
vendor: str = "aws"
region: str = "eu-west-1"
instance_size: str = None # if none, we autoscale
instance_type: str = None # if none, we autoscale
framework: str = "pytorch"
endpoint_type: str = "protected"
should_reuse_existing: bool = False
add_special_tokens: bool = True
revision: str = "main"
namespace: str = None # The namespace under which to launch the endpoint. Defaults to the current user's namespace
image_url: str = None
env_vars: dict = None

def __post_init__(self):
# xor operator, one is None but not the other
if (self.instance_size is None) ^ (self.instance_type is None):
raise ValueError(
"When creating an inference endpoint, you need to specify explicitely both instance_type and instance_size, or none of them for autoscaling."
)

def get_dtype_args(self) -> Dict[str, str]:
if self.model_dtype is None:
return {}
model_dtype = self.model_dtype.lower()
if model_dtype in ["awq", "eetq", "gptq"]:
return {"QUANTIZE": model_dtype}
@@ -289,15 +297,6 @@ def get_dtype_args(self) -> Dict[str, str]:
def get_custom_env_vars(self) -> Dict[str, str]:
return {k: str(v) for k, v in self.env_vars.items()} if self.env_vars else {}

@staticmethod
def nullable_keys() -> list[str]:
"""
Returns the list of optional keys in an endpoint model configuration. By default, the code requires that all the
keys be specified in the configuration in order to launch the endpoint. This function returns the list of keys
that are not required and can remain None.
"""
return ["namespace", "env_vars", "image_url"]


def create_model_config( # noqa: C901
use_chat_template: bool,
@@ -371,29 +370,26 @@ def create_model_config( # noqa: C901
)

if config["type"] == "endpoint":
reuse_existing_endpoint = config["base_params"].get("reuse_existing", None)
complete_config_endpoint = all(
val not in [None, ""]
for key, val in config.get("instance", {}).items()
if key not in InferenceEndpointModelConfig.nullable_keys()
if config["base_params"].get("endpoint_name", None):
return InferenceModelConfig(model=config["base_params"]["endpoint_name"])
all_params = {
"model_dtype": config["base_params"].get("dtype", None),
"revision": config["base_params"].get("revision", None) or "main",
"should_reuse_existing": config["base_params"].get("should_reuse_existing"),
"accelerator": config.get("instance", {}).get("accelerator", None),
"region": config.get("instance", {}).get("region", None),
"vendor": config.get("instance", {}).get("vendor", None),
"instance_size": config.get("instance", {}).get("instance_size", None),
"instance_type": config.get("instance", {}).get("instance_type", None),
"namespace": config.get("instance", {}).get("namespace", None),
"image_url": config.get("instance", {}).get("image_url", None),
"env_vars": config.get("instance", {}).get("env_vars", None),
}
return InferenceEndpointModelConfig(
model_or_endpoint_name=config["base_params"]["model_or_endpoint_name"],
# We only initialize params which have a non default value
**{k: v for k, v in all_params.items() if v is not None},
)
if reuse_existing_endpoint or complete_config_endpoint:
return InferenceEndpointModelConfig(
name=config["base_params"]["endpoint_name"].replace(".", "-").lower(),
repository=config["base_params"]["model"],
model_dtype=config["base_params"]["dtype"],
revision=config["base_params"]["revision"] or "main",
should_reuse_existing=reuse_existing_endpoint,
accelerator=config["instance"]["accelerator"],
region=config["instance"]["region"],
vendor=config["instance"]["vendor"],
instance_size=config["instance"]["instance_size"],
instance_type=config["instance"]["instance_type"],
namespace=config["instance"]["namespace"],
image_url=config["instance"].get("image_url", None),
env_vars=config["instance"].get("env_vars", None),
)
return InferenceModelConfig(model=config["base_params"]["endpoint_name"])

if config["type"] == "base":
# Creating the multichoice space parameters
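To make the new instance_size/instance_type contract concrete, here is a short sketch (illustrative only, assuming lighteval with this change is installed) of the three cases the __post_init__ check distinguishes:

```python
from lighteval.models.model_config import InferenceEndpointModelConfig

# 1) Autoscaling: neither field is set, lighteval picks and upgrades the hardware.
InferenceEndpointModelConfig(model_or_endpoint_name="meta-llama/Llama-2-7b-hf")

# 2) Pinned hardware: both fields are set explicitly.
InferenceEndpointModelConfig(
    model_or_endpoint_name="meta-llama/Llama-2-7b-hf",
    instance_type="nvidia-a10g",
    instance_size="x1",
)

# 3) Invalid: setting only one of the two raises a ValueError.
try:
    InferenceEndpointModelConfig(
        model_or_endpoint_name="meta-llama/Llama-2-7b-hf",
        instance_type="nvidia-a10g",  # instance_size left unset
    )
except ValueError as err:
    print(err)
```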