Add dask query-planning support #139
Conversation
@@ -36,6 +34,16 @@ def _split_part(part, nsplits):

def text_bytes_aware_merge(text_df, right_df, broadcast=True, *, on):
Is this function currently used anywhere?
I don't see any usage of this function. cc @VibhuJawa in case you recall any background on whether this is still needed.
Don't think this is needed anymore. We previously added it while trying to be more conservative, IIRC.
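Editorial note: the sketch below is a hypothetical reconstruction of what a wrapper with this signature could do, based only on the signature shown in the diff above; the removed NeMo Curator helper may have behaved differently (for example, choosing between broadcast and shuffle joins based on text byte sizes).

```python
# Hypothetical reconstruction, based only on the signature in the diff above;
# not the actual (removed) NeMo Curator implementation.
import pandas as pd
import dask.dataframe as dd


def text_bytes_aware_merge(text_df, right_df, broadcast=True, *, on):
    # Forward to Dask's merge; broadcast=True hints Dask to broadcast the
    # smaller frame instead of performing a full shuffle-based join.
    return text_df.merge(right_df, on=on, broadcast=broadcast)


# Tiny usage example with made-up column names.
left = dd.from_pandas(pd.DataFrame({"adlr_id": ["a", "b"], "text": ["x", "y"]}), npartitions=1)
right = dd.from_pandas(pd.DataFrame({"adlr_id": ["a"], "bucket": [0]}), npartitions=1)
print(text_bytes_aware_merge(left, right, on="adlr_id").compute())
```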
GPU tests pass locally for me. Let me know if others run into problems.
LGTM, thanks for working on this.
I would like to hold off on merging until @ryantwolf has a chance to test a fuzzy dedup pipeline run with this PR and we try some larger-scale runs to verify correctness.
@ayushdg - Just a heads up that rapidsai/cudf#16684 implements a fast/experimental S3 code path.
Thanks for raising. I'll follow up on testing this at scale soon(ish); I still haven't had the time to do so. Will keep you in the loop on those results.
I ran into the error below:
Running jaccard shuffle script
Args = Namespace(device='gpu', files_per_partition=2, n_workers=224, num_files=None, nvlink_only=False, protocol='tcp', rmm_pool_size=None, scheduler_address=None, scheduler_file='/home/vjawa/curator-2024_09_14-1198148/logs/scheduler.json', threads_per_worker=1, input_data_dirs=['/data/redpajama-sharded'], input_json_text_field='text', input_json_id_field='adlr_id', log_dir='./logs/', profile_path=None, input_meta='{"adlr_id":"str", "text":"str"}', output_dir='/outputdir/redpajama/long_strings', text_ddf_blocksize=1900, bucket_mapping_ddf_blocksize=1024, bucket_parts_per_worker=8, input_bucket_mapping_dir='/outputdir/redpajama/long_strings/anchor_docs_with_bk.parquet', parts_per_worker=1, set_torch_to_use_rmm=False)
Number of files being read for jaccard calculation = 50958
Graph creation for get_text_ddf_from_json_path_with_blocksize complete.
text_ddf.npartitions = 2483
Started processing bucket-map partitions 0 through 44 of 44
Using 16 text partitions.
Starting text bytes aware shuffle
0%| | 0/1 [00:00<?, ?it/s]
Traceback (most recent call last):
File "/opt/conda/bin/jaccard_shuffle", line 8, in <module>
sys.exit(console_script())
File "/opt/conda/lib/python3.10/site-packages/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py", line 118, in console_script
main(attach_args().parse_args())
File "/opt/conda/lib/python3.10/site-packages/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py", line 64, in main
shuffle.shuffle_docs_on_buckets(
File "/opt/conda/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 995, in shuffle_docs_on_buckets
self._batched_merge_and_write(
File "/opt/conda/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1120, in _batched_merge_and_write
output_df = text_bytes_aware_shuffle(
File "/opt/conda/lib/python3.10/site-packages/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py", line 171, in text_bytes_aware_shuffle
df = df.persist()
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_collection.py", line 446, in persist
out = self.optimize(fuse=fuse)
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_collection.py", line 590, in optimize
return new_collection(self.expr.optimize(fuse=fuse))
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 94, in optimize
return optimize(self, **kwargs)
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 3063, in optimize
return optimize_until(expr, stage)
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 3014, in optimize_until
expr = result.simplify()
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_core.py", line 374, in simplify
new = expr.simplify_once(dependents=dependents, simplified={})
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_core.py", line 352, in simplify_once
new = operand.simplify_once(
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_core.py", line 335, in simplify_once
out = child._simplify_up(expr, dependents)
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_shuffle.py", line 125, in _simplify_up
new_projection = [
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_shuffle.py", line 128, in <listcomp>
if (col in partitioning_index or col in projection)
File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 120, in __bool__
raise ValueError(
ValueError: The truth value of a EQ is ambiguous. Use a.any() or a.all().
Time Check: Sat Sep 14 23:19:48 PDT 2024
I think it is coming from the shuffle logic, but I don't think we want to address that as part of this PR.
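Editorial note: the ValueError in the traceback comes from dask-expr refusing to evaluate the truth value of a lazy EQ expression node during shuffle simplification. Below is a minimal, hedged illustration of that error class (not the NeMo Curator code path; it assumes the dask-expr backend is active). A likely way to reach the path shown in the traceback is passing something other than plain column-name strings as the shuffle's partitioning columns.

```python
# Illustration only: calling bool() on a lazy dask-expr expression node raises
# instead of evaluating. The traceback above hits the same __bool__ guard when
# dask-expr's shuffle simplification performs membership tests involving the
# partitioning index.
import pandas as pd
import dask.dataframe as dd  # assumes the dask-expr (query-planning) backend is active

ddf = dd.from_pandas(pd.DataFrame({"adlr_id": ["a", "b"]}), npartitions=1)
eq = ddf["adlr_id"] == "a"   # a lazy EQ comparison, not a concrete boolean

try:
    bool(eq.expr)            # same "truth value ... is ambiguous" error class
except (ValueError, TypeError) as err:
    print(type(err).__name__, err)
```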
@rjzamora, I tried removing the logic but I am still running into this issue. I will try to get an MRE out, but let me know if you have thoughts. FWIW, this is the command I was running.
@@ -155,7 +155,6 @@ def from_pandas(
    npartitions=npartitions,
    chunksize=chunksize,
    sort=sort,
    name=name,
My understanding is that the `name` argument doesn't work for 24.08 (the corresponding dask-expr version has a small bug). Although `name` is supported for later dask-expr versions, I'm not sure how important it is for Curator to support this argument? (I don't see it used internally anywhere, but I could be wrong.)
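For illustration (an editorial sketch, not code from this PR): constructing the collection without `name` is straightforward, since to my understanding the keyword only customizes the prefix of the internal task-graph keys.

```python
# Minimal sketch: dd.from_pandas without the optional name= keyword.
# The keyword only affects the naming of internal task-graph keys, so the
# resulting data is the same either way. Column names here are made up.
import pandas as pd
import dask.dataframe as dd

pdf = pd.DataFrame({"adlr_id": ["a", "b"], "text": ["hello", "world"]})
ddf = dd.from_pandas(pdf, npartitions=2)  # no name= passed
print(ddf.npartitions)
```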
Keeping the merge blocked until we fix the issue.
End-to-end commands to run:

#!/bin/bash
# End-to-End Fuzzy Deduplication Script with Timing
# This script performs a complete fuzzy deduplication process on specified datasets
# Steps:
# • Set up environment variables and directories
# • Verify query planning
# • Compute minhashes
# • Create minhash buckets
# • Map Jaccard buckets
# • Perform Jaccard shuffle
export DASK_DATAFRAME__QUERY_PLANNING=True
export LOGDIR="/raid/vjawa/debug_query_planning_logs/"
datasets="cleaned_exact_dedup_all_cc"
output_dir="/raid/vjawa/debug_query_planning"
input_data_dirs="/raid/prospector-lm"
for dataset in $datasets; do
    input_data_dirs="$input_data_dirs/$dataset/"
done
input_minhash_dirs="$output_dir/$datasets/minhashes.parquet"
files_per_partition=20
buckets_per_shuffle=3
log_time() {
    echo "[$(date +"%Y-%m-%d %H:%M:%S")] $1"
}
log_time "Setting up log directory..."
mkdir -p "$LOGDIR"
log_time "Log directory is set to: $LOGDIR"
mkdir -p $output_dir
log_time "Input data directories: $input_data_dirs"
log_time "Input minhash directories: $input_minhash_dirs"
log_time "Verifying query planning..."
python3 verify_query_planning.py
log_time "Query planning verification completed."
log_time "Computing minhashes..."
gpu_compute_minhashes \
--input-data-dirs "$input_data_dirs" \
--minhash-length 260 \
--char-ngram 5 \
--hash-bytes 4 \
--seed 42 \
--output-minhash-dir "$output_dir" \
--log-dir $LOGDIR \
--files-per-partition $files_per_partition \
--profile-path "$PROFILESDIR" \
--input-json-id-field "adlr_id" \
--input-meta '{"adlr_id":"str", "text":"str"}'
log_time "Minhash computation completed."
log_time "Creating minhash buckets..."
minhash_buckets \
--input-data-dirs "$input_minhash_dirs" \
--minhash-length 260 \
--output-bucket-dir "$output_dir" \
--log-dir $LOGDIR \
--num-bands 20 \
--input-json-id-field "adlr_id" \
--buckets-per-shuffle=$buckets_per_shuffle
log_time "Minhash buckets creation completed."
log_time "Mapping Jaccard buckets..."
jaccard_map_buckets \
--input-bucket-dir "$output_dir/_buckets.parquet" \
--input-data-dirs "$input_data_dirs" \
--output-dir "$output_dir" \
--text-ddf-blocksize 1900 \
--input-json-id-field "adlr_id" \
--input-meta '{"adlr_id":"str", "text":"str"}'
log_time "Jaccard bucket mapping completed."
log_time "Starting Jaccard Shuffle..."
jaccard_shuffle \
--input-bucket-mapping-dir "$output_dir/anchor_docs_with_bk.parquet" \
--input-data-dirs "$input_data_dirs" \
--output-dir "$output_dir" \
--text-ddf-blocksize 1900 \
--bucket-mapping-ddf-blocksize 1024 \
--parts-per-worker 1 \
--input-json-id-field "adlr_id"
log_time "Jaccard Shuffle completed."
log_time "Starting GPU connected components..."
gpu_connected_component \
--jaccard-pairs-path $output_dir/jaccard_similarity_results.parquet \
--output-dir $output_dir \
--cache-dir $output_dir \
--log-dir $LOGDIR \
--input-json-id-field "adlr_id"
log_time "GPU connected components completed."
log_time "Fuzzy deduplication process completed." Python Command to verify query planning works: import sys
import dask.dataframe as dd
import dask.config
def verify_query_planning():
    # Get the value of dataframe.query-planning
    query_planning = dask.config.get("dataframe.query-planning")
    print(f"Query Planning is set to: {query_planning}")

    # Check if DASK_EXPR_ENABLED is set
    DASK_EXPR_ENABLED = dd._dask_expr_enabled()
    print(f"DASK_EXPR_ENABLED is set to: {DASK_EXPR_ENABLED}")

    # Check if 'dask_expr' is in sys.modules
    dask_expr_loaded = "dask_expr" in sys.modules
    print(f"'dask_expr' in sys.modules: {dask_expr_loaded}")

    # Assertions
    assert query_planning is True, "Query Planning is not set to True"
    assert DASK_EXPR_ENABLED, "DASK_EXPR is not enabled"
    assert dask_expr_loaded, "'dask_expr' is not loaded in sys.modules"


if __name__ == "__main__":
    verify_query_planning()

Todo (verify MRE):

echo "Starting Jaccard Shuffle..."
jaccard_shuffle \
--input-bucket-mapping-dir "$output_dir/anchor_docs_with_bk.parquet" \
--input-data-dirs "$input_data_dirs" \
--output-dir "$output_dir" \
--text-ddf-blocksize 1900 \
--bucket-mapping-ddf-blocksize 1024 \
--parts-per-worker 1 \
--input-json-id-field "adlr_id"
Thanks @VibhuJawa! I will try to reproduce. Just a note: the assertion `assert query_planning is True, "Query Planning is not set to True"` doesn't need to succeed for query-planning to be "active".
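As an editorial aside, a way to confirm that collections are actually expression-backed, rather than relying on the config value, could look like the sketch below (an illustration, not code from the PR).

```python
# Sketch: check whether a dask.dataframe collection is expression-backed,
# i.e. the dask-expr / query-planning code path is in use, independent of how
# "dataframe.query-planning" happens to be configured.
import pandas as pd
import dask.dataframe as dd

ddf = dd.from_pandas(pd.DataFrame({"x": [1, 2, 3]}), npartitions=1)
print("expression-backed:", hasattr(ddf, "expr"))
```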
Thanks again for testing this, @VibhuJawa! I believe I've fixed the bug you are seeing in 51e54ff.
@ayushdg and I just completed the correctness verification. Thanks for helping us through this, @rjzamora. We might have some runtime perf things to look at; we will sync with @praateekmahajan to get some profiles etc. out to you.
Sounds good! I'll be happy to tackle any performance regressions.
We should be good to merge this in: if we control for all variables, we get almost exactly the same runtime with and without query planning. With query planning we can tune things further. Let's get it in; thanks @rjzamora for all the work here. Very impressive.
LGTM
@VibhuJawa can you run the NeMo CI on these changes? I want to ensure nothing in the rest of Curator would break for any reason.
CI passed. I'm fine merging.
Aaand it's in. Thanks again @rjzamora for pushing this through.
* start adding dask-expr support
* add query_planning_enabled util
* add global keyword
* Forgot to remove top level query-planning check
* fix other shuffle-arg problems that don't 'work' with dask-expr
* remove name arg usage for now
* fix bugs

Signed-off-by: rjzamora <[email protected]>
This PR adds support for query-planning in dask.
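As a hedged illustration of the `query_planning_enabled` utility referenced in the squashed commit list above (the actual implementation in this PR may differ), a minimal sketch could be:

```python
# Editorial sketch; the real utility added by this PR may be implemented differently.
import dask


def query_planning_enabled() -> bool:
    # In recent Dask releases the dask-expr backend is used unless the
    # "dataframe.query-planning" config is explicitly set to False; an unset
    # value (None) therefore still means query planning can be active.
    return dask.config.get("dataframe.query-planning", None) is not False
```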
Checklist