Make add_filename str/bool #465

Merged
6 changes: 4 additions & 2 deletions docs/user-guide/documentdataset.rst
@@ -68,14 +68,16 @@ Let's walk through this code line by line.
"books_dataset/books_02.jsonl"]

* ``books = DocumentDataset.read_json(files, add_filename=True)`` This will read the files listed into memory.
The ``add_filename=True`` option preserves the name of the shard (``books_00.jsonl``, ``books_01.jsonl``, etc.) as an additional ``filename`` field.
When the dataset is written back to disk, this option (in conjunction with the ``write_to_filename`` option) ensures that documents stay in their original shard.
The ``add_filename=True`` option preserves the name of the shard (``books_00.jsonl``, ``books_01.jsonl``, etc.) as an additional ``file_name`` field.
When the dataset is written back to disk, this option (in conjunction with the ``write_to_filename`` and ``filename_col`` options) ensures that documents stay in their original shard.
This can be useful for manually inspecting the results of filtering shard by shard.
The ``add_filename`` option can also be given as a string, in which case that string is used as the name of the column (instead of the default ``file_name``).
* ``filter_step = ...`` This constructs and applies a heuristic filter for the length of the document.
More information is provided in the filtering page of the documentation.
* ``long_books.to_json("long_books/", write_to_filename=True)`` This writes the filtered dataset to a new directory.
As mentioned above, ``write_to_filename=True`` preserves the sharding of the dataset.
If the dataset was not read in with ``add_filename=True``, setting ``write_to_filename=True`` will raise an error.
If the dataset was read with ``add_filename="path"``, then ``filename_col="path"`` must also be passed alongside ``write_to_filename=True``, as illustrated in the sketch below.
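
A minimal sketch of the string form, using the same ``books_dataset`` shards as above (the ``"path"`` column name and the output directory are only illustrative):

.. code-block:: python

    from nemo_curator.datasets import DocumentDataset

    files = ["books_dataset/books_00.jsonl",
             "books_dataset/books_01.jsonl",
             "books_dataset/books_02.jsonl"]

    # Record each document's shard name in a column called "path"
    # instead of the default "file_name".
    books = DocumentDataset.read_json(files, add_filename="path")

    # Point filename_col at the same column so every document is written
    # back to a file named after its original shard.
    books.to_json("books_copy/", write_to_filename=True, filename_col="path")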

``DocumentDataset`` is just a wrapper around a `Dask dataframe <https://docs.dask.org/en/stable/dataframe.html>`_.
The underlying dataframe can be accessed with the ``DocumentDataset.df`` member variable.
20 changes: 14 additions & 6 deletions nemo_curator/datasets/doc_dataset.py
@@ -52,7 +52,7 @@ def read_json(
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename: bool = False,
add_filename: Union[bool, str] = False,
input_meta: Union[str, dict] = None,
columns: Optional[List[str]] = None,
**kwargs,
@@ -64,7 +64,9 @@
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "file_name" column to the DataFrame.
add_filename: Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If a string, it is used as the name of the new column. Default is False.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.
columns: If not None, only these columns will be read from the file.
@@ -91,7 +93,7 @@ def read_parquet(
backend: Literal["pandas", "cudf"] = "pandas",
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = "1gb",
add_filename=False,
add_filename: Union[bool, str] = False,
columns: Optional[List[str]] = None,
**kwargs,
) -> "DocumentDataset":
@@ -102,7 +104,9 @@
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "file_name" column to the DataFrame.
add_filename: Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If a string, it is used as the name of the new column. Default is False.
columns: If not None, only these columns will be read from the file.
There is a significant performance gain when specifying columns for Parquet files.

@@ -135,7 +139,9 @@ def read_pickle(
input_files: The path of the input file(s).
backend: The backend to use for reading the data.
files_per_partition: The number of files to read per partition.
add_filename: Whether to add a "file_name" column to the DataFrame.
add_filename: Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If a string, it is used as the name of the new column. Default is False.
columns: If not None, only these columns will be read from the file.

"""
@@ -154,6 +160,7 @@ def to_json(
output_path: str,
write_to_filename: bool = False,
keep_filename_column: bool = False,
filename_col: str = "file_name",
):
"""
See nemo_curator.utils.distributed_utils.write_to_disk docstring for parameters.
@@ -165,6 +172,7 @@
write_to_filename=write_to_filename,
keep_filename_column=keep_filename_column,
output_type="jsonl",
filename_col=filename_col,
)

def to_parquet(
@@ -234,7 +242,7 @@ def _read_json_or_parquet(
input_files: Union[str, List[str]],
file_type: str,
backend: Literal["cudf", "pandas"],
add_filename: bool,
add_filename: Union[bool, str] = False,
files_per_partition: Optional[int] = None,
blocksize: Optional[str] = None,
input_meta: Union[str, dict] = None,
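The same `Union[bool, str]` form applies to the readers above. A quick sketch of a Parquet round trip under the new signature (the file paths and the `source_file` column name are illustrative):

```python
from nemo_curator.datasets import DocumentDataset

files = ["books_dataset/books_00.parquet", "books_dataset/books_01.parquet"]

# Read only the columns we need and record each row's shard in "source_file".
ds = DocumentDataset.read_parquet(files, columns=["id", "text"], add_filename="source_file")

# Write one JSONL file per original Parquet shard.
ds.to_json("books_jsonl/", write_to_filename=True, filename_col="source_file")
```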
17 changes: 11 additions & 6 deletions nemo_curator/datasets/parallel_dataset.py
@@ -1,11 +1,11 @@
import csv
from typing import List, Optional, Tuple, Union
from typing import List, Tuple, Union

import dask.dataframe as dd
import pandas as pd

from nemo_curator.datasets.doc_dataset import DocumentDataset
from nemo_curator.utils.distributed_utils import write_to_disk
from nemo_curator.utils.distributed_utils import _resolve_filename_col, write_to_disk
from nemo_curator.utils.file_utils import remove_path_extension
from nemo_curator.utils.import_utils import gpu_only_import

@@ -31,7 +31,7 @@ def read_simple_bitext(
src_lang: str,
tgt_lang: str,
backend: str = "pandas",
add_filename: bool = False,
add_filename: Union[bool, str] = False,
npartitions: int = 16,
):
"""See `read_single_simple_bitext_file_pair` docstring for what "simple_bitext" means and usage of other parameters.
@@ -99,7 +99,7 @@ def read_single_simple_bitext_file_pair(
tgt_lang: str,
doc_id: str = None,
backend: str = "cudf",
add_filename: bool = False,
add_filename: Union[bool, str] = False,
) -> Union[dd.DataFrame, "dask_cudf.DataFrame"]:
"""This function reads a pair of "simple bitext" files into a pandas DataFrame.
A simple bitext is a common data format in machine translation.
@@ -129,7 +129,10 @@ def read_single_simple_bitext_file_pair(
tgt_lang (str): Target language, in ISO-639-1 (two character) format (e.g. 'en')
doc_id (str, optional): A string document id to assign to every segment in the file. Defaults to None.
backend (str, optional): Backend of the data frame. Defaults to "cudf".
add_filename (bool, optional): Add "file_name" as an extra field to every segment in the file. Defaults to False.
add_filename (Union[bool, str]): Whether to add a filename column to the DataFrame.
If True, a new column is added to the DataFrame called `file_name`.
If a string, it is used as the name of the new column. Default is False.


Returns:
Union[dd.DataFrame, dask_cudf.DataFrame]
@@ -162,6 +165,8 @@ def read_single_simple_bitext_file_pair(
df_combined["tgt_lang"] = tgt_lang

if add_filename:
df_combined["file_name"] = remove_path_extension(src_input_file)
df_combined[_resolve_filename_col(add_filename)] = remove_path_extension(
src_input_file
)

return df_combined
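For reference, the hunk above routes `add_filename` through `_resolve_filename_col` before using it as a column name. A hypothetical stand-alone version of that resolution logic (a sketch only, not the library's actual implementation in `nemo_curator.utils.distributed_utils`) could look like:

```python
from typing import Union


def resolve_filename_column(add_filename: Union[bool, str]) -> Union[bool, str]:
    """Map the add_filename flag to a column name (or False if disabled)."""
    if add_filename is True:
        return "file_name"  # default column name
    if add_filename is False:
        return False  # no filename column requested
    return add_filename  # caller-supplied column name
```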
1 change: 1 addition & 0 deletions nemo_curator/download/arxiv.py
@@ -415,6 +415,7 @@ def download_arxiv(
output_type=output_type,
keep_raw_download=keep_raw_download,
force_download=force_download,
filename_col="file_name",
)

return dataset
1 change: 1 addition & 0 deletions nemo_curator/download/commoncrawl.py
@@ -442,6 +442,7 @@ def download_common_crawl(
output_type=output_type,
keep_raw_download=keep_raw_download,
force_download=force_download,
filename_col="file_name",
)

return dataset
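Downstream, the downloaded datasets keep their shard names in the default `file_name` column, which the writer can reuse. A sketch based on the library's documented Common Crawl helper (the snapshot range and paths are illustrative):

```python
from nemo_curator.download import download_common_crawl

# Download and extract a snapshot range; each record keeps its source shard
# in the default "file_name" column.
common_crawl = download_common_crawl(
    "/extracted/common_crawl", "2020-50", "2021-04", output_type="jsonl"
)

# Write results back into files named after the original shards.
common_crawl.to_json(
    "/cleaned/common_crawl", write_to_filename=True, filename_col="file_name"
)
```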
16 changes: 12 additions & 4 deletions nemo_curator/download/doc_builder.py
@@ -112,12 +112,16 @@ def _download_and_extract_single_partition(
keep_raw_download: bool,
force_download: bool,
input_meta: Union[str, dict] = None,
filename_col: str = "file_name",
) -> pd.DataFrame:
url, output_path = paths

if os.path.exists(output_path) and not force_download:
partition = read_single_partition(
[output_path], backend="pandas", filetype=output_type, add_filename=True
[output_path],
backend="pandas",
filetype=output_type,
add_filename=filename_col,
)
return partition

@@ -141,8 +145,10 @@ def _download_and_extract_single_partition(
partition = pd.DataFrame(records)
filename = os.path.basename(output_path)
output_dir = os.path.dirname(output_path)
partition["file_name"] = filename
single_partition_write_with_filename(partition, output_dir, output_type=output_type)
partition[filename_col] = filename
single_partition_write_with_filename(
partition, output_dir, output_type=output_type, filename_col=filename_col
)
if not keep_raw_download:
os.remove(downloaded_file)

@@ -160,6 +166,7 @@ def download_and_extract(
keep_raw_download=False,
force_download=False,
input_meta: Union[str, dict] = None,
filename_col: str = "file_name",
) -> DocumentDataset:
"""
Downloads and extracts a dataset into a format accepted by the NeMo Curator
@@ -178,7 +185,7 @@
directly read from them instead.
input_meta: A dictionary or a string formatted as a dictionary, which outlines
the field names and their respective data types within the JSONL input file.

filename_col: The name of the column that contains the filename. Default is "file_name".
Returns:
A DocumentDataset of the downloaded data
"""
@@ -202,6 +209,7 @@
force_download=force_download,
enforce_metadata=False,
input_meta=input_meta,
filename_col=filename_col,
meta=output_format,
)

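A hedged sketch of calling the updated `download_and_extract` entry point directly; the downloader/iterator/extractor objects and all parameter names other than `filename_col` are assumptions drawn from the library's custom-download guide:

```python
# my_downloader, my_iterator, and my_extractor are placeholders for user-defined
# DocumentDownloader / DocumentIterator / DocumentExtractor subclasses.
dataset = download_and_extract(
    urls=urls,
    output_paths=[f"/extracted/{i}.jsonl" for i in range(len(urls))],
    downloader=my_downloader,
    iterator=my_iterator,
    extractor=my_extractor,
    output_format={"text": str, "id": str},
    output_type="jsonl",
    filename_col="file_name",  # column recording which shard each record came from
)
```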
1 change: 1 addition & 0 deletions nemo_curator/download/wikipedia.py
@@ -811,6 +811,7 @@ def download_wikipedia(
output_type=output_type,
keep_raw_download=keep_raw_download,
force_download=force_download,
filename_col="file_name",
)

return dataset
18 changes: 11 additions & 7 deletions nemo_curator/modules/dataset_ops.py
@@ -1,5 +1,5 @@
import math
from typing import Any, Callable, List, Optional
from typing import Callable, List, Optional

import dask.dataframe as dd
import numpy as np
@@ -17,9 +17,10 @@ def __init__(
seed: Optional[int] = None,
npartitions: Optional[int] = None,
partition_to_filename: Callable[[int], str] = default_filename,
filename_col: str = "file_name",
) -> None:
"""
Randomly permutes the dataset. This will make the original "file_name" column invalid, so if the column is present it will be overwritten.
Randomly permutes the dataset. This will make the original filename_col column invalid, so if the column is present it will be overwritten.
Args:
seed: The random seed that will be used to determine which partition (file) each datapoint goes to.
Setting the seed will guarantee determinism, but may be slightly slower (20-30% slower)
@@ -35,6 +36,7 @@ def __init__(
self.npartitions = npartitions
self.partition_to_filename = partition_to_filename
self.rand_col = "_shuffle_rand"
self.filename_col = filename_col

def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
if self.seed is None:
@@ -52,8 +54,10 @@ def shuffle_deterministic(self, dataset: DocumentDataset) -> DocumentDataset:
shuffled_df = dataset.df.set_index(self.rand_col, npartitions=new_npartitions)
shuffled_df = shuffled_df.reset_index(drop=True)

if "file_name" in shuffled_df:
shuffled_df["file_name"] = shuffled_df.map_partitions(self._add_filename)
if self.filename_col in shuffled_df:
shuffled_df[self.filename_col] = shuffled_df.map_partitions(
self._add_filename
)

return DocumentDataset(shuffled_df)

@@ -98,15 +102,15 @@ def _partition_shuffle(self, partition, partition_info=None):
drop=True
)

if "file_name" in partition:
if self.filename_col in partition:
filename = self.partition_to_filename(partition_num)
partition["file_name"] = filename
partition[self.filename_col] = filename

return partition

def _add_filename(self, partition, partition_info=None):
if partition_info is None:
return ["file_name"] * len(partition)
return [self.filename_col] * len(partition)

filename = self.partition_to_filename(partition_info["number"])

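Finally, a short sketch of the `Shuffle` module with a custom filename column; the import path and dataset paths are assumptions, while the parameters match the diff above:

```python
from nemo_curator import Shuffle
from nemo_curator.datasets import DocumentDataset

files = ["books_dataset/books_00.jsonl", "books_dataset/books_01.jsonl"]

# Keep each document's shard name in a custom "path" column.
ds = DocumentDataset.read_json(files, add_filename="path")

# Shuffle; each output partition gets a fresh name written into "path".
shuffle = Shuffle(seed=42, filename_col="path")
shuffled = shuffle(ds)

# Re-shard under the new names when writing.
shuffled.to_json("shuffled_books/", write_to_filename=True, filename_col="path")
```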