diff --git a/examples/translation_example.py b/examples/translation_example.py deleted file mode 100644 index d16590166..000000000 --- a/examples/translation_example.py +++ /dev/null @@ -1,377 +0,0 @@ -import os -import re -import time -from dataclasses import dataclass -from functools import lru_cache - -import cudf -import numpy as np -import torch -import torch.nn as nn -from crossfit import op -from crossfit.backend.torch.hf.model import HFModel -from dask.distributed import get_worker -from nltk.tokenize import sent_tokenize -from transformers import AutoConfig, AutoModelForSeq2SeqLM, AutoTokenizer - -from nemo_curator.classifiers.base import DistributedDataClassifier -from nemo_curator.datasets import DocumentDataset -from nemo_curator.utils.distributed_utils import get_client, load_object_on_worker -from nemo_curator.utils.script_utils import ArgumentHelper - -try: - from IndicTransTokenizer import IndicProcessor -except ImportError: - raise ImportError( - "IndicTransTokenizer not found. Please install it using the following command: \n" - + "pip install git+https://github.com/VarunGumma/IndicTransTokenizer.git" - ) - -TERMINAL_PUNCTUATIONS = ( - ".", - "!", - "?", - ":", - ",", - ";", - ")", - "}", - "]", - '"', - "'", - "”", - "’", -) -START_PUNCTUATIONS = ("(", "{", "[", "'", '"', "“", "‘") - - -@dataclass -class TranslationConfig: - pretrained_model_name_or_path: str - max_length: int = 50 - num_beams: int = 5 - autocast: bool = False - max_words_per_sen: int = 200 - - -class CustomModel(nn.Module): - def __init__(self, config: TranslationConfig): - super().__init__() - self.config = config - self.model = AutoModelForSeq2SeqLM.from_pretrained( - pretrained_model_name_or_path=config.pretrained_model_name_or_path, - trust_remote_code=True, - ) - self.autocast = config.autocast - - @torch.no_grad() - def _forward(self, batch: dict) -> torch.Tensor: - return self.model.generate( - **batch, - use_cache=True, - min_length=0, - max_length=self.config.max_length, - num_beams=self.config.num_beams, - num_return_sequences=1, - repetition_penalty=1.2, - ) - - def forward(self, batch: dict) -> torch.Tensor: - if self.autocast: - with torch.autocast(device_type="cuda"): - outputs = self._forward(batch) - else: - outputs = self._forward(batch) - return outputs - - -class ModelForSeq2SeqModel(HFModel): - def __init__(self, config: TranslationConfig): - self.trans_config = config - self.config = self.load_config() - super().__init__(self.trans_config.pretrained_model_name_or_path) - - def load_model(self, device: str = "cuda") -> CustomModel: - model = CustomModel( - self.trans_config, - ) - model = model.to(device) - model.eval() - return model - - def load_config(self) -> AutoConfig: - return AutoConfig.from_pretrained( - pretrained_model_name_or_path=self.trans_config.pretrained_model_name_or_path, - trust_remote_code=True, - ) - - @lru_cache(maxsize=1) - def load_tokenizer(self) -> AutoTokenizer: - return AutoTokenizer.from_pretrained( - pretrained_model_name_or_path=self.trans_config.pretrained_model_name_or_path, - trust_remote_code=True, - ) - - def max_seq_length(self) -> int: - return self.config.max_source_positions - - def load_cfg(self): - return self.load_config() - - -class IndicTranslation(DistributedDataClassifier): - def __init__( - self, - pretrained_model_name_or_path: str = "ai4bharat/indictrans2-en-indic-1B", - input_column: str = "indic_proc_text", - batch_size: int = 128, - autocast: bool = False, - ): - self.pretrained_model_name_or_path = pretrained_model_name_or_path - 
self.input_column = input_column - self.batch_size = batch_size - self.autocast = autocast - - self.translation_config = TranslationConfig( - pretrained_model_name_or_path=self.pretrained_model_name_or_path, - max_length=256, - num_beams=5, - autocast=self.autocast, - ) - self.model = ModelForSeq2SeqModel(self.translation_config) - super().__init__( - model=self.model, - batch_size=self.batch_size, - device_type="cuda", - autocast=self.autocast, - labels=None, - filter_by=None, - out_dim=None, - pred_column=None, - max_chars=None, - ) - - def preprocess_df(self, df: cudf.DataFrame) -> cudf.DataFrame: - ip = load_object_on_worker( - "IndicProcessor", IndicProcessor, {"inference": True} - ) - indices = df["text"].index.to_arrow().to_pylist() - sentences = df["text"].to_arrow().to_pylist() - sentences = ip.preprocess_batch( - sentences, src_lang="eng_Latn", tgt_lang="hin_Deva" - ) - df["indic_proc_text"] = cudf.Series(sentences, index=indices) - return df - - def translate_tokens(self, df: cudf.DataFrame) -> cudf.DataFrame: - worker = get_worker() - if hasattr(worker, "IndicProcessor"): - ip = getattr(worker, "IndicProcessor") - else: - ip = load_object_on_worker( - "IndicProcessor", IndicProcessor, {"inference": True} - ) - tokenizer = self.model.load_tokenizer() - indices = df["translation"].index.to_arrow().to_pylist() - generated_tokens = df["translation"].to_arrow().to_pylist() - with tokenizer.as_target_tokenizer(): - generated_tokens = tokenizer.batch_decode( - generated_tokens, - skip_special_tokens=True, - ) - generated_tokens = ip.postprocess_batch(generated_tokens, lang="hin_Deva") - df["translation"] = cudf.Series(data=generated_tokens, index=indices) - return df - - def has_alphabet_characters(self, text: str) -> bool: - return any(c.isalpha() for c in text) - - def custom_tokenize(self, text: str): - split_text = re.split( - r"(\#{2,}|\_{2,}|\…{2,}|\+{2,}|\.{2,}|\-{3,}|\*{2,}|\~{2,}|\={2,}|\!{2,}|\n|\t|\‣|\⁃|\⁌|\⁍|\●|\○|\•|\·|\◘|\◦|\⦾|\⦿|\|)", - text, - ) - split_text = [s for s in split_text if len(s) > 0] - tokenized_sentences = [] - len_flag = False - for line in split_text: - # Tokenize sentences using NLTK's sent_tokenize function - if self.has_alphabet_characters(line) == True: - sentences = sent_tokenize(line) - i = 0 - j = 0 - curr_tokenized_snt = [] - non_translation_str = "" - # Comparing the list of tokenized sentences (using NLTK) and actual sentence and preserving the spaces, - # newline and other special characters - while i < len(line): - if j < len(sentences): - stripped_sent = sentences[j].strip() - if len(stripped_sent) == 0: - j += 1 - continue - # If tokenized sentence matches then moving to next sentence - if line[i] == stripped_sent[0]: - if non_translation_str != "": - curr_tokenized_snt.append(non_translation_str) - curr_tokenized_snt.append(stripped_sent) - i += len(stripped_sent) - j += 1 - non_translation_str = "" - else: - non_translation_str += line[i] - i += 1 - else: - non_translation_str += line[i] - i += 1 - if non_translation_str != "": - curr_tokenized_snt.append(non_translation_str) - # Add the tokenized sentences to the list - tokenized_sentences.extend(curr_tokenized_snt) - else: - tokenized_sentences.append(line) - - tokenized_sentence_len = [] - for sentence in tokenized_sentences: - sent = sentence.split() - # removing the sentences with word length greater than threshold as the model may not be able translate it due to constraint on output token size - if len(sent) <= self.translation_config.max_words_per_sen: - 
tokenized_sentence_len.append(sentence) - - return tokenized_sentence_len - - def process_input_text(self, df: cudf.DataFrame) -> cudf.DataFrame: - df = df.to_pandas() - df["text"] = df["text"].apply(self.custom_tokenize) - df["doc_id"] = np.arange(1, len(df) + 1) - df = df.explode("text", ignore_index=True) - df = df.reset_index(drop=False) - df = cudf.DataFrame.from_pandas(df) - return df - - def combine_text(self, df: cudf.DataFrame) -> cudf.DataFrame: - engligh_stop_flag = df["text"].str.endswith(".") - hindi_stop_flag = df["translation"].str.endswith("|") - df["translation"][~engligh_stop_flag & hindi_stop_flag] = df[ - "translation" - ].str.rstrip("|") - df["translation"] = df["translation"].str.strip() - return df - - def grouping(self, df: cudf.DataFrame) -> cudf.DataFrame: - df = df.to_pandas() - agg_funcs = { - "translation": lambda s: "".join(s), - "text": lambda s: "".join(s), - } - other_columns = { - col: "first" - for col in df.columns - if col not in agg_funcs and col != "doc_id" - } - - agg_funcs.update(other_columns) - df = df.groupby("doc_id").agg(agg_funcs).reset_index() - df = cudf.DataFrame.from_pandas(df) - return df - - def atleast_letter(self, df: cudf.DataFrame, column_name: str) -> cudf.DataFrame: - df = df.to_pandas() - df["isalpha"] = df[column_name].apply(self.has_alphabet_characters) - df = cudf.DataFrame(df) - return df - - def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset: - ddf = dataset.df - # Applying process_input_text for following : - # 1. nltk tokenization to break doc into sentences - # 2. craeting a row w.r.t each sentence. - # 3. Process sentences strip symbols from start and end - ddf = ddf.map_partitions(self.process_input_text, enforce_metadata=False) - ddf["text"] = ddf["text"].astype("str") - - ddf["word_count"] = ddf["text"].str.split().list.len() - ddf["word_count"] = ddf["word_count"].astype("int64") - ddf_true = ddf[(ddf["word_count"] <= self.translation_config.max_words_per_sen)] - # To filter for atleast one unicode letter in text - has_letter = ddf_true.map_partitions(self.atleast_letter, column_name="text") - ddf_trans = ddf_true[has_letter["isalpha"]] - ddf = ddf_trans.drop(columns="word_count") - ## ddf false operations - ddf_false = ddf_true[~has_letter["isalpha"]] - ddf_false = ddf_false.drop(columns="word_count") - ddf_false["translation"] = ddf_false["text"] - # Applying preprocess_df for Indic preprocessing - ddf["text"] = ddf["text"].astype("str") - ddf_meta = ddf._meta.copy() - ddf_meta["indic_proc_text"] = "" - ddf = ddf.map_partitions(self.preprocess_df, meta=ddf_meta) - - columns = ddf.columns.tolist() - pipe = op.Sequential( - op.Tokenizer( - self.model, cols=[self.input_column], tokenizer_type="default" - ), - op.Predictor( - self.model, - sorted_data_loader=True, - batch_size=self.batch_size, - pred_output_col="translation", - ), - keep_cols=columns, - ) - ddf = pipe(ddf) - translated_meta = ddf._meta.copy() - translated_meta["translation"] = "DUMMY_STRING" - ddf = ddf.map_partitions(self.translate_tokens, meta=translated_meta) - ddf = ddf.map_partitions(self.combine_text, meta=translated_meta) - - # Merging translated and non-translated samples - ddf_true["false_translation"] = ddf_false["translation"] - ddf_true["false_translation"] = ddf_true["false_translation"].fillna("") - ddf_true["translation"] = ddf["translation"] - ddf_true["translation"] = ddf_true["translation"].fillna("") - ddf_true["translation"] = ( - ddf_true["translation"] + ddf_true["false_translation"] - ) - - ddf = 
ddf_true.map_partitions(self.grouping) - return DocumentDataset(ddf) - - -def attach_args(): - parser = ArgumentHelper.parse_distributed_classifier_args() - parser.set_defaults( - pretrained_model_name_or_path="ai4bharat/indictrans2-en-indic-1B" - ) - parser.set_defaults(input_text_field="text") - parser.set_defaults(device="gpu") - return parser - - -def main(args): - print(f"Arguments parsed = {args}") - st = time.time() - client = get_client(**ArgumentHelper.parse_client_args(args)) - print(client.dashboard_link) - translator_model = IndicTranslation( - pretrained_model_name_or_path=args.pretrained_model_name_or_path, - input_column=args.input_text_field, - batch_size=args.batch_size, - autocast=args.autocast, - ) - input_files = [ - os.path.join(args.input_data_dir, x) for x in os.listdir(args.input_data_dir) - ] - input_dataset = DocumentDataset.read_json( - input_files, backend="cudf", add_filename=True - ) - result_dataset = translator_model(dataset=input_dataset) - - result_dataset.to_json(output_path=args.output_data_dir, write_to_filename=True) - print(f"Total time taken for translation: {time.time()-st} seconds", flush=True) - client.close() - - -if __name__ == "__main__": - main(attach_args().parse_args()) diff --git a/tutorials/distributed_data_classification/translation/indic_translation.ipynb b/tutorials/distributed_data_classification/translation/indic_translation.ipynb new file mode 100644 index 000000000..d4f46a9f3 --- /dev/null +++ b/tutorials/distributed_data_classification/translation/indic_translation.ipynb @@ -0,0 +1,1112 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Building an Indic Translation Pipeline with NeMo Curator\n", + "\n", + "In this tutorial, we use the [IndicTransToolkit](https://github.com/VarunGumma/IndicTransToolkit) library, [IndicTrans2](https://huggingface.co/ai4bharat/indictrans2-en-indic-1B) model from Hugging Face, [Dask](https://www.dask.org/), and NeMo Curator to build an Indic language translation pipeline. After creating the pipeline, we demonstrate how to use the model to translate English text to Hindi text." + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Environment Setup" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [], + "source": [ + "import os\n", + "import re\n", + "from dataclasses import dataclass" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tqdm/auto.py:21: TqdmWarning: IProgress not found. Please update jupyter and ipywidgets. 
See https://ipywidgets.readthedocs.io/en/stable/user_install.html\n", + " from .autonotebook import tqdm as notebook_tqdm\n" + ] + } + ], + "source": [ + "import cudf\n", + "import dask_cudf\n", + "import numpy as np\n", + "import torch\n", + "import torch.nn as nn\n", + "from crossfit import op\n", + "from crossfit.backend.torch.hf.model import HFModel\n", + "from dask.distributed import get_worker\n", + "from nltk.tokenize import sent_tokenize\n", + "from transformers import AutoConfig, AutoModelForSeq2SeqLM, AutoTokenizer" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "from nemo_curator.classifiers.base import DistributedDataClassifier\n", + "from nemo_curator.datasets import DocumentDataset\n", + "from nemo_curator.utils.distributed_utils import get_client, load_object_on_worker" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The [IndicTransToolkit](https://github.com/VarunGumma/IndicTransToolkit) provides a simple, modular, and extendable toolkit for [IndicTrans2](https://github.com/AI4Bharat/IndicTrans2), an open-source transformer-based multilingual NMT model that supports high-quality translations across all the 22 scheduled Indic languages." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "try:\n", + " from IndicTransToolkit import IndicProcessor\n", + "except ModuleNotFoundError:\n", + " raise ImportError(\n", + " \"IndicTransToolkit not found. Please install it using the following command: \\n\"\n", + " + \"pip install git+https://github.com/VarunGumma/IndicTransToolkit.git\"\n", + " )" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Helper Classes and Functions for the `IndicTranslation` Class\n", + "\n", + "To create our Indic translation classifier, we create an `IndicTranslation` class, which will be extended from NeMo Curator's `DistributedDataClassifier` class.\n", + "\n", + "The goal of the base `DistributedDataClassifier` class is to enable multi-node multi-GPU data classification of your data. NeMo Curator provides several subclasses that focus on domain, quality, content safety, and educational content classification. However, the `DistributedDataClassifier` can be extended to fit *any* model; the only requirement is that the model can fit on a single GPU. See NeMo Curator's [Distributed Data Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html) documentation for more information.\n", + "\n", + "First, let's create a `TranslationConfig` class. Its purpose is to store some of the attributes that will be used by our model, including the model card of the [IndicTrans2 En-Indic 1.1B variant](https://huggingface.co/ai4bharat/indictrans2-en-indic-1B) on Hugging Face." + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class TranslationConfig:\n", + " pretrained_model_name_or_path: str = \"ai4bharat/indictrans2-en-indic-1B\"\n", + " max_length: int = 50\n", + " num_beams: int = 5\n", + " autocast: bool = False\n", + " max_words_per_sen: int = 200" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we create a `CustomModel` class for sequence-to-sequence language modeling. 
It inherits from `nn.Module`, the base class for all neural network modules in PyTorch.\n", + "\n", + "Inside `__init__`, the model loads a pre-trained sequence-to-sequence model (`AutoModelForSeq2SeqLM`) from Hugging Face, using the model name provided. The `autocast` boolean determines whether mixed precision (`torch.autocast`) is used during inference to speed up computations on CUDA devices; we set it to False above.\n", + "\n", + "The `_forward` method performs text generation on the input batch without tracking gradients (`@torch.no_grad()`), which is efficient for inference. `self.model.generate()` is called with the batch inputs and several generation parameters to control the decoding behavior. The `forward` method is required by `nn.Module` and runs the model's forward pass (the computation performed at every call)." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "class CustomModel(nn.Module):\n", + " def __init__(self, config: TranslationConfig):\n", + " super().__init__()\n", + " self.config = config\n", + " self.model = AutoModelForSeq2SeqLM.from_pretrained(\n", + " pretrained_model_name_or_path=config.pretrained_model_name_or_path,\n", + " trust_remote_code=True,\n", + " )\n", + " self.autocast = config.autocast\n", + "\n", + " @torch.no_grad()\n", + " def _forward(self, batch: dict) -> torch.Tensor:\n", + " return self.model.generate(\n", + " **batch,\n", + " use_cache=True,\n", + " min_length=0,\n", + " max_length=self.config.max_length,\n", + " num_beams=self.config.num_beams,\n", + " num_return_sequences=1,\n", + " repetition_penalty=1.2,\n", + " )\n", + "\n", + " def forward(self, batch: dict) -> torch.Tensor:\n", + " if self.autocast:\n", + " with torch.autocast(device_type=\"cuda\"):\n", + " outputs = self._forward(batch)\n", + " else:\n", + " outputs = self._forward(batch)\n", + " return outputs" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, let's create the `ModelForSeq2SeqModel` class, a model management class that handles loading configurations, the `CustomModel`, and tokenizers for sequence-to-sequence translation. It inherits from `HFModel`, a class created by NVIDIA's [CrossFit](https://github.com/rapidsai/crossfit) library, which enables multi-node and multi-GPU offline inference.\n", + "\n", + "In it, we create several methods which define how to load our model, its configuration, and its tokenizer." 
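As a rough usage sketch (assuming the `ModelForSeq2SeqModel` class defined in the next cell, a CUDA-capable GPU, and access to the Hugging Face Hub to download the IndicTrans2 weights), the class can be exercised directly like this:

```python
# Usage sketch (assumptions: the ModelForSeq2SeqModel class from the next cell,
# a CUDA GPU, and Hugging Face Hub access to download the model weights).
config = TranslationConfig(
    pretrained_model_name_or_path="ai4bharat/indictrans2-en-indic-1B",
    max_length=256,
)
seq2seq = ModelForSeq2SeqModel(config)

tokenizer = seq2seq.load_tokenizer()  # Hugging Face tokenizer for the model
print(seq2seq.max_seq_length())       # source-side position limit from the model config

# load_model() is normally invoked on each Dask worker by CrossFit during inference,
# but it can also be called directly for a quick local sanity check:
model = seq2seq.load_model(device="cuda")
```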
+ ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "class ModelForSeq2SeqModel(HFModel):\n", + " def __init__(self, config: TranslationConfig):\n", + " self.trans_config = config\n", + " self.config = self.load_config()\n", + " super().__init__(self.trans_config.pretrained_model_name_or_path)\n", + "\n", + " def load_model(self, device: str = \"cuda\") -> CustomModel:\n", + " model = CustomModel(self.trans_config)\n", + " model = model.to(device)\n", + " model.eval()\n", + " return model\n", + "\n", + " def load_config(self) -> AutoConfig:\n", + " return AutoConfig.from_pretrained(\n", + " pretrained_model_name_or_path=self.trans_config.pretrained_model_name_or_path,\n", + " trust_remote_code=True,\n", + " )\n", + "\n", + " def load_tokenizer(self) -> AutoTokenizer:\n", + " return AutoTokenizer.from_pretrained(\n", + " pretrained_model_name_or_path=self.trans_config.pretrained_model_name_or_path,\n", + " trust_remote_code=True,\n", + " )\n", + "\n", + " def max_seq_length(self) -> int:\n", + " return self.config.max_source_positions\n", + "\n", + " def load_cfg(self):\n", + " return self.load_config()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, let's define some helper functions which will be used by our `IndicTranslation` class.\n", + "\n", + "The `preprocess_df` function is used to load and run the `IndicProcessor` to preprocess our English sentences before tokenization. Note our use of the `load_object_on_worker` function, which loads and stores the `IndicProcessor` on each Dask worker." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "def preprocess_df(df: cudf.DataFrame, input_text_field: str = \"text\") -> cudf.DataFrame:\n", + " ip = load_object_on_worker(\n", + " \"IndicProcessor\", IndicProcessor, {\"inference\": True}\n", + " )\n", + " indices = df[input_text_field].index.to_arrow().to_pylist()\n", + " sentences = df[input_text_field].to_arrow().to_pylist()\n", + " sentences = ip.preprocess_batch(\n", + " sentences, src_lang=\"eng_Latn\", tgt_lang=\"hin_Deva\"\n", + " )\n", + " df[\"indic_proc_text\"] = cudf.Series(sentences, index=indices)\n", + " return df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `has_alphabet_characters` function checks if there is at least one alphabetic character in a given string; the `atleast_letter` function applies it to a DataFrame column to produce another column of booleans." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "def has_alphabet_characters(text: str) -> bool:\n", + " return any(c.isalpha() for c in text)\n", + "\n", + "\n", + "def atleast_letter(df: cudf.DataFrame, column_name: str) -> cudf.DataFrame:\n", + " df = df.to_pandas()\n", + " df[\"isalpha\"] = df[column_name].apply(has_alphabet_characters)\n", + " df = cudf.DataFrame(df)\n", + " return df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "After translating our text, the `combine_text` function modifies the translated column by removing the vertical bar `|` (which is used as a stop marker in our translations) at the end of the text where: (1) the text does not end with a period and (2) the translation ends with a vertical bar. Thus, we keep translations ending with a vertical bar only when the English text ends with a period." 
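As a quick illustration of that rule, here is a hypothetical mini-example (made-up strings, relying on the `combine_text` function defined in the next cell and the `cudf` import from earlier):

```python
# Hypothetical mini-example (made-up strings) of the behavior described above,
# using the combine_text function defined in the next cell.
demo = cudf.DataFrame(
    {
        "text": ["A finished sentence.", "an unfinished fragment"],
        "translation": ["TRANSLATED A |", "TRANSLATED B |"],
    }
)
demo = combine_text(demo)
print(demo["translation"].to_arrow().to_pylist())
# Expected: ['TRANSLATED A |', 'TRANSLATED B'] -- the trailing "|" survives only
# where the English text ends with a period.
```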
+ ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "def combine_text(df: cudf.DataFrame, input_text_field: str = \"text\") -> cudf.DataFrame:\n", + " english_stop_flag = df[input_text_field].str.endswith(\".\")\n", + " hindi_stop_flag = df[\"translation\"].str.endswith(\"|\")\n", + " df[\"translation\"][~english_stop_flag & hindi_stop_flag] = df[\n", + " \"translation\"\n", + " ].str.rstrip(\"|\")\n", + " df[\"translation\"] = df[\"translation\"].str.strip()\n", + " return df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The `grouping` function groups rows by `doc_id`, concatenates text-based columns, and retains the first value of other columns within each group. This is useful because our texts will be spread across several rows, but marked with the same `doc_id`. Thus, we use this function to combine those rows." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [], + "source": [ + "def grouping(df: cudf.DataFrame, input_text_field: str = \"text\") -> cudf.DataFrame:\n", + " df = df.to_pandas()\n", + " agg_funcs = {\n", + " \"translation\": lambda s: \"\".join(s),\n", + " input_text_field: lambda s: \"\".join(s),\n", + " }\n", + " other_columns = {\n", + " col: \"first\"\n", + " for col in df.columns\n", + " if col not in agg_funcs and col != \"doc_id\"\n", + " }\n", + "\n", + " agg_funcs.update(other_columns)\n", + " df = df.groupby(\"doc_id\").agg(agg_funcs).reset_index()\n", + " df = cudf.DataFrame.from_pandas(df)\n", + " return df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Building the `IndicTranslation` Class\n", + "\n", + "Our `IndicTranslation` class is a bit of a monster, containing many methods within it. For this tutorial, we aim to make it as digestible as possible by stepping through each method, one by one.\n", + "\n", + "While this first method may look intimidating, its goal is very simple: create a list of sentences from a given string. It does this by using NLTK tokenization to break the text into sentences. We also remove sentences that are too long." 
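Before diving into the method, a tiny standalone example (assumed sample text, not from the notebook) shows the NLTK sentence splitting it relies on:

```python
# Standalone illustration (assumed sample text) of NLTK's sent_tokenize, which
# custom_tokenize in the next cell applies line by line. The punkt model may need
# to be downloaded once per environment.
import nltk
from nltk.tokenize import sent_tokenize

nltk.download("punkt", quiet=True)

print(sent_tokenize("Quantum computing is fascinating. It may reshape cryptography!"))
# ['Quantum computing is fascinating.', 'It may reshape cryptography!']
```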
+ ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "def custom_tokenize(self, text: str):\n", + "    split_text = re.split(\n", + "        r\"(\\#{2,}|\\_{2,}|\\…{2,}|\\+{2,}|\\.{2,}|\\-{3,}|\\*{2,}|\\~{2,}|\\={2,}|\\!{2,}|\\n|\\t|\\‣|\\⁃|\\⁌|\\⁍|\\●|\\○|\\•|\\·|\\◘|\\◦|\\⦾|\\⦿|\\|)\",\n", + "        text,\n", + "    )\n", + "    split_text = [s for s in split_text if len(s) > 0]\n", + "    tokenized_sentences = []\n", + "    len_flag = False\n", + "    for line in split_text:\n", + "        # Tokenize sentences using NLTK's sent_tokenize function\n", + "        if has_alphabet_characters(line):\n", + "            sentences = sent_tokenize(line)\n", + "            i = 0\n", + "            j = 0\n", + "            curr_tokenized_snt = []\n", + "            non_translation_str = \"\"\n", + "            # Comparing the list of tokenized sentences (using NLTK) with the actual sentence,\n", + "            # preserving the spaces, newlines and other special characters\n", + "            while i < len(line):\n", + "                if j < len(sentences):\n", + "                    stripped_sent = sentences[j].strip()\n", + "                    if len(stripped_sent) == 0:\n", + "                        j += 1\n", + "                        continue\n", + "                    # If the tokenized sentence matches, move to the next sentence\n", + "                    if line[i] == stripped_sent[0]:\n", + "                        if non_translation_str != \"\":\n", + "                            curr_tokenized_snt.append(non_translation_str)\n", + "                        curr_tokenized_snt.append(stripped_sent)\n", + "                        i += len(stripped_sent)\n", + "                        j += 1\n", + "                        non_translation_str = \"\"\n", + "                    else:\n", + "                        non_translation_str += line[i]\n", + "                        i += 1\n", + "                else:\n", + "                    non_translation_str += line[i]\n", + "                    i += 1\n", + "            if non_translation_str != \"\":\n", + "                curr_tokenized_snt.append(non_translation_str)\n", + "            # Add the tokenized sentences to the list\n", + "            tokenized_sentences.extend(curr_tokenized_snt)\n", + "        else:\n", + "            tokenized_sentences.append(line)\n", + "\n", + "    tokenized_sentence_len = []\n", + "    for sentence in tokenized_sentences:\n", + "        sent = sentence.split()\n", + "        # Removing the sentences with word length greater than the threshold,\n", + "        # since the model may not be able to translate them due to the constraint on output token size\n", + "        if len(sent) <= self.translation_config.max_words_per_sen:\n", + "            tokenized_sentence_len.append(sentence)\n", + "\n", + "    return tokenized_sentence_len" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "This method uses the `custom_tokenize` method above to create a DataFrame where each sentence has its own row, preserving the `doc_id` for context.\n", + "\n", + "For example, if we have the DataFrame:\n", + "\n", + "| text |\n", + "|--------------------------------------------------------|\n", + "| \"This is a first sentence. This is a second sentence.\" |\n", + "| \"This is a third sentence. 
This is a fourth sentence.\" |\n", + "\n", + "Then the resulting DataFrame will be:\n", + "\n", + "| text | doc_id |\n", + "|------------------------------|--------|\n", + "| \"This is a first sentence.\" | 1 |\n", + "| \"This is a second sentence.\" | 1 |\n", + "| \"This is a third sentence.\" | 2 |\n", + "| \"This is a fourth sentence.\" | 2 |" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [], + "source": [ + "def process_input_text(self, df: cudf.DataFrame, input_text_field: str = \"text\") -> cudf.DataFrame:\n", + " df = df.to_pandas()\n", + " df[input_text_field] = df[input_text_field].apply(self.custom_tokenize)\n", + " df[\"doc_id\"] = np.arange(1, len(df) + 1)\n", + " df = df.explode(input_text_field, ignore_index=True)\n", + " df = df.reset_index(drop=False)\n", + " df = cudf.DataFrame.from_pandas(df)\n", + " return df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "After our translations are generated, the `translate_tokens` method further processes the translations by decoding the tokens back to human-readable text and applying postprocessing with the `IndicProcessor`." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [], + "source": [ + "def translate_tokens(self, df: cudf.DataFrame) -> cudf.DataFrame:\n", + " worker = get_worker()\n", + " if hasattr(worker, \"IndicProcessor\"):\n", + " ip = getattr(worker, \"IndicProcessor\")\n", + " else:\n", + " ip = load_object_on_worker(\n", + " \"IndicProcessor\", IndicProcessor, {\"inference\": True}\n", + " )\n", + " tokenizer = self.model.load_tokenizer()\n", + " indices = df[\"translation\"].index.to_arrow().to_pylist()\n", + " generated_tokens = df[\"translation\"].to_arrow().to_pylist()\n", + " with tokenizer.as_target_tokenizer():\n", + " generated_tokens = tokenizer.batch_decode(\n", + " generated_tokens,\n", + " skip_special_tokens=True,\n", + " )\n", + " generated_tokens = ip.postprocess_batch(generated_tokens, lang=\"hin_Deva\")\n", + " df[\"translation\"] = cudf.Series(data=generated_tokens, index=indices)\n", + " return df" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, we create the `IndicTranslation` class by defining the `__init__` and `_run_classifier` methods. We start with the `__init__` method, which uses the `DistributedDataClassifier`, `TranslationConfig`, and `ModelForSeq2SeqModel` classes described above.\n", + "\n", + "We then combine all of the helper functions and class methods into the `_run_classifier` method. This is the method that is called by `DistributedDataClassifier`'s `__call__` method; it is required for all classes that inherit the `DistributedDataClassifier` class." 
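Stripped of the translation-specific logic, that contract looks roughly like the sketch below (the class name is a placeholder; the argument values mirror the full `IndicTranslation` class in the next cell):

```python
# Distilled sketch of the DistributedDataClassifier subclass contract (placeholder
# class name; argument values mirror the IndicTranslation class defined next).
class MinimalTranslator(DistributedDataClassifier):
    def __init__(self, model, batch_size: int = 128, autocast: bool = False):
        super().__init__(
            model=model,
            batch_size=batch_size,
            device_type="cuda",
            autocast=autocast,
            labels=None,       # not a labeling classifier, so no label set
            filter_by=None,
            out_dim=None,
            pred_column=None,
            max_chars=None,
        )

    def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset:
        # Invoked by DistributedDataClassifier.__call__; transform the underlying
        # Dask-cuDF DataFrame and wrap it back into a DocumentDataset.
        ddf = dataset.df
        return DocumentDataset(ddf)
```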
+ ] + }, + { + "cell_type": "code", + "execution_count": 15, + "metadata": {}, + "outputs": [], + "source": [ + "class IndicTranslation(DistributedDataClassifier):\n", + " def __init__(\n", + " self,\n", + " pretrained_model_name_or_path: str = \"ai4bharat/indictrans2-en-indic-1B\",\n", + " input_column: str = \"text\",\n", + " batch_size: int = 128,\n", + " autocast: bool = False,\n", + " ):\n", + " self.pretrained_model_name_or_path = pretrained_model_name_or_path\n", + " self.input_column = input_column\n", + " self.batch_size = batch_size\n", + " self.autocast = autocast\n", + "\n", + " self.translation_config = TranslationConfig(\n", + " pretrained_model_name_or_path=self.pretrained_model_name_or_path,\n", + " max_length=256,\n", + " num_beams=5,\n", + " autocast=self.autocast,\n", + " )\n", + " self.model = ModelForSeq2SeqModel(self.translation_config)\n", + " super().__init__(\n", + " model=self.model,\n", + " batch_size=self.batch_size,\n", + " device_type=\"cuda\",\n", + " autocast=self.autocast,\n", + " labels=None,\n", + " filter_by=None,\n", + " out_dim=None,\n", + " pred_column=None,\n", + " max_chars=None,\n", + " )\n", + "\n", + " def _run_classifier(self, dataset: DocumentDataset) -> DocumentDataset:\n", + " ddf = dataset.df\n", + " # See process_input_text helper function defined above\n", + " # TODO: Breaks here\n", + " # ddf = ddf.map_partitions(self.process_input_text, input_text_field=self.input_column, enforce_metadata=False)\n", + " ddf = ddf.map_partitions(self.process_input_text, enforce_metadata=False)\n", + " # ddf[self.input_column] = ddf[self.input_column].astype(\"str\")\n", + " ddf[\"text\"] = ddf[\"text\"].astype(\"str\")\n", + "\n", + " ddf[\"word_count\"] = ddf[\"text\"].str.split().list.len()\n", + " ddf[\"word_count\"] = ddf[\"word_count\"].astype(\"int64\")\n", + " ddf_true = ddf[(ddf[\"word_count\"] <= self.translation_config.max_words_per_sen)]\n", + "\n", + " # Filter for at least one unicode letter in text\n", + " # See atleast_letter helper function defined above\n", + " has_letter = ddf_true.map_partitions(atleast_letter, column_name=\"text\")\n", + " ddf_trans = ddf_true[has_letter[\"isalpha\"]]\n", + " ddf = ddf_trans.drop(columns=\"word_count\")\n", + "\n", + " ## ddf_false operations\n", + " ddf_false = ddf_true[~has_letter[\"isalpha\"]]\n", + " ddf_false = ddf_false.drop(columns=\"word_count\")\n", + " ddf_false[\"translation\"] = ddf_false[\"text\"]\n", + "\n", + " # Applying preprocess_df helper function for Indic preprocessing\n", + " ddf[\"text\"] = ddf[\"text\"].astype(\"str\")\n", + " ddf_meta = ddf._meta.copy()\n", + " ddf_meta[\"indic_proc_text\"] = \"\"\n", + " # ddf = ddf.map_partitions(preprocess_df, input_text_field=self.input_column, meta=ddf_meta)\n", + " ddf = ddf.map_partitions(preprocess_df, meta=ddf_meta)\n", + "\n", + " columns = ddf.columns.tolist()\n", + " print(f\"coloumns before pipeline : {columns}\")\n", + " pipe = op.Sequential(\n", + " # This step tokenizes the input text found in the specified input_column\n", + " op.Tokenizer(\n", + " self.model, cols=[self.input_column], tokenizer_type=\"default\"\n", + " ),\n", + " # The Predictor takes the tokenized input and passes it through the model to generate translations\n", + " op.Predictor(\n", + " self.model,\n", + " sorted_data_loader=True,\n", + " batch_size=self.batch_size,\n", + " pred_output_col=\"translation\",\n", + " ),\n", + " keep_cols=columns,\n", + " )\n", + " ddf = pipe(ddf)\n", + " translated_meta = ddf._meta.copy()\n", + " 
translated_meta[\"translation\"] = \"DUMMY_STRING\"\n", + " ddf = ddf.map_partitions(self.translate_tokens, meta=translated_meta)\n", + " # ddf = ddf.map_partitions(combine_text, input_text_field=self.input_column, meta=translated_meta)\n", + " ddf = ddf.map_partitions(combine_text, meta=translated_meta)\n", + "\n", + " # Merging translated and non-translated samples\n", + " ddf_true[\"false_translation\"] = ddf_false[\"translation\"]\n", + " ddf_true[\"false_translation\"] = ddf_true[\"false_translation\"].fillna(\"\")\n", + " ddf_true[\"translation\"] = ddf[\"translation\"]\n", + " ddf_true[\"translation\"] = ddf_true[\"translation\"].fillna(\"\")\n", + " ddf_true[\"translation\"] = (\n", + " ddf_true[\"translation\"] + ddf_true[\"false_translation\"]\n", + " )\n", + "\n", + " # See grouping helper function defined above\n", + " # ddf = ddf_true.map_partitions(grouping, input_text_field=self.input_column)\n", + " ddf = ddf_true.map_partitions(grouping)\n", + " return DocumentDataset(ddf)" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "metadata": {}, + "outputs": [], + "source": [ + "# Add the functions defined above to the IndicTranslation class\n", + "IndicTranslation.custom_tokenize = custom_tokenize\n", + "IndicTranslation.process_input_text = process_input_text\n", + "IndicTranslation.translate_tokens = translate_tokens" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "## Run Indic Translation \n", + "\n", + "We have successfully built our Indic translation pipeline! Now, let's demonstrate how to use it with a simple example.\n", + "\n", + "First, let's create a Dask client." + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "cuDF Spilling is enabled\n" + ] + } + ], + "source": [ + "device = \"gpu\"\n", + "client = get_client(cluster_type=device)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, let's create a `DocumentDataset` with some English sentences to translate." + ] + }, + { + "cell_type": "code", + "execution_count": 18, + "metadata": {}, + "outputs": [], + "source": [ + "text = [\n", + " \"Quantum computing is set to revolutionize the field of cryptography.\",\n", + " \"Investing in index funds is a popular strategy for long-term financial growth.\",\n", + " \"Recent advancements in gene therapy offer new hope for treating genetic disorders.\",\n", + " \"Online learning platforms have transformed the way students access educational resources.\",\n", + " \"Traveling to Europe during the off-season can be a more budget-friendly option.\",\n", + " \"Training regimens for athletes have become more sophisticated with the use of data analytics.\",\n", + " \"Streaming services are changing the way people consume television and film content.\",\n", + " \"Vegan recipes have gained popularity as more people adopt plant-based diets.\",\n", + " \"Climate change research is critical for developing sustainable environmental policies.\",\n", + " \"Telemedicine has become increasingly popular due to its convenience and accessibility.\",\n", + "]\n", + "df = cudf.DataFrame({\"text\": text})\n", + "input_dataset = DocumentDataset(dask_cudf.from_cudf(df, npartitions=1))" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Then, we can initialize our `IndicTranslation` model." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 19, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/sklearn/base.py:376: InconsistentVersionWarning: Trying to unpickle estimator LinearRegression from version 1.4.2 when using version 1.5.2. This might lead to breaking code or invalid results. Use at your own risk. For more info please refer to:\n", + "https://scikit-learn.org/stable/model_persistence.html#security-maintainability-limitations\n", + " warnings.warn(\n" + ] + } + ], + "source": [ + "# input_text_field = \"text\"\n", + "input_text_field = \"indic_proc_text\"\n", + "batch_size = 128\n", + "autocast = True\n", + "\n", + "translator_model = IndicTranslation(\n", + " pretrained_model_name_or_path=\"ai4bharat/indictrans2-en-indic-1B\",\n", + " input_column=input_text_field,\n", + " batch_size=batch_size,\n", + " autocast=autocast,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now let's translate our text!" + ] + }, + { + "cell_type": "code", + "execution_count": 20, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "coloumns before pipeline : ['index', 'text', 'doc_id', 'indic_proc_text']\n" + ] + } + ], + "source": [ + "result_dataset = translator_model(dataset=input_dataset)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-11-12 13:17:23,950 - distributed.protocol.core - CRITICAL - Failed to deserialize\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:17:23,959 - distributed.worker - WARNING - Scheduler was unaware of this worker; shutting down.\n", + "2024-11-12 13:17:23,962 - tornado.application - ERROR - Exception in callback functools.partial(>, exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>)\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await 
self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "unhandled exception during asyncio.run() shutdown\n", + "task: exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return 
pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:17:25,966 - distributed.nanny - ERROR - Worker process died unexpectedly\n", + "2024-11-12 13:17:35,699 - distributed.protocol.core - CRITICAL - Failed to deserialize\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:17:35,706 - distributed.worker - WARNING - Scheduler was unaware of this worker; shutting down.\n", + "2024-11-12 13:17:35,708 - tornado.application - ERROR - Exception in callback functools.partial(>, exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>)\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return 
pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "unhandled exception during asyncio.run() shutdown\n", + "task: exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:17:37,710 - distributed.nanny - ERROR - Worker process died unexpectedly\n", + "2024-11-12 13:17:50,016 - distributed.protocol.core - CRITICAL - Failed to deserialize\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File 
\"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:17:50,024 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 225, in read\n", + " frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)\n", + "tornado.iostream.StreamClosedError: Stream is closed\n", + "\n", + "The above exception was the direct cause of the following exception:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1250, in heartbeat\n", + " response = await retry_operation(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/utils_comm.py\", line 461, in retry_operation\n", + " return await retry(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/utils_comm.py\", line 440, in retry\n", + " return await coro()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 1256, in send_recv_from_rpc\n", + " return await send_recv(comm=comm, op=key, **kwargs)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 1015, in send_recv\n", + " response = await comm.read(deserializers=deserializers)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 236, in read\n", + " convert_stream_closed_error(self, e)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 142, in convert_stream_closed_error\n", + " raise CommClosedError(f\"in {obj}: {exc}\") from exc\n", + "distributed.comm.core.CommClosedError: in : Stream is closed\n", + "2024-11-12 13:17:50,028 - tornado.application - ERROR - Exception in callback functools.partial(>, exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>)\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await 
from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "unhandled exception during asyncio.run() shutdown\n", + "task: exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:18:03,778 - 
distributed.protocol.core - CRITICAL - Failed to deserialize\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "2024-11-12 13:18:03,788 - distributed.worker - ERROR - Failed to communicate with scheduler during heartbeat.\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 225, in read\n", + " frames_nosplit_nbytes_bin = await stream.read_bytes(fmt_size)\n", + "tornado.iostream.StreamClosedError: Stream is closed\n", + "\n", + "The above exception was the direct cause of the following exception:\n", + "\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1250, in heartbeat\n", + " response = await retry_operation(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/utils_comm.py\", line 461, in retry_operation\n", + " return await retry(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/utils_comm.py\", line 440, in retry\n", + " return await coro()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 1256, in send_recv_from_rpc\n", + " return await send_recv(comm=comm, op=key, **kwargs)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 1015, in send_recv\n", + " response = await comm.read(deserializers=deserializers)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 236, in read\n", + " convert_stream_closed_error(self, e)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 142, in convert_stream_closed_error\n", + " raise CommClosedError(f\"in {obj}: {exc}\") from exc\n", + "distributed.comm.core.CommClosedError: in : Stream is closed\n", + "2024-11-12 13:18:03,792 - tornado.application - ERROR - Exception in callback functools.partial(>, exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>)\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in 
wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File \"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n", + "unhandled exception during asyncio.run() shutdown\n", + "task: exception=ModuleNotFoundError(\"No module named 'transformers_modules'\")>\n", + "Traceback (most recent call last):\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 750, in _run_callback\n", + " ret = callback()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/tornado/ioloop.py\", line 774, in _discard_future_result\n", + " future.result()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 205, in wrapper\n", + " return await method(self, *args, **kwargs) # type: ignore\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/worker.py\", line 1300, in handle_scheduler\n", + " await self.handle_stream(comm)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/core.py\", line 886, in handle_stream\n", + " msgs = await comm.read()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/tcp.py\", line 247, in read\n", + " msg = await from_frames(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 78, in from_frames\n", + " res = _from_frames()\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/comm/utils.py\", line 61, in _from_frames\n", + " return protocol.loads(\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 175, in loads\n", + " return msgpack.loads(\n", + " File 
\"msgpack/_unpacker.pyx\", line 194, in msgpack._cmsgpack.unpackb\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/core.py\", line 172, in _decode_default\n", + " return pickle.loads(sub_header[\"pickled-obj\"], buffers=sub_frames)\n", + " File \"/home/nfs/syurick/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/protocol/pickle.py\", line 90, in loads\n", + " return pickle.loads(x, buffers=buffers)\n", + "ModuleNotFoundError: No module named 'transformers_modules'\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-11-12 13:17:23,952 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:34049' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-5f35495ce1e9e9fc1a8da666d509f5e6', 0)} (stimulus_id='handle-worker-cleanup-1731446243.9527113')\n", + "2024-11-12 13:17:23,958 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:34049'.\n", + "2024-11-12 13:17:35,700 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:36751' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-5f35495ce1e9e9fc1a8da666d509f5e6', 0)} (stimulus_id='handle-worker-cleanup-1731446255.7002087')\n", + "2024-11-12 13:17:35,705 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:36751'.\n", + "2024-11-12 13:17:50,018 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:40283' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-5f35495ce1e9e9fc1a8da666d509f5e6', 0)} (stimulus_id='handle-worker-cleanup-1731446270.018007')\n", + "2024-11-12 13:17:50,024 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:40283'.\n", + "2024-11-12 13:18:03,782 - distributed.scheduler - ERROR - Task ('process_input_text-fused-grouping-c00c2eee19f97c9bf2ef76827bbbd5c2', 0) marked as failed because 4 workers died while trying to run it\n", + "2024-11-12 13:18:03,783 - distributed.scheduler - WARNING - Removing worker 'tcp://127.0.0.1:41865' caused the cluster to lose already computed task(s), which will be recomputed elsewhere: {('frompandas-5f35495ce1e9e9fc1a8da666d509f5e6', 0)} (stimulus_id='handle-worker-cleanup-1731446283.781393')\n", + "2024-11-12 13:18:03,788 - distributed.scheduler - WARNING - Received heartbeat from unregistered worker 'tcp://127.0.0.1:41865'.\n" + ] + }, + { + "ename": "KilledWorker", + "evalue": "Attempted to run task ('process_input_text-fused-grouping-c00c2eee19f97c9bf2ef76827bbbd5c2', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:41865. Inspecting worker logs is often a good next step to diagnose what went wrong. 
For more information see https://distributed.dask.org/en/stable/killed.html.", + "output_type": "error", + "traceback": [ + "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", + "\u001b[0;31mKilledWorker\u001b[0m Traceback (most recent call last)", + "Cell \u001b[0;32mIn[21], line 1\u001b[0m\n\u001b[0;32m----> 1\u001b[0m \u001b[43mresult_dataset\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/dask_expr/_collection.py:477\u001b[0m, in \u001b[0;36mFrameBase.compute\u001b[0;34m(self, fuse, **kwargs)\u001b[0m\n\u001b[1;32m 475\u001b[0m out \u001b[38;5;241m=\u001b[39m out\u001b[38;5;241m.\u001b[39mrepartition(npartitions\u001b[38;5;241m=\u001b[39m\u001b[38;5;241m1\u001b[39m)\n\u001b[1;32m 476\u001b[0m out \u001b[38;5;241m=\u001b[39m out\u001b[38;5;241m.\u001b[39moptimize(fuse\u001b[38;5;241m=\u001b[39mfuse)\n\u001b[0;32m--> 477\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[43mDaskMethodsMixin\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[43mout\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n", + "File \u001b[0;32m~/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/dask/base.py:372\u001b[0m, in \u001b[0;36mDaskMethodsMixin.compute\u001b[0;34m(self, **kwargs)\u001b[0m\n\u001b[1;32m 348\u001b[0m \u001b[38;5;28;01mdef\u001b[39;00m \u001b[38;5;21mcompute\u001b[39m(\u001b[38;5;28mself\u001b[39m, \u001b[38;5;241m*\u001b[39m\u001b[38;5;241m*\u001b[39mkwargs):\n\u001b[1;32m 349\u001b[0m \u001b[38;5;250m \u001b[39m\u001b[38;5;124;03m\"\"\"Compute this dask collection\u001b[39;00m\n\u001b[1;32m 350\u001b[0m \n\u001b[1;32m 351\u001b[0m \u001b[38;5;124;03m This turns a lazy Dask collection into its in-memory equivalent.\u001b[39;00m\n\u001b[0;32m (...)\u001b[0m\n\u001b[1;32m 370\u001b[0m \u001b[38;5;124;03m dask.compute\u001b[39;00m\n\u001b[1;32m 371\u001b[0m \u001b[38;5;124;03m \"\"\"\u001b[39;00m\n\u001b[0;32m--> 372\u001b[0m (result,) \u001b[38;5;241m=\u001b[39m \u001b[43mcompute\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mself\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mtraverse\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[38;5;28;43;01mFalse\u001b[39;49;00m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 373\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m result\n", + "File \u001b[0;32m~/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/dask/base.py:660\u001b[0m, in \u001b[0;36mcompute\u001b[0;34m(traverse, optimize_graph, scheduler, get, *args, **kwargs)\u001b[0m\n\u001b[1;32m 657\u001b[0m postcomputes\u001b[38;5;241m.\u001b[39mappend(x\u001b[38;5;241m.\u001b[39m__dask_postcompute__())\n\u001b[1;32m 659\u001b[0m \u001b[38;5;28;01mwith\u001b[39;00m shorten_traceback():\n\u001b[0;32m--> 660\u001b[0m results \u001b[38;5;241m=\u001b[39m \u001b[43mschedule\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdsk\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mkeys\u001b[49m\u001b[43m,\u001b[49m\u001b[43m 
\u001b[49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[38;5;241;43m*\u001b[39;49m\u001b[43mkwargs\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 662\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m repack([f(r, \u001b[38;5;241m*\u001b[39ma) \u001b[38;5;28;01mfor\u001b[39;00m r, (f, a) \u001b[38;5;129;01min\u001b[39;00m \u001b[38;5;28mzip\u001b[39m(results, postcomputes)])\n", + "File \u001b[0;32m~/miniforge3/envs/nemo_curator_dev/lib/python3.10/site-packages/distributed/client.py:2417\u001b[0m, in \u001b[0;36mClient._gather\u001b[0;34m(self, futures, errors, direct, local_worker)\u001b[0m\n\u001b[1;32m 2415\u001b[0m exception \u001b[38;5;241m=\u001b[39m st\u001b[38;5;241m.\u001b[39mexception\n\u001b[1;32m 2416\u001b[0m traceback \u001b[38;5;241m=\u001b[39m st\u001b[38;5;241m.\u001b[39mtraceback\n\u001b[0;32m-> 2417\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m exception\u001b[38;5;241m.\u001b[39mwith_traceback(traceback)\n\u001b[1;32m 2418\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m errors \u001b[38;5;241m==\u001b[39m \u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mskip\u001b[39m\u001b[38;5;124m\"\u001b[39m:\n\u001b[1;32m 2419\u001b[0m bad_keys\u001b[38;5;241m.\u001b[39madd(key)\n", + "\u001b[0;31mKilledWorker\u001b[0m: Attempted to run task ('process_input_text-fused-grouping-c00c2eee19f97c9bf2ef76827bbbd5c2', 0) on 4 different workers, but all those workers died while running it. The last worker that attempt to run the task was tcp://127.0.0.1:41865. Inspecting worker logs is often a good next step to diagnose what went wrong. For more information see https://distributed.dask.org/en/stable/killed.html." + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "2024-11-12 13:18:05,796 - distributed.nanny - ERROR - Worker process died unexpectedly\n" + ] + } + ], + "source": [ + "result_dataset.df.compute()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, we close our Dask client." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "client.close()" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Thank you for following this tutorial! We have demonstrated how to create and run an Indic translation pipeline in NeMo Curator." + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "nemo_curator_dev", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.15" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +}