From 332d9120b89665553e4d54eae1295b1143ace1de Mon Sep 17 00:00:00 2001 From: vikramxD Date: Thu, 26 Dec 2024 11:09:41 +0000 Subject: [PATCH] Add Allegro API server implementation with Prometheus metrics support - Introduced `allegro_serve.py` to implement the Allegro API using LitServe. - Added Prometheus metrics for tracking request processing times. - Created `AllegroRequest` model for input validation and `AllegroAPI` class for handling inference requests. - Implemented error handling and logging for improved traceability. - Set up a temporary directory for video output and integrated S3 upload functionality. - Added an error log file for capturing runtime errors. --- api/allegro_serve.py | 204 ++++++++++++++++++ api/logs/error.log | 0 .../allegro_settings.cpython-310.pyc | Bin 1693 -> 1693 bytes .../__pycache__/aws_settings.cpython-310.pyc | Bin 555 -> 553 bytes .../allegro_diffusers.cpython-310.pyc | Bin 0 -> 4656 bytes .../mp4_to_s3_json.cpython-310.pyc | Bin 963 -> 961 bytes .../__pycache__/s3_manager.cpython-310.pyc | Bin 2597 -> 2595 bytes scripts/allegro_diffusers.py | 2 +- 8 files changed, 205 insertions(+), 1 deletion(-) create mode 100644 api/allegro_serve.py create mode 100644 api/logs/error.log create mode 100644 scripts/__pycache__/allegro_diffusers.cpython-310.pyc diff --git a/api/allegro_serve.py b/api/allegro_serve.py new file mode 100644 index 0000000..aace28a --- /dev/null +++ b/api/allegro_serve.py @@ -0,0 +1,204 @@ +import os +import sys +import time +import tempfile +from typing import Dict, Any, List, Union, Optional +from pathlib import Path +from pydantic import BaseModel, Field +from litserve import LitAPI, LitServer, Logger +from loguru import logger +from prometheus_client import ( + CollectorRegistry, + Histogram, + make_asgi_app, + multiprocess +) + +from configs.allegro_settings import AllegroSettings +from scripts.allegro_diffusers import AllegroInference +from scripts.mp4_to_s3_json import mp4_to_s3_json +import torch + +# Set up prometheus multiprocess mode +os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prometheus_multiproc_dir" +if not os.path.exists("/tmp/prometheus_multiproc_dir"): + os.makedirs("/tmp/prometheus_multiproc_dir") + +# Initialize prometheus registry +registry = CollectorRegistry() +multiprocess.MultiProcessCollector(registry) + +class PrometheusLogger(Logger): + """Custom logger for Prometheus metrics. + + Implements metric collection for request processing times + using Prometheus Histograms. + Metrics are stored in a multi-process compatible registry. + + Attributes: + function_duration (Histogram): Prometheus histogram for tracking processing times + """ + + def __init__(self): + super().__init__() + self.function_duration = Histogram( + "allegro_request_processing_seconds", + "Time spent processing Allegro request", + ["function_name"], + registry=registry + ) + + def process(self, key: str, value: float) -> None: + """Process and record a metric value. + + Args: + key (str): The name of the function or operation being measured + value (float): The duration or metric value to record + """ + self.function_duration.labels(function_name=key).observe(value) + +class AllegroRequest(BaseModel): + """Model representing a request for the Allegro model. + + Validates input parameters for Allegro model inference. 
+ + Attributes: + prompt (str): Text prompt for inference + negative_prompt (Optional[str]): Text prompt for elements to avoid + num_inference_steps (int): Number of denoising steps (1-100) + guidance_scale (float): Controls adherence to prompt (1.0-20.0) + height (int): Image height (256-720, multiple of 32) + width (int): Image width (256-1280, multiple of 32) + seed (Optional[int]): Random seed for reproducibility + """ + prompt: str = Field(..., description="Main text prompt for generation") + negative_prompt: Optional[str] = Field( + "worst quality, blurry, distorted", + description="Text description of what to avoid" + ) + num_inference_steps: int = Field(50, ge=1, le=100, description="Number of inference steps") + guidance_scale: float = Field(7.5, ge=1.0, le=20.0, description="Guidance scale") + height: int = Field(512, ge=256, le=720, multiple_of=32, description="Image height") + width: int = Field(512, ge=256, le=1280, multiple_of=32, description="Image width") + seed: Optional[int] = Field(None, description="Random seed for reproducibility") + +class AllegroAPI(LitAPI): + """API implementation for Allegro model inference using LitServe. + + Attributes: + settings (AllegroSettings): Configuration for Allegro model + engine (AllegroInference): Inference engine for Allegro + """ + + def setup(self, device: str) -> None: + """Initialize the Allegro inference engine. + + Args: + device (str): Target device for inference ('cuda', 'cpu', etc.) + """ + try: + logger.info(f"Initializing Allegro model on device: {device}") + self.settings = AllegroSettings(device=device) + self.engine = AllegroInference(self.settings) + logger.info("Allegro setup completed successfully") + except Exception as e: + logger.error(f"Error during Allegro setup: {e}") + raise + + def decode_request(self, request: Dict[str, Any]) -> Dict[str, Any]: + """Decode and validate the incoming request. + + Args: + request (dict): Input request dictionary + + Returns: + dict: Validated request + """ + try: + return AllegroRequest(**request).dict() + except Exception as e: + logger.error(f"Request validation error: {e}") + raise + + def predict(self, inputs: List[Dict[str, Any]]) -> List[Dict[str, Any]]: + """Perform inference using the Allegro model. + + Args: + inputs (list): List of validated requests + + Returns: + list: Results with URLs and metadata + """ + results = [] + for request in inputs: + start_time = time.time() + try: + self.settings.update(request) + with tempfile.TemporaryDirectory() as temp_dir: + output_path = Path(temp_dir) / "output.mp4" + self.settings.output_path = output_path + self.engine.generate() + + if not output_path.exists(): + raise FileNotFoundError(f"Output not found at {output_path}") + + with open(output_path, 'rb') as video_file: + s3_response = mp4_to_s3_json(video_file, output_path.name) + + generation_time = time.time() - start_time + + results.append({ + "status": "success", + "video_url": s3_response["url"], + "prompt": request["prompt"], + "time_taken": generation_time + }) + + except Exception as e: + logger.error(f"Error during prediction: {e}") + results.append({"status": "error", "error": str(e)}) + return results + + def encode_response(self, output: List[Dict[str, Any]]) -> Dict[str, Any]: + """Encode the results into a response format. 
+ + Args: + output (list): Results list + + Returns: + dict: Encoded response + """ + return {"results": output} + +def main(): + prometheus_logger = PrometheusLogger() + prometheus_logger.mount( + path="/api/v1/metrics", + app=make_asgi_app(registry=registry) + ) + + logger.remove() + logger.add(sys.stdout, format="{time} | {message}", level="INFO") + logger.add("logs/error.log", format="{time} | {message}", level="ERROR") + + try: + api = AllegroAPI() + server = LitServer( + api, + api_path='/api/v1/allegro', + accelerator="auto", + devices="auto", + max_batch_size=4, + loggers=prometheus_logger, + ) + + logger.info("Starting Allegro API server on port 8000") + server.run(port=8000) + except Exception as e: + logger.error(f"Failed to start server: {e}") + sys.exit(1) + + + +if __name__ == "__main__": + main() diff --git a/api/logs/error.log b/api/logs/error.log new file mode 100644 index 0000000..e69de29 diff --git a/configs/__pycache__/allegro_settings.cpython-310.pyc b/configs/__pycache__/allegro_settings.cpython-310.pyc index 1c671650df5ef32ea7f02efdffc06dec74e6621e..0cf5424a251a3cbb6ab226758590297c38045403 100644 GIT binary patch delta 20 acmbQsJC~O`pO=@50SFQeayN2MVgmpyH3UTf delta 20 acmbQsJC~O`pO=@50SHzE=WOJj#0CH^^8~H{ diff --git a/configs/__pycache__/aws_settings.cpython-310.pyc b/configs/__pycache__/aws_settings.cpython-310.pyc index 54af652fffff69665530980f3c4ce87ffcd56d92..745d48e1d271615775a4957326ca2df437f0000a 100644 GIT binary patch delta 39 tcmZ3@vXX^6pO=@50SFQeayN2MVB}NPFUrp^(RWGAOe@VUoxGZ{0RX!E3nKsk delta 41 vcmZ3rToXImdxyv;jLEZ%wSO!{JncTXMl3(S--2Eq{vF;C@;>K~FNs;)H3-#-J$5AAvN*4Ro$Ad5x zQJ9FKUF?fTsZ^PpNp}%)Au-gBc#F56SQ8vnba)qU4{u*|f-c|T1FY!s-G|n2uQ;d}-c16bAinTY z+p_u$&oT2OPiq!P#bPh2PMWSnMt+&7GVGYfUa3ley#&Lb}q+VBdKFblmXT!ElWM$DH<5KeQVPhBtJB@LmWQ)G;gs2`Y)l>~2OINK(x_rZXkHm`uglnYUz_WHT^{L@o1(l?1fN z@^oZ4kPtG@(mZ29h?PcJA{I#o6@ezm^V}za-FD zDYmt-F?*%|voM>P`DwTi39F^53`KN~QcrpDX6A?;Cpe|fJopZiLG2v11Czlr4_1UX z7{!ek7mLsrrR}z@Ms!6c6x@Os(Q|*byOEnHtPa1%>4t4ZQBgF>ri%1cG~t@E=o^M^ z0%!(X*)enG z4cN9hZ*eDU&pW*Fv~g~m`KFog*7Iu{^F7{riuG6)^=XF}NbR?P?O#cM2Lsg^9)-c&H2D_XgMx$P)W zUI>6;cOLm7rN};FcLQRs1Ou3P2@^^Icq$SkRS6rlQZV=2d27(R^8M9~dzAu7d1m)` zJT4jwPr!M5IzI_ygk)3fWK#JvmuJgVtX`=%)gWDK8+(V2x&}8>NUzrFjR!Sm>)a=} z4Du*i;;1uioIU#hbKO5U|6bJ?wgJcS3*b?- z%LG>Jr&2>-3xPvZMl1FcF-2;+8rUflgPVHU%qMKBA3}^x^nZcQ>i$*qubm5LZC^Al zt&8TRb!k_TZ{jj*&D)6Y&b$juurGS%yN@+3eC;4Z5N>ql1JmxBc5mI{&8O}8K5yN$ z&f6sptSM`B%B+zctlR6RS$mClpQ4g7YkCz{>=>-Lz65-oqna{Uamf4Y&K6eeoOeF$ z7_1n0&$hARqp}RW(3hP^Mm9r&og#+FKxuvX|qTw74CGWiBdpe*~PM+>4Z3 zml;Uu2Eq}al$t0CO+~`A523C?t;%UH7!#?Db68gIWZF?Om&u9K=TeN2otI+Xlqdtr7$)Ab;`v1cNuOV6bQ- zt;Akdv?6&1Xl|jL3{&+DI$W13FhTReaU#Edv)G%t^pyo|9#lXnyRuo>NaVmDMa1v^-LbSY)9Aj2f zwdwTx_I11Od>fGI;ID1|0HDvBo#vOs zz6Y*LF;vZTT$fAVbroIt6qQ#7$%_V*scuvEN`CWYz`lbCdNqM+*J|2(-R|J^Hx-Q$ zTLK2x^$O(Alk!s#r7@sr4WP*tzt#;2xSz>YIqO4O)a46`lS z2~yc6NX_h+eF%rA^zR9UYYQA*oQ+Hnmx~u3stbg*4p#s~aPVa$K~&3_Due|&tYJ)j z6x9ZO)Dm<r1DrMq0CXbEy8Jndk^jc@1HV}hCc2FB}Hgt200Oyr;>Z8Blg~p(|&PJwHVSW-Fz(NRD0PYAk zRSx}onT)ZCI&}HNIAs)2p?U~3L#<2ACAhU?7L&?jJe-DD6bM9q*};^rWSR;Nw~^{f zh1D6rwN~ngC4?N&Rezoe_=3v9!1FUu(omMk3!p`wNSYp@QWc8g2GT~kf~S;c>z6rU z9H;LO4=eoHTwhGFR#ss9o5698Dyu{o^Z$rk<>g)KTV|i literal 0 HcmV?d00001 diff --git a/scripts/__pycache__/mp4_to_s3_json.cpython-310.pyc b/scripts/__pycache__/mp4_to_s3_json.cpython-310.pyc index 0352d5b6a150fd2ea1d6c71ac61210ea6e453655..b18b35685a3c074ac19351d3a92a0043ac4a104a 100644 GIT binary patch delta 39 tcmX@ievq9zpO=@50SFQeayN4SV&YTRFUrp^(RWGAOe@VUoh-_{4gktj3t#{M delta 41 
vcmX@eewdv*pO=@50SG2%W^UyE#U!AqUzDF;qVJoTmzkTNoRK+Mn0Xxl?!FA2 diff --git a/scripts/__pycache__/s3_manager.cpython-310.pyc b/scripts/__pycache__/s3_manager.cpython-310.pyc index 428ecab3138a3995e09a309c603fc4a48c4b4269..6b05f3a4e18d6c9f5f9b736726048297a75aaace 100644 GIT binary patch delta 40 ucmZ1~vRH&WpO=@50SFQeayN3vF!3qt7v<-d=({9lrj=%wZZ2n9#|{9*xCZ)FUrp^(f7^F%goJB&dA(c%CwFh0O;2YVE_OC diff --git a/scripts/allegro_diffusers.py b/scripts/allegro_diffusers.py index c1d5f29..94f2cfe 100644 --- a/scripts/allegro_diffusers.py +++ b/scripts/allegro_diffusers.py @@ -95,7 +95,7 @@ def generate_video(self, prompt: str, positive_prompt: str, negative_prompt: str logger.error(f"Error during video generation: {e}") raise -# Example usage (to be executed in a main script or testing environment) + if __name__ == "__main__": settings = AllegroSettings() inference = AllegroInference(settings)
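
For reference, a minimal client sketch for exercising the endpoint this patch adds. It is not part of the patch itself: it assumes the server was started via main() (port 8000, prediction route '/api/v1/allegro', metrics mounted at '/api/v1/metrics'), that the third-party `requests` package is installed, and that the host and timeouts are adjusted to your deployment. The payload fields mirror the AllegroRequest schema defined in api/allegro_serve.py; the exact shape of the "results" value can vary depending on how LitServe batches and unbatches requests, so the handling below is deliberately defensive.

    import requests

    BASE_URL = "http://localhost:8000"  # assumed local deployment; change as needed

    # Request body mirroring the AllegroRequest fields (prompt, negative_prompt,
    # num_inference_steps, guidance_scale, height, width, seed).
    payload = {
        "prompt": "A timelapse of clouds rolling over snow-capped mountains",
        "negative_prompt": "worst quality, blurry, distorted",
        "num_inference_steps": 50,
        "guidance_scale": 7.5,
        "height": 512,
        "width": 512,
        "seed": 42,
    }

    # Submit the generation request; encode_response wraps its output as {"results": ...}.
    response = requests.post(f"{BASE_URL}/api/v1/allegro", json=payload, timeout=600)
    response.raise_for_status()

    results = response.json().get("results", [])
    if isinstance(results, dict):
        # Depending on LitServe's batching/unbatching, a single result may arrive
        # as one dict rather than a list; normalize so the loop below works either way.
        results = [results]

    for result in results:
        if result.get("status") == "success":
            print(f"Video URL: {result['video_url']} (took {result['time_taken']:.1f}s)")
        else:
            print(f"Generation failed: {result.get('error')}")

    # The request-duration histogram recorded by PrometheusLogger is exposed here.
    metrics = requests.get(f"{BASE_URL}/api/v1/metrics", timeout=10)
    print(metrics.text)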