Switch split task to token based splitting #283

Open · wants to merge 29 commits into base: main

Changes from 1 commit

Commits (29)
918740b  Switch split task to token based splitting (ChrisJar, Dec 13, 2024)
f45d7c9  Merge branch 'main' into token-split (Dec 18, 2024)
c12df54  Move tokenizer out of loop (Dec 18, 2024)
c88a4a2  Fix CLI (Dec 18, 2024)
0cacc9e  Add chunk_overlap parameter (Jan 8, 2025)
2e495b2  Merge in main (Jan 8, 2025)
d8a2de9  Fix broken tests (Jan 8, 2025)
a21d065  Add chunk overlap (Jan 8, 2025)
bc2bf48  Rename nemo document splitter to text splitter (Jan 9, 2025)
cd18083  Temp fix (Jan 9, 2025)
081dc4e  Merge remote-tracking branch 'upstream/main' into token-split (Jan 9, 2025)
bab7f3d  Address reviews (Jan 13, 2025)
125bb38  Merge remote-tracking branch 'upstream/main' into token-split (Jan 14, 2025)
e09723f  Merge remote-tracking branch 'upstream/main' into token-split (Jan 22, 2025)
289998c  Merge remote-tracking branch 'upstream/main' into token-split (Feb 6, 2025)
e86c539  Merge remote-tracking branch 'upstream/main' into token-split (Feb 10, 2025)
2f4b979  Change default chunk_size to 1024 (Feb 10, 2025)
b052942  Change default chunk_overlap to 20 (Feb 11, 2025)
317a426  Pass huggingface access token as param (Feb 12, 2025)
e250eff  Add llama license notice (Feb 12, 2025)
3c2f246  Add support for filtering by file type (Feb 12, 2025)
cf6d125  Merge remote-tracking branch 'upstream/main' into token-split (Feb 12, 2025)
6e32a9c  Fix offset mapping (Feb 12, 2025)
0825feb  Merge remote-tracking branch 'upstream/main' into token-split (Feb 13, 2025)
67b7051  Add built with llama (Feb 19, 2025)
b6829ed  Merge upstream/main into token-split (Feb 19, 2025)
7634030  Change default chunk_overlap to 150 (Feb 19, 2025)
66eb642  Merge remote-tracking branch 'upstream/main' into token-split (Feb 20, 2025)
d6401bc  Add option to predownload llama tokenizer (Feb 20, 2025)

client/src/nv_ingest_client/primitives/tasks/split.py (12 additions, 39 deletions)
@@ -20,18 +20,8 @@


class SplitTaskSchema(BaseModel):
split_by: Optional[str] = "sentence"
split_length: Optional[int] = 10
split_overlap: Optional[int] = 0
max_character_length: Optional[int] = 1024
sentence_window_size: Optional[int] = 0

@validator("split_by")
def split_by_must_be_valid(cls, v):
valid_criteria = ["page", "size", "word", "sentence"]
if v not in valid_criteria:
raise ValueError(f"split_by must be one of {valid_criteria}")
return v
split_by: str = "intfloat/e5-large-unsupervised"
chunk_size: int = 300

class Config:
extra = "forbid"
@@ -42,37 +32,26 @@ class SplitTask(Task):
Object for document splitting task
"""

_TypeSplitBy = Literal["word", "sentence", "passage"]

def __init__(
self,
split_by: _TypeSplitBy = None,
split_length: int = None,
split_overlap: int = None,
max_character_length: int = None,
sentence_window_size: int = None,
tokenizer: str = None,
chunk_size: int = None,
) -> None:
"""
Setup Split Task Config
"""
super().__init__()
self._split_by = split_by
self._split_length = split_length
self._split_overlap = split_overlap
self._max_character_length = max_character_length
self._sentence_window_size = sentence_window_size
self._tokenizer = tokenizer
self._chunk_size = chunk_size

def __str__(self) -> str:
"""
Returns a string with the object's config and run time state
"""
info = ""
info += "Split Task:\n"
info += f" split_by: {self._split_by}\n"
info += f" split_length: {self._split_length}\n"
info += f" split_overlap: {self._split_overlap}\n"
info += f" split_max_character_length: {self._max_character_length}\n"
info += f" split_sentence_window_size: {self._sentence_window_size}\n"
info += f" tokenizer: {self._tokenizer}\n"
info += f" chunk_size: {self._chunk_size}\n"
return info

def to_dict(self) -> Dict:
@@ -81,15 +60,9 @@ def to_dict(self) -> Dict:
"""
split_params = {}

if self._split_by is not None:
split_params["split_by"] = self._split_by
if self._split_length is not None:
split_params["split_length"] = self._split_length
if self._split_overlap is not None:
split_params["split_overlap"] = self._split_overlap
if self._max_character_length is not None:
split_params["max_character_length"] = self._max_character_length
if self._sentence_window_size is not None:
split_params["sentence_window_size"] = self._sentence_window_size
if self._tokenizer is not None:
split_params["tokenizer"] = self._tokenizer
if self._chunk_size is not None:
split_params["chunk_size"] = self._chunk_size

return {"type": "split", "task_properties": split_params}
src/nv_ingest/modules/transforms/nemo_doc_splitter.py (34 additions, 85 deletions)
@@ -3,6 +3,7 @@
# SPDX-License-Identifier: Apache-2.0


import os
import copy
import logging
import traceback
@@ -13,6 +14,7 @@

import mrc
import pandas as pd
from transformers import AutoTokenizer
from more_itertools import windowed
from morpheus.messages import ControlMessage
from morpheus.messages import MessageMeta
@@ -33,24 +35,16 @@
logger = logging.getLogger(__name__)


def _build_split_documents(row, text_splits: List[str], sentence_window_size: int) -> List[dict[str, Any]]:
"""Build documents from text splits with window text."""
def _build_split_documents(row, chunks: List[str]) -> List[dict[str, Any]]:
"""Build documents from text chunks"""
documents: List[dict] = []

window_size = sentence_window_size
for i, text in enumerate(text_splits):
for i, text in enumerate(chunks):
if text is None or not text.strip():
continue

metadata = row.metadata if hasattr(row, "metadata") and isinstance(row.metadata, dict) else {}
metadata = copy.deepcopy(metadata)
if window_size > 0:
window_text = "".join(
text_splits[max(0, i - window_size) : min(i + 1 + window_size, len(text_splits))] # noqa: E203
)

metadata["window"] = window_text
metadata["original_text"] = text

metadata["content"] = text

@@ -59,70 +53,33 @@ def _build_split_documents(row, text_splits: List[str], sentence_window_size: int) -> List[dict[str, Any]]:
return documents


def _split_into_units(text: str, split_by: Literal["word", "sentence", "passage"]) -> List[str]:
if split_by == "passage":
split_at = "\n\n"
elif split_by == "sentence":
split_at = "." # why not ?,!, etc..?
elif split_by == "word":
split_at = " "
else:
raise NotImplementedError("DocumentSplitter only supports 'passage', 'sentence'" " or 'word' split_by options.")
units = text.split(split_at)
# Add the delimiter back to all units except the last one
for i in range(len(units) - 1):
units[i] += split_at
def _split_into_chunks(text, tokenizer, chunk_size=300):
# Tokenize the text into token IDs
encoding = tokenizer.encode_plus(text, add_special_tokens=False, return_offsets_mapping=True)

return units
# Get the token IDs and offsets for splitting
tokens = encoding['input_ids']
offsets = encoding['offset_mapping']

# Split the tokens into chunks of the desired size
chunks = [tokens[i:i + chunk_size] for i in range(0, len(tokens), chunk_size)]

def _concatenate_units(units: List[str], split_length: int, split_overlap: int, max_character_length: int) -> List[str]:
text_splits = []
segments = windowed(units, n=split_length, step=split_length - split_overlap)
for seg in segments:
current_units = [unit for unit in seg if unit is not None]
txt = "".join(current_units)
if max_character_length and len(txt) > max_character_length:
text_splits.extend(_split_long_text(txt, max_character_length))
elif len(txt) > 0:
text_splits.append(txt)
# Convert token chunks back to text while preserving original spacing and case
text_chunks = []
for chunk in chunks:
# Find the start and end offsets for the current chunk
chunk_offsets = offsets[:len(chunk)]
start_offset = chunk_offsets[0][0]
end_offset = chunk_offsets[-1][1]

return text_splits
# Extract the original text for this chunk based on offsets
text_chunk = text[start_offset:end_offset]
text_chunks.append(text_chunk)

# Remove processed offsets for the next iteration
offsets = offsets[len(chunk):]

def _split_long_text(text: str, max_character_length: int) -> List[str]:
"""
Splits a long text into smaller segments that
do not exceed max_character_length.
"""
split_texts = []
while text:
# Take the maximum possible substring without exceeding max_character_length
segment = text[:max_character_length]
split_texts.append(segment)
text = text[max_character_length:] # noqa: E203

return split_texts


def _process_content(row, validated_config):
content = row["metadata"]["content"]

if content is None:
raise ValueError(
"DocumentSplitter only works with text documents but one or more 'content' " "values are None."
)

units = _split_into_units(content, validated_config.split_by)
text_splits = _concatenate_units(
units,
validated_config.split_length,
validated_config.split_overlap,
max_character_length=validated_config.max_character_length,
)
split_docs = _build_split_documents(row, text_splits, sentence_window_size=validated_config.sentence_window_size)

return split_docs
return text_chunks


MODULE_NAME = "nemo_document_splitter"
@@ -167,16 +124,11 @@ def split_and_forward(message: ControlMessage):
return message

# Override parameters if set
split_by = task_props.get("split_by", validated_config.split_by)
split_length = task_props.get("split_length", validated_config.split_length)
split_overlap = task_props.get("split_overlap", validated_config.split_overlap)
max_character_length = task_props.get("max_character_length", validated_config.max_character_length)
sentence_window_size = task_props.get("sentence_window_size", validated_config.sentence_window_size)
tokenizer = task_props.get("tokenizer", validated_config.tokenizer)
chunk_size = task_props.get("chunk_size", validated_config.chunk_size)

logger.info(
f"Splitting documents with split_by: {split_by}, split_length: {split_length}, "
f"split_overlap: {split_overlap}, max_character_length: {max_character_length}, "
f"sentence_window_size: {sentence_window_size}"
f"Splitting documents with tokenizer: {tokenizer}, chunk_size: {chunk_size} tokens"
)

split_docs = []
@@ -188,14 +140,11 @@ def split_and_forward(message: ControlMessage):
"DocumentSplitter only works with text documents but one or more " "'content' values are None."
)

units = _split_into_units(content, split_by)
text_splits = _concatenate_units(
units,
split_length,
split_overlap,
max_character_length=max_character_length,
)
split_docs.extend(_build_split_documents(row, text_splits, sentence_window_size=sentence_window_size))
os.environ['TOKENIZERS_PARALLELISM'] = "False"
tokenizer_model = AutoTokenizer.from_pretrained(tokenizer)

chunks = _split_into_chunks(content, tokenizer_model, chunk_size)
split_docs.extend(_build_split_documents(row, chunks))

split_docs_df = pd.DataFrame(split_docs)

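The core of the change is the offset-mapping step in `_split_into_chunks`: the text is tokenized once, the token IDs are grouped into fixed-size windows, and each window's character offsets are used to slice the original string so spacing and casing are preserved. Below is a condensed, self-contained restatement of that logic, not the PR code verbatim; it assumes the `transformers` package and a fast tokenizer such as `intfloat/e5-large-unsupervised` can be downloaded locally.

```python
from transformers import AutoTokenizer


def split_into_chunks(text: str, tokenizer, chunk_size: int = 300) -> list:
    # Tokenize once, without special tokens, keeping character offsets per token.
    encoding = tokenizer.encode_plus(text, add_special_tokens=False, return_offsets_mapping=True)
    tokens, offsets = encoding["input_ids"], encoding["offset_mapping"]

    chunks = []
    for i in range(0, len(tokens), chunk_size):
        window = offsets[i : i + chunk_size]
        # Slice the source text between the first and last offset of the window,
        # so the chunk keeps its original whitespace and casing.
        chunks.append(text[window[0][0] : window[-1][1]])
    return chunks


tokenizer = AutoTokenizer.from_pretrained("intfloat/e5-large-unsupervised")
print(split_into_chunks("Some long document text to be split into token-sized chunks.", tokenizer, chunk_size=8))
```

Compared with the character-based splitter this replaces, chunk boundaries now always fall on token boundaries, so a chunk can never cut a token in half.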
src/nv_ingest/schemas/ingest_job_schema.py (2 additions, 11 deletions)
@@ -61,17 +61,8 @@ class TracingOptionsSchema(BaseModelNoExt):


class IngestTaskSplitSchema(BaseModelNoExt):
split_by: Literal["word", "sentence", "passage"]
split_length: conint(gt=0)
split_overlap: conint(ge=0)
max_character_length: Optional[conint(gt=0)]
sentence_window_size: Optional[conint(ge=0)]

@validator("sentence_window_size")
def check_sentence_window_size(cls, v, values, **kwargs):
if v is not None and v > 0 and values["split_by"] != "sentence":
raise ValueError("When using sentence_window_size, split_by must be 'sentence'.")
return v
tokenizer: str
chunk_size: conint(gt=0)


class IngestTaskExtractSchema(BaseModelNoExt):
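To make the server-side contract concrete, here is a small sketch of how a split payload would be validated against the new fields, using pydantic v1 style `conint` as in the file above. The class name is illustrative and not part of the diff.

```python
from pydantic import BaseModel, ValidationError, conint


class SplitTaskProps(BaseModel):
    tokenizer: str
    chunk_size: conint(gt=0)


# A payload like the one produced by the client-side SplitTask.to_dict()
print(SplitTaskProps(tokenizer="intfloat/e5-large-unsupervised", chunk_size=300))

# chunk_size must be a positive integer, so this payload is rejected
try:
    SplitTaskProps(tokenizer="intfloat/e5-large-unsupervised", chunk_size=0)
except ValidationError as err:
    print(err)
```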
src/nv_ingest/schemas/nemo_doc_splitter_schema.py (2 additions, 15 deletions)
@@ -3,24 +3,11 @@
# SPDX-License-Identifier: Apache-2.0


from typing import Literal
from typing import Optional

from pydantic import BaseModel
from pydantic import conint
from pydantic import validator


class DocumentSplitterSchema(BaseModel):
split_by: Literal["word", "sentence", "passage"] = "word"
split_length: conint(gt=0) = 60
split_overlap: conint(ge=0) = 10
max_character_length: Optional[conint(gt=0)] = 450
sentence_window_size: Optional[conint(ge=0)] = 0
tokenizer: str = "intfloat/e5-large-unsupervised"
chunk_size: conint(gt=0) = 300
raise_on_failure: bool = False

@validator("sentence_window_size")
def check_sentence_window_size(cls, v, values, **kwargs):
if v is not None and v > 0 and values["split_by"] != "sentence":
raise ValueError("When using sentence_window_size, split_by must be 'sentence'.")
return v
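Finally, the splitter module resolves its configuration by falling back from per-task properties to these schema defaults (`task_props.get(..., validated_config....)` in `split_and_forward` above). A minimal sketch of that override pattern, with illustrative names:

```python
from pydantic import BaseModel, conint


class SplitterDefaults(BaseModel):
    # Mirrors the new DocumentSplitterSchema defaults above.
    tokenizer: str = "intfloat/e5-large-unsupervised"
    chunk_size: conint(gt=0) = 300
    raise_on_failure: bool = False


validated_config = SplitterDefaults()

# Per-task properties win over module defaults when present.
task_props = {"chunk_size": 512}
tokenizer = task_props.get("tokenizer", validated_config.tokenizer)
chunk_size = task_props.get("chunk_size", validated_config.chunk_size)
print(tokenizer, chunk_size)  # -> intfloat/e5-large-unsupervised 512
```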