diff --git a/api/__pycache__/ltx_serve.cpython-310.pyc b/api/__pycache__/ltx_serve.cpython-310.pyc
new file mode 100644
index 0000000..9550f81
Binary files /dev/null and b/api/__pycache__/ltx_serve.cpython-310.pyc differ
diff --git a/api/__pycache__/mochi_serve.cpython-310.pyc b/api/__pycache__/mochi_serve.cpython-310.pyc
new file mode 100644
index 0000000..56c8153
Binary files /dev/null and b/api/__pycache__/mochi_serve.cpython-310.pyc differ
diff --git a/api/logs/combined_api.log b/api/logs/combined_api.log
new file mode 100644
index 0000000..a1b0a5a
--- /dev/null
+++ b/api/logs/combined_api.log
@@ -0,0 +1 @@
+2024-12-23 14:03:00.055 | INFO | __main__:main:220 - Starting Combined Video Generation Server
diff --git a/api/ltx_serve.py b/api/ltx_serve.py
index fb320f8..ace769c 100644
--- a/api/ltx_serve.py
+++ b/api/ltx_serve.py
@@ -1,5 +1,13 @@
 """
 LitServe API implementation for LTX video generation service.
+
+This module provides a FastAPI-based service for generating videos using the LTX model.
+It handles request validation, video generation, S3 upload, and monitoring through Prometheus.
+
+Key Components:
+    - PrometheusLogger: Custom metrics logging
+    - VideoGenerationRequest: Request validation model
+    - LTXVideoAPI: Main API implementation
 """
 
 import os
@@ -33,7 +41,14 @@ multiprocess.MultiProcessCollector(registry)
 
 class PrometheusLogger(Logger):
-    """Custom logger for Prometheus metrics."""
+    """Custom logger for Prometheus metrics.
+
+    Implements metric collection for video generation request processing times
+    using Prometheus Histograms. Metrics are stored in a multi-process compatible registry.
+
+    Attributes:
+        function_duration (Histogram): Prometheus histogram for tracking processing times
+    """
 
     def __init__(self):
         super().__init__()
@@ -45,11 +60,31 @@ def __init__(self):
         )
 
     def process(self, key: str, value: float) -> None:
-        """Process and record metric."""
+        """Process and record a metric value.
+
+        Args:
+            key (str): The name of the function or operation being measured
+            value (float): The duration or metric value to record
+        """
         self.function_duration.labels(function_name=key).observe(value)
 
 class VideoGenerationRequest(BaseModel):
-    """Model representing a video generation request."""
+    """Model representing a video generation request.
+
+    Validates and normalizes input parameters for video generation.
+    Provides default values and constraints for all generation parameters.
+
+    Attributes:
+        prompt (str): Main text description for video generation
+        negative_prompt (Optional[str]): Text description of elements to avoid
+        num_inference_steps (int): Number of denoising steps (1-100)
+        guidance_scale (float): Controls adherence to prompt (1.0-20.0)
+        height (int): Video height in pixels (256-720, multiple of 32)
+        width (int): Video width in pixels (256-1280, multiple of 32)
+        num_frames (int): Number of frames to generate (1-257)
+        frame_rate (int): Output video frame rate (1-60)
+        seed (Optional[int]): Random seed for reproducibility
+    """
 
     prompt: str = Field(..., description="Text description of the video to generate")
     negative_prompt: Optional[str] = Field(
@@ -97,10 +132,27 @@ class VideoGenerationRequest(BaseModel):
     seed: Optional[int] = Field(None, description="Random seed for generation")
 
 class LTXVideoAPI(LitAPI):
-    """API for LTX video generation using LitServer."""
+    """API for LTX video generation using LitServer.
+
+    Implements the core video generation workflow including model initialization,
+    request processing, video generation, and result handling.
+
+    Attributes:
+        settings (LTXVideoSettings): Configuration for video generation
+        engine (LTXInference): Video generation inference engine
+    """
 
     def setup(self, device: str) -> None:
-        """Initialize the LTX video generation model."""
+        """Initialize the LTX video generation model.
+
+        Sets up the video generation engine and loads models onto the specified device.
+
+        Args:
+            device (str): Target device for model execution ('cuda', 'cpu', etc.)
+
+        Raises:
+            Exception: If model initialization fails
+        """
         try:
             logger.info(f"Initializing LTX video generation on device: {device}")
@@ -120,7 +172,19 @@ def decode_request(
         self,
         request: Union[Dict[str, Any], List[Dict[str, Any]]]
     ) -> List[Dict[str, Any]]:
-        """Decode and validate the incoming request."""
+        """Decode and validate the incoming request.
+
+        Converts raw request data into validated VideoGenerationRequest objects.
+
+        Args:
+            request: Single request dict or list of request dicts
+
+        Returns:
+            List of validated request dictionaries
+
+        Raises:
+            ValidationError: If request validation fails
+        """
         try:
             # Ensure request is a list
             if not isinstance(request, list):
@@ -141,7 +205,19 @@ def batch(
         self,
         inputs: Union[Dict[str, Any], List[Dict[str, Any]]]
     ) -> Dict[str, List[Any]]:
-        """Prepare inputs for batch processing."""
+        """Prepare inputs for batch processing.
+
+        Organizes single or multiple requests into a batched format for processing.
+
+        Args:
+            inputs: Single input dict or list of input dicts
+
+        Returns:
+            Dictionary with lists of batched parameters
+
+        Raises:
+            Exception: If batch preparation fails
+        """
         try:
             # Convert single input to list
             if not isinstance(inputs, list):
@@ -176,7 +252,23 @@ def batch(
             raise
 
     def predict(self, inputs: List[Dict[str, Any]]) -> List[Dict[str, Any]]:
-        """Process inputs and generate videos."""
+        """Process inputs and generate videos.
+
+        Core video generation method that handles the complete pipeline:
+          - Parameter validation
+          - Video generation
+          - S3 upload
+          - Performance monitoring
+
+        Args:
+            inputs: List of validated generation requests
+
+        Returns:
+            List of generation results including video URLs and metadata
+
+        Raises:
+            Exception: If video generation or upload fails
+        """
         results = []
 
         try:
@@ -317,7 +409,16 @@ def encode_response(
         }
 
 def main():
-    """Main entry point for the API server."""
+    """Main entry point for the API server.
+
+    Initializes and starts the LitServe server with:
+      - Prometheus metrics endpoint
+      - Configured logging
+      - LTX video generation API
+      - Server settings for batching and acceleration
+
+    Exits with status code 1 if server initialization fails.
+    """
     # Initialize Prometheus logger
     prometheus_logger = PrometheusLogger()
     prometheus_logger.mount(
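
Reviewer note: a minimal client sketch for the LTX service above, not part of the patch. The field names and ranges mirror the VideoGenerationRequest docstring; the host, port, and /predict path are assumptions, since the hunks do not show how ltx_serve.py configures its LitServer instance.

import requests  # assumed available; any HTTP client works

payload = {
    "prompt": "A sunny day at the beach, gentle waves",  # required
    "negative_prompt": "blurry, low quality",            # optional
    "num_inference_steps": 40,   # 1-100
    "guidance_scale": 7.5,       # 1.0-20.0
    "height": 480,               # 256-720, multiple of 32
    "width": 704,                # 256-1280, multiple of 32
    "num_frames": 121,           # 1-257
    "frame_rate": 25,            # 1-60
    "seed": 42,                  # optional, for reproducibility
}

# Endpoint URL is an assumption; adjust to the deployed server.
response = requests.post("http://localhost:8000/predict", json=payload, timeout=600)
response.raise_for_status()
print(response.json())
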
diff --git a/api/serve.py b/api/serve.py
index cf6fb61..9239144 100644
--- a/api/serve.py
+++ b/api/serve.py
@@ -1,180 +1,158 @@
 """
-Combined API router for multiple LitServe-based models.
+Combined API router for LTX and Mochi video generation services.
 
-This script imports multiple model-specific LitAPI classes (e.g., LTXVideoAPI
-and MochiVideoAPI) and integrates them into a single endpoint. Clients specify
-which model to invoke by providing a `model_name` field in the request body.
-
-Features:
-- Single endpoint routing for multiple models
-- Prometheus metrics for request duration tracking
-- Comprehensive logging (stdout and file) with loguru
-- Detailed docstrings and structured JSON responses
-- Extensible: Just add new model APIs and register them in `model_apis`.
+This module provides a unified endpoint that can handle requests for both
+LTX and Mochi video generation models. Clients specify which model to use
+via the 'model_name' parameter in their requests.
 
 Usage:
-1. Ensure `ltx_serve.py` and `mochi_serve.py` are in the same directory.
-2. Run `python combined_serve.py`.
-3. Send POST requests to `http://localhost:8000/predict` with JSON like:
-   {
-     "model_name": "ltx",
-     "prompt": "Generate a video about a sunny day at the beach"
-   }
-
-   or
-
-   {
-     "model_name": "mochi",
-     "prompt": "Generate a video about a futuristic city"
-   }
+    POST /predict
+    {
+        "model_name": "ltx",  # or "mochi"
+        "prompt": "your video prompt here",
+        ...other model-specific parameters...
+    }
 """
 
 import sys
 import os
-import time
 from typing import Dict, Any, List, Union
 from pydantic import BaseModel, Field
 from loguru import logger
-
 import torch
 import litserve as ls
-from prometheus_client import (
-    CollectorRegistry,
-    Histogram,
-    make_asgi_app,
-    multiprocess
-)
-
-# Import the individual model APIs
+from prometheus_client import CollectorRegistry, Histogram, make_asgi_app, multiprocess
 from ltx_serve import LTXVideoAPI
 from mochi_serve import MochiVideoAPI
 
 # Setup Prometheus multiprocess mode
-os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prometheus_multiproc_dir"
-if not os.path.exists("/tmp/prometheus_multiproc_dir"):
-    os.makedirs("/tmp/prometheus_multiproc_dir")
+os.environ["PROMETHEUS_MULTIPROC_DIR"] = "/tmp/prometheus_multiproc"
+if not os.path.exists("/tmp/prometheus_multiproc"):
+    os.makedirs("/tmp/prometheus_multiproc")
 
 registry = CollectorRegistry()
 multiprocess.MultiProcessCollector(registry)
 
 class PrometheusLogger(ls.Logger):
-    """Custom logger for Prometheus metrics."""
+    """Custom logger for tracking combined API metrics."""
+
     def __init__(self):
         super().__init__()
         self.function_duration = Histogram(
-            "combined_request_processing_seconds",
-            "Time spent processing combined API request",
-            ["function_name"],
+            "combined_request_duration_seconds",
+            "Time spent processing video generation requests",
+            ["model_name", "function_name"],
             registry=registry
         )
 
     def process(self, key: str, value: float) -> None:
-        """Record metric observations for function durations."""
-        self.function_duration.labels(function_name=key).observe(value)
+        """Record metric observations."""
+        model_name, func_name = key.split(":", 1) if ":" in key else ("unknown", key)
+        self.function_duration.labels(
+            model_name=model_name,
+            function_name=func_name
+        ).observe(value)
 
 class CombinedRequest(BaseModel):
-    """
-    Pydantic model for incoming requests to the combined endpoint.
-    The `model_name` field is used to select which model to route to.
-    Other fields depend on the target model, so they are optional here.
-    """
-    model_name: str = Field(..., description="Name of the model to use (e.g., 'ltx' or 'mochi').")
-    # Any additional fields will be passed through to the selected model's decode_request.
-    # We keep this flexible by using an extra allowed attributes pattern.
-    # For more strict validation, define fields matching each model's requirements.
+    """Request model for the combined API endpoint."""
+
+    model_name: str = Field(
+        ...,
+        description="Model to use for video generation ('ltx' or 'mochi')"
+    )
+
     class Config:
-        extra = "allow"
+        extra = "allow"  # Allow additional fields for model-specific parameters
 
-class CombinedAPI(ls.LitAPI):
-    """
-    A combined API class that delegates requests to multiple model-specific APIs
-    based on the `model_name` field in the request.
+class CombinedVideoAPI(ls.LitAPI):
+    """Combined API for serving both LTX and Mochi video generation models."""
-
-    This approach allows adding new models by:
-    1. Importing their API class.
-    2. Initializing and registering them in `model_apis` dictionary.
-    """
 
     def setup(self, device: str) -> None:
-        """Setup all sub-model APIs and logging/metrics."""
-
-        logger.info(f"Initializing combined API with device={device}")
+        """Initialize both video generation models.
+
+        Args:
+            device: Target device for model execution
+        """
+        logger.info(f"Setting up combined video API on device: {device}")
 
-        # Initialize sub-model APIs
+        # Initialize both APIs
         self.ltx_api = LTXVideoAPI()
         self.mochi_api = MochiVideoAPI()
-
-        # Setup each sub-model on the provided device
+
+        # Setup each model
         self.ltx_api.setup(device=device)
         self.mochi_api.setup(device=device)
-
-        # Register them in a dictionary for easy routing
+
+        # Register models for routing
        self.model_apis = {
             "ltx": self.ltx_api,
             "mochi": self.mochi_api
         }
-
-        logger.info("Combined API setup completed successfully.")
+
+        logger.info("Successfully initialized all models")
 
     def decode_request(
         self,
         request: Union[Dict[str, Any], List[Dict[str, Any]]]
     ) -> Dict[str, Any]:
-        """
-        Decode the incoming request to determine which model to use.
-        We expect `model_name` to route the request accordingly.
-        The rest of the fields will be passed to the chosen model's decode_request.
+        """Validate request and determine target model.
+
+        Args:
+            request: Raw request data
+
+        Returns:
+            Decoded request with model selection
+
+        Raises:
+            ValueError: If model_name is invalid
         """
         if isinstance(request, list):
-            # We handle only single requests for simplicity
-            request = request[0]
-
+            request = request[0]  # Handle single requests for now
+
         validated = CombinedRequest(**request).dict()
         model_name = validated.pop("model_name").lower()
-
+
         if model_name not in self.model_apis:
-            raise ValueError(f"Unknown model_name '{model_name}'. Available: {list(self.model_apis.keys())}")
-
-        # We'll store the selected model_name and request data
+            raise ValueError(
+                f"Invalid model_name: {model_name}. "
+                f"Available models: {list(self.model_apis.keys())}"
+            )
+
         return {
             "model_name": model_name,
             "request_data": validated
         }
 
     def predict(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
-        """
-        Perform prediction by routing to the chosen model API.
-
-        Steps:
-        1. Extract model_name and request_data.
-        2. Pass request_data to the chosen model's decode_request -> predict pipeline.
-        3. Return the predictions from the model.
+        """Route request to appropriate model and generate video.
+
+        Args:
+            inputs: Decoded request data
+
+        Returns:
+            Generation results from selected model
         """
         model_name = inputs["model_name"]
         request_data = inputs["request_data"]
         model_api = self.model_apis[model_name]
-
-        start_time = time.time()
-
+
         try:
-            # The sub-model APIs typically handle lists of requests.
-            # We'll wrap request_data in a list if needed.
+            # Process request through selected model
             decoded = model_api.decode_request(request_data)
-            # decoded is typically a list of requests for that model
             predictions = model_api.predict(decoded)
-            # predictions is typically a list of results
-            result = predictions[0] if predictions else {"status": "error", "error": "No result returned"}
-
-            end_time = time.time()
-            self.log("combined_inference_time", end_time - start_time)
-
+            result = predictions[0] if predictions else {
+                "status": "error",
+                "error": "No result returned"
+            }
+
             return {
                 "model_name": model_name,
                 "result": result
             }
-
+
         except Exception as e:
             import traceback
-            logger.error(f"Error in combined predict: {e}\n{traceback.format_exc()}")
+            logger.error(f"Error in {model_name} prediction: {str(e)}")
             return {
                 "model_name": model_name,
                 "status": "error",
@@ -186,41 +164,41 @@ def predict(self, inputs: Dict[str, Any]) -> Dict[str, Any]:
             torch.cuda.empty_cache()
 
     def encode_response(self, output: Dict[str, Any]) -> Dict[str, Any]:
-        """
-        Encode the final response. We call the chosen model's encode_response if the result
-        is from a model inference. If there's an error at the combined level, we return a generic error response.
+        """Encode final response using model-specific encoder.
+
+        Args:
+            output: Raw model output
+
+        Returns:
+            Encoded response ready for client
         """
         model_name = output.get("model_name")
 
         if model_name and model_name in self.model_apis:
-            # If there's a result from the model, encode it using the model's encoder
             result = output.get("result", {})
+
             if result.get("status") == "error":
-                # Model-specific error case
                 return {
                     "status": "error",
                     "error": result.get("error", "Unknown error"),
-                    "traceback": result.get("traceback", None)
+                    "traceback": result.get("traceback")
                 }
-            # Successful result
+
             encoded = self.model_apis[model_name].encode_response(result)
-            # Add the model name to the final response for clarity
             encoded["model_name"] = model_name
             return encoded
         else:
-            # If we got here, there's a top-level routing error
             return {
                 "status": "error",
-                "error": output.get("error", "Unknown top-level error"),
-                "traceback": output.get("traceback", None)
+                "error": output.get("error", "Unknown routing error"),
+                "traceback": output.get("traceback")
             }
 
-
 def main():
-    """Main entry point to run the combined server."""
-    # Set up Prometheus logger
+    """Initialize and start the combined video generation server."""
+    # Setup Prometheus metrics
     prometheus_logger = PrometheusLogger()
     prometheus_logger.mount(
-        path="/api/v1/metrics",
+        path="/metrics",
         app=make_asgi_app(registry=registry)
     )
@@ -238,18 +216,17 @@ def main():
         level="DEBUG"
     )
 
-    logger.info("Starting Combined Video Generation Server on port 8000")
-
-    # Initialize and run the combined server
-    api = CombinedAPI()
+    # Start server
+    logger.info("Starting Combined Video Generation Server")
+    api = CombinedVideoAPI()
     server = ls.LitServer(
         api,
-        api_path="/predict",  # A single endpoint for all models
+        api_path="/predict",
         accelerator="auto",
         devices="auto",
         max_batch_size=1,
         track_requests=True,
-        loggers=prometheus_logger
+        loggers=[prometheus_logger]
     )
 
     server.run(port=8000)
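
Reviewer note: the combined router above is explicitly served at api_path="/predict" on port 8000, so a client sketch is well grounded here; only the prompts and the localhost host are illustrative. Any fields beyond model_name pass through to the selected model via CombinedRequest's extra = "allow".

import requests

for model_name in ("ltx", "mochi"):
    payload = {
        "model_name": model_name,               # routes the request
        "prompt": "A futuristic city at dusk",  # forwarded to the selected model API
    }
    resp = requests.post("http://localhost:8000/predict", json=payload, timeout=600)
    resp.raise_for_status()
    print(model_name, "->", resp.json())
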
diff --git a/configs/__pycache__/aws_settings.cpython-310.pyc b/configs/__pycache__/aws_settings.cpython-310.pyc
index ceb9967..54af652 100644
Binary files a/configs/__pycache__/aws_settings.cpython-310.pyc and b/configs/__pycache__/aws_settings.cpython-310.pyc differ
diff --git a/configs/__pycache__/ltx_settings.cpython-310.pyc b/configs/__pycache__/ltx_settings.cpython-310.pyc
new file mode 100644
index 0000000..b41f2e7
Binary files /dev/null and b/configs/__pycache__/ltx_settings.cpython-310.pyc differ
diff --git a/configs/__pycache__/mochi_settings.cpython-310.pyc b/configs/__pycache__/mochi_settings.cpython-310.pyc
index b8e3eba..e7cefaf 100644
Binary files a/configs/__pycache__/mochi_settings.cpython-310.pyc and b/configs/__pycache__/mochi_settings.cpython-310.pyc differ
diff --git a/scripts/__pycache__/ltx_inference.cpython-310.pyc b/scripts/__pycache__/ltx_inference.cpython-310.pyc
new file mode 100644
index 0000000..78ede1d
Binary files /dev/null and b/scripts/__pycache__/ltx_inference.cpython-310.pyc differ
diff --git a/scripts/__pycache__/mochi_diffusers.cpython-310.pyc b/scripts/__pycache__/mochi_diffusers.cpython-310.pyc
index 5f5ef1e..3568ddb 100644
Binary files a/scripts/__pycache__/mochi_diffusers.cpython-310.pyc and b/scripts/__pycache__/mochi_diffusers.cpython-310.pyc differ
diff --git a/scripts/__pycache__/mp4_to_s3_json.cpython-310.pyc b/scripts/__pycache__/mp4_to_s3_json.cpython-310.pyc
index ece86f2..0352d5b 100644
Binary files a/scripts/__pycache__/mp4_to_s3_json.cpython-310.pyc and b/scripts/__pycache__/mp4_to_s3_json.cpython-310.pyc differ
diff --git a/scripts/__pycache__/s3_manager.cpython-310.pyc b/scripts/__pycache__/s3_manager.cpython-310.pyc
index 15dfbf8..428ecab 100644
Binary files a/scripts/__pycache__/s3_manager.cpython-310.pyc and b/scripts/__pycache__/s3_manager.cpython-310.pyc differ
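
Reviewer note: one subtlety worth illustrating is that PrometheusLogger.process in serve.py derives its two labels by splitting the metric key on the first colon, so callers are expected to log keys shaped like "model:function". A standalone sketch of that convention follows; the key names are hypothetical, not taken from the patch.

def parse_metric_key(key: str) -> tuple:
    # Mirrors PrometheusLogger.process in serve.py: "model:function" keys
    # split into (model_name, function_name); bare keys fall back to "unknown".
    return tuple(key.split(":", 1)) if ":" in key else ("unknown", key)

assert parse_metric_key("ltx:predict") == ("ltx", "predict")  # hypothetical key
assert parse_metric_key("setup") == ("unknown", "setup")      # no model prefix
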