From ed163087bac610ad62b51240c8d3b1d330db717f Mon Sep 17 00:00:00 2001 From: bjzhjing Date: Fri, 24 Jan 2025 22:27:49 +0800 Subject: [PATCH] =?UTF-8?q?Provide=20unified=20scalable=20deployment=20and?= =?UTF-8?q?=20benchmarking=20support=20for=20exam=E2=80=A6=20(#1315)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Cathy Zhang Signed-off-by: letonghan Co-authored-by: letonghan Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> --- ChatQnA/benchmark_chatqna.yaml | 83 ++++ README-deploy-benchmark.md | 69 ++++ benchmark.py | 343 +++++++++++++++++ deploy.py | 674 +++++++++++++++++++++++++++++++++ deploy_and_benchmark.py | 292 ++++++++++++++ requirements.txt | 9 + 6 files changed, 1470 insertions(+) create mode 100644 ChatQnA/benchmark_chatqna.yaml create mode 100644 README-deploy-benchmark.md create mode 100644 benchmark.py create mode 100644 deploy.py create mode 100644 deploy_and_benchmark.py create mode 100644 requirements.txt diff --git a/ChatQnA/benchmark_chatqna.yaml b/ChatQnA/benchmark_chatqna.yaml new file mode 100644 index 000000000..c608b8afb --- /dev/null +++ b/ChatQnA/benchmark_chatqna.yaml @@ -0,0 +1,83 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +deploy: + device: gaudi + version: 1.1.0 + modelUseHostPath: /mnt/models + HUGGINGFACEHUB_API_TOKEN: "" + node: [1, 2, 4, 8] + namespace: "" + + services: + backend: + instance_num: [2, 2, 4, 8] + cores_per_instance: "" + memory_capacity: "" + + teirerank: + enabled: True + model_id: "" + replicaCount: [1, 1, 1, 1] + cards_per_instance: 1 + + tei: + model_id: "" + replicaCount: [1, 2, 4, 8] + cores_per_instance: "" + memory_capacity: "" + + llm: + engine: tgi + model_id: "" + replicaCount: [7, 15, 31, 63] + max_batch_size: [1, 2, 4, 8] + max_input_length: "" + max_total_tokens: "" + max_batch_total_tokens: "" + max_batch_prefill_tokens: "" + cards_per_instance: 1 + + data-prep: 
+ replicaCount: [1, 1, 1, 1] + cores_per_instance: "" + memory_capacity: "" + + retriever-usvc: + replicaCount: [2, 2, 4, 8] + cores_per_instance: "" + memory_capacity: "" + + redis-vector-db: + replicaCount: [1, 1, 1, 1] + cores_per_instance: "" + memory_capacity: "" + + chatqna-ui: + replicaCount: [1, 1, 1, 1] + + nginx: + replicaCount: [1, 1, 1, 1] + +benchmark: + # http request behavior related fields + concurrency: [1, 2, 4] + totoal_query_num: [2048, 4096] + duration: [5, 10] # unit minutes + query_num_per_concurrency: [4, 8, 16] + possion: True + possion_arrival_rate: 1.0 + warmup_iterations: 10 + seed: 1024 + + # workload, all of the test cases will run for benchmark + test_cases: + - chatqnafixed + - chatqna_qlist_pubmed: + dataset: pub_med10 # pub_med10, pub_med100, pub_med1000 + user_queries: [1, 2, 4] + query_token_size: 128 # if specified, means fixed query token size will be sent out + + llm: + # specify the llm output token size + max_token_size: [128, 256] diff --git a/README-deploy-benchmark.md b/README-deploy-benchmark.md new file mode 100644 index 000000000..4b813cccc --- /dev/null +++ b/README-deploy-benchmark.md @@ -0,0 +1,69 @@ +# ChatQnA Benchmarking + +## Purpose + +We aim to run these benchmarks and share them with the OPEA community for three primary reasons: + +- To offer insights on inference throughput in real-world scenarios, helping you choose the best service or deployment for your needs. +- To establish a baseline for validating optimization solutions across different implementations, providing clear guidance on which methods are most effective for your use case. +- To inspire the community to build upon our benchmarks, allowing us to better quantify new solutions in conjunction with current leading LLMs, serving frameworks etc. 
+ +## Table of Contents + +- [Prerequisites](#prerequisites) +- [Overview](#overview) + - [Using deploy_and_benchmark.py](#using-deploy_and_benchmark.py-recommended) +- [Data Preparation](#data-preparation) +- [Configuration](#configuration) + +## Prerequisites + +Before running the benchmarks, ensure you have: + +1. **Kubernetes Environment** + + - Kubernetes installation: Use [kubespray](https://github.com/opea-project/docs/blob/main/guide/installation/k8s_install/k8s_install_kubespray.md) or other official Kubernetes installation guides + - (Optional) [Kubernetes set up guide on Intel Gaudi product](https://github.com/opea-project/GenAIInfra/blob/main/README.md#setup-kubernetes-cluster) + +2. **Configuration YAML** + The configuration file (e.g., `./ChatQnA/benchmark_chatqna.yaml`) consists of two main sections: deployment and benchmarking. Required fields must be filled with valid values (like the Hugging Face token). For all other fields, you can either customize them according to your needs or leave them empty ("") to use the default values from the [helm charts](https://github.com/opea-project/GenAIInfra/tree/main/helm-charts). + +## Data Preparation + +Before running benchmarks, you need to: + +1. **Prepare Test Data** + + - Download the retrieval file: + ```bash + wget https://github.com/opea-project/GenAIEval/tree/main/evals/benchmark/data/upload_file.txt + ``` + - For the `chatqna_qlist_pubmed` test case, prepare `pubmed_${max_lines}.txt` by following this [README](https://github.com/opea-project/GenAIEval/blob/main/evals/benchmark/stresscli/README_Pubmed_qlist.md) + +2. **Prepare Model Files (Recommended)** + ```bash + pip install -U "huggingface_hub[cli]" + sudo mkdir -p /mnt/models + sudo chmod 777 /mnt/models + huggingface-cli download --cache-dir /mnt/models Intel/neural-chat-7b-v3-3 + ``` + +## Overview + +The benchmarking process consists of two main components: deployment and benchmarking. 
We provide `deploy_and_benchmark.py` as a unified entry point that combines both steps. + +### Using deploy_and_benchmark.py (Recommended) + +The script `deploy_and_benchmark.py` serves as the main entry point. Here's an example using ChatQnA configuration (you can replace it with any other example's configuration YAML file): + +1. For a specific number of nodes: + + ```bash + python deploy_and_benchmark.py ./ChatQnA/benchmark_chatqna.yaml --target-node 1 + ``` + +2. For all node configurations: + ```bash + python deploy_and_benchmark.py ./ChatQnA/benchmark_chatqna.yaml + ``` + This will iterate through the node list in your configuration YAML file, performing deployment and benchmarking for each node count. diff --git a/benchmark.py b/benchmark.py new file mode 100644 index 000000000..fb20367c0 --- /dev/null +++ b/benchmark.py @@ -0,0 +1,343 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import os +import sys +from datetime import datetime + +import yaml +from evals.benchmark.stresscli.commands.load_test import locust_runtests +from kubernetes import client, config + +# only support chatqna for now +service_endpoints = { + "chatqna": "/v1/chatqna", +} + + +def load_yaml(file_path): + with open(file_path, "r") as f: + data = yaml.safe_load(f) + return data + + +def construct_benchmark_config(test_suite_config): + """Extract relevant data from the YAML based on the specified test cases.""" + + return { + "concurrency": test_suite_config.get("concurrency", []), + "totoal_query_num": test_suite_config.get("user_queries", []), + "duration:": test_suite_config.get("duration:", []), + "query_num_per_concurrency": test_suite_config.get("query_num_per_concurrency", []), + "possion": test_suite_config.get("possion", False), + "possion_arrival_rate": test_suite_config.get("possion_arrival_rate", 1.0), + "warmup_iterations": test_suite_config.get("warmup_iterations", 10), + "seed": test_suite_config.get("seed", None), + "test_cases": 
test_suite_config.get("test_cases", ["chatqnafixed"]), + "user_queries": test_suite_config.get("user_queries", [1]), + "query_token_size": test_suite_config.get("query_token_size", 128), + "llm_max_token_size": test_suite_config.get("llm", {}).get("max_token_size", [128]), + } + + +def _get_cluster_ip(service_name, namespace="default"): + """Get the Cluster IP of a service in a Kubernetes cluster.""" + # Load the Kubernetes configuration + config.load_kube_config() # or use config.load_incluster_config() if running inside a Kubernetes pod + + # Create an API client for the core API (which handles services) + v1 = client.CoreV1Api() + + try: + # Get the service object + service = v1.read_namespaced_service(name=service_name, namespace=namespace) + + # Extract the Cluster IP + cluster_ip = service.spec.cluster_ip + + # Extract the port number (assuming the first port, modify if necessary) + if service.spec.ports: + port_number = service.spec.ports[0].port # Get the first port number + else: + port_number = None + + return cluster_ip, port_number + except client.exceptions.ApiException as e: + print(f"Error fetching service: {e}") + return None + + +def _get_service_ip(service_name, deployment_type="k8s", service_ip=None, service_port=None, namespace="default"): + """Get the service IP and port based on the deployment type. + + Args: + service_name (str): The name of the service. + deployment_type (str): The type of deployment ("k8s" or "docker"). + service_ip (str): The IP address of the service (required for Docker deployment). + service_port (int): The port of the service (required for Docker deployment). + namespace (str): The namespace of the service (default is "default"). + + Returns: + (str, int): The service IP and port. 
+ """ + if deployment_type == "k8s": + # Kubernetes IP and port retrieval logic + svc_ip, port = _get_cluster_ip(service_name, namespace) + elif deployment_type == "docker": + # For Docker deployment, service_ip and service_port must be specified + if not service_ip or not service_port: + raise ValueError( + "For Docker deployment, service_ip and service_port must be provided in the configuration." + ) + svc_ip = service_ip + port = service_port + else: + raise ValueError("Unsupported deployment type. Use 'k8s' or 'docker'.") + + return svc_ip, port + + +def _create_yaml_content(service, base_url, bench_target, test_phase, num_queries, test_params): + """Create content for the run.yaml file.""" + + # If a load shape includes the parameter concurrent_level, + # the parameter will be passed to Locust to launch fixed + # number of simulated users. + concurrency = 1 + if num_queries >= 0: + concurrency = max(1, num_queries // test_params["concurrent_level"]) + else: + concurrency = test_params["concurrent_level"] + + import importlib.util + + package_name = "opea-eval" + spec = importlib.util.find_spec(package_name) + print(spec) + + # get folder path of opea-eval + eval_path = None + import pkg_resources + + for dist in pkg_resources.working_set: + if "opea-eval" in dist.project_name: + eval_path = dist.location + if not eval_path: + print("Fail to load opea-eval package. 
Please install it first.") + exit(1) + + yaml_content = { + "profile": { + "storage": {"hostpath": test_params["test_output_dir"]}, + "global-settings": { + "tool": "locust", + "locustfile": os.path.join(eval_path, "evals/benchmark/stresscli/locust/aistress.py"), + "host": base_url, + "stop-timeout": test_params["query_timeout"], + "processes": 2, + "namespace": test_params["namespace"], + "bench-target": bench_target, + "service-metric-collect": test_params["collect_service_metric"], + "service-list": service.get("service_list", []), + "dataset": service.get("dataset", "default"), + "prompts": service.get("prompts", None), + "max-output": service.get("max_output", 128), + "seed": test_params.get("seed", None), + "llm-model": test_params["llm_model"], + "deployment-type": test_params["deployment_type"], + "load-shape": test_params["load_shape"], + }, + "runs": [{"name": test_phase, "users": concurrency, "max-request": num_queries}], + } + } + + # For the following scenarios, test will stop after the specified run-time + if test_params["run_time"] is not None and test_phase != "warmup": + yaml_content["profile"]["global-settings"]["run-time"] = test_params["run_time"] + + return yaml_content + + +def _create_stresscli_confs(case_params, test_params, test_phase, num_queries, base_url, ts) -> str: + """Create a stresscli configuration file and persist it on disk.""" + stresscli_confs = [] + # Get the workload + test_cases = test_params["test_cases"] + for test_case in test_cases: + stresscli_conf = {} + print(test_case) + if isinstance(test_case, str): + bench_target = test_case + elif isinstance(test_case, dict): + bench_target = list(test_case.keys())[0] + dataset_conf = test_case[bench_target] + if bench_target == "chatqna_qlist_pubmed": + max_lines = dataset_conf["dataset"].split("pub_med")[-1] + stresscli_conf["envs"] = {"DATASET": f"pubmed_{max_lines}.txt", "MAX_LINES": max_lines} + # Generate the content of stresscli configuration file + stresscli_yaml = 
_create_yaml_content(case_params, base_url, bench_target, test_phase, num_queries, test_params) + + # Dump the stresscli configuration file + service_name = case_params.get("service_name") + run_yaml_path = os.path.join( + test_params["test_output_dir"], f"run_{service_name}_{ts}_{test_phase}_{num_queries}_{bench_target}.yaml" + ) + with open(run_yaml_path, "w") as yaml_file: + yaml.dump(stresscli_yaml, yaml_file) + stresscli_conf["run_yaml_path"] = run_yaml_path + stresscli_confs.append(stresscli_conf) + return stresscli_confs + + +def create_stresscli_confs(service, base_url, test_suite_config, index): + """Create and save the run.yaml file for the service being tested.""" + os.makedirs(test_suite_config["test_output_dir"], exist_ok=True) + + stresscli_confs = [] + + # Add YAML configuration of stresscli for warm-ups + warm_ups = test_suite_config["warm_ups"] + if warm_ups is not None and warm_ups > 0: + stresscli_confs.extend(_create_stresscli_confs(service, test_suite_config, "warmup", warm_ups, base_url, index)) + + # Add YAML configuration of stresscli for benchmark + user_queries_lst = test_suite_config["user_queries"] + if user_queries_lst is None or len(user_queries_lst) == 0: + # Test stop is controlled by run time + stresscli_confs.extend(_create_stresscli_confs(service, test_suite_config, "benchmark", -1, base_url, index)) + else: + # Test stop is controlled by request count + for user_queries in user_queries_lst: + stresscli_confs.extend( + _create_stresscli_confs(service, test_suite_config, "benchmark", user_queries, base_url, index) + ) + + return stresscli_confs + + +def _run_service_test(example, service, test_suite_config): + """Run the test for a specific service and example.""" + print(f"[OPEA BENCHMARK] 🚀 Example: [ {example} ] Service: [ {service.get('service_name')} ], Running test...") + + # Get the service name + service_name = service.get("service_name") + + # Get the deployment type from the test suite configuration + deployment_type = 
test_suite_config.get("deployment_type", "k8s") + + # Get the service IP and port based on deployment type + svc_ip, port = _get_service_ip( + service_name, + deployment_type, + test_suite_config.get("service_ip"), + test_suite_config.get("service_port"), + test_suite_config.get("namespace"), + ) + + base_url = f"http://{svc_ip}:{port}" + endpoint = service_endpoints[example] + url = f"{base_url}{endpoint}" + print(f"[OPEA BENCHMARK] 🚀 Running test for {service_name} at {url}") + + # Generate a unique index based on the current time + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + + # Create the run.yaml for the service + stresscli_confs = create_stresscli_confs(service, base_url, test_suite_config, timestamp) + + # Do benchmark in for-loop for different user queries + output_folders = [] + for index, stresscli_conf in enumerate(stresscli_confs, start=1): + run_yaml_path = stresscli_conf["run_yaml_path"] + print(f"[OPEA BENCHMARK] 🚀 The {index} time test is running, run yaml: {run_yaml_path}...") + os.environ["MAX_TOKENS"] = str(service.get("max_output")) + if stresscli_conf.get("envs") is not None: + for key, value in stresscli_conf.get("envs").items(): + os.environ[key] = value + + output_folders.append(locust_runtests(None, run_yaml_path)) + + print(f"[OPEA BENCHMARK] 🚀 Test completed for {service_name} at {url}") + return output_folders + + +def run_benchmark(benchmark_config, chart_name, namespace, llm_model=None, report=False): + # If llm_model is None or an empty string, set to default value + if not llm_model: + llm_model = "Qwen/Qwen2.5-Coder-7B-Instruct" + + # Extract data + parsed_data = construct_benchmark_config(benchmark_config) + test_suite_config = { + "user_queries": parsed_data["user_queries"], # num of user queries + "random_prompt": False, # whether to use random prompt, set to False by default + "run_time": "60m", # The max total run time for the test suite, set to 60m by default + "collect_service_metric": False, # whether to collect 
service metrics, set to False by default + "llm_model": llm_model, # The LLM model used for the test + "deployment_type": "k8s", # Default is "k8s", can also be "docker" + "service_ip": None, # Leave as None for k8s, specify for Docker + "service_port": None, # Leave as None for k8s, specify for Docker + "test_output_dir": os.getcwd() + "/benchmark_output", # The directory to store the test output + "load_shape": { + "name": "constant", + "params": {"constant": {"concurrent_level": 4}, "poisson": {"arrival_rate": 1.0}}, + }, + "concurrent_level": 4, + "arrival_rate": 1.0, + "query_timeout": 120, + "warm_ups": parsed_data["warmup_iterations"], + "seed": parsed_data["seed"], + "namespace": namespace, + "test_cases": parsed_data["test_cases"], + "llm_max_token_size": parsed_data["llm_max_token_size"], + } + + dataset = None + query_data = None + + # Do benchmark in for-loop for different llm_max_token_size + for llm_max_token in parsed_data["llm_max_token_size"]: + print(f"[OPEA BENCHMARK] 🚀 Run benchmark on {dataset} with llm max-output-token {llm_max_token}.") + case_data = {} + # Support chatqna only for now + if chart_name == "chatqna": + case_data = { + "run_test": True, + "service_name": "chatqna", + "service_list": [ + "chatqna", + "chatqna-chatqna-ui", + "chatqna-data-prep", + "chatqna-nginx", + "chatqna-redis-vector-db", + "chatqna-retriever-usvc", + "chatqna-tei", + "chatqna-teirerank", + "chatqna-tgi", + ], + "test_cases": parsed_data["test_cases"], + # Activate if random_prompt=true: leave blank = default dataset(WebQuestions) or sharegpt + "prompts": query_data, + "max_output": llm_max_token, # max number of output tokens + "k": 1, # number of retrieved documents + } + output_folder = _run_service_test(chart_name, case_data, test_suite_config) + + print(f"[OPEA BENCHMARK] 🚀 Test Finished. 
Output saved in {output_folder}.") + + if report: + print(output_folder) + all_results = dict() + for folder in output_folder: + from evals.benchmark.stresscli.commands.report import get_report_results + + results = get_report_results(folder) + all_results[folder] = results + print(f"results = {results}\n") + + return all_results + + +if __name__ == "__main__": + benchmark_config = load_yaml("./benchmark.yaml") + run_benchmark(benchmark_config=benchmark_config, chart_name="chatqna", namespace="deploy-benchmark") diff --git a/deploy.py b/deploy.py new file mode 100644 index 000000000..21dd278cc --- /dev/null +++ b/deploy.py @@ -0,0 +1,674 @@ +# Copyright (C) 2024 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import glob +import json +import os +import subprocess +import sys +import time +from enum import Enum, auto + +import yaml + +################################################################################ +# # +# HELM VALUES GENERATION SECTION # +# # +################################################################################ + + +def configure_node_selectors(values, node_selector, deploy_config): + """Configure node selectors for all services.""" + for service_name, config in deploy_config["services"].items(): + if service_name == "backend": + values["nodeSelector"] = {key: value for key, value in node_selector.items()} + elif service_name == "llm": + engine = config.get("engine", "tgi") + values[engine] = {"nodeSelector": {key: value for key, value in node_selector.items()}} + else: + values[service_name] = {"nodeSelector": {key: value for key, value in node_selector.items()}} + return values + + +def configure_replica(values, deploy_config): + """Get replica configuration based on example type and node count.""" + for service_name, config in deploy_config["services"].items(): + if not config.get("replicaCount"): + continue + + if service_name == "llm": + engine = config.get("engine", "tgi") + 
values[engine]["replicaCount"] = config["replicaCount"] + elif service_name == "backend": + values["replicaCount"] = config["replicaCount"] + else: + values[service_name]["replicaCount"] = config["replicaCount"] + + return values + + +def get_output_filename(num_nodes, with_rerank, example_type, device, action_type): + """Generate output filename based on configuration.""" + rerank_suffix = "with-rerank-" if with_rerank else "" + action_suffix = "deploy-" if action_type == 0 else "update-" if action_type == 1 else "" + + return f"{example_type}-{num_nodes}-{device}-{action_suffix}{rerank_suffix}values.yaml" + + +def configure_resources(values, deploy_config): + """Configure resources when tuning is enabled.""" + resource_configs = [] + + for service_name, config in deploy_config["services"].items(): + resources = {} + if deploy_config["device"] == "gaudi" and config.get("cards_per_instance", 0) > 1: + resources = { + "limits": {"habana.ai/gaudi": config["cards_per_instance"]}, + "requests": {"habana.ai/gaudi": config["cards_per_instance"]}, + } + else: + limits = {} + requests = {} + + # Only add CPU if cores_per_instance has a value + if config.get("cores_per_instance"): + limits["cpu"] = config["cores_per_instance"] + requests["cpu"] = config["cores_per_instance"] + + # Only add memory if memory_capacity has a value + if config.get("memory_capacity"): + limits["memory"] = config["memory_capacity"] + requests["memory"] = config["memory_capacity"] + + # Only create resources if we have any limits/requests + if limits and requests: + resources["limits"] = limits + resources["requests"] = requests + + if resources: + if service_name == "llm": + engine = config.get("engine", "tgi") + resource_configs.append( + { + "name": engine, + "resources": resources, + } + ) + else: + resource_configs.append( + { + "name": service_name, + "resources": resources, + } + ) + + for config in [r for r in resource_configs if r]: + service_name = config["name"] + if service_name == 
"backend": + values["resources"] = config["resources"] + elif service_name in values: + values[service_name]["resources"] = config["resources"] + + return values + + +def configure_extra_cmd_args(values, deploy_config): + """Configure extra command line arguments for services.""" + for service_name, config in deploy_config["services"].items(): + extra_cmd_args = [] + + for param in [ + "max_batch_size", + "max_input_length", + "max_total_tokens", + "max_batch_total_tokens", + "max_batch_prefill_tokens", + ]: + if config.get(param): + extra_cmd_args.extend([f"--{param.replace('_', '-')}", str(config[param])]) + + if extra_cmd_args: + if service_name == "llm": + engine = config.get("engine", "tgi") + if engine not in values: + values[engine] = {} + values[engine]["extraCmdArgs"] = extra_cmd_args + else: + if service_name not in values: + values[service_name] = {} + values[service_name]["extraCmdArgs"] = extra_cmd_args + + return values + + +def configure_models(values, deploy_config): + """Configure model settings for services.""" + for service_name, config in deploy_config["services"].items(): + # Skip if no model_id defined or service is disabled + if not config.get("model_id") or config.get("enabled") is False: + continue + + if service_name == "llm": + # For LLM service, use its engine as the key + engine = config.get("engine", "tgi") + values[engine]["LLM_MODEL_ID"] = config.get("model_id") + elif service_name == "tei": + values[service_name]["EMBEDDING_MODEL_ID"] = config.get("model_id") + elif service_name == "teirerank": + values[service_name]["RERANK_MODEL_ID"] = config.get("model_id") + + return values + + +def configure_rerank(values, with_rerank, deploy_config, example_type, node_selector): + """Configure rerank service.""" + if with_rerank: + if "teirerank" not in values: + values["teirerank"] = {"nodeSelector": {key: value for key, value in node_selector.items()}} + elif "nodeSelector" not in values["teirerank"]: + values["teirerank"]["nodeSelector"] = 
{key: value for key, value in node_selector.items()} + else: + if example_type == "chatqna": + values["image"] = {"repository": "opea/chatqna-without-rerank"} + if "teirerank" not in values: + values["teirerank"] = {"enabled": False} + elif "enabled" not in values["teirerank"]: + values["teirerank"]["enabled"] = False + return values + + +def generate_helm_values(example_type, deploy_config, chart_dir, action_type, node_selector=None): + """Create a values.yaml file based on the provided configuration.""" + if deploy_config is None: + raise ValueError("deploy_config is required") + + # Ensure the chart_dir exists + if not os.path.exists(chart_dir): + return {"status": "false", "message": f"Chart directory {chart_dir} does not exist"} + + num_nodes = deploy_config.get("node", 1) + with_rerank = deploy_config["services"].get("teirerank", {}).get("enabled", False) + + print(f"Generating values for {example_type} example") + print(f"with_rerank: {with_rerank}") + print(f"num_nodes: {num_nodes}") + print(f"node_selector: {node_selector}") + + # Initialize base values + values = { + "global": { + "HUGGINGFACEHUB_API_TOKEN": deploy_config.get("HUGGINGFACEHUB_API_TOKEN", ""), + "modelUseHostPath": deploy_config.get("modelUseHostPath", ""), + } + } + + # Configure components + values = configure_node_selectors(values, node_selector or {}, deploy_config) + values = configure_rerank(values, with_rerank, deploy_config, example_type, node_selector or {}) + values = configure_replica(values, deploy_config) + values = configure_resources(values, deploy_config) + values = configure_extra_cmd_args(values, deploy_config) + values = configure_models(values, deploy_config) + + device = deploy_config.get("device", "unknown") + + # Generate and write YAML file + filename = get_output_filename(num_nodes, with_rerank, example_type, device, action_type) + yaml_string = yaml.dump(values, default_flow_style=False) + + filepath = os.path.join(chart_dir, filename) + + # Write the YAML data to 
the file + with open(filepath, "w") as file: + file.write(yaml_string) + + print(f"YAML file {filepath} has been generated.") + return {"status": "success", "filepath": filepath} + + +################################################################################ +# # +# DEPLOYMENT SECTION # +# # +################################################################################ + + +def run_kubectl_command(command): + """Run a kubectl command and return the output.""" + try: + result = subprocess.run(command, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) + return result.stdout + except subprocess.CalledProcessError as e: + print(f"Error running command: {command}\n{e.stderr}") + exit(1) + + +def get_all_nodes(): + """Get the list of all nodes in the Kubernetes cluster.""" + command = ["kubectl", "get", "nodes", "-o", "json"] + output = run_kubectl_command(command) + nodes = json.loads(output) + return [node["metadata"]["name"] for node in nodes["items"]] + + +def add_label_to_node(node_name, label): + """Add a label to the specified node.""" + command = ["kubectl", "label", "node", node_name, label, "--overwrite"] + print(f"Labeling node {node_name} with {label}...") + run_kubectl_command(command) + print(f"Label {label} added to node {node_name} successfully.") + + +def add_labels_to_nodes(node_count=None, label=None, node_names=None): + """Add a label to the specified number of nodes or to specified nodes.""" + + if node_names: + # Add label to the specified nodes + for node_name in node_names: + add_label_to_node(node_name, label) + else: + # Fetch the node list and label the specified number of nodes + all_nodes = get_all_nodes() + if node_count is None or node_count > len(all_nodes): + print(f"Error: Node count exceeds the number of available nodes ({len(all_nodes)} available).") + sys.exit(1) + + selected_nodes = all_nodes[:node_count] + for node_name in selected_nodes: + add_label_to_node(node_name, label) + + +def 
clear_labels_from_nodes(label, node_names=None): + """Clear the specified label from specific nodes if provided, otherwise from all nodes.""" + label_key = label.split("=")[0] # Extract key from 'key=value' format + + # If specific nodes are provided, use them; otherwise, get all nodes + nodes_to_clear = node_names if node_names else get_all_nodes() + + for node_name in nodes_to_clear: + # Check if the node has the label by inspecting its metadata + command = ["kubectl", "get", "node", node_name, "-o", "json"] + node_info = run_kubectl_command(command) + node_metadata = json.loads(node_info) + + # Check if the label exists on this node + labels = node_metadata["metadata"].get("labels", {}) + if label_key in labels: + # Remove the label from the node + command = ["kubectl", "label", "node", node_name, f"{label_key}-"] + print(f"Removing label {label_key} from node {node_name}...") + run_kubectl_command(command) + print(f"Label {label_key} removed from node {node_name} successfully.") + else: + print(f"Label {label_key} not found on node {node_name}, skipping.") + + +def get_hw_values_file(deploy_config, chart_dir): + """Get the hardware-specific values file based on the deploy configuration.""" + device_type = deploy_config.get("device", "cpu") + print(f"Device type is {device_type}. Using existing Helm chart values files...") + if device_type == "cpu": + print(f"Device type is {device_type}. 
Using existing Helm chart values files.") + return None + + llm_engine = deploy_config.get("services", {}).get("llm", {}).get("engine", "tgi") + version = deploy_config.get("version", "1.1.0") + + if os.path.isdir(chart_dir): + # Determine which values file to use based on version + if version in ["1.0.0", "1.1.0"]: + hw_values_file = os.path.join(chart_dir, f"{device_type}-values.yaml") + else: + hw_values_file = os.path.join(chart_dir, f"{device_type}-{llm_engine}-values.yaml") + + if not os.path.exists(hw_values_file): + print(f"Warning: {hw_values_file} not found") + hw_values_file = None + else: + print(f"Device-specific values file found: {hw_values_file}") + else: + print(f"Error: Could not find directory for {chart_dir}") + hw_values_file = None + + return hw_values_file + + +def install_helm_release(release_name, chart_name, namespace, hw_values_file, deploy_values_file): + """Deploy a Helm release with a specified name and chart. + + Parameters: + - release_name: The name of the Helm release. + - chart_name: The Helm chart name or path. + - namespace: The Kubernetes namespace for deployment. + - hw_values_file: The values file for hw specific + - deploy_values_file: The values file for deployment. + """ + + # Check if the namespace exists; if not, create it + try: + command = ["kubectl", "get", "namespace", namespace] + subprocess.run(command, check=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL) + except subprocess.CalledProcessError: + print(f"Namespace '{namespace}' does not exist. 
Creating it...") + command = ["kubectl", "create", "namespace", namespace] + subprocess.run(command, check=True) + print(f"Namespace '{namespace}' created successfully.") + + try: + # Prepare the Helm install command + command = ["helm", "install", release_name, chart_name, "--namespace", namespace] + + # Append values files in order + if hw_values_file: + command.extend(["-f", hw_values_file]) + if deploy_values_file: + command.extend(["-f", deploy_values_file]) + + # Execute the Helm install command + print(f"Running command: {' '.join(command)}") + subprocess.run(command, check=True) + print("Deployment initiated successfully.") + + except subprocess.CalledProcessError as e: + print(f"Error occurred while deploying Helm release: {e}") + + +def uninstall_helm_release(release_name, namespace=None): + """Uninstall a Helm release and clean up resources, optionally delete the namespace if not 'default'.""" + # Default to 'default' namespace if none is specified + if not namespace: + namespace = "default" + + try: + # Uninstall the Helm release + command = ["helm", "uninstall", release_name, "--namespace", namespace] + print(f"Uninstalling Helm release {release_name} in namespace {namespace}...") + run_kubectl_command(command) + print(f"Helm release {release_name} uninstalled successfully.") + + # If the namespace is specified and not 'default', delete it + if namespace != "default": + print(f"Deleting namespace {namespace}...") + delete_namespace_command = ["kubectl", "delete", "namespace", namespace] + run_kubectl_command(delete_namespace_command) + print(f"Namespace {namespace} deleted successfully.") + else: + print("Namespace is 'default', skipping deletion.") + + except subprocess.CalledProcessError as e: + print(f"Error occurred while uninstalling Helm release or deleting namespace: {e}") + + +def update_service(release_name, chart_name, namespace, hw_values_file, deploy_values_file, update_values_file): + """Update the deployment using helm upgrade with new 
values.

    Args:
        release_name: The helm release name
        chart_name: The chart name for the deployment
        namespace: The kubernetes namespace
        hw_values_file: Hardware-specific values file (first -f, lowest precedence)
        deploy_values_file: Base deployment values file
        update_values_file: Values file with the changes to apply (last -f, highest precedence)
    """

    # Construct helm upgrade command; Helm applies -f files left to right, so
    # update_values_file overrides the earlier two.
    command = [
        "helm",
        "upgrade",
        release_name,
        chart_name,
        "--namespace",
        namespace,
        "-f",
        hw_values_file,
        "-f",
        deploy_values_file,
        "-f",
        update_values_file,
    ]
    # Execute helm upgrade via the shared command helper (defined earlier in
    # this file, not shown here).
    print(f"Running command: {' '.join(command)}")
    run_kubectl_command(command)
    print("Deployment updated successfully")


def read_deploy_config(config_path):
    """Read and parse the deploy config file.

    Args:
        config_path: Path to the deploy config file

    Returns:
        The parsed deploy config dictionary or None if failed
    """
    try:
        with open(config_path, "r") as f:
            return yaml.safe_load(f)
    except Exception as e:
        # Broad catch is deliberate: any read/parse problem is reported and
        # mapped to None so the caller can decide how to fail.
        print(f"Failed to load deploy config: {str(e)}")
        return None


def check_deployment_ready(release_name, namespace, timeout=300, interval=5, logfile="deployment.log"):
    """Wait until all pods in the deployment are running and ready.

    Args:
        release_name: The Helm release whose deployments should be checked
        namespace: The Kubernetes namespace
        timeout: The maximum time to wait in seconds (default 300 seconds)
        interval: The interval between checks in seconds (default 5 seconds)
        logfile: The file to log output to (default 'deployment.log')

    Returns:
        0 if success, 1 if failure (timeout reached)
    """
    try:
        # Get the list of deployments in the namespace
        cmd = ["kubectl", "-n", namespace, "get", "deployments", "-o", "jsonpath='{.items[*].metadata.name}'"]
        deployments_output = subprocess.check_output(cmd, text=True)
        deployments = deployments_output.strip().split()

        # Strip the first and last elements of single quotes if present
        # (the jsonpath argument above is wrapped in single quotes, which end
        # up in the output).
        # NOTE(review): if no deployments exist, `deployments` is empty and
        # these two lines raise IndexError — confirm namespaces are never empty
        # when this is called.
        deployments[0] = deployments[0].strip("'")
        deployments[-1] = deployments[-1].strip("'")

        with open(logfile, "a") as log:
            log.write(f"Found deployments: {', '.join(deployments)}\n")

        # Single timer shared across all deployments: the timeout is
        # cumulative over the whole readiness check, not per deployment.
        timer = 0

        # Loop through each deployment to check its readiness
        for deployment_name in deployments:

            # Skip names that cannot be split into <instance>-<app>, and the
            # ui/nginx deployments, which are not gated on readiness here.
            if "-" not in deployment_name or "ui" in deployment_name or "nginx" in deployment_name:
                continue

            instance_name = deployment_name.split("-", 1)[0]
            app_name = deployment_name.split("-", 1)[1]

            # Only check deployments belonging to the requested release.
            if instance_name != release_name:
                continue

            cmd = ["kubectl", "-n", namespace, "get", "deployment", deployment_name, "-o", "jsonpath={.spec.replicas}"]
            desired_replicas = int(subprocess.check_output(cmd, text=True).strip())

            with open(logfile, "a") as log:
                log.write(f"Checking deployment '{deployment_name}' with desired replicas: {desired_replicas}\n")

            # Poll until every Running pod of this instance/app pair reports
            # all containers ready and nothing is terminating.
            while True:
                cmd = [
                    "kubectl",
                    "-n",
                    namespace,
                    "get",
                    "pods",
                    "-l",
                    f"app.kubernetes.io/instance={instance_name}",
                    "-l",
                    f"app.kubernetes.io/name={app_name}",
                    "--field-selector=status.phase=Running",
                    "-o",
                    "json",
                ]

                pods_output = subprocess.check_output(cmd, text=True)
                pods = json.loads(pods_output)

                # A pod counts as ready only if every container in it is ready.
                ready_pods = sum(
                    1
                    for pod in pods["items"]
                    if all(container.get("ready") for container in pod.get("status", {}).get("containerStatuses", []))
                )

                # Pods with a deletionTimestamp are being torn down.
                terminating_pods = sum(
                    1 for pod in pods["items"] if pod.get("metadata", {}).get("deletionTimestamp") is not None
                )

                with open(logfile, "a") as log:
                    log.write(
                        f"Ready pods: {ready_pods}, Desired replicas: {desired_replicas}, Terminating pods: {terminating_pods}\n"
                    )

                if ready_pods == desired_replicas and terminating_pods == 0:
                    with open(logfile, "a") as log:
                        log.write(f"All pods for deployment '{deployment_name}' are running and ready.\n")
                    break

                if timer >= timeout:
                    with open(logfile, "a") as log:
                        log.write(
                            f"Timeout reached for deployment '{deployment_name}'. Not all pods are running and ready.\n"
                        )
                    return 1  # Failure

                time.sleep(interval)
                timer += interval

        return 0  # Success for all deployments

    except subprocess.CalledProcessError as e:
        with open(logfile, "a") as log:
            log.write(f"Error executing kubectl command: {e}\n")
        return 1  # Failure
    except json.JSONDecodeError as e:
        with open(logfile, "a") as log:
            log.write(f"Error parsing kubectl output: {e}\n")
        return 1  # Failure
    except Exception as e:
        with open(logfile, "a") as log:
            log.write(f"Unexpected error: {e}\n")
        return 1  # Failure


def main():
    """Parse CLI arguments and dispatch the requested Helm/Kubernetes action."""
    parser = argparse.ArgumentParser(description="Manage Helm Deployment.")
    parser.add_argument(
        "--chart-name",
        type=str,
        default="chatqna",
        help="The chart name to deploy (default: chatqna).",
    )
    parser.add_argument("--namespace", default="default", help="Kubernetes namespace (default: default).")
    parser.add_argument("--user-values", help="Path to a user-specified values.yaml file.")
    parser.add_argument("--deploy-config", help="Path to a deploy config yaml file.")
    parser.add_argument(
        "--create-values-only", action="store_true", help="Only create the values.yaml file without deploying."
+ ) + parser.add_argument("--uninstall", action="store_true", help="Uninstall the Helm release.") + parser.add_argument("--num-nodes", type=int, default=1, help="Number of nodes to use (default: 1).") + parser.add_argument("--node-names", nargs="*", help="Optional specific node names to label.") + parser.add_argument("--add-label", action="store_true", help="Add label to specified nodes if this flag is set.") + parser.add_argument( + "--delete-label", action="store_true", help="Delete label from specified nodes if this flag is set." + ) + parser.add_argument( + "--label", default="node-type=opea-benchmark", help="Label to add/delete (default: node-type=opea-benchmark)." + ) + parser.add_argument("--update-service", action="store_true", help="Update the deployment with new configuration.") + parser.add_argument("--check-ready", action="store_true", help="Check if all services in the deployment are ready.") + parser.add_argument("--chart-dir", default=".", help="Path to the untarred Helm chart directory.") + + args = parser.parse_args() + + # Node labeling management + if args.add_label: + add_labels_to_nodes(args.num_nodes, args.label, args.node_names) + return + elif args.delete_label: + clear_labels_from_nodes(args.label, args.node_names) + return + elif args.check_ready: + is_ready = check_deployment_ready(args.chart_name, args.namespace) + return is_ready + elif args.uninstall: + uninstall_helm_release(args.chart_name, args.namespace) + return + + # Load deploy_config if provided + deploy_config = None + if args.deploy_config: + deploy_config = read_deploy_config(args.deploy_config) + if deploy_config is None: + parser.error("Failed to load deploy config") + return + + hw_values_file = get_hw_values_file(deploy_config, args.chart_dir) + + action_type = 0 + if args.update_service: + action_type = 1 + + # The user file is provided for deploy when --update-service is not specified + if args.user_values and not args.update_service: + values_file_path = 
args.user_values + else: + if not args.deploy_config: + parser.error("--deploy-config is required") + + node_selector = {args.label.split("=")[0]: args.label.split("=")[1]} + + print("go to generate deploy values" if action_type == 0 else "go to generate update values") + + # Generate values file for deploy or update service + result = generate_helm_values( + example_type=args.chart_name, + deploy_config=deploy_config, + chart_dir=args.chart_dir, + action_type=action_type, # 0 - deploy, 1 - update + node_selector=node_selector, + ) + + # Check result status + if result["status"] == "success": + values_file_path = result["filepath"] + else: + parser.error(f"Failed to generate values.yaml: {result['message']}") + return + + print("start to read the generated values file") + # Read back the generated YAML file for verification + with open(values_file_path, "r") as file: + print("Generated YAML contents:") + print(file.read()) + + # Handle service update if specified + if args.update_service: + if not args.user_values: + parser.error("--user-values is required for update reference") + + try: + update_service( + args.chart_name, args.chart_name, args.namespace, hw_values_file, args.user_values, values_file_path + ) + return + except Exception as e: + parser.error(f"Failed to update deployment: {str(e)}") + return + + # Deploy unless --create-values-only is specified + if not args.create_values_only: + install_helm_release(args.chart_name, args.chart_name, args.namespace, hw_values_file, values_file_path) + print(f"values_file_path: {values_file_path}") + + +if __name__ == "__main__": + main() diff --git a/deploy_and_benchmark.py b/deploy_and_benchmark.py new file mode 100644 index 000000000..1dc4c4308 --- /dev/null +++ b/deploy_and_benchmark.py @@ -0,0 +1,292 @@ +# Copyright (C) 2025 Intel Corporation +# SPDX-License-Identifier: Apache-2.0 + +import argparse +import copy +import os +import re +import shutil +import subprocess +import sys + +import yaml + +from benchmark 
# NOTE(review): this line is the tail of the split statement
# "from benchmark import run_benchmark" (head on the previous mangled line).
# Guarded so the module can be imported without benchmark.py on the path;
# run_benchmark is required at runtime by main().
try:
    from benchmark import run_benchmark
except ImportError:  # pragma: no cover - benchmark.py ships alongside this script
    run_benchmark = None


def read_yaml(file_path):
    """Load a YAML file and return its parsed contents, or None on any error.

    Args:
        file_path: Path to the YAML file.

    Returns:
        The parsed YAML object, or None if the file could not be read/parsed.
    """
    try:
        with open(file_path, "r") as file:
            return yaml.safe_load(file)
    except Exception as e:
        print(f"Error reading YAML file: {e}")
        return None


def construct_deploy_config(deploy_config, target_node, max_batch_size=None):
    """Construct a new deploy config based on the target node number and optional max_batch_size.

    Args:
        deploy_config: Original deploy config dictionary
        target_node: Target node number to match in the node array
        max_batch_size: Optional specific max_batch_size value to use

    Returns:
        A new deploy config with single values for node and instance_num

    Raises:
        ValueError: if deploy_config['node'] is not a list, target_node is not
            in it, or a service's replicaCount array length mismatches it.
    """
    # Deep copy the original config to avoid modifying it
    new_config = copy.deepcopy(deploy_config)

    # Get the node array and validate
    nodes = deploy_config.get("node")
    if not isinstance(nodes, list):
        raise ValueError("deploy_config['node'] must be an array")

    # Find the index of the target node
    try:
        node_index = nodes.index(target_node)
    except ValueError:
        raise ValueError(f"Target node {target_node} not found in node array {nodes}")

    # Set the single node value
    new_config["node"] = target_node

    # Update instance_num for each service based on the same index
    for service_name, service_config in new_config.get("services", {}).items():
        if "replicaCount" in service_config:
            instance_nums = service_config["replicaCount"]
            if isinstance(instance_nums, list):
                if len(instance_nums) != len(nodes):
                    raise ValueError(
                        f"instance_num array length ({len(instance_nums)}) for service {service_name} "
                        f"doesn't match node array length ({len(nodes)})"
                    )
                service_config["replicaCount"] = instance_nums[node_index]

    # Update max_batch_size if specified.
    # BUGFIX: was `new_config["services"]`, which raised KeyError for configs
    # without a "services" section; the loop above already tolerates that case
    # via .get("services", {}).
    if max_batch_size is not None and "llm" in new_config.get("services", {}):
        new_config["services"]["llm"]["max_batch_size"] = max_batch_size

    return new_config


def pull_helm_chart(chart_pull_url, version, chart_name):
    """Pull the chart via `helm pull --untar` and return the untarred directory path, or None."""
    # Pull and untar the
# chart (continuation of the split comment above)
    subprocess.run(["helm", "pull", chart_pull_url, "--version", version, "--untar"], check=True)

    # helm pull --untar extracts into ./<chart_name> under the CWD.
    current_dir = os.getcwd()
    untar_dir = os.path.join(current_dir, chart_name)

    if not os.path.isdir(untar_dir):
        print(f"Error: Could not find untarred directory for {chart_name}")
        return None

    return untar_dir


def main(yaml_file, target_node=None):
    """Main function to process deployment configuration.

    Args:
        yaml_file: Path to the YAML configuration file
        target_node: Optional target number of nodes to deploy. If not specified, will process all nodes.
    """
    config = read_yaml(yaml_file)
    if config is None:
        print("Failed to read YAML file.")
        return None

    deploy_config = config["deploy"]
    benchmark_config = config["benchmark"]

    # Extract chart name from the YAML file name
    # (e.g. "benchmark_chatqna.yaml" -> "chatqna")
    chart_name = os.path.splitext(os.path.basename(yaml_file))[0].split("_")[-1]
    print(f"chart_name: {chart_name}")
    # Use the same interpreter for the deploy.py subprocesses.
    python_cmd = sys.executable

    # Process nodes
    nodes = deploy_config.get("node", [])
    if not isinstance(nodes, list):
        print("Error: deploy_config['node'] must be an array")
        return None

    nodes_to_process = [target_node] if target_node is not None else nodes
    node_names = deploy_config.get("node_name", [])
    namespace = deploy_config.get("namespace", "default")

    # Pull the Helm chart
    chart_pull_url = f"oci://ghcr.io/opea-project/charts/{chart_name}"
    version = deploy_config.get("version", "1.1.0")
    chart_dir = pull_helm_chart(chart_pull_url, version, chart_name)
    if not chart_dir:
        return

    for node in nodes_to_process:
        try:
            print(f"\nProcessing configuration for {node} nodes...")

            # Get corresponding node names for this node count
            current_node_names = node_names[:node] if node_names else []

            # Add labels for current node configuration
            print(f"Adding labels for {node} nodes...")
            cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--add-label"]
            if current_node_names:
                cmd.extend(["--node-names"] + current_node_names)

            result = subprocess.run(cmd, check=True)
            # NOTE(review): with check=True a non-zero exit raises
            # CalledProcessError, so this branch is unreachable.
            if result.returncode != 0:
                print(f"Failed to add labels for {node} nodes")
                continue

            try:
                # Process max_batch_sizes; a scalar value is normalized to a
                # one-element list so both forms of the YAML work.
                max_batch_sizes = deploy_config.get("services", {}).get("llm", {}).get("max_batch_size", [])
                if not isinstance(max_batch_sizes, list):
                    max_batch_sizes = [max_batch_sizes]

                values_file_path = None
                for i, max_batch_size in enumerate(max_batch_sizes):
                    print(f"\nProcessing max_batch_size: {max_batch_size}")

                    # Construct new deploy config
                    new_deploy_config = construct_deploy_config(deploy_config, node, max_batch_size)

                    # Write the new deploy config to a temporary file
                    temp_config_file = f"temp_deploy_config_{node}_{max_batch_size}.yaml"
                    try:
                        with open(temp_config_file, "w") as f:
                            yaml.dump(new_deploy_config, f)

                        if i == 0:
                            # First iteration: full deployment. Output is
                            # captured so the generated values file path can be
                            # scraped from deploy.py's stdout below.
                            cmd = [
                                python_cmd,
                                "deploy.py",
                                "--deploy-config",
                                temp_config_file,
                                "--chart-name",
                                chart_name,
                                "--namespace",
                                namespace,
                                "--chart-dir",
                                chart_dir,
                            ]
                            result = subprocess.run(cmd, check=True, capture_output=True, text=True)

                            # deploy.py prints "values_file_path: <path>" on success.
                            match = re.search(r"values_file_path: (\S+)", result.stdout)
                            if match:
                                values_file_path = match.group(1)
                                print(f"Captured values_file_path: {values_file_path}")
                            else:
                                print("values_file_path not found in the output")

                        else:
                            # Subsequent iterations: update services with config change,
                            # reusing the values file captured on the first iteration.
                            cmd = [
                                python_cmd,
                                "deploy.py",
                                "--deploy-config",
                                temp_config_file,
                                "--chart-name",
                                chart_name,
                                "--namespace",
                                namespace,
                                "--chart-dir",
                                chart_dir,
                                "--user-values",
                                values_file_path,
                                "--update-service",
                            ]
                            result = subprocess.run(cmd, check=True)
                            # NOTE(review): unreachable with check=True (see above).
                            if result.returncode != 0:
                                print(
                                    f"Update failed for {node} nodes configuration with max_batch_size {max_batch_size}"
                                )
                                break  # Skip remaining max_batch_sizes for this node

                        # Wait for deployment to be ready
                        print("\nWaiting for deployment to be ready...")
                        cmd = [
                            python_cmd,
                            "deploy.py",
                            "--chart-name",
                            chart_name,
                            "--namespace",
                            namespace,
                            "--check-ready",
                        ]
                        try:
                            result = subprocess.run(cmd, check=True)
                            print("Deployments are ready!")
                        except subprocess.CalledProcessError as e:
                            # A readiness timeout is reported but does not stop
                            # the benchmark run below.
                            print(f"Deployments status failed with returncode: {e.returncode}")

                        # Run benchmark
                        run_benchmark(
                            benchmark_config=benchmark_config,
                            chart_name=chart_name,
                            namespace=namespace,
                            llm_model=deploy_config.get("services", {}).get("llm", {}).get("model_id", ""),
                        )

                    except Exception as e:
                        print(
                            f"Error during {'deployment' if i == 0 else 'update'} for {node} nodes with max_batch_size {max_batch_size}: {str(e)}"
                        )
                        break  # Skip remaining max_batch_sizes for this node
                    finally:
                        # Clean up the temporary file
                        if os.path.exists(temp_config_file):
                            os.remove(temp_config_file)

            finally:
                # Uninstall the deployment (always, even after errors above)
                print(f"\nUninstalling deployment for {node} nodes...")
                cmd = [
                    python_cmd,
                    "deploy.py",
                    "--chart-name",
                    chart_name,
                    "--namespace",
                    namespace,
                    "--uninstall",
                ]
                try:
                    result = subprocess.run(cmd, check=True)
                    # NOTE(review): unreachable with check=True (see above).
                    if result.returncode != 0:
                        print(f"Failed to uninstall deployment for {node} nodes")
                except Exception as e:
                    print(f"Error while uninstalling deployment for {node} nodes: {str(e)}")

                # Delete labels for current node configuration
                print(f"Deleting labels for {node} nodes...")
                cmd = [python_cmd, "deploy.py", "--chart-name", chart_name, "--num-nodes", str(node), "--delete-label"]
                if current_node_names:
                    cmd.extend(["--node-names"] + current_node_names)

                try:
                    result = subprocess.run(cmd, check=True)
                    # NOTE(review): unreachable with check=True (see above).
                    if result.returncode != 0:
                        print(f"Failed to delete labels for {node} nodes")
                except Exception as e:
                    print(f"Error while deleting labels for {node} nodes: {str(e)}")

        except Exception as e:
            print(f"Error processing configuration for {node} nodes: {str(e)}")
            continue

    # Cleanup: Remove the untarred directory
    if chart_dir and os.path.isdir(chart_dir):
        print(f"Removing temporary directory: {chart_dir}")
        shutil.rmtree(chart_dir)
        print("Temporary directory removed successfully.")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Deploy and benchmark with specific node configuration.")
    parser.add_argument("yaml_file", help="Path to the YAML configuration file")
    parser.add_argument("--target-node", type=int, help="Optional: Target number of nodes to deploy.", default=None)

    args = parser.parse_args()
    main(args.yaml_file, args.target_node)
diff --git a/requirements.txt b/requirements.txt
new file mode 100644
index 000000000..44f6445aa
--- /dev/null
+++ b/requirements.txt
@@ -0,0 +1,9 @@
+kubernetes
+locust
+numpy
+opea-eval>=1.2
+pytest
+pyyaml
+requests
+sseclient-py
+transformers