diff --git a/tutorials/distributed_data_classification/fineweb-edu-ensemble-classification.ipynb b/tutorials/distributed_data_classification/fineweb-edu-ensemble-classification.ipynb new file mode 100644 index 000000000..b74972a1e --- /dev/null +++ b/tutorials/distributed_data_classification/fineweb-edu-ensemble-classification.ipynb @@ -0,0 +1,1311 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "id": "8f61f035-ab4e-4713-86d5-bb34bc0e8d75", + "metadata": {}, + "source": [ + "# Distributed Data Classification Using NeMo Curator: \n", + "### Ensembling `FineWeb Mixtral Educational Classifier`, `FineWeb Nemotron-4 Educational Classifier`, and `fasttext-oh-eli5`\n", + "\n", + "This notebook demonstrates distributed data classification by ensembling:\n", + "1. NeMo Curator’s [`FineWebMixtralEduClassifier`](https://huggingface.co/nvidia/nemocurator-fineweb-mixtral-edu-classifier)\n", + "2. NeMo Curator’s [`FineWebNemotronEduClassifier`](https://huggingface.co/nvidia/nemocurator-fineweb-nemotron-4-edu-classifier)\n", + "3. The fastText-based [`fasttext-oh-eli5`](https://huggingface.co/mlfoundations/fasttext-oh-eli5) model from Hugging Face.\n", + "\n", + "The two FineWeb educational classifiers (but not the fastText model) use [CrossFit](https://github.com/rapidsai/crossfit), a RAPIDS-accelerated library for intelligent batching, to speed up offline inference on large datasets.\n", + "\n", + "Before running this notebook, follow the [Getting Started](https://github.com/NVIDIA/NeMo-Curator?tab=readme-ov-file#get-started) guide to install NeMo Curator.\n", + "\n", + "##### **Note on Curating Nemotron-CC**\n", + "This notebook showcases the classification script that was used in curating **Nemotron-CC**, a refined long-horizon pretraining dataset for large language models. 
As detailed in the paper [\"Nemotron-CC: Transforming Common Crawl into a Refined Long-Horizon Pretraining Dataset\"](https://arxiv.org/abs/2412.02595), Nemotron-CC was designed to improve the trade-off between dataset quality and quantity using a combination of **classifier ensembling, synthetic data rephrasing, and reduced reliance on heuristic filters**.\n", + "\n", + "By leveraging these techniques, **8B parameter models trained on 1T tokens with a high-quality subset of Nemotron-CC** achieved an **MMLU improvement of 5.6** over DCLM, demonstrating significant gains in benchmark performance. Furthermore, **Nemotron-CC’s full dataset (6.3T tokens)** provides **4× more unique real tokens than DCLM**, making it particularly effective for long-token-horizon training, such as 15T-token-scale LLMs.\n", + "\n", + "The dataset is publicly available at [Nemotron-CC](https://data.commoncrawl.org/contrib/Nemotron/Nemotron-CC/index.html).\n", + "\n", + " \n", + "## Steps in This Notebook \n", + "1. **Compute floating-point classification scores** for each classifier. \n", + "2. **Determine percentile-based score thresholds** to categorize results. \n", + "3. **Convert floating-point scores to integer scores** (0-19 scale). \n", + "4. **Ensemble the results** using the maximum classifier score. \n", + "5. 
**Store results** in directories or cloud buckets based on classification scores.\n" + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "id": "c1882d2c-e2c1-4a59-9f9c-c12a76e9e04c", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "env: PYTHONWARNINGS=ignore\n" + ] + } + ], + "source": [ + "# Silence Warnings (HuggingFace internal warnings)\n", + "\n", + "%env PYTHONWARNINGS=ignore\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")\n", + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "id": "9ef3ef29-ab79-4fea-9050-017b9e9203dd", + "metadata": {}, + "outputs": [], + "source": [ + "import json\n", + "import fasttext\n", + "import pandas as pd\n", + "import cudf\n", + "import dask_cudf\n", + "import numpy as np\n", + "import cupy as cp\n", + "from pathlib import Path\n", + "from typing import Optional, Tuple, Any, Dict, List\n", + "from huggingface_hub import hf_hub_download\n", + "\n", + "from nemo_curator import get_client\n", + "from nemo_curator.classifiers import FineWebNemotronEduClassifier, FineWebMixtralEduClassifier\n", + "from nemo_curator.datasets import DocumentDataset\n", + "from nemo_curator.utils.distributed_utils import load_object_on_worker\n", + "from nemo_curator.utils.distributed_utils import get_device_total_memory" + ] + }, + { + "cell_type": "markdown", + "id": "325f2af0-c7a2-488b-8fb6-d35623159f06", + "metadata": {}, + "source": [ + "### Initializing NeMo Curator Client\n", + "This step initializes the NeMo Curator client to enable distributed classification using GPU-based processing." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 3, + "id": "374004c9-fd63-490f-bc81-875fc2f15ae9", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "cuDF Spilling is enabled\n" + ] + } + ], + "source": [ + "client = get_client(cluster_type=\"gpu\")" + ] + }, + { + "cell_type": "markdown", + "id": "ab00c794-3655-44ee-be33-108958c01f43", + "metadata": {}, + "source": [ + "### Setting Output File Paths\n", + "Defines the paths where classification results, threshold values, and final bucketed results will be stored." + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "id": "47ca63af-78d2-4854-bb23-c6461b74d23e", + "metadata": {}, + "outputs": [], + "source": [ + "# Define output directories\n", + "OUTPUT_BASE_DIR = \"output_data_dir/\"\n", + "OUTPUT_CLASSIFICATION_RESULTS = os.path.join(OUTPUT_BASE_DIR, \"classification_results\")\n", + "OUTPUT_CLASSIFIER_THRESHOLDS = os.path.join(OUTPUT_BASE_DIR, \"classifier_thresholds.json\")\n", + "OUTPUT_BUCKETED_RESULTS = os.path.join(OUTPUT_BASE_DIR, \"bucketed_results\")" + ] + }, + { + "cell_type": "markdown", + "id": "15d6977b-885a-4029-a868-bc6d336085ed", + "metadata": {}, + "source": [ + "# Preparing Text Data for Classification\n", + "- We create a sample dataset with diverse topics.\n", + "- Optionally, users can provide a directory containing JSONL files for classification." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 5, + "id": "a0eb4676-44e6-41ab-abb6-413f78bc9787", + "metadata": {}, + "outputs": [], + "source": [ + "# Create sample DataFrame\n", + "text = [\n", + " \"Quantum computing is set to revolutionize the field of cryptography.\",\n", + " \"Investing in index funds is a popular strategy for long-term financial growth.\",\n", + " \"Recent advancements in gene therapy offer new hope for treating genetic disorders.\",\n", + " \"Online learning platforms have transformed the way students access educational resources.\",\n", + " \"Traveling to Europe during the off-season can be a more budget-friendly option.\",\n", + " \"Training regimens for athletes have become more sophisticated with the use of data analytics.\",\n", + " \"Streaming services are changing the way people consume television and film content.\",\n", + " \"Vegan recipes have gained popularity as more people adopt plant-based diets.\",\n", + " \"Climate change research is critical for developing sustainable environmental policies.\",\n", + " \"Telemedicine has become increasingly popular due to its convenience and accessibility.\",\n", + "]\n", + "df = cudf.DataFrame({\"text\": text})\n", + "input_dataset = DocumentDataset(dask_cudf.from_cudf(df, npartitions=1))\n", + "write_to_filename = False\n", + "\n", + "# Alternatively, read existing directory of JSONL files\n", + "# input_file_path=\"/input_data_dir/\"\n", + "# input_dataset = DocumentDataset.read_json(\n", + "# input_file_path, backend=\"cudf\", add_filename=True\n", + "# )\n", + "# write_to_filename = True" + ] + }, + { + "cell_type": "markdown", + "id": "56b43d1a-7954-48b0-9c39-fe07c3ca06dc", + "metadata": {}, + "source": [ + "# Step 1: Run the Classifiers\n", + "\n", + "1. 
Compute the floating-point classification score for each classifier.\n", + "\n", + "**Note:** Dask operations are lazy, meaning the classifiers won’t execute until an eager operation like `to_parquet`, `compute`, or `persist` is called." + ] + }, + { + "cell_type": "markdown", + "id": "16962500-d2a4-4a40-8804-e7accd44abf5", + "metadata": {}, + "source": [ + "### FastText Quality Classifier\n", + "\n", + "The **FastText Quality Classifier** uses the [`fasttext-oh-eli5`](https://huggingface.co/mlfoundations/fasttext-oh-eli5) model from Hugging Face to assess text quality. It distinguishes **high-quality** text (`__label__hq`) from lower-quality text (`__label__cc`). \n", + "\n", + "NeMo Curator allows users to define custom modules like this, enabling seamless integration of specialized models. \n", + "\n", + "- **Model:** [`mlfoundations/fasttext-oh-eli5`](https://huggingface.co/mlfoundations/fasttext-oh-eli5) \n", + "- **Training Data:** OpenHermes and Reddit ELI5 vs. RefinedWeb-style web text, as the model filename `openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train.bin` suggests (200k examples) \n", + "- **Output:** Probability of the `__label__hq` class + optional binary indicator (1 if the top predicted label is high quality, 0 otherwise) \n", + "\n", + "🔗 **More details:** [Hugging Face Model Card](https://huggingface.co/mlfoundations/fasttext-oh-eli5)" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "id": "55b09d94-abe6-4c14-aa2a-5302ca0a7f4b", + "metadata": {}, + "outputs": [], + "source": [ + "class FastTextQualityClassifier:\n", + " \"\"\"\n", + " A classifier that uses a fastText model to predict a confidence score for text.\n", + "\n", + " It appends one or two output columns to the data:\n", + " - A float column representing the confidence score.\n", + " - Optionally, an integer column (1 if the top label contains \"hq\", else 0).\n", + "\n", + " The model is loaded from the Hugging Face Hub during initialization.\n", + "\n", + " Args:\n", + " pred_column (str): Name of the output column for the confidence score.\n", + " int_column (str, optional): Name of 
the output column for the binary indicator.\n", + " If not provided, only the pred_column is added.\n", + " \"\"\"\n", + "\n", + " def __init__(self, pred_column: str, int_column: Optional[str] = None) -> None:\n", + " self.pred_column: str = pred_column\n", + " self.int_column: Optional[str] = int_column\n", + "\n", + " self.repo_id: str = \"mlfoundations/fasttext-oh-eli5\"\n", + " self.model_filename: str = \"openhermes_reddit_eli5_vs_rw_v2_bigram_200k_train.bin\"\n", + " # Download the fastText model from Hugging Face Hub.\n", + " self.model_path: str = hf_hub_download(repo_id=self.repo_id, filename=self.model_filename)\n", + " self.model_identifier: str = f\"{self.repo_id}/{self.model_filename}\"\n", + "\n", + " def _load_fasttext_model(self) -> Any:\n", + " \"\"\"Load and return the fastText model.\"\"\"\n", + " return fasttext.load_model(self.model_path)\n", + "\n", + " def predict_text(self, text: str) -> Tuple[float, int]:\n", + " \"\"\"\n", + " Predict the confidence score and binary indicator for a given text.\n", + "\n", + " Args:\n", + " text (str): The input text to classify.\n", + "\n", + " Returns:\n", + " Tuple[float, int]: A tuple containing the confidence score (float) and binary indicator (int).\n", + " \"\"\"\n", + " model = load_object_on_worker(self.model_identifier, self._load_fasttext_model, {})\n", + " predictions = model.predict(text, k=2) \n", + " # predictions[0]: labels, predictions[1]: scores\n", + " # If the top predicted label contains \"hq\", return the first score; otherwise, use the second.\n", + " if \"hq\" in predictions[0][0]:\n", + " return predictions[1][0], 1\n", + " else:\n", + " return predictions[1][1], 0\n", + "\n", + " def _predict_on_partition(self, df: pd.DataFrame) -> pd.DataFrame:\n", + " \"\"\"\n", + " Apply predictions to a pandas DataFrame partition.\n", + "\n", + " Assumes the DataFrame has a \"text\" column.\n", + "\n", + " Args:\n", + " df (pd.DataFrame): Input DataFrame partition.\n", + "\n", + " Returns:\n", 
+ " pd.DataFrame: DataFrame with added prediction columns.\n", + " \"\"\"\n", + " # Load the model on the worker.\n", + " model = load_object_on_worker(self.model_identifier, self._load_fasttext_model, {})\n", + " results = df[\"text\"].apply(self.predict_text)\n", + " df[self.pred_column] = results.apply(lambda x: x[0]).astype(np.float32)\n", + " if self.int_column is not None:\n", + " df[self.int_column] = results.apply(lambda x: x[1]).astype(np.int32)\n", + " return df\n", + "\n", + " def __call__(self, dataset: DocumentDataset) -> DocumentDataset:\n", + " \"\"\"\n", + " Apply the classifier to a distributed dataset.\n", + "\n", + " The dataset should have a \"text\" column. The classifier converts the dataset\n", + " to a pandas backend, applies predictions to each partition, and then converts the result\n", + " back to cudf.\n", + "\n", + " Args:\n", + " dataset: A distributed DataFrame (e.g., a Dask DataFrame) containing a \"text\" column.\n", + "\n", + " Returns:\n", + " DocumentDataset: The dataset with added prediction columns.\n", + " \"\"\"\n", + " meta = dataset.df._meta\n", + " if hasattr(meta, \"to_pandas\"):\n", + " meta = meta.to_pandas()\n", + " meta[self.pred_column] = np.float32(0.0)\n", + " if self.int_column is not None:\n", + " meta[self.int_column] = np.int32(0)\n", + "\n", + " processed_df = dataset.df.to_backend(\"pandas\").map_partitions(self._predict_on_partition, meta=meta)\n", + " processed_df = processed_df.to_backend(\"cudf\")\n", + " return DocumentDataset(processed_df)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "id": "8c7e28c3-8e25-417a-a1c7-7f5b237d18a0", + "metadata": {}, + "outputs": [], + "source": [ + "# Define classifier score mapping\n", + "classifier_scores = {\n", + " \"nemotron-score\": {\n", + " \"int_score\": \"fineweb-nemotron-edu-score-int\",\n", + " \"float_score\": \"fineweb-nemotron-edu-score\"\n", + " },\n", + " \"mixtral-score\": {\n", + " \"int_score\": \"fineweb-mixtral-edu-score-int\",\n", 
+ " \"float_score\": \"fineweb-mixtral-edu-score\"\n", + " },\n", + " \"fasttext-score\": {\n", + " \"int_score\": \"fasttext-quality-score-int\",\n", + " \"float_score\": \"fasttext-quality-score\"\n", + " }\n", + "}\n", + "\n", + "\n", + "\n", + "# Initialize classifiers\n", + "classifiers = [\n", + " FineWebNemotronEduClassifier(batch_size=1024,\n", + " pred_column=classifier_scores[\"nemotron-score\"][\"float_score\"],\n", + " int_column=classifier_scores[\"nemotron-score\"][\"int_score\"]),\n", + " FineWebMixtralEduClassifier(batch_size=1024,\n", + " pred_column=classifier_scores[\"mixtral-score\"][\"float_score\"],\n", + " int_column=classifier_scores[\"mixtral-score\"][\"int_score\"]),\n", + " FastTextQualityClassifier(pred_column=classifier_scores[\"fasttext-score\"][\"float_score\"],\n", + " int_column=classifier_scores[\"fasttext-score\"][\"int_score\"])\n", + "]" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "id": "a672e5d8-bb1e-4fe4-bdd7-f9859a449158", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting FineWeb Nemotron-4 Edu Classifier inference\n", + "Starting FineWeb Mixtral Edu Classifier inference\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "GPU: tcp://127.0.0.1:33001, Part: 0: 100%|██████████| 10/10 [00:02<00:00, 4.22it/s]\n", + "GPU: tcp://127.0.0.1:33001, Part: 0: 100%|██████████| 10/10 [00:01<00:00, 7.40it/s]\n" + ] + } + ], + "source": [ + "output_dataset = input_dataset\n", + "for classifier in classifiers:\n", + " output_dataset = classifier(dataset=output_dataset)\n", + "\n", + "# Drop the int columns produced by the classifiers themselves;\n", + "# new integer scores are derived from percentile thresholds in the following cells.\n", + "output_dataset = output_dataset.df.drop(columns=[v[\"int_score\"] for v in classifier_scores.values()])\n", + "output_dataset.to_parquet(path=OUTPUT_CLASSIFICATION_RESULTS)" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "id": 
"ec5cca63-ad01-4481-b910-8bcc735ece3a", + "metadata": {}, + "outputs": [], + "source": [ + "del classifiers, output_dataset, input_dataset" + ] + }, + { + "cell_type": "markdown", + "id": "1aa51ac8-373c-4318-9bf5-f063321cb3e0", + "metadata": {}, + "source": [ + "### Read Back in the scored Data Frame" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "id": "229d2466-9064-4e2e-957e-07e949d2ae1a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 1 files with blocksize='1gb' / files_per_partition=None\n" + ] + }, + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + " fasttext-quality-score fineweb-mixtral-edu-score \\\n", + "0 0.999011 1.347656 \n", + "1 0.996264 0.827637 \n", + "2 0.000090 1.420898 \n", + "3 0.000377 1.572266 \n", + "4 0.991868 0.345215 \n", + "\n", + " fineweb-mixtral-edu-score-label fineweb-nemotron-edu-score \\\n", + "0 low_quality 1.391602 \n", + "1 low_quality 0.889160 \n", + "2 low_quality 1.345703 \n", + "3 low_quality 1.727539 \n", + "4 low_quality 0.248657 \n", + "\n", + " fineweb-nemotron-edu-score-label \\\n", + "0 low_quality \n", + "1 low_quality \n", + "2 low_quality \n", + "3 low_quality \n", + "4 low_quality \n", + "\n", + " text \n", + "0 Quantum computing is set to revolutionize the ... \n", + "1 Investing in index funds is a popular strategy... \n", + "2 Recent advancements in gene therapy offer new ... \n", + "3 Online learning platforms have transformed the... \n", + "4 Traveling to Europe during the off-season can ... " + ] + }, + "execution_count": 10, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "scored_data = DocumentDataset.read_parquet(OUTPUT_CLASSIFICATION_RESULTS, backend=\"cudf\")\n", + "scored_data.df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "e7ef568a-6c17-4b7c-b201-627f33df26fa", + "metadata": {}, + "source": [ + "# Step 2: Compute Score Thresholds\n", + "\n", + "### Why Compute Thresholds?\n", + "- To categorize classification scores into percentile-based bins.\n", + "- Ensures results are comparable across different classifiers.\n", + "\n", + "### Approach:\n", + "1. **Extract classifier scores** from the sampled dataset.\n", + "2. **Compute weighted percentiles** for each classifier.\n", + "3. **Save percentile thresholds** for later use in mapping scores.\n", + "\n", + "> **Note:** The percentile calculation is weighted by token count so that longer texts (with more tokens) have a greater impact on the thresholds. 
This ensures that the bins accurately reflect the distribution of content, giving a more meaningful categorization of the scores." + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "id": "26a34e8a-c893-454c-8f93-d09cd60d99ce", + "metadata": {}, + "outputs": [], + "source": [ + "def weighted_percentile(data, percentiles, weights):\n", + " \"\"\"\n", + " Compute weighted percentiles with the \"inverted_cdf\" method.\n", + "\n", + " Parameters:\n", + " data : array-like, the data values.\n", + " percentiles : scalar or array-like, percentiles in [0, 100].\n", + " weights : array-like, the weights for each data value.\n", + " \n", + " Returns:\n", + " The weighted percentile values.\n", + " \"\"\"\n", + " data = np.asarray(data)\n", + " weights = np.asarray(weights)\n", + " \n", + " # Sort data and associated weights\n", + " sorter = np.argsort(data)\n", + " data_sorted = data[sorter]\n", + " weights_sorted = weights[sorter]\n", + " \n", + " # Compute the cumulative sum of weights and normalize it to [0, 1]\n", + " cum_weights = np.cumsum(weights_sorted)\n", + " total_weight = cum_weights[-1]\n", + " normalized_cum_weights = cum_weights / total_weight\n", + "\n", + " # For each desired percentile, find the first data value where\n", + " # the normalized cumulative weight is >= (percentile / 100).\n", + " percentiles = np.atleast_1d(percentiles)\n", + " results = []\n", + " for p in percentiles:\n", + " # np.searchsorted returns the index where (p/100) should be inserted \n", + " # to maintain order.\n", + " idx = np.searchsorted(normalized_cum_weights, p / 100.0, side='left')\n", + " results.append(data_sorted[idx])\n", + " \n", + " return np.array(results)\n", + "\n", + "\n", + "def compute_thresholds(score_ar: np.ndarray, token_ar: np.ndarray) -> Dict[str, float]:\n", + " \"\"\"\n", + " Compute percentile-based thresholds for a given score column using weighted percentiles.\n", + "\n", + " Args:\n", + " score_ar (np.ndarray): Array containing the 
scores.\n", + " token_ar (np.ndarray): Array containing token counts for weighting.\n", + "\n", + " Returns:\n", + " Dict[str, float]: Dictionary containing percentile thresholds.\n", + " \"\"\"\n", + " percentiles = np.arange(5, 100, 5)\n", + " # NumPy < 2.0 does not support the \"inverted_cdf\" method for computing percentiles \n", + " # with weights directly via np.percentile (see commented-out equivalent code below).\n", + " # To achieve the same result, we manually implement the weighted percentile computation\n", + " # using NumPy primitives.\n", + " # thresholds = np.percentile(score_ar, percentiles, weights=token_ar, method='inverted_cdf')\n", + " thresholds = weighted_percentile(score_ar, percentiles, weights=token_ar)\n", + " return {int(percentile): float(thresh) for percentile, thresh in zip(percentiles, thresholds)}\n", + "\n", + "\n", + "def compute_thresholds_for_score_columns(\n", + " df: cudf.DataFrame, text_col_name: str, score_col_names: List[str]\n", + ") -> Dict[str, Dict[str, float]]:\n", + " \"\"\"\n", + " Compute percentile-based thresholds for all specified score columns in a DataFrame.\n", + "\n", + " Args:\n", + " df (cudf.DataFrame): The DataFrame containing the score columns and text column.\n", + " text_col_name (str): The name of the text column used to derive token counts.\n", + " score_col_names (List[str]): List of column names for which thresholds should be computed.\n", + "\n", + " Returns:\n", + " Dict[str, Dict[str, float]]: A dictionary mapping each score column to its percentile thresholds.\n", + " \"\"\"\n", + " threshold_dict = {}\n", + " token_series = df[text_col_name].str.byte_count() # byte count serves as a cheap proxy for token count\n", + "\n", + " for score_col in score_col_names:\n", + " threshold_dict[score_col] = compute_thresholds(df[score_col].values.get(), token_series.values.get())\n", + "\n", + " return threshold_dict\n", + "\n", + "\n", + "def save_thresholds(threshold_dict: Dict[str, Dict[str, float]], file_name) -> None:\n", + " \"\"\"\n", + " Save 
computed thresholds to a JSON file.\n", + "\n", + " Args:\n", + " threshold_dict (Dict[str, Dict[str, float]]): The dictionary containing computed thresholds.\n", + " file_name (str): The path of the output JSON file.\n", + " Returns:\n", + " None\n", + " \"\"\"\n", + " with open(file_name, 'w') as fout:\n", + " json.dump(threshold_dict, fout, indent=4)\n", + " print(f\"Thresholds saved to {file_name}\")" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "id": "c8b0a650-6290-4e60-b388-43950e1f7357", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Thresholds saved to output_data_dir/classifier_thresholds.json\n" + ] + } + ], + "source": [ + "# Sample at most the fraction of the data that fits in roughly half of a single GPU's memory\n", + "gpu_memory_available = get_device_total_memory()/2\n", + "frac = min(1.0, gpu_memory_available/scored_data.df.memory_usage(deep=True).sum().compute())\n", + "sampled_data = scored_data.df.sample(frac=frac).repartition(npartitions=1)\n", + "\n", + "score_col_names = [v[\"float_score\"] for v in classifier_scores.values()]\n", + "threshold_dict = sampled_data.map_partitions(compute_thresholds_for_score_columns, text_col_name=\"text\", score_col_names=score_col_names).compute().iloc[0]\n", + "save_thresholds(threshold_dict, OUTPUT_CLASSIFIER_THRESHOLDS)" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "id": "83696e60-be44-434f-acab-ef275253732a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "{'fineweb-nemotron-edu-score': {5: 0.2486572265625,\n", + " 10: 0.81884765625,\n", + " 15: 0.81884765625,\n", + " 20: 0.81884765625,\n", + " 25: 0.8427734375,\n", + " 30: 0.85400390625,\n", + " 35: 0.85400390625,\n", + " 40: 0.88916015625,\n", + " 45: 0.88916015625,\n", + " 50: 1.2880859375,\n", + " 55: 1.2880859375,\n", + " 60: 1.345703125,\n", + " 65: 1.345703125,\n", + " 70: 1.3916015625,\n", + " 75: 1.3916015625,\n", + " 80: 
1.3994140625,\n", + " 85: 1.3994140625,\n", + " 90: 1.7275390625,\n", + " 95: 1.7275390625},\n", + " 'fineweb-mixtral-edu-score': {5: 0.34521484375,\n", + " 10: 0.7822265625,\n", + " 15: 0.7822265625,\n", + " 20: 0.82763671875,\n", + " 25: 0.82763671875,\n", + " 30: 0.9501953125,\n", + " 35: 0.9501953125,\n", + " 40: 1.0234375,\n", + " 45: 1.0234375,\n", + " 50: 1.34765625,\n", + " 55: 1.34765625,\n", + " 60: 1.4208984375,\n", + " 65: 1.4208984375,\n", + " 70: 1.42578125,\n", + " 75: 1.42578125,\n", + " 80: 1.572265625,\n", + " 85: 1.572265625,\n", + " 90: 1.783203125,\n", + " 95: 1.783203125},\n", + " 'fasttext-quality-score': {5: 9.026021871250123e-05,\n", + " 10: 9.026021871250123e-05,\n", + " 15: 0.00011704424832714722,\n", + " 20: 0.00011704424832714722,\n", + " 25: 0.00037683334085159004,\n", + " 30: 0.00037683334085159004,\n", + " 35: 0.0006898035062476993,\n", + " 40: 0.0006898035062476993,\n", + " 45: 0.9918678402900696,\n", + " 50: 0.9918678402900696,\n", + " 55: 0.9919403195381165,\n", + " 60: 0.9919403195381165,\n", + " 65: 0.9962636232376099,\n", + " 70: 0.9962636232376099,\n", + " 75: 0.9990114569664001,\n", + " 80: 0.9990114569664001,\n", + " 85: 0.9997979998588562,\n", + " 90: 0.9997979998588562,\n", + " 95: 0.9999129772186279}}" + ] + }, + "execution_count": 13, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "threshold_dict" + ] + }, + { + "cell_type": "markdown", + "id": "790a9c41-80ee-4885-8c7e-2b34b4e8117c", + "metadata": {}, + "source": [ + "# Step 3: Convert Floating-Point Scores to Integer Scores\n", + "\n", + "### Why Convert?\n", + "- Floating-point scores are mapped to integer categories (0-19) for easier comparison.\n", + "- Integer scores are computed using **percentile-based thresholds**.\n", + "\n", + "### Process:\n", + "1. **Retrieve percentile thresholds** from saved JSON.\n", + "2. **Apply the thresholds to map scores to integer bins**.\n", + "3. 
**Store integer scores in the dataset** for final ensemble computation." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "id": "4d94e75a-0a78-4554-bb39-087b009db1c3", + "metadata": {}, + "outputs": [], + "source": [ + "def map_scores(df, score_col_name: str, score_int_name: str, bins: List[float]):\n", + " \"\"\"\n", + " Given a DataFrame df and a column of original scores, \n", + " use cp.digitize to map them into integer bins using the given thresholds.\n", + " \"\"\"\n", + " pred_orig_score = cp.array(df[score_col_name])\n", + " pred_int_score = cp.digitize(pred_orig_score, bins)\n", + " df[score_int_name] = pred_int_score\n", + " return df\n", + "\n", + "def map_score_columns(df: cudf.DataFrame, score_col_names: List[str], threshold_dict: Dict[str, dict]):\n", + " \"\"\"\n", + " For each score column in score_col_names, this function:\n", + " 1. Creates a new column name by appending '-int'\n", + " 2. Retrieves the corresponding thresholds from threshold_dict,\n", + " sorts them (using the keys which are assumed to be strings of numbers),\n", + " 3. 
Passes the bins to map_scores to create the integer score column.\n", + " \"\"\"\n", + " for score_col_name in score_col_names:\n", + " # Build the new integer score column name.\n", + " score_int_name = score_col_name + \"-int\"\n", + " thresholds = threshold_dict.get(score_col_name)\n", + " if thresholds is None:\n", + " raise ValueError(f\"No thresholds found for score column '{score_col_name}'\")\n", + " \n", + " sorted_keys = sorted(thresholds.keys(), key=lambda x: int(x))\n", + " # Use cp.array to create a CuPy array from the list of threshold values.\n", + " bins = cp.array([thresholds[k] for k in sorted_keys])\n", + " \n", + " # Map the original score column to the new integer score column.\n", + " df = map_scores(df, score_col_name, score_int_name, bins)\n", + " return df\n" + ] + }, + { + "cell_type": "code", + "execution_count": 15, + "id": "70e6a00e-5e42-493a-9dcb-682df8eead0d", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
" + ], + "text/plain": [ + " fasttext-quality-score fineweb-mixtral-edu-score \\\n", + "0 0.999011 1.347656 \n", + "1 0.996264 0.827637 \n", + "2 0.000090 1.420898 \n", + "3 0.000377 1.572266 \n", + "4 0.991868 0.345215 \n", + "\n", + " fineweb-mixtral-edu-score-label fineweb-nemotron-edu-score \\\n", + "0 low_quality 1.391602 \n", + "1 low_quality 0.889160 \n", + "2 low_quality 1.345703 \n", + "3 low_quality 1.727539 \n", + "4 low_quality 0.248657 \n", + "\n", + " fineweb-nemotron-edu-score-label \\\n", + "0 low_quality \n", + "1 low_quality \n", + "2 low_quality \n", + "3 low_quality \n", + "4 low_quality \n", + "\n", + " text \\\n", + "0 Quantum computing is set to revolutionize the ... \n", + "1 Investing in index funds is a popular strategy... \n", + "2 Recent advancements in gene therapy offer new ... \n", + "3 Online learning platforms have transformed the... \n", + "4 Traveling to Europe during the off-season can ... \n", + "\n", + " fineweb-nemotron-edu-score-int fineweb-mixtral-edu-score-int \\\n", + "0 15 11 \n", + "1 9 5 \n", + "2 13 13 \n", + "3 19 17 \n", + "4 1 1 \n", + "\n", + " fasttext-quality-score-int \n", + "0 16 \n", + "1 14 \n", + "2 2 \n", + "3 6 \n", + "4 10 " + ] + }, + "execution_count": 15, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "scored_data.df = scored_data.df.map_partitions(map_score_columns, score_col_names, threshold_dict)\n", + "scored_data.head()" + ] + }, + { + "cell_type": "markdown", + "id": "9cf526dd-363f-4199-bb9d-2ea9b8897fae", + "metadata": {}, + "source": [ + "# Step 4: Compute the Final Ensembled Score\n", + "\n", + "### Purpose:\n", + "- To combine the predictions from multiple classifiers into a **single representative score**.\n", + "- The ensemble score is computed as the **maximum of all integer scores** across classifiers.\n", + "\n", + "### Approach:\n", + "1. **Extract integer scores from each classifier.**\n", + "2. 
**Compute the max integer score for each data point.**\n", + "3. **Store the final ensemble score in the dataset.**" + ] + }, + { + "cell_type": "code", + "execution_count": 16, + "id": "380f453d-4d6e-43cc-9d33-3d4d64b854d4", + "metadata": {}, + "outputs": [], + "source": [ + "int_column_names = [f'{v[\"float_score\"]}-int' for v in classifier_scores.values()]\n", + "scored_data.df['ensemble-max-int'] = scored_data.df[int_column_names].max(axis=1)" + ] + }, + { + "cell_type": "code", + "execution_count": 17, + "id": "469cafbe-f8d2-466d-9e80-2522c59a0a1a", + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
fasttext-quality-scorefineweb-mixtral-edu-scorefineweb-mixtral-edu-score-labelfineweb-nemotron-edu-scorefineweb-nemotron-edu-score-labeltextfineweb-nemotron-edu-score-intfineweb-mixtral-edu-score-intfasttext-quality-score-intensemble-max-int
00.9990111.347656low_quality1.391602low_qualityQuantum computing is set to revolutionize the ...15111616
10.9962640.827637low_quality0.889160low_qualityInvesting in index funds is a popular strategy...951414
20.0000901.420898low_quality1.345703low_qualityRecent advancements in gene therapy offer new ...1313213
30.0003771.572266low_quality1.727539low_qualityOnline learning platforms have transformed the...1917619
40.9918680.345215low_quality0.248657low_qualityTraveling to Europe during the off-season can ...111010
\n", + "
" + ], + "text/plain": [ + " fasttext-quality-score fineweb-mixtral-edu-score \\\n", + "0 0.999011 1.347656 \n", + "1 0.996264 0.827637 \n", + "2 0.000090 1.420898 \n", + "3 0.000377 1.572266 \n", + "4 0.991868 0.345215 \n", + "\n", + " fineweb-mixtral-edu-score-label fineweb-nemotron-edu-score \\\n", + "0 low_quality 1.391602 \n", + "1 low_quality 0.889160 \n", + "2 low_quality 1.345703 \n", + "3 low_quality 1.727539 \n", + "4 low_quality 0.248657 \n", + "\n", + " fineweb-nemotron-edu-score-label \\\n", + "0 low_quality \n", + "1 low_quality \n", + "2 low_quality \n", + "3 low_quality \n", + "4 low_quality \n", + "\n", + " text \\\n", + "0 Quantum computing is set to revolutionize the ... \n", + "1 Investing in index funds is a popular strategy... \n", + "2 Recent advancements in gene therapy offer new ... \n", + "3 Online learning platforms have transformed the... \n", + "4 Traveling to Europe during the off-season can ... \n", + "\n", + " fineweb-nemotron-edu-score-int fineweb-mixtral-edu-score-int \\\n", + "0 15 11 \n", + "1 9 5 \n", + "2 13 13 \n", + "3 19 17 \n", + "4 1 1 \n", + "\n", + " fasttext-quality-score-int ensemble-max-int \n", + "0 16 16 \n", + "1 14 14 \n", + "2 2 13 \n", + "3 6 19 \n", + "4 10 10 " + ] + }, + "execution_count": 17, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "scored_data.df.head()" + ] + }, + { + "cell_type": "markdown", + "id": "35ba68b8-8566-401a-882b-eb2ae0414138", + "metadata": {}, + "source": [ + "# Step 5: Write Results to Partitioned Buckets\n", + "\n", + "\n", + "### Purpose:\n", + "- Organize and store classified results in a **structured, partitioned format** to facilitate **annealing-based training** for downstream **LLM fine-tuning** and optimization." 
+ ] + }, + { + "cell_type": "code", + "execution_count": 18, + "id": "5b6bfcc8-5fef-41df-9e04-c50c35538ff3", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Writing to disk complete for 1 partition(s)\n" + ] + } + ], + "source": [ + "scored_data.to_parquet(OUTPUT_BUCKETED_RESULTS, partition_on=\"ensemble-max-int\")" + ] + }, + { + "cell_type": "markdown", + "id": "8052be9b-6889-4254-bf21-ef1c8b41b82f", + "metadata": {}, + "source": [ + "# Verify Results\n", + "\n", + "### Process:\n", + "1. **List available partitions** (each corresponds to a score bucket).\n", + "2. **Read a sample partition** and validate data integrity." + ] + }, + { + "cell_type": "code", + "execution_count": 19, + "id": "67f0fc7b-eca6-4326-9a58-54d27daaf06a", + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "['ensemble-max-int=1', 'ensemble-max-int=10', 'ensemble-max-int=12', 'ensemble-max-int=13', 'ensemble-max-int=14', 'ensemble-max-int=16', 'ensemble-max-int=17', 'ensemble-max-int=18', 'ensemble-max-int=19', 'ensemble-max-int=3', 'ensemble-max-int=5', 'ensemble-max-int=7', 'ensemble-max-int=9']\n", + "Reading 1 files with blocksize='1gb' / files_per_partition=None\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + "\n", + "\n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + " \n", + "
ensemble-max-intfasttext-quality-scorefasttext-quality-score-intfineweb-mixtral-edu-scorefineweb-mixtral-edu-score-intfineweb-nemotron-edu-scorefineweb-nemotron-edu-score-inttext
010.13574210.13574210.1357421Traveling to Europe during the off-season can ...
\n", + "
" + ], + "text/plain": [ + " ensemble-max-int fasttext-quality-score fasttext-quality-score-int \\\n", + "0 1 0.135742 1 \n", + "\n", + " fineweb-mixtral-edu-score fineweb-mixtral-edu-score-int \\\n", + "0 0.135742 1 \n", + "\n", + " fineweb-nemotron-edu-score fineweb-nemotron-edu-score-int \\\n", + "0 0.135742 1 \n", + "\n", + " text \n", + "0 Traveling to Europe during the off-season can ... " + ] + }, + "execution_count": 19, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "all_buckets = sorted(os.listdir(OUTPUT_BUCKETED_RESULTS))\n", + "print(all_buckets)\n", + "first_bucket= DocumentDataset.read_parquet(os.path.join(OUTPUT_BUCKETED_RESULTS, all_buckets[0]))\n", + "first_bucket.head()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.16" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +}