diff --git a/nemo_curator/utils/file_utils.py b/nemo_curator/utils/file_utils.py index af3c2513d..3ec466b4c 100644 --- a/nemo_curator/utils/file_utils.py +++ b/nemo_curator/utils/file_utils.py @@ -181,9 +181,8 @@ def parse_str_of_num_bytes(s, return_str=False): def _save_jsonl(documents, output_path, start_index=0, max_index=10000, prefix=None): """Worker function to write out the data to jsonl files""" - def _output_json(document): - myjson = json.dumps(document, ensure_ascii=False) - return myjson.encode("utf-8") + def _encode_text(document): + return document.strip().encode("utf-8") def _name(start_index, npad, prefix, i): tag = str(start_index + i).rjust(npad, "0") @@ -195,11 +194,22 @@ def _name(start_index, npad, prefix, i): output_glob_string = os.path.join(output_path, "*.jsonl") - documents.map(_output_json).to_textfiles( + output_files = documents.map(_encode_text).to_textfiles( output_glob_string, name_function=name, ) + # Delete empty files generated due to empty partitions in the bag + for output_file in output_files: + try: + if os.path.getsize(output_file) == 0: + os.remove(output_file) + except Exception as exception: + print( + f"An exception occurred when trying to delete {output_file}.\n{exception}", + flush=True, + ) + def reshard_jsonl( input_dir, output_dir, output_file_size="100M", start_index=0, file_prefix="" @@ -212,7 +222,8 @@ def reshard_jsonl( output_dir: The output directory where the resharded jsonl files will be written output_file_size: Approximate size of output files. Must specify with a string and with the unit K, M or G for kilo, mega or gigabytes - start_index: Starting index for naming the output files + start_index: Starting index for naming the output files. Note: The indices may not + be continuous if the sharding process would output an empty file in its place file_prefix: Prefix to use to prepend to output file number """ @@ -222,7 +233,7 @@ def reshard_jsonl( input_files = list(get_all_files_paths_under(input_dir)) # Read in the dask bag - b = db.read_text(input_files, blocksize=blocksize).map(json.loads) + b = db.read_text(input_files, blocksize=blocksize) # Prepare the output output_dir = expand_outdir_and_mkdir(output_dir)