Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix issue #43 (empty files creation) and improve reading/writing speed #57

Merged
merged 1 commit into from
May 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 17 additions & 6 deletions nemo_curator/utils/file_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,8 @@ def parse_str_of_num_bytes(s, return_str=False):
def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=None):
"""Worker function to write out the data to jsonl files"""

def _output_json(document):
myjson = json.dumps(document, ensure_ascii=False)
return myjson.encode("utf-8")
def _encode_text(document):
return document.strip().encode("utf-8")

def _name(start_index, npad, prefix, i):
tag = str(start_index + i).rjust(npad, "0")
Expand All @@ -195,11 +194,22 @@ def _name(start_index, npad, prefix, i):

output_glob_string = os.path.join(output_path, "*.jsonl")

documents.map(_output_json).to_textfiles(
output_files = documents.map(_encode_text).to_textfiles(
output_glob_string,
name_function=name,
)

# Delete empty files generated due to empty partitions in the bag
for output_file in output_files:
try:
if os.path.getsize(output_file) == 0:
os.remove(output_file)
except Exception as exception:
print(
f"An exception occurred when trying to delete {output_file}.\n{exception}",
flush=True,
)


def reshard_jsonl(
input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix=""
Expand All @@ -212,7 +222,8 @@ def reshard_jsonl(
output_dir: The output directory where the resharded jsonl files will be written
output_file_size: Approximate size of output files. Must specify with a string and
with the unit K, M or G for kilo, mega or gigabytes
start_index: Starting index for naming the output files
start_index: Starting index for naming the output files. Note: The indices may not be
contiguous, since an index is skipped whenever the sharding process would have produced an empty file for it
file_prefix: Prefix to use to prepend to output file number
"""

Expand All @@ -222,7 +233,7 @@ def reshard_jsonl(
input_files = list(get_all_files_paths_under(input_dir))

# Read in the dask bag
b = db.read_text(input_files, blocksize=blocksize).map(json.loads)
b = db.read_text(input_files, blocksize=blocksize)

# Prepare the output
output_dir = expand_outdir_and_mkdir(output_dir)
Expand Down