diff --git a/.github/actions/run-script-zerox-tests/action.yml b/.github/actions/run-script-zerox-tests/action.yml deleted file mode 100644 index ea15d49e1..000000000 --- a/.github/actions/run-script-zerox-tests/action.yml +++ /dev/null @@ -1,14 +0,0 @@ -name: 'Run SDK Auth Tests' -description: 'Runs SDK authentication tests for R2R' -runs: - using: "composite" - steps: - - name: Ingest zerox document - working-directory: ./py - shell: bash - run: poetry run python core/examples/scripts/run_ingest_with_zerox.py - - - name: Test ingested zerox document - working-directory: ./py - shell: bash - run: poetry run python tests/integration/runner_scripts.py test_ingested_zerox_document diff --git a/.github/workflows/r2r-full-integration-deep-dive-tests.yml b/.github/workflows/r2r-full-integration-deep-dive-tests.yml index 4021478d4..a4391820d 100644 --- a/.github/workflows/r2r-full-integration-deep-dive-tests.yml +++ b/.github/workflows/r2r-full-integration-deep-dive-tests.yml @@ -36,6 +36,3 @@ jobs: - name: Start R2R Full server uses: ./.github/actions/start-r2r-full - - - name: Run Test Zerox - uses: ./.github/actions/run-script-zerox-tests diff --git a/docs/cookbooks/graphrag.mdx b/docs/cookbooks/graphrag.mdx index 0d8437edf..6ebc6a14c 100644 --- a/docs/cookbooks/graphrag.mdx +++ b/docs/cookbooks/graphrag.mdx @@ -99,9 +99,6 @@ excluded_parsers = ["mp4"] semantic_similarity_threshold = 0.7 generation_config = { model = "openai/gpt-4o-mini" } - [ingestion.extra_parsers] - pdf = "zerox" - [database] provider = "postgres" batch_size = 256 @@ -204,9 +201,6 @@ max_characters = 1_024 combine_under_n_chars = 128 overlap = 256 - [ingestion.extra_parsers] - pdf = "zerox" - [orchestration] provider = "hatchet" kg_creation_concurrency_lipmit = 32 diff --git a/py/core/__init__.py b/py/core/__init__.py index babf7f1f8..d289188e7 100644 --- a/py/core/__init__.py +++ b/py/core/__init__.py @@ -195,9 +195,9 @@ "AudioParser", "DOCXParser", "ImageParser", - "PDFParser", + "VLMPDFParser", 
+ "BasicPDFParser", "PDFParserUnstructured", - "PDFParserMarker", "PPTParser", # Structured parsers "CSVParser", diff --git a/py/core/base/parsers/base_parser.py b/py/core/base/parsers/base_parser.py index d0bc8633c..1de600404 100644 --- a/py/core/base/parsers/base_parser.py +++ b/py/core/base/parsers/base_parser.py @@ -3,14 +3,11 @@ from abc import ABC, abstractmethod from typing import AsyncGenerator, Generic, TypeVar -from ..abstractions import DataType - T = TypeVar("T") class AsyncParser(ABC, Generic[T]): + @abstractmethod - async def ingest( - self, data: T, **kwargs - ) -> AsyncGenerator[DataType, None]: + async def ingest(self, data: T, **kwargs) -> AsyncGenerator[str, None]: pass diff --git a/py/core/base/providers/ingestion.py b/py/core/base/providers/ingestion.py index 6e80b51e8..feb0349a5 100644 --- a/py/core/base/providers/ingestion.py +++ b/py/core/base/providers/ingestion.py @@ -1,10 +1,13 @@ import logging from abc import ABC from enum import Enum +from typing import Optional from core.base.abstractions import ChunkEnrichmentSettings from .base import Provider, ProviderConfig +from .database import DatabaseProvider +from .llm import CompletionProvider logger = logging.getLogger() @@ -15,7 +18,14 @@ class IngestionConfig(ProviderConfig): chunk_enrichment_settings: ChunkEnrichmentSettings = ( ChunkEnrichmentSettings() ) - extra_parsers: dict[str, str] = {} + + audio_transcription_model: str + + vision_img_prompt_name: Optional[str] = None + vision_img_model: str + + vision_pdf_prompt_name: Optional[str] = None + vision_pdf_model: str @property def supported_providers(self) -> list[str]: @@ -27,7 +37,21 @@ def validate_config(self) -> None: class IngestionProvider(Provider, ABC): - pass + + config: IngestionConfig + database_provider: DatabaseProvider + llm_provider: CompletionProvider + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + super().__init__(config) + 
self.config: IngestionConfig = config + self.llm_provider = llm_provider + self.database_provider = database_provider class ChunkingStrategy(str, Enum): diff --git a/py/core/base/providers/llm.py b/py/core/base/providers/llm.py index 41cb00a21..cd213d208 100644 --- a/py/core/base/providers/llm.py +++ b/py/core/base/providers/llm.py @@ -147,6 +147,8 @@ async def aget_completion( "generation_config": generation_config, "kwargs": kwargs, } + if modalities := kwargs.get("modalities"): + task["modalities"] = modalities response = await self._execute_with_backoff_async(task) return LLMChatCompletion(**response.dict()) diff --git a/py/core/configs/full.toml b/py/core/configs/full.toml index 3d397527e..b6ec46b00 100644 --- a/py/core/configs/full.toml +++ b/py/core/configs/full.toml @@ -7,9 +7,6 @@ max_characters = 1_024 combine_under_n_chars = 128 overlap = 256 - [ingestion.extra_parsers] - pdf = "zerox" - [orchestration] provider = "hatchet" kg_creation_concurrency_lipmit = 32 diff --git a/py/core/examples/scripts/run_ingest_with_zerox.py b/py/core/examples/scripts/run_ingest_with_zerox.py deleted file mode 100644 index 41aba6adf..000000000 --- a/py/core/examples/scripts/run_ingest_with_zerox.py +++ /dev/null @@ -1,15 +0,0 @@ -import os -import time - -from r2r import R2RClient - -if __name__ == "__main__": - client = R2RClient(base_url="http://localhost:7272") - script_path = os.path.dirname(__file__) - sample_file = os.path.join(script_path, "..", "data", "graphrag.pdf") - - ingest_response = client.ingest_files( - file_paths=[sample_file], - ingestion_config={"parser_overrides": {"pdf": "zerox"}}, - ) - time.sleep(60) diff --git a/py/core/main/assembly/factory.py b/py/core/main/assembly/factory.py index caaf0f76f..461e9abf4 100644 --- a/py/core/main/assembly/factory.py +++ b/py/core/main/assembly/factory.py @@ -89,7 +89,13 @@ def create_crypto_provider( @staticmethod def create_ingestion_provider( - ingestion_config: IngestionConfig, *args, **kwargs + ingestion_config: 
IngestionConfig, + database_provider: PostgresDBProvider, + llm_provider: Union[ + LiteLLMCompletionProvider, OpenAICompletionProvider + ], + *args, + **kwargs, ) -> Union[R2RIngestionProvider, UnstructuredIngestionProvider]: config_dict = ( @@ -104,7 +110,9 @@ def create_ingestion_provider( r2r_ingestion_config = R2RIngestionConfig( **config_dict, **extra_fields ) - return R2RIngestionProvider(r2r_ingestion_config) + return R2RIngestionProvider( + r2r_ingestion_config, database_provider, llm_provider + ) elif config_dict["provider"] in [ "unstructured_local", "unstructured_api", @@ -114,7 +122,7 @@ def create_ingestion_provider( ) return UnstructuredIngestionProvider( - unstructured_ingestion_config, + unstructured_ingestion_config, database_provider, llm_provider ) else: raise ValueError( @@ -217,10 +225,12 @@ def create_llm_provider( @staticmethod async def create_email_provider( email_config: Optional[EmailConfig] = None, *args, **kwargs - ) -> Optional[Union[AsyncSMTPEmailProvider, ConsoleMockEmailProvider]]: + ) -> Union[AsyncSMTPEmailProvider, ConsoleMockEmailProvider]: """Creates an email provider based on configuration.""" if not email_config: - return None + raise ValueError( + f"No email configuration provided for email provider, please add `[email]` to your `r2r.toml`." 
+ ) if email_config.provider == "smtp": return AsyncSMTPEmailProvider(email_config) @@ -263,21 +273,14 @@ async def create_providers( self.config.embedding, *args, **kwargs ) ) - ingestion_provider = ( - ingestion_provider_override - or self.create_ingestion_provider( - self.config.ingestion, *args, **kwargs - ) - ) - llm_provider = llm_provider_override or self.create_llm_provider( self.config.completion, *args, **kwargs ) + crypto_provider = ( crypto_provider_override or self.create_crypto_provider(self.config.crypto, *args, **kwargs) ) - database_provider = ( database_provider_override or await self.create_database_provider( @@ -285,6 +288,17 @@ async def create_providers( ) ) + ingestion_provider = ( + ingestion_provider_override + or self.create_ingestion_provider( + self.config.ingestion, + database_provider, + llm_provider, + *args, + **kwargs, + ) + ) + email_provider = ( email_provider_override or await self.create_email_provider( diff --git a/py/core/parsers/__init__.py b/py/core/parsers/__init__.py index 0439c320b..2915f1ab2 100644 --- a/py/core/parsers/__init__.py +++ b/py/core/parsers/__init__.py @@ -7,9 +7,10 @@ "AudioParser", "DOCXParser", "ImageParser", - "PDFParser", + "VLMPDFParser", + "BasicPDFParser", "PDFParserUnstructured", - "PDFParserMarker", + "VLMPDFParser", "PPTParser", # Structured parsers "CSVParser", diff --git a/py/core/parsers/media/__init__.py b/py/core/parsers/media/__init__.py index 71c9bf0df..38881e171 100644 --- a/py/core/parsers/media/__init__.py +++ b/py/core/parsers/media/__init__.py @@ -2,10 +2,9 @@ from .docx_parser import DOCXParser from .img_parser import ImageParser from .pdf_parser import ( # type: ignore - PDFParser, - PDFParserMarker, + BasicPDFParser, PDFParserUnstructured, - ZeroxPDFParser, + VLMPDFParser, ) from .ppt_parser import PPTParser @@ -13,9 +12,8 @@ "AudioParser", "DOCXParser", "ImageParser", - "PDFParser", + "VLMPDFParser", + "BasicPDFParser", "PDFParserUnstructured", - "ZeroxPDFParser", - 
"PDFParserMarker", "PPTParser", ] diff --git a/py/core/parsers/media/audio_parser.py b/py/core/parsers/media/audio_parser.py index a8026c1af..1d4421d37 100644 --- a/py/core/parsers/media/audio_parser.py +++ b/py/core/parsers/media/audio_parser.py @@ -1,35 +1,81 @@ +import base64 +import logging import os +import tempfile from typing import AsyncGenerator from core.base.parsers.base_parser import AsyncParser -from core.parsers.media.openai_helpers import process_audio_with_openai +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) + +logger = logging.getLogger() class AudioParser(AsyncParser[bytes]): - """A parser for audio data.""" + """A parser for audio data using Whisper transcription.""" def __init__( - self, api_base: str = "https://api.openai.com/v1/audio/transcriptions" + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, ): - self.api_base = api_base - self.openai_api_key = os.environ.get("OPENAI_API_KEY") + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + try: + from litellm import atranscription + + self.atranscription = atranscription + except ImportError: + logger.error("Failed to import LiteLLM transcription") + raise ImportError( + "Please install the `litellm` package to use the AudioParser." + ) async def ingest( # type: ignore - self, data: bytes, chunk_size: int = 1024, *args, **kwargs + self, data: bytes, **kwargs ) -> AsyncGenerator[str, None]: - """Ingest audio data and yield a transcription.""" - temp_audio_path = "temp_audio.wav" - with open(temp_audio_path, "wb") as f: - f.write(data) + """ + Ingest audio data and yield a transcription using Whisper via LiteLLM. 
+ + Args: + data: Raw audio bytes + chunk_size: Size of text chunks to yield + model: The model to use for transcription (default is whisper-1) + *args, **kwargs: Additional arguments passed to the transcription call + + Yields: + Chunks of transcribed text + """ try: - transcription_text = process_audio_with_openai( - open(temp_audio_path, "rb"), self.openai_api_key # type: ignore + # Create a temporary file to store the audio data + with tempfile.NamedTemporaryFile( + suffix=".wav", delete=False + ) as temp_file: + temp_file.write(data) + temp_file_path = temp_file.name + + # Call Whisper transcription + response = await self.atranscription( + model=self.config.audio_transcription_model, + file=open(temp_file_path, "rb"), + **kwargs, ) - # split text into small chunks and yield them - for i in range(0, len(transcription_text), chunk_size): - text = transcription_text[i : i + chunk_size] - if text and text != "": - yield text + # The response should contain the transcribed text directly + yield response.text + + except Exception as e: + logger.error(f"Error processing audio with Whisper: {str(e)}") + raise + finally: - os.remove(temp_audio_path) + # Clean up the temporary file + try: + os.unlink(temp_file_path) + except Exception as e: + logger.warning(f"Failed to delete temporary file: {str(e)}") diff --git a/py/core/parsers/media/docx_parser.py b/py/core/parsers/media/docx_parser.py index 21272e1b2..86c242115 100644 --- a/py/core/parsers/media/docx_parser.py +++ b/py/core/parsers/media/docx_parser.py @@ -3,12 +3,26 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class DOCXParser(AsyncParser[DataType]): """A parser for DOCX data.""" - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = 
database_provider + self.llm_provider = llm_provider + self.config = config + try: from docx import Document diff --git a/py/core/parsers/media/img_parser.py b/py/core/parsers/media/img_parser.py index 206c9160f..d4276b718 100644 --- a/py/core/parsers/media/img_parser.py +++ b/py/core/parsers/media/img_parser.py @@ -1,50 +1,112 @@ import base64 import logging -import os from typing import AsyncGenerator -from core.base.abstractions import DataType +from core.base.abstractions import DataType, GenerationConfig from core.base.parsers.base_parser import AsyncParser -from core.parsers.media.openai_helpers import process_frame_with_openai +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) logger = logging.getLogger() class ImageParser(AsyncParser[DataType]): - """A parser for image data.""" + """A parser for image data using vision models.""" + + DEFAULT_IMG_VISION_PROMPT_NAME = "vision_img" def __init__( self, - model: str = "gpt-4o-mini", - max_tokens: int = 2_048, - api_base: str = "https://api.openai.com/v1/chat/completions", - max_image_size: int = 1 * 1024 * 1024, # 4MB limit + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, ): - self.model = model - self.max_tokens = max_tokens - self.openai_api_key = os.environ.get("OPENAI_API_KEY") - self.api_base = api_base - self.max_image_size = max_image_size + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + + try: + from litellm import supports_vision + + self.supports_vision = supports_vision + except ImportError: + logger.error("Failed to import LiteLLM vision support") + raise ImportError( + "Please install the `litellm` package to use the ImageParser." 
+ ) async def ingest( # type: ignore - self, data: DataType, chunk_size: int = 1024, *args, **kwargs + self, data: DataType, **kwargs ) -> AsyncGenerator[str, None]: - """Ingest image data and yield a description.""" - - if isinstance(data, bytes): - # Encode to base64 - data = base64.b64encode(data).decode("utf-8") - - openai_text = process_frame_with_openai( - data, # type: ignore - self.openai_api_key, # type: ignore - self.model, - self.max_tokens, - self.api_base, - ) - - # split text into small chunks and yield them - for i in range(0, len(openai_text), chunk_size): - text = openai_text[i : i + chunk_size] - if text and text != "": - yield text + """ + Ingest image data and yield a description using vision model. + + Args: + data: Image data (bytes or base64 string) + chunk_size: Size of text chunks to yield + *args, **kwargs: Additional arguments passed to the completion call + + Yields: + Chunks of image description text + """ + if not self.vision_prompt_text: + self.vision_prompt_text = await self.database_provider.get_prompt( # type: ignore + prompt_name=self.config.vision_img_prompt_name + or self.DEFAULT_IMG_VISION_PROMPT_NAME + ) + try: + # Verify model supports vision + if not self.supports_vision(model=self.config.vision_img_model): + raise ValueError( + f"Model {self.config.vision_img_model} does not support vision" + ) + + # Encode image data if needed + if isinstance(data, bytes): + image_data = base64.b64encode(data).decode("utf-8") + else: + image_data = data + + # Configure the generation parameters + generation_config = GenerationConfig( + model=self.config.vision_img_model, + stream=False, + ) + + # Prepare message with image + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_data}" + }, + }, + ], + } + ] + + # Get completion from LiteLLM provider + response = await self.llm_provider.aget_completion( + 
messages=messages, generation_config=generation_config + ) + + # Extract description from response + if response.choices and response.choices[0].message: + content = response.choices[0].message.content + if not content: + raise ValueError("No content in response") + yield content + else: + raise ValueError("No response content") + + except Exception as e: + logger.error(f"Error processing image with vision model: {str(e)}") + raise diff --git a/py/core/parsers/media/openai_helpers.py b/py/core/parsers/media/openai_helpers.py deleted file mode 100644 index 729426a63..000000000 --- a/py/core/parsers/media/openai_helpers.py +++ /dev/null @@ -1,63 +0,0 @@ -"""Implementations of parsers for different data types.""" - -import logging - -import requests - -logger = logging.getLogger() - - -def process_frame_with_openai( - data: bytes, - api_key: str, - model: str = "gpt-4o", - max_tokens: int = 2_048, - api_base: str = "https://api.openai.com/v1/chat/completions", -) -> str: - headers = { - "Content-Type": "application/json", - "Authorization": f"Bearer {api_key}", - } - - payload = { - "model": model, - "messages": [ - { - "role": "user", - "content": [ - { - "type": "text", - "text": "First, provide a title for the image, then explain everything that you see. Be very thorough in your analysis as a user will need to understand the image without seeing it. If it is possible to transcribe the image to text directly, then do so. 
The more detail you provide, the better the user will understand the image.", - }, - { - "type": "image_url", - "image_url": {"url": f"data:image/jpeg;base64,{data}"}, # type: ignore - }, - ], - } - ], - "max_tokens": max_tokens, - } - - response = requests.post(api_base, headers=headers, json=payload) - response_json = response.json() - return response_json["choices"][0]["message"]["content"] - - -def process_audio_with_openai( - audio_file, - api_key: str, - audio_api_base: str = "https://api.openai.com/v1/audio/transcriptions", -) -> str: - headers = {"Authorization": f"Bearer {api_key}"} - - transcription_response = requests.post( - audio_api_base, - headers=headers, - files={"file": audio_file}, - data={"model": "whisper-1"}, - ) - - transcription = transcription_response.json() - - return transcription["text"] diff --git a/py/core/parsers/media/pdf_parser.py b/py/core/parsers/media/pdf_parser.py index 2372c7fc8..61ec1a11b 100644 --- a/py/core/parsers/media/pdf_parser.py +++ b/py/core/parsers/media/pdf_parser.py @@ -1,5 +1,6 @@ # type: ignore import asyncio +import base64 import logging import os import string @@ -7,17 +8,219 @@ from io import BytesIO from typing import AsyncGenerator -from core.base.abstractions import DataType +import aiofiles +from pdf2image import convert_from_path + +from core.base.abstractions import DataType, GenerationConfig from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) logger = logging.getLogger() -ZEROX_DEFAULT_MODEL = "openai/gpt-4o-mini" -class PDFParser(AsyncParser[DataType]): +class VLMPDFParser(AsyncParser[DataType]): + """A parser for PDF documents using vision models for page processing.""" + + DEFAULT_PDF_VISION_PROMPT_NAME = "vision_pdf" + + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + 
self.llm_provider = llm_provider + self.config = config + self.vision_prompt_text = None + + try: + from litellm import supports_vision + + self.supports_vision = supports_vision + except ImportError: + logger.error("Failed to import LiteLLM vision support") + raise ImportError( + "Please install the `litellm` package to use the VLMPDFParser." + ) + + async def convert_pdf_to_images( + self, pdf_path: str, temp_dir: str + ) -> list[str]: + """Convert PDF pages to images asynchronously.""" + options = { + "pdf_path": pdf_path, + "output_folder": temp_dir, + "dpi": 300, # Configurable via config if needed + "fmt": "jpeg", + "thread_count": 4, + "paths_only": True, + } + try: + image_paths = await asyncio.to_thread(convert_from_path, **options) + return image_paths + except Exception as err: + logger.error(f"Error converting PDF to images: {err}") + raise + + async def process_page( + self, image_path: str, page_num: int + ) -> dict[str, str]: + """Process a single PDF page using the vision model.""" + + try: + # Read and encode image + async with aiofiles.open(image_path, "rb") as image_file: + image_data = await image_file.read() + image_base64 = base64.b64encode(image_data).decode("utf-8") + + # Verify model supports vision + if not self.supports_vision(model=self.config.vision_pdf_model): + raise ValueError( + f"Model {self.config.vision_pdf_model} does not support vision" + ) + + # Configure generation parameters + generation_config = GenerationConfig( + model=self.config.vision_pdf_model, + stream=False, + ) + + # Prepare message with image + messages = [ + { + "role": "user", + "content": [ + {"type": "text", "text": self.vision_prompt_text}, + { + "type": "image_url", + "image_url": { + "url": f"data:image/jpeg;base64,{image_base64}" + }, + }, + ], + } + ] + + # Get completion from LiteLLM provider + response = await self.llm_provider.aget_completion( + messages=messages, generation_config=generation_config + ) + + if response.choices and 
response.choices[0].message: + content = response.choices[0].message.content + if not content: + raise ValueError("No content in response") + return {"page": str(page_num), "content": content} + else: + raise ValueError("No response content") + + except Exception as e: + logger.error( + f"Error processing page {page_num} with vision model: {str(e)}" + ) + raise + + async def ingest( + self, data: DataType, maintain_order: bool = False, **kwargs + ) -> AsyncGenerator[dict[str, str], None]: + """ + Ingest PDF data and yield descriptions for each page using vision model. + + Args: + data: PDF file path or bytes + maintain_order: If True, yields results in page order. If False, yields as completed. + **kwargs: Additional arguments passed to the completion call + + Yields: + Dict containing page number and content for each processed page + """ + if not self.vision_prompt_text: + self.vision_prompt_text = await self.database_provider.get_prompt( # type: ignore + prompt_name=self.config.vision_pdf_prompt_name + or self.DEFAULT_PDF_VISION_PROMPT_NAME + ) + + temp_dir = None + try: + # Create temporary directory for image processing + temp_dir = os.path.join(os.getcwd(), "temp_pdf_images") + os.makedirs(temp_dir, exist_ok=True) + + # Handle both file path and bytes input + if isinstance(data, bytes): + pdf_path = os.path.join(temp_dir, "temp.pdf") + async with aiofiles.open(pdf_path, "wb") as f: + await f.write(data) + else: + pdf_path = data + + # Convert PDF to images + image_paths = await self.convert_pdf_to_images(pdf_path, temp_dir) + # Create tasks for all pages + tasks = { + asyncio.create_task( + self.process_page(image_path, page_num) + ): page_num + for page_num, image_path in enumerate(image_paths, 1) + } + + if maintain_order: + # Store results in order + pending = set(tasks.keys()) + results = {} + next_page = 1 + + while pending: + # Get next completed task + done, pending = await asyncio.wait( + pending, return_when=asyncio.FIRST_COMPLETED + ) + + # Process 
completed tasks + for task in done: + result = await task + page_num = int(result["page"]) + results[page_num] = result + + # Yield results in order + while next_page in results: + yield results.pop(next_page)["content"] + next_page += 1 + else: + # Yield results as they complete + for coro in asyncio.as_completed(tasks.keys()): + result = await coro + yield result["content"] + + except Exception as e: + logger.error(f"Error processing PDF: {str(e)}") + raise + + finally: + # Cleanup temporary files + if temp_dir and os.path.exists(temp_dir): + for file in os.listdir(temp_dir): + os.remove(os.path.join(temp_dir, file)) + os.rmdir(temp_dir) + + +class BasicPDFParser(AsyncParser[DataType]): """A parser for PDF data.""" - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config try: from pypdf import PdfReader @@ -65,54 +268,16 @@ async def ingest( yield page_text -class PDFParserSix(AsyncParser[DataType]): - """A parser for PDF data.""" - - def __init__(self): - try: - from pdfminer.high_level import extract_text_to_fp - from pdfminer.layout import LAParams - - self.extract_text_to_fp = extract_text_to_fp - self.LAParams = LAParams - except ImportError: - raise ValueError( - "Error, `pdfminer.six` is required to run `PDFParser`. Please install it using `pip install pdfminer.six`." 
- ) - - async def ingest(self, data: bytes, **kwargs) -> AsyncGenerator[str, None]: - """Ingest PDF data and yield text from each page.""" - if not isinstance(data, bytes): - raise ValueError("PDF data must be in bytes format.") - - pdf_file = BytesIO(data) - - async def process_page(page_number): - output = BytesIO() - await asyncio.to_thread( - self.extract_text_to_fp, - pdf_file, - output, - page_numbers=[page_number], - laparams=self.LAParams(), - ) - page_text = output.getvalue().decode("utf-8") - return "".join(filter(lambda x: x in string.printable, page_text)) - - from pdfminer.pdfdocument import PDFDocument - from pdfminer.pdfparser import PDFParser as pdfminer_PDFParser - - parser = pdfminer_PDFParser(pdf_file) - document = PDFDocument(parser) - - for page_number in range(len(list(document.get_pages()))): - page_text = await process_page(page_number) - if page_text: - yield page_text - - class PDFParserUnstructured(AsyncParser[DataType]): - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config try: from unstructured.partition.pdf import partition_pdf @@ -141,79 +306,3 @@ async def ingest( ) for element in elements: yield element.text - - -class PDFParserMarker(AsyncParser[DataType]): - model_refs = None - - def __init__(self): - try: - from marker.convert import convert_single_pdf - from marker.models import load_all_models - - self.convert_single_pdf = convert_single_pdf - if PDFParserMarker.model_refs is None: - PDFParserMarker.model_refs = load_all_models() - - except ImportError as e: - raise ValueError( - f"Error, marker is not installed {e}, please install using `pip install marker-pdf` " - ) - - async def ingest( - self, data: DataType, **kwargs - ) -> AsyncGenerator[str, None]: - if isinstance(data, str): - raise ValueError("PDF data must be in bytes 
format.") - - text, _, _ = self.convert_single_pdf( - BytesIO(data), PDFParserMarker.model_refs - ) - yield text - - -class ZeroxPDFParser(AsyncParser[DataType]): - """An advanced PDF parser using zerox.""" - - def __init__(self): - """ - Use the zerox library to parse PDF data. - - Args: - cleanup (bool, optional): Whether to clean up temporary files after processing. Defaults to True. - concurrency (int, optional): The number of concurrent processes to run. Defaults to 10. - file_data (Optional[str], optional): The file data to process. Defaults to an empty string. - maintain_format (bool, optional): Whether to maintain the format from the previous page. Defaults to False. - model (str, optional): The model to use for generating completions. Defaults to "gpt-4o-mini". Refer to LiteLLM Providers for the correct model name, as it may differ depending on the provider. - temp_dir (str, optional): The directory to store temporary files, defaults to some named folder in system's temp directory. If already exists, the contents will be deleted before zerox uses it. - custom_system_prompt (str, optional): The system prompt to use for the model, this overrides the default system prompt of zerox.Generally it is not required unless you want some specific behaviour. When set, it will raise a friendly warning. Defaults to None. - kwargs (dict, optional): Additional keyword arguments to pass to the litellm.completion method. Refer to the LiteLLM Documentation and Completion Input for details. 
- - """ - try: - # from pyzerox import zerox - from .pyzerox import zerox - - self.zerox = zerox - - except ImportError as e: - raise ValueError( - f"Error, zerox installation failed with Error='{e}', please install through the R2R ingestion bundle with `pip install r2r -E ingestion-bundle` " - ) - - async def ingest( - self, data: DataType, **kwargs - ) -> AsyncGenerator[str, None]: - if isinstance(data, str): - raise ValueError("PDF data must be in bytes format.") - - model = kwargs.get("zerox_parsing_model", ZEROX_DEFAULT_MODEL) - model = model.split("/")[-1] # remove the provider prefix - result = await self.zerox( - file_data=data, - model=model, - verbose=True, - ) - - for page in result.pages: - yield page.content diff --git a/py/core/parsers/media/ppt_parser.py b/py/core/parsers/media/ppt_parser.py index 5f19a0171..6fa8f52e9 100644 --- a/py/core/parsers/media/ppt_parser.py +++ b/py/core/parsers/media/ppt_parser.py @@ -3,12 +3,25 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class PPTParser(AsyncParser[DataType]): """A parser for PPT data.""" - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config try: from pptx import Presentation diff --git a/py/core/parsers/media/pyzerox/__init__.py b/py/core/parsers/media/pyzerox/__init__.py deleted file mode 100644 index 18cd95ac3..000000000 --- a/py/core/parsers/media/pyzerox/__init__.py +++ /dev/null @@ -1,10 +0,0 @@ -from .constants.prompts import Prompts -from .zerox_core import zerox - -DEFAULT_SYSTEM_PROMPT = Prompts.DEFAULT_SYSTEM_PROMPT - -__all__ = [ - "zerox", - "Prompts", - "DEFAULT_SYSTEM_PROMPT", -] diff --git 
a/py/core/parsers/media/pyzerox/constants/__init__.py b/py/core/parsers/media/pyzerox/constants/__init__.py deleted file mode 100644 index 4378b38e0..000000000 --- a/py/core/parsers/media/pyzerox/constants/__init__.py +++ /dev/null @@ -1,9 +0,0 @@ -from .conversion import PDFConversionDefaultOptions -from .messages import Messages -from .prompts import Prompts - -__all__ = [ - "PDFConversionDefaultOptions", - "Messages", - "Prompts", -] diff --git a/py/core/parsers/media/pyzerox/constants/conversion.py b/py/core/parsers/media/pyzerox/constants/conversion.py deleted file mode 100644 index 4320e3484..000000000 --- a/py/core/parsers/media/pyzerox/constants/conversion.py +++ /dev/null @@ -1,8 +0,0 @@ -class PDFConversionDefaultOptions: - """Default options for converting PDFs to images""" - - DPI = 300 - FORMAT = "png" - SIZE = (None, 1056) - THREAD_COUNT = 4 - USE_PDFTOCAIRO = True diff --git a/py/core/parsers/media/pyzerox/constants/messages.py b/py/core/parsers/media/pyzerox/constants/messages.py deleted file mode 100644 index ffa3f68ec..000000000 --- a/py/core/parsers/media/pyzerox/constants/messages.py +++ /dev/null @@ -1,56 +0,0 @@ -class Messages: - """User-facing messages""" - - MISSING_ENVIRONMENT_VARIABLES = """ - Required environment variable (keys) from the model are Missing. Please set the required environment variables for the model provider. - Refer: https://docs.litellm.ai/docs/providers - """ - - NON_VISION_MODEL = """ - The provided model is not a vision model. Please provide a vision model. - """ - - MODEL_ACCESS_ERROR = """ - Your provided model can't be accessed. Please make sure you have access to the model and also required environment variables are setup correctly including valid api key(s). - Refer: https://docs.litellm.ai/docs/providers - """ - - CUSTOM_SYSTEM_PROMPT_WARNING = """ - Custom system prompt was provided which overrides the default system prompt. We assume that you know what you are doing. 
- """ - - MAINTAIN_FORMAT_SELECTED_PAGES_WARNING = """ - The maintain_format flag is set to True in conjunction with select_pages input given. This may result in unexpected behavior. - """ - - PAGE_NUMBER_OUT_OF_BOUND_ERROR = """ - The page number(s) provided is out of bound. Please provide a valid page number(s). - """ - - NON_200_RESPONSE = """ - Model API returned status code {status_code}: {data} - - Please check the litellm documentation for more information. https://docs.litellm.ai/docs/exception_mapping. - """ - - COMPLETION_ERROR = """ - Error in Completion Response. Error: {0} - Please check the status of your model provider API status. - """ - - PDF_CONVERSION_FAILED = """ - Error during PDF conversion: {0} - Please check the PDF file and try again. For more information: https://github.com/Belval/pdf2image - """ - - FILE_UNREACHAGBLE = """ - File not found or unreachable. Status Code: {0} - """ - - FILE_PATH_MISSING = """ - File path is invalid or missing. - """ - - FAILED_TO_SAVE_FILE = """Failed to save file to local drive""" - - FAILED_TO_PROCESS_IMAGE = """Failed to process image""" diff --git a/py/core/parsers/media/pyzerox/constants/patterns.py b/py/core/parsers/media/pyzerox/constants/patterns.py deleted file mode 100644 index 6be1a77e1..000000000 --- a/py/core/parsers/media/pyzerox/constants/patterns.py +++ /dev/null @@ -1,6 +0,0 @@ -class Patterns: - """Regex patterns for markdown and code blocks""" - - MATCH_MARKDOWN_BLOCKS = r"^```[a-z]*\n([\s\S]*?)\n```$" - - MATCH_CODE_BLOCKS = r"^```\n([\s\S]*?)\n```$" diff --git a/py/core/parsers/media/pyzerox/constants/prompts.py b/py/core/parsers/media/pyzerox/constants/prompts.py deleted file mode 100644 index a59680a37..000000000 --- a/py/core/parsers/media/pyzerox/constants/prompts.py +++ /dev/null @@ -1,8 +0,0 @@ -class Prompts: - """Class for storing prompts for the Zerox system.""" - - DEFAULT_SYSTEM_PROMPT = """ - Convert the following PDF page to markdown. 
- Return only the markdown with no explanation text. - Do not exclude any content from the page. - """ diff --git a/py/core/parsers/media/pyzerox/errors/__init__.py b/py/core/parsers/media/pyzerox/errors/__init__.py deleted file mode 100644 index 7fa7bedd3..000000000 --- a/py/core/parsers/media/pyzerox/errors/__init__.py +++ /dev/null @@ -1,21 +0,0 @@ -from .exceptions import ( - FailedToProcessFile, - FailedToSaveFile, - FileUnavailable, - MissingEnvironmentVariables, - ModelAccessError, - NotAVisionModel, - PageNumberOutOfBoundError, - ResourceUnreachableException, -) - -__all__ = [ - "NotAVisionModel", - "ModelAccessError", - "PageNumberOutOfBoundError", - "MissingEnvironmentVariables", - "ResourceUnreachableException", - "FileUnavailable", - "FailedToSaveFile", - "FailedToProcessFile", -] diff --git a/py/core/parsers/media/pyzerox/errors/base.py b/py/core/parsers/media/pyzerox/errors/base.py deleted file mode 100644 index f1e761141..000000000 --- a/py/core/parsers/media/pyzerox/errors/base.py +++ /dev/null @@ -1,21 +0,0 @@ -from typing import Optional - - -class CustomException(Exception): - """ - Base class for custom exceptions - """ - - def __init__( - self, - message: Optional[str] = None, - extra_info: Optional[dict] = None, - ): - self.message = message - self.extra_info = extra_info - super().__init__(self.message) - - def __str__(self): - if self.extra_info: - return f"{self.message} (Extra Info: {self.extra_info})" - return self.message diff --git a/py/core/parsers/media/pyzerox/errors/exceptions.py b/py/core/parsers/media/pyzerox/errors/exceptions.py deleted file mode 100644 index ee90873d6..000000000 --- a/py/core/parsers/media/pyzerox/errors/exceptions.py +++ /dev/null @@ -1,93 +0,0 @@ -from typing import Dict, Optional - -# Package Imports -from ..constants import Messages -from .base import CustomException - - -class MissingEnvironmentVariables(CustomException): - """Exception raised when the model provider environment variables, API key(s) are 
missing. Refer: https://docs.litellm.ai/docs/providers""" - - def __init__( - self, - message: str = Messages.MISSING_ENVIRONMENT_VARIABLES, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class NotAVisionModel(CustomException): - """Exception raised when the provided model is not a vision model.""" - - def __init__( - self, - message: str = Messages.NON_VISION_MODEL, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class ModelAccessError(CustomException): - """Exception raised when the provided model can't be accessed due to incorrect credentials/keys or incorrect environent variables setup.""" - - def __init__( - self, - message: str = Messages.MODEL_ACCESS_ERROR, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class PageNumberOutOfBoundError(CustomException): - """Exception invalid page number(s) provided.""" - - def __init__( - self, - message: str = Messages.PAGE_NUMBER_OUT_OF_BOUND_ERROR, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class ResourceUnreachableException(CustomException): - """Exception raised when a resource is unreachable.""" - - def __init__( - self, - message: str = Messages.FILE_UNREACHAGBLE, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class FileUnavailable(CustomException): - """Exception raised when a file is unavailable.""" - - def __init__( - self, - message: str = Messages.FILE_PATH_MISSING, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class FailedToSaveFile(CustomException): - """Exception raised when a file fails to save.""" - - def __init__( - self, - message: str = Messages.FAILED_TO_SAVE_FILE, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) - - -class FailedToProcessFile(CustomException): - """Exception raised when a file fails to process.""" - - def __init__( 
- self, - message: str = Messages.FAILED_TO_PROCESS_IMAGE, - extra_info: Optional[Dict] = None, - ): - super().__init__(message, extra_info) diff --git a/py/core/parsers/media/pyzerox/processor/__init__.py b/py/core/parsers/media/pyzerox/processor/__init__.py deleted file mode 100644 index 1124805e8..000000000 --- a/py/core/parsers/media/pyzerox/processor/__init__.py +++ /dev/null @@ -1,14 +0,0 @@ -from .image import encode_image_to_base64, save_image -from .pdf import convert_pdf_to_images, process_page, process_pages_in_batches -from .text import format_markdown -from .utils import download_file - -__all__ = [ - "save_image", - "encode_image_to_base64", - "convert_pdf_to_images", - "format_markdown", - "download_file", - "process_page", - "process_pages_in_batches", -] diff --git a/py/core/parsers/media/pyzerox/processor/image.py b/py/core/parsers/media/pyzerox/processor/image.py deleted file mode 100644 index 8ad973f4f..000000000 --- a/py/core/parsers/media/pyzerox/processor/image.py +++ /dev/null @@ -1,27 +0,0 @@ -import base64 -import io - -import aiofiles - - -async def encode_image_to_base64(image_path: str) -> str: - """Encode an image to base64 asynchronously.""" - async with aiofiles.open(image_path, "rb") as image_file: - image_data = await image_file.read() - return base64.b64encode(image_data).decode("utf-8") - - -async def save_image(image, image_path: str): - """Save an image to a file asynchronously.""" - # Convert PIL Image to BytesIO object - with io.BytesIO() as buffer: - image.save( - buffer, format=image.format - ) # Save the image to the BytesIO object - image_data = ( - buffer.getvalue() - ) # Get the image data from the BytesIO object - - # Write image data to file asynchronously - async with aiofiles.open(image_path, "wb") as f: - await f.write(image_data) diff --git a/py/core/parsers/media/pyzerox/processor/pdf.py b/py/core/parsers/media/pyzerox/processor/pdf.py deleted file mode 100644 index 5bc874382..000000000 --- 
a/py/core/parsers/media/pyzerox/processor/pdf.py +++ /dev/null @@ -1,115 +0,0 @@ -import asyncio -import logging -import os -from typing import TYPE_CHECKING, List, Optional, Tuple - -from pdf2image import convert_from_path - -from ..constants import Messages, PDFConversionDefaultOptions - -if TYPE_CHECKING: - from ..zerox_models import litellmmodel - -# Package Imports -from .image import save_image -from .text import format_markdown - - -async def convert_pdf_to_images(local_path: str, temp_dir: str) -> List[str]: - """Converts a PDF file to a series of images in the temp_dir. Returns a list of image paths in page order.""" - options = { - "pdf_path": local_path, - "output_folder": temp_dir, - "dpi": PDFConversionDefaultOptions.DPI, - "fmt": PDFConversionDefaultOptions.FORMAT, - "size": PDFConversionDefaultOptions.SIZE, - "thread_count": PDFConversionDefaultOptions.THREAD_COUNT, - "use_pdftocairo": PDFConversionDefaultOptions.USE_PDFTOCAIRO, - "paths_only": True, - } - - try: - image_paths = await asyncio.to_thread(convert_from_path, **options) - return image_paths - except Exception as err: - logging.error(f"Error converting PDF to images: {err}") - - -async def process_page( - image: str, - model: "litellmmodel", - temp_directory: str = "", - input_token_count: int = 0, - output_token_count: int = 0, - prior_page: str = "", - semaphore: Optional[asyncio.Semaphore] = None, -) -> Tuple[str, int, int, str]: - """Process a single page of a PDF""" - - # If semaphore is provided, acquire it before processing the page - if semaphore: - async with semaphore: - return await process_page( - image, - model, - temp_directory, - input_token_count, - output_token_count, - prior_page, - ) - - image_path = os.path.join(temp_directory, image) - - # Get the completion from LiteLLM - try: - completion = await model.completion( - image_path=image_path, - maintain_format=True, - prior_page=prior_page, - ) - - formatted_markdown = format_markdown(completion.content) - 
input_token_count += completion.input_tokens - output_token_count += completion.output_tokens - prior_page = formatted_markdown - - return ( - formatted_markdown, - input_token_count, - output_token_count, - prior_page, - ) - - except Exception as error: - logging.error(f"{Messages.FAILED_TO_PROCESS_IMAGE} Error:{error}") - return "", input_token_count, output_token_count, "" - - -async def process_pages_in_batches( - images: List[str], - concurrency: int, - model: "litellmmodel", - temp_directory: str = "", - input_token_count: int = 0, - output_token_count: int = 0, - prior_page: str = "", -): - # Create a semaphore to limit the number of concurrent tasks - semaphore = asyncio.Semaphore(concurrency) - - # Process each page in parallel - tasks = [ - process_page( - image, - model, - temp_directory, - input_token_count, - output_token_count, - prior_page, - semaphore, - ) - for image in images - ] - - # Wait for all tasks to complete - return await asyncio.gather(*tasks) diff --git a/py/core/parsers/media/pyzerox/processor/text.py b/py/core/parsers/media/pyzerox/processor/text.py deleted file mode 100644 index 524033e6e..000000000 --- a/py/core/parsers/media/pyzerox/processor/text.py +++ /dev/null @@ -1,14 +0,0 @@ -import re - -# Package imports -from ..constants.patterns import Patterns - - -def format_markdown(text: str) -> str: - """Format markdown text by removing markdown and code blocks""" - - formatted_markdown = re.sub(Patterns.MATCH_MARKDOWN_BLOCKS, r"\1", text) - formatted_markdown = re.sub( - Patterns.MATCH_CODE_BLOCKS, r"\1", formatted_markdown - ) - return formatted_markdown diff --git a/py/core/parsers/media/pyzerox/processor/utils.py b/py/core/parsers/media/pyzerox/processor/utils.py deleted file mode 100644 index da703240d..000000000 --- a/py/core/parsers/media/pyzerox/processor/utils.py +++ /dev/null @@ -1,52 +0,0 @@ -import os -import re -from typing import Iterable, Optional, Union -from urllib.parse import urlparse - -import aiofiles -import 
aiohttp -from PyPDF2 import PdfReader, PdfWriter - -from ..constants.messages import Messages - -# Package Imports -from ..errors.exceptions import ( - PageNumberOutOfBoundError, - ResourceUnreachableException, -) - - -async def download_file( - file_path: str, - temp_dir: str, -) -> Optional[str]: - """Downloads a file from a URL or local path to a temporary directory.""" - - local_pdf_path = os.path.join(temp_dir, os.path.basename(file_path)) - if is_valid_url(file_path): - async with aiohttp.ClientSession() as session: - async with session.get(file_path) as response: - if response.status != 200: - raise ResourceUnreachableException() - async with aiofiles.open(local_pdf_path, "wb") as f: - await f.write(await response.read()) - else: - async with ( - aiofiles.open(file_path, "rb") as src, - aiofiles.open(local_pdf_path, "wb") as dst, - ): - await dst.write(await src.read()) - return local_pdf_path - - -def is_valid_url(string: str) -> bool: - """Checks if a string is a valid URL.""" - - try: - result = urlparse(string) - return all([result.scheme, result.netloc]) and result.scheme in [ - "http", - "https", - ] - except ValueError: - return False diff --git a/py/core/parsers/media/pyzerox/zerox_core/__init__.py b/py/core/parsers/media/pyzerox/zerox_core/__init__.py deleted file mode 100644 index 825ed3f77..000000000 --- a/py/core/parsers/media/pyzerox/zerox_core/__init__.py +++ /dev/null @@ -1,5 +0,0 @@ -from .zerox import zerox - -__all__ = [ - "zerox", -] diff --git a/py/core/parsers/media/pyzerox/zerox_core/types.py b/py/core/parsers/media/pyzerox/zerox_core/types.py deleted file mode 100644 index 8474a5524..000000000 --- a/py/core/parsers/media/pyzerox/zerox_core/types.py +++ /dev/null @@ -1,42 +0,0 @@ -from dataclasses import dataclass, field -from typing import Any, Dict, Iterable, List, Optional, Union - - -@dataclass -class ZeroxArgs: - """ - Dataclass to store the arguments for the Zerox class. 
- """ - - file_path: str - cleanup: bool = True - concurrency: int = 10 - maintain_format: bool = False - model: str = ("gpt-4o-mini",) - output_dir: Optional[str] = None - temp_dir: Optional[str] = None - custom_system_prompt: Optional[str] = None - kwargs: Dict[str, Any] = field(default_factory=dict) - - -@dataclass -class Page: - """ - Dataclass to store the page content. - """ - - content: str - content_length: int - page: int - - -@dataclass -class ZeroxOutput: - """ - Dataclass to store the output of the Zerox class. - """ - - completion_time: float - input_tokens: int - output_tokens: int - pages: List[Page] diff --git a/py/core/parsers/media/pyzerox/zerox_core/zerox.py b/py/core/parsers/media/pyzerox/zerox_core/zerox.py deleted file mode 100644 index b89a20e54..000000000 --- a/py/core/parsers/media/pyzerox/zerox_core/zerox.py +++ /dev/null @@ -1,151 +0,0 @@ -import asyncio -import os -import tempfile -import warnings -from datetime import datetime -from typing import Iterable, List, Optional, Union - -import aiofiles -import aiofiles.os as async_os -import aioshutil as async_shutil - -from ..constants.messages import Messages -from ..errors import FileUnavailable - -# Package Imports -from ..processor import ( - convert_pdf_to_images, - process_page, - process_pages_in_batches, -) -from ..zerox_models import litellmmodel -from .types import Page, ZeroxOutput - - -async def zerox( - cleanup: bool = True, - concurrency: int = 10, - file_data: Optional[bytes] = None, - maintain_format: bool = False, - model: str = "gpt-4o-mini", - temp_dir: Optional[str] = None, - custom_system_prompt: Optional[str] = None, - **kwargs, -) -> ZeroxOutput: - """ - API to perform OCR to markdown using Vision models. - Please setup the environment variables for the model and model provider before using this API. 
Refer: https://docs.litellm.ai/docs/providers - - :param cleanup: Whether to cleanup the temporary files after processing, defaults to True - :type cleanup: bool, optional - :param concurrency: The number of concurrent processes to run, defaults to 10 - :type concurrency: int, optional - :param file_path: The path or URL to the PDF file to process. - :type file_path: str, optional - :param maintain_format: Whether to maintain the format from the previous page, defaults to False - :type maintain_format: bool, optional - :param model: The model to use for generating completions, defaults to "gpt-4o-mini". Note - Refer: https://docs.litellm.ai/docs/providers to pass correct model name as according to provider it might be different from actual name. - :type model: str, optional - :param temp_dir: The directory to store temporary files, defaults to some named folder in system's temp directory. If already exists, the contents will be deleted for zerox uses it. - :type temp_dir: str, optional - :param custom_system_prompt: The system prompt to use for the model, this overrides the default system prompt of zerox. Generally it is not required unless you want some specific behaviour. When set, it will raise a friendly warning, defaults to None - :type custom_system_prompt: str, optional - - :param kwargs: Additional keyword arguments to pass to the model.completion -> litellm.completion method. Refer: https://docs.litellm.ai/docs/providers and https://docs.litellm.ai/docs/completion/input - :return: The markdown content generated by the model. 
- """ - - input_token_count = 0 - output_token_count = 0 - prior_page = "" - aggregated_markdown: List[str] = [] - start_time = datetime.now() - # File Data Validators - if not file_data: - raise FileUnavailable() - - # Create an instance of the litellm model interface - vision_model = litellmmodel(model=model, **kwargs) - - # override the system prompt if a custom prompt is provided - if custom_system_prompt: - vision_model.system_prompt = custom_system_prompt - - if temp_dir: - if os.path.exists(temp_dir): - await async_shutil.rmtree(temp_dir) - await async_os.makedirs(temp_dir, exist_ok=True) - - # Create a temporary directory to store the PDF and images - with tempfile.TemporaryDirectory() as temp_dir_: - - if temp_dir: - ## use the user provided temp directory - temp_directory = temp_dir - else: - ## use the system temp directory - temp_directory = temp_dir_ - - local_path = os.path.join(temp_directory, "input.pdf") - async with aiofiles.open(local_path, "wb") as f: - await f.write(file_data) - - # Convert the file to a series of images, below function returns a list of image paths in page order - images = await convert_pdf_to_images( - local_path=local_path, temp_dir=temp_directory - ) - - if maintain_format: - for image in images: - result, input_token_count, output_token_count, prior_page = ( - await process_page( - image, - vision_model, - temp_directory, - input_token_count, - output_token_count, - prior_page, - ) - ) - - if result: - aggregated_markdown.append(result) - else: - results = await process_pages_in_batches( - images, - concurrency, - vision_model, - temp_directory, - input_token_count, - output_token_count, - prior_page, - ) - - aggregated_markdown = [ - result[0] for result in results if isinstance(result[0], str) - ] - - ## add token usage - input_token_count += sum([result[1] for result in results]) - output_token_count += sum([result[2] for result in results]) - - # Cleanup the downloaded PDF file - if cleanup and 
os.path.exists(temp_directory): - await async_shutil.rmtree(temp_directory) - - # Format JSON response - end_time = datetime.now() - completion_time = (end_time - start_time).total_seconds() * 1000 - - # Default behavior when no is provided - formatted_pages = [ - Page(content=content, page=i + 1, content_length=len(content)) - for i, content in enumerate(aggregated_markdown) - ] - - return ZeroxOutput( - completion_time=completion_time, - input_tokens=input_token_count, - output_tokens=output_token_count, - pages=formatted_pages, - ) diff --git a/py/core/parsers/media/pyzerox/zerox_models/__init__.py b/py/core/parsers/media/pyzerox/zerox_models/__init__.py deleted file mode 100644 index f19d77392..000000000 --- a/py/core/parsers/media/pyzerox/zerox_models/__init__.py +++ /dev/null @@ -1,7 +0,0 @@ -from .modellitellm import litellmmodel -from .types import CompletionResponse - -__all__ = [ - "litellmmodel", - "CompletionResponse", -] diff --git a/py/core/parsers/media/pyzerox/zerox_models/base.py b/py/core/parsers/media/pyzerox/zerox_models/base.py deleted file mode 100644 index 4e85dc344..000000000 --- a/py/core/parsers/media/pyzerox/zerox_models/base.py +++ /dev/null @@ -1,43 +0,0 @@ -from abc import ABC, abstractmethod -from typing import TYPE_CHECKING, Dict, Optional, Type, TypeVar - -if TYPE_CHECKING: - from ..zerox_models import CompletionResponse - -T = TypeVar("T", bound="BaseModel") - - -class BaseModel(ABC): - """ - Base class for all models. 
- """ - - @abstractmethod - async def completion( - self, - ) -> "CompletionResponse": - raise NotImplementedError("Subclasses must implement this method") - - @abstractmethod - def validate_access( - self, - ) -> None: - raise NotImplementedError("Subclasses must implement this method") - - @abstractmethod - def validate_model( - self, - ) -> None: - raise NotImplementedError("Subclasses must implement this method") - - def __init__( - self, - model: Optional[str] = None, - **kwargs, - ): - self.model = model - self.kwargs = kwargs - - ## validations - # self.validate_model() - # self.validate_access() diff --git a/py/core/parsers/media/pyzerox/zerox_models/modellitellm.py b/py/core/parsers/media/pyzerox/zerox_models/modellitellm.py deleted file mode 100644 index 02c5fe792..000000000 --- a/py/core/parsers/media/pyzerox/zerox_models/modellitellm.py +++ /dev/null @@ -1,169 +0,0 @@ -import os -import warnings -from typing import Any, Dict, List, Optional - -import aiohttp -import litellm - -from ..constants.messages import Messages -from ..constants.prompts import Prompts -from ..errors import ( - MissingEnvironmentVariables, - ModelAccessError, - NotAVisionModel, -) -from ..processor.image import encode_image_to_base64 - -# Package Imports -from .base import BaseModel -from .types import CompletionResponse - -DEFAULT_SYSTEM_PROMPT = Prompts.DEFAULT_SYSTEM_PROMPT - - -class litellmmodel(BaseModel): - ## setting the default system prompt - _system_prompt = DEFAULT_SYSTEM_PROMPT - - def __init__( - self, - model: Optional[str] = None, - **kwargs, - ): - """ - Initializes the Litellm model interface. - :param model: The model to use for generating completions, defaults to "gpt-4o-mini". Refer: https://docs.litellm.ai/docs/providers - :type model: str, optional - - :param kwargs: Additional keyword arguments to pass to self.completion -> litellm.completion. 
Refer: https://docs.litellm.ai/docs/providers and https://docs.litellm.ai/docs/completion/input - """ - super().__init__(model=model, **kwargs) - - ## calling custom methods to validate the environment and model - self.validate_environment() - self.validate_model() - self.validate_access() - - @property - def system_prompt(self) -> str: - """Returns the system prompt for the model.""" - return self._system_prompt - - @system_prompt.setter - def system_prompt(self, prompt: str) -> None: - """ - Sets/overrides the system prompt for the model. - Will raise a friendly warning to notify the user. - """ - warnings.warn( - f"{Messages.CUSTOM_SYSTEM_PROMPT_WARNING}. Default prompt for zerox is:\n {DEFAULT_SYSTEM_PROMPT}" - ) - self._system_prompt = prompt - - ## custom method on top of BaseModel - def validate_environment(self) -> None: - """Validates the environment variables required for the model.""" - env_config = litellm.validate_environment(model=self.model) - - if not env_config["keys_in_environment"]: - raise MissingEnvironmentVariables(extra_info=env_config) - - def validate_model(self) -> None: - """Validates the model to ensure it is a vision model.""" - if not litellm.supports_vision(model=self.model): - raise NotAVisionModel(extra_info={"model": self.model}) - - def validate_access(self) -> None: - """Validates access to the model -> if environment variables are set correctly with correct values.""" - if not litellm.check_valid_key(model=self.model, api_key=None): - raise ModelAccessError(extra_info={"model": self.model}) - - async def completion( - self, - image_path: str, - maintain_format: bool, - prior_page: str, - ) -> CompletionResponse: - """LitellM completion for image to markdown conversion. - - :param image_path: Path to the image file. - :type image_path: str - :param maintain_format: Whether to maintain the format from the previous page. - :type maintain_format: bool - :param prior_page: The markdown content of the previous page. 
- :type prior_page: str - - :return: The markdown content generated by the model. - """ - messages = await self._prepare_messages( - image_path=image_path, - maintain_format=maintain_format, - prior_page=prior_page, - ) - - try: - response = await litellm.acompletion( - model=self.model, messages=messages, **self.kwargs - ) - - ## completion response - response = CompletionResponse( - content=response["choices"][0]["message"]["content"], - input_tokens=response["usage"]["prompt_tokens"], - output_tokens=response["usage"]["completion_tokens"], - ) - return response - - except Exception as err: - raise Exception(Messages.COMPLETION_ERROR.format(err)) - - async def _prepare_messages( - self, - image_path: str, - maintain_format: bool, - prior_page: str, - ) -> List[Dict[str, Any]]: - """Prepares the messages to send to the LiteLLM Completion API. - - :param image_path: Path to the image file. - :type image_path: str - :param maintain_format: Whether to maintain the format from the previous page. - :type maintain_format: bool - :param prior_page: The markdown content of the previous page. - :type prior_page: str - """ - # Default system message - messages: List[Dict[str, Any]] = [ - { - "role": "system", - "content": self._system_prompt, - }, - ] - - # If content has already been generated, add it to context. - # This helps maintain the same format across pages. 
- if maintain_format and prior_page: - messages.append( - { - "role": "system", - "content": f'Markdown must maintain consistent formatting with the following page: \n\n """{prior_page}"""', - }, - ) - - # Add Image to request - base64_image = await encode_image_to_base64(image_path) - messages.append( - { - "role": "user", - "content": [ - { - "type": "image_url", - "image_url": { - "url": f"data:image/png;base64,{base64_image}" - }, - }, - ], - } - ) - - return messages diff --git a/py/core/parsers/media/pyzerox/zerox_models/types.py b/py/core/parsers/media/pyzerox/zerox_models/types.py deleted file mode 100644 index 0eea3e2ee..000000000 --- a/py/core/parsers/media/pyzerox/zerox_models/types.py +++ /dev/null @@ -1,12 +0,0 @@ -from dataclasses import dataclass - - -@dataclass -class CompletionResponse: - """ - A class representing the response of a completion. - """ - - content: str - input_tokens: int - output_tokens: int diff --git a/py/core/parsers/structured/csv_parser.py b/py/core/parsers/structured/csv_parser.py index ab1e55e0d..c8418f5a1 100644 --- a/py/core/parsers/structured/csv_parser.py +++ b/py/core/parsers/structured/csv_parser.py @@ -3,12 +3,26 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class CSVParser(AsyncParser[DataType]): """A parser for CSV data.""" - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + import csv from io import StringIO @@ -29,7 +43,12 @@ async def ingest( class CSVParserAdvanced(AsyncParser[DataType]): """A parser for CSV data.""" - def __init__(self): + def __init__( + self, config: IngestionConfig, llm_provider: CompletionProvider + ): + self.llm_provider = llm_provider + 
self.config = config + import csv from io import StringIO diff --git a/py/core/parsers/structured/json_parser.py b/py/core/parsers/structured/json_parser.py index aedb2482c..1efe29c78 100644 --- a/py/core/parsers/structured/json_parser.py +++ b/py/core/parsers/structured/json_parser.py @@ -5,11 +5,26 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class JSONParser(AsyncParser[DataType]): """A parser for JSON data.""" + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + async def ingest( self, data: DataType, *args, **kwargs ) -> AsyncGenerator[str, None]: diff --git a/py/core/parsers/structured/xlsx_parser.py b/py/core/parsers/structured/xlsx_parser.py index 5237439ea..e06a22d73 100644 --- a/py/core/parsers/structured/xlsx_parser.py +++ b/py/core/parsers/structured/xlsx_parser.py @@ -4,12 +4,25 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class XLSXParser(AsyncParser[DataType]): """A parser for XLSX data.""" - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config try: from openpyxl import load_workbook @@ -36,7 +49,11 @@ class XLSXParserAdvanced(AsyncParser[DataType]): """A parser for XLSX data.""" # identifies connected components in the excel graph and extracts data from each component - def __init__(self): + def __init__( + self, config: IngestionConfig, llm_provider: 
CompletionProvider + ): + self.llm_provider = llm_provider + self.config = config try: import networkx as nx import numpy as np diff --git a/py/core/parsers/text/html_parser.py b/py/core/parsers/text/html_parser.py index c2e893120..6f3e146c6 100644 --- a/py/core/parsers/text/html_parser.py +++ b/py/core/parsers/text/html_parser.py @@ -5,11 +5,26 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class HTMLParser(AsyncParser[DataType]): """A parser for HTML data.""" + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + async def ingest( self, data: DataType, *args, **kwargs ) -> AsyncGenerator[str, None]: diff --git a/py/core/parsers/text/md_parser.py b/py/core/parsers/text/md_parser.py index 725ae5724..2a181fbf9 100644 --- a/py/core/parsers/text/md_parser.py +++ b/py/core/parsers/text/md_parser.py @@ -5,12 +5,26 @@ from core.base.abstractions import DataType from core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class MDParser(AsyncParser[DataType]): """A parser for Markdown data.""" - def __init__(self): + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + import markdown self.markdown = markdown diff --git a/py/core/parsers/text/text_parser.py b/py/core/parsers/text/text_parser.py index da72ea85f..791f0783c 100644 --- a/py/core/parsers/text/text_parser.py +++ b/py/core/parsers/text/text_parser.py @@ -3,11 +3,26 @@ from core.base.abstractions import DataType from 
core.base.parsers.base_parser import AsyncParser +from core.base.providers import ( + CompletionProvider, + DatabaseProvider, + IngestionConfig, +) class TextParser(AsyncParser[DataType]): """A parser for raw text data.""" + def __init__( + self, + config: IngestionConfig, + database_provider: DatabaseProvider, + llm_provider: CompletionProvider, + ): + self.database_provider = database_provider + self.llm_provider = llm_provider + self.config = config + async def ingest( self, data: DataType, *args, **kwargs ) -> AsyncGenerator[DataType, None]: diff --git a/py/core/providers/auth/r2r_auth.py b/py/core/providers/auth/r2r_auth.py index 5babfa508..3c2fb5642 100644 --- a/py/core/providers/auth/r2r_auth.py +++ b/py/core/providers/auth/r2r_auth.py @@ -306,7 +306,7 @@ async def request_password_reset(self, email: str) -> Dict[str, str]: ) # TODO: Integrate with email provider to send reset link - await self.email_provider.send_reset_email(email, reset_token) + await self.email_provider.send_password_reset_email(email, reset_token) return {"message": "If the email exists, a reset link has been sent"} diff --git a/py/core/providers/database/prompts/vision_img.yaml b/py/core/providers/database/prompts/vision_img.yaml new file mode 100644 index 000000000..4a1aa4777 --- /dev/null +++ b/py/core/providers/database/prompts/vision_img.yaml @@ -0,0 +1,4 @@ +vision_img: + template: > + First, provide a title for the image, then explain everything that you see. Be very thorough in your analysis as a user will need to understand the image without seeing it. If it is possible to transcribe the image to text directly, then do so. The more detail you provide, the better the user will understand the image. 
+ input_types: {} diff --git a/py/core/providers/database/prompts/vision_pdf.yaml b/py/core/providers/database/prompts/vision_pdf.yaml new file mode 100644 index 000000000..350ead2d9 --- /dev/null +++ b/py/core/providers/database/prompts/vision_pdf.yaml @@ -0,0 +1,42 @@ +vision_pdf: + template: > + Convert this PDF page to markdown format, preserving all content and formatting. Follow these guidelines: + + Text: + - Maintain the original text hierarchy (headings, paragraphs, lists) + - Preserve any special formatting (bold, italic, underline) + - Include all footnotes, citations, and references + - Keep text in its original reading order + + Tables: + - Recreate tables using markdown table syntax + - Preserve all headers, rows, and columns + - Maintain alignment and formatting where possible + - Include any table captions or notes + + Equations: + - Convert mathematical equations using LaTeX notation + - Preserve equation numbers if present + - Include any surrounding context or references + + Images: + - Enclose image descriptions within [FIG] and [/FIG] tags + - Include detailed descriptions of: + * Main subject matter + * Text overlays or captions + * Charts, graphs, or diagrams + * Relevant colors, patterns, or visual elements + - Maintain image placement relative to surrounding text + + Additional Elements: + - Include page numbers if visible + - Preserve headers and footers + - Maintain sidebars or callout boxes + - Keep any special symbols or characters + + Quality Requirements: + - Ensure 100% content preservation + - Maintain logical document flow + - Verify all markdown syntax is valid + - Double-check completeness before submitting + input_types: {} diff --git a/py/core/providers/email/smtp.py b/py/core/providers/email/smtp.py index fecbe0008..da8bf67cd 100644 --- a/py/core/providers/email/smtp.py +++ b/py/core/providers/email/smtp.py @@ -35,7 +35,9 @@ def __init__(self, config: EmailConfig): if not self.smtp_password: raise ValueError("SMTP password is 
required") - self.from_email = config.from_email or os.getenv("R2R_FROM_EMAIL") + self.from_email: Optional[str] = config.from_email or os.getenv( + "R2R_FROM_EMAIL" + ) if not self.from_email: raise ValueError("From email is required") @@ -52,8 +54,8 @@ async def send_email( html_body: Optional[str] = None, ) -> None: msg = MIMEMultipart("alternative") - msg["Subject"] = subject - msg["From"] = self.from_email + msg["Subject"] = subject # type: ignore + msg["From"] = self.from_email # type: ignore msg["To"] = to_email msg.attach(MIMEText(body, "plain")) @@ -63,7 +65,7 @@ async def send_email( try: smtp = SMTP( hostname=self.smtp_server, - port=self.smtp_port, + port=int(self.smtp_port) if self.smtp_port else None, use_tls=self.use_tls, ) @@ -84,9 +86,9 @@ async def send_verification_email( subject = "Verify Your Email Address" body = f""" Thank you for registering! Please verify your email address by entering the following code: - + {verification_code} - + This code will expire in 24 hours. """ html_body = f""" @@ -103,9 +105,9 @@ async def send_password_reset_email( subject = "Password Reset Request" body = f""" We received a request to reset your password. Use the following code to reset your password: - + {reset_token} - + This code will expire in 1 hour. If you didn't request this reset, please ignore this email. 
""" html_body = f""" diff --git a/py/core/providers/ingestion/r2r/base.py b/py/core/providers/ingestion/r2r/base.py index 2632644dc..5334ad3b0 100644 --- a/py/core/providers/ingestion/r2r/base.py +++ b/py/core/providers/ingestion/r2r/base.py @@ -19,6 +19,9 @@ from core.base.abstractions import DocumentExtraction from core.utils import generate_extraction_id +from ...database import PostgresDBProvider +from ...llm import LiteLLMCompletionProvider, OpenAICompletionProvider + logger = logging.getLogger() @@ -38,7 +41,7 @@ class R2RIngestionProvider(IngestionProvider): DocumentType.HTM: parsers.HTMLParser, DocumentType.JSON: parsers.JSONParser, DocumentType.MD: parsers.MDParser, - DocumentType.PDF: parsers.PDFParser, + DocumentType.PDF: parsers.VLMPDFParser, DocumentType.PPTX: parsers.PPTParser, DocumentType.TXT: parsers.TextParser, DocumentType.XLSX: parsers.XLSXParser, @@ -47,6 +50,8 @@ class R2RIngestionProvider(IngestionProvider): DocumentType.JPG: parsers.ImageParser, DocumentType.PNG: parsers.ImageParser, DocumentType.SVG: parsers.ImageParser, + DocumentType.WEBP: parsers.ImageParser, + DocumentType.ICO: parsers.ImageParser, DocumentType.MP3: parsers.AudioParser, } @@ -54,23 +59,25 @@ class R2RIngestionProvider(IngestionProvider): DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced}, DocumentType.PDF: { "unstructured": parsers.PDFParserUnstructured, - "zerox": parsers.ZeroxPDFParser, - "marker": parsers.PDFParserMarker, + "basic": parsers.BasicPDFParser, }, DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced}, } - IMAGE_TYPES = { - DocumentType.GIF, - DocumentType.JPG, - DocumentType.JPEG, - DocumentType.PNG, - DocumentType.SVG, - } - - def __init__(self, config: R2RIngestionConfig): - super().__init__(config) + def __init__( + self, + config: R2RIngestionConfig, + database_provider: PostgresDBProvider, + llm_provider: Union[ + LiteLLMCompletionProvider, OpenAICompletionProvider + ], + ): + super().__init__(config, database_provider, llm_provider) 
self.config: R2RIngestionConfig = config # for type hinting + self.database_provider: PostgresDBProvider = database_provider + self.llm_provider: Union[ + LiteLLMCompletionProvider, OpenAICompletionProvider + ] = llm_provider self.parsers: dict[DocumentType, AsyncParser] = {} self.text_splitter = self._build_text_splitter() self._initialize_parsers() @@ -83,10 +90,18 @@ def _initialize_parsers(self): for doc_type, parser in self.DEFAULT_PARSERS.items(): # will choose the first parser in the list if doc_type not in self.config.excluded_parsers: - self.parsers[doc_type] = parser() + self.parsers[doc_type] = parser( + config=self.config, + database_provider=self.database_provider, + llm_provider=self.llm_provider, + ) for doc_type, doc_parser_name in self.config.extra_parsers.items(): - self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = ( - R2RIngestionProvider.EXTRA_PARSERS[doc_type][doc_parser_name]() + self.parsers[ + f"{doc_parser_name}_{str(doc_type)}" + ] = R2RIngestionProvider.EXTRA_PARSERS[doc_type][doc_parser_name]( + config=self.config, + database_provider=self.database_provider, + llm_provider=self.llm_provider, ) def _build_text_splitter( diff --git a/py/core/providers/ingestion/unstructured/base.py b/py/core/providers/ingestion/unstructured/base.py index a1d57af9e..e296782be 100644 --- a/py/core/providers/ingestion/unstructured/base.py +++ b/py/core/providers/ingestion/unstructured/base.py @@ -6,7 +6,7 @@ import time from copy import copy from io import BytesIO -from typing import Any, AsyncGenerator, Optional +from typing import Any, AsyncGenerator, Optional, Union import httpx from unstructured_client import UnstructuredClient @@ -25,6 +25,9 @@ from core.base.providers.ingestion import IngestionConfig, IngestionProvider from core.utils import generate_extraction_id +from ...database import PostgresDBProvider +from ...llm import LiteLLMCompletionProvider, OpenAICompletionProvider + logger = logging.getLogger() @@ -83,6 +86,7 @@ class 
UnstructuredIngestionProvider(IngestionProvider): DocumentType.JPG: [parsers.ImageParser], DocumentType.PNG: [parsers.ImageParser], DocumentType.SVG: [parsers.ImageParser], + DocumentType.PDF: [parsers.VLMPDFParser], DocumentType.MP3: [parsers.AudioParser], DocumentType.JSON: [parsers.JSONParser], # type: ignore DocumentType.HTML: [parsers.HTMLParser], # type: ignore @@ -92,24 +96,27 @@ class UnstructuredIngestionProvider(IngestionProvider): EXTRA_PARSERS = { DocumentType.CSV: {"advanced": parsers.CSVParserAdvanced}, # type: ignore DocumentType.PDF: { - "unstructured": parsers.PDFParserUnstructured, - "zerox": parsers.ZeroxPDFParser, - "marker": parsers.PDFParserMarker, + "basic": parsers.BasicPDFParser, }, DocumentType.XLSX: {"advanced": parsers.XLSXParserAdvanced}, # type: ignore } - IMAGE_TYPES = { - DocumentType.GIF, - DocumentType.JPG, - DocumentType.JPEG, - DocumentType.PNG, - DocumentType.SVG, - } - - def __init__(self, config: UnstructuredIngestionConfig): - super().__init__(config) + def __init__( + self, + config: UnstructuredIngestionConfig, + database_provider: PostgresDBProvider, + llm_provider: Union[ + LiteLLMCompletionProvider, OpenAICompletionProvider + ], + ): + + super().__init__(config, database_provider, llm_provider) self.config: UnstructuredIngestionConfig = config + self.database_provider: PostgresDBProvider = database_provider + self.llm_provider: Union[ + LiteLLMCompletionProvider, OpenAICompletionProvider + ] = llm_provider + if config.provider == "unstructured_api": try: self.unstructured_api_auth = os.environ["UNSTRUCTURED_API_KEY"] @@ -142,25 +149,33 @@ def __init__(self, config: UnstructuredIngestionConfig): self.client = httpx.AsyncClient() - super().__init__(config) + super().__init__(config, database_provider, llm_provider) self.parsers: dict[DocumentType, AsyncParser] = {} self._initialize_parsers() def _initialize_parsers(self): - for doc_type, parser_infos in self.R2R_FALLBACK_PARSERS.items(): - for parser_info in parser_infos: 
+ for doc_type, parsers in self.R2R_FALLBACK_PARSERS.items(): + for parser in parsers: if ( doc_type not in self.config.excluded_parsers and doc_type not in self.parsers ): # will choose the first parser in the list - self.parsers[doc_type] = parser_info() + self.parsers[doc_type] = parser( + config=self.config, + database_provider=self.database_provider, + llm_provider=self.llm_provider, + ) # TODO - Reduce code duplication between Unstructured & R2R for doc_type, doc_parser_name in self.config.extra_parsers.items(): - self.parsers[f"{doc_parser_name}_{str(doc_type)}"] = ( - UnstructuredIngestionProvider.EXTRA_PARSERS[doc_type][ - doc_parser_name - ]() + self.parsers[ + f"{doc_parser_name}_{str(doc_type)}" + ] = UnstructuredIngestionProvider.EXTRA_PARSERS[doc_type][ + doc_parser_name + ]( + config=self.config, + database_provider=self.database_provider, + llm_provider=self.llm_provider, ) async def parse_fallback( @@ -213,9 +228,25 @@ async def parse( ) elements = [] + # allow user to re-override places where unstructured is overridden above + # e.g.
+ # "ingestion_config": { + # ..., + # "parser_overrides": { + # "pdf": "unstructured" + # } + # } + reoverride_with_unst = ( + parser_overrides.get(document.document_type.value, None) + == "unstructured" + ) + # TODO - Cleanup this approach to be less hardcoded # TODO - Remove code duplication between Unstructured & R2R - if document.document_type.value in parser_overrides: + if ( + document.document_type.value in parser_overrides + and not reoverride_with_unst + ): logger.info( f"Using parser_override for {document.document_type} with input value {parser_overrides[document.document_type.value]}" ) @@ -226,7 +257,10 @@ async def parse( ): elements.append(element) - elif document.document_type in self.R2R_FALLBACK_PARSERS.keys(): + elif ( + document.document_type in self.R2R_FALLBACK_PARSERS.keys() + and not reoverride_with_unst + ): logger.info( f"Parsing {document.document_type}: {document.id} with fallback parser" ) diff --git a/py/poetry.lock b/py/poetry.lock index e441531f2..50fc2e603 100644 --- a/py/poetry.lock +++ b/py/poetry.lock @@ -5321,6 +5321,17 @@ notebook = ["ipywidgets (>=6)"] slack = ["slack-sdk"] telegram = ["requests"] +[[package]] +name = "types-aiofiles" +version = "24.1.0.20240626" +description = "Typing stubs for aiofiles" +optional = false +python-versions = ">=3.8" +files = [ + {file = "types-aiofiles-24.1.0.20240626.tar.gz", hash = "sha256:48604663e24bc2d5038eac05ccc33e75799b0779e93e13d6a8f711ddc306ac08"}, + {file = "types_aiofiles-24.1.0.20240626-py3-none-any.whl", hash = "sha256:7939eca4a8b4f9c6491b6e8ef160caee9a21d32e18534a57d5ed90aee47c66b4"}, +] + [[package]] name = "types-requests" version = "2.32.0.20241016" @@ -5858,4 +5869,4 @@ ingestion-bundle = ["aiofiles", "aioshutil", "beautifulsoup4", "bs4", "markdown" [metadata] lock-version = "2.0" python-versions = ">=3.11,<3.13" -content-hash = "076bf72cff07b22d020e62cbe5e477865157be0deba19f817b1b2a41787625df" +content-hash = 
"fb41515396b9a34291521c668a4d9b889406c781731a00cf6b06ef2e6347b28a" diff --git a/py/pyproject.toml b/py/pyproject.toml index e56393c3e..d8d1866e0 100644 --- a/py/pyproject.toml +++ b/py/pyproject.toml @@ -81,6 +81,7 @@ pypdf2 = { version = "^3.0.1", optional = true } python-pptx = { version = "^1.0.1", optional = true } python-docx = { version = "^1.1.0", optional = true } aiosmtplib = "^3.0.2" +types-aiofiles = "^24.1.0.20240626" [tool.poetry.extras] core = [ diff --git a/py/r2r.toml b/py/r2r.toml index 5ece375f4..6cb95f422 100644 --- a/py/r2r.toml +++ b/py/r2r.toml @@ -88,6 +88,12 @@ chunk_size = 1_024 chunk_overlap = 512 excluded_parsers = ["mp4"] +audio_transcription_model="openai/whisper-1" +vision_img_model = "gpt-4o-mini" +vision_pdf_model = "gpt-4o-mini" +# vision_img_prompt_name = "vision_img" # optional, default is "vision_img" +# vision_pdf_prompt_name = "vision_pdf" # optional, default is "vision_pdf" + [ingestion.chunk_enrichment_settings] enable_chunk_enrichment = false # disabled by default strategies = ["semantic", "neighborhood"] @@ -97,9 +103,6 @@ excluded_parsers = ["mp4"] semantic_similarity_threshold = 0.7 generation_config = { model = "openai/gpt-4o-mini" } - [ingestion.extra_parsers] - pdf = "zerox" - [logging] provider = "r2r" log_table = "logs" diff --git a/py/shared/abstractions/document.py b/py/shared/abstractions/document.py index 04629ffad..7c4daee3b 100644 --- a/py/shared/abstractions/document.py +++ b/py/shared/abstractions/document.py @@ -49,6 +49,8 @@ class DocumentType(str, Enum): TIFF = "tiff" JPG = "jpg" SVG = "svg" + WEBP = "webp" + ICO = "ico" # Markdown MD = "md"