Switch split task to token based splitting #283

Status: Open. This pull request wants to merge 29 commits into base: main.

Commits (29):
918740b  Switch split task to token based splitting (ChrisJar, Dec 13, 2024)
f45d7c9  Merge branch 'main' into token-split (Dec 18, 2024)
c12df54  Move tokenizer out of loop (Dec 18, 2024)
c88a4a2  Fix CLI (Dec 18, 2024)
0cacc9e  Add chunk_overlap parameter (Jan 8, 2025)
2e495b2  Merge in main (Jan 8, 2025)
d8a2de9  Fix broken tests (Jan 8, 2025)
a21d065  Add chunk overlap (Jan 8, 2025)
bc2bf48  Rename nemo document splitter to text splitter (Jan 9, 2025)
cd18083  Temp fix (Jan 9, 2025)
081dc4e  Merge remote-tracking branch 'upstream/main' into token-split (Jan 9, 2025)
bab7f3d  Address reviews (Jan 13, 2025)
125bb38  Merge remote-tracking branch 'upstream/main' into token-split (Jan 14, 2025)
e09723f  Merge remote-tracking branch 'upstream/main' into token-split (Jan 22, 2025)
289998c  Merge remote-tracking branch 'upstream/main' into token-split (Feb 6, 2025)
e86c539  Merge remote-tracking branch 'upstream/main' into token-split (Feb 10, 2025)
2f4b979  Change default chunk_size to 1024 (Feb 10, 2025)
b052942  Change default chunk_overlap to 20 (Feb 11, 2025)
317a426  Pass huggingface access token as param (Feb 12, 2025)
e250eff  Add llama license notice (Feb 12, 2025)
3c2f246  Add support for filtering by file type (Feb 12, 2025)
cf6d125  Merge remote-tracking branch 'upstream/main' into token-split (Feb 12, 2025)
6e32a9c  Fix offset mapping (Feb 12, 2025)
0825feb  Merge remote-tracking branch 'upstream/main' into token-split (Feb 13, 2025)
67b7051  Add built with llama (Feb 19, 2025)
b6829ed  Merge upstream/main into token-split (Feb 19, 2025)
7634030  Change default chunk_overlap to 150 (Feb 19, 2025)
66eb642  Merge remote-tracking branch 'upstream/main' into token-split (Feb 20, 2025)
d6401bc  Add option to predownload llama tokenizer (Feb 20, 2025)
11 changes: 11 additions & 0 deletions Dockerfile
@@ -12,6 +12,8 @@ FROM $BASE_IMG:$BASE_IMG_TAG AS base
 ARG RELEASE_TYPE="dev"
 ARG VERSION=""
 ARG VERSION_REV="0"
+ARG DOWNLOAD_LLAMA_TOKENIZER=""
+ARG HF_ACCESS_TOKEN=""

 # Embed the `git rev-parse HEAD` as a Docker metadata label
 # Allows for linking container builds to git commits
@@ -71,6 +73,9 @@ WORKDIR /workspace
 # Copy custom entrypoint script
 COPY ./docker/scripts/entrypoint.sh /workspace/docker/entrypoint.sh

+# Copy post build triggers script
+COPY ./docker/scripts/post_build_triggers.py /workspace/docker/post_build_triggers.py
+
 FROM base AS nv_ingest_install
 # Copy the module code
 COPY setup.py setup.py
@@ -124,6 +129,12 @@ RUN --mount=type=cache,target=/opt/conda/pkgs\
     && pip install ./api/dist/*.whl \
     && pip install ./client/dist/*.whl

+
+RUN --mount=type=cache,target=/opt/conda/pkgs \
+    --mount=type=cache,target=/root/.cache/pip \
+    source activate nv_ingest_runtime \
+    && python3 /workspace/docker/post_build_triggers.py
+
 RUN rm -rf src

 FROM nv_ingest_install AS runtime
6 changes: 6 additions & 0 deletions README.md
@@ -411,6 +411,12 @@ https://pypi.org/project/pdfservices-sdk/
   required if you want to use the Adobe extraction service for PDF decomposition. Please review the
   [license agreement](https://github.com/adobe/pdfservices-python-sdk?tab=License-1-ov-file) for the
   pdfservices-sdk before enabling this option.
+- **`DOWNLOAD_LLAMA_TOKENIZER` (Built With Llama):**
+  - **Description**: The Split task uses the `meta-llama/Llama-3.2-1B` tokenizer, which will be downloaded
+    from HuggingFace at build time if `DOWNLOAD_LLAMA_TOKENIZER` is set to `True`. Please review the
+    [license agreement](https://huggingface.co/meta-llama/Llama-3.2-1B) for the Llama 3.2 materials before using this.
+    This is a gated model, so you'll need to [request access](https://huggingface.co/meta-llama/Llama-3.2-1B) and
+    set `HF_ACCESS_TOKEN` to your HuggingFace access token in order to use it.


 ### Contributing
68 changes: 26 additions & 42 deletions client/src/nv_ingest_client/primitives/tasks/split.py
@@ -8,29 +8,19 @@

 import logging
 from typing import Dict
-from typing import Literal
 from typing import Optional

-from pydantic import BaseModel, field_validator
+from pydantic import BaseModel

 from .task_base import Task

 logger = logging.getLogger(__name__)


 class SplitTaskSchema(BaseModel):
-    split_by: Optional[str] = "sentence"
-    split_length: Optional[int] = 10
-    split_overlap: Optional[int] = 0
-    max_character_length: Optional[int] = 1024
-    sentence_window_size: Optional[int] = 0
-
-    @field_validator("split_by")
-    def split_by_must_be_valid(cls, v):
-        valid_criteria = ["page", "size", "word", "sentence"]
-        if v not in valid_criteria:
-            raise ValueError(f"split_by must be one of {valid_criteria}")
-        return v
+    tokenizer: str = "meta-llama/Llama-3.2-1B"
+    chunk_size: int = 1024
+    chunk_overlap: int = 150
+    params: dict = {}

     class Config:
         extra = "forbid"
@@ -41,37 +31,33 @@ class SplitTask(Task):
     Object for document splitting task
     """

-    _TypeSplitBy = Literal["word", "sentence", "passage"]
-
     def __init__(
         self,
-        split_by: _TypeSplitBy = None,
-        split_length: int = None,
-        split_overlap: int = None,
-        max_character_length: int = None,
-        sentence_window_size: int = None,
+        tokenizer: str = "meta-llama/Llama-3.2-1B",
+        chunk_size: int = 1024,
+        chunk_overlap: int = 150,
+        params: dict = {},
     ) -> None:
         """
         Setup Split Task Config
         """
         super().__init__()
-        self._split_by = split_by
-        self._split_length = split_length
-        self._split_overlap = split_overlap
-        self._max_character_length = max_character_length
-        self._sentence_window_size = sentence_window_size
+        self._tokenizer = tokenizer
+        self._chunk_size = chunk_size
+        self._chunk_overlap = chunk_overlap
+        self._params = params

     def __str__(self) -> str:
         """
         Returns a string with the object's config and run time state
         """
         info = ""
         info += "Split Task:\n"
-        info += f"  split_by: {self._split_by}\n"
-        info += f"  split_length: {self._split_length}\n"
-        info += f"  split_overlap: {self._split_overlap}\n"
-        info += f"  split_max_character_length: {self._max_character_length}\n"
-        info += f"  split_sentence_window_size: {self._sentence_window_size}\n"
+        info += f"  tokenizer: {self._tokenizer}\n"
+        info += f"  chunk_size: {self._chunk_size}\n"
+        info += f"  chunk_overlap: {self._chunk_overlap}\n"
+        for key, value in self._params.items():
+            info += f"  {key}: {value}\n"
         return info

@@ -80,15 +66,13 @@ def to_dict(self) -> Dict:
         """
         split_params = {}

-        if self._split_by is not None:
-            split_params["split_by"] = self._split_by
-        if self._split_length is not None:
-            split_params["split_length"] = self._split_length
-        if self._split_overlap is not None:
-            split_params["split_overlap"] = self._split_overlap
-        if self._max_character_length is not None:
-            split_params["max_character_length"] = self._max_character_length
-        if self._sentence_window_size is not None:
-            split_params["sentence_window_size"] = self._sentence_window_size
+        if self._tokenizer is not None:
+            split_params["tokenizer"] = self._tokenizer
+        if self._chunk_size is not None:
+            split_params["chunk_size"] = self._chunk_size
+        if self._chunk_overlap is not None:
+            split_params["chunk_overlap"] = self._chunk_overlap
+        if self._params is not None:
+            split_params["params"] = self._params

         return {"type": "split", "task_properties": split_params}
3 changes: 3 additions & 0 deletions docker-compose.yaml
@@ -155,6 +155,9 @@ services:
       context: ${NV_INGEST_ROOT:-.}
       dockerfile: "./Dockerfile"
       target: runtime
+      args:
+        DOWNLOAD_LLAMA_TOKENIZER: ${DOWNLOAD_LLAMA_TOKENIZER:-False}
+        HF_ACCESS_TOKEN: ${HF_ACCESS_TOKEN:-hfaccesstoken}
       volumes:
         - ${DATASET_ROOT:-./data}:/workspace/data
       ports:
9 changes: 9 additions & 0 deletions docker/scripts/post_build_triggers.py
@@ -0,0 +1,9 @@
import os
from transformers import AutoTokenizer

if os.getenv("DOWNLOAD_LLAMA_TOKENIZER") == "True":
    tokenizer_path = "/workspace/models/tokenizer/"
    os.makedirs(tokenizer_path, exist_ok=True)

    tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.2-1B", token=os.getenv("HF_ACCESS_TOKEN"))
    tokenizer.save_pretrained(tokenizer_path)
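As a sanity check, a small sketch (not part of this PR) of how a consumer could prefer the tokenizer baked into the image and fall back to HuggingFace when it is absent; `load_tokenizer` is a hypothetical helper:

```python
import os

from transformers import AutoTokenizer


def load_tokenizer(model_name: str = "meta-llama/Llama-3.2-1B"):
    """Hypothetical helper: use the predownloaded tokenizer when available."""
    local_path = "/workspace/models/tokenizer/"  # path written by post_build_triggers.py
    if os.path.isdir(local_path) and os.listdir(local_path):
        # from_pretrained accepts a local directory containing tokenizer files
        return AutoTokenizer.from_pretrained(local_path)
    # Otherwise fall back to the gated download, authenticated via HF_ACCESS_TOKEN
    return AutoTokenizer.from_pretrained(model_name, token=os.getenv("HF_ACCESS_TOKEN"))
```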
4 changes: 2 additions & 2 deletions src/nv_ingest/modules/transforms/__init__.py
@@ -3,6 +3,6 @@
 # SPDX-License-Identifier: Apache-2.0

 from .associate_nearby_text import AssociateNearbyTextLoaderFactory
-from .nemo_doc_splitter import NemoDocSplitterLoaderFactory
+from .text_splitter import TextSplitterLoaderFactory

-__all__ = ["NemoDocSplitterLoaderFactory", "AssociateNearbyTextLoaderFactory"]
+__all__ = ["TextSplitterLoaderFactory", "AssociateNearbyTextLoaderFactory"]
165 changes: 165 additions & 0 deletions src/nv_ingest/modules/transforms/text_splitter.py
@@ -0,0 +1,165 @@
# SPDX-FileCopyrightText: Copyright (c) 2024, NVIDIA CORPORATION & AFFILIATES.
# All rights reserved.
# SPDX-License-Identifier: Apache-2.0


import copy
import logging
import traceback
import uuid
from typing import Any
from typing import List

import mrc
import pandas as pd
from transformers import AutoTokenizer
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
from morpheus.utils.control_message_utils import cm_skip_processing_if_failed
from morpheus.utils.module_utils import ModuleLoaderFactory
from morpheus.utils.module_utils import register_module
from mrc.core import operators as ops
from pydantic import BaseModel

import cudf

from nv_ingest.schemas.metadata_schema import ContentTypeEnum
from nv_ingest.schemas.text_splitter_schema import TextSplitterSchema
from nv_ingest.util.exception_handlers.decorators import nv_ingest_node_failure_context_manager
from nv_ingest.util.flow_control import filter_by_task
from nv_ingest.util.modules.config_validator import fetch_and_validate_module_config
from nv_ingest.util.tracing import traceable

logger = logging.getLogger(__name__)


def _build_split_documents(row, chunks: List[str]) -> List[dict[str, Any]]:
"""Build documents from text chunks"""
documents: List[dict] = []

for i, text in enumerate(chunks):
if text is None or not text.strip():
continue

metadata = row.metadata if hasattr(row, "metadata") and isinstance(row.metadata, dict) else {}
metadata = copy.deepcopy(metadata)

metadata["content"] = text

documents.append({"document_type": ContentTypeEnum.TEXT.value, "metadata": metadata, "uuid": str(uuid.uuid4())})

return documents


def _split_into_chunks(text, tokenizer, chunk_size=1024, chunk_overlap=20):
# Tokenize the text into token IDs
encoding = tokenizer.encode_plus(text, add_special_tokens=False, return_offsets_mapping=True)

# Get the token IDs and offsets for splitting
offsets = encoding["offset_mapping"]

# Split the tokens into chunks of the desired size with the desired overlap
chunks = [offsets[i : i + chunk_size] for i in range(0, len(offsets), chunk_size - chunk_overlap)]

# Convert token chunks back to text while preserving original spacing and case
text_chunks = []
for chunk in chunks:
text_chunk = text[chunk[0][0] : chunk[-1][0]]
text_chunks.append(text_chunk)

return text_chunks


MODULE_NAME = "text_splitter"
MODULE_NAMESPACE = "nv_ingest"

TextSplitterLoaderFactory = ModuleLoaderFactory(MODULE_NAME, MODULE_NAMESPACE, TextSplitterSchema)


@register_module(MODULE_NAME, MODULE_NAMESPACE)
def _text_splitter(builder: mrc.Builder):
"""
A pipeline module that splits documents into smaller parts based on the specified criteria.
"""

validated_config = fetch_and_validate_module_config(builder, TextSplitterSchema)

@filter_by_task(["split"])
@traceable(MODULE_NAME)
@cm_skip_processing_if_failed
@nv_ingest_node_failure_context_manager(
annotation_id=MODULE_NAME,
raise_on_failure=validated_config.raise_on_failure,
)
def split_and_forward(message: ControlMessage):
try:
# Assume that df is going to have a 'content' column
task_props = message.remove_task("split")

if isinstance(task_props, BaseModel):
task_props = task_props.model_dump()

# Validate that all 'content' values are not None
with message.payload().mutable_dataframe() as mdf:
df = mdf.to_pandas()

# Filter to document type
bool_index = df["document_type"] == ContentTypeEnum.TEXT
df_filtered = df.loc[bool_index]

if df_filtered.empty:
return message

# Override parameters if set
tokenizer = task_props.get("tokenizer", validated_config.tokenizer)
chunk_size = task_props.get("chunk_size", validated_config.chunk_size)
chunk_overlap = task_props.get("chunk_overlap", validated_config.chunk_overlap)
params = task_props.get("params", {})

hf_access_token = params.get("hf_access_token", None)
split_source_types = params.get("split_source_types", ["TEXT"])

logger.debug(
f"Splitting text with tokenizer: {tokenizer}, "
f"chunk_size: {chunk_size} tokens, "
f"chunk_overlap: {chunk_overlap}"
)

# Filter to file type
bool_index = pd.json_normalize(df_filtered["metadata"])["source_metadata.source_type"].isin(
split_source_types
)
df_filtered = df_filtered.loc[bool_index]

if df_filtered.empty:
return message

tokenizer_model = AutoTokenizer.from_pretrained(tokenizer, token=hf_access_token)

split_docs = []
Reviewer comment (Collaborator): Matching documents should be collected and processed all at once as opposed to iterating rows; and we should actually make this a multiprocessing stage so we're able to use the worker pool for CPU-bound tasks. But we can hold off if this needs to go in sooner.
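For illustration, a rough sketch of the batched tokenization the comment suggests (not part of this PR; assumes a fast tokenizer, which returns one offset mapping per input):

```python
# Sketch: tokenize every matching row in one batched call instead of per-row encode_plus.
texts = [(row["metadata"]["content"] or "") for _, row in df_filtered.iterrows()]
batch = tokenizer_model(texts, add_special_tokens=False, return_offsets_mapping=True)

split_docs = []
stride = chunk_size - chunk_overlap
for (_, row), text, offsets in zip(df_filtered.iterrows(), texts, batch["offset_mapping"]):
    chunks = [offsets[i : i + chunk_size] for i in range(0, len(offsets), stride)]
    text_chunks = [text[c[0][0] : c[-1][1]] for c in chunks if c]
    split_docs.extend(_build_split_documents(row, text_chunks))
```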

            for _, row in df_filtered.iterrows():
                content = row["metadata"]["content"] if row["metadata"]["content"] is not None else ""

                chunks = _split_into_chunks(content, tokenizer_model, chunk_size, chunk_overlap)
                split_docs.extend(_build_split_documents(row, chunks))

            split_docs_df = pd.DataFrame(split_docs)

            # Return both processed text and any rows that were not selected for splitting
            split_docs_df = pd.concat([split_docs_df, df[~df.index.isin(df_filtered.index)]], axis=0).reset_index(drop=True)
            # Update control message with new payload
            split_docs_gdf = cudf.from_pandas(split_docs_df)

            message_meta = MessageMeta(df=split_docs_gdf)
            message.payload(message_meta)

            return message
        except Exception as e:
            traceback.print_exc()
            raise ValueError(f"Failed to split documents: {e}") from e

    split_node = builder.make_node("split_and_forward", ops.map(split_and_forward))

    # Register the input and output of the module
    builder.register_module_input("input", split_node)
    builder.register_module_output("output", split_node)
4 changes: 2 additions & 2 deletions src/nv_ingest/schemas/__init__.py
@@ -13,13 +13,13 @@
 from .message_broker_source_schema import MessageBrokerTaskSourceSchema
 from .metadata_injector_schema import MetadataInjectorSchema
 from .metadata_schema import validate_metadata
-from .nemo_doc_splitter_schema import DocumentSplitterSchema
+from .text_splitter_schema import TextSplitterSchema
 from .pdf_extractor_schema import PDFExtractorSchema
 from .task_injection_schema import TaskInjectionSchema
 from .vdb_task_sink_schema import VdbTaskSinkSchema

 __all__ = [
-    "DocumentSplitterSchema",
+    "TextSplitterSchema",
     "ImageCaptionExtractionSchema",
     "ImageStorageModuleSchema",
     "IngestJobSchema",
20 changes: 9 additions & 11 deletions src/nv_ingest/schemas/ingest_job_schema.py
@@ -8,7 +8,6 @@
 from typing import Any
 from typing import Dict
 from typing import List
-from typing import Literal
 from typing import Optional
 from typing import Union
@@ -60,16 +59,15 @@ class TracingOptionsSchema(BaseModelNoExt):


 class IngestTaskSplitSchema(BaseModelNoExt):
-    split_by: Literal["word", "sentence", "passage"]
-    split_length: Annotated[int, Field(gt=0)]
-    split_overlap: Annotated[int, Field(ge=0)]
-    max_character_length: Optional[Annotated[int, Field(gt=0)]] = None
-    sentence_window_size: Optional[Annotated[int, Field(ge=0)]] = None
-
-    @field_validator("sentence_window_size")
-    def check_sentence_window_size(cls, v, values, **kwargs):
-        if v is not None and v > 0 and values.data["split_by"] != "sentence":
-            raise ValueError("When using sentence_window_size, split_by must be 'sentence'.")
+    tokenizer: str
+    chunk_size: Annotated[int, Field(gt=0)]
+    chunk_overlap: Annotated[int, Field(ge=0)]
+    params: dict
+
+    @field_validator("chunk_overlap")
+    def check_chunk_overlap(cls, v, values, **kwargs):
+        if v is not None and "chunk_size" in values.data and v >= values.data["chunk_size"]:
+            raise ValueError("chunk_overlap must be less than chunk_size")
         return v
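A quick sketch of how the new validator behaves (a standalone pydantic v2 model mirroring the fields above, since `BaseModelNoExt` lives elsewhere in the repo):

```python
from typing import Annotated

from pydantic import BaseModel, Field, ValidationError, field_validator


class SplitSchemaSketch(BaseModel):
    tokenizer: str
    chunk_size: Annotated[int, Field(gt=0)]
    chunk_overlap: Annotated[int, Field(ge=0)]
    params: dict

    @field_validator("chunk_overlap")
    @classmethod
    def check_chunk_overlap(cls, v, info):
        # chunk_size is declared first, so it appears in info.data once it validates
        if "chunk_size" in info.data and v >= info.data["chunk_size"]:
            raise ValueError("chunk_overlap must be less than chunk_size")
        return v


try:
    SplitSchemaSketch(tokenizer="meta-llama/Llama-3.2-1B", chunk_size=100, chunk_overlap=100, params={})
except ValidationError as err:
    print(err)  # rejected: chunk_overlap must be less than chunk_size
```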

