Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add dask query-planning support #139

Merged
merged 9 commits into from
Sep 27, 2024
Merged

Conversation

rjzamora
Copy link
Contributor

@rjzamora rjzamora commented Jul 5, 2024

This PR adds support for query-planning in dask.

Checklist

  • I am familiar with the Contributing Guide.
  • New or Existing tests cover these changes.
  • The documentation is up to date with these changes.

@@ -36,6 +34,16 @@ def _split_part(part, nsplits):


def text_bytes_aware_merge(text_df, right_df, broadcast=True, *, on):
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this function currently used anywhere?

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't see any usage of this function. cc: @VibhuJawa in case you recall any background and if this is needed or not

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dont think this is needed anymore.

We previously added it while trying to be more conservative IIRC

https://github.com/rapidsai/rapids-deduplication/pull/68

@rjzamora rjzamora changed the title [WIP] Add dask query-planning support Add dask query-planning support Jul 8, 2024
@rjzamora rjzamora marked this pull request as ready for review July 8, 2024 15:50
@rjzamora
Copy link
Contributor Author

rjzamora commented Jul 8, 2024

GPU tests pass locally for me - Let me know if others run into problems.

@ryantwolf ryantwolf requested review from VibhuJawa and ayushdg July 23, 2024 17:01
Copy link
Collaborator

@VibhuJawa VibhuJawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks for working on this

@ayushdg
Copy link
Collaborator

ayushdg commented Jul 30, 2024

I would like to hold off on merging until @ryantwolf has a chance to test a fuzzy dedup pipeline run with this pr & we try some larger scale runs and verify correctness.

@rjzamora
Copy link
Contributor Author

rjzamora commented Sep 3, 2024

@ayushdg - Just a heads up that rapidsai/cudf#16684 implements a fast/experimental S3 code path for read_parquet in dask-cudf. However, that feature does require that query-planning be turned on. It is also worth noting that the legacy API will probably raise a deprecation warning in the near future.

@ayushdg
Copy link
Collaborator

ayushdg commented Sep 3, 2024

@ayushdg - Just a heads up that rapidsai/cudf#16684 implements a fast/experimental S3 code path for read_parquet in dask-cudf. However, that feature does require that query-planning be turned on. It is also worth noting that the legacy API will probably raise a deprecation warning in the near future.

Thanks for raising. I'll followup on testing this at scale soon (ish). I still haven't had the time to do so. Will keep you in the loop on those results.

@VibhuJawa
Copy link
Collaborator

I ran into below error:

Running jaccard shuffle script
Args = Namespace(device='gpu', files_per_partition=2, n_workers=224, num_files=None, nvlink_only=False, protocol='tcp', rmm_pool_size=None, scheduler_address=None, scheduler_file='/home/vjawa/curator-2024_09_14-1198148/logs/scheduler.json', threads_per_worker=1, input_data_dirs=['/data/redpajama-sharded'], input_json_text_field='text', input_json_id_field='adlr_id', log_dir='./logs/', profile_path=None, input_meta='{"adlr_id":"str", "text":"str"}', output_dir='/outputdir/redpajama/long_strings', text_ddf_blocksize=1900, bucket_mapping_ddf_blocksize=1024, bucket_parts_per_worker=8, input_bucket_mapping_dir='/outputdir/redpajama/long_strings/anchor_docs_with_bk.parquet', parts_per_worker=1, set_torch_to_use_rmm=False)
Number of files being read for jaccard calculation = 50958
Graph creation for get_text_ddf_from_json_path_with_blocksize complete.
text_ddf.npartitions  = 2483

Started processing bucket-map partitions 0 through 44 of 44
Using 16 text partitions.
Starting text bytes aware shuffle
  0%|          | 0/1 [00:00<?, ?it/s]
Traceback (most recent call last):
  File "/opt/conda/bin/jaccard_shuffle", line 8, in <module>
    sys.exit(console_script())
  File "/opt/conda/lib/python3.10/site-packages/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py", line 118, in console_script
    main(attach_args().parse_args())
  File "/opt/conda/lib/python3.10/site-packages/nemo_curator/scripts/fuzzy_deduplication/jaccard_shuffle.py", line 64, in main
    shuffle.shuffle_docs_on_buckets(
  File "/opt/conda/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 995, in shuffle_docs_on_buckets
    self._batched_merge_and_write(
  File "/opt/conda/lib/python3.10/site-packages/nemo_curator/modules/fuzzy_dedup.py", line 1120, in _batched_merge_and_write
    output_df = text_bytes_aware_shuffle(
  File "/opt/conda/lib/python3.10/site-packages/nemo_curator/utils/fuzzy_dedup_utils/shuffle_utils.py", line 171, in text_bytes_aware_shuffle
    df = df.persist()
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_collection.py", line 446, in persist
    out = self.optimize(fuse=fuse)
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_collection.py", line 590, in optimize
    return new_collection(self.expr.optimize(fuse=fuse))
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 94, in optimize
    return optimize(self, **kwargs)
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 3063, in optimize
    return optimize_until(expr, stage)
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 3014, in optimize_until
    expr = result.simplify()
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_core.py", line 374, in simplify
    new = expr.simplify_once(dependents=dependents, simplified={})
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_core.py", line 352, in simplify_once
    new = operand.simplify_once(
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_core.py", line 335, in simplify_once
    out = child._simplify_up(expr, dependents)
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_shuffle.py", line 125, in _simplify_up
    new_projection = [
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_shuffle.py", line 128, in <listcomp>
    if (col in partitioning_index or col in projection)
  File "/opt/conda/lib/python3.10/site-packages/dask_expr/_expr.py", line 120, in __bool__
    raise ValueError(
ValueError: The truth value of a EQ is ambiguous. Use a.any() or a.all().
Time Check: Sat Sep 14 23:19:48 PDT 2024

As i think it is coming from text_bytes_aware_shuffle , we can retire that and just use shuffle directly. (#240) .

Dont think we want to do it as part of this PR though.

@VibhuJawa
Copy link
Collaborator

@rjzamora , I tried removing the logic but i am still running into this issue. Will try to get a MRE out but let me know if you have thoughts .

FWIW, This is the command i was running.


jaccard_shuffle \
  --input-bucket-mapping-dir $output_dir/anchor_docs_with_bk.parquet \
  --input-data-dirs $input_data_dirs \
  --output-dir $output_dir \
  --text-ddf-blocksize 1900 \
  --bucket-mapping-ddf-blocksize 1024 \
  --parts-per-worker 1 \
  --input-json-id-field "adlr_id" \
  --scheduler-file $LOGDIR/scheduler.json

@@ -155,7 +155,6 @@ def from_pandas(
npartitions=npartitions,
chunksize=chunksize,
sort=sort,
name=name,
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

My understanding is that the name argument doesn't work for 24.08 (the corresponding dask-expr version has a small bug). Although name is supported for later dask-expr versions, I'm not sure how important it is for curator to support this argument? (I don't see it used internally anywhere, but could be wrong)

Copy link
Collaborator

@VibhuJawa VibhuJawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Keeping merge blocked till will we fix the issue.

@VibhuJawa
Copy link
Collaborator

VibhuJawa commented Sep 17, 2024

End to End Commands to run on exp-01 :

#!/bin/bash
# End-to-End Fuzzy Deduplication Script with Timing
# This script performs a complete fuzzy deduplication process on specified datasets
# Steps:
# • Set up environment variables and directories
# • Verify query planning
# • Compute minhashes
# • Create minhash buckets
# • Map Jaccard buckets
# • Perform Jaccard shuffle

export DASK_DATAFRAME__QUERY_PLANNING=True
export LOGDIR="/raid/vjawa/debug_query_planning_logs/"
datasets="cleaned_exact_dedup_all_cc"
output_dir="/raid/vjawa/debug_query_planning"
input_data_dirs="/raid/prospector-lm"


for dataset in $datasets; do
  input_data_dirs="$input_data_dirs/$dataset/"
done

input_minhash_dirs="$output_dir/$datasets/minhashes.parquet"
files_per_partition=20
buckets_per_shuffle=3

log_time() {
  echo "[$(date +"%Y-%m-%d %H:%M:%S")] $1"
}

log_time "Setting up log directory..."
mkdir -p "$LOGDIR"
log_time "Log directory is set to: $LOGDIR"

mkdir -p $output_dir
log_time "Input data directories: $input_data_dirs"
log_time "Input minhash directories: $input_minhash_dirs"

log_time "Verifying query planning..."
python3 verify_query_planning.py
log_time "Query planning verification completed."

log_time "Computing minhashes..."
gpu_compute_minhashes \
  --input-data-dirs "$input_data_dirs" \
  --minhash-length 260 \
  --char-ngram 5 \
  --hash-bytes 4 \
  --seed 42 \
  --output-minhash-dir "$output_dir" \
  --log-dir $LOGDIR \
  --files-per-partition $files_per_partition \
  --profile-path "$PROFILESDIR" \
  --input-json-id-field "adlr_id" \
  --input-meta '{"adlr_id":"str", "text":"str"}'
log_time "Minhash computation completed."

log_time "Creating minhash buckets..."
minhash_buckets \
  --input-data-dirs "$input_minhash_dirs" \
  --minhash-length 260 \
  --output-bucket-dir "$output_dir" \
  --log-dir $LOGDIR \
  --num-bands 20 \
  --input-json-id-field "adlr_id" \
  --buckets-per-shuffle=$buckets_per_shuffle
log_time "Minhash buckets creation completed."

log_time "Mapping Jaccard buckets..."
jaccard_map_buckets \
  --input-bucket-dir "$output_dir/_buckets.parquet" \
  --input-data-dirs "$input_data_dirs" \
  --output-dir "$output_dir" \
  --text-ddf-blocksize 1900 \
  --input-json-id-field "adlr_id" \
  --input-meta '{"adlr_id":"str", "text":"str"}'
log_time "Jaccard bucket mapping completed."

log_time "Starting Jaccard Shuffle..."
jaccard_shuffle \
  --input-bucket-mapping-dir "$output_dir/anchor_docs_with_bk.parquet" \
  --input-data-dirs "$input_data_dirs" \
  --output-dir "$output_dir" \
  --text-ddf-blocksize 1900 \
  --bucket-mapping-ddf-blocksize 1024 \
  --parts-per-worker 1 \
  --input-json-id-field "adlr_id"
log_time "Jaccard Shuffle completed."

log_time "Starting GPU connected components..."
gpu_connected_component \
  --jaccard-pairs-path $output_dir/jaccard_similarity_results.parquet \
  --output-dir $output_dir \
  --cache-dir $output_dir \
  --log-dir $LOGDIR \
  --input-json-id-field "adlr_id"
log_time "GPU connected components completed."

log_time "Fuzzy deduplication process completed."

Python Command to verify query planning works:

import sys
import dask.dataframe as dd
import dask.config

def verify_query_planning():
    # Get the value of dataframe.query-planning
    query_planning = dask.config.get("dataframe.query-planning")
    print(f"Query Planning is set to: {query_planning}")
    
    # Check if DASK_EXPR_ENABLED is set
    DASK_EXPR_ENABLED = dd._dask_expr_enabled()
    print(f"DASK_EXPR_ENABLED is set to: {DASK_EXPR_ENABLED}")
    
    # Check if 'dask_expr' is in sys.modules
    dask_expr_loaded = "dask_expr" in sys.modules
    print(f"'dask_expr' in sys.modules: {dask_expr_loaded}")

    # Assertions
    assert query_planning is True, "Query Planning is not set to True"
    assert DASK_EXPR_ENABLED, "DASK_EXPR is not enabled"
    assert dask_expr_loaded, "'dask_expr' is not loaded in sys.modules"

if __name__ == "__main__":
    verify_query_planning()

Todo (Verify MRE):

echo "Starting Jaccard Shuffle..."
jaccard_shuffle \
  --input-bucket-mapping-dir "$output_dir/anchor_docs_with_bk.parquet" \
  --input-data-dirs "$input_data_dirs" \
  --output-dir "$output_dir" \
  --text-ddf-blocksize 1900 \
  --bucket-mapping-ddf-blocksize 1024 \
  --parts-per-worker 1 \
  --input-json-id-field "adlr_id"

@rjzamora
Copy link
Contributor Author

Thanks @VibhuJawa ! I will try to reproduce.

Just a note:

assert query_planning is True, "Query Planning is not set to True"

This assertion doesn't need to succeed for query-planning to be "active".
The only value that matters is dd._dask_expr_enabled().

Signed-off-by: rjzamora <[email protected]>
@rjzamora
Copy link
Contributor Author

Thanks again for testing this @VibhuJawa !

I believe I've fixed the bug you are seeing in 51e54ff

@VibhuJawa
Copy link
Collaborator

@ayushdg and I just completed correctness. Thanks for helping us through this @rjzamora

We might have some runtime perf things to look at , will sync with @praateekmahajan to get some profiles etc out to you

@rjzamora
Copy link
Contributor Author

rjzamora commented Sep 17, 2024

We might have some runtime perf things to look at

Sounds good! I'll be happy to tackle any performance regressions.

@VibhuJawa
Copy link
Collaborator

VibhuJawa commented Sep 23, 2024

We should be be good to merge this in, if we control for all variables we get all most exactly the same time.
TRUE:

Time Check: Fri Sep 20 17:46:32 PDT 2024
/data/redpajama-sharded
Query Planning is set to: True
DASK_EXPR_ENABLED is set to: True
'dask_expr' in sys.modules: True
Starting Jaccard Shuffle...
Num Workers = 16
.......
Text-df partition  8079/8079 completed in 11.727193832397461
Bucket partition  44/44 completed in 6120.772291898727
100%|██████████| 1/1 [1:42:00<00:00, 6120.77s/it]
Jaccard Shuffle E2E time taken = 6134.5050756931305 s

FALSE:

/data/redpajama-sharded
Query Planning is set to: False
DASK_EXPR_ENABLED is set to: False
'dask_expr' in sys.modules: False
Traceback (most recent call last):
  File "/scripts/verify_query_planning.py", line 24, in <module>
    verify_query_planning()
  File "/scripts/verify_query_planning.py", line 19, in verify_query_planning
    assert query_planning is True, "Query Planning is not set to True"
AssertionError: Query Planning is not set to True
.......
Will write 17249764 rows to disk
Text-df partition  8079/8079 completed in 9.831743478775024
Bucket partition  44/44 completed in 6121.050513744354
100%|██████████| 1/1 [1:42:01<00:00, 6121.05s/it]
Jaccard Shuffle E2E time taken = 6134.075867891312 s
Time Check: Fri Sep 20 19:43:07 PDT 2024

With query planning we can tune TEXT_DDF_BLOCKSIZE to a larger value . We also may have had a cuDF regression which we need to track but that should not block this PR.

Lets get it in , thanks @rjzamora for all the work here. Very impressive.

Copy link
Collaborator

@VibhuJawa VibhuJawa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@ryantwolf
Copy link
Collaborator

@VibhuJawa can you run the NeMo CI on these changes? Want to ensure nothing in the rest of curator would break for any reason.

@ryantwolf
Copy link
Collaborator

CI passed. I'm fine merging.

@VibhuJawa VibhuJawa merged commit 97e8f15 into NVIDIA:main Sep 27, 2024
3 checks passed
@VibhuJawa
Copy link
Collaborator

Aaand its in. Thanks again @rjzamora for pushing this through.

@rjzamora rjzamora deleted the dask-expr-support branch September 30, 2024 15:20
VibhuJawa pushed a commit to VibhuJawa/NeMo-Curator that referenced this pull request Oct 1, 2024
* start adding dask-expr support

Signed-off-by: rjzamora <[email protected]>

* add query_planning_enabled util

Signed-off-by: rjzamora <[email protected]>

* add global keyword

Signed-off-by: rjzamora <[email protected]>

* Forgot to remove top level query-planning check

Signed-off-by: rjzamora <[email protected]>

* fix other shuffle-arg problems that don't 'work' with dask-expr

Signed-off-by: rjzamora <[email protected]>

* remove name arg usage for now

Signed-off-by: rjzamora <[email protected]>

* fix bugs

Signed-off-by: rjzamora <[email protected]>

---------

Signed-off-by: rjzamora <[email protected]>
yyu22 pushed a commit to yyu22/NeMo-Curator that referenced this pull request Oct 9, 2024
* start adding dask-expr support

Signed-off-by: rjzamora <[email protected]>

* add query_planning_enabled util

Signed-off-by: rjzamora <[email protected]>

* add global keyword

Signed-off-by: rjzamora <[email protected]>

* Forgot to remove top level query-planning check

Signed-off-by: rjzamora <[email protected]>

* fix other shuffle-arg problems that don't 'work' with dask-expr

Signed-off-by: rjzamora <[email protected]>

* remove name arg usage for now

Signed-off-by: rjzamora <[email protected]>

* fix bugs

Signed-off-by: rjzamora <[email protected]>

---------

Signed-off-by: rjzamora <[email protected]>
Signed-off-by: Yang Yu <[email protected]>
yyu22 pushed a commit to yyu22/NeMo-Curator that referenced this pull request Oct 10, 2024
* start adding dask-expr support

Signed-off-by: rjzamora <[email protected]>

* add query_planning_enabled util

Signed-off-by: rjzamora <[email protected]>

* add global keyword

Signed-off-by: rjzamora <[email protected]>

* Forgot to remove top level query-planning check

Signed-off-by: rjzamora <[email protected]>

* fix other shuffle-arg problems that don't 'work' with dask-expr

Signed-off-by: rjzamora <[email protected]>

* remove name arg usage for now

Signed-off-by: rjzamora <[email protected]>

* fix bugs

Signed-off-by: rjzamora <[email protected]>

---------

Signed-off-by: rjzamora <[email protected]>
@sarahyurick sarahyurick mentioned this pull request Oct 18, 2024
4 tasks
vinay-raman pushed a commit to vinay-raman/NeMo-Curator that referenced this pull request Nov 12, 2024
* start adding dask-expr support

Signed-off-by: rjzamora <[email protected]>

* add query_planning_enabled util

Signed-off-by: rjzamora <[email protected]>

* add global keyword

Signed-off-by: rjzamora <[email protected]>

* Forgot to remove top level query-planning check

Signed-off-by: rjzamora <[email protected]>

* fix other shuffle-arg problems that don't 'work' with dask-expr

Signed-off-by: rjzamora <[email protected]>

* remove name arg usage for now

Signed-off-by: rjzamora <[email protected]>

* fix bugs

Signed-off-by: rjzamora <[email protected]>

---------

Signed-off-by: rjzamora <[email protected]>
Signed-off-by: Vinay Raman <[email protected]>
vinay-raman pushed a commit to vinay-raman/NeMo-Curator that referenced this pull request Nov 13, 2024
* start adding dask-expr support

Signed-off-by: rjzamora <[email protected]>

* add query_planning_enabled util

Signed-off-by: rjzamora <[email protected]>

* add global keyword

Signed-off-by: rjzamora <[email protected]>

* Forgot to remove top level query-planning check

Signed-off-by: rjzamora <[email protected]>

* fix other shuffle-arg problems that don't 'work' with dask-expr

Signed-off-by: rjzamora <[email protected]>

* remove name arg usage for now

Signed-off-by: rjzamora <[email protected]>

* fix bugs

Signed-off-by: rjzamora <[email protected]>

---------

Signed-off-by: rjzamora <[email protected]>
Signed-off-by: Vinay Raman <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants