Skip to content

Commit

Permalink
feat: Changes to measure time taken in index (#106)
Browse files Browse the repository at this point in the history
* Changes to measure time taken in index and support for other operations

* Minor docstring fix

* Updated debug log to info for logging timing
  • Loading branch information
chandrasekharan-zipstack authored Oct 1, 2024
1 parent dc91d79 commit 4b44c02
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 48 deletions.
135 changes: 87 additions & 48 deletions src/unstract/sdk/index.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
from unstract.sdk.exceptions import IndexingError, SdkError
from unstract.sdk.tool.base import BaseTool
from unstract.sdk.utils import ToolUtils
from unstract.sdk.utils.common_utils import log_elapsed
from unstract.sdk.vector_db import VectorDB
from unstract.sdk.x2txt import X2Text

Expand Down Expand Up @@ -104,6 +105,80 @@ def query_index(
finally:
vector_db.close()

@log_elapsed(operation="EXTRACTION")
def extract_text(
    self,
    x2text_instance_id: str,
    file_path: str,
    output_file_path: Optional[str] = None,
    enable_highlight: bool = False,
    usage_kwargs: Optional[dict[Any, Any]] = None,
    process_text: Optional[Callable[[str], str]] = None,
) -> str:
    """Extracts text from a document.

    Uses the configured service to perform the extraction
    - LLM Whisperer
    - Unstructured IO Community / Enterprise
    - Llama Parse

    Args:
        x2text_instance_id (str): UUID of the text extractor
        file_path (str): Path to the file
        output_file_path (Optional[str], optional): File path to write
            the extracted contents into. Defaults to None.
        enable_highlight (bool, optional): Flag to provide highlighting metadata.
            Defaults to False.
        usage_kwargs (Optional[dict[Any, Any]], optional): Dict to capture usage.
            Defaults to None (treated as an empty dict).
        process_text (Optional[Callable[[str], str]], optional): Optional function
            to post-process the text. Defaults to None.

    Returns:
        str: Text extracted from the document (possibly post-processed
            by `process_text`). Empty string if nothing was extracted.

    Raises:
        IndexingError: Errors during text extraction
    """
    # Avoid the mutable-default-argument pitfall: a `{}` default is shared
    # across calls and any mutation by the adapter would leak between them.
    if usage_kwargs is None:
        usage_kwargs = {}
    self.tool.stream_log("Extracting text from input file")
    extracted_text = ""
    try:
        x2text = X2Text(
            tool=self.tool,
            adapter_instance_id=x2text_instance_id,
            usage_kwargs=usage_kwargs,
        )
        # Highlighting metadata is only supported by the LLMWhisperer adapter
        if enable_highlight and isinstance(x2text._x2text_instance, LLMWhisperer):
            process_response: TextExtractionResult = x2text.process(
                input_file_path=file_path,
                output_file_path=output_file_path,
                enable_highlight=enable_highlight,
            )
            whisper_hash_value = process_response.extraction_metadata.whisper_hash

            # Persist the whisper hash so later steps can reference this
            # extraction in the execution metadata
            metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value}

            self.tool.update_exec_metadata(metadata)

        else:
            process_response: TextExtractionResult = x2text.process(
                input_file_path=file_path,
                output_file_path=output_file_path,
            )

        extracted_text = process_response.extracted_text
    except AdapterError as e:
        # Wrapping AdapterErrors with IndexingError
        raise IndexingError(str(e)) from e
    if process_text:
        # Post-processing is best-effort: a bad hook must not lose the
        # already-extracted text, so failures are logged and ignored.
        try:
            result = process_text(extracted_text)
            if isinstance(result, str):
                extracted_text = result
            else:
                logger.warning("'process_text' is expected to return an 'str'")
        except Exception as e:
            logger.error(f"Error occurred inside function 'process_text': {e}")
    return extracted_text

@log_elapsed(operation="INDEXING(might include EXTRACTION)")
def index(
self,
tool_id: str,
Expand Down Expand Up @@ -207,58 +282,23 @@ def index(
self.tool.stream_log(f"File was indexed already under {doc_id}")
return doc_id

# Extract text and index
self.tool.stream_log("Extracting text from input file")
full_text = []
extracted_text = ""
try:
x2text = X2Text(
tool=self.tool,
adapter_instance_id=x2text_instance_id,
usage_kwargs=usage_kwargs,
)
if enable_highlight and isinstance(
x2text._x2text_instance, LLMWhisperer
):
process_response: TextExtractionResult = x2text.process(
input_file_path=file_path,
output_file_path=output_file_path,
enable_highlight=enable_highlight,
)
whisper_hash_value = (
process_response.extraction_metadata.whisper_hash
)

metadata = {X2TextConstants.WHISPER_HASH: whisper_hash_value}

self.tool.update_exec_metadata(metadata)

else:
process_response: TextExtractionResult = x2text.process(
input_file_path=file_path,
output_file_path=output_file_path,
)
extracted_text = self.extract_text(
x2text_instance_id=x2text_instance_id,
file_path=file_path,
output_file_path=output_file_path,
enable_highlight=enable_highlight,
usage_kwargs=usage_kwargs,
process_text=process_text,
)
if not extracted_text:
raise IndexingError("No text available to index")

extracted_text = process_response.extracted_text
except AdapterError as e:
# Wrapping AdapterErrors with SdkError
raise IndexingError(str(e)) from e
if process_text:
try:
result = process_text(extracted_text)
if isinstance(result, str):
extracted_text = result
except Exception as e:
logger.error(f"Error occured inside function 'process_text': {e}")
full_text.append(
full_text = [
{
"section": "full",
"text_contents": extracted_text,
}
)

if not extracted_text:
raise IndexingError("No text available to index")
]

# Check if chunking is required
documents = []
Expand Down Expand Up @@ -324,7 +364,6 @@ def index(
level=LogLevel.ERROR,
)
raise IndexingError(str(e)) from e
self.tool.stream_log("Added nodes to vector db")

self.tool.stream_log("File has been indexed successfully")
return doc_id
Expand Down
26 changes: 26 additions & 0 deletions src/unstract/sdk/utils/common_utils.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import functools
import logging
import time
import uuid

from unstract.sdk.constants import LogLevel
Expand All @@ -20,3 +22,27 @@ def generate_uuid() -> str:
logging.WARNING: LogLevel.WARN,
logging.ERROR: LogLevel.ERROR,
}


def log_elapsed(operation):
    """Decorator factory that logs the elapsed time of the wrapped callable.

    The elapsed time is logged even when the wrapped function raises
    (the timing happens in a ``finally`` block), so failures are timed too.

    Args:
        operation (str): Operation being measured; appears in the log line

    Returns:
        Callable: Decorator that wraps a function with elapsed-time logging
    """

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            # perf_counter is a monotonic clock meant for interval timing;
            # time.time() can jump (NTP sync, DST) and skew measurements.
            start_time = time.perf_counter()
            try:
                result = func(*args, **kwargs)
            finally:
                elapsed_time = time.perf_counter() - start_time
                # Lazy %-style args: formatting is skipped if INFO is disabled
                logger.info("Time taken for '%s': %.3fs", operation, elapsed_time)
            return result

        return wrapper

    return decorator

0 comments on commit 4b44c02

Please sign in to comment.